Handling Duplicate Consumption in RabbitMQ: Scenarios, Tests, and Solutions
This article explains the causes of duplicate message consumption in RabbitMQ, demonstrates how to reproduce the issue with a 10,000‑message test, and provides three practical solutions using unique IDs stored in Redis to ensure idempotent processing in Java Spring applications.
RabbitMQ can deliver the same message to a consumer more than once when the consumer acknowledges the message but the acknowledgment fails due to network problems or service interruptions, causing the broker to re‑queue the message.
Duplicate Consumption Scenarios
Two main situations lead to duplicate consumption:
The producer sends duplicate messages to the broker.
The broker re‑delivers a message because the consumer's acknowledgment was not received.
While the first case can be avoided by ensuring the producer does not resend messages, the second case requires careful handling on the consumer side.
Reproducing the Issue
A test sends 10,000 messages to a queue ( queueName3 ) using a Spring @GetMapping endpoint:
@GetMapping("/rabbitmq/sendToClient")
public String sendToClient() {
String message = "server message sendToClient";
for (int i = 0; i < 10000; i++) {
amqpTemplate.convertAndSend("queueName3", message + ": " + i);
}
return message;
}A consumer listens on the same queue:
@RabbitListener(queues = "queueName3")
@RabbitHandler
public void receiveMessage(String message) {
System.out.println("Received: " + message);
}When the consumer is stopped after processing a certain number of messages (e.g., 7,913), the broker still retains the last message because the acknowledgment was lost, leading to a duplicate delivery upon restart.
Solution Overview
To guarantee idempotent processing, each message should carry a globally unique identifier (UUID). The consumer checks this ID against a store (Redis or a database) before processing:
If the ID is absent, process the message and record the ID.
If the ID already exists, discard the duplicate.
Implementation 1 – Single‑Consumer (String)
@RabbitListener(queues = "queueName4")
@RabbitHandler
public void receiveMessage(Message message) throws UnsupportedEncodingException {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "utf-8");
String cachedId = redisUtil.get("queueName4", "");
if (cachedId.equals(messageId)) {
return; // duplicate
}
System.out.println("Message: " + msg + ", id:" + messageId);
redisUtil.set("queueName4", messageId); // store ID
}Implementation 2 – Multi‑Consumer (List)
@RabbitListener(queues = "queueName4")
@RabbitHandler
public void receiveMessage1(Message message) throws UnsupportedEncodingException {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "utf-8");
List
ids = redisUtil.lrange("queueName4");
if (ids.contains(messageId)) {
return; // duplicate
}
System.out.println("Message: " + msg + ", id:" + messageId);
redisUtil.lpush("queueName4", messageId); // add to list
}Implementation 3 – Key‑Value with Expiration
@RabbitListener(queues = "queueName4")
@RabbitHandler
public void receiveMessage2(Message message) throws UnsupportedEncodingException {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "utf-8");
String cachedMsg = redisUtil.get(messageId, "");
if (msg.equals(cachedMsg)) {
return; // duplicate
}
System.out.println("Message: " + msg + ", id:" + messageId);
redisUtil.set(messageId, msg, 10L); // expire after 10 minutes
}Testing the Solutions
Start the producer to send 10,000 messages, then start the consumer and interrupt it after a few thousand messages. The stored IDs prevent the same message from being processed twice when the consumer restarts, as demonstrated by the reduced message count in the queue.
By applying any of the three approaches, duplicate consumption can be effectively eliminated, ensuring reliable message processing in RabbitMQ‑based systems.
Top Architect
Top Architect focuses on sharing practical architecture knowledge, covering enterprise, system, website, large‑scale distributed, and high‑availability architectures, plus architecture adjustments using internet technologies. We welcome idea‑driven, sharing‑oriented architects to exchange and learn together.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.