Backend Development 22 min read

Load Balancing Mechanism in RocketMQ Producer and Consumer

The article explains RocketMQ’s load‑balancing for producers and consumers, detailing how producers obtain routing info from the NameServer, select MessageQueues via round‑robin while avoiding failed brokers, and how consumers periodically rebalance queue allocation using strategies such as AllocateMessageQueueAveragely to ensure even distribution and customizable assignment.

vivo Internet Technology
vivo Internet Technology
vivo Internet Technology
Load Balancing Mechanism in RocketMQ Producer and Consumer

This article introduces the load‑balancing mechanisms of Apache RocketMQ during message production and consumption. It starts with an overview of RocketMQ’s components – Producer, Broker, Consumer, NameServer, Producer Group, and Consumer Group – and explains how messages are stored in MessageQueue objects on specific Brokers.

Overall Architecture

RocketMQ’s architecture consists of four main parts: Producer, Consumer, NameServer, and Broker. Producers publish messages to Brokers selected by a load‑balancing module, while Consumers pull or push messages from Brokers. NameServer acts as a lightweight routing registry.

Producer Message Production Process

The producer first contacts the NameServer to obtain routing information for a Topic, parses this information into local routing data, and then sends messages to a selected Broker.

public class MQClientInstance {
    public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        // omitted code
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                    if (topicRouteData != null) {
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        }
                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
                            // generate producer’s TopicPublishInfo
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator
> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry
entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            }
        } catch (InterruptedException e) {
        }
        return false;
    }
}

Routing synchronization is a prerequisite for both producers and consumers; without up‑to‑date routing data, a client cannot determine which Broker to interact with.

Routing Parsing Process

TopicRouteData contains QueueData (queue information per Broker) and BrokerData (address information). TopicPublishInfo holds a list of MessageQueue objects representing the finest‑grained queues.

public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List
queueDatas;
    private List
brokerDatas;
    private HashMap
/* Filter Server */> filterServerTable;
}

public class QueueData implements Comparable
{
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
    private int perm;
    private int topicSynFlag;
}

public class BrokerData implements Comparable
{
    private String cluster;
    private String brokerName;
    private HashMap
brokerAddrs;
    private final Random random = new Random();
}

The producer selects a MessageQueue using a round‑robin index, skipping the last failed Broker if necessary.

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.getAndIncrement();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0) pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}

Load‑Balancing Process for Producers

During sending, the producer obtains TopicPublishInfo, selects a MessageQueue, and attempts to send. If sending fails, it retries with another queue, avoiding the previously failed Broker.

private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode,
        SendCallback sendCallback, long timeout) throws MQClientException, RemotingException,
        MQBrokerException, InterruptedException {
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = mq == null ? null : mq.getBrokerName();
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback,
                            topicPublishInfo, timeout - costTime);
                    if (communicationMode == CommunicationMode.SYNC) {
                        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                            if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                continue;
                            }
                        }
                        return sendResult;
                    }
                } catch (MQBrokerException e) {
                    // omitted
                } catch (InterruptedException e) {
                    // omitted
                }
            } else {
                break;
            }
        }
        if (sendResult != null) {
            return sendResult;
        }
    }
    return null;
}

Consumer Message Consumption Process

Consumers also synchronize routing information from the NameServer, parse it locally, and then perform a rebalance to determine which MessageQueues they are responsible for.

private void startScheduledTask() {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
}

public void updateTopicRouteInfoFromNameServer() {
    Set
topicList = new HashSet
();
    for (Entry
entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            Set
subList = impl.subscriptions();
            if (subList != null) {
                for (SubscriptionData subData : subList) {
                    topicList.add(subData.getTopic());
                }
            }
        }
    }
    for (String topic : topicList) {
        this.updateTopicRouteInfoFromNameServer(topic);
    }
}

The rebalance service periodically redistributes MessageQueues among consumers in a consumer group.

public class RebalanceService extends ServiceThread {
    private static long waitInterval = Long.parseLong(System.getProperty(
        "rocketmq.client.rebalance.waitInterval", "20000"));
    private final MQClientInstance mqClientFactory;
    public RebalanceService(MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }
    @Override
    public void run() {
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
    }
}

The core allocation algorithm is implemented in AllocateMessageQueueAveragely , which distributes queues evenly among consumers.

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    @Override
    public List
allocate(String consumerGroup, String currentCID,
        List
mqAll, List
cidAll) {
        List
result = new ArrayList
();
        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ?
            mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
    @Override
    public String getName() {
        return "AVG";
    }
}

To enforce consumption on a specific machine, the allocation logic can be customized to return an empty list for non‑target IPs.

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    @Override
    public List
allocate(String consumerGroup, String currentCID,
        List
mqAll, List
cidAll) {
        List
result = new ArrayList
();
        // If the current consumer ID is not in the allowed list, return empty
        if (!cidAll.contains(currentCID)) {
            return result;
        }
        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ?
            mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
}

The article concludes that understanding RocketMQ’s load‑balancing mechanisms helps developers design reliable messaging solutions and customize consumer allocation when needed.

Javaload balancingMessage QueuerocketmqConsumerproducer
vivo Internet Technology
Written by

vivo Internet Technology

Sharing practical vivo Internet technology insights and salon events, plus the latest industry news and hot conferences.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.