
RocketMQ Consumer Scaling and MessageQueue Allocation Strategies Explained

This article explains when adding consumers reduces a RocketMQ backlog, why message pulling can be delayed, how to handle slow external services, and how six MessageQueue allocation strategies work (average, round‑robin, custom, machine‑room, machine‑room nearby, and consistent‑hash), with Java code examples.

Code Ape Tech Column

The interview opens with a question: when RocketMQ messages are backlogged, does adding consumers help? The answer depends on the queue count. In clustering mode, each MessageQueue is assigned to at most one consumer within a consumer group, so if there are fewer consumers than MessageQueues, adding consumers can speed up consumption; once the consumer count equals or exceeds the MessageQueue count, the extra consumers receive no queue and bring no benefit.

The discussion then covers cases where adding consumers does not guarantee faster consumption. Consumer pull speed also depends on local message processing speed; slow processing causes a delay before the next pull.

Three ProcessQueue thresholds can pause pulling: the number of stored messages exceeds a configurable limit (default 1000), the total size exceeds a limit (default 100 MB), or, for non‑ordered consumption, the offset difference between the first and last messages exceeds a limit (default 2000). For ordered consumption, a lock failure on the ProcessQueue adds a fixed 3‑second delay.
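
The three checks can be sketched in plain Java. This is a simplified model under stated assumptions, not the RocketMQ source: `FlowControlSketch`, its fields, and `pauseReason` are hypothetical names, and only the default limits come from the text above.

```java
import java.util.TreeMap;

// Hypothetical, simplified model of one queue's local cache; not the RocketMQ ProcessQueue class.
class FlowControlSketch {
    // Default limits quoted in the text above.
    static final int MAX_CACHED_COUNT = 1000;
    static final long MAX_CACHED_BYTES = 100L * 1024 * 1024;
    static final long MAX_OFFSET_SPAN = 2000;

    // offset -> body size in bytes, for messages pulled but not yet consumed
    private final TreeMap<Long, Integer> cached = new TreeMap<>();
    private long cachedBytes = 0;

    void put(long offset, int sizeBytes) {
        Integer previous = cached.put(offset, sizeBytes);
        cachedBytes += sizeBytes - (previous == null ? 0 : previous);
    }

    void remove(long offset) {
        Integer size = cached.remove(offset);
        if (size != null) {
            cachedBytes -= size;
        }
    }

    // Returns why the next pull should be postponed, or null if pulling may continue.
    String pauseReason(boolean orderly) {
        if (cached.size() > MAX_CACHED_COUNT) return "count";
        if (cachedBytes > MAX_CACHED_BYTES) return "size";
        if (!orderly && !cached.isEmpty()
                && cached.lastKey() - cached.firstKey() > MAX_OFFSET_SPAN) return "span";
        return null;
    }

    public static void main(String[] args) {
        FlowControlSketch queue = new FlowControlSketch();
        queue.put(0L, 512);     // one slow message still being processed
        queue.put(2500L, 512);  // newest pulled message
        System.out.println(queue.pauseReason(false)); // span
        queue.remove(0L);       // the slow message finishes
        System.out.println(queue.pauseReason(false)); // null
    }
}
```

The `main` method illustrates why a single stuck message matters: as long as the oldest unconsumed offset stays in the cache, the span check can hold back pulling even when the cache is nearly empty.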

Root causes of slow consumer processing include complex business logic, slow database queries, sluggish cache (e.g., Redis), or slow external service calls. Mitigation strategies involve asynchronous notifications with retries, caching default results with graceful degradation, or persisting messages locally and acknowledging success to the broker.
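
One way to picture the "default result with graceful degradation" option is a bounded wait with a fallback value. The sketch below is an illustration only: `DegradeSketch` and `callWithFallback` are hypothetical names, the fallback value stands in for whatever cached default the business can tolerate, and it assumes Java 9 or later for `CompletableFuture.completeOnTimeout`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper: bound the time spent on a slow dependency inside a message listener.
class DegradeSketch {
    // Runs remoteCall asynchronously; returns fallback if it is too slow or fails.
    static <T> T callWithFallback(Supplier<T> remoteCall, T fallback, long timeoutMillis) {
        return CompletableFuture.supplyAsync(remoteCall)
                .completeOnTimeout(fallback, timeoutMillis, TimeUnit.MILLISECONDS)
                .exceptionally(ex -> fallback)
                .join();
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        String price = callWithFallback(() -> {
            pause(500);            // simulated slow external service
            return "live-price";
        }, "cached-price", 100);
        System.out.println(price); // cached-price
    }
}
```

With a bound like this the consumer thread is released after the timeout instead of waiting on the dependency, so the next pull is not held back by one slow call. Whether a default value is acceptable is a business decision the sketch does not answer.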

Even when external services are healthy, adding consumers raises the request volume, which may exceed the external service's throughput; the local database or cache can also become the bottleneck under the added load.

Consumer‑to‑MessageQueue assignment is performed by a periodic load balancer (default every 20 seconds). Six allocation strategies are described:

1. Average Allocation Strategy

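// AllocateMessageQueueAveragely logic
// Position of the current consumer in the consumer ID list
int index = cidAll.indexOf(currentCID);
// Queues left over after an even split; the first `mod` consumers take one extra each
int mod = mqAll.size() % cidAll.size();
int averageSize = mqAll.size() <= cidAll.size()
    ? 1
    : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
// Consumers past the first `mod` start after all the larger shares
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
// range is <= 0 for consumers left without a queue, so the loop adds nothing
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
    result.add(mqAll.get((startIndex + i) % mqAll.size()));
}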
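
To see what this arithmetic produces, the same logic can be run on plain queue indexes. The wrapper below is a sketch for experimentation: the class name `AverageAllocationSketch` and the use of integers in place of `MessageQueue` objects are assumptions made to keep it self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained rerun of the average allocation arithmetic on queue indexes 0..queueCount-1.
class AverageAllocationSketch {
    static List<Integer> allocate(int queueCount, int consumerCount, int index) {
        List<Integer> result = new ArrayList<>();
        int mod = queueCount % consumerCount;
        int averageSize = queueCount <= consumerCount
                ? 1
                : (mod > 0 && index < mod ? queueCount / consumerCount + 1 : queueCount / consumerCount);
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, queueCount - startIndex);
        for (int i = 0; i < range; i++) {
            result.add((startIndex + i) % queueCount);
        }
        return result;
    }

    public static void main(String[] args) {
        // 8 queues, 3 consumers: contiguous blocks, larger shares first
        System.out.println(allocate(8, 3, 0)); // [0, 1, 2]
        System.out.println(allocate(8, 3, 1)); // [3, 4, 5]
        System.out.println(allocate(8, 3, 2)); // [6, 7]
        // 2 queues, 3 consumers: the third consumer gets nothing
        System.out.println(allocate(2, 3, 2)); // []
    }
}
```

The last call shows the point made at the start of the interview: a consumer beyond the queue count is allocated an empty list.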

2. Round‑Robin (Circle) Allocation Strategy

// AllocateMessageQueueAveragelyByCircle logic
int index = cidAll.indexOf(currentCID);
for (int i = index; i < mqAll.size(); i++) {
    if (i % cidAll.size() == index) {
        result.add(mqAll.get(i));
    }
}
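
The loop above deals queues out like cards. Running it on plain indexes makes the contrast with the average strategy visible; `CircleAllocationSketch` and the integer queue indexes are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained rerun of the circle allocation loop on queue indexes 0..queueCount-1.
class CircleAllocationSketch {
    static List<Integer> allocate(int queueCount, int consumerCount, int index) {
        List<Integer> result = new ArrayList<>();
        for (int i = index; i < queueCount; i++) {
            if (i % consumerCount == index) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 8 queues, 3 consumers: interleaved shares instead of contiguous blocks
        System.out.println(allocate(8, 3, 0)); // [0, 3, 6]
        System.out.println(allocate(8, 3, 1)); // [1, 4, 7]
        System.out.println(allocate(8, 3, 2)); // [2, 5]
    }
}
```

Both strategies give each consumer the same number of queues; they differ only in which queues end up together.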

3. Custom Allocation Strategy – users can specify which MessageQueues a consumer should handle.

AllocateMessageQueueByConfig allocateMessageQueueByConfig = new AllocateMessageQueueByConfig();
// MessageQueue(topic, brokerName, queueId): this consumer handles only the queues listed here
allocateMessageQueueByConfig.setMessageQueueList(Arrays.asList(new MessageQueue("messageQueue1", "broker1", 0)));
consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByConfig);
consumer.start();

4. Machine‑Room Allocation Strategy – consumers only consume queues from designated data‑center rooms.

AllocateMessageQueueByMachineRoom allocateMessageQueueByMachineRoom = new AllocateMessageQueueByMachineRoom();
allocateMessageQueueByMachineRoom.setConsumeridcs(new HashSet<>(Arrays.asList("room1", "room2")));
consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByMachineRoom);
consumer.start();

The implementation first keeps only the queues whose broker name (format "room@brokerName") carries one of the configured rooms as its prefix, then divides the filtered queues evenly among the consumers.
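
The filtering step can be sketched on plain strings. This is an illustration, not the library code: `MachineRoomFilterSketch` and `filterByRoom` are hypothetical names, and broker names without exactly one "@" are treated as belonging to no room.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Keeps only broker names of the form "room@brokerName" whose room is in the configured set.
class MachineRoomFilterSketch {
    static List<String> filterByRoom(List<String> brokerNames, Set<String> consumerRooms) {
        List<String> kept = new ArrayList<>();
        for (String brokerName : brokerNames) {
            String[] parts = brokerName.split("@");
            if (parts.length == 2 && consumerRooms.contains(parts[0])) {
                kept.add(brokerName);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> brokers = List.of("room1@broker-a", "room3@broker-b", "broker-c", "room2@broker-d");
        System.out.println(filterByRoom(brokers, Set.of("room1", "room2")));
        // [room1@broker-a, room2@broker-d]
    }
}
```

Queues on brokers that do not follow the naming convention are dropped by this filter, so the strategy only makes sense when every broker name encodes its room.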

5. Machine‑Room Nearby Allocation Strategy – similar to the previous one, but consumers first take the queues in their own machine room, and queues in rooms that have no live consumer are shared among all consumers so that they are not left unconsumed.
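
A minimal sketch of the nearby idea follows. It rests on several assumptions that are not from the article: queue and consumer IDs both use a "room@name" form, shares inside a group are dealt round-robin, and `NearbyAllocationSketch` with its helpers is a hypothetical name rather than the library class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Same-room queues go to same-room consumers; queues from rooms without consumers go to everyone.
class NearbyAllocationSketch {
    static String roomOf(String id) {
        int at = id.indexOf('@');
        return at < 0 ? "" : id.substring(0, at);
    }

    static List<String> allocate(String currentCid, List<String> queues, List<String> consumers) {
        Map<String, List<String>> consumersByRoom = new TreeMap<>();
        for (String cid : consumers) {
            consumersByRoom.computeIfAbsent(roomOf(cid), k -> new ArrayList<>()).add(cid);
        }
        Map<String, List<String>> queuesByRoom = new TreeMap<>();
        for (String queue : queues) {
            queuesByRoom.computeIfAbsent(roomOf(queue), k -> new ArrayList<>()).add(queue);
        }
        List<String> result = new ArrayList<>();
        List<String> orphans = new ArrayList<>(); // queues whose room has no consumer
        for (Map.Entry<String, List<String>> entry : queuesByRoom.entrySet()) {
            List<String> localConsumers = consumersByRoom.get(entry.getKey());
            if (localConsumers == null) {
                orphans.addAll(entry.getValue());
            } else {
                result.addAll(share(entry.getValue(), localConsumers, currentCid));
            }
        }
        result.addAll(share(orphans, consumers, currentCid));
        return result;
    }

    // Round-robin share of items for cid within group; empty if cid is not in the group.
    static List<String> share(List<String> items, List<String> group, String cid) {
        List<String> mine = new ArrayList<>();
        int index = group.indexOf(cid);
        if (index < 0) {
            return mine;
        }
        for (int i = index; i < items.size(); i += group.size()) {
            mine.add(items.get(i));
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> queues = List.of("A@q0", "A@q1", "B@q0", "C@q0", "C@q1");
        List<String> consumers = List.of("A@c1", "B@c1");
        System.out.println(allocate("A@c1", queues, consumers)); // [A@q0, A@q1, C@q0]
        System.out.println(allocate("B@c1", queues, consumers)); // [B@q0, C@q1]
    }
}
```

Room C has no consumer in this example, so its two queues are split between the consumers in rooms A and B instead of being skipped.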

6. Consistent‑Hash Allocation Strategy – consumers are placed on a hash ring; each MessageQueue is hashed and bound to the nearest consumer clockwise.

// AllocateMessageQueueConsistentHash logic
Collection<ClientNode> cidNodes = new ArrayList<>();
for (String cid : cidAll) {
    cidNodes.add(new ClientNode(cid));
}
ConsistentHashRouter<ClientNode> router = (customHashFunction != null) ?
    new ConsistentHashRouter<>(cidNodes, virtualNodeCnt, customHashFunction) :
    new ConsistentHashRouter<>(cidNodes, virtualNodeCnt);
List<MessageQueue> results = new ArrayList<>();
for (MessageQueue mq : mqAll) {
    ClientNode clientNode = router.routeNode(mq.toString());
    if (clientNode != null && currentCID.equals(clientNode.getKey())) {
        results.add(mq);
    }
}
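
What the router does internally can be approximated with a sorted map. The ring below is a stand-in written for illustration, not the `ConsistentHashRouter` implementation: `HashRingSketch`, the MD5-based `hash`, and the `cid + "-" + i` virtual node naming are all assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Minimal consistent-hash ring: consumers occupy several points, keys go to the next point clockwise.
class HashRingSketch {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    HashRingSketch(Collection<String> consumerIds, int virtualNodes) {
        for (String cid : consumerIds) {
            for (int i = 0; i < virtualNodes; i++) {
                ring.put(hash(cid + "-" + i), cid);
            }
        }
    }

    // Returns the consumer owning the first ring point at or after the key's hash, wrapping around.
    String route(String key) {
        if (ring.isEmpty()) {
            return null;
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // First 8 bytes of the MD5 digest as a long.
    static long hash(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(value.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        HashRingSketch ring = new HashRingSketch(List.of("c1", "c2", "c3"), 10);
        for (int i = 0; i < 8; i++) {
            System.out.println("queue-" + i + " -> " + ring.route("queue-" + i));
        }
    }
}
```

The appeal of this layout is stability: when a consumer leaves, only the queues that were routed to it move, while the others keep their owner. The trade-off is that shares are not guaranteed to be equal, which is why virtual nodes are used to smooth the distribution.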

The interview concludes with the candidate passing the session.

Written by

Code Ape Tech Column

Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
