Understanding RocketMQ Scheduled Tasks and Their Implementation
This article explains the various scheduled tasks in RocketMQ, covering architecture overview, producer and consumer periodic operations, broker maintenance jobs, and NameServer housekeeping, while providing concrete Java code examples to illustrate how each task contributes to message handling, monitoring, and system reliability.
Today we look at RocketMQ's scheduled tasks. Studying them gives a clearer picture of RocketMQ's message processing mechanism and design philosophy.
1. Architecture Review
RocketMQ consists of a NameServer cluster (nodes store full routing data and do not synchronize with each other), a Broker cluster that can be deployed in master‑slave mode for high availability, and Producers/Consumers that maintain long‑lived connections to NameServers to fetch routing information.
2. Producer and Consumer Tasks
2.1 Fetch NameServer Address
Both Producer and Consumer periodically (every 2 minutes) fetch the NameServer address list and update the local cache.
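Nearly every task in this article is registered the same way: a Runnable handed to scheduleAtFixedRate with an initial delay and a period, and a try/catch around the body. The try/catch matters because a ScheduledExecutorService suppresses all later executions of a periodic task once one run throws. The sketch below illustrates that pattern; the class name, the failure counter, and the shortened delays are assumptions made for the demo and are not part of RocketMQ.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class GuardedSchedule {
    // Counts failures so a caller can observe them; a real client would log instead.
    static final AtomicInteger FAILURES = new AtomicInteger();

    // Wraps a task so that no exception escapes into the executor.
    static Runnable guarded(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Exception e) {
                FAILURES.incrementAndGet();
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        // Hypothetical task that always fails; delays are shortened for the demo.
        scheduler.scheduleAtFixedRate(guarded(() -> {
            runs.incrementAndGet();
            throw new IllegalStateException("address server unreachable");
        }), 10, 20, TimeUnit.MILLISECONDS);
        Thread.sleep(200);
        scheduler.shutdownNow();
        // Without the guard the task would run once and never again.
        System.out.println("runs=" + runs.get() + ", failures=" + FAILURES.get());
    }
}
```

The RocketMQ snippet that follows has the same structure, with a 10-second initial delay and a 2-minute period.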
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
        } catch (Exception e) {
            log.error("ScheduledTask fetchNameServerAddr exception", e);
        }
    }
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
2.2 Update Routing Information
Producers and Consumers refresh routing info from the NameServer every 30 seconds (configurable).
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.updateTopicRouteInfoFromNameServer();
        } catch (Exception e) {
            log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
        }
    }
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
2.3 Send Heartbeat to Brokers
Producers and Consumers remove offline brokers from their local tables and send a heartbeat to every broker, by default every 30 seconds.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
2.4 Persist Consumer Offsets
Consumers periodically persist their MessageQueue offsets (default every 5 seconds).
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
    }
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
2.5 Adjust Core Thread Count (Push Consumer)
For push‑mode consumers, the core thread pool can be adjusted based on pending messages (default every 1 minute). In version 4.9.4 the method is a placeholder.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.adjustThreadPool();
        } catch (Exception e) {
            log.error("ScheduledTask adjustThreadPool exception", e);
        }
    }
}, 1, 1, TimeUnit.MINUTES);
2.6 Expire Stale Requests
Both Producer and Consumer scan locally cached requests; if a request's start time plus timeout (plus 1 s) is earlier than the current time, it is marked expired and the callback is triggered.
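The expiry rule can be sketched in isolation. The example below is a minimal illustration, assuming each pending request carries its start time and timeout; the class, field, and method names are hypothetical, and the callback is represented only by returning the ids of the expired requests.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ResponseTableScan {
    // Hypothetical stand-in for a pending request: when it started and how long it may take.
    static final class Pending {
        final long beginMillis;
        final long timeoutMillis;

        Pending(long beginMillis, long timeoutMillis) {
            this.beginMillis = beginMillis;
            this.timeoutMillis = timeoutMillis;
        }
    }

    static final long GRACE_MILLIS = 1000; // the extra 1 s described in the text

    // Removes every request whose begin + timeout + grace is not later than now
    // and returns the ids of the removed requests.
    static List<Integer> scan(Map<Integer, Pending> table, long nowMillis) {
        List<Integer> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, Pending>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Pending> entry = it.next();
            Pending p = entry.getValue();
            if (p.beginMillis + p.timeoutMillis + GRACE_MILLIS <= nowMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        Map<Integer, Pending> table = new ConcurrentHashMap<>();
        table.put(1, new Pending(0, 3000));    // expires at 4000
        table.put(2, new Pending(2000, 3000)); // expires at 6000
        System.out.println(scan(table, 5000)); // prints [1]
    }
}
```

In the RocketMQ snippet that follows, the scan itself is scanResponseTable, run by a Timer every second after a 3-second initial delay.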
this.timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        try {
            NettyRemotingClient.this.scanResponseTable();
        } catch (Throwable e) {
            log.error("scanResponseTable exception", e);
        }
    }
}, 1000 * 3, 1000);
2.7 Producer Performance Recording
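Producers record send success/failure counts and latency. A snapshot is taken every second, only the 10 most recent snapshots are kept, and statistics such as TPS and RT are printed every 10 seconds.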
executorService.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        snapshotList.addLast(statsBenchmark.createSnapshot());
        if (snapshotList.size() > 10) {
            snapshotList.removeFirst();
        }
    }
}, 1000, 1000, TimeUnit.MILLISECONDS);
executorService.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        try {
            this.printStats();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}, 10000, 10000, TimeUnit.MILLISECONDS);
2.8 Consumer Details
2.8.1 MessageQueue Locking
For ordered messages, a consumer periodically (default 20 s) locks the MessageQueue on the broker to ensure exclusive consumption.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            ConsumeMessageOrderlyService.this.lockMQPeriodically();
        } catch (Throwable e) {
            log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
        }
    }
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
2.8.2 Performance Snapshots
Consumers capture performance snapshots every second, then aggregate and print statistics every 10 seconds.
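The snapshot-and-aggregate idea can be sketched as a bounded window from which a rate is derived. This is only an illustration, assuming each snapshot is a timestamp plus a cumulative message count; the class and method names are hypothetical and the real snapshot objects carry more fields.

```java
import java.util.LinkedList;

final class SnapshotWindow {
    static final int MAX_SNAPSHOTS = 10;

    // Each snapshot holds {timestampMillis, cumulativeMessageCount}.
    private final LinkedList<long[]> snapshots = new LinkedList<>();

    // Called once per sampling tick: append the newest snapshot, drop the oldest beyond the window.
    void add(long timestampMillis, long totalCount) {
        snapshots.addLast(new long[] {timestampMillis, totalCount});
        if (snapshots.size() > MAX_SNAPSHOTS) {
            snapshots.removeFirst();
        }
    }

    int size() {
        return snapshots.size();
    }

    // Average messages per second between the oldest and newest snapshot in the window.
    double tps() {
        if (snapshots.size() < 2) {
            return 0.0;
        }
        long[] first = snapshots.getFirst();
        long[] last = snapshots.getLast();
        long elapsedMillis = last[0] - first[0];
        if (elapsedMillis <= 0) {
            return 0.0;
        }
        return (last[1] - first[1]) * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        SnapshotWindow window = new SnapshotWindow();
        for (int second = 0; second < 15; second++) {
            window.add(second * 1000L, second * 500L); // hypothetical steady 500 msg/s
        }
        System.out.println("snapshots=" + window.size() + ", tps=" + window.tps());
    }
}
```

Keeping only the last 10 snapshots means the printed rate reflects recent traffic rather than the average since startup. The RocketMQ snippets that follow show the two timers that drive the collection and the printing.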
// Snapshot collection
executorService.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        snapshotList.addLast(statsBenchmarkConsumer.createSnapshot());
        if (snapshotList.size() > 10) {
            snapshotList.removeFirst();
        }
    }
}, 1000, 1000, TimeUnit.MILLISECONDS);
// Statistics printing
executorService.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        try {
            printStats();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}, 10000, 10000, TimeUnit.MILLISECONDS);
2.8.3 Clear Expired Messages
Consumers periodically (default 15 minutes) check for expired messages, resend them to the broker, and remove them locally.
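A minimal sketch of that check follows. It assumes in-flight messages are tracked by queue offset together with the time consumption started, and that the scan stops at the first message that has not yet timed out. The class and method names are hypothetical, and the resend step is only recorded in a list rather than sent to a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ExpiredMessageCleaner {
    // inFlight maps queue offset -> time (ms) at which consumption of that message started.
    // Returns the offsets that were treated as expired and removed locally.
    static List<Long> clean(TreeMap<Long, Long> inFlight, long nowMillis, long timeoutMillis) {
        List<Long> sentBack = new ArrayList<>();
        while (!inFlight.isEmpty()) {
            Map.Entry<Long, Long> oldest = inFlight.firstEntry();
            if (nowMillis - oldest.getValue() <= timeoutMillis) {
                break; // the lowest offset has not timed out yet, so stop scanning
            }
            sentBack.add(oldest.getKey());    // a real consumer would resend the message to the broker here
            inFlight.remove(oldest.getKey()); // then drop it from the local queue
        }
        return sentBack;
    }

    public static void main(String[] args) {
        long timeoutMillis = 15 * 60 * 1000L; // the 15-minute default mentioned in the text
        TreeMap<Long, Long> inFlight = new TreeMap<>();
        inFlight.put(100L, 0L);
        inFlight.put(101L, 1_000L);
        System.out.println(clean(inFlight, timeoutMillis + 500, timeoutMillis));
    }
}
```

In the RocketMQ snippet that follows, the consume timeout is used as both the initial delay and the period, in minutes.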
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            cleanExpireMsg();
        } catch (Throwable e) {
            log.error("scheduleAtFixedRate cleanExpireMsg exception", e);
        }
    }
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
2.8.4 Refresh Topic Metadata
Lite pull consumers fetch Topic‑MessageQueue information from the NameServer every 30 seconds (configurable), compare it with the local cache, and update the cache.
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            fetchTopicMessageQueuesAndCompare();
        } catch (Exception e) {
            log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e);
        }
    }
}, 1000 * 10, this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), TimeUnit.MILLISECONDS);
3. Broker Maintenance Tasks
3.1 State Sampling
The broker samples statistics (message count, size, etc.) at second, minute, and hour granularity and prints them periodically. The seconds-level sampler shown here runs every 10 seconds.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            samplingInSeconds();
        } catch (Throwable ignored) {}
    }
}, 0, 10, TimeUnit.SECONDS);
3.2 Record Message Delay
Broker records the time difference between persisting a message and reading it, printing the delay every 5 minutes.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            printAtMinutes();
        } catch (Throwable ignored) {}
    }
}, Math.abs(UtilAll.computeNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS);
3.3 Persist Data
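Broker persists consumer offsets, topic configurations, and subscription group data every 10 seconds (configurable). The snippet shown is the ScheduleMessageService variant, which persists the delayed-message offset table at the interval given by flushDelayOffsetInterval (see also 3.20).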
this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            if (started.get()) {
                ScheduleMessageService.this.persist();
            }
        } catch (Throwable e) {
            log.error("scheduleAtFixedRate flush exception", e);
        }
    }
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
3.4 Expire Stale Requests (Broker)
Broker scans cached requests every second (after an initial 3-second delay) and expires those whose timeout has passed.
this.timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        try {
            NettyRemotingServer.this.scanResponseTable();
        } catch (Throwable e) {
            log.error("scanResponseTable exception", e);
        }
    }
}, 1000 * 3, 1000);
3.5 Filter Service Creation
If a consumer registers a filter class, the broker creates a FilterServer; the check runs every 30 seconds after an initial 5-second delay.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            FilterServerManager.this.createFilterServer();
        } catch (Exception e) {
            log.error("", e);
        }
    }
}, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS);
3.6 Record Daily Message Volume
Broker records the total number of messages sent/received each day (period = 1 day).
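The snippet for this task passes initialDelay and period without showing how they are computed. One plausible way to obtain them, assuming the task is meant to fire at local midnight and then once a day, is sketched below; the class and method names are hypothetical and the computation uses java.time rather than any RocketMQ utility.

```java
import java.time.Duration;
import java.time.LocalDateTime;

final class DailySchedule {
    // One day, expressed in milliseconds, used as the period.
    static final long PERIOD_MILLIS = Duration.ofDays(1).toMillis();

    // Milliseconds from 'now' until the next local midnight.
    static long millisUntilNextMidnight(LocalDateTime now) {
        LocalDateTime nextMidnight = now.toLocalDate().plusDays(1).atStartOfDay();
        return Duration.between(now, nextMidnight).toMillis();
    }

    public static void main(String[] args) {
        long initialDelay = millisUntilNextMidnight(LocalDateTime.now());
        System.out.println("first run in " + initialDelay + " ms, then every " + PERIOD_MILLIS + " ms");
    }
}
```

Aligning the first run to a day boundary keeps each recorded total tied to a calendar day instead of to the broker's start time.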
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.getBrokerStats().record();
        } catch (Throwable e) {
            log.error("schedule record error.", e);
        }
    }
}, initialDelay, period, TimeUnit.MILLISECONDS);
3.7 Persist Offset (Broker)
Broker persists consumer offsets every 5 seconds (configurable), after an initial 10-second delay.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerOffsetManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumerOffset error.", e);
        }
    }
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
3.8 Persist Filter Parameters
Filter data registered by consumers is persisted to disk every 10 seconds.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerFilterManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumer filter error.", e);
        }
    }
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
3.9 Broker Self‑Protection
If a consumer reads messages too slowly, the broker marks the consumer as unreadable to protect itself (runs every 3 minutes).
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.protectBroker();
        } catch (Throwable e) {
            log.error("protectBroker error.", e);
        }
    }
}, 3, 3, TimeUnit.MINUTES);
3.10 Print WaterMark
Broker prints water‑mark metrics (send/receive/transaction/query latency) every second.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.printWaterMark();
        } catch (Throwable e) {
            log.error("printWaterMark error.", e);
        }
    }
}, 10, 1, TimeUnit.SECONDS);
3.11 Print Offset Gap
Broker logs the gap between the latest commit log offset and the offset dispatched to MessageQueue/Index every 60 seconds, after an initial 10-second delay.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
        } catch (Throwable e) {
            log.error("schedule dispatchBehindBytes error.", e);
        }
    }
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
3.12 Update NameServer Addresses
Broker fetches NameServer addresses every 2 minutes and updates its local cache.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
        } catch (Throwable e) {
            log.error("ScheduledTask fetchNameServerAddr exception", e);
        }
    }
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
3.13 Print Master‑Slave Offset Difference
Broker prints the offset difference between master and slave nodes every 60 seconds.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.printMasterAndSlaveDiff();
        } catch (Throwable e) {
            log.error("schedule printMasterAndSlaveDiff error.", e);
        }
    }
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
3.14 Register with NameServer
Broker registers itself with all NameServers every 30 seconds by default; the configured period is clamped to the range of 10 to 60 seconds.
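The period expression in the snippet that follows bounds whatever value is configured. The sketch below isolates that expression to show its effect on a few inputs; the class and method names are hypothetical.

```java
final class RegisterPeriod {
    // Same expression as in the registration task: never below 10 s, never above 60 s.
    static long clamp(long configuredMillis) {
        return Math.max(10_000, Math.min(configuredMillis, 60_000));
    }

    public static void main(String[] args) {
        System.out.println(clamp(5_000));   // prints 10000
        System.out.println(clamp(30_000));  // prints 30000
        System.out.println(clamp(120_000)); // prints 60000
    }
}
```

The lower bound keeps a misconfigured broker from flooding NameServers with registrations, and the upper bound keeps registrations frequent enough that the broker is not treated as expired.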
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
3.15 Sync Slave
A slave broker pulls configuration and offset data from its master every 10 seconds, after an initial 3-second delay.
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.slaveSynchronize.syncAll();
        } catch (Throwable e) {
            log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
        }
    }
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
3.16 Delete Expired Files
Broker periodically (default every 10 seconds) cleans up expired CommitLog and ConsumeQueue files.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DefaultMessageStore.this.cleanFilesPeriodically();
    }
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
3.17 File Size Check
Every 10 minutes the broker checks that the size of CommitLog files matches the expected offset difference.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DefaultMessageStore.this.checkSelf();
    }
}, 1, 10, TimeUnit.MINUTES);
3.18 Record Stack Mapping (Debug)
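If debugLockEnable is true, the broker checks every second whether the CommitLog lock has been held for more than 1 second and, if so, writes a jstack thread dump to a file under the user's home directory.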
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
            try {
                if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
                    long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock();
                    if (lockTime > 1000 && lockTime < 10000000) {
                        String stack = UtilAll.jstack();
                        final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-" +
                            DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
                        MixAll.string2FileNotSafe(stack, fileName);
                    }
                }
            } catch (Exception e) {}
        }
    }
}, 1, 1, TimeUnit.SECONDS);
3.19 Disk Space Check
Every 10 seconds the broker checks whether the disk storing CommitLog is full.
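What "full" means can be sketched as a used-space ratio compared against a threshold. This is an illustration only: the class and method names, the 0.75 threshold, and the use of the user's home directory as the store path are all assumptions, not values taken from the RocketMQ source.

```java
import java.io.File;

final class DiskSpaceCheck {
    // Fraction of the partition in use, from 0.0 to 1.0; returns -1 if the size is unknown.
    static double usedRatio(long totalBytes, long freeBytes) {
        if (totalBytes <= 0) {
            return -1.0;
        }
        return (totalBytes - freeBytes) / (double) totalBytes;
    }

    // The disk counts as full once the used ratio exceeds the configured maximum.
    static boolean isSpaceFull(double usedRatio, double maxUsedRatio) {
        return usedRatio > maxUsedRatio;
    }

    public static void main(String[] args) {
        File storeDir = new File(System.getProperty("user.home")); // hypothetical store path
        double ratio = usedRatio(storeDir.getTotalSpace(), storeDir.getFreeSpace());
        System.out.println("used=" + ratio + ", full=" + isSpaceFull(ratio, 0.75)); // 0.75 is an assumed threshold
    }
}
```

In the RocketMQ snippet that follows, the check runs on its own executor, so a slow cleanup task on the main scheduler does not delay it.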
this.diskCheckScheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DefaultMessageStore.this.cleanCommitLogService.isSpaceFull();
    }
}, 1000L, 10000L, TimeUnit.MILLISECONDS);
3.20 Persist Delayed Message Offsets
RocketMQ defines 18 delay levels; the broker periodically (default every 10 seconds) persists the offset table for each level.
this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            if (started.get()) {
                ScheduleMessageService.this.persist();
            }
        } catch (Throwable e) {
            log.error("scheduleAtFixedRate flush exception", e);
        }
    }
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
3.21 Close Abnormal Connections
Every 10 seconds the broker scans all long‑lived connections (producers, consumers, FilterServers) and closes those that are inactive.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            ClientHousekeepingService.this.scanExceptionChannel();
        } catch (Throwable e) {
            log.error("Error occurred when scan not active client channels.", e);
        }
    }
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
3.22 Clean Expired Requests (Fast Failure)
If fast‑failure mode is enabled, the broker cleans expired requests every 10 ms, but only when the system is not busy.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
            cleanExpiredRequest();
        }
    }
}, 1000, 10, TimeUnit.MILLISECONDS);
4. NameServer Tasks
4.1 Scan Expired Brokers
NameServer periodically (every 10 seconds) scans its broker list and removes entries whose registration has not been refreshed for more than 120 seconds.
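The liveness rule can be sketched as a table of last-registration timestamps that is swept on each scan. This is a simplified illustration: the class and method names are hypothetical, and a real NameServer would also close the broker's channel and clean up the related routing entries.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class BrokerLivenessScan {
    static final long EXPIRE_MILLIS = 120_000; // the 120 s window described in the text

    // liveTable maps broker address -> timestamp (ms) of its last registration.
    // Removes brokers silent for more than EXPIRE_MILLIS and returns their addresses.
    static List<String> scanNotActive(Map<String, Long> liveTable, long nowMillis) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = liveTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRE_MILLIS < nowMillis) {
                removed.add(entry.getKey());
                it.remove();
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, Long> liveTable = new ConcurrentHashMap<>();
        liveTable.put("10.0.0.1:10911", 0L);       // hypothetical broker, last seen at t = 0
        liveTable.put("10.0.0.2:10911", 100_000L); // hypothetical broker, last seen at t = 100 s
        System.out.println(scanNotActive(liveTable, 130_000L)); // prints [10.0.0.1:10911]
    }
}
```

With brokers registering every 30 seconds by default, a 120-second window tolerates several missed registrations before a broker is dropped from the routing data.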
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
4.2 Print Configuration
NameServer loads KV configuration files at startup and prints the current config table every 10 minutes, after an initial 1-minute delay.
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);
5. Summary
RocketMQ contains numerous scheduled tasks that enhance its design by handling business processing, monitoring, heartbeats, cleanup, connection management, and data persistence. Understanding these tasks provides deeper insight into RocketMQ's architecture and operational principles.