
Implementing a Reliable Delay Queue with Redis and Go

This article explains how to build a precise, persistent, and retry‑capable delayed task queue using Redis ordered sets, Lua scripts, and a Go client, addressing common e‑commerce scenarios like auto‑closing unpaid orders and sending activation messages.

Top Architect

The article starts by presenting two typical business scenarios—automatically closing unpaid orders and sending activation SMS for newly created stores—and points out that simple periodic table scans either cause unacceptable timing errors or overload the database.

It then enumerates the essential requirements for a delay queue, ordered by importance: persistence across service restarts, a reliable retry mechanism for failed or timed‑out tasks, and timing that is as accurate as possible.

To meet these requirements, the author proposes a Redis‑based solution that uses a set of keys (msgKey, pendingKey, readyKey, unAckKey, retryKey, garbageKey, retryCountKey) and ordered‑set structures to store messages, schedule delivery timestamps, and track retries, ensuring at‑least‑once delivery without external components.
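To make the key layout concrete, here is a minimal sketch of how per‑queue key names could be derived from the queue name. The `dq:` prefix and the exact suffixes are illustrative assumptions, not the library's actual naming scheme; the comments note which Redis data structure each key holds.

```go
package main

import "fmt"

// keyLayout sketches a possible per-queue key naming scheme.
// The "dq:" prefix and suffixes here are assumptions for illustration.
type keyLayout struct {
	name string
}

func (k keyLayout) pendingKey() string { return "dq:" + k.name + ":pending" } // ZSet: msgID -> delivery timestamp
func (k keyLayout) readyKey() string   { return "dq:" + k.name + ":ready" }   // List: msgIDs due for delivery
func (k keyLayout) unAckKey() string   { return "dq:" + k.name + ":unack" }   // ZSet: msgID -> retry deadline
func (k keyLayout) retryKey() string   { return "dq:" + k.name + ":retry" }   // List: msgIDs awaiting redelivery
func (k keyLayout) garbageKey() string { return "dq:" + k.name + ":garbage" } // Set: exhausted msgIDs pending cleanup

// msgKey maps a message ID to the key holding its payload (a plain string).
func (k keyLayout) msgKey(id string) string { return "dq:" + k.name + ":msg:" + id }

func main() {
	k := keyLayout{name: "example-queue"}
	fmt.Println(k.pendingKey()) // dq:example-queue:pending
	fmt.Println(k.msgKey("42")) // dq:example-queue:msg:42
}
```

Keeping every key derived from the queue name lets multiple queues share one Redis instance without collisions.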

The Go client library (github.com/hdt3213/delayqueue) is introduced; after installing it with go get github.com/hdt3213/delayqueue, users register a callback function and start the queue with queue.StartConsume(). An example of sending delayed messages is provided:

package main

import (
    "strconv"
    "time"

    "github.com/go-redis/redis/v8"
    "github.com/hdt3213/delayqueue"
)

func main() {
    redisCli := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
    // The callback returns true to acknowledge a message, false to trigger a retry.
    queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool { return true })
    // Send ten messages, each delivered after one hour, with up to three retries.
    for i := 0; i < 10; i++ {
        err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
        if err != nil { panic(err) }
    }
    // StartConsume launches the consumer loop; receiving from done blocks until the queue stops.
    done := queue.StartConsume()
    <-done
}

The core of the system is a series of Lua scripts that run atomically inside Redis:

-- pending2ReadyScript
-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])
if #msgs == 0 then return end
local args2 = {'LPush', KEYS[2]}
for _,v in ipairs(msgs) do table.insert(args2, v) end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])

-- ready2UnackScript
-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if not msg then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg

-- unack2RetryScript
-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])
if #msgs == 0 then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs))
for i,v in ipairs(retryCounts) do
    local k = msgs[i]
    if tonumber(v) > 0 then
        redis.call('HIncrBy', KEYS[2], k, -1)
        redis.call('LPush', KEYS[3], k)
    else
        redis.call('HDel', KEYS[2], k)
        redis.call('SAdd', KEYS[4], k)
    end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])
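To make the retry bookkeeping concrete, the decision logic of unack2RetryScript can be sketched in pure Go, with in‑memory maps standing in for the Redis structures. This is an illustration of the algorithm, not the library's code:

```go
package main

import "fmt"

// unack2Retry simulates unack2RetryScript: every message whose retry deadline
// has passed either goes back to the retry list (consuming one retry attempt)
// or, when its retries are exhausted, into the garbage set for cleanup.
// In-memory maps and slices stand in for the Redis ZSet/Hash/List/Set.
func unack2Retry(unack map[string]int64, retryCount map[string]int, retry *[]string, garbage map[string]bool, now int64) {
	for id, deadline := range unack {
		if deadline > now {
			continue // still within its processing window
		}
		if retryCount[id] > 0 {
			retryCount[id]--            // HIncrBy retryCountKey id -1
			*retry = append(*retry, id) // LPush retryKey id
		} else {
			delete(retryCount, id) // HDel retryCountKey id
			garbage[id] = true     // SAdd garbageKey id: out of retries
		}
		delete(unack, id) // ZRemRangeByScore on unackKey
	}
}

func main() {
	unack := map[string]int64{"a": 100, "b": 100, "c": 999}
	retryCount := map[string]int{"a": 2, "b": 0}
	var retry []string
	garbage := map[string]bool{}
	unack2Retry(unack, retryCount, &retry, garbage, 200)
	// "a" is requeued with one fewer retry, "b" is garbage, "c" is untouched.
	fmt.Println(retry, garbage["b"], retryCount["a"], len(unack))
}
```

The simulation shows why running this logic inside a single Lua script matters: the deadline check, counter decrement, and requeue must happen atomically, or a concurrent consumer could retry the same message twice.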

Additional Go functions handle acknowledgment, negative acknowledgment, garbage collection of expired messages, and the main consume loop that moves messages from pending to ready, pulls them for processing, retries failed ones, and cleans up.

// ack removes a successfully processed message from the unacknowledged set
// and deletes its payload and retry counter.
func (q *DelayQueue) ack(idStr string) error {
    ctx := context.Background()
    if err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err(); err != nil { return err }
    // Best-effort cleanup: the message is already acknowledged at this point.
    _ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
    q.redisCli.HDel(ctx, q.retryCountKey, idStr)
    return nil
}

// consume runs one round of the delivery loop: promote due messages,
// deliver up to fetchLimit of them, then handle retries and garbage.
func (q *DelayQueue) consume() error {
    // Move messages whose delivery time has arrived from pending to ready.
    if err := q.pending2Ready(); err != nil { return err }
    var fetchCount uint
    for {
        // Atomically pop a ready message and park it in the unack set.
        idStr, err := q.ready2Unack()
        if err == redis.Nil { break } // ready list drained
        if err != nil { return err }
        fetchCount++
        ack, err := q.callback(idStr)
        if err != nil { return err }
        if ack { err = q.ack(idStr) } else { err = q.nack(idStr) }
        if err != nil { return err }
        if fetchCount >= q.fetchLimit { break }
    }
    // Requeue timed-out messages and move exhausted ones to garbage.
    if err := q.unack2Retry(); err != nil { return err }
    if err := q.garbageCollect(); err != nil { return err }
    // retry queue processing omitted for brevity
    return nil
}
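The retry‑queue step elided above mirrors ready2UnackScript, with retryKey as the source list instead of readyKey. A pure‑Go sketch of that transition, again with in‑memory structures standing in for Redis (an illustration, not the library's code):

```go
package main

import "fmt"

// retry2Unack pops one message ID from the tail of the retry list (RPop) and
// records a new retry deadline for it in the unack set (ZAdd), mirroring
// ready2UnackScript with retryKey as its source. The tail of the slice models
// the tail of the Redis list.
func retry2Unack(retry *[]string, unack map[string]int64, retryDeadline int64) (string, bool) {
	if len(*retry) == 0 {
		return "", false // RPop returned nil: nothing to redeliver
	}
	last := len(*retry) - 1
	id := (*retry)[last]
	*retry = (*retry)[:last]
	unack[id] = retryDeadline
	return id, true
}

func main() {
	retry := []string{"a", "b"} // RPop takes from the tail, so "b" comes out first
	unack := map[string]int64{}
	id, ok := retry2Unack(&retry, unack, 300)
	fmt.Println(id, ok, unack[id]) // b true 300
}
```

Because the consumer treats a redelivered message exactly like a fresh one, the callback sees no difference between a first attempt and a retry, which keeps the at‑least‑once contract simple.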

Because every state transition runs atomically inside Redis, messages are never processed twice and queue state stays consistent, so the implementation can be deployed across multiple instances without distributed locks, making it a simple yet robust delayed‑messaging solution.

distributed systems · Redis · Go · Message Queue · Delay Queue · Lua
Written by Top Architect

Top Architect focuses on sharing practical architecture knowledge, covering enterprise, system, website, large‑scale distributed, and high‑availability architectures, plus architecture adjustments using internet technologies. We welcome idea‑driven, sharing‑oriented architects to exchange and learn together.
