Understanding RocketMQ Consumer Mechanisms: Consumption Modes, Idempotent Handling, Load Balancing, and Rebalancing
This article explains the fundamentals of RocketMQ consumers: the concurrent and ordered consumption modes, idempotent processing with Redis locks, the load‑balancing strategy implemented by RebalanceImpl, and the full workflow from pulling messages to consuming them, with code excerpts along the way.
Consumption Modes
RocketMQ consumers support two consumption modes: concurrent (the default, backed by a thread pool that uses 20 threads by default) and ordered (which preserves message order within each queue).
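One detail behind the thread count is easy to miss. The constructor shown in the next section hands an unbounded LinkedBlockingQueue to ThreadPoolExecutor, and a JDK ThreadPoolExecutor only grows past its core size when the queue refuses a task. With an unbounded queue that never happens, so the minimum thread setting is what effectively governs concurrency. The sketch below demonstrates this behaviour with plain JDK classes; the class name and the pool sizes (2 and 8) are illustrative assumptions, not RocketMQ defaults.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class UnboundedQueuePoolDemo {

    /** Submits `tasks` blocking tasks and returns the largest pool size reached. */
    static int largestPoolSize(int core, int max, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the worker so later tasks pile up in the queue
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getLargestPoolSize();
        } finally {
            release.countDown(); // let the blocked workers finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 50 tasks, core size 2, max size 8: the pool stays at 2 threads.
        System.out.println(largestPoolSize(2, 8, 50)); // prints 2
    }
}
```

If more parallelism is needed with this kind of pool, raising the core (minimum) size is what takes effect; raising only the maximum does not.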
Concurrent Consumption
In concurrent mode, pulled messages are handed to a thread pool. The constructor of ConsumeMessageConcurrentlyService creates that pool together with two single‑threaded scheduled executors:
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
    MessageListenerConcurrently messageListener) {
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    this.messageListener = messageListener;
    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    // Unbounded queue that buffers consume tasks until a worker thread is free.
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
    // Worker pool that runs the business listener.
    this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_"));
    // Single-threaded schedulers; per its thread name, the second one cleans up expired messages.
    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}
Ordered Consumption
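Ordered consumption depends on both sides. The producer sends messages that must stay in order to the same MessageQueue of a Topic, and the consumer processes each queue sequentially. When a message fails, the consumer retries it before moving on to the next one, so the order is not broken.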
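On the producer side, keeping related messages in one queue usually comes down to mapping a business key to a queue index. The sketch below shows that mapping with plain JDK code. The class name, `selectQueueIndex`, and the order IDs are hypothetical names for illustration; in a real producer this logic would typically sit inside a MessageQueueSelector implementation.

```java
final class OrderedQueueSelection {

    /** Maps a business key to a queue index so that equal keys always share a queue. */
    static int selectQueueIndex(String orderId, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(orderId.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        String[] events = {"created", "paid", "shipped"};
        for (String event : events) {
            // All three events of order-1001 map to the same queue index.
            System.out.println("order-1001 " + event + " -> queue "
                    + selectQueueIndex("order-1001", 4));
        }
    }
}
```

The mapping is only stable while the queue count stays the same; changing the number of queues moves keys to different queues.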
Idempotent Consumption
RocketMQ does not guarantee idempotency: the same message can be delivered more than once, so the application must make sure that repeated consumption does not change the data a second time. A common approach is to take a Redis lock per message and store the processing status alongside it.
The key points are how the lock key is composed, the state transitions, what happens when a lock expires, and cleaning up the key when processing fails.
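The points above can be sketched as a small state machine. This is a minimal sketch under stated assumptions, not the article's implementation: an in-memory ConcurrentHashMap stands in for Redis (a real setup would use something like SET with NX and an expiry), the key layout `consumerGroup:messageKey` is an assumed composition, and all class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

final class IdempotentConsumer {
    enum State { PROCESSING, DONE }
    enum Result { PROCESSED, DUPLICATE, IN_PROGRESS }

    /** Value stored under the lock key: a state plus an expiry timestamp. */
    static final class LockEntry {
        final State state;
        final long expiresAtMillis;
        LockEntry(State state, long expiresAtMillis) {
            this.state = state;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    // In-memory stand-in for Redis (assumption for this sketch).
    private final ConcurrentHashMap<String, LockEntry> store = new ConcurrentHashMap<>();
    private final LongSupplier clock;
    private final long lockTtlMillis;

    IdempotentConsumer(LongSupplier clock, long lockTtlMillis) {
        this.clock = clock;
        this.lockTtlMillis = lockTtlMillis;
    }

    Result consume(String consumerGroup, String messageKey, Runnable handler) {
        String key = consumerGroup + ":" + messageKey; // assumed key layout
        long now = clock.getAsLong();
        LockEntry mine = new LockEntry(State.PROCESSING, now + lockTtlMillis);
        // Atomically claim the key if it is absent or holds an expired PROCESSING lock.
        LockEntry winner = store.compute(key, (k, old) -> {
            boolean free = old == null
                    || (old.state == State.PROCESSING && old.expiresAtMillis <= now);
            return free ? mine : old;
        });
        if (winner != mine) {
            // DONE: safe to acknowledge. PROCESSING: someone else holds a live lock.
            return winner.state == State.DONE ? Result.DUPLICATE : Result.IN_PROGRESS;
        }
        try {
            handler.run();
            // Kept forever here; a real store would give the DONE marker its own TTL.
            store.put(key, new LockEntry(State.DONE, Long.MAX_VALUE));
            return Result.PROCESSED;
        } catch (RuntimeException e) {
            store.remove(key, mine); // clean up so a retry can process the message
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer = new IdempotentConsumer(System::currentTimeMillis, 30_000L);
        System.out.println(consumer.consume("order-group", "msg-1", () -> {})); // PROCESSED
        System.out.println(consumer.consume("order-group", "msg-1", () -> {})); // DUPLICATE
    }
}
```

A caller would normally acknowledge the message on PROCESSED or DUPLICATE and ask for redelivery on IN_PROGRESS, since the other holder may still fail.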
Load Balancing
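Within a consumer group, the queues of a topic are divided among the online consumer instances. RebalanceImpl performs this division in the steps below. Each queue assigned to an instance becomes a pull request, so the pull request queue can hold several entries at once.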
Getting MessageQueue List
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
Fetching Online Consumer IDs
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
Allocating MessageQueues
// mqAll is a sorted list built from mqSet; cidAll is sorted as well.
List<MessageQueue> allocateResult = strategy.allocate(this.consumerGroup,
    this.mQClientFactory.getClientId(),
    mqAll,
    cidAll);
The average allocation algorithm spreads the queues as evenly as possible: 8 queues across 2 consumer instances gives 4‑4, and 8 queues across 3 instances gives 3‑3‑2.
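The 4‑4 and 3‑3‑2 splits follow from simple integer arithmetic. The sketch below is a simplified model that works on queue and consumer indices rather than on the MessageQueue and client ID lists the real strategy receives; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class AverageAllocation {

    /** Returns the queue indices assigned to the consumer at position consumerIndex. */
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        if (consumerCount <= 0 || consumerIndex < 0 || consumerIndex >= consumerCount) {
            throw new IllegalArgumentException("invalid consumer position");
        }
        int base = queueCount / consumerCount;      // queues every consumer gets
        int remainder = queueCount % consumerCount; // leftover queues
        // The first `remainder` consumers each take one extra queue.
        int size = base + (consumerIndex < remainder ? 1 : 0);
        // Skip the queues taken by earlier consumers, including their extras.
        int start = consumerIndex * base + Math.min(consumerIndex, remainder);
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            result.add(start + i);
        }
        return result;
    }

    public static void main(String[] args) {
        for (int consumer = 0; consumer < 3; consumer++) {
            System.out.println("consumer " + consumer + " -> " + allocate(8, 3, consumer));
        }
        // consumer 0 -> [0, 1, 2]
        // consumer 1 -> [3, 4, 5]
        // consumer 2 -> [6, 7]
    }
}
```

When there are more consumers than queues, the consumers at the end of the list receive nothing, which is why adding instances beyond the queue count does not add throughput.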
Refreshing Local Cache & Building Pull Requests
The updateProcessQueueTableInRebalance method creates a ProcessQueue for each newly assigned MessageQueue and builds a PullRequest for it:
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
    boolean changed = false;
    // First pass: walk the queues this instance currently holds.
    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();
        // ...
    }
    // Second pass: build a PullRequest for every queue that is newly assigned.
    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            // Offset from which pulling starts for this queue.
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(consumerGroup);
                pullRequest.setNextOffset(nextOffset);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(pq);
                pullRequestList.add(pullRequest);
                changed = true;
            }
        }
    }
    // Hand the new requests over to the pull thread.
    this.dispatchPullRequest(pullRequestList);
    return changed;
}
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    for (PullRequest pullRequest : pullRequestList) {
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
    }
}
// Simplified excerpt: put() can throw InterruptedException, which real code has to handle.
public void executePullRequestImmediately(final PullRequest pullRequest) {
    this.pullRequestQueue.put(pullRequest);
}
Background Pull Thread
PullMessageService runs a background loop. It takes the next PullRequest from the pull request queue, blocking while the queue is empty, and pulls messages from the broker for that request:
@Override
public void run() {
    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
}
Message Consumption
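After a pull returns, the pull callback submits the messages for consumption, and the consume service invokes the business logic registered through messageListener. In concurrent mode the listener signals success by returning ConsumeConcurrentlyStatus.CONSUME_SUCCESS, which the client records internally as ConsumeReturnType.SUCCESS. Failures trigger the retry mechanisms: concurrent mode retries up to 16 times and then moves the message to the dead‑letter queue, while ordered mode retries locally until the message succeeds.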
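The concurrent-mode retry rule can be written down as a small decision function. The sketch below is a simplified model, not client or broker code: the class and method names are hypothetical, the limit of 16 is the figure cited above, and `%RETRY%` and `%DLQ%` are the prefixes RocketMQ uses for a group's retry and dead‑letter topics.

```java
final class RetryRouting {
    static final int MAX_RECONSUME_TIMES = 16; // retry limit cited in the text

    /** Decides where a failed message goes next, given how often it was already retried. */
    static String nextTopic(String consumerGroup, int reconsumeTimes) {
        if (reconsumeTimes >= MAX_RECONSUME_TIMES) {
            return "%DLQ%" + consumerGroup;   // retries exhausted: dead-letter topic
        }
        return "%RETRY%" + consumerGroup;     // schedule another delivery
    }

    /** Counts deliveries of a message whose handler always fails. */
    static int deliveriesBeforeDeadLetter(String consumerGroup) {
        int deliveries = 0;
        int reconsumeTimes = 0;
        while (true) {
            deliveries++; // the handler runs and fails
            if (nextTopic(consumerGroup, reconsumeTimes).startsWith("%DLQ%")) {
                return deliveries;
            }
            reconsumeTimes++;
        }
    }

    public static void main(String[] args) {
        System.out.println(nextTopic("order-group", 3));               // %RETRY%order-group
        System.out.println(nextTopic("order-group", 16));              // %DLQ%order-group
        System.out.println(deliveriesBeforeDeadLetter("order-group")); // 17
    }
}
```

A message that never succeeds is therefore delivered 17 times in this model: the original delivery plus 16 retries.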
Summary
The article reviews consumption modes, idempotent design, load‑balancing strategy, and the complete flow from pulling messages to processing and acknowledging them, providing code snippets for key classes such as ConsumeMessageConcurrentlyService, RebalanceImpl, and PullMessageService.
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn