Cloud Native 19 min read

Unified Messaging with Spring Cloud Stream: One Codebase for Multiple MQs

This article demonstrates how Spring Cloud Stream provides a unified programming model that decouples business logic from the underlying message broker, allowing a single codebase to work with RocketMQ, Kafka, or RabbitMQ by only changing configuration, and walks through project structure, implementation, conditional routing, MQ switching, testing, and common pitfalls.

Coder Trainee
Coder Trainee
Coder Trainee
Unified Messaging with Spring Cloud Stream: One Codebase for Multiple MQs

Goal

Show how to use Spring Cloud Stream to achieve a unified programming model where business code is independent of the specific MQ implementation, enabling a single codebase to support RocketMQ, Kafka, or RabbitMQ with only configuration changes.

Core Concepts

Spring Cloud Stream introduces three main abstractions:

Binder : integration component for a concrete MQ (e.g., RocketMQ, Kafka).

Binding : connects a message channel to a Binder.

Input/Output : represent consumer and producer channels respectively.

These concepts are analogous to database drivers, connections, and tables.

Project Structure

spring-cloud-stream-ep02/
├── stream-producer/      # producer module
│   ├── pom.xml
│   └── src/main/java/.../producer/
│       ├── ProducerApplication.java
│       ├── config/StreamConfig.java
│       ├── source/OrderSource.java
│       ├── controller/MessageController.java
│       └── service/OrderEventService.java
├── stream-consumer/      # consumer module
│   ├── pom.xml
│   └── src/main/java/.../consumer/
│       ├── ConsumerApplication.java
│       ├── config/StreamConfig.java
│       ├── sink/OrderSink.java
│       └── listener/OrderEventListener.java
└── docker-compose.yml

Implementation Details

Parent POM

<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.teaching</groupId>
  <artifactId>stream-parent</artifactId>
  <version>1.0.0-SNAPSHOT</version>
  <packaging>pom</packaging>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.4</version>
  </parent>
  <properties>
    <java.version>17</java.version>
    <spring-cloud.version>2023.0.1</spring-cloud.version>
  </properties>
  <modules>
    <module>stream-producer</module>
    <module>stream-consumer</module>
  </modules>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>${spring-cloud.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
</project>

Producer Module Dependencies

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
  </dependency>
</dependencies>

Application Configuration (RocketMQ)

server:
  port: 8081
spring:
  application:
    name: stream-producer
  cloud:
    stream:
      binders:
        rocketmq-binder:
          type: rocketmq
          environment:
            spring.cloud.stream.rocketmq.binder.name-server: localhost:9876
      bindings:
        order-output:
          destination: order-topic
          content-type: application/json
          binder: rocketmq-binder
          producer:
            required-groups: order-group
      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          order-output:
            producer:
              group: order-producer-group
              send-message-timeout: 3000
              retry-times-when-send-failed: 2
logging:
  level:
    org.springframework.cloud.stream: DEBUG

Producer Application

package com.teaching.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import com.teaching.producer.source.OrderSource;

@SpringBootApplication
@EnableBinding(OrderSource.class)
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
        System.out.println("✅ Stream 生产者启动成功!");
    }
}

OrderSource (Output Channel)

package com.teaching.producer.source;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface OrderSource {
    String OUTPUT = "order-output";
    @Output(OUTPUT)
    MessageChannel orderOutput();
}

OrderEventService (Message Builder)

package com.teaching.producer.service;

import com.teaching.producer.source.OrderSource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.UUID;

@Service
@RequiredArgsConstructor
@Slf4j
public class OrderEventService {
    private final OrderSource orderSource;

    public void sendOrderCreatedMessage(Long orderId, Long userId, Long productId, Integer quantity) {
        OrderMessage messageBody = OrderMessage.builder()
            .messageId(UUID.randomUUID().toString())
            .orderId(orderId)
            .userId(userId)
            .productId(productId)
            .quantity(quantity)
            .status("CREATED")
            .timestamp(LocalDateTime.now())
            .build();
        Message<OrderMessage> message = MessageBuilder.withPayload(messageBody)
            .setHeader("X-ORDER-TYPE", "CREATE")
            .build();
        boolean sent = orderSource.orderOutput().send(message);
        if (sent) {
            log.info("消息发送成功: orderId={}, messageId={}", orderId, messageBody.getMessageId());
        } else {
            log.error("消息发送失败: orderId={}", orderId);
        }
    }

    public void sendOrderPaidMessage(Long orderId, Long userId, Long amount) {
        OrderMessage messageBody = OrderMessage.builder()
            .messageId(UUID.randomUUID().toString())
            .orderId(orderId)
            .userId(userId)
            .amount(amount)
            .status("PAID")
            .timestamp(LocalDateTime.now())
            .build();
        Message<OrderMessage> message = MessageBuilder.withPayload(messageBody)
            .setHeader("X-ORDER-TYPE", "PAY")
            .build();
        orderSource.orderOutput().send(message);
        log.info("支付消息发送成功: orderId={}", orderId);
    }

    @lombok.Data
    @lombok.Builder
    @lombok.NoArgsConstructor
    @lombok.AllArgsConstructor
    public static class OrderMessage {
        private String messageId;
        private Long orderId;
        private Long userId;
        private Long productId;
        private Integer quantity;
        private Long amount;
        private String status;
        private LocalDateTime timestamp;
    }
}

MessageController (REST API)

package com.teaching.producer.controller;

import com.teaching.producer.service.OrderEventService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping("/api/message")
@RequiredArgsConstructor
public class MessageController {
    private final OrderEventService orderEventService;

    @PostMapping("/order")
    public Map<String, Object> sendOrderMessage(@RequestParam Long orderId,
                                                @RequestParam Long userId,
                                                @RequestParam Long productId,
                                                @RequestParam Integer quantity) {
        orderEventService.sendOrderCreatedMessage(orderId, userId, productId, quantity);
        Map<String, Object> result = new HashMap<>();
        result.put("code", 200);
        result.put("message", "消息发送成功");
        result.put("orderId", orderId);
        return result;
    }

    @PostMapping("/pay")
    public Map<String, Object> sendPayMessage(@RequestParam Long orderId,
                                               @RequestParam Long userId,
                                               @RequestParam Long amount) {
        orderEventService.sendOrderPaidMessage(orderId, userId, amount);
        Map<String, Object> result = new HashMap<>();
        result.put("code", 200);
        result.put("message", "支付消息发送成功");
        return result;
    }
}

Consumer Side

Consumer Dependencies

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
  </dependency>
</dependencies>

Consumer Application Configuration (RocketMQ)

server:
  port: 8082
spring:
  application:
    name: stream-consumer
  cloud:
    stream:
      binders:
        rocketmq-binder:
          type: rocketmq
          environment:
            spring.cloud.stream.rocketmq.binder.name-server: localhost:9876
      bindings:
        order-input:
          destination: order-topic
          content-type: application/json
          binder: rocketmq-binder
          group: order-consumer-group
      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          order-input:
            consumer:
              enabled: true
              tags: '*'
              pull-interval: 1000
logging:
  level:
    org.springframework.cloud.stream: DEBUG

Consumer Application

package com.teaching.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import com.teaching.consumer.sink.OrderSink;

@SpringBootApplication
@EnableBinding(OrderSink.class)
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
        System.out.println("✅ Stream 消费者启动成功!");
    }
}

OrderSink (Input Channel)

package com.teaching.consumer.sink;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface OrderSink {
    String INPUT = "order-input";
    @Input(INPUT)
    SubscribableChannel orderInput();
}

OrderEventListener (Message Handling)

package com.teaching.consumer.listener;

import com.teaching.consumer.sink.OrderSink;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class OrderEventListener {
    @StreamListener(OrderSink.INPUT)
    public void handleOrderMessage(@Payload OrderMessage message,
                                   @Header("X-ORDER-TYPE") String orderType) {
        log.info("收到订单消息: orderType={}, message={}", orderType, message);
        switch (message.getStatus()) {
            case "CREATED":
                handleOrderCreated(message);
                break;
            case "PAID":
                handleOrderPaid(message);
                break;
            default:
                log.warn("未知消息类型: {}", message.getStatus());
        }
    }

    private void handleOrderCreated(OrderMessage message) {
        log.info("处理订单创建事件: orderId={}, userId={}, productId={}, quantity={}",
                message.getOrderId(), message.getUserId(), message.getProductId(), message.getQuantity());
        try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        log.info("订单创建事件处理完成: orderId={}", message.getOrderId());
    }

    private void handleOrderPaid(OrderMessage message) {
        log.info("处理订单支付事件: orderId={}, userId={}, amount={}",
                message.getOrderId(), message.getUserId(), message.getAmount());
        log.info("订单支付事件处理完成: orderId={}", message.getOrderId());
    }

    @lombok.Data
    public static class OrderMessage {
        private String messageId;
        private Long orderId;
        private Long userId;
        private Long productId;
        private Integer quantity;
        private Long amount;
        private String status;
        private String timestamp;
    }
}

Conditional Routing (Header‑Based)

package com.teaching.consumer.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Map;

@Component
@Slf4j
public class OrderConditionalListener {
    @StreamListener(target = OrderSink.INPUT, condition = "headers['X-ORDER-TYPE'] == 'CREATE'")
    public void handleCreateOrder(Map<String, Object> message, @Header("X-ORDER-TYPE") String orderType) {
        log.info("订单创建消息: {}", message);
    }

    @StreamListener(target = OrderSink.INPUT, condition = "headers['X-ORDER-TYPE'] == 'PAY'")
    public void handlePayOrder(Map<String, Object> message, @Header("X-ORDER-TYPE") String orderType) {
        log.info("订单支付消息: {}", message);
    }
}

Switching MQ (RocketMQ → Kafka)

Only the binders section and the binder reference need to be changed; the business code remains untouched.

# Before (RocketMQ)
spring:
  cloud:
    stream:
      binders:
        rocketmq-binder:
          type: rocketmq
      bindings:
        order-output:
          destination: order-topic
          binder: rocketmq-binder

# After (Kafka)
spring:
  cloud:
    stream:
      binders:
        kafka-binder:
          type: kafka
          environment:
            spring.kafka.bootstrap-servers: localhost:9092
      bindings:
        order-output:
          destination: order-topic
          binder: kafka-binder

Testing Procedure

Start RocketMQ (or Kafka) with docker-compose up -d.

Run the producer and consumer modules via mvn spring-boot:run.

Send test messages:

curl -X POST "http://localhost:8081/api/message/order?orderId=1001&userId=1&productId=100&quantity=2"
curl -X POST "http://localhost:8081/api/message/pay?orderId=1001&userId=1&amount=19800"

Observe consumer logs showing receipt and processing of the messages.

Common Issues & Pitfalls

1. @EnableBinding is deprecated

Spring Cloud Stream 4.x recommends a functional style using @Bean Consumer<OrderMessage> orderConsumer(), but the legacy annotation still works.

2. Binder dependency conflicts

Only include a single Binder dependency (e.g., RocketMQ or Kafka). Replace the dependency when switching.

3. Message serialization problems

Force JSON payloads by setting

spring.cloud.stream.bindings.order-output.content-type=application/json

for all binders.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

JavaMicroservicesKafkaRocketMQMessagingMessage-drivenSpring Cloud Stream
Coder Trainee
Written by

Coder Trainee

Experienced in Java and Python, we share and learn together. For submissions or collaborations, DM us.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.