
Implementing a Reliable Delay Queue with Redis and Go

This article explains how to build a precise, persistent delay queue using Redis data structures and Lua scripts, demonstrates a Go client library with code examples for sending, consuming, acknowledging, and retrying delayed messages, and discusses the design requirements such as durability, retry mechanisms, and timing accuracy.

Top Architect

When an order remains unpaid past its deadline, or a newly created shop needs to be sent activation messages after a delay, periodically scanning a database table is a poor fit: each task can fire up to one scan interval late, and frequent scans put heavy load on the database. A more accurate and reliable delayed-task mechanism is therefore required.

The required characteristics of a delay queue, ordered by importance, are:

Persistence – tasks must survive service restarts or crashes.

Retry mechanism – failed or timed‑out tasks should be retried.

Precise timing – delivery should be as close to the scheduled time as possible.

Each message is stored in Redis under its own string key, identified by a UUID, and its ID moves through several auxiliary data structures:

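msgKey: a string key holding one message's payload; there is one such key per message ID.

pendingKey: a sorted set of message IDs whose score is the delivery timestamp.

readyKey: a list of message IDs ready for consumption.

unAckKey: a sorted set of message IDs that have been delivered but not yet acknowledged, scored by their retry deadline.

retryKey: a list of message IDs waiting to be re-delivered.

garbageKey: a set of message IDs whose retries are exhausted and whose data should be deleted.

retryCountKey: a hash mapping each message ID to its remaining retry count.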
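A minimal sketch of how these keys could be named for one queue. The "dp:{name}:role" format, the queueKeys type and the newQueueKeys helper are hypothetical choices for illustration; the library's actual key names may differ.

```go
package main

// queueKeys groups the Redis keys for one delay queue. The "dp:{name}:role"
// naming scheme is a hypothetical choice for illustration.
type queueKeys struct {
	prefix     string
	pending    string // sorted set: ID -> delivery timestamp
	ready      string // list: IDs ready for consumption
	unAck      string // sorted set: ID -> retry deadline
	retry      string // list: IDs waiting to be re-delivered
	garbage    string // set: IDs whose retries are exhausted
	retryCount string // hash: ID -> remaining retries
}

func newQueueKeys(name string) queueKeys {
	// Braces form a Redis Cluster hash tag, so every key of this queue
	// hashes to the same slot.
	prefix := "dp:{" + name + "}"
	return queueKeys{
		prefix:     prefix,
		pending:    prefix + ":pending",
		ready:      prefix + ":ready",
		unAck:      prefix + ":unack",
		retry:      prefix + ":retry",
		garbage:    prefix + ":garbage",
		retryCount: prefix + ":retry:cnt",
	}
}

// msgKey returns the string key that holds one message's payload.
func (k queueKeys) msgKey(id string) string {
	return k.prefix + ":msg:" + id
}
```

Wrapping the queue name in braces matters if the queue runs on Redis Cluster: only the part inside {} is hashed, so all of a queue's keys share one slot, which Cluster requires for a Lua script that touches several keys.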

Three atomic Lua scripts move message IDs between these structures, and two Go methods handle acknowledgement:

pending2ReadyScript – moves messages whose delivery time has arrived from pendingKey to readyKey:

-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])
if #msgs == 0 then return end
local args2 = {'LPush', KEYS[2]}
for _,v in ipairs(msgs) do table.insert(args2, v) end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])
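To make the script's semantics concrete, here is an in-memory Go model of pending2ReadyScript. It is an illustration only, not library code; the scheduled type and the rpop helper are hypothetical. It shows that the ZRangeByScore upper bound is inclusive and that pairing LPush with RPop hands messages out in delivery-time order.

```go
package main

import "sort"

// scheduled is one member of the modeled pending sorted set.
type scheduled struct {
	id    string
	score int64 // delivery timestamp in Unix seconds
}

// pending2Ready models pending2ReadyScript in memory (illustration only).
// In the returned ready slice, index 0 is the head (LPUSH side) and the
// last element is the tail (RPOP side).
func pending2Ready(pending []scheduled, ready []string, now int64) ([]scheduled, []string) {
	// ZRANGEBYSCORE returns members by ascending score, ties broken by member.
	sort.Slice(pending, func(i, j int) bool {
		if pending[i].score != pending[j].score {
			return pending[i].score < pending[j].score
		}
		return pending[i].id < pending[j].id
	})
	var remaining []scheduled
	for _, m := range pending {
		if m.score <= now { // the max bound ARGV[1] is inclusive
			ready = append([]string{m.id}, ready...) // LPUSH inserts at the head
		} else {
			remaining = append(remaining, m)
		}
	}
	return remaining, ready
}

// rpop removes and returns the tail element, like RPOP.
func rpop(list []string) (string, []string, bool) {
	if len(list) == 0 {
		return "", list, false
	}
	last := len(list) - 1
	return list[last], list[:last], true
}
```

Because the script pushes IDs in ascending score order at the head and consumers pop from the tail, the list behaves as a FIFO queue: the message that became due earliest is consumed first.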

ready2UnackScript – pops a message from readyKey (or retryKey) and adds it to unAckKey, scored by its retry deadline; if the message has not been acknowledged by then, it is treated as failed:

-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if not msg then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg

unack2RetryScript – moves unacknowledged messages whose retry deadline has passed to retryKey and decrements their remaining retry count; messages with no retries left are moved to garbageKey instead:

-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])
if #msgs == 0 then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs))
for i,v in ipairs(retryCounts) do
  local k = msgs[i]
  -- HMGet yields false for a missing field; tonumber(false) is nil,
  -- so such a message is treated as having no retries left
  local remaining = tonumber(v)
  if remaining and remaining > 0 then
    redis.call('HIncrBy', KEYS[2], k, -1)
    redis.call('LPush', KEYS[3], k)
  else
    redis.call('HDel', KEYS[2], k)
    redis.call('SAdd', KEYS[4], k)
  end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])
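The same bookkeeping can be written in Go as an in-memory illustration; unackState is hypothetical and not library code. HMGET returns nil for an ID that has no retry-count field, which Lua receives as false; this model treats such a message as having no retries left.

```go
package main

// unackState models the four structures unack2RetryScript touches
// (illustration only; not library code).
type unackState struct {
	unack      map[string]int64 // unAckKey: ID -> retry deadline (Unix seconds)
	retryCount map[string]int   // retryCountKey: ID -> remaining retries
	retry      []string         // retryKey: index 0 is the head (LPUSH side)
	garbage    map[string]bool  // garbageKey
}

// unack2Retry moves every message whose deadline is at or before now.
func (s *unackState) unack2Retry(now int64) {
	for id, deadline := range s.unack {
		if deadline > now {
			continue // still inside its consume window
		}
		// A missing retry count is treated as zero retries left.
		if n, ok := s.retryCount[id]; ok && n > 0 {
			s.retryCount[id] = n - 1
			s.retry = append([]string{id}, s.retry...) // LPUSH
		} else {
			delete(s.retryCount, id)
			s.garbage[id] = true
		}
		delete(s.unack, id) // the script's final ZREMRANGEBYSCORE
	}
}
```

Go map iteration order is random, so unlike the script, this model does not preserve score order when several messages expire in the same pass.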

ack – permanently removes a successfully processed message:

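func (q *DelayQueue) ack(idStr string) error {
  ctx := context.Background()
  // Removing the ID from unAckKey first stops unack2Retry from re-delivering it.
  if err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err(); err != nil {
    return fmt.Errorf("remove from unack failed: %w", err)
  }
  // Delete the payload and the remaining retry count of the message.
  if err := q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err(); err != nil {
    return fmt.Errorf("delete message failed: %w", err)
  }
  if err := q.redisCli.HDel(ctx, q.retryCountKey, idStr).Err(); err != nil {
    return fmt.Errorf("delete retry count failed: %w", err)
  }
  return nil
}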

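nack – reports a failure by setting the message's score in unAckKey to the current time, so the next unack2Retry pass moves it to the retry list without waiting for the retry deadline: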

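func (q *DelayQueue) nack(idStr string) error {
  ctx := context.Background()
  // A score of "now" makes the retry deadline already due, so the next
  // unack2Retry pass moves the message to retryKey without waiting.
  err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{Member: idStr, Score: float64(time.Now().Unix())}).Err()
  if err != nil {
    return fmt.Errorf("negative ack failed: %w", err)
  }
  return nil
}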
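The effect of ack and nack on the queue's state can be modeled in memory as follows; the tracker type is hypothetical and for illustration only. ack erases every trace of the message, while nack only rewrites its deadline so that it is already due.

```go
package main

// tracker models the state ack and nack touch (illustration only).
type tracker struct {
	unack   map[string]int64  // unAckKey: ID -> retry deadline
	payload map[string]string // stands in for the per-message msgKey entries
	retries map[string]int    // retryCountKey
}

// ack erases every trace of a processed message.
func (tr *tracker) ack(id string) {
	delete(tr.unack, id)
	delete(tr.payload, id)
	delete(tr.retries, id)
}

// nack rewrites the deadline to now, so the message is already due;
// the payload and retry count are kept for the retry path.
func (tr *tracker) nack(id string, now int64) {
	tr.unack[id] = now
}

// due lists IDs whose deadline has passed, like ZRANGEBYSCORE unAckKey 0 now.
func (tr *tracker) due(now int64) []string {
	var ids []string
	for id, deadline := range tr.unack {
		if deadline <= now {
			ids = append(ids, id)
		}
	}
	return ids
}
```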

The core consume routine runs every second. It invokes the scripts in order, pulls ready messages (at most fetchLimit per run), hands each one to the user-provided callback, acknowledges or negatively acknowledges it based on the result, and then handles retries and garbage collection:

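func (q *DelayQueue) consume() error {
  // 1. Move messages whose delivery time has arrived from pendingKey to readyKey.
  if err := q.pending2Ready(); err != nil { return err }
  var fetchCount uint
  for {
    // 2. Pop one ready message and record it in unAckKey with a retry deadline.
    idStr, err := q.ready2Unack()
    if err == redis.Nil { break } // readyKey is empty
    if err != nil { return err }
    fetchCount++
    // 3. Run the callback; true means the message was processed successfully.
    ack, err := q.callback(idStr)
    if err != nil { return err }
    if ack { err = q.ack(idStr) } else { err = q.nack(idStr) }
    if err != nil { return err }
    // Process at most fetchLimit messages per run.
    if fetchCount >= q.fetchLimit { break }
  }
  // 4. Move expired or nacked messages to retryKey, or to garbageKey once retries run out.
  if err := q.unack2Retry(); err != nil { return err }
  // 5. Delete the data of messages whose retries are exhausted.
  if err := q.garbageCollect(); err != nil { return err }
  // repeat for retry queue …
  return nil
}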

To start using the library, install it with:

go get github.com/hdt3213/delayqueue

and run a simple program that registers a callback, sends delayed messages, and starts the consumer:

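package main

import (
  "fmt"
  "strconv"
  "time"

  "github.com/go-redis/redis/v8"
  "github.com/hdt3213/delayqueue"
)

func main() {
  redisCli := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
  // The callback returns true to acknowledge a message, false to have it retried.
  queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
    fmt.Println("received:", payload)
    return true
  })
  for i := 0; i < 10; i++ {
    // Deliver each message after one hour, retrying up to 3 times on failure.
    err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
    if err != nil {
      panic(err)
    }
  }
  done := queue.StartConsume()
  <-done
}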

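With this design, the delay queue provides at-least-once delivery, persistence (provided Redis itself is configured to persist data, for example with AOF), and automatic retries. Because every move between structures is a single atomic script, it can also run on multiple consumer instances without distributed locks. At-least-once delivery means a message may be processed more than once, for example when a consumer exceeds its retry deadline, so callbacks should be idempotent.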
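Why several consumers need no distributed lock can be illustrated in memory: goroutines pop from one shared list whose pop is serialized by a mutex, standing in for Redis executing each script atomically. atomicList and consumeConcurrently are hypothetical, and IDs are ints for brevity.

```go
package main

import "sync"

// atomicList models a Redis list of message IDs (ints for brevity). The mutex
// stands in for Redis executing each script atomically, one at a time.
type atomicList struct {
	mu    sync.Mutex
	items []int
}

// rpop removes and returns the tail element, like RPOP inside ready2UnackScript.
func (l *atomicList) rpop() (int, bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if len(l.items) == 0 {
		return 0, false
	}
	last := len(l.items) - 1
	v := l.items[last]
	l.items = l.items[:last]
	return v, true
}

// consumeConcurrently starts the given number of consumers against one shared
// list and counts how many times each ID was delivered.
func consumeConcurrently(l *atomicList, consumers int) map[int]int {
	var (
		mu        sync.Mutex
		wg        sync.WaitGroup
		delivered = map[int]int{}
	)
	for i := 0; i < consumers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				id, ok := l.rpop()
				if !ok {
					return // list drained
				}
				mu.Lock()
				delivered[id]++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return delivered
}
```

Each ID is handed to exactly one consumer here. This only rules out two consumers popping the same ID; redelivery after a missed retry deadline is a separate source of duplicates.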
