Design and Implementation of a Distributed KV‑Based Message Queue
This article explains the core concepts and detailed design of a custom message queue built on a distributed key‑value store, covering terminology, architecture, broker metadata, topic metadata, message format, sending, storage, retrieval, delay handling, retry mechanisms, dead‑letter queues, and TTL policies.
Background
Our team is developing a workflow orchestration engine that requires MQ features such as delayed messages and consumer retries. After evaluating options, we chose a compute‑storage separation architecture and built a custom MQ on top of a distributed KV store, preserving the main MQ capabilities.
Terminology
Message: The basic unit of data transferred through the queue, carrying a payload and metadata.
Topic: Logical classification of messages.
Partition: A topic can contain multiple partitions; each message is ultimately sent to a specific partition.
Partition Offset: A monotonically increasing sequence number within a partition; it advances by one each time a message is stored.
Producer: Sends messages to a specific partition of a topic.
Consumer Group: A group of consumers that can consume all messages of a topic; multiple groups may exist.
Consumer: Consumes messages; if a group has one consumer it receives all messages, otherwise each consumer receives a subset.
Consumer Group Offset: Records the consumption position of a group for each partition.
Broker: Message broker that receives messages from producers and delivers them to consumers.
Broker Cluster: A set of broker instances providing high availability.
Rebalance: In traditional MQs, each partition is assigned to at most one consumer in a group, so the useful consumer count is capped by the partition count and assignments must be rebalanced whenever consumers join or leave. This design eliminates rebalancing; consumer count is not limited by partition count.
Delay Message: A message that becomes consumable only after a specified delay.
Retry Message: A message that is re‑delivered after a consumption failure according to a retry policy.
Overall Architecture
KV Storage
We illustrate the design using Redis Cluster as the KV storage; all broker instances in the broker cluster share this Redis Cluster. Any KV store that supports ordered key scans can be used.
For persistence, an in‑memory Redis mode is not suitable; in production we use a RocksDB‑based distributed KV that is compatible with the Redis protocol.
We omit discussion of Redis Cluster scaling, slot migration, etc., focusing on the core principles of building an MQ on a distributed KV.
Network Communication
Producers and consumers communicate with brokers via gRPC, a widely adopted framework also used by Apache RocketMQ 5.x.
Detailed Design
Broker Cluster Metadata
When a broker starts, it registers its information in Redis for service discovery, e.g., using a hash structure:
Key: [cluster]$cluster_name
Value (hash):
    field      value
    broker1    ip:port
    broker2    ip:port

Topic Metadata
Topic metadata maintains the number of partitions and their distribution across the Redis Cluster. Users specify partition count when creating a topic.
Redis Cluster has 16384 slots; partitions are mapped to slots, effectively assigning them to different Redis shards.
After a topic is created, partitions are assigned to brokers (e.g., 10 partitions → 10 brokers, each handling one partition).
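The article doesn't prescribe how partitions get assigned to brokers; one simple option is round-robin. A minimal sketch, assuming that strategy (assignPartitions is a hypothetical helper, not part of the system's actual API):

```go
package main

import "fmt"

// assignPartitions spreads n partitions across brokers round-robin,
// producing the partition -> broker mapping that would be written to
// the [topic_metadata]$topic_name hash.
func assignPartitions(n int, brokers []string) map[string]string {
	m := make(map[string]string, n)
	for i := 0; i < n; i++ {
		m[fmt.Sprintf("partition%d", i+1)] = brokers[i%len(brokers)]
	}
	return m
}

func main() {
	m := assignPartitions(3, []string{"broker1", "broker2"})
	fmt.Println(m["partition1"], m["partition2"], m["partition3"]) // broker1 broker2 broker1
}
```

With 10 partitions and 10 brokers this degenerates to the one-partition-per-broker case described above.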
Mapping is stored in a hash:
Key: [topic_metadata]$topic_name
Value (hash):
    field        value
    partition1   broker1
    partition2   broker2

Message
Messages are defined with protobuf:
```protobuf
message Message {
  google.protobuf.Struct metadata = 1;       // metadata
  string partition = 2;                      // partition
  int64 offset = 3;                          // offset
  string msgId = 4;                          // unique id
  string topic = 5;                          // topic
  string key = 6;                            // routing key
  bytes body = 7;                            // payload
  google.protobuf.Timestamp born_time = 8;   // creation time
  google.protobuf.Timestamp expireTime = 9;  // deadline for delayed messages
}
```

Producers normally specify only topic and body; optional fields include key (hash-based partitioning), partition (explicit routing), and expireTime (delayed delivery).
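To make the producer-side defaults concrete, here is a sketch using a plain Go struct that mirrors the protobuf definition (the real type would be generated by protoc; NewMessage is an illustrative constructor, not the actual client API):

```go
package main

import (
	"fmt"
	"time"
)

// Message mirrors the protobuf definition as a plain struct (sketch only).
type Message struct {
	Topic      string
	Key        string    // optional: routing key for hash-based partitioning
	Partition  string    // optional: explicit partition
	Body       []byte
	BornTime   time.Time
	ExpireTime time.Time // optional: zero value means "not a delayed message"
}

// NewMessage fills only the fields a producer normally sets.
func NewMessage(topic string, body []byte) *Message {
	return &Message{Topic: topic, Body: body, BornTime: time.Now()}
}

func main() {
	m := NewMessage("orders", []byte(`{"id":1}`))
	m.ExpireTime = m.BornTime.Add(30 * time.Second) // opt in to delayed delivery
	fmt.Println(m.Topic, m.ExpireTime.After(m.BornTime)) // orders true
}
```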
Message Sending
From the producer side:
Retry: Sending may fail; configure retry count and timeout.
Partition Selection: Determine partition before sending; the message is routed to the broker responsible for that partition.
Aggregation: Batch messages per partition to reduce network I/O.
Broker Selection: For unordered messages, choose brokers based on health or latency.
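The partition-selection step can be sketched as follows: keyed messages hash to a stable partition (preserving per-key order), unkeyed messages round-robin. The FNV hash and the helper name are assumptions for illustration; the article doesn't specify a hash function:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync/atomic"
)

// selectPartition picks a partition index in [0, numPartitions).
// Keyed messages always map to the same partition; unkeyed messages
// round-robin via an atomic counter shared by the producer.
func selectPartition(key string, numPartitions int, rr *uint64) int {
	if key != "" {
		h := fnv.New32a()
		h.Write([]byte(key))
		return int(h.Sum32() % uint32(numPartitions))
	}
	return int(atomic.AddUint64(rr, 1) % uint64(numPartitions))
}

func main() {
	var rr uint64
	a := selectPartition("order-42", 8, &rr)
	b := selectPartition("order-42", 8, &rr)
	fmt.Println(a == b) // true: same key, same partition
}
```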
From the broker side:
When a message arrives, the broker increments the partition offset. Offsets are stored in a hash:
Key: [topic_offset]$topic
Value (hash):
    field        value
    partition1   offset1
    partition2   offset2

To improve efficiency, brokers keep offsets in memory and periodically persist them to the KV store. On startup, a broker scans forward from the last persisted offset to repair any gap left by a crash.
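The in-memory-counter-plus-periodic-snapshot scheme can be sketched as below (OffsetManager and its methods are illustrative names, not the actual broker code):

```go
package main

import (
	"fmt"
	"sync"
)

// OffsetManager keeps per-partition offsets in memory; Snapshot would be
// persisted to the KV store on a timer.
type OffsetManager struct {
	mu      sync.Mutex
	offsets map[string]int64
}

func NewOffsetManager() *OffsetManager {
	return &OffsetManager{offsets: make(map[string]int64)}
}

// Next increments the partition offset by one and returns the new value,
// which becomes the stored message's offset.
func (o *OffsetManager) Next(partition string) int64 {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.offsets[partition]++
	return o.offsets[partition]
}

// Snapshot copies the current offsets for periodic persistence.
func (o *OffsetManager) Snapshot() map[string]int64 {
	o.mu.Lock()
	defer o.mu.Unlock()
	cp := make(map[string]int64, len(o.offsets))
	for k, v := range o.offsets {
		cp[k] = v
	}
	return cp
}

func main() {
	om := NewOffsetManager()
	om.Next("partition1")
	fmt.Println(om.Next("partition1"), om.Snapshot()["partition1"]) // 2 2
}
```

Because the persisted snapshot can lag the in-memory counter, recovery must scan forward from the snapshot value until the first missing key, as noted above.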
Message Storage
Messages are stored in Redis with keys formatted as:
[topic]$topic_{$partition}_$offset

where the prefix identifies the topic, the partition name is wrapped in braces as a Redis hash tag (so every message of a partition hashes to the same slot and therefore the same shard), and the suffix is the offset.
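A key builder along these lines is shown below. Zero-padding the offset is an assumption on my part: it makes lexicographic key order match numeric order, which ordered range scans rely on.

```go
package main

import "fmt"

// messageKey builds the storage key. The partition name sits inside {} so
// Redis Cluster hashes only that part (a hash tag), keeping every message
// of one partition on a single shard. The offset is zero-padded so that
// lexicographic key order equals numeric offset order.
func messageKey(topic, partition string, offset int64) string {
	return fmt.Sprintf("[topic]%s_{%s}_%020d", topic, partition, offset)
}

func main() {
	fmt.Println(messageKey("orders", "partition1", 42))
}
```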
Message Retrieval
Consumers pull messages via Redis SCAN, starting from the consumer‑group offset stored in a hash:
Key: [consumer_offset]$group_$topic
Value (hash):
    field        value
    partition1   offset1
    partition2   offset2

When a consumer connects, the broker reads the stored offset for its assigned partitions and begins scanning from that position.
Delayed Messages
Delayed messages are first placed into a special delay topic; the original topic and partition are recorded in the origin_topic and origin_partition fields. The delay-topic key embeds the broker ID (wrapped in braces as a Redis hash tag) and the expiration timestamp:

[delay]{$broker_id}_$expireTime

A delay-message forwarder continuously scans for entries whose expiration timestamp has passed, rewrites the target topic and partition from the origin fields, and delivers each message to its final destination.
Retry Mechanism
Failed consumption also follows the delayed‑message path, but retry messages are routed to a dedicated retry topic per consumer group, e.g.:
[topic]retry_$consumer_group

This isolates retries to the affected consumer group without impacting others.
Dead‑Letter Queue
If a message exceeds the maximum retry count, it is moved to a dead‑letter queue:
[topic]dead_$consumer_group

Message TTL
To reclaim storage, messages are deleted automatically: normal messages 3 days after creation, delayed messages 3 days after their expiration time.
Conclusion
The article presents the core ideas for building a message queue on top of a distributed KV store, omitting many production concerns such as disaster recovery, high availability, and performance optimizations; substantial engineering effort is required before using it in a real environment.
ByteDance Dali Intelligent Technology Team