
Using RocketMQ for Peak Shaving in Spring Boot: Configuration, Consumer Tuning, and Batch Consumption

This article explains how to integrate RocketMQ with Spring Boot to implement peak‑shaving, detailing core components, consumer pull parameters, one‑way sending, batch consumption configuration, and practical performance testing with code examples and deployment tips.

Architecture Digest

RocketMQ is typically used for decoupling, asynchronous processing, and peak shaving. This article records hands-on experience using it to reduce database pressure in a high-traffic "like" service.

Core components: Producer (sends messages), Broker (stores messages), Consumer (pulls and processes messages), NameServer (routes producers/consumers to brokers).

Consumer flow notes:

Consumers pull messages from brokers rather than brokers pushing them.

Messages are evenly distributed across queues; many configuration parameters target queues, not topics.

Each broker’s queue count (ConsumeQueue) can be adjusted in real‑time via RocketMQ Dashboard.

rocketmq‑spring‑boot‑starter Overview

For quick integration, rocketmq-spring-boot-starter can be used, but it does not cover all configurations (e.g., batch consumption requires a custom DefaultMQPushConsumer bean).

RocketMQListener – interface that consumer classes must implement; its single method is onMessage(msg).

RocketMQPushConsumerLifecycleListener – allows custom consumer configuration when @RocketMQMessageListener defaults are insufficient.

@RocketMQMessageListener – annotates a bean as a consumer, specifying topic and consumerGroup; properties can be injected via placeholders.

Business Case

A "like" feature allows unlimited rapid clicks; inserting each click directly into the database would overwhelm it. By sending each click as a message to RocketMQ and configuring the consumer to pull a limited number of messages per interval, the system smooths traffic spikes.
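The smoothing effect can be illustrated with a toy model in plain Java, without any RocketMQ dependency. This is only a sketch: the class name PeakShavingSketch, the drain method, and the numbers (a burst of 500 clicks, a cap of 128 messages per tick) are illustrative assumptions, not part of the original project.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of peak shaving: bursts enter a backlog, a consumer drains a capped amount per tick. */
final class PeakShavingSketch {

    /**
     * @param arrivalsPerTick messages produced in each tick (the traffic spike)
     * @param maxPerTick      upper bound on messages the consumer takes per tick
     * @return messages handed to the database in each tick, until the backlog is empty
     */
    static List<Integer> drain(int[] arrivalsPerTick, int maxPerTick) {
        if (maxPerTick <= 0) {
            throw new IllegalArgumentException("maxPerTick must be positive");
        }
        List<Integer> processed = new ArrayList<>();
        long backlog = 0;
        int tick = 0;
        // Keep ticking while producers are still sending or messages remain queued.
        while (tick < arrivalsPerTick.length || backlog > 0) {
            if (tick < arrivalsPerTick.length) {
                backlog += arrivalsPerTick[tick];
            }
            int taken = (int) Math.min(backlog, maxPerTick);
            processed.add(taken);
            backlog -= taken;
            tick++;
        }
        return processed;
    }

    public static void main(String[] args) {
        // A single burst of 500 clicks, consumed at no more than 128 per tick.
        System.out.println(drain(new int[] {500, 0, 0}, 128)); // prints [128, 128, 128, 116]
    }
}
```

The database never sees more than the per-tick cap, at the cost of the burst taking several ticks to be written.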

Environment Setup

Example environment: 1 NameServer + 2 Brokers + 1 Consumer

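Adding Maven Dependency

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <!-- 2.1.0 is the starter version this article refers to; adjust to your project -->
    <version>2.1.0</version>
</dependency>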

application.yml Configuration

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: praise-group
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: tiger
    url: jdbc:mysql://localhost:3306/wilson
  swagger:
    docket:
      base-package: io.rocket.consumer.controller

Like Interface

@Data
public class PraiseRecord implements Serializable {
    private Long id;
    private Long uid;
    private Long liveId;
    private LocalDateTime createTime;
}

@RestController
@RequestMapping("/message")
public class MessageController {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @PostMapping("/praise")
    public ServerResponse praise(@RequestBody PraiseRecordVO vo) {
        rocketMQTemplate.sendOneWay(RocketConstant.Topic.PRAISE_TOPIC,
            MessageBuilder.withPayload(vo).build());
        return ServerResponse.success();
    }
}

Because the business tolerates occasional message loss, sendOneWay() is used for higher throughput: it returns without waiting for a broker acknowledgement, so a failed send goes unnoticed.

PraiseListener – Consumer

@Service
@RocketMQMessageListener(topic = RocketConstant.Topic.PRAISE_TOPIC,
    consumerGroup = RocketConstant.ConsumerGroup.PRAISE_CONSUMER)
@Slf4j
public class PraiseListener implements RocketMQListener<PraiseRecordVO>,
        RocketMQPushConsumerLifecycleListener {
    @Resource
    private PraiseRecordService praiseRecordService;

    @Override
    public void onMessage(PraiseRecordVO vo) {
        praiseRecordService.insert(vo.copyProperties(PraiseRecord::new));
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setPullInterval(2000); // ms
        consumer.setPullBatchSize(16);
    }
}

The pull interval is set to 2 seconds and each pull fetches 16 messages per queue. With 2 brokers each having 4 queues, the theoretical maximum per pull is 16 × 2 × 4 = 128 messages.
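The arithmetic above can be written as a small helper. This is a sketch in plain Java; the class and method names are hypothetical, and the per-second figure is only an upper bound that ignores processing and network time.

```java
/** Back-of-the-envelope pull capacity for a consumer, based on the parameters discussed above. */
final class PullCapacity {

    /** Upper bound on messages fetched in one pull round across all queues. */
    static int maxMessagesPerPull(int pullBatchSize, int brokers, int queuesPerBroker) {
        return pullBatchSize * brokers * queuesPerBroker;
    }

    /** Upper bound on messages per second, assuming one pull round per pullInterval. */
    static double maxMessagesPerSecond(int messagesPerPull, long pullIntervalMs) {
        return messagesPerPull * 1000.0 / pullIntervalMs;
    }

    public static void main(String[] args) {
        int perPull = maxMessagesPerPull(16, 2, 4);
        System.out.println(perPull);                             // prints 128
        System.out.println(maxMessagesPerSecond(perPull, 2000)); // prints 64.0
    }
}
```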

Consumer Parameter Details

pullInterval: delay between successive pulls, in milliseconds (default 0).

pullBatchSize: maximum number of messages fetched per queue per pull (default 32).

consumeMessageBatchMaxSize: maximum number of messages passed to the listener in a single call (default 1; batch consumption is not supported in starter version 2.1.0).

When pullBatchSize exceeds the broker’s MessageStoreConfig.maxTransferCountOnMessageInMemory (default 32), the broker parameter must be increased as well; otherwise the broker caps each pull at that value.

Dynamic Adjustment of Consumption Rate

If the broker’s max transfer count is already at 32 and you need higher throughput without restarting the broker, increase the number of queues per broker (e.g., from 4 to 8) to raise the theoretical pull capacity.
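To see why adding queues helps when the broker-side cap cannot be raised, the capacity estimate can be extended with that cap. The sketch below is plain Java with hypothetical names; it assumes the effective per-queue batch is the smaller of pullBatchSize and the broker's max transfer count, and the value 64 for pullBatchSize is purely illustrative.

```java
/** Estimates pull capacity when the broker caps the per-queue batch size. */
final class QueueScalingSketch {

    /** The broker returns at most brokerMaxTransfer messages per queue per pull. */
    static int effectivePullBatch(int pullBatchSize, int brokerMaxTransfer) {
        return Math.min(pullBatchSize, brokerMaxTransfer);
    }

    /** Upper bound on messages fetched in one pull round across all queues. */
    static int capacityPerPull(int pullBatchSize, int brokerMaxTransfer,
                               int brokers, int queuesPerBroker) {
        return effectivePullBatch(pullBatchSize, brokerMaxTransfer) * brokers * queuesPerBroker;
    }

    public static void main(String[] args) {
        // pullBatchSize of 64 is capped at 32 by the broker in both cases.
        System.out.println(capacityPerPull(64, 32, 2, 4)); // prints 256
        System.out.println(capacityPerPull(64, 32, 2, 8)); // prints 512
    }
}
```

Doubling the queues per broker doubles the estimate even though the per-queue cap stays at 32.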

Batch Consumption

Although the starter does not provide batch consumption, a custom DefaultMQPushConsumer can be configured with consumeMessageBatchMaxSize. Example bean:

@Bean
public DefaultMQPushConsumer userMQPushConsumer() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketConstant.ConsumerGroup.SPRING_BOOT_USER_CONSUMER);
    consumer.setNamesrvAddr(nameServer);
    consumer.subscribe(RocketConstant.Topic.SPRING_BOOT_USER_TOPIC, "*");
    consumer.setPullInterval(1000);
    consumer.setPullBatchSize(24);
    consumer.setConsumeMessageBatchMaxSize(12);
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
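        // msgs holds at most consumeMessageBatchMaxSize messages; UserInfo is the assumed payload type
        List<UserInfo> userInfos = new ArrayList<>(msgs.size());
        // convert and process batch here
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });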
    consumer.start();
    return consumer;
}

When consumeMessageBatchMaxSize is larger than pullBatchSize, the effective batch size is limited by pullBatchSize.
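The relationship between the two parameters can be modelled as splitting each pulled list into chunks before it reaches the listener. The following is a simplified sketch in plain Java, not the client's actual code; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of how one pull result is split into listener batches. */
final class BatchSplitSketch {

    /** Splits the pulled messages into chunks of at most consumeMessageBatchMaxSize. */
    static <T> List<List<T>> split(List<T> pulled, int consumeMessageBatchMaxSize) {
        if (consumeMessageBatchMaxSize <= 0) {
            throw new IllegalArgumentException("batch size must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < pulled.size(); i += consumeMessageBatchMaxSize) {
            int end = Math.min(i + consumeMessageBatchMaxSize, pulled.size());
            batches.add(new ArrayList<>(pulled.subList(i, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> pulled = new ArrayList<>();
        for (int i = 0; i < 24; i++) {
            pulled.add(i); // stand-in for 24 messages from one pull (pullBatchSize = 24)
        }
        System.out.println(split(pulled, 12).size());        // prints 2
        System.out.println(split(pulled, 32).get(0).size()); // prints 24
    }
}
```

With a batch limit of 32 and only 24 messages pulled, the listener receives a single batch of 24, which is the limit described above.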

Testing Results

Performance tests show that, after the initial pull, the consumer processes roughly 128 messages every 2 seconds, matching the configured parameters. The system successfully smooths traffic spikes, keeping database insert rates stable during peak periods.

For full source code and deployment scripts, see the GitHub repository linked at the end of the original article.
