Backend Development 11 min read

Understanding RocketMQ Consumer Scaling and MessageQueue Allocation Strategies

This article explains when adding consumers to a RocketMQ topic improves message consumption, the reasons for consumer pull delays, and details six MessageQueue allocation strategies—including average, round‑robin, custom, machine‑room, nearby, and consistent‑hash—accompanied by code examples.

Architect
Architect
Architect
Understanding RocketMQ Consumer Scaling and MessageQueue Allocation Strategies

In an interview‑style discussion, the author explains that increasing the number of consumers for a RocketMQ topic can reduce message backlog only when the consumer count is less than the number of MessageQueues; otherwise, adding consumers has no effect.

When consumers are fewer than MessageQueues, each additional consumer can increase the pull frequency, as illustrated with a 4‑queue, 2‑consumer scenario and a 4‑queue, 3‑consumer scenario (images shown).

The author also notes that consumer pull speed depends on local processing speed. If local consumption is slow, the consumer will delay subsequent pulls. Three ProcessQueue thresholds can trigger this delay:

Message count exceeds the default threshold (1000, configurable).

Message size exceeds the default threshold (100 MB, configurable).

For non‑ordered consumption, the offset difference between the first and last message exceeds the default threshold (2000, configurable).

Additional delay can occur when the ProcessQueue lock fails in ordered consumption, causing a fixed 3‑second delay.

Root causes of slow consumer processing include complex business logic, slow database queries, slow cache (e.g., Redis), and slow external service calls. For slow external calls, the author suggests asynchronous retries, caching default values with fallback, or storing messages locally and acknowledging success to the broker.

When adding consumers, one must consider the impact on external system throughput and local database/cache pressure.

MessageQueue allocation to new consumers is performed by a periodic load balancer (default every 20 seconds). RocketMQ provides six allocation strategies:

1. Average Allocation Strategy

Consumers are sorted, and MessageQueues are evenly divided; any remainder is distributed one‑by‑one to the first few consumers.

int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
    result.add(mqAll.get((startIndex + i) % mqAll.size()));
}

2. Round‑Robin (Circle) Allocation Strategy

Iterates over consumers, assigning each MessageQueue in turn; multiple passes are made if queues outnumber consumers.

int index = cidAll.indexOf(currentCID);
for (int i = index; i < mqAll.size(); i++) {
    if (i % cidAll.size() == index) {
        result.add(mqAll.get(i));
    }
}

3. Custom Allocation Strategy

Allows explicit binding of specific MessageQueues to a consumer via configuration.

AllocateMessageQueueByConfig allocateMessageQueueByConfig = new AllocateMessageQueueByConfig();
allocateMessageQueueByConfig.setMessageQueueList(Arrays.asList(new MessageQueue("messageQueue1", "broker1", 0)));
consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByConfig);
consumer.start();

4. Machine‑Room Allocation Strategy

Consumers only consume MessageQueues belonging to specified machine rooms; unassigned rooms are later handled by the average strategy.

AllocateMessageQueueByMachineRoom allocateMessageQueueByMachineRoom = new AllocateMessageQueueByMachineRoom();
allocateMessageQueueByMachineRoom.setConsumeridcs(new HashSet<>(Arrays.asList("room1", "room2")));
consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByMachineRoom);
consumer.start();

5. Machine‑Room Nearby Allocation Strategy

Extends the machine‑room strategy by also allocating queues from rooms without consumers to the existing consumer pool.

(Source class: AllocateMachineRoomNearby )

6. Consistent‑Hash Allocation Strategy

Consumers are placed on a hash ring; each MessageQueue is hashed and assigned to the nearest consumer clockwise.

Collection
cidNodes = new ArrayList<>();
for (String cid : cidAll) {
    cidNodes.add(new ClientNode(cid));
}
ConsistentHashRouter
router = (customHashFunction != null) ?
    new ConsistentHashRouter<>(cidNodes, virtualNodeCnt, customHashFunction) :
    new ConsistentHashRouter<>(cidNodes, virtualNodeCnt);
List
results = new ArrayList<>();
for (MessageQueue mq : mqAll) {
    ClientNode clientNode = router.routeNode(mq.toString());
    if (clientNode != null && currentCID.equals(clientNode.getKey())) {
        results.add(mq);
    }
}

The interview concludes with the candidate passing the assessment.

Javaload balancingRocketMQMessageQueueconsumer scaling
Architect
Written by

Architect

Professional architect sharing high‑quality architecture insights. Topics include high‑availability, high‑performance, high‑stability architectures, big data, machine learning, Java, system and distributed architecture, AI, and practical large‑scale architecture case studies. Open to ideas‑driven architects who enjoy sharing and learning.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.