Prevent Message Loss in Spring Boot 2.7.9 with RabbitMQ Confirm Callbacks
This guide walks through the common message loss scenarios in Spring Boot 2.7.9 with RabbitMQ and the fix for each one: producer confirm callbacks, durable exchanges and queues, manual consumer acknowledgments, and a configurable retry mechanism.
Environment
Spring Boot 2.7.9
Message loss scenarios
Producer loses messages: data sent by the producer fails to reach the MQ server due to network issues.
MQ server loses messages: messages that are not persisted are lost when the server restarts.
Consumer loses messages: after receiving a message, the service crashes before processing completes (automatic ACK).
Producer loss solution – confirm mechanism
Add the AMQP starter dependency:
<code><dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</code>Declare exchange, queue and binding beans:
<code>@Bean
public TopicExchange topicExchange() {
    // name, durable, autoDelete
    return new TopicExchange("akf.exchange", true, false);
}

@Bean
public Queue queue() {
    // name, durable, exclusive, autoDelete
    return new Queue("akf.queue", true, false, false);
}

@Bean
public Binding binding() {
    // route every key that starts with "akf." to the queue
    return BindingBuilder.bind(queue()).to(topicExchange()).with("akf.#");
}
</code>Configure RabbitMQ properties:
<code>spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtualHost: test
    publisherConfirmType: correlated
    publisherReturns: true
    template:
      mandatory: true
</code>Implement callbacks via ApplicationContextAware:
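<code>import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnsCallback;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class ConfigRabbitTemplate implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        // Confirm callback: tells the producer whether the broker accepted the message
        rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("correlation: " + correlationData);
                if (ack) {
                    System.out.println("Message delivered to the exchange");
                } else {
                    System.out.println("Message delivery failed, cause: " + cause);
                }
            }
        });
        // Returns callback: invoked when a message reaches the exchange but cannot be routed to a queue
        rabbitTemplate.setReturnsCallback(new ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println(returned.getExchange() + ", " + returned.getRoutingKey() + ", "
                        + returned.getReplyCode() + ", " + returned.getMessage());
            }
        });
    }
}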
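</code>To test, send a message to an exchange that does not exist: the confirm callback reports ack = false together with the cause. Send one with a routing key that matches no binding: the message reaches the exchange but cannot be routed, so the returns callback logs it.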
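The callback above only logs the outcome. To resend after a negative confirm, the producer has to remember what it published. A minimal sketch of that bookkeeping follows, using only the JDK. `PendingConfirms` and its method names are hypothetical and not part of Spring AMQP. It assumes every message is sent with a `CorrelationData` whose id is the key used here, and that the confirm callback passes that id to `onConfirm`.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not a Spring AMQP class): remembers messages that
// have been published but not yet confirmed by the broker.
final class PendingConfirms {
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    // Call just before publishing; the id is assumed to match CorrelationData.getId().
    void track(String correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    // Call from the confirm callback. The entry is always forgotten; on a
    // negative confirm the payload is returned so the caller can resend or log it.
    Optional<String> onConfirm(String correlationId, boolean ack) {
        if (correlationId == null) {
            return Optional.empty(); // message was sent without correlation data
        }
        String payload = pending.remove(correlationId);
        return ack ? Optional.empty() : Optional.ofNullable(payload);
    }

    // Number of messages still waiting for a confirm.
    int outstanding() {
        return pending.size();
    }
}
```

Inside `confirm(...)`, the id would come from `correlationData.getId()`, and the send side would pass a `new CorrelationData(id)` as the last argument of `rabbitTemplate.convertAndSend(exchange, routingKey, payload, correlationData)`. Keeping the map in memory is a simplification: anything still pending is lost if the producer itself crashes.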
MQ server loss mitigation
Declare a durable exchange and queue so that their definitions survive a broker restart (the second parameter of TopicExchange and the first boolean of Queue set the durable flag):
<code>@Bean
public TopicExchange topicExchange() {
    return new TopicExchange("akf.exchange", true, false);
}

@Bean
public Queue queue() {
    return new Queue("akf.queue", true, false, false);
}
</code>Set message persistence:
<code>Message message = MessageBuilder.withBody("Hello".getBytes())
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
        .build();
</code>Consumer loss mitigation
Disable automatic acknowledgment and use manual ACK mode:
<code>spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtualHost: test
    publisherConfirmType: correlated
    publisherReturns: true
    listener:
      simple:
        acknowledgeMode: manual # switch to manual acknowledgment
</code>Message listener with manual ACK:
<code>@RabbitListener(queues = {"akf.queue"})
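public void onMessage(Message message, Channel channel) throws Exception {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        System.out.println("Received message: " + new String(message.getBody()));
        // business logic here
        channel.basicAck(deliveryTag, false); // acknowledge this single message
    } catch (Exception e) {
        // Reject without requeue: the message is discarded, or dead-lettered if the
        // queue has a dead-letter exchange. Without this call it would stay unacked.
        channel.basicNack(deliveryTag, false, false);
    }
}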
</code>Message retry configuration
Enable retry in the listener configuration:
<code>spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtualHost: test
    publisherConfirmType: correlated
    publisherReturns: true
    listener:
      simple:
        acknowledgeMode: auto
        concurrency: 1
        retry:
          enabled: true
          initialInterval: 1000
          multiplier: 3
          maxInterval: 20000
          maxAttempts: 4 # 4 attempts in total, with waits of 1s, 3s, 9s between them
          stateless: true # set to false if the business logic is transactional
</code>Define a MessageRecoverer bean to handle messages that exceed retry attempts. The example uses RepublishMessageRecoverer to forward them to a dedicated error exchange/queue:
<code>@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
    // template, error exchange name, error routing key
    return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
}
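</code>After the configured attempts are exhausted, the message is republished to error.exchange with the routing key error. That exchange, and a queue bound to it with that routing key, must be declared separately; they are not shown in this example.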
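To see where the "1s, 3s, 9s" in the retry settings comes from, the schedule can be worked out by hand: each wait is the previous one times `multiplier`, capped at `maxInterval`, and `maxAttempts` includes the first delivery. The sketch below reproduces that arithmetic in plain Java. `RetrySchedule` is an illustrative helper, not a Spring class, and it only models the capped exponential growth described here.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative helper (hypothetical, not part of Spring): lists the waits
// between attempts for an exponential backoff with an upper bound.
final class RetrySchedule {
    static List<Long> waitsMillis(long initialInterval, double multiplier,
                                  long maxInterval, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        double interval = initialInterval;
        // maxAttempts counts the first delivery, so there are maxAttempts - 1 waits.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add((long) Math.min(interval, maxInterval)); // cap at maxInterval
            interval *= multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // Values from the YAML above: prints [1000, 3000, 9000]
        System.out.println(waitsMillis(1000, 3.0, 20000, 4));
    }
}
```

With these values the cap never applies. Raising `maxAttempts` to 5 would add a fourth wait of 27000 ms, which `maxInterval` limits to 20000 ms.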
Conclusion
By applying producer confirm callbacks, durable exchanges/queues, manual consumer acknowledgments, and Spring's retry mechanism with a MessageRecoverer, you can achieve reliable message delivery in a Spring Boot 2.7.9 application using RabbitMQ.