Ensuring Reliable Message Consistency with Spring Boot & RocketMQ
This guide explains the principle and step‑by‑step implementation of reliable transactional messaging using Spring Boot 2.3.9 and RocketMQ 4.8.0, covering execution flow, broker handling, project structure, code examples for entities, repositories, services, listeners, and testing.
Reliable Message Consistency Principle
Environment: Spring Boot 2.3.9 + RocketMQ 4.8.0
Execution Process
Producer sends a Prepare message to the broker.
After the Prepare message is acknowledged, the local transaction starts.
If the local transaction succeeds, the producer returns
commit; otherwise it returns
rollback(decided by the developer in the transaction callback).
Producer sends the resulting
commitor
rollbackto the broker, which leads to two possible broker behaviours:
1. Broker receives commit / rollback : If commit , the broker treats the whole transaction as successful and delivers the message to the consumer. If rollback , the broker deletes the half message and does not deliver it. 2. Broker does not receive a confirmation (e.g., the local transaction crashes and returns UNKNOWN ), so the broker periodically checks the transaction status. If the check returns commit , the message is delivered; if it returns rollback or remains UNKNOWN , the half message is removed. The check interval and retry count are configurable.
Project Structure
The project uses a parent‑child Maven layout with two sub‑modules:
account‑managerand
integral‑manager.
Dependencies
<code><dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency></code>Account Module
Configuration file
<code>server:
port: 8081
---
rocketmq:
nameServer: localhost:9876
producer:
group: pack-mq
---
spring:
jpa:
generateDdl: false
hibernate:
ddlAuto: update
openInView: true
show-sql: true
---
spring:
datasource:
driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/account?serverTimezone=GMT%2B8
username: root
password: ******
type: com.zaxxer.hikari.HikariDataSource
hikari:
minimumIdle: 10
maximumPoolSize: 200
autoCommit: true
idleTimeout: 30000
poolName: MasterDatabookHikariCP
maxLifetime: 1800000
connectionTimeout: 30000
connectionTestQuery: SELECT 1</code>Entity classes
<code>// User table
@Entity
@Table(name = "t_account")
public class Account {
@Id
private Long id;
private String name;
}
// Business log table (used for deduplication)
@Entity
@Table(name = "t_account_log")
public class AccountLog {
@Id
private Long txid;
private Date createTime;
}</code>DAO interfaces
<code>public interface AccountRepository extends JpaRepository<Account, Long> {}
public interface AccountLogRepository extends JpaRepository<AccountLog, Long> {}</code>Service methods
<code>@Resource
private AccountRepository accountRepository;
@Resource
private AccountLogRepository accountLogRepository;
@Transactional
public boolean register(Account account) {
accountRepository.save(account);
AccountLog accountLog = new AccountLog(account.getId(), new Date());
accountLogRepository.save(accountLog);
return true;
}
public AccountLog existsTxId(Long txid) {
return accountLogRepository.findById(txid).orElse(null);
}</code>Message sending
<code>@Resource
private RocketMQTemplate rocketMQTemplate;
public String sendTx(String topic, String tags, Account account) {
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
topic + ":" + tags,
MessageBuilder.withPayload(account).setHeader("tx_id", uuid).build(),
uuid);
return result.getSendStatus().name();
}</code>Producer transaction listener
<code>@RocketMQTransactionListener
public class ProducerMessageListener implements RocketMQLocalTransactionListener {
@Resource
private AccountService accountService;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
Account account = new JsonMapper().readValue((byte[]) msg.getPayload(), Account.class);
accountService.register(account);
} catch (Exception e) {
e.printStackTrace();
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.COMMIT;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
try {
Account account = new JsonMapper().readValue((byte[]) msg.getPayload(), Account.class);
AccountLog accountLog = accountService.existsTxId(account.getId());
if (accountLog == null) {
return RocketMQLocalTransactionState.UNKNOWN;
}
} catch (Exception e) {
e.printStackTrace();
return RocketMQLocalTransactionState.UNKNOWN;
}
return RocketMQLocalTransactionState.COMMIT;
}
}</code>Controller
<code>@RestController
@RequestMapping("/accounts")
public class AccountController {
@Resource
private ProducerMessageService messageService;
@PostMapping("/send")
public Object sendMessage(@RequestBody Account account) {
return messageService.sendTx("tx-topic", "mks", account);
}
}</code>Integral Module
Entity
<code>@Entity
@Table(name = "t_integral")
public class Integral {
@Id
private Long id;
private Integer score;
private Long acccountId;
}</code>DAO
<code>public interface IntegralRepository extends JpaRepository<Integral, Long> {}</code>Service
<code>@Resource
private IntegralRepository integralRepository;
@Transactional
public Integral saveIntegral(Integral integral) {
return integralRepository.save(integral);
}</code>Message listener
<code>@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "mks")
@Component
public class IntegralMessageListener implements RocketMQListener<String> {
@Resource
private IntegralService integralService;
@Override
public void onMessage(String message) {
System.out.println("Integral received message: " + message);
try {
Map<String, Object> jsonMap = new JsonMapper().readValue(message, Map.class);
Integer id = (Integer) jsonMap.get("id");
integralService.saveIntegral(new Integral(1L, 1000, id + 0L));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}</code>Testing
Start both sub‑modules, initialize the database tables, and use Postman to call
/accounts/send. When the Account module’s local transaction fails, the transaction rolls back, the half message is deleted, and the Integral module never receives the message.
Spring Full-Stack Practical Cases
Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.