Deep Dive into RocketMQ: Storage Architecture and Consumer Processing Flow
RocketMQ stores messages sequentially in a CommitLog and uses a ConsumeQueue index for fast lookup. On the consumer side, two looping threads do the work: RebalanceService balances queue assignments across consumer instances, and PullMessageService continuously pulls messages and dispatches them to user callbacks through a thread pool.
This article explores the inner workings of RocketMQ, focusing on how messages are stored and how consumers retrieve them.
Key questions addressed:
What is the storage architecture of messages?
How do consumers pull messages?
What load‑balancing strategy does RocketMQ use?
Storage architecture
RocketMQ uses two core files: CommitLog, which stores the raw message bodies sequentially, and ConsumeQueue, an index file whose entries record each message's CommitLog offset, message length, and tag hashcode. Because every write is appended to the CommitLog, write throughput stays high, while the ConsumeQueue lets a consumer jump straight to a message's position instead of scanning the CommitLog.
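To make the index layout concrete, here is a minimal sketch of one fixed-size index entry. It assumes a 20-byte layout (8-byte CommitLog offset, 4-byte message size, 8-byte tag hashcode); the class and method names (ConsumeQueueEntrySketch, encode, positionOf) are hypothetical and are not RocketMQ APIs.

```java
import java.nio.ByteBuffer;

class ConsumeQueueEntrySketch {
    // Assumed fixed entry size: 8 (CommitLog offset) + 4 (message size) + 8 (tag hashcode).
    static final int ENTRY_SIZE = 20;

    // Packs one index entry into a buffer that is ready for reading.
    static ByteBuffer encode(long commitLogOffset, int messageSize, long tagHash) {
        ByteBuffer buf = ByteBuffer.allocate(ENTRY_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(messageSize);
        buf.putLong(tagHash);
        buf.flip();
        return buf;
    }

    // Entries are fixed-size, so the n-th entry starts at n * ENTRY_SIZE.
    static long positionOf(long logicalIndex) {
        return logicalIndex * ENTRY_SIZE;
    }

    public static void main(String[] args) {
        ByteBuffer entry = encode(1024L, 256, "TagA".hashCode());
        System.out.println("entry bytes = " + entry.remaining());
        System.out.println("offset = " + entry.getLong()
                + ", size = " + entry.getInt()
                + ", tagHash = " + entry.getLong());
        System.out.println("file position of entry 5 = " + positionOf(5));
    }
}
```

The fixed entry size is what makes lookup cheap: a consumer's logical offset maps to a file position with one multiplication, and the entry then points at the message in the CommitLog.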
Consumer processing flow
The consumer lifecycle is driven by two looping threads:
RebalanceService – periodically executes a load‑balancing algorithm to assign queues to each consumer instance.
PullMessageService – continuously takes PullRequest objects from a queue, fetches messages from the broker, and hands them to the user‑defined callback.
Key code snippets are shown below. Client startup, in MQClientInstance.start(), launches both threads:

    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    // Assume failure until every step below has succeeded
                    this.serviceState = ServiceState.START_FAILED;
                    // No name server address configured: fetch one
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    this.mQClientAPIImpl.start();
                    this.startScheduledTask();
                    // Start the two looping threads described above
                    this.pullMessageService.start();
                    this.rebalanceService.start();
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                default:
                    break;
            }
        }
    }

The rebalance loop iterates over all subscribed topics, obtains the set of queues (mqAll) and consumer IDs (cidAll), and applies the AllocateMessageQueueStrategy (by default AllocateMessageQueueAveragely, the "allocate averagely" strategy) to compute the queues assigned to the current consumer.
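The averaging idea can be sketched as follows. This is a simplified illustration, not the RocketMQ source: queues are plain integers, the class and method names (AverageAllocationSketch, allocate) are hypothetical, and it assumes every consumer sees mqAll and cidAll in the same sorted order so that all instances compute non-overlapping results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AverageAllocationSketch {
    // Returns the contiguous slice of mqAll owned by currentCid.
    // The first (size % consumers) consumers each receive one extra queue.
    static List<Integer> allocate(List<Integer> mqAll, List<String> cidAll, String currentCid) {
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return new ArrayList<>();
        }
        int base = mqAll.size() / cidAll.size();
        int remainder = mqAll.size() % cidAll.size();
        int count = base + (index < remainder ? 1 : 0);
        int start = index * base + Math.min(index, remainder);
        return new ArrayList<>(mqAll.subList(start, start + count));
    }

    public static void main(String[] args) {
        List<Integer> mqAll = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
        List<String> cidAll = Arrays.asList("c1", "c2", "c3");
        for (String cid : cidAll) {
            System.out.println(cid + " -> " + allocate(mqAll, cidAll, cid));
        }
    }
}
```

With 8 queues and 3 consumers, the first two consumers get three queues each and the last gets two. When there are more consumers than queues, the trailing consumers receive an empty list and stay idle.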
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            // Wait up to waitInterval, then run one rebalance pass
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
        log.info(this.getServiceName() + " service end");
    }

The pull thread blocks on a queue of PullRequest objects and, for each request it takes, asks the broker for messages.
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                // take() blocks until a PullRequest is available
                PullRequest pullRequest = this.pullRequestQueue.take();
                if (pullRequest != null) {
                    this.pullMessage(pullRequest);
                }
            } catch (InterruptedException e) {
                // Ignored: the loop condition decides whether the thread exits
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }
        log.info(this.getServiceName() + " service end");
    }

After a successful pull, the consumer wraps the messages in a ConsumeRequest and submits it to a thread pool for execution. The request ultimately calls the user-provided MessageListenerConcurrently to process the messages. If the pool rejects the task, the request is resubmitted later.
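As a generic illustration of handing pulled messages to a thread pool in bounded batches, consider the sketch below. It is not RocketMQ's code: the class BatchSplitSketch, the partition helper, and the counting "listener" are all hypothetical, and only standard java.util.concurrent calls are used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BatchSplitSketch {
    // Cuts msgs into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> msgs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < msgs.size(); i += batchSize) {
            int end = Math.min(i + batchSize, msgs.size());
            batches.add(new ArrayList<>(msgs.subList(i, end)));
        }
        return batches;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> pulled = Arrays.asList("m1", "m2", "m3", "m4", "m5");
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger consumed = new AtomicInteger();

        // Each batch becomes one task; the task body stands in for a user callback.
        for (List<String> batch : partition(pulled, 2)) {
            pool.submit(() -> {
                consumed.addAndGet(batch.size());
            });
        }

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("consumed = " + consumed.get());
    }
}
```

Bounding the batch size keeps each task short, so several worker threads can process one large pull result in parallel instead of a single thread handling it end to end.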
    public void submitConsumeRequest(final List<MessageExt> msgs,
                                     final ProcessQueue processQueue,
                                     final MessageQueue messageQueue,
                                     final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            // Small enough to be consumed as a single request
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                // Pool is saturated: retry the submission later
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            // split into batches ...
        }
    }

In summary, the article demonstrates how RocketMQ achieves high-throughput storage with CommitLog and ConsumeQueue, how load balancing distributes queues among consumers, and how the pull-and-consume pipeline is implemented in Java.
DaTaobao Tech
Official account of DaTaobao Technology