Simple task queue with Redis Streams

Lately I've done a bit more work with Redis. While I had used it via abstractions at previous jobs, I had never interacted with it directly so there were a few things to catch up on.

One of the scenarios I needed it for was a simple task queue for go-workflows. The basic requirements were the ability to push and pop tasks, no polling, which means using the blocking (BL*) Redis commands, and resiliency to worker failures: if a worker pops a task off the queue and doesn't finish or heartbeat it within a certain timeframe, the task should become available again for another worker.

Using lists and BLMOVE

The first idea, and how many task queues were originally implemented in Redis, was to use two lists: one for pending tasks (pending) and one for tasks being processed (processing), with items moved between them atomically using BRPOPLPUSH or its more generic successor, BLMOVE.

This allows us to push tasks, pop tasks, and keep track of the tasks currently being worked on. The problem is that when a worker crashes, we need to somehow move its tasks from the processing list back to the pending list. One option is to keep track of when a worker picked up a task, or sent a heartbeat, in a separate sorted set (ZSET). The score is the time the worker last acquired or renewed the lease. We can then occasionally check the processing list for tasks that have been in there for too long, move them back to the pending list, and delete them from the sorted set.

One problem with this approach is that to pick up a task, we now have to perform two operations:

  1. BLMOVE pending processing RIGHT LEFT <timeout> to wait for a task in the pending list and move it to processing, and then
  2. ZADD time <now + lock_timeout> <task id> to update the score.
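The two steps above can be sketched as follows. This is only an illustration in Python, assuming a redis-py-style client object `r` is passed in; the function name, default timeouts, and key constants are made up for the example, and the key names simply mirror the commands in the list.

```python
import time

# Key names follow the commands above: two lists plus the sorted set "time".
PENDING, PROCESSING, LEASES = "pending", "processing", "time"


def pick_up_task(r, block_seconds=5, lock_timeout=30.0):
    """Wait for a task, then record its lease. These are two round trips."""
    # Step 1: BLMOVE pending processing RIGHT LEFT <timeout>
    task_id = r.blmove(PENDING, PROCESSING, block_seconds, src="RIGHT", dest="LEFT")
    if task_id is None:
        return None  # timed out, nothing was pending

    # If the worker crashes here, or a cleanup run starts here, the task
    # sits in `processing` without a score.

    # Step 2: ZADD time <now + lock_timeout> <task id>
    r.zadd(LEASES, {task_id: time.time() + lock_timeout})
    return task_id
```

The gap between the two calls is the window discussed next.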

We cannot execute the BLMOVE and ZADD as a single, atomic operation such as a Lua script, since we want the blocking behavior of BLMOVE, and blocking is not available inside a script. This leaves the possibility of a race condition with the periodic heartbeat check: we execute BLMOVE, and before we can do the ZADD, the heartbeat check starts running. It would encounter a task in the processing list without a score. To work around that, we can give up the blocking BLMOVE and use LMOVE with ZADD in a script, combined with polling, but that increases load on the Redis server, especially with multiple workers.

We can also take this scenario into account in the heartbeat check and ignore any entry in processing that doesn't have a score yet, but that complicates the cleanup logic.
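To make the extra cleanup logic concrete, here is a rough Python sketch of such a check. It assumes a redis-py-style client `r`, the key names from the commands above, and that the score holds the lease expiry (now + lock_timeout); `requeue_expired` is a hypothetical name. The individual calls are not atomic as written, so a real version would probably run as a Lua script.

```python
import time


def requeue_expired(r, now=None):
    """Move tasks with an expired lease from `processing` back to `pending`."""
    now = time.time() if now is None else now
    requeued = []
    for task_id in r.lrange("processing", 0, -1):
        expires_at = r.zscore("time", task_id)
        if expires_at is None:
            # Picked up via BLMOVE, but the ZADD has not happened yet: skip it.
            continue
        if expires_at <= now:
            r.lrem("processing", 1, task_id)
            r.lpush("pending", task_id)
            r.zrem("time", task_id)
            requeued.append(task_id)
    return requeued
```

Skipping unscored entries avoids the race, but a worker that crashes between BLMOVE and ZADD then leaves a task that is never recovered, which is part of why this gets complicated.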

Using Redis Streams

What I ended up using was Redis Streams. Streams have a number of interesting properties that make them a good fit for the requirements mentioned above.

When starting up, we create a stream task-queue, and a single consumer group for all workers: task-workers. Then we generate a unique name for each worker, and treat each as a single consumer in the consumer group.
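A minimal Python sketch of that startup step, assuming a redis-py-style client `r`. The `setup_worker` name, the `worker-` prefix, and the choice of `0` as the group's start ID (so tasks added before the group existed are still delivered) are assumptions for illustration, not taken from the original implementation.

```python
import uuid

STREAM, GROUP = "task-queue", "task-workers"


def setup_worker(r):
    """Create stream and consumer group if missing, return a unique consumer name."""
    try:
        # XGROUP CREATE task-queue task-workers 0 MKSTREAM
        r.xgroup_create(STREAM, GROUP, id="0", mkstream=True)
    except Exception as err:
        # Redis answers with a BUSYGROUP error when the group already exists,
        # which is expected when several workers start up.
        if "BUSYGROUP" not in str(err):
            raise
    return f"worker-{uuid.uuid4()}"
```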

Adding tasks

To add new tasks to the queue, we XADD a new message:

XADD task-queue * id <id of task>
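With a redis-py-style client `r`, the same command could look like this (the `add_task` wrapper is a hypothetical helper):

```python
def add_task(r, task_id):
    """Append a task to the stream and return the message ID Redis generated."""
    # XADD task-queue * id <id of task>
    # redis-py uses "*" as the default entry ID, so Redis assigns one.
    return r.xadd("task-queue", {"id": task_id})
```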

Retrieving tasks

Each worker then executes XREADGROUP to read new messages as part of the consumer group. Consumer groups in Redis Streams behave similarly to Kafka's, in that only one consumer in the group receives a given message. The special ID > asks for messages that have not been delivered to any consumer in the group yet:

XREADGROUP GROUP task-workers <consumer> COUNT 1 BLOCK <timeout in ms> STREAMS task-queue >
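As a sketch, the read could be wrapped like this in Python. It assumes a redis-py-style client `r` that returns replies in the RESP2 shape (a list of `[stream, messages]` pairs); `read_task` and the default timeout are illustrative. The `">"` ID requests only messages not yet delivered to any consumer in the group.

```python
def read_task(r, consumer, block_ms=5000):
    """Block until a new message arrives; return (message_id, fields) or None."""
    response = r.xreadgroup(
        "task-workers",         # consumer group
        consumer,               # this worker's unique consumer name
        {"task-queue": ">"},    # ">" = only never-delivered messages
        count=1,
        block=block_ms,
    )
    if not response:
        return None  # timed out without a message
    _stream, messages = response[0]
    if not messages:
        return None
    message_id, fields = messages[0]
    return message_id, fields
```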

Marking tasks as finished

Whenever a worker has finished processing a task, we use XACK to acknowledge the message and then XDEL to delete it from the stream. We don't have to delete the message; we could instead occasionally trim the whole stream, but removing it right away works well enough for this scenario.
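A small Python sketch of that step, assuming a redis-py-style client `r` and the stream and group names used above; `finish_task` is a hypothetical helper name.

```python
def finish_task(r, message_id):
    """Acknowledge a processed message and remove it from the stream."""
    # XACK task-queue task-workers <message id>: removes it from the PEL.
    acked = r.xack("task-queue", "task-workers", message_id)
    # XDEL task-queue <message id>: removes the entry from the stream itself.
    r.xdel("task-queue", message_id)
    # XACK returns the number of messages actually acknowledged.
    return acked == 1
```

A return value of False would mean the message was no longer pending, for example because it had already been acknowledged.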

Recovering abandoned tasks

Once a worker has read a message, it starts processing it. In order to support the recovery of abandoned tasks, we take advantage of the additional XACK/XPENDING features of Redis.

By not specifying the NOACK parameter for the XREADGROUP command, Redis requires the worker to explicitly acknowledge the message before it is considered processed. Until it is acknowledged, it is not delivered to any other consumer in the group, and it is kept in a Pending Entries List (PEL). The PEL also tracks, for each pending message, when it was last read or claimed. Redis calls the time elapsed since then the idle time.

To recover abandoned tasks, we can use the XAUTOCLAIM command. With

XAUTOCLAIM task-queue task-workers <consumer> <min-idle-time> 0 COUNT 1

we can transfer at most one message from the PEL whose idle time exceeds the specified threshold to the calling worker.

So checking for work is actually XAUTOCLAIM and if that doesn't return any message, then the XREADGROUP command.
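Put together, checking for work might look like the following Python sketch. It assumes a redis-py-style client `r` whose `xautoclaim` reply is a list with the claimed messages at index 1 and whose `xreadgroup` reply uses the RESP2 list shape; `next_task` and the default timeouts are illustrative only.

```python
STREAM, GROUP = "task-queue", "task-workers"


def next_task(r, consumer, min_idle_ms=30_000, block_ms=5_000):
    """Return (message_id, fields) for the next task, or None on timeout."""
    # 1. Try to take over an abandoned task first:
    #    XAUTOCLAIM task-queue task-workers <consumer> <min-idle-time> 0 COUNT 1
    claimed = r.xautoclaim(
        STREAM, GROUP, consumer, min_idle_time=min_idle_ms, start_id="0", count=1
    )
    abandoned = claimed[1]
    if abandoned:
        return abandoned[0]

    # 2. Nothing to recover, so block waiting for a new message.
    response = r.xreadgroup(GROUP, consumer, {STREAM: ">"}, count=1, block=block_ms)
    if not response:
        return None
    _stream, messages = response[0]
    return messages[0] if messages else None
```

Trying XAUTOCLAIM first means abandoned tasks are retried before new ones are started.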

Heartbeats for long running tasks

To prevent long-running tasks from being acquired by another worker, workers periodically execute XCLAIM on the message they have already claimed. This resets the idle time and serves as a heartbeat for the task.

XCLAIM task-queue task-workers <consumer> 0 <message id of task>

I'm not using it yet, but we increase the retry counter every time so that we know how often a task was picked up again. We also pass 0 as the min-idle-time to always claim the message, even though it is already claimed by this worker.
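A possible shape for the heartbeat, sketched in Python with a redis-py-style client `r`. The helper names, the 10-second default interval, and the use of a `threading.Event` to signal completion are assumptions for the example.

```python
import threading


def heartbeat(r, consumer, message_id):
    """Re-claim our own message to reset its idle time. False if it is gone."""
    # XCLAIM task-queue task-workers <consumer> 0 <message id>
    claimed = r.xclaim(
        "task-queue", "task-workers", consumer,
        min_idle_time=0, message_ids=[message_id],
    )
    return len(claimed) == 1


def heartbeat_until(r, consumer, message_id, done: threading.Event, interval_s=10.0):
    """Send heartbeats every `interval_s` seconds until `done` is set."""
    # Event.wait returns True once the event is set, which ends the loop.
    while not done.wait(interval_s):
        if not heartbeat(r, consumer, message_id):
            break  # message is no longer pending, nothing left to keep alive
```

A worker could run `heartbeat_until` in a background thread and set `done` once the task has finished. The interval needs to stay comfortably below the min-idle-time that other workers pass to XAUTOCLAIM.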

Enforcing uniqueness

I had an additional requirement to support only unique task ids in a given task queue, but that was easy to add by combining the XADD and XACK / XDEL commands with an additional set and the SADD / SREM commands.
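One way this combination could look, sketched in Python with a redis-py-style client `r`. The set key `task-ids` and both function names are made up for the example. As written, the calls run one after another and are not atomic; since none of these commands block, they could be grouped into a Lua script or a MULTI/EXEC transaction.

```python
STREAM, GROUP, TASK_IDS = "task-queue", "task-workers", "task-ids"


def add_unique_task(r, task_id):
    """Queue the task unless its id is already queued. Returns message id or None."""
    # SADD returns 0 when the member was already in the set.
    if r.sadd(TASK_IDS, task_id) == 0:
        return None
    return r.xadd(STREAM, {"id": task_id})


def finish_unique_task(r, message_id, task_id):
    """Acknowledge and delete the message, then free the task id for reuse."""
    r.xack(STREAM, GROUP, message_id)
    r.xdel(STREAM, message_id)
    r.srem(TASK_IDS, task_id)
```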

Implementation

An implementation for this in Go is available here.