Understanding RocketMQ Scheduled Tasks and Their Implementation

This article explains the various scheduled tasks in RocketMQ, covering architecture overview, producer and consumer periodic operations, broker maintenance jobs, and NameServer housekeeping, while providing concrete Java code examples to illustrate how each task contributes to message handling, monitoring, and system reliability.

Architect

This article walks through RocketMQ's scheduled tasks. Studying them gives a clearer picture of RocketMQ's message processing mechanism and design philosophy.

1. Architecture Review

RocketMQ consists of three parts: a NameServer cluster, whose nodes each store the full routing data and do not synchronize with each other; a Broker cluster, which can be deployed in master‑slave mode for high availability; and Producers and Consumers, which maintain long‑lived connections to NameServers to fetch routing information.

2. Producer and Consumer Tasks

2.1 Fetch NameServer Address

Both Producer and Consumer fetch the NameServer address list every 2 minutes (after an initial 10‑second delay) and update the local cache.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
        } catch (Exception e) {
            log.error("ScheduledTask fetchNameServerAddr exception", e);
        }
    }
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
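
Every task in this article wraps its body in try/catch. One likely reason is that ScheduledExecutorService.scheduleAtFixedRate suppresses all later executions once a run throws. The following is a minimal sketch of that behavior, not RocketMQ code; the class GuardedTaskDemo and its helper method are hypothetical names used only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of RocketMQ.
class GuardedTaskDemo {

    /**
     * Schedules a task that fails on every run and returns how many times it ran.
     * With guarded == true the failure is caught inside the task, as in the snippet above.
     */
    static int runsOfFailingTask(boolean guarded, int wantedRuns) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(wantedRuns);
        try {
            ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
                runs.incrementAndGet();
                latch.countDown();
                if (guarded) {
                    try {
                        throw new IllegalStateException("simulated failure");
                    } catch (Exception e) {
                        // Swallowed here (RocketMQ logs it), so the schedule keeps going.
                    }
                } else {
                    throw new IllegalStateException("simulated failure");
                }
            }, 10, 10, TimeUnit.MILLISECONDS);

            if (guarded) {
                // Wait until the task has run the wanted number of times.
                latch.await(5, TimeUnit.SECONDS);
            } else {
                try {
                    future.get(5, TimeUnit.SECONDS);
                } catch (ExecutionException | TimeoutException expected) {
                    // ExecutionException: the first run threw and later runs were suppressed.
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded runs: " + runsOfFailingTask(false, 3));
        System.out.println("guarded runs:   " + runsOfFailingTask(true, 3));
    }
}
```

The unguarded task runs once and then stops silently, which is why a periodic job that must survive transient errors catches exceptions inside run().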

2.2 Update Routing Information

Producers and Consumers refresh routing info from the NameServer every 30 seconds (configurable).

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.updateTopicRouteInfoFromNameServer();
        } catch (Exception e) {
            log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
        }
    }
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

2.3 Send Heartbeat to Brokers

Producers and Consumers remove offline brokers from their local cache and send heartbeats to all brokers at a default interval of 30 seconds.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

2.4 Persist Consumer Offsets

Consumers periodically persist their MessageQueue offsets (default every 5 seconds).

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
    }
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

2.5 Adjust Core Thread Count (Push Consumer)

For push‑mode consumers, the core thread pool can be adjusted based on pending messages (default every 1 minute). In version 4.9.4 the method is a placeholder.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.adjustThreadPool();
        } catch (Exception e) {
            log.error("ScheduledTask adjustThreadPool exception", e);
        }
    }
}, 1, 1, TimeUnit.MINUTES);
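
Since the method is a placeholder in 4.9.4, the sketch below only illustrates what backlog‑driven adjustment could look like on top of the standard ThreadPoolExecutor.setCorePoolSize call. The class name, the threshold parameter, and the 80% shrink rule are all assumptions made for this example, not RocketMQ's actual values.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; the names and thresholds are illustrative, not RocketMQ's.
class BacklogPoolAdjuster {

    /** Adds one core thread when the backlog reaches the threshold; removes one below 80% of it. */
    static void adjust(ThreadPoolExecutor pool, long pendingMessages, long threshold) {
        int core = pool.getCorePoolSize();
        if (pendingMessages >= threshold && core < pool.getMaximumPoolSize()) {
            pool.setCorePoolSize(core + 1);
        } else if (pendingMessages < threshold * 0.8 && core > 1) {
            pool.setCorePoolSize(core - 1);
        }
    }

    /** Applies a sequence of backlog samples to a fresh pool and returns the resulting core size. */
    static int coreSizeAfter(int core, int max, long threshold, long... backlogSamples) {
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            for (long sample : backlogSamples) {
                adjust(pool, sample, threshold);
            }
            return pool.getCorePoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(coreSizeAfter(2, 4, 100, 150, 150, 150));
    }
}
```

The gap between the grow and shrink thresholds is a deliberate choice in this sketch: it keeps the pool size from oscillating when the backlog hovers around one value.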

2.6 Expire Stale Requests

Both Producer and Consumer scan their locally cached requests every second (after an initial 3‑second delay); if a request's start time plus its timeout (plus a 1 s grace period) is earlier than the current time, the request is marked expired and its callback is triggered.

this.timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        try {
            NettyRemotingClient.this.scanResponseTable();
        } catch (Throwable e) {
            log.error("scanResponseTable exception", e);
        }
    }
}, 1000 * 3, 1000);
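
The expiry rule described above can be sketched as a small table scan. This is a simplified stand‑in, assuming each pending request only carries a start timestamp and a timeout; the class PendingRequestScanner and its members are hypothetical, and the callback step is reduced to returning the expired ids.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a client's table of in-flight requests.
class PendingRequestScanner {

    private static final long GRACE_MILLIS = 1000L;

    private static final class Pending {
        final long beginTimestamp;
        final long timeoutMillis;

        Pending(long beginTimestamp, long timeoutMillis) {
            this.beginTimestamp = beginTimestamp;
            this.timeoutMillis = timeoutMillis;
        }
    }

    private final Map<Integer, Pending> table = new ConcurrentHashMap<>();

    void register(int requestId, long beginTimestamp, long timeoutMillis) {
        table.put(requestId, new Pending(beginTimestamp, timeoutMillis));
    }

    /** Removes and returns the ids of requests whose deadline (start + timeout + grace) has passed. */
    List<Integer> scan(long nowMillis) {
        List<Integer> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, Pending>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Pending> entry = it.next();
            Pending p = entry.getValue();
            if (p.beginTimestamp + p.timeoutMillis + GRACE_MILLIS <= nowMillis) {
                Integer id = entry.getKey();
                it.remove();
                expired.add(id); // a real client would invoke the request's callback here
            }
        }
        return expired;
    }

    int size() {
        return table.size();
    }

    public static void main(String[] args) {
        PendingRequestScanner scanner = new PendingRequestScanner();
        scanner.register(1, 0L, 3000L);
        System.out.println(scanner.scan(3500L)); // still inside the grace period
        System.out.println(scanner.scan(4500L)); // deadline passed
    }
}
```

Passing the current time in as a parameter, rather than reading the system clock inside scan, keeps the rule easy to test.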

2.7 Producer Performance Recording

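Producers record success/failure status and latency to calculate TPS, RT, etc. A snapshot is taken every second, only the 10 most recent snapshots are kept, and the statistics are printed every 10 seconds.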

executorService.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        snapshotList.addLast(statsBenchmark.createSnapshot());
        if (snapshotList.size() > 10) {
            snapshotList.removeFirst();
        }
    }
}, 1000, 1000, TimeUnit.MILLISECONDS);

executorService.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        try {
            this.printStats();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}, 10000, 10000, TimeUnit.MILLISECONDS);
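
The two tasks above cooperate through a sliding window of snapshots. As a rough sketch of how TPS can be derived from such a window, assuming each snapshot holds only a timestamp and a cumulative success count (the class SnapshotWindow and its fields are hypothetical, not the benchmark's own types):

```java
import java.util.LinkedList;

// Hypothetical sliding window of {timestampMillis, cumulativeSuccessCount} snapshots.
class SnapshotWindow {

    static final int MAX_SNAPSHOTS = 10;

    private final LinkedList<long[]> snapshots = new LinkedList<>();

    /** Appends a snapshot and drops the oldest one once the window holds more than 10. */
    void add(long timestampMillis, long successCount) {
        snapshots.addLast(new long[] {timestampMillis, successCount});
        if (snapshots.size() > MAX_SNAPSHOTS) {
            snapshots.removeFirst();
        }
    }

    /** Messages per second between the oldest and newest snapshot, or 0 if that cannot be computed. */
    double tps() {
        if (snapshots.size() < 2) {
            return 0.0;
        }
        long[] begin = snapshots.getFirst();
        long[] end = snapshots.getLast();
        long elapsedMillis = end[0] - begin[0];
        if (elapsedMillis <= 0) {
            return 0.0;
        }
        return (end[1] - begin[1]) * 1000.0 / elapsedMillis;
    }

    int size() {
        return snapshots.size();
    }

    public static void main(String[] args) {
        SnapshotWindow window = new SnapshotWindow();
        for (int i = 0; i < 12; i++) {
            window.add(i * 1000L, i * 500L); // 500 successful sends per second
        }
        System.out.println(window.size() + " snapshots, TPS = " + window.tps());
    }
}
```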

2.8 Consumer Details

2.8.1 MessageQueue Locking

For ordered messages, a consumer periodically (default 20 s) locks the MessageQueue on the broker to ensure exclusive consumption.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            ConsumeMessageOrderlyService.this.lockMQPeriodically();
        } catch (Throwable e) {
            log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
        }
    }
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);

2.8.2 Performance Snapshots

Consumers capture performance snapshots every second, then aggregate and print statistics every 10 seconds.

// Snapshot collection
executorService.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        snapshotList.addLast(statsBenchmarkConsumer.createSnapshot());
        if (snapshotList.size() > 10) {
            snapshotList.removeFirst();
        }
    }
}, 1000, 1000, TimeUnit.MILLISECONDS);

// Statistics printing
executorService.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        try {
            printStats();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}, 10000, 10000, TimeUnit.MILLISECONDS);

2.8.3 Clear Expired Messages

Consumers periodically (default 15 minutes) check for expired messages, resend them to the broker, and remove them locally.

this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            cleanExpireMsg();
        } catch (Throwable e) {
            log.error("scheduleAtFixedRate cleanExpireMsg exception", e);
        }
    }
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);

2.8.4 Refresh Topic Metadata

Lite pull consumers fetch Topic‑MessageQueue information from the NameServer every 30 seconds (configurable), compare it with the local cache, and update the cache when it has changed.

scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            fetchTopicMessageQueuesAndCompare();
        } catch (Exception e) {
            log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e);
        }
    }
}, 1000 * 10, this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), TimeUnit.MILLISECONDS);
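
The fetch‑and‑compare step can be sketched as a set comparison against the cached queue list. This is an illustration only: TopicQueueWatcher is a hypothetical class, queues are reduced to integer ids, and the first observation of a topic is treated as a baseline rather than a change.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical cache of queue ids per topic; real queues also carry a broker name.
class TopicQueueWatcher {

    private final Map<String, Set<Integer>> cache = new HashMap<>();

    /**
     * Stores the latest queue ids for a topic and reports whether they differ
     * from the previously cached set. The first observation is not a change.
     */
    boolean update(String topic, int... queueIds) {
        Set<Integer> latest = new HashSet<>();
        for (int id : queueIds) {
            latest.add(id);
        }
        Set<Integer> previous = cache.put(topic, latest);
        return previous != null && !previous.equals(latest);
    }

    public static void main(String[] args) {
        TopicQueueWatcher watcher = new TopicQueueWatcher();
        System.out.println(watcher.update("TopicTest", 0, 1, 2, 3));       // baseline
        System.out.println(watcher.update("TopicTest", 0, 1, 2, 3, 4, 5)); // queues were added
    }
}
```

A caller would typically notify a listener only when update returns true, so that an unchanged topic costs nothing beyond the comparison.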

3. Broker Maintenance Tasks

3.1 State Sampling

The broker samples statistics (message count, size, etc.) at second, minute, and hour granularity and prints them periodically. The seconds‑level sampler shown here runs every 10 seconds.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            samplingInSeconds();
        } catch (Throwable ignored) {}
    }
}, 0, 10, TimeUnit.SECONDS);

3.2 Record Message Delay

Broker records the time difference between persisting a message and reading it, printing the delay every 5 minutes.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            printAtMinutes();
        } catch (Throwable ignored) {}
    }
}, Math.abs(UtilAll.computeNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS);
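
The initial delay in this snippet aligns the first run with the start of the next minute. A minimal sketch of that alignment on plain epoch milliseconds follows; it assumes non‑negative timestamps, and the class NextMinuteDelay is a hypothetical stand‑in rather than the UtilAll implementation.

```java
// Hypothetical helper; assumes non-negative epoch milliseconds.
class NextMinuteDelay {

    static final long MINUTE_MILLIS = 60_000L;

    /** Epoch millis of the next whole minute strictly after nowMillis. */
    static long nextMinuteMillis(long nowMillis) {
        return (nowMillis / MINUTE_MILLIS + 1) * MINUTE_MILLIS;
    }

    /** Delay to use as the initialDelay argument so the first run lands on a minute boundary. */
    static long initialDelayMillis(long nowMillis) {
        return nextMinuteMillis(nowMillis) - nowMillis;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println("first run in " + initialDelayMillis(now) + " ms");
    }
}
```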

3.3 Persist Data

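The broker persists consumer offsets, topic configurations, and subscription group data every 10 seconds (configurable). Note that the snippet shown here calls ScheduleMessageService.persist(), with its period taken from flushDelayOffsetInterval.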

this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            if (started.get()) {
                ScheduleMessageService.this.persist();
            }
        } catch (Throwable e) {
            log.error("scheduleAtFixedRate flush exception", e);
        }
    }
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);

3.4 Expire Stale Requests (Broker)

The broker scans its cached requests and expires those whose timeout has passed. The scan starts after 3 seconds and then runs every second.

this.timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        try {
            NettyRemotingServer.this.scanResponseTable();
        } catch (Throwable e) {
            log.error("scanResponseTable exception", e);
        }
    }
}, 1000 * 3, 1000);

3.5 Filter Service Creation

If a consumer registers a filter class, the broker creates a FilterServer; the check runs every 30 seconds.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            FilterServerManager.this.createFilterServer();
        } catch (Exception e) {
            log.error("", e);
        }
    }
}, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS);

3.6 Record Daily Message Volume

Broker records the total number of messages sent/received each day (period = 1 day).

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.getBrokerStats().record();
        } catch (Throwable e) {
            log.error("schedule record error.", e);
        }
    }
}, initialDelay, period, TimeUnit.MILLISECONDS);
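
The snippet does not show how initialDelay and period are computed. One plausible reading, assuming the job is meant to fire at the start of each day, is sketched below with java.time; the class DailyScheduleDelay and the explicit zone parameter are assumptions made for this example.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Hypothetical helper showing one way to derive the two scheduling arguments.
class DailyScheduleDelay {

    /** One day in milliseconds, used as the period. */
    static final long PERIOD_MILLIS = 24L * 60 * 60 * 1000;

    /** Milliseconds from the given instant until the next start of day in the given zone. */
    static long millisUntilNextMidnight(long nowEpochMillis, String zoneId) {
        ZoneId zone = ZoneId.of(zoneId);
        ZonedDateTime now = Instant.ofEpochMilli(nowEpochMillis).atZone(zone);
        ZonedDateTime nextMidnight = now.toLocalDate().plusDays(1).atStartOfDay(zone);
        return Duration.between(now, nextMidnight).toMillis();
    }

    public static void main(String[] args) {
        long initialDelay = millisUntilNextMidnight(System.currentTimeMillis(), "UTC");
        System.out.println("initialDelay=" + initialDelay + " ms, period=" + PERIOD_MILLIS + " ms");
    }
}
```

With a fixed 24‑hour period, a schedule like this drifts by an hour across daylight‑saving changes in zones that observe them, which may or may not matter for a daily statistics job.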

3.7 Persist Offset (Broker)

The broker persists consumer offsets every 5 seconds (configurable), starting 10 seconds after startup.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerOffsetManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumerOffset error.", e);
        }
    }
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

3.8 Persist Filter Parameters

Filter data registered by consumers is persisted to disk every 10 seconds.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerFilterManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumer filter error.", e);
        }
    }
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

3.9 Broker Self‑Protection

If a consumer reads messages too slowly and falls too far behind, the broker disables consumption for it to protect itself. The check runs every 3 minutes.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.protectBroker();
        } catch (Throwable e) {
            log.error("protectBroker error.", e);
        }
    }
}, 3, 3, TimeUnit.MINUTES);

3.10 Print WaterMark

Broker prints water‑mark metrics (send/receive/transaction/query latency) every second.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.printWaterMark();
        } catch (Throwable e) {
            log.error("printWaterMark error.", e);
        }
    }
}, 10, 1, TimeUnit.SECONDS);

3.11 Print Offset Gap

The broker logs the gap, in bytes, between the latest CommitLog offset and the offset already dispatched to the MessageQueue/Index. The first log line appears after 10 seconds, then once every 60 seconds.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
        } catch (Throwable e) {
            log.error("schedule dispatchBehindBytes error.", e);
        }
    }
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

3.12 Update NameServer Addresses

The broker fetches the NameServer addresses every 2 minutes and updates its local cache.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
        } catch (Throwable e) {
            log.error("ScheduledTask fetchNameServerAddr exception", e);
        }
    }
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);

3.13 Print Master‑Slave Offset Difference

The broker prints the offset difference between master and slave nodes every 60 seconds.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.printMasterAndSlaveDiff();
        } catch (Throwable e) {
            log.error("schedule printMasterAndSlaveDiff error.", e);
        }
    }
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

3.14 Register with NameServer

The broker registers itself with all NameServers every 30 seconds by default. The interval is configurable but clamped to the range of 10 to 60 seconds.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
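
The nested Math.max/Math.min call in the snippet is a clamp. Pulled out into a standalone method (the class name RegisterPeriodClamp is hypothetical), its effect on a few configured values looks like this:

```java
// Hypothetical wrapper around the clamp expression used in the snippet above.
class RegisterPeriodClamp {

    /** Keeps the configured registration period within 10 to 60 seconds. */
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(10_000L, Math.min(configuredMillis, 60_000L));
    }

    public static void main(String[] args) {
        System.out.println(effectivePeriodMillis(30_000L));  // within range, unchanged
        System.out.println(effectivePeriodMillis(1_000L));   // too small, raised to the minimum
        System.out.println(effectivePeriodMillis(300_000L)); // too large, lowered to the maximum
    }
}
```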

3.15 Sync Slave

Configuration and offset data are synchronized between master and slave every 10 seconds. The task shown here runs on the slave, which pulls the data from its master.

slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.slaveSynchronize.syncAll();
        } catch (Throwable e) {
            log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
        }
    }
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);

3.16 Delete Expired Files

Broker periodically (default every 10 seconds) cleans up expired CommitLog and ConsumeQueue files.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DefaultMessageStore.this.cleanFilesPeriodically();
    }
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

3.17 File Size Check

Every 10 minutes the broker checks that the size of CommitLog files matches the expected offset difference.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DefaultMessageStore.this.checkSelf();
    }
}, 1, 10, TimeUnit.MINUTES);
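
The idea behind this check can be sketched as follows, assuming every CommitLog file has the same fixed size and its starting offset is known: consecutive files should start exactly one file size apart. The class CommitLogContinuityCheck is a hypothetical simplification, not the checkSelf implementation.

```java
// Hypothetical simplification: files are represented only by their starting offsets.
class CommitLogContinuityCheck {

    /**
     * Returns the index of the first file whose starting offset is not exactly
     * fileSize bytes after the previous file's, or -1 if the sequence is contiguous.
     */
    static int firstGap(long fileSize, long... startOffsets) {
        for (int i = 1; i < startOffsets.length; i++) {
            if (startOffsets[i] - startOffsets[i - 1] != fileSize) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(firstGap(1024L, 0L, 1024L, 2048L)); // contiguous
        System.out.println(firstGap(1024L, 0L, 1024L, 4096L)); // gap before the third file
    }
}
```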

3.18 Record Stack Mapping (Debug)

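If debugLockEnable is true, the broker checks every second how long the CommitLog lock has been held; when the hold time exceeds 1 second, it dumps the thread stacks to a file under the user's home directory.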

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
            try {
                if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
                    long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock();
                    if (lockTime > 1000 && lockTime < 10000000) {
                        String stack = UtilAll.jstack();
                        final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-" +
                                DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
                        MixAll.string2FileNotSafe(stack, fileName);
                    }
                }
            } catch (Exception e) {}
        }
    }
}, 1, 1, TimeUnit.SECONDS);

3.19 Disk Space Check

Every 10 seconds the broker checks whether the disk storing CommitLog is full.

this.diskCheckScheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DefaultMessageStore.this.cleanCommitLogService.isSpaceFull();
    }
}, 1000L, 10000L, TimeUnit.MILLISECONDS);

3.20 Persist Delayed Message Offsets

RocketMQ defines 18 delay levels; the broker periodically (default every 10 seconds) persists the offset table for each level.

this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            if (started.get()) {
                ScheduleMessageService.this.persist();
            }
        } catch (Throwable e) {
            log.error("scheduleAtFixedRate flush exception", e);
        }
    }
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
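
To make the 18 levels concrete, the sketch below parses a delay‑level string into a level‑to‑milliseconds table. The string used is, to my knowledge, RocketMQ's default messageDelayLevel setting; the parser itself (class DelayLevelTable) is a hypothetical illustration and not the broker's code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for a space-separated delay-level string such as "1s 5s 1m 2h".
class DelayLevelTable {

    // Assumed default value of the messageDelayLevel setting (18 entries).
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Maps level numbers (starting at 1) to their delay in milliseconds. */
    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long amount = Long.parseLong(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            table.put(i + 1, amount * unitMillis(unit));
        }
        return table;
    }

    private static long unitMillis(char unit) {
        switch (unit) {
            case 's': return 1_000L;
            case 'm': return 60_000L;
            case 'h': return 3_600_000L;
            case 'd': return 86_400_000L;
            default: throw new IllegalArgumentException("Unknown time unit: " + unit);
        }
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = parse(DEFAULT_LEVELS);
        System.out.println(table.size() + " levels, level 5 = " + table.get(5) + " ms");
    }
}
```

Each level has its own consume offset on the broker, which is the per‑level table that the scheduled persist call writes out.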

3.21 Close Abnormal Connections

Every 10 seconds the broker scans all long‑lived connections (producers, consumers, FilterServers) and closes those that are inactive.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            ClientHousekeepingService.this.scanExceptionChannel();
        } catch (Throwable e) {
            log.error("Error occurred when scan not active client channels.", e);
        }
    }
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

3.22 Clean Expired Requests (Fast Failure)

If fast‑failure mode is enabled, the broker cleans expired requests every 10 ms, but only when the system is not busy.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        // Only clean up when fast-failure mode is enabled in the broker config
        if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
            cleanExpiredRequest();
        }
    }
}, 1000, 10, TimeUnit.MILLISECONDS);

4. NameServer Tasks

4.1 Scan Expired Brokers

NameServer periodically (every 10 seconds) scans its broker list and removes entries whose registration has not been refreshed for more than 120 seconds.

this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
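
The liveness rule described above can be sketched with a map from broker address to last registration time. BrokerLivenessTable is a hypothetical simplification: a real NameServer also closes the broker's channel and cleans up the related routing tables.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical table tracking when each broker last registered.
class BrokerLivenessTable {

    static final long BROKER_EXPIRE_MILLIS = 120_000L;

    private final Map<String, Long> lastUpdate = new HashMap<>();

    /** Records a registration (heartbeat) from a broker. */
    void heartbeat(String brokerAddr, long nowMillis) {
        lastUpdate.put(brokerAddr, nowMillis);
    }

    /** Removes and returns brokers that have not registered within the last 120 seconds. */
    List<String> scanNotActive(long nowMillis) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + BROKER_EXPIRE_MILLIS < nowMillis) {
                removed.add(entry.getKey());
                it.remove();
            }
        }
        return removed;
    }

    int liveCount() {
        return lastUpdate.size();
    }

    public static void main(String[] args) {
        BrokerLivenessTable table = new BrokerLivenessTable();
        table.heartbeat("10.0.0.1:10911", 0L);
        table.heartbeat("10.0.0.2:10911", 100_000L);
        System.out.println(table.scanNotActive(130_000L)); // only the first broker has expired
    }
}
```

Because brokers re‑register every 30 seconds by default, a broker has to miss several consecutive registrations before the 120‑second limit removes it.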

4.2 Print Configuration

NameServer loads KV configuration files at startup and prints the current config table every 10 minutes, starting 1 minute after startup.

this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);

5. Summary

RocketMQ contains numerous scheduled tasks that enhance its design by handling business processing, monitoring, heartbeats, cleanup, connection management, and data persistence. Understanding these tasks provides deeper insight into RocketMQ's architecture and operational principles.
