Unified Messaging with Spring Cloud Stream: One Codebase for Multiple MQs
This article demonstrates how Spring Cloud Stream provides a unified programming model that decouples business logic from the underlying message broker, allowing a single codebase to work with RocketMQ, Kafka, or RabbitMQ by only changing configuration, and walks through project structure, implementation, conditional routing, MQ switching, testing, and common pitfalls.
Goal
Show how to use Spring Cloud Stream to achieve a unified programming model where business code is independent of the specific MQ implementation, enabling a single codebase to support RocketMQ, Kafka, or RabbitMQ with only configuration changes.
Core Concepts
Spring Cloud Stream introduces three main abstractions:
Binder : integration component for a concrete MQ (e.g., RocketMQ, Kafka).
Binding : connects a message channel to a Binder.
Input/Output : represent consumer and producer channels respectively.
These concepts are analogous to database drivers, connections, and tables.
Project Structure
spring-cloud-stream-ep02/
├── stream-producer/ # producer module
│ ├── pom.xml
│ └── src/main/java/.../producer/
│ ├── ProducerApplication.java
│ ├── config/StreamConfig.java
│ ├── source/OrderSource.java
│ ├── controller/MessageController.java
│ └── service/OrderEventService.java
├── stream-consumer/ # consumer module
│ ├── pom.xml
│ └── src/main/java/.../consumer/
│ ├── ConsumerApplication.java
│ ├── config/StreamConfig.java
│ ├── sink/OrderSink.java
│ └── listener/OrderEventListener.java
└── docker-compose.ymlImplementation Details
Parent POM
<project xmlns="http://maven.apache.org/POM/4.0.0">
<modelVersion>4.0.0</modelVersion>
<groupId>com.teaching</groupId>
<artifactId>stream-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.4</version>
</parent>
<properties>
<java.version>17</java.version>
<spring-cloud.version>2023.0.1</spring-cloud.version>
</properties>
<modules>
<module>stream-producer</module>
<module>stream-consumer</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>Producer Module Dependencies
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>Application Configuration (RocketMQ)
server:
port: 8081
spring:
application:
name: stream-producer
cloud:
stream:
binders:
rocketmq-binder:
type: rocketmq
environment:
spring.cloud.stream.rocketmq.binder.name-server: localhost:9876
bindings:
order-output:
destination: order-topic
content-type: application/json
binder: rocketmq-binder
producer:
required-groups: order-group
rocketmq:
binder:
name-server: localhost:9876
bindings:
order-output:
producer:
group: order-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 2
logging:
level:
org.springframework.cloud.stream: DEBUGProducer Application
package com.teaching.producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import com.teaching.producer.source.OrderSource;
@SpringBootApplication
@EnableBinding(OrderSource.class)
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
System.out.println("✅ Stream 生产者启动成功!");
}
}OrderSource (Output Channel)
package com.teaching.producer.source;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface OrderSource {
String OUTPUT = "order-output";
@Output(OUTPUT)
MessageChannel orderOutput();
}OrderEventService (Message Builder)
package com.teaching.producer.service;
import com.teaching.producer.source.OrderSource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.UUID;
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderEventService {
private final OrderSource orderSource;
public void sendOrderCreatedMessage(Long orderId, Long userId, Long productId, Integer quantity) {
OrderMessage messageBody = OrderMessage.builder()
.messageId(UUID.randomUUID().toString())
.orderId(orderId)
.userId(userId)
.productId(productId)
.quantity(quantity)
.status("CREATED")
.timestamp(LocalDateTime.now())
.build();
Message<OrderMessage> message = MessageBuilder.withPayload(messageBody)
.setHeader("X-ORDER-TYPE", "CREATE")
.build();
boolean sent = orderSource.orderOutput().send(message);
if (sent) {
log.info("消息发送成功: orderId={}, messageId={}", orderId, messageBody.getMessageId());
} else {
log.error("消息发送失败: orderId={}", orderId);
}
}
public void sendOrderPaidMessage(Long orderId, Long userId, Long amount) {
OrderMessage messageBody = OrderMessage.builder()
.messageId(UUID.randomUUID().toString())
.orderId(orderId)
.userId(userId)
.amount(amount)
.status("PAID")
.timestamp(LocalDateTime.now())
.build();
Message<OrderMessage> message = MessageBuilder.withPayload(messageBody)
.setHeader("X-ORDER-TYPE", "PAY")
.build();
orderSource.orderOutput().send(message);
log.info("支付消息发送成功: orderId={}", orderId);
}
@lombok.Data
@lombok.Builder
@lombok.NoArgsConstructor
@lombok.AllArgsConstructor
public static class OrderMessage {
private String messageId;
private Long orderId;
private Long userId;
private Long productId;
private Integer quantity;
private Long amount;
private String status;
private LocalDateTime timestamp;
}
}MessageController (REST API)
package com.teaching.producer.controller;
import com.teaching.producer.service.OrderEventService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping("/api/message")
@RequiredArgsConstructor
public class MessageController {
private final OrderEventService orderEventService;
@PostMapping("/order")
public Map<String, Object> sendOrderMessage(@RequestParam Long orderId,
@RequestParam Long userId,
@RequestParam Long productId,
@RequestParam Integer quantity) {
orderEventService.sendOrderCreatedMessage(orderId, userId, productId, quantity);
Map<String, Object> result = new HashMap<>();
result.put("code", 200);
result.put("message", "消息发送成功");
result.put("orderId", orderId);
return result;
}
@PostMapping("/pay")
public Map<String, Object> sendPayMessage(@RequestParam Long orderId,
@RequestParam Long userId,
@RequestParam Long amount) {
orderEventService.sendOrderPaidMessage(orderId, userId, amount);
Map<String, Object> result = new HashMap<>();
result.put("code", 200);
result.put("message", "支付消息发送成功");
return result;
}
}Consumer Side
Consumer Dependencies
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>Consumer Application Configuration (RocketMQ)
server:
port: 8082
spring:
application:
name: stream-consumer
cloud:
stream:
binders:
rocketmq-binder:
type: rocketmq
environment:
spring.cloud.stream.rocketmq.binder.name-server: localhost:9876
bindings:
order-input:
destination: order-topic
content-type: application/json
binder: rocketmq-binder
group: order-consumer-group
rocketmq:
binder:
name-server: localhost:9876
bindings:
order-input:
consumer:
enabled: true
tags: '*'
pull-interval: 1000
logging:
level:
org.springframework.cloud.stream: DEBUGConsumer Application
package com.teaching.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import com.teaching.consumer.sink.OrderSink;
@SpringBootApplication
@EnableBinding(OrderSink.class)
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
System.out.println("✅ Stream 消费者启动成功!");
}
}OrderSink (Input Channel)
package com.teaching.consumer.sink;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface OrderSink {
String INPUT = "order-input";
@Input(INPUT)
SubscribableChannel orderInput();
}OrderEventListener (Message Handling)
package com.teaching.consumer.listener;
import com.teaching.consumer.sink.OrderSink;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class OrderEventListener {
@StreamListener(OrderSink.INPUT)
public void handleOrderMessage(@Payload OrderMessage message,
@Header("X-ORDER-TYPE") String orderType) {
log.info("收到订单消息: orderType={}, message={}", orderType, message);
switch (message.getStatus()) {
case "CREATED":
handleOrderCreated(message);
break;
case "PAID":
handleOrderPaid(message);
break;
default:
log.warn("未知消息类型: {}", message.getStatus());
}
}
private void handleOrderCreated(OrderMessage message) {
log.info("处理订单创建事件: orderId={}, userId={}, productId={}, quantity={}",
message.getOrderId(), message.getUserId(), message.getProductId(), message.getQuantity());
try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
log.info("订单创建事件处理完成: orderId={}", message.getOrderId());
}
private void handleOrderPaid(OrderMessage message) {
log.info("处理订单支付事件: orderId={}, userId={}, amount={}",
message.getOrderId(), message.getUserId(), message.getAmount());
log.info("订单支付事件处理完成: orderId={}", message.getOrderId());
}
@lombok.Data
public static class OrderMessage {
private String messageId;
private Long orderId;
private Long userId;
private Long productId;
private Integer quantity;
private Long amount;
private String status;
private String timestamp;
}
}Conditional Routing (Header‑Based)
package com.teaching.consumer.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@Slf4j
public class OrderConditionalListener {
@StreamListener(target = OrderSink.INPUT, condition = "headers['X-ORDER-TYPE'] == 'CREATE'")
public void handleCreateOrder(Map<String, Object> message, @Header("X-ORDER-TYPE") String orderType) {
log.info("订单创建消息: {}", message);
}
@StreamListener(target = OrderSink.INPUT, condition = "headers['X-ORDER-TYPE'] == 'PAY'")
public void handlePayOrder(Map<String, Object> message, @Header("X-ORDER-TYPE") String orderType) {
log.info("订单支付消息: {}", message);
}
}Switching MQ (RocketMQ → Kafka)
Only the binders section and the binder reference need to be changed; the business code remains untouched.
# Before (RocketMQ)
spring:
cloud:
stream:
binders:
rocketmq-binder:
type: rocketmq
bindings:
order-output:
destination: order-topic
binder: rocketmq-binder
# After (Kafka)
spring:
cloud:
stream:
binders:
kafka-binder:
type: kafka
environment:
spring.kafka.bootstrap-servers: localhost:9092
bindings:
order-output:
destination: order-topic
binder: kafka-binderTesting Procedure
Start RocketMQ (or Kafka) with docker-compose up -d.
Run the producer and consumer modules via mvn spring-boot:run.
Send test messages:
curl -X POST "http://localhost:8081/api/message/order?orderId=1001&userId=1&productId=100&quantity=2"
curl -X POST "http://localhost:8081/api/message/pay?orderId=1001&userId=1&amount=19800"Observe consumer logs showing receipt and processing of the messages.
Common Issues & Pitfalls
1. @EnableBinding is deprecated
Spring Cloud Stream 4.x recommends a functional style using @Bean Consumer<OrderMessage> orderConsumer(), but the legacy annotation still works.
2. Binder dependency conflicts
Only include a single Binder dependency (e.g., RocketMQ or Kafka). Replace the dependency when switching.
3. Message serialization problems
Force JSON payloads by setting
spring.cloud.stream.bindings.order-output.content-type=application/jsonfor all binders.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Coder Trainee
Experienced in Java and Python, we share and learn together. For submissions or collaborations, DM us.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
