
Design and Implementation of a Distributed KV‑Based Message Queue

This article explains the core concepts and detailed design of a custom message queue built on a distributed key‑value store, covering terminology, architecture, broker metadata, topic metadata, message format, sending, storage, retrieval, delay handling, retry mechanisms, dead‑letter queues, and TTL policies.

ByteDance Dali Intelligent Technology Team

Background

Our team is developing a workflow orchestration engine that requires MQ features such as delayed messages and consumer retries. After evaluating options, we chose a compute‑storage separation architecture and built a custom MQ on top of a distributed KV store, preserving the main MQ capabilities.

Terminology

Message: The basic unit of data transmitted through the queue.

Topic: Logical classification of messages.

Partition: A topic can contain multiple partitions; each message is ultimately sent to a specific partition.

Partition Offset: A monotonically increasing sequence number within a partition; it increments by one each time a message is stored.

Producer: Sends messages to a specific partition of a topic.

Consumer Group: A group of consumers that can consume all messages of a topic; multiple groups may exist.

Consumer: Consumes messages; if a group has one consumer it receives all messages, otherwise each consumer receives a subset.

Consumer Group Offset: Records the consumption position of a group for each partition.

Broker: Message broker that receives messages from producers and delivers them to consumers.

Broker Cluster: A set of broker instances providing high availability.

Rebalance: In traditional MQs, partitions are reassigned among consumers whenever group membership changes, and the number of consumers is capped by the number of partitions. Our design eliminates rebalancing, so the consumer count is not limited by the partition count.

Delay Message: A message that becomes consumable only after a specified delay.

Retry Message: A message that is re‑delivered after a consumption failure according to a retry policy.

Overall Architecture

KV Storage

We illustrate the design using Redis Cluster as the KV storage; all broker instances in the broker cluster share this Redis Cluster. Any KV store that supports ordered key scans can be used.

For persistence, an in‑memory Redis mode is not suitable; in production we use a RocksDB‑based distributed KV that is compatible with the Redis protocol.

We omit discussion of Redis Cluster scaling, slot migration, etc., focusing on the core principles of building an MQ on a distributed KV.

Network Communication

Producers and consumers communicate with brokers via gRPC, a widely adopted framework also used by Apache RocketMQ 5.x.

Detailed Design

Broker Cluster Metadata

When a broker starts, it registers its information in Redis for service discovery, e.g., using a hash structure:

Key

[cluster]$cluster_name

Value

field   value
broker1   ip:port
broker2   ip:port
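The registration step above can be sketched in Python, simulating the Redis hash with a plain dict. Names such as register_broker and the sample cluster name are illustrative, not from the article:

```python
def cluster_key(cluster_name: str) -> str:
    # Key layout from the article: [cluster]$cluster_name
    return f"[cluster]{cluster_name}"

def register_broker(store: dict, cluster_name: str, broker_id: str, addr: str) -> None:
    # Equivalent to: HSET [cluster]$cluster_name $broker_id $addr
    store.setdefault(cluster_key(cluster_name), {})[broker_id] = addr

# Two brokers registering themselves on startup (addresses are made up)
store = {}
register_broker(store, "mq-prod", "broker1", "10.0.0.1:9000")
register_broker(store, "mq-prod", "broker2", "10.0.0.2:9000")
```

A client doing service discovery would simply HGETALL this key to learn every live broker's address.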

Topic Metadata

Topic metadata maintains the number of partitions and their distribution across the Redis Cluster. Users specify partition count when creating a topic.

Redis Cluster has 16384 slots; partitions are mapped to slots, effectively assigning them to different Redis shards.

After a topic is created, partitions are assigned to brokers (e.g., 10 partitions → 10 brokers, each handling one partition).

Mapping is stored in a hash:

Key

[topic_metadata]$topic_name

Value

field   value
partition1   broker1
partition2   broker2
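A minimal sketch of the partition-to-broker assignment described above, assuming simple round-robin placement (the article does not specify the assignment algorithm):

```python
def assign_partitions(partitions, brokers):
    # Round-robin: with 10 partitions and 10 brokers, each broker gets one;
    # with fewer brokers, assignments wrap around.
    return {p: brokers[i % len(brokers)] for i, p in enumerate(partitions)}
```

The resulting mapping is what would be written into the [topic_metadata]$topic_name hash.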

Message

Messages are defined with protobuf:

message Message{
  google.protobuf.Struct metadata = 1; // metadata
  string partition = 2; // partition
  int64 offset = 3; // offset
  string msgId = 4; // unique id
  string topic = 5; // topic
  string key = 6; // routing key
  bytes body = 7; // payload
  google.protobuf.Timestamp born_time = 8; // creation time
  google.protobuf.Timestamp expireTime = 9; // deadline for delayed messages
}

Producers normally specify only topic and body; optional fields include key (hash‑based partitioning), partition (explicit), and expireTime (delayed delivery).

Message Sending

From the producer side:

Retry: Sending may fail; configure retry count and timeout.

Partition Selection: Determine partition before sending; the message is routed to the broker responsible for that partition.

Aggregation: Batch messages per partition to reduce network I/O.

Broker Selection: For unordered messages, choose brokers based on health or latency.
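The partition-selection step can be sketched as follows: keyed messages hash to a stable partition, while unkeyed messages spread round-robin. The class name and the CRC32 choice are assumptions for illustration; the article does not specify a hash function:

```python
import itertools
import zlib

class PartitionSelector:
    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self._rr = itertools.count()  # round-robin counter for unkeyed messages

    def select(self, key=None):
        if key is not None:
            # Same key -> same partition, preserving per-key ordering
            return zlib.crc32(key.encode()) % self.num_partitions
        # No key: spread evenly across partitions
        return next(self._rr) % self.num_partitions

sel = PartitionSelector(4)
```

Once the partition is known, the producer looks up the owning broker in the topic metadata and sends the (possibly batched) messages there.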

From the broker side:

When a message arrives, the broker increments the partition offset. Offsets are stored in a hash:

Key

[topic_offset]$topic

Value

field   value
partition1   offset1
partition2   offset2

To improve efficiency, brokers keep offsets in memory and periodically persist them to KV. On startup, brokers may scan forward from the last saved offset to correct any gaps caused by crashes.
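The in-memory offset cache, periodic persistence, and crash-recovery scan can be sketched like this (a dict stands in for the [topic_offset]$topic hash; the flush threshold and method names are assumptions):

```python
class OffsetManager:
    def __init__(self, store, topic, flush_every=100):
        self.store = store                        # dict simulating the Redis hash
        self.key = f"[topic_offset]{topic}"
        self.flush_every = flush_every
        self.mem = dict(store.get(self.key, {}))  # in-memory working copy
        self.dirty = 0

    def next_offset(self, partition):
        # Allocate the next offset in memory; persist only every N allocations
        off = self.mem.get(partition, -1) + 1
        self.mem[partition] = off
        self.dirty += 1
        if self.dirty >= self.flush_every:
            self.flush()
        return off

    def flush(self):
        self.store[self.key] = dict(self.mem)
        self.dirty = 0

    def recover(self, partition, message_exists):
        # On restart, scan forward from the last persisted offset until the
        # first missing message, correcting any gap left by a crash.
        off = self.store.get(self.key, {}).get(partition, -1)
        while message_exists(partition, off + 1):
            off += 1
        self.mem[partition] = off

store = {}
om = OffsetManager(store, "orders", flush_every=2)
first = om.next_offset("partition1")  # allocated in memory, not yet persisted
```

The trade-off is that a crash can lose up to flush_every in-memory allocations, which the forward scan in recover repairs.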

Message Storage

Messages are stored in Redis with keys formatted as:

[topic]$topic_{$partition}_$offset

where the prefix identifies the topic, the partition name is wrapped in a Redis hash tag ({...}) so that all keys of a partition map to the same slot, and the suffix is the partition offset.
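A small key-builder sketch for this layout. The zero-padded fixed-width offset is an assumption on my part: it keeps lexicographic key order equal to numeric offset order, which an ordered key scan relies on:

```python
def message_key(topic, partition, offset, width=20):
    # {partition} is a Redis hash tag: all keys of one partition share a slot.
    # Zero-padding makes lexicographic order match numeric offset order.
    return f"[topic]{topic}_{{{partition}}}_{offset:0{width}d}"
```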

Message Retrieval

Consumers pull messages via Redis SCAN, starting from the consumer‑group offset stored in a hash:

Key

[consumer_offset]$group_$topic

Value

field   value
partition1   offset1
partition2   offset2

When a consumer connects, the broker reads the stored offset for its assigned partitions and begins scanning from that position.
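The pull path can be sketched as: read the group's committed offset, then return the next batch of messages in offset order. A dict lookup over zero-padded keys stands in for the ordered SCAN; the function name and batch size are illustrative:

```python
def pull(store, offsets, topic, group, partition, batch=10):
    # Committed consumer-group offset, from [consumer_offset]$group_$topic
    committed = offsets.get(f"[consumer_offset]{group}_{topic}", {}).get(partition, -1)
    msgs = []
    for off in range(committed + 1, committed + 1 + batch):
        key = f"[topic]{topic}_{{{partition}}}_{off:020d}"
        if key not in store:
            break  # reached the tail of the partition
        msgs.append(store[key])
    return msgs
```

After the consumer acknowledges the batch, the broker advances the group offset in the hash, so the next pull resumes where this one left off.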

Delayed Messages

Delayed messages are first placed into a special delay topic. The original topic and partition are recorded in origin_topic and origin_partition. The delay-topic key includes the broker ID and the expiration timestamp:

[delay]{$broker_id}_$expireTime

A delay‑message forwarder continuously scans for expired entries, rewrites the target topic/partition fields, and delivers the message to its final destination.
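The forwarder loop can be sketched as follows, assuming the expiration timestamp is encoded directly in the key suffix as described above (the parsing format and dict representation of a message are assumptions):

```python
import time

def forward_expired(delay_store, deliver, broker_id, now=None):
    # Scan this broker's delay entries; any entry whose expire time has passed
    # is rewritten back to its original topic/partition and delivered.
    now = time.time() if now is None else now
    prefix = f"[delay]{{{broker_id}}}_"
    for key in list(delay_store):
        if not key.startswith(prefix):
            continue
        if float(key[len(prefix):]) > now:
            continue  # not due yet
        msg = delay_store.pop(key)
        msg["topic"] = msg.pop("origin_topic")
        msg["partition"] = msg.pop("origin_partition")
        deliver(msg)
```

In production this loop would run continuously on each broker, scanning only its own key range so forwarders do not contend with each other.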

Retry Mechanism

Failed consumption also follows the delayed‑message path, but retry messages are routed to a dedicated retry topic per consumer group, e.g.:

[topic]retry_$consumer_group

This isolates retries to the affected group without impacting others.

Dead‑Letter Queue

If a message exceeds the maximum retry count, it is moved to a dead‑letter queue:

[topic]dead_$consumer_group
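The retry-versus-dead-letter decision can be sketched in one small routing function. The retry_count field and the max_retries default are assumptions for illustration; the article only specifies the topic naming:

```python
def route_failed(msg, group, max_retries=3):
    # After a consumption failure, bump the retry counter and decide whether
    # the message goes to the group's retry topic or its dead-letter topic.
    retries = msg.get("retry_count", 0) + 1
    msg["retry_count"] = retries
    if retries > max_retries:
        return f"[topic]dead_{group}"
    return f"[topic]retry_{group}"
```

Because both topics are suffixed with the consumer group name, one group's failures never slow down or duplicate delivery for any other group.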

Message TTL

To reclaim storage, messages are automatically deleted: normal messages after 3 days, delayed messages 3 days after their expiration time.

Conclusion

This article presented the core ideas for building a message queue on top of a distributed KV store. It omits many production concerns, such as disaster recovery, high availability, and performance optimization; substantial engineering effort would be required before running such a system in a real environment.
