How to Use RocketMQ with Spring Cloud Stream: A Beginner’s Guide

This tutorial explains why microservices need a message queue, outlines RocketMQ’s core concepts, shows how to set up RocketMQ with Docker Compose, integrates it into Spring Boot projects with detailed producer and consumer code, provides testing steps, and lists common pitfalls and solutions.

Coder Trainee
Coder Trainee
Coder Trainee
How to Use RocketMQ with Spring Cloud Stream: A Beginner’s Guide

1. Why a Message Queue?

Microservice calls that rely on synchronous RPC (e.g., Feign) suffer from tight coupling, poor performance, and inability to handle traffic spikes because each service must wait for downstream responses.

Problems

Severe coupling – a downstream failure blocks the upstream service.

Performance – total latency equals the sum of all service latencies.

Traffic spikes – sudden peaks can overwhelm downstream services.

No buffering – the system cannot smooth burst traffic.

Benefits After Introducing a Message Queue

Decoupling – the producer does not need to know who consumes the message.

Peak‑shaving – the queue buffers traffic and smooths spikes.

Asynchrony – the producer can return immediately after sending.

Reliability – messages are persisted and not lost.

2. Core Concepts of RocketMQ

Producer : the message sender.

Consumer : the message receiver.

Topic : logical channel for categorizing messages.

Message : the payload.

Broker : the server that stores and forwards messages.

NameServer : the routing registry.

3. Environment Setup

Use Docker Compose to launch a NameServer, a Broker, and the RocketMQ Dashboard.

version: '3.8'
services:
  namesrv:
    image: apache/rocketmq:5.1.4
    container_name: rmqnamesrv
    ports:
      - "9876:9876"
    command: sh mqnamesrv
    networks:
      - rocketmq-network

  broker:
    image: apache/rocketmq:5.1.4
    container_name: rmqbroker
    ports:
      - "10911:10911"
      - "10912:10912"
    environment:
      - NAMESRV_ADDR=namesrv:9876
    volumes:
      - ./broker.conf:/home/rocketmq/rocketmq-5.1.4/conf/broker.conf
    command: sh mqbroker -c /home/rocketmq/rocketmq-5.1.4/conf/broker.conf
    depends_on:
      - namesrv
    networks:
      - rocketmq-network

  dashboard:
    image: apache/rocketmq-dashboard:latest
    container_name: rmqdashboard
    ports:
      - "8088:8080"
    environment:
      - JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876
    depends_on:
      - namesrv
    networks:
      - rocketmq-network

networks:
  rocketmq-network:
    driver: bridge

Broker configuration (broker.conf) must contain the correct host IP, e.g.:

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = true
autoCreateSubscriptionGroup = true
brokerIP1 = 192.168.1.100

4. Spring Boot Integration

Project Structure

spring-cloud-mq-ep01/
├── mq-producer/          # message producer
│   ├── pom.xml
│   └── src/main/java/.../producer/
│       ├── ProducerApplication.java
│       ├── config/RocketMQConfig.java
│       ├── controller/MessageController.java
│       └── service/OrderEventProducer.java
├── mq-consumer/          # message consumer
│   ├── pom.xml
│   └── src/main/java/.../consumer/
│       ├── ConsumerApplication.java
│       ├── config/RocketMQConfig.java
│       └── listener/OrderEventListener.java
└── docker-compose.yml

Parent POM Dependencies

<project xmlns="http://maven.apache.org/POM/4.0.0" ...>
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.4</version>
  </parent>
  <properties>
    <java.version>17</java.version>
    <rocketmq.version>2.2.3</rocketmq.version>
  </properties>
  <modules>
    <module>mq-producer</module>
    <module>mq-consumer</module>
  </modules>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>${rocketmq.version}</version>
      </dependency>
    </dependencies>
  </dependencyManagement>
</project>

Producer Module

Key configuration (application.yml):

server:
  port: 8081
spring:
  application:
    name: mq-producer
rocketmq:
  name-server: localhost:9876
  producer:
    group: order-producer-group
    send-message-timeout: 3000
    retry-times-when-send-failed: 2

Producer entry point:

package com.teaching.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
        System.out.println("✅ 消息生产者启动成功!");
    }
}

OrderEventProducer demonstrates synchronous, asynchronous, and delayed sending using RocketMQTemplate and MessageBuilder:

public void sendOrderCreatedMessage(Long orderId, Long userId, Long productId, Integer quantity) {
    OrderMessage messageBody = new OrderMessage(orderId, userId, productId, quantity, "CREATED");
    Message<OrderMessage> message = MessageBuilder.withPayload(messageBody)
        .setHeader("messageId", UUID.randomUUID().toString())
        .build();
    rocketMQTemplate.syncSend(ORDER_TOPIC, message);
    log.info("消息发送成功: orderId={}", orderId);
}

public void sendOrderCreatedMessageAsync(Long orderId, Long userId, Long productId, Integer quantity) {
    OrderMessage messageBody = new OrderMessage(orderId, userId, productId, quantity, "CREATED");
    rocketMQTemplate.asyncSend(ORDER_TOPIC, messageBody, sendResult -> {
        if (sendResult.getSendStatus().name().equals("SEND_OK")) {
            log.info("异步消息发送成功: orderId={}", orderId);
        } else {
            log.error("异步消息发送失败: orderId={}", orderId);
        }
    });
}

public void sendDelayMessage(Long orderId, int delayLevel) {
    OrderMessage messageBody = new OrderMessage(orderId, 1L, 1L, 1, "DELAY");
    rocketMQTemplate.syncSend(ORDER_TOPIC, MessageBuilder.withPayload(messageBody).build(), 3000, delayLevel);
    log.info("延迟消息已发送: orderId={}, delayLevel={}", orderId, delayLevel);
}

REST controller exposes three endpoints for normal, async, and delayed messages.

Consumer Module

Consumer configuration (application.yml) mirrors the producer’s name‑server address.

server:
  port: 8082
spring:
  application:
    name: mq-consumer
rocketmq:
  name-server: localhost:9876

Listener implementation processes the incoming map, logs the payload, and simulates business handling with a short sleep.

@Component
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order-consumer-group", selectorExpression = "*")
public class OrderEventListener implements RocketMQListener<Map<String, Object>> {
    @Override
    public void onMessage(Map<String, Object> message) {
        log.info("收到订单消息: {}", message);
        Long orderId = (Long) message.get("orderId");
        // ... extract other fields ...
        log.info("处理订单事件: orderId={}, userId={}, productId={}, quantity={}, status={}",
                 orderId, userId, productId, quantity, status);
        try {
            Thread.sleep(100);
            log.info("订单事件处理完成: orderId={}", orderId);
        } catch (InterruptedException e) {
            log.error("处理失败", e);
        }
    }
}

5. Testing and Verification

Start the infrastructure and applications:

# Start RocketMQ stack
cd spring-cloud-mq-ep01
docker-compose up -d

# Start producer
cd mq-producer
mvn spring-boot:run

# Start consumer
cd ../mq-consumer
mvn spring-boot:run

Send test messages with curl:

# Normal order message
curl -X POST "http://localhost:8081/api/message/order?orderId=1001&userId=1&productId=100&quantity=2"

# Asynchronous message
curl -X POST "http://localhost:8081/api/message/order/async?orderId=1002"

# Delayed message (delayLevel=3 ≈ 10 s)
curl -X POST "http://localhost:8081/api/message/order/delay?orderId=1003&delayLevel=3"

Consumer logs show receipt and processing, e.g.:

2025-06-01 10:30:15.123 INFO 收到订单消息: {orderId=1001, userId=1, productId=100, quantity=2, status=CREATED}
2025-06-01 10:30:15.124 INFO 处理订单事件: orderId=1001, userId=1, productId=100, quantity=2, status=CREATED
2025-06-01 10:30:15.225 INFO 订单事件处理完成: orderId=1001

6. Common Issues and Pitfalls

Pitfall 1 – Broker IP Misconfiguration

Symptom: producer cannot connect to the broker.

Solution: set the correct brokerIP1 in broker.conf.

Pitfall 2 – Message Send Timeout

Symptom: messages fail to send.

Solution: increase rocketmq.producer.send-message-timeout, e.g. to 5000 ms.

rocketmq:
  producer:
    send-message-timeout: 5000

Pitfall 3 – Consumer Receives No Messages

Checklist:

Verify the topic exists.

Ensure the consumer group matches the producer’s configuration.

Confirm the NameServer address is correct.

7. Next Episode Preview

The upcoming article, "Spring Cloud Messaging (Part 2): Spring Cloud Stream Unified Programming Model", will cover the abstraction layer provided by Spring Cloud Stream, binder configuration, unified programming concepts, and multi‑broker switching.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

JavamicroservicesSpring Bootrocketmqmessagingspring-clouddocker-compose
Coder Trainee
Written by

Coder Trainee

Experienced in Java and Python, we share and learn together. For submissions or collaborations, DM us.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.