Implementing Message Queues with Redis: Lists, Streams, and Pub/Sub

This article explains how to build message queues on Redis in three ways: List commands (LPUSH/RPOP and BRPOPLPUSH) for simple queues; the Stream data type, introduced in Redis 5.0, for durable, ordered queues with consumer groups; and the Pub/Sub model for real-time broadcasting. It also examines the underlying source-code structures, including quicklist, radix trees, and stream internals.

IT Architects Alliance

Redis can serve as a message-queue platform, but the usual MQ challenges (message loss, duplicate delivery, and ordering) must still be addressed, typically by choosing the Redis data type or command whose guarantees match the workload.

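List-based queue: Pushing items with LPUSH and popping them with RPOP gives a simple FIFO queue. BRPOP blocks until an item is available, so consumers do not have to poll. For reliable processing, BRPOPLPUSH atomically pops a message and pushes it onto a backup list; the consumer removes it from the backup list (for example with LREM) once processing succeeds. BRPOPLPUSH is deprecated since Redis 6.2 in favor of BLMOVE, which provides the same behavior.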

127.0.0.1:6379> LPUSH test "ceshi-1"
(integer) 1
127.0.0.1:6379> RPOP test
"ceshi-1"

When a consumer crashes after reading a message, BRPOPLPUSH ensures the message is also stored in a secondary list, allowing recovery after restart.
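The recovery pattern described above can be modeled without a server. The following is a minimal sketch in C, assuming a toy fixed-capacity list; `toy_list`, `toy_push_head`, `toy_move_tail_to_head`, and `toy_remove` are hypothetical names for illustration, not Redis or client-library APIs. It mimics LPUSH, the pop-and-push step of BRPOPLPUSH (without the blocking), and the LREM-style removal a consumer would perform after successful processing.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_CAP 16

/* Toy list: items[0] is the head, items[len - 1] is the tail. */
typedef struct toy_list {
    const char *items[TOY_CAP];
    size_t len;
} toy_list;

/* Models LPUSH: insert at the head. Returns the new length, or 0 if full. */
static size_t toy_push_head(toy_list *l, const char *msg) {
    assert(l != NULL && msg != NULL);
    if (l->len == TOY_CAP) return 0;
    memmove(&l->items[1], &l->items[0], l->len * sizeof(l->items[0]));
    l->items[0] = msg;
    return ++l->len;
}

/* Models the non-blocking core of BRPOPLPUSH: pop the tail of src and push
 * it onto the head of dst as a single step. Returns the message, or NULL
 * if src is empty or dst is full (in which case nothing is moved). */
static const char *toy_move_tail_to_head(toy_list *src, toy_list *dst) {
    if (src->len == 0 || dst->len == TOY_CAP) return NULL;
    const char *msg = src->items[--src->len];
    toy_push_head(dst, msg);
    return msg;
}

/* Models "LREM key 1 msg": remove the first occurrence, scanning from the
 * head. Returns 1 if an element was removed, 0 otherwise. */
static int toy_remove(toy_list *l, const char *msg) {
    for (size_t i = 0; i < l->len; i++) {
        if (strcmp(l->items[i], msg) == 0) {
            memmove(&l->items[i], &l->items[i + 1],
                    (l->len - i - 1) * sizeof(l->items[0]));
            l->len--;
            return 1;
        }
    }
    return 0;
}
```

In this model, a message that is still in the backup list after a restart is one whose processing never finished, so a recovering consumer would scan that list and reprocess its contents.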

127.0.0.1:6379> BRPOPLPUSH test a-test 100
"ceshi-1"

Quicklist implementation: Prior to Redis 3.2, lists were encoded as either a ziplist or a linkedlist. Redis 3.2 introduced quicklist, a doubly linked list whose nodes are small ziplists, which keeps the memory efficiency of ziplist while limiting cascading updates to a single node. Core functions such as listTypePush, quicklistPush, and quicklistPushHead handle insertion, while the quicklist struct holds the head and tail pointers, element and node counts, and compression settings.

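void listTypePush(robj *subject, robj *value, int where) {
    if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
        /* Map the list-level direction onto the quicklist-level one. */
        int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
        if (value->encoding == OBJ_ENCODING_INT) {
            /* Integer-encoded objects keep the number in the pointer field,
             * so convert it to its string form before storing the bytes. */
            char buf[32];
            ll2string(buf, 32, (long)value->ptr);
            quicklistPush(subject->ptr, buf, strlen(buf), pos);
        } else {
            /* String objects are pushed as raw bytes. */
            quicklistPush(subject->ptr, value->ptr, sdslen(value->ptr), pos);
        }
    } else {
        serverPanic("Unknown list encoding");
    }
}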
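To make the node-splitting idea concrete, here is a minimal sketch in C of a list built from small fixed-capacity nodes. It is not the Redis quicklist: real nodes hold ziplists of variable-length entries, while this toy stores plain integers, and `tql_node`, `tql_list`, `tql_push_tail`, `tql_push_head`, `tql_free`, and `NODE_CAP` are hypothetical names chosen for illustration. The point it shows is that a push only ever touches one node, so the cost of shifting elements is bounded by the node size rather than by the list length.

```c
#include <assert.h>
#include <stdlib.h>

#define NODE_CAP 4  /* assumption: tiny capacity so node splits are easy to see */

typedef struct tql_node {
    struct tql_node *prev, *next;
    int vals[NODE_CAP];
    int count;               /* elements used in this node */
} tql_node;

typedef struct tql_list {
    tql_node *head, *tail;
    unsigned long count;     /* total elements across all nodes */
    unsigned long len;       /* number of nodes */
} tql_list;

static tql_node *tql_node_new(void) {
    tql_node *n = calloc(1, sizeof(*n));
    assert(n != NULL);
    return n;
}

/* Push at the tail; a new node is allocated only when the tail node is full. */
static void tql_push_tail(tql_list *ql, int v) {
    if (ql->tail == NULL || ql->tail->count == NODE_CAP) {
        tql_node *n = tql_node_new();
        n->prev = ql->tail;
        if (ql->tail) ql->tail->next = n; else ql->head = n;
        ql->tail = n;
        ql->len++;
    }
    ql->tail->vals[ql->tail->count++] = v;
    ql->count++;
}

/* Push at the head; at most NODE_CAP - 1 elements are shifted, however long
 * the whole list is. */
static void tql_push_head(tql_list *ql, int v) {
    if (ql->head == NULL || ql->head->count == NODE_CAP) {
        tql_node *n = tql_node_new();
        n->next = ql->head;
        if (ql->head) ql->head->prev = n; else ql->tail = n;
        ql->head = n;
        ql->len++;
    }
    tql_node *h = ql->head;
    for (int i = h->count; i > 0; i--) h->vals[i] = h->vals[i - 1];
    h->vals[0] = v;
    h->count++;
    ql->count++;
}

/* Release every node and reset the list to empty. */
static void tql_free(tql_list *ql) {
    tql_node *n = ql->head;
    while (n != NULL) {
        tql_node *next = n->next;
        free(n);
        n = next;
    }
    ql->head = ql->tail = NULL;
    ql->count = ql->len = 0;
}
```

With `NODE_CAP` set to 4, nine tail pushes produce three nodes, and a subsequent head push allocates a fourth node because the current head is already full.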

Streams: Introduced in Redis 5.0, the Stream type is an append-only, ordered log with built-in consumer groups, persisted like any other key through RDB and AOF. Commands include XADD (appends an entry; passing * as the ID asks Redis to generate one), XREAD (reads entries), XGROUP (manages consumer groups), XREADGROUP (reads on behalf of a consumer in a group), XPENDING (lists entries delivered but not yet acknowledged), and XACK (acknowledges processed entries).
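Auto-generated stream IDs have the form `<milliseconds>-<sequence>` and are always increasing, even when several entries arrive within the same millisecond or the clock steps backwards. The following is a minimal sketch of that rule in C; `toy_stream_id`, `toy_id_compare`, `toy_id_next`, and `toy_id_format` are hypothetical names for illustration and not the functions Redis itself uses.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Same shape as a stream ID: a millisecond timestamp plus a sequence number. */
typedef struct toy_stream_id {
    uint64_t ms;
    uint64_t seq;
} toy_stream_id;

/* Returns <0, 0 or >0, ordering by ms first and by seq second. */
static int toy_id_compare(const toy_stream_id *a, const toy_stream_id *b) {
    if (a->ms != b->ms) return a->ms < b->ms ? -1 : 1;
    if (a->seq != b->seq) return a->seq < b->seq ? -1 : 1;
    return 0;
}

/* Next auto-generated ID, given the last ID in the stream and the current
 * clock. If the clock has not advanced past last.ms, keep last.ms and bump
 * the sequence so the new ID is still greater than the previous one. */
static toy_stream_id toy_id_next(toy_stream_id last, uint64_t now_ms) {
    toy_stream_id id;
    if (now_ms > last.ms) {
        id.ms = now_ms;
        id.seq = 0;
    } else {
        id.ms = last.ms;
        id.seq = last.seq + 1;  /* sequence overflow is ignored in this sketch */
    }
    return id;
}

/* Writes "<ms>-<seq>" into buf and returns the length snprintf reports. */
static int toy_id_format(char *buf, size_t n, toy_stream_id id) {
    assert(buf != NULL);
    return snprintf(buf, n, "%llu-%llu",
                    (unsigned long long)id.ms, (unsigned long long)id.seq);
}
```

Because the ordering depends only on the two integers, range reads and "everything after ID X" queries reduce to comparisons like `toy_id_compare`.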

127.0.0.1:6379> XADD teststream * name xiaohong surname xiaobai
"1646650328883-0"

The internal stream struct stores entries in a radix tree (rax), with each entry identified by a streamID (a millisecond timestamp plus a sequence number). Consumer-group state is kept in streamCG (last delivered ID, pending entries, consumers), and each consumer is represented by streamConsumer (last seen time, its own pending entries).

typedef struct stream {
    rax *rax;               /* radix tree holding the stream */
    uint64_t length;        /* number of elements */
    streamID last_id;       /* last ID in the stream */
    rax *cgroups;           /* consumer groups */
} stream;
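The consumer-group bookkeeping can be illustrated with a small model. The sketch below, in C, is a deliberate simplification: it tracks entries by position rather than by stream ID, uses a fixed array where Redis uses a radix tree, and ignores per-consumer state. `toy_group`, `toy_group_read`, `toy_group_ack`, and `PEL_CAP` are hypothetical names. It shows the two moves that matter for at-least-once delivery: reading records an entry as pending, and acknowledging removes it.

```c
#include <assert.h>
#include <stddef.h>

#define PEL_CAP 8  /* assumption: small fixed pending list for the sketch */

typedef struct toy_group {
    unsigned long next_undelivered;  /* position of the next entry to hand out */
    unsigned long pending[PEL_CAP];  /* delivered but not yet acknowledged */
    size_t npending;
} toy_group;

/* Models a group read of new entries: hand out the next undelivered entry
 * and record it as pending. Returns its position, or -1 if there is nothing
 * new or the pending list is full. */
static long toy_group_read(toy_group *g, unsigned long stream_len) {
    assert(g != NULL);
    if (g->next_undelivered >= stream_len || g->npending == PEL_CAP) return -1;
    unsigned long idx = g->next_undelivered++;
    g->pending[g->npending++] = idx;
    return (long)idx;
}

/* Models an acknowledgement: drop the entry from the pending list.
 * Returns 1 if it was pending, 0 otherwise. The last slot is moved into the
 * hole, so the pending array is not kept in delivery order. */
static int toy_group_ack(toy_group *g, unsigned long idx) {
    for (size_t i = 0; i < g->npending; i++) {
        if (g->pending[i] == idx) {
            g->pending[i] = g->pending[--g->npending];
            return 1;
        }
    }
    return 0;
}
```

Anything left in `pending` after a consumer crash corresponds to what XPENDING would report: entries that were delivered but never acknowledged and are therefore candidates for redelivery.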

Pub/Sub: The publish/subscribe model broadcasts messages in real time. Clients subscribe to channels with SUBSCRIBE or to glob-style patterns with PSUBSCRIBE, and publishers send messages with PUBLISH. Redis maintains the pubsub_channels (channel → client list) and pubsub_patterns (pattern → client list) dictionaries. Subscription, unsubscription, and message delivery are handled by functions such as pubsubSubscribeChannel, pubsubUnsubscribeChannel, and pubsubPublishMessage.

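int pubsubSubscribeChannel(client *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;
    /* Add the channel to the client's own set of subscribed channels;
     * dictAdd fails if the client is already subscribed. */
    if (dictAdd(c->pubsub_channels, channel, NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the server-wide channel -> clients table,
         * creating the client list on first subscription. */
        de = dictFind(server.pubsub_channels, channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels, channel, clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        listAddNodeTail(clients, c);
    }
    /* Notify the client, whether or not the subscription was new. */
    addReplyPubsubSubscribed(c, channel);
    return retval;
}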
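The channel-to-clients mapping above can be reduced to a small model that also shows why Pub/Sub offers no persistence. The following is a minimal sketch in C using fixed arrays in place of Redis dictionaries and lists; `toy_pubsub`, `toy_channel`, `toy_find`, `toy_subscribe`, and `toy_publish` are hypothetical names, and clients are represented by plain integer ids.

```c
#include <assert.h>
#include <string.h>

#define MAX_CHANNELS 8
#define MAX_SUBS 8

typedef struct toy_channel {
    const char *name;
    int clients[MAX_SUBS];  /* hypothetical client ids */
    int nclients;
} toy_channel;

typedef struct toy_pubsub {
    toy_channel channels[MAX_CHANNELS];
    int nchannels;
} toy_pubsub;

/* Linear lookup standing in for the dictionary lookup by channel name. */
static toy_channel *toy_find(toy_pubsub *ps, const char *name) {
    for (int i = 0; i < ps->nchannels; i++)
        if (strcmp(ps->channels[i].name, name) == 0) return &ps->channels[i];
    return NULL;
}

/* Returns 1 if the client was newly subscribed, 0 if it was already
 * subscribed or the toy tables are full. */
static int toy_subscribe(toy_pubsub *ps, int client, const char *name) {
    assert(ps != NULL && name != NULL);
    toy_channel *ch = toy_find(ps, name);
    if (ch == NULL) {
        if (ps->nchannels == MAX_CHANNELS) return 0;
        ch = &ps->channels[ps->nchannels++];
        ch->name = name;
        ch->nclients = 0;
    }
    for (int i = 0; i < ch->nclients; i++)
        if (ch->clients[i] == client) return 0;
    if (ch->nclients == MAX_SUBS) return 0;
    ch->clients[ch->nclients++] = client;
    return 1;
}

/* Returns the number of subscribers the message would reach. Nothing is
 * stored: with zero subscribers the message is simply dropped. */
static int toy_publish(toy_pubsub *ps, const char *name, const char *msg) {
    (void)msg;  /* a server would write msg to each subscriber's connection */
    toy_channel *ch = toy_find(ps, name);
    return ch ? ch->nclients : 0;
}
```

A publish before any subscription returns 0 in this model, and a client that subscribes afterwards never sees that message, which is the behavior that separates Pub/Sub from Lists and Streams.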

Conclusion: Redis offers three messaging mechanisms: simple List queues (no consumer groups), Pub/Sub (no persistence), and Streams (persistent, ordered, consumer-group-aware). The right choice depends on the durability, ordering, and consumption semantics the application requires.
