Simple task queue with Redis Streams
- Christopher Schleiden
Lately I've done a bit more work with Redis. While I had used it via abstractions at previous jobs, I had never interacted with it directly so there were a few things to catch up on.
One of the scenarios I needed it for was a simple task queue for go-workflows. The basic requirements were the ability to push and pop tasks, the use of blocking (*BL*) Redis commands instead of polling, and resiliency to worker failures: if a worker pops a task off the queue and doesn't finish or heartbeat it within a certain timeframe, the task should become available again for another worker.
Using lists and
The first idea, and the way many task queues were originally implemented in Redis, was to use two lists, one for pending tasks (pending) and one for tasks being processed (processing), and to move items between them atomically with BRPOPLPUSH or its more generic successor, BLMOVE.
This allows us to push tasks, pop tasks, and keep track of the tasks currently being worked on. The problem is that when a worker crashes, we need to somehow move its tasks from the processing list back to the pending list. One option is to keep track of when a worker picked up a task, or sent a heartbeat, in a separate sorted set (ZSET). The score is the time the worker last acquired or renewed the lease. We can then occasionally check the processing list for tasks that have been in there for too long, move them back to the pending list, and delete them from the sorted set.
One problem with this approach is that to pick up a task, we now have to perform two operations:
BLMOVE pending processing RIGHT LEFT <timeout> to wait for a task in the pending list and move it to processing, and then
ZADD time <now + lock_timeout> <task id> to update the score.
We cannot execute the BLMOVE and the ZADD in a single, atomic operation like a Lua script, since we want the blocking behavior of BLMOVE, which cannot be used in a script. This leaves the possibility of a race condition with the periodic heartbeat check: we execute BLMOVE, and before we can do the ZADD, the heartbeat check starts running. It would then encounter a task in the processing list without a score. To work around that, we can give up the blocking BLMOVE, use the non-blocking LMOVE together with ZADD in a script, and poll instead, but that increases load on the Redis server, especially with multiple workers.
We can also take this scenario into account in the heartbeat check and ignore any entry in processing that does not have a score yet, but that complicates the cleanup logic.
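To make the cleanup check with that extra rule more concrete, here is a minimal in-memory sketch. It does not talk to Redis: the `queue` type, its fields, and `recoverExpired` are hypothetical stand-ins for the two lists and the sorted set, and the score is assumed to be the lease expiry in milliseconds.

```go
package main

import "fmt"

// queue is an in-memory stand-in for the two Redis lists and the sorted set.
type queue struct {
	pending    []string
	processing []string
	leases     map[string]int64 // task id -> lease expiry in ms (the ZSET score)
}

// recoverExpired moves tasks whose lease has expired from processing back to
// pending and drops their score. Tasks without a score are left alone, since
// a worker may be between its BLMOVE and its ZADD.
func (q *queue) recoverExpired(nowMs int64) []string {
	var recovered []string
	kept := q.processing[:0] // filter in place
	for _, id := range q.processing {
		expiry, hasScore := q.leases[id]
		if !hasScore || nowMs < expiry {
			kept = append(kept, id)
			continue
		}
		delete(q.leases, id)
		q.pending = append(q.pending, id)
		recovered = append(recovered, id)
	}
	q.processing = kept
	return recovered
}

func main() {
	q := &queue{
		processing: []string{"a", "b", "c"},
		leases: map[string]int64{
			"a": 1_000, // expired
			"b": 9_000, // still leased
			// "c" has no score yet
		},
	}
	fmt.Println(q.recoverExpired(5_000), q.pending, q.processing)
}
```

Note the gap this leaves: a task whose worker crashed between the two commands never gets a score, so this check alone would never recover it. That is part of what makes the cleanup logic more complicated.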
Using Redis streams
What I ended up using were Redis Streams. Streams have a number of interesting properties that make them a good fit for the requirements mentioned above.
When starting up, we create a stream
task-queue, and a single consumer group for all workers:
task-workers. Then we generate a unique name for each worker, and treat each as a single consumer in the consumer group.
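The startup step could be sketched as follows. This is not taken from the actual implementation: `do` is a hypothetical function that sends one command through whatever Redis client is in use, and `redisError` only exists so the example runs without a server. The sketch assumes the client surfaces the server's error text unchanged, which is worth verifying for your client.

```go
package main

import (
	"fmt"
	"strings"
)

// redisError is a stand-in for an error reply from the server.
type redisError string

func (e redisError) Error() string { return string(e) }

// ensureGroup creates the stream and the consumer group if they are missing.
// do is a hypothetical function that sends a single command to Redis.
func ensureGroup(do func(args ...string) error) error {
	// "$" means the group only sees messages added after it was created;
	// "0" would include messages already in the stream.
	// MKSTREAM creates the stream if it does not exist yet.
	err := do("XGROUP", "CREATE", "task-queue", "task-workers", "$", "MKSTREAM")
	// Redis replies with a BUSYGROUP error when the group already exists,
	// which is expected when several workers start up.
	if err != nil && strings.HasPrefix(err.Error(), "BUSYGROUP") {
		return nil
	}
	return err
}

func main() {
	// Fake client that pretends another worker already created the group.
	fake := func(args ...string) error {
		fmt.Println("sent:", strings.Join(args, " "))
		return redisError("BUSYGROUP Consumer Group name already exists")
	}
	fmt.Println("error:", ensureGroup(fake))
}
```

Treating BUSYGROUP as success makes the setup safe to run from every worker on startup.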
To add new tasks to the queue, we
XADD a new message:
XADD task-queue * id <id of task>
Then each worker executes
XREADGROUP to read new messages as part of the consumer group. Consumer groups in Redis Streams behave similarly to Kafka's: only one consumer in the group receives each message:
XREADGROUP GROUP <group> <consumer> COUNT 1 BLOCK <timeout in ms> STREAMS task-queue >
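As a small illustration of the two commands, here is a sketch that only builds their argument lists; sending them is left to whichever client you use. The function names `addArgs` and `readArgs` are made up for this example. The full XREADGROUP form includes the `GROUP` keyword and ends with an ID for each stream, where `>` asks for messages that have not been delivered to any consumer in the group yet.

```go
package main

import (
	"fmt"
	"strconv"
)

const (
	stream = "task-queue"
	group  = "task-workers"
)

// addArgs builds the XADD call that enqueues a task.
// "*" lets Redis generate the message id.
func addArgs(taskID string) []string {
	return []string{"XADD", stream, "*", "id", taskID}
}

// readArgs builds the blocking XREADGROUP call for one worker.
// NOACK is deliberately not passed, so messages must be acknowledged.
func readArgs(consumer string, blockMs int) []string {
	return []string{
		"XREADGROUP", "GROUP", group, consumer,
		"COUNT", "1",
		"BLOCK", strconv.Itoa(blockMs),
		"STREAMS", stream, ">",
	}
}

func main() {
	fmt.Println(addArgs("task-1"))
	fmt.Println(readArgs("worker-a", 5000))
}
```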
Marking tasks as finished
Whenever a worker has finished processing a task, we use
XACK to acknowledge the message and then
XDEL to delete it from the stream. We don't have to delete the message; we could instead trim the whole stream occasionally, but removing it right away works well enough for this scenario.
Recovering abandoned tasks
Once a worker has read a message, it starts processing it. In order to support the recovery of abandoned tasks, we take advantage of the additional
XACK/XPENDING features of Redis.
By not specifying the
NOACK parameter for the
XREADGROUP command, Redis requires the worker to explicitly acknowledge the message before it is considered processed. Until it is acknowledged, it is not delivered to any other consumer in the group, and it is kept in a Pending Entries List (PEL). The PEL also tracks, for each pending message, when it was last read or claimed. Redis calls the time elapsed since then the idle time.
To recover abandoned tasks, we can use the
XAUTOCLAIM command. With
XAUTOCLAIM task-queue task-workers <consumer> <min-idle-time> 0 COUNT 1
we can transfer to the calling worker at most one message from the PEL whose idle time is higher than the specified threshold.
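To illustrate how the PEL and the idle time interact, here is a toy in-memory model. It is not how Redis implements it, and the `pel` and `entry` types are invented for this sketch; it only mirrors the bookkeeping described above, with timestamps passed in as milliseconds.

```go
package main

import "fmt"

// entry mirrors what Redis tracks per pending message: the owning consumer,
// when it was last delivered or claimed, and how often it was delivered.
type entry struct {
	consumer      string
	lastDelivered int64 // milliseconds
	deliveries    int
}

// pel is a toy model of a consumer group's Pending Entries List.
type pel map[string]*entry

// read records a first delivery, as XREADGROUP without NOACK would.
func (p pel) read(id, consumer string, now int64) {
	p[id] = &entry{consumer: consumer, lastDelivered: now, deliveries: 1}
}

// ack removes the entry, as XACK would.
func (p pel) ack(id string) { delete(p, id) }

// autoclaim transfers at most one entry that has been idle for minIdle ms or
// longer to consumer and returns its id, or "" if none qualifies.
// Redis scans entries in id order; a Go map has no order, which is good
// enough for a toy model.
func (p pel) autoclaim(consumer string, minIdle, now int64) string {
	for id, e := range p {
		if now-e.lastDelivered >= minIdle {
			e.consumer = consumer
			e.lastDelivered = now // claiming resets the idle time
			e.deliveries++
			return id
		}
	}
	return ""
}

func main() {
	p := pel{}
	p.read("1-0", "worker-a", 0)
	fmt.Println(p.autoclaim("worker-b", 30_000, 10_000) == "") // too fresh, nothing is claimed
	fmt.Println(p.autoclaim("worker-b", 30_000, 45_000))       // idle long enough
	fmt.Println(p["1-0"].consumer, p["1-0"].deliveries)
}
```

Because claiming resets the idle time, a second worker calling `autoclaim` right afterwards gets nothing, which is what keeps a recovered task from being handed out twice.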
So checking for work is actually a two-step process: first XAUTOCLAIM, and if that doesn't return any message, then the blocking XREADGROUP.
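The order of checks, recovering abandoned tasks first and only then waiting for new ones, might look roughly like this. The `source` type and its two function fields are hypothetical wrappers around the Redis calls, and the sketch assumes the fallback is the blocking XREADGROUP shown earlier; the fakes in `main` only exist to make it runnable.

```go
package main

import "fmt"

// source is a hypothetical abstraction over the two Redis calls. Each
// function returns a task id, or "" when nothing is available.
type source struct {
	autoclaim func() (string, error) // would wrap XAUTOCLAIM ... COUNT 1
	readNew   func() (string, error) // would wrap the blocking XREADGROUP
}

// next prefers abandoned tasks and only blocks for new ones otherwise.
func (s source) next() (string, error) {
	id, err := s.autoclaim()
	if err != nil {
		return "", fmt.Errorf("recovering abandoned task: %w", err)
	}
	if id != "" {
		return id, nil
	}
	return s.readNew()
}

func main() {
	s := source{
		autoclaim: func() (string, error) { return "", nil }, // nothing to recover
		readNew:   func() (string, error) { return "task-42", nil },
	}
	id, err := s.next()
	fmt.Println(id, err)
}
```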
Heartbeats for long running tasks
To prevent long running tasks from being acquired by another worker, workers periodically execute
XCLAIM on their already claimed message. This resets the idle time and serves as a heartbeat for the task.
XCLAIM task-queue task-workers <consumer> 0 <message id task>
I'm not using it yet, but the retry counter increases every time, so we know how often a task was picked up again. We also pass 0 as the min-idle time to always claim the message, even though it's already claimed by this worker.
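A heartbeat loop along these lines could be sketched with a ticker. `startHeartbeat` and the `claim` callback are hypothetical names; `claim` stands for whatever issues the XCLAIM above through your Redis client, and the interval is assumed to be comfortably shorter than the min-idle time used for XAUTOCLAIM.

```go
package main

import (
	"fmt"
	"time"
)

// startHeartbeat calls claim every interval until the returned stop function
// is called. stop waits for the loop to exit and must be called exactly once.
func startHeartbeat(interval time.Duration, claim func() error) (stop func()) {
	done := make(chan struct{})
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				if err := claim(); err != nil {
					// A failed heartbeat is only printed here; a real worker
					// might prefer to abort the task instead.
					fmt.Println("heartbeat failed:", err)
				}
			}
		}
	}()
	return func() {
		close(done)
		<-finished
	}
}

func main() {
	stop := startHeartbeat(10*time.Millisecond, func() error {
		fmt.Println("XCLAIM sent") // placeholder for the real command
		return nil
	})
	time.Sleep(35 * time.Millisecond) // pretend to work on the task
	stop()
}
```

A worker would start the heartbeat right after picking up a task and call `stop` before acknowledging it.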
I had an additional requirement to support only unique task ids in a given task queue, but that was easy to add by combining the
XDEL commands with an additional set and the
An implementation for this in Go is available here.