Implementing a job queue in Node.js

Rajkumar Gaur
SliceOfDev


Using Redis and BullMQ


What is a job queue and when will you need one?

A job queue holds a list of jobs waiting to be executed, either once some condition is met or once the workers/processors are ready to take them.
This is pretty useful when you want to run time-consuming work at a controlled pace.

For example, your server receives 10 requests in a minute, but you can execute only 5 requests per minute because of computing power limitations. If you try to execute all 10 requests in that minute, your system might not handle the load and could crash. To solve this, you can execute 5 of the requests right away and add the other 5 to a queue. These will be executed once a worker is idle or has spare computing power.

The naive approach

The first approach that came to mind was to store the job payloads in an array and take the jobs off it one by one.
The code:

naive queue implementation
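The original listing is not reproduced here, so what follows is only a minimal sketch of what such an array-based queue could look like. The names addJob, processJob and executeNext are made up for illustration, and the sketch takes jobs from the front of the array with shift so that the oldest job runs first.

```javascript
// Naive in-memory job queue: an illustrative sketch, not the article's original listing.
const queue = []; // pending job payloads, oldest first

// Add a job payload to the back of the queue.
function addJob(payload) {
  queue.push(payload);
}

// Hypothetical job handler; replace it with the real time-consuming work.
async function processJob(payload) {
  return `processed ${payload.id}`;
}

// Take the oldest job off the queue and run it.
// Resolves to the handler's result, or to null when the queue is empty.
async function executeNext() {
  const payload = queue.shift(); // the job leaves the array before it has run
  if (payload === undefined) return null;
  return processJob(payload);
}
```

To pace the work at roughly 5 jobs per minute, executeNext could be called from a timer, for example setInterval(executeNext, 12000).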

The above code gets the job done in an ideal world, but it will fail in the real world.

What happens when you restart the server? You lose track of the currently running processes and of the jobs still waiting in the queue.
What happens when a job fails during processing? You lose that job forever because it has already been popped from the queue.
What about multiple workers/instances? You can't share the queue with other instances because it lives in your server's memory and is only accessible from that process. So no chance of a distributed system.

To solve the above problems, we need storage that lives outside the server process and survives its restarts. Let's use Redis.

What is Redis?

Redis is an in-memory key-value data store. We are using Redis in particular because it provides data structures (lists, sets, hashes, etc.) and TTL (time to live) out of the box.
We will be using the list data structure of Redis to simulate a queue. To learn more about Redis, head to https://redis.io/docs/
To interact with Redis, we will be using the ioredis library for Node.js: https://github.com/luin/ioredis

Here, we will still be using the same pattern as before; only the code that creates and handles the queue will change. We will also now have two queues, todo and processing. The todo queue will contain jobs waiting to get executed, and processing will contain jobs that are currently being processed. We are doing this to have a fail-safe mechanism.

When a job is ready to get executed we move that job from todo to processing and only remove the job from processing if the execution was successful. If the execution fails, the job will be moved back to the todo list at the back of the queue. This way we never really lose the failed jobs and can execute them again.

Let's revisit the three questions:
What happens when you restart the server? No job data is lost, because it is stored in Redis rather than in your server's memory. After a restart, you can still access the jobs waiting to get processed.
What happens when a job fails during processing? If the system crashes or job execution fails, the failed job is moved from the processing list back to the todo list and gets processed again after a while.
What about multiple workers/instances? Multiple workers and instances can connect to Redis and execute the jobs without relying on your main server.

We will attach two extra parameters to each job to handle failures.
We will use tries to keep track of how many times the job has been sent to the processing list. If the job hasn't succeeded after 3 tries, we will just delete it.
The other parameter is timestamp, which lets us check whether a job has been in the processing queue for a long time (most probably because the system crashed during execution). If the difference between the current time and the timestamp is more than 10 minutes, we move the job back to the todo queue.
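To make the two parameters concrete, here is a small sketch of how a job could be wrapped and checked. The helper names (createJob, markProcessing, hasExhaustedTries, isStale) and the exact field layout are assumptions for illustration; only tries, timestamp, the 3-try limit and the 10-minute window come from the text above.

```javascript
const MAX_TRIES = 3;                   // give up on a job after this many tries
const STALE_AFTER_MS = 10 * 60 * 1000; // 10 minutes in the processing queue

// Wrap a payload with the bookkeeping fields used for failure handling.
function createJob(payload, now = Date.now()) {
  return { payload, tries: 0, timestamp: now };
}

// Called when a job is moved to the processing queue:
// count the try and remember when processing started.
function markProcessing(job, now = Date.now()) {
  return { ...job, tries: job.tries + 1, timestamp: now };
}

// True once the job has used up all of its tries.
function hasExhaustedTries(job) {
  return job.tries >= MAX_TRIES;
}

// True when the job has been in processing for more than 10 minutes.
function isStale(job, now = Date.now()) {
  return now - job.timestamp > STALE_AFTER_MS;
}
```

Since Redis lists store strings, a job like this would be serialized with JSON.stringify before being pushed and parsed with JSON.parse after being read.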

Establishing Redis connection
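The connection file itself is not shown here, so the following is only a sketch of what it could contain. The environment variable names REDIS_HOST and REDIS_PORT and the function names are assumptions; the ioredis constructor does accept a host/port options object.

```javascript
// redis.js (hypothetical file name): creates the Redis connection used by the queue.

// Read connection settings from the environment, falling back to a local Redis.
// REDIS_HOST and REDIS_PORT are assumed names, not something ioredis defines.
function redisOptions(env = process.env) {
  return {
    host: env.REDIS_HOST || "127.0.0.1",
    port: Number(env.REDIS_PORT) || 6379,
  };
}

// Create the client. ioredis is required lazily so that this file can be
// loaded even on a machine where the package is not installed yet.
function createRedisConnection() {
  const Redis = require("ioredis");
  return new Redis(redisOptions());
}
```

In a real project this file would export the client (or the factory) so that index.js and the workers can share it.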

Let's also change the index.js file to use a Redis list instead of an array.

We import the file above into our main file to get the Redis connection, and push new jobs to the back of the Redis todo queue.

Adding new jobs to the queue
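As a rough sketch of this step, the function below pushes a job to the back of the todo list with RPUSH. It takes the Redis client as an argument (any object with an ioredis-style rpush method), and the job shape with tries and timestamp is an assumption based on the parameters described earlier.

```javascript
const TODO_QUEUE = "todo"; // name of the Redis list holding waiting jobs

// Add a job to the back of the todo queue.
// `redis` is expected to be an ioredis client (or anything with the same rpush method).
async function addJob(redis, payload) {
  const job = { payload, tries: 0, timestamp: Date.now() };
  // RPUSH appends to the tail of the list, so the oldest job stays at the head.
  // It resolves to the length of the list after the push.
  return redis.rpush(TODO_QUEUE, JSON.stringify(job));
}
```

With a real connection this would be called as addJob(redis, { id: 1 }) from wherever the server receives work.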

A worker will always choose to execute the first/oldest job in the queue. But suppose two workers are running at the same time and both access the same job, i.e. the first element of the queue.

We don't want this to happen; otherwise the same job could be executed multiple times, or we could see other unexpected behavior.

For this reason, we are going to use a locking system that locks the queue for the current worker, while other workers wait until this worker has finished accessing the job (note that I said accessing the job and not executing it; we release the lock as soon as the job has been accessed/popped from the queue).

Don't worry if you are seeing this concept for the first time. The implementation is quite simple using Redis.
We use a simple SET command in Redis to keep a flag that says whether a queue is locked. Every 5 seconds we check the flag; if it is not set, we can acquire the lock by setting the flag to locked (this is just a string and can be anything you want!).
Also, we set the lock to expire after 10 seconds because that's more than enough to access the job.
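Here is one way the lock could be sketched. It uses SET with the NX and EX options so that checking and setting the flag happen in a single command; whether the original code does it exactly this way is an assumption, as are the lock:<queue> key naming and the function names.

```javascript
const LOCK_TTL_SECONDS = 10; // the lock expires on its own after 10 seconds
const RETRY_DELAY_MS = 5000; // wait 5 seconds before checking the flag again

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Wait until the lock for `queueName` is free, then take it.
// `redis` is expected to be an ioredis client.
async function acquireLock(redis, queueName, retryDelayMs = RETRY_DELAY_MS) {
  const key = `lock:${queueName}`;
  for (;;) {
    // SET key "locked" EX 10 NX: only succeeds when the key does not exist yet.
    const result = await redis.set(key, "locked", "EX", LOCK_TTL_SECONDS, "NX");
    if (result === "OK") return key;
    await sleep(retryDelayMs);
  }
}

// Release the lock so that the next worker can access the queue.
async function releaseLock(redis, queueName) {
  await redis.del(`lock:${queueName}`);
}
```

One limitation of this sketch: releaseLock deletes the key unconditionally, so a worker that holds the lock for longer than 10 seconds could end up removing a lock that another worker has taken in the meantime.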

Let's also update the execute function. It will first try to acquire the lock for the todo queue, then take the first element and transfer it to the processing queue, then release the lock for the todo queue and execute the job.
Once the job has executed successfully, it will also delete the job from the processing queue, again after acquiring the lock.

Execute jobs from the queue
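The flow described above could be sketched roughly as follows. The withLock helper, the handler argument and the returned status strings are assumptions for illustration, and so is the choice to leave a failed job in the processing list so that the periodic retry check can move it back to todo later.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Run `fn` while holding a short-lived lock on `queueName` (SET ... EX 10 NX).
async function withLock(redis, queueName, fn, retryDelayMs = 5000) {
  const key = `lock:${queueName}`;
  while ((await redis.set(key, "locked", "EX", 10, "NX")) !== "OK") {
    await sleep(retryDelayMs);
  }
  try {
    return await fn();
  } finally {
    await redis.del(key); // release the lock as soon as the queue access is done
  }
}

// Move the oldest job from todo to processing, then run `handler` on its payload.
// Resolves to "empty", "done" or "failed".
async function execute(redis, handler) {
  const raw = await withLock(redis, "todo", async () => {
    const head = await redis.lpop("todo"); // oldest job, or null if the list is empty
    if (head === null) return null;
    const job = JSON.parse(head);
    const updated = JSON.stringify({ ...job, tries: job.tries + 1, timestamp: Date.now() });
    await redis.rpush("processing", updated);
    return updated;
  });
  if (raw === null) return "empty";

  try {
    await handler(JSON.parse(raw).payload); // the lock is NOT held while the job runs
  } catch (err) {
    // Assumption: leave the job in processing; the retry check re-queues it later.
    return "failed";
  }

  // Success: remove exactly this entry from the processing list.
  await withLock(redis, "processing", () => redis.lrem("processing", 1, raw));
  return "done";
}
```

Note that LPOP followed by RPUSH is two separate commands, so a crash between them would lose the job; this sketch accepts that for the sake of simplicity.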

At this point, we have a system that will check for new jobs and execute them. But what about jobs that never finished because the system crashed? We've got that covered!
Every 10 minutes (adjust this to your needs), we check whether any job has been in the processing queue for more than 10 minutes (again, adjust this to your needs) using the timestamp we stored on it.
We also delete the job altogether once it has used up its 3 tries (this is optional, depending on your use case).
This check doesn't have to run on the same server as the code above, but for simplicity, I have deployed it on the same server.

Retry mechanism for failed jobs
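A sketch of this periodic check could look like the following. The function name requeueStaleJobs and its return value are made up for illustration, and locking is left out to keep the example short, so treat it as an outline rather than a finished implementation.

```javascript
const MAX_TRIES = 3;
const STALE_AFTER_MS = 10 * 60 * 1000; // 10 minutes

// Scan the processing list once. Jobs that have been there for more than
// 10 minutes are moved back to todo, or dropped if they have used up their tries.
// `redis` is expected to be an ioredis client.
async function requeueStaleJobs(redis, now = Date.now()) {
  const result = { requeued: 0, dropped: 0 };
  const entries = await redis.lrange("processing", 0, -1); // every entry in the list
  for (const raw of entries) {
    const job = JSON.parse(raw);
    if (now - job.timestamp <= STALE_AFTER_MS) continue; // still within its time budget

    const removed = await redis.lrem("processing", 1, raw);
    if (removed === 0) continue; // someone else already removed this entry

    if (job.tries >= MAX_TRIES) {
      result.dropped += 1; // give up on this job
      continue;
    }
    await redis.rpush("todo", raw); // back of the todo queue
    result.requeued += 1;
  }
  return result;
}
```

It could then be scheduled with something like setInterval(() => requeueStaleJobs(redis), STALE_AFTER_MS), on this server or on a separate one.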

That was all for the code. We now have a working queue with a fail-safe mechanism.
The whole code:

We have successfully implemented a job queue from scratch. But if this was too much for you, there are many Node.js libraries that provide this feature; one of them is BullMQ: https://docs.bullmq.io/
This library also uses Redis behind the scenes and offers tons of features. Let's look at an example of how we can use it.
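The original example is not reproduced here, so below is a minimal sketch built on BullMQ's Queue and Worker classes. The queue name emails, the job name welcome, the connection settings and the helper function names are all assumptions for illustration; check the BullMQ docs for the options that fit your setup.

```javascript
// Assumed connection settings for a local Redis instance.
const connection = { host: "127.0.0.1", port: 6379 };

// Retry a failed job up to 3 times, with an exponentially growing delay.
const jobOptions = { attempts: 3, backoff: { type: "exponential", delay: 1000 } };

// Create the queue and a worker that processes its jobs.
// bullmq is required lazily so that this file loads even before the package is installed.
function startEmailQueue() {
  const { Queue, Worker } = require("bullmq");
  const queue = new Queue("emails", { connection });

  const worker = new Worker(
    "emails",
    async (job) => {
      // job.data is the payload that was passed to queue.add
      console.log(`sending email to ${job.data.to}`);
    },
    { connection }
  );

  worker.on("completed", (job) => console.log(`job ${job.id} completed`));
  worker.on("failed", (job, err) => console.error(`job ${job?.id} failed: ${err.message}`));

  return { queue, worker };
}

// Add a job to the queue; BullMQ stores it in Redis and a worker picks it up.
async function enqueueWelcomeEmail(queue, to) {
  return queue.add("welcome", { to }, jobOptions);
}
```

Compared to the hand-written version, the todo/processing bookkeeping, locking and retries are handled by the library, so the application code only describes what a job is and how to process it.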

That's all folks! Let me know in the comments if you need help with anything.

And make sure to leave a 👏 if this helped you. Thanks for reading!

Find more content like this at SliceOfDev.com
