Spring Boot Integration with RabbitMQ: Configuring Direct Exchange, Queues, Consumers, and Manual ACK/NACK Handling
This article demonstrates how to integrate Spring Boot with RabbitMQ using a direct exchange, configuring queues, producers, and two consumers, and explains manual acknowledgment mechanisms (ACK, NACK, Reject) to ensure reliable message processing and handling of consumer failures.
1. Introduction
The author explores integrating Spring Boot with a simple RabbitMQ message queue, focusing on ensuring that every message is successfully consumed.
2. Preparation
Using a direct exchange configuration, the following components are set up:
Exchange type: direct.
Message queue: fair distribution.
Process: producer sends messages to the queue, two consumers retrieve and consume them, and the queue is cleared after consumption.
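As a rough illustration of what "direct" means in the list above, the following plain-Java sketch models a direct exchange as a lookup from routing key to bound queues: a message reaches a queue only when its routing key matches the binding key exactly. The class DirectRoutingSketch is hypothetical and is not part of Spring AMQP or the RabbitMQ client; the queue and routing key names are the ones the article uses in section 2.3.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of direct-exchange routing (not the RabbitMQ API).
class DirectRoutingSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    // A direct exchange delivers only on an exact routing-key match.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, List.of());
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("directQueueTx", "directQueueTxRoutingKey");
        System.out.println(exchange.route("directQueueTxRoutingKey")); // [directQueueTx]
        System.out.println(exchange.route("someOtherKey"));            // []
    }
}
```

An empty result corresponds to an unroutable message, which is the case the producer's mandatory flag and return callback are meant to catch.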
2.1 Dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2 YAML Configuration
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: xiangjiao
    password: bunana
    virtual-host: /xiangjiao
    publisher-confirms: true # enable send confirm
    publisher-returns: true # enable return on failure
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual
        retry:
          enabled: true
2.3 Configuring Exchange and Queue
Define a direct exchange directExchangeTx and a queue directQueueTx, then bind them with the routing key directQueueTxRoutingKey.
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectExchangeTxQueueConfig {

    // Arguments: name, durable, autoDelete
    @Bean(name = "getDirectExchangeTx")
    public DirectExchange getDirectExchangeTx() {
        return new DirectExchange("directExchangeTx", true, false);
    }

    // Arguments: name, durable, exclusive, autoDelete
    @Bean(name = "getQueueTx")
    public Queue getQueueTx() {
        return new Queue("directQueueTx", true, false, false);
    }

    // Bind the queue to the exchange under the routing key the producer uses
    @Bean
    public Binding getDirectExchangeQueueTx(@Qualifier("getDirectExchangeTx") DirectExchange exchange,
                                            @Qualifier("getQueueTx") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("directQueueTxRoutingKey");
    }
}
2.4 Consumer Configuration
Two consumers listen on the same queue. Consumer 1 processes messages with a 1‑second delay; Consumer 2 uses a 3‑second delay.
import java.io.IOException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

@Component
@RabbitListener(queues = "directQueueTx")
public class Consumer1 {

    @RabbitHandler
    public void process(String msg, Channel channel, Message message) throws IOException {
        // Simulate one second of processing time
        try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
        try {
            // Acknowledge only this delivery (multiple = false)
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            System.out.println("get msg1 success msg = " + msg);
        } catch (Exception e) {
            // On failure, hand the message back to the broker (requeue = true)
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            System.err.println("get msg1 failed msg = " + msg);
        }
    }
}

import org.springframework.amqp.core.Message;
import java.io.IOException;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

@Component
@RabbitListener(queues = "directQueueTx")
public class Consumer2 {

    @RabbitHandler
    public void process(String msg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // Simulate three seconds of processing time
        try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); }
        try {
            if (!isNull(msg)) {
                // Messages look like "msg0", "msg1", ...; take the numeric suffix
                int num = Integer.parseInt(msg.substring(3));
                if (num >= 3) {
                    // Simulated failure: negative acknowledgment with requeue = true
                    channel.basicNack(deliveryTag, false, true);
                    System.out.println("get msg2 basicNack msg = " + msg);
                } else {
                    channel.basicAck(deliveryTag, false);
                    System.out.println("get msg2 basicAck msg = " + msg);
                }
            } else {
                // Empty payload: reject without requeue so the delivery does not stay unacknowledged
                channel.basicReject(deliveryTag, false);
            }
        } catch (Exception e) {
            channel.basicNack(deliveryTag, false, true);
            System.err.println("get msg2 failed msg = " + msg);
        }
    }

    public static boolean isNull(Object obj) {
        return obj == null || "".equals(obj) || "null".equals(obj);
    }
}
2.5 Message Producer
The IMessageServcie interface and its implementation MessageServiceImpl send messages through RabbitTemplate, with the mandatory flag set and the confirm and return callbacks registered.
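public interface IMessageServcie {
    void sendMessage(String exchange, String routingKey, Object msg);
}

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageServiceImpl implements IMessageServcie, ConfirmCallback, ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendMessage(String exchange, String routingKey, Object msg) {
        // mandatory = true: unroutable messages are handed to the return callback
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.convertAndSend(exchange, routingKey, msg);
    }
    // returnedMessage and confirm methods omitted for brevity
}
A controller endpoint /sendMoreMsgTx sends ten messages at two‑second intervals.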
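import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class SendMessageTx {
    @Autowired
    private IMessageServcie messageServiceImpl;

    @RequestMapping("/sendMoreMsgTx")
    @ResponseBody
    public String sendMoreMsgTx() {
        for (int i = 0; i < 10; i++) {
            String msg = "msg" + i;
            System.out.println("send message msg:" + msg);
            messageServiceImpl.sendMessage("directExchangeTx", "directQueueTxRoutingKey", msg);
            // Pause two seconds between sends
            try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
        }
        return "send ok";
    }
}
3. ACK Configuration and Testing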
The author simulates a failure in Consumer 2 by NACK‑ing messages with a numeric suffix ≥ 3, causing the broker to re‑queue the message until Consumer 1 successfully processes it.
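The behaviour described above can be approximated without a broker. The sketch below is a simplified, single-threaded simulation and not the author's test: it assumes the two consumers receive deliveries in strict alternation and that a re-queued message returns to the head of the queue, whereas real delivery order depends on timing and prefetch settings. The class RequeueSimulation and its run method are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation: Consumer 2 NACKs msgN for N >= 3, Consumer 1 ACKs everything.
class RequeueSimulation {

    // Returns log lines such as "consumer2 nack msg3" or "consumer1 ack msg3".
    static List<String> run(int messageCount) {
        Deque<String> queue = new ArrayDeque<>();
        for (int i = 0; i < messageCount; i++) {
            queue.addLast("msg" + i);
        }
        List<String> log = new ArrayList<>();
        int turn = 0;
        while (!queue.isEmpty()) {
            String msg = queue.pollFirst();
            boolean isConsumer2 = (turn++ % 2 == 1); // assumed strict alternation
            int num = Integer.parseInt(msg.substring(3));
            if (isConsumer2 && num >= 3) {
                log.add("consumer2 nack " + msg);
                queue.addFirst(msg); // requeue = true: back to the head of the queue
            } else {
                log.add((isConsumer2 ? "consumer2" : "consumer1") + " ack " + msg);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        run(10).forEach(System.out::println);
    }
}
```

Under these assumptions every message is eventually acknowledged exactly once, and each message with a suffix of 3 or more is acknowledged by Consumer 1 after Consumer 2 has returned it.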
4. Analysis of Confirmation Methods
4.1 channel.basicAck
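Acknowledges a message: channel.basicAck(deliveryTag, false). The second argument is the multiple flag: false acknowledges only the message with this delivery tag, while true acknowledges every unacknowledged message on the channel up to and including it.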
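To make the effect of the multiple flag concrete, here is a minimal in-memory model of a channel's unacknowledged delivery tags. AckModel is a hypothetical class for illustration only; it mirrors the basicAck signature but is not the RabbitMQ client.

```java
import java.util.TreeSet;

// Hypothetical model of the set of unacknowledged delivery tags on one channel.
class AckModel {
    private final TreeSet<Long> unacked = new TreeSet<>();

    // The broker hands out a delivery with this tag.
    void deliver(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    void basicAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Settle every outstanding tag up to and including deliveryTag
            unacked.headSet(deliveryTag, true).clear();
        } else {
            // Settle only this one delivery
            unacked.remove(deliveryTag);
        }
    }

    // Copy of the tags that are still waiting for an acknowledgment.
    TreeSet<Long> unacked() {
        return new TreeSet<>(unacked);
    }

    public static void main(String[] args) {
        AckModel channel = new AckModel();
        for (long tag = 1; tag <= 5; tag++) {
            channel.deliver(tag);
        }
        channel.basicAck(3, false);
        System.out.println(channel.unacked()); // [1, 2, 4, 5]
        channel.basicAck(4, true);
        System.out.println(channel.unacked()); // [5]
    }
}
```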
4.2 channel.basicNack
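Negatively acknowledges a message, with optional re‑queue: channel.basicNack(deliveryTag, false, true). The second argument is multiple and the third is requeue: true returns the message to the queue, while false discards it (or dead‑letters it if the queue has a dead‑letter exchange configured).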
4.3 channel.basicReject
Rejects a single message; it behaves like basicNack with multiple=false. Example: channel.basicReject(deliveryTag, true) re‑queues the message, while passing false discards it.
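The relationship between basicNack and basicReject can be sketched with a small in-memory model. NackModel is a hypothetical class, not the RabbitMQ client; it assumes re-queued messages go back to the head of the queue and that discarded messages are simply collected in a list, with no dead-letter exchange.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of one queue and one consumer channel.
class NackModel {
    final Deque<String> ready = new ArrayDeque<>();        // waiting for delivery
    final TreeMap<Long, String> unacked = new TreeMap<>(); // delivered, not yet settled
    final List<String> discarded = new ArrayList<>();      // dropped by requeue = false
    private long nextTag = 1;

    // Deliver the next ready message and return its delivery tag.
    long deliver() {
        long tag = nextTag++;
        unacked.put(tag, ready.removeFirst());
        return tag;
    }

    void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
        List<Long> tags = new ArrayList<>();
        if (multiple) {
            tags.addAll(unacked.headMap(deliveryTag, true).descendingKeySet());
        } else if (unacked.containsKey(deliveryTag)) {
            tags.add(deliveryTag);
        }
        // Newest tag first, so addFirst restores the original queue order
        for (long tag : tags) {
            String msg = unacked.remove(tag);
            if (requeue) {
                ready.addFirst(msg);
            } else {
                discarded.add(msg);
            }
        }
    }

    // basicReject settles exactly one delivery: basicNack with multiple = false.
    void basicReject(long deliveryTag, boolean requeue) {
        basicNack(deliveryTag, false, requeue);
    }
}
```

In this model, rejecting a delivery with requeue set to true makes the same message available again through deliver(), which is what lets another consumer pick it up.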
5. Summary
The article consolidates configuration steps for RabbitMQ in Spring Boot, explains three acknowledgment methods (ACK, NACK, Reject), and demonstrates how to prevent message loss when a consumer fails, ensuring reliable delivery across multiple consumers.