
Understanding RocketMQ Consumer Mechanisms: Consumption Modes, Idempotent Handling, Load Balancing, and Rebalancing

This article explains RocketMQ consumer fundamentals, covering concurrent and ordered consumption modes, how to achieve idempotent processing with Redis locks, the load‑balancing strategy implemented by RebalanceImpl, and the complete pull‑process‑consume workflow including code examples.

Code Ape Tech Column

RocketMQ consumer fundamentals are introduced, covering consumption modes, concurrent and ordered consumption, idempotent handling, load balancing, and the internal rebalancing process.

Consumption Modes

RocketMQ consumers support two modes: concurrent consumption (the default, backed by a thread pool of 20 threads) and ordered consumption (which preserves message order within each queue).

Concurrent Consumption

Concurrent consumption is handled by ConsumeMessageConcurrentlyService. Its constructor, shown below, creates the consume thread pool and two single-threaded scheduled executors.

public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
        MessageListenerConcurrently messageListener) {
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    this.messageListener = messageListener;

    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    // Unbounded work queue: tasks wait here, so the pool does not grow past its core size.
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

    // Thread pool that runs the business listener.
    this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(),   // core pool size
        this.defaultMQPushConsumer.getConsumeThreadMax(),   // maximum pool size
        1000 * 60,                                          // keep-alive time for idle threads
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_"));

    // Single-threaded schedulers for delayed consume tasks and for cleaning up expired messages.
    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}

Ordered Consumption

Ordered consumption relies on the producer sending related messages to the same MessageQueue of a Topic. The consumer then processes the messages of each queue sequentially, and a failed message is retried before any later message in that queue is consumed.

Idempotent Consumption

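RocketMQ itself does not guarantee idempotency: delivery is at-least-once, so the same message can reach a consumer more than once. Applications must therefore ensure that repeated consumption does not change the result, for example by using a Redis lock together with a stored processing status.

Key points include how the lock key is composed, the state transitions recorded under that key, how an expired lock is handled, and cleaning up the key when processing fails.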
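The key points above can be sketched as follows. This is a minimal illustration, not the article's implementation: an in-memory ConcurrentHashMap stands in for Redis so the example runs on its own, and the class name, the lockKey format (group:topic:messageKey), and the PROCESSING/DONE states are all assumptions made for the sketch. With real Redis, the putIfAbsent step would roughly correspond to a SET with the NX and PX options.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: an in-memory map replaces Redis so the example is self-contained.
class IdempotentConsumerSketch {
    enum State { PROCESSING, DONE }

    // Value stored under the lock key: the current state plus the lock's expiry time.
    static final class Entry {
        final State state;
        final long expiresAtMillis;
        Entry(State state, long expiresAtMillis) {
            this.state = state;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final ConcurrentHashMap<String, Entry> store = new ConcurrentHashMap<>();
    private final long lockTtlMillis;
    final AtomicInteger successfulRuns = new AtomicInteger();

    IdempotentConsumerSketch(long lockTtlMillis) {
        this.lockTtlMillis = lockTtlMillis;
    }

    // Assumed key layout; any composition that is unique per business message works.
    static String lockKey(String consumerGroup, String topic, String messageKey) {
        return consumerGroup + ":" + topic + ":" + messageKey;
    }

    // Returns true if the message is handled (now or earlier), false if it should be retried later.
    boolean consume(String key, Runnable businessLogic, long nowMillis) {
        Entry mine = new Entry(State.PROCESSING, nowMillis + lockTtlMillis);
        Entry existing = store.putIfAbsent(key, mine);
        if (existing != null) {
            if (existing.state == State.DONE) {
                return true;                          // duplicate delivery: skip the business logic
            }
            if (existing.expiresAtMillis > nowMillis) {
                return false;                         // someone else is still processing: retry later
            }
            if (!store.replace(key, existing, mine)) {
                return false;                         // lost the race to take over an expired lock
            }
        }
        try {
            businessLogic.run();
            successfulRuns.incrementAndGet();
            // The sketch keeps DONE forever; a real store would give it a retention TTL.
            store.put(key, new Entry(State.DONE, Long.MAX_VALUE));
            return true;
        } catch (RuntimeException e) {
            store.remove(key, mine);                  // clean up so a retry can re-acquire the lock
            return false;
        }
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch(30_000);
        String key = lockKey("order-group", "order-topic", "order-1001");
        consumer.consume(key, () -> System.out.println("charging order-1001"), 0);
        consumer.consume(key, () -> System.out.println("charging order-1001"), 10);
        System.out.println("business logic ran " + consumer.successfulRuns.get() + " time(s)");
    }
}
```

Returning false is meant to map to "consume later" in the listener, so the broker redelivers the message once the lock has been released or has expired.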

Load Balancing

The pull request queue can hold several pull requests, one for each MessageQueue assigned to the consumer. The consumer uses RebalanceImpl to spread the queues of a topic across the consumer instances in a group, following the steps below.

Getting MessageQueue List

Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);

Fetching Online Consumer IDs

List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

Allocating MessageQueues

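// mqAll is the list form of mqSet; it and cidAll are sorted first so every consumer computes the same result.
List<MessageQueue> allocateResult = strategy.allocate(this.consumerGroup,
        this.mQClientFactory.getClientId(),
        mqAll,
        cidAll);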

The average allocation algorithm divides the queues evenly and gives any remainder to the consumers that come first in the list: 8 queues across 2 consumers yields 4‑4, and 8 queues across 3 consumers yields 3‑3‑2.
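To make the arithmetic concrete, here is a small sketch of average allocation. It is an illustration under assumptions rather than RocketMQ's own class: plain integers stand in for MessageQueue objects, consumerIndex stands in for the position of the current client ID in the sorted consumer list, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of average allocation; queue indices replace MessageQueue objects.
class AverageAllocationSketch {

    // Returns the queue indices assigned to the consumer at position consumerIndex.
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        List<Integer> result = new ArrayList<>();
        if (queueCount <= 0 || consumerCount <= 0
                || consumerIndex < 0 || consumerIndex >= consumerCount) {
            return result;
        }
        int mod = queueCount % consumerCount;            // queues left over after an even split
        boolean getsExtra = mod > 0 && consumerIndex < mod;
        int averageSize;
        if (queueCount <= consumerCount) {
            averageSize = 1;                             // at most one queue per consumer
        } else if (getsExtra) {
            averageSize = queueCount / consumerCount + 1;
        } else {
            averageSize = queueCount / consumerCount;
        }
        // Consumers after the "extra" ones start past the leftover queues.
        int startIndex = getsExtra
                ? consumerIndex * averageSize
                : consumerIndex * averageSize + mod;
        // range is negative for surplus consumers, which therefore receive nothing.
        int range = Math.min(averageSize, queueCount - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(startIndex + i);
        }
        return result;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println("consumer " + i + " -> " + allocate(8, 3, i));
        }
    }
}
```

Because every consumer runs the same calculation on the same sorted inputs, the instances agree on the split without coordinating with each other. When there are more consumers than queues, the surplus consumers simply receive an empty list.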

Refreshing Local Cache & Building Pull Requests

The updateProcessQueueTableInRebalance method refreshes the local processQueueTable: it creates a ProcessQueue for each newly assigned MessageQueue, builds a matching PullRequest, and dispatches the requests to the pull request queue.

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
    boolean changed = false;

    // Walk the queues this consumer currently holds.
    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();
        // ...
    }

    // Create a ProcessQueue and a PullRequest for each newly assigned queue.
    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                // Build a pull request only if this call actually registered the queue.
                if (pre == null) {
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            }
        }
    }

    // Hand the new pull requests to the pull thread.
    this.dispatchPullRequest(pullRequestList);
    return changed;
}

@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    for (PullRequest pullRequest : pullRequestList) {
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
    }
}

// Enqueues the request for the background pull thread.
public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        // put() is a blocking call and declares InterruptedException.
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}

Background Pull Thread

The PullMessageService thread runs a loop that takes PullRequest objects from the pull request queue, blocking while the queue is empty, and pulls the corresponding messages from the broker.

@Override
public void run() {
    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
}
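The hand-off between rebalancing, which produces pull requests, and the pull thread, which consumes them, is a standard blocking-queue pattern. The sketch below shows that pattern in isolation, using only JDK classes. The Request class, the pulled list, and the demo method are hypothetical stand-ins, and no broker is contacted: "pulling" just records the queue ID.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the producer/consumer hand-off through a blocking queue.
class PullLoopSketch {
    // Stand-in for PullRequest: only records which queue to pull from.
    static final class Request {
        final int queueId;
        Request(int queueId) { this.queueId = queueId; }
    }

    private final BlockingQueue<Request> pullRequestQueue = new LinkedBlockingQueue<>();
    private final List<Integer> pulled = Collections.synchronizedList(new ArrayList<>());
    private volatile boolean stopped = false;

    // Producer side: called when a queue is newly assigned.
    void submit(Request request) throws InterruptedException {
        pullRequestQueue.put(request);
    }

    // Consumer side: blocks on take() until a request arrives or the thread is interrupted.
    void run() {
        while (!stopped) {
            try {
                Request request = pullRequestQueue.take();
                pulled.add(request.queueId);   // a real service would contact the broker here
            } catch (InterruptedException ignored) {
                // Fall through and re-check the stopped flag.
            }
        }
    }

    // Runs the loop on a worker thread, submits three requests, and returns the handled queue IDs.
    static List<Integer> demo() {
        PullLoopSketch service = new PullLoopSketch();
        Thread worker = new Thread(service::run, "PullLoopSketchThread");
        worker.start();
        try {
            for (int q = 0; q < 3; q++) {
                service.submit(new Request(q));
            }
            long deadline = System.currentTimeMillis() + 2000;
            while (service.pulled.size() < 3 && System.currentTimeMillis() < deadline) {
                Thread.sleep(5);
            }
            service.stopped = true;
            worker.interrupt();                // wake the worker if it is blocked in take()
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(service.pulled);
    }

    public static void main(String[] args) {
        System.out.println("handled queue IDs: " + demo());
    }
}
```

Blocking on take() means the thread consumes no CPU while nothing is assigned, and a single worker handles requests in the order they were enqueued.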

Message Consumption

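After a pull completes, the pull callback hands the messages to the consume service, which invokes the business logic registered through messageListener. In concurrent mode the listener reports success by returning ConsumeConcurrentlyStatus.CONSUME_SUCCESS (recorded internally as ConsumeReturnType.SUCCESS). Failures trigger the retry mechanism: concurrent mode retries a message up to 16 times and then moves it to the dead‑letter queue, while ordered mode retries locally until the message succeeds.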
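The retry limit in concurrent mode can be pictured as a simple routing decision. The sketch below is a simplified illustration, not broker code: the class and method names are hypothetical, and the %RETRY% and %DLQ% prefixes reflect RocketMQ's per-group topic naming as I understand it. Here reconsumeTimes is the number of retries the message has already been through.

```java
// Hypothetical sketch of the retry versus dead-letter decision in concurrent mode.
class RetryRoutingSketch {
    static final int DEFAULT_MAX_RECONSUME_TIMES = 16;

    // Returns the topic a failed message would be routed to next.
    static String nextTopic(String consumerGroup, int reconsumeTimes, int maxReconsumeTimes) {
        if (reconsumeTimes >= maxReconsumeTimes) {
            return "%DLQ%" + consumerGroup;    // retries exhausted: dead-letter queue
        }
        return "%RETRY%" + consumerGroup;      // schedule another delayed retry
    }

    public static void main(String[] args) {
        System.out.println(nextTopic("order-group", 0, DEFAULT_MAX_RECONSUME_TIMES));
        System.out.println(nextTopic("order-group", 16, DEFAULT_MAX_RECONSUME_TIMES));
    }
}
```

Messages that land in the dead-letter queue are no longer delivered to the listener automatically, so they usually need monitoring and manual handling.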

Summary

The article reviews consumption modes, idempotent design, the load‑balancing strategy, and the complete flow from pulling messages to processing and acknowledging them, providing code snippets for key classes such as ConsumeMessageConcurrentlyService, RebalanceImpl, and PullMessageService.

Written by

Code Ape Tech Column

Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
