
Spring Boot Integration with RabbitMQ: Configuring Direct Exchange, Queues, Consumers, and Manual ACK/NACK Handling

This article demonstrates how to integrate Spring Boot with RabbitMQ using a direct exchange, configuring queues, producers, and two consumers, and explains manual acknowledgment mechanisms (ACK, NACK, Reject) to ensure reliable message processing and handling of consumer failures.

Selected Java Interview Questions

1. Introduction

This article integrates Spring Boot with a simple RabbitMQ setup and focuses on making sure that every message is actually processed by a consumer.

2. Preparation

Using a direct exchange configuration, the following components are set up:

Exchange type: direct.

Queue dispatch: fair distribution between the consumers.

Process: the producer publishes messages to the exchange, which routes them to the queue; two consumers retrieve and process them, and each message is removed from the queue once it has been acknowledged.

2.1 Dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2 YAML Configuration

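spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: xiangjiao
    password: bunana
    virtual-host: /xiangjiao
    publisher-confirms: true   # enable publisher confirms (Spring Boot 2.2+ uses publisher-confirm-type: correlated instead)
    publisher-returns: true    # return unroutable messages to the producer
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual   # consumers acknowledge explicitly
        retry:
          enabled: true            # listener retry is configured per container type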

2.3 Configuring Exchange and Queue

Define a durable direct exchange directExchangeTx and a durable queue directQueueTx, then bind them with the routing key directQueueTxRoutingKey.

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectExchangeTxQueueConfig {
    // Durable direct exchange that is not auto-deleted
    @Bean(name = "getDirectExchangeTx")
    public DirectExchange getDirectExchangeTx() {
        return new DirectExchange("directExchangeTx", true, false);
    }

    // Durable queue that is neither exclusive nor auto-deleted
    @Bean(name = "getQueueTx")
    public Queue getQueueTx() {
        return new Queue("directQueueTx", true, false, false);
    }

    // Messages published with this routing key are routed to directQueueTx
    @Bean
    public Binding getDirectExchangeQueueTx(@Qualifier("getDirectExchangeTx") DirectExchange exchange,
                                           @Qualifier("getQueueTx") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("directQueueTxRoutingKey");
    }
}
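
To make the routing rule concrete, here is a minimal plain-Java sketch of how a direct exchange picks queues. It is a toy model, not Spring AMQP or broker code: the class DirectExchangeModel and its methods bind and route are hypothetical names introduced only for this illustration. The one behavior it models is that a direct exchange delivers a message to the queues whose binding key equals the message's routing key exactly.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a direct exchange: each routing key maps to the queues bound with it. */
class DirectExchangeModel {
    private final Map<String, List<String>> bindings = new HashMap<>();

    /** Plays the role of BindingBuilder.bind(queue).to(exchange).with(routingKey). */
    void bind(String queueName, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queueName);
    }

    /** Returns the queues that would receive a message published with this key. */
    List<String> route(String routingKey) {
        // Direct exchanges compare keys for exact equality; there are no wildcards.
        List<String> queues = bindings.get(routingKey);
        return queues == null ? Collections.<String>emptyList() : queues;
    }

    public static void main(String[] args) {
        DirectExchangeModel exchange = new DirectExchangeModel();
        exchange.bind("directQueueTx", "directQueueTxRoutingKey");
        System.out.println(exchange.route("directQueueTxRoutingKey")); // [directQueueTx]
        System.out.println(exchange.route("directQueueTx"));           // []
    }
}
```

The second lookup comes back empty because the queue name is not the routing key; with this configuration the producer has to publish with directQueueTxRoutingKey.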

2.4 Consumer Configuration

Two consumers listen on the same queue. Consumer 1 processes messages with a 1‑second delay; Consumer 2 uses a 3‑second delay.

// Consumer1.java
import java.io.IOException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

@Component
@RabbitListener(queues = "directQueueTx")
public class Consumer1 {
    @RabbitHandler
    public void process(String msg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // Simulate 1 second of processing time
        try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        try {
            channel.basicAck(deliveryTag, false); // acknowledge this message only
            System.out.println("get msg1 success msg = " + msg);
        } catch (Exception e) {
            channel.basicNack(deliveryTag, false, true); // on failure, put the message back on the queue
            System.err.println("get msg1 failed msg = " + msg);
        }
    }
}

// Consumer2.java
import java.io.IOException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

@Component
@RabbitListener(queues = "directQueueTx")
public class Consumer2 {
    @RabbitHandler
    public void process(String msg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // Simulate 3 seconds of processing time
        try { Thread.sleep(3000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        try {
            // Note: an empty payload skips this branch and is left unacknowledged
            if (!isNull(msg)) {
                // Messages look like "msg7"; the numeric suffix decides the outcome
                int num = Integer.parseInt(msg.substring(3));
                if (num >= 3) {
                    // Simulated failure: negatively acknowledge and re-queue
                    channel.basicNack(deliveryTag, false, true);
                    System.out.println("get msg2 basicNack msg = " + msg);
                } else {
                    channel.basicAck(deliveryTag, false);
                    System.out.println("get msg2 basicAck msg = " + msg);
                }
            }
        } catch (Exception e) {
            channel.basicNack(deliveryTag, false, true);
            System.err.println("get msg2 failed msg = " + msg);
        }
    }

    public static boolean isNull(Object obj) {
        return obj == null || "".equals(obj) || "null".equals(obj);
    }
}

2.5 Message Producer

An interface IMessageServcie and its implementation use RabbitTemplate to send messages with the mandatory flag set and with confirm and return callbacks registered.

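// IMessageServcie.java
public interface IMessageServcie {
    void sendMessage(String exchange, String routingKey, Object msg);
}

// MessageServiceImpl.java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageServiceImpl implements IMessageServcie, ConfirmCallback, ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendMessage(String exchange, String routingKey, Object msg) {
        rabbitTemplate.setMandatory(true);        // hand unroutable messages back instead of dropping them
        rabbitTemplate.setConfirmCallback(this);  // invoked when the broker confirms or refuses the publish
        rabbitTemplate.setReturnCallback(this);   // invoked when no queue matches the routing key
        rabbitTemplate.convertAndSend(exchange, routingKey, msg);
    }
    // returnedMessage and confirm methods omitted for brevity
}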
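
Because the callback bodies are omitted, it may help to see what the mandatory flag changes. The following is a plain-Java sketch under stated assumptions, not Spring AMQP code: MandatoryPublishModel, bind, publish, and the onReturn handler are hypothetical names for illustration. It models one rule only: when no binding matches the routing key, a mandatory message is handed back to the producer, while a non-mandatory one is dropped (assuming no alternate exchange is configured).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Toy publish path showing when a return handler fires. Not the RabbitTemplate API. */
class MandatoryPublishModel {
    private final Set<String> boundKeys = new HashSet<>();
    final List<String> delivered = new ArrayList<>();

    void bind(String routingKey) {
        boundKeys.add(routingKey);
    }

    /**
     * Publishes one message. A routable message is delivered. An unroutable
     * message goes to onReturn when mandatory is true and is dropped otherwise.
     */
    void publish(String routingKey, String body, boolean mandatory, Consumer<String> onReturn) {
        if (boundKeys.contains(routingKey)) {
            delivered.add(body);
        } else if (mandatory) {
            onReturn.accept(body); // the role played by the return callback
        }
        // else: silently dropped, the producer never learns about it
    }

    public static void main(String[] args) {
        MandatoryPublishModel broker = new MandatoryPublishModel();
        broker.bind("directQueueTxRoutingKey");
        List<String> returned = new ArrayList<>();
        broker.publish("directQueueTxRoutingKey", "msg0", true, returned::add);
        broker.publish("wrongKey", "msg1", true, returned::add);
        broker.publish("wrongKey", "msg2", false, returned::add);
        System.out.println("delivered = " + broker.delivered); // delivered = [msg0]
        System.out.println("returned  = " + returned);         // returned  = [msg1]
    }
}
```

In this model msg2 disappears without a trace, which is the case that setMandatory(true) together with a return callback is meant to surface.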

A controller endpoint /sendMoreMsgTx sends ten messages at two‑second intervals.

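import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class SendMessageTx {
    @Autowired
    private IMessageServcie messageServiceImpl;

    @RequestMapping("/sendMoreMsgTx")
    @ResponseBody
    public String sendMoreMsgTx() {
        // Publish msg0 .. msg9, pausing 2 seconds after each send
        for (int i = 0; i < 10; i++) {
            String msg = "msg" + i;
            System.out.println("send message  msg:" + msg);
            messageServiceImpl.sendMessage("directExchangeTx", "directQueueTxRoutingKey", msg);
            try { Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return "send ok";
    }
}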

3. ACK Configuration and Testing

The author simulates a failure in Consumer 2 by NACK‑ing, with requeue enabled, every message whose numeric suffix is 3 or greater. The broker puts each such message back on the queue and redelivers it until Consumer 1 receives and acknowledges it.
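
The test can be replayed in plain Java to see the redelivery pattern. This is a simplified sketch, not a broker: it assumes strict round-robin dispatch between the two consumers and that a re-queued message returns to the head of the queue, whereas real dispatch also depends on prefetch settings and timing. The class RequeueSimulation and its methods are hypothetical names for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Simplified replay of the test: strict round-robin, re-queued messages go back to the head. */
class RequeueSimulation {
    /** Consumer 2's rule from the article: NACK when the numeric suffix is 3 or greater. */
    static boolean consumer2Accepts(String msg) {
        return Integer.parseInt(msg.substring(3)) < 3;
    }

    /** Returns one log line per delivery, for example "consumer2 NACK msg3". */
    static List<String> run(int messageCount) {
        Deque<String> queue = new ArrayDeque<>();
        for (int i = 0; i < messageCount; i++) {
            queue.addLast("msg" + i);
        }
        List<String> log = new ArrayList<>();
        int turn = 0; // 0 = Consumer 1, 1 = Consumer 2
        while (!queue.isEmpty()) {
            String msg = queue.pollFirst();
            if (turn == 0) {
                log.add("consumer1 ACK " + msg);   // Consumer 1 always acknowledges
            } else if (consumer2Accepts(msg)) {
                log.add("consumer2 ACK " + msg);
            } else {
                log.add("consumer2 NACK " + msg);
                queue.addFirst(msg);               // requeue=true: the message is offered again
            }
            turn = 1 - turn;
        }
        return log;
    }

    public static void main(String[] args) {
        run(10).forEach(System.out::println);
    }
}
```

Under these assumptions, ten messages produce seventeen deliveries: each of msg3 through msg9 is first refused by Consumer 2 and then acknowledged by Consumer 1, so nothing is lost.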

4. Analysis of Confirmation Methods

4.1 channel.basicAck

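Acknowledges a message: channel.basicAck(deliveryTag, false). The second argument is the multiple flag: false acknowledges only the message with this delivery tag, while true also acknowledges every earlier unacknowledged delivery on the same channel.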

4.2 channel.basicNack

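Negatively acknowledges a message, with optional re‑queue: channel.basicNack(deliveryTag, false, true). The second argument is the multiple flag and the third is requeue. With requeue set to false, the broker discards the message, or routes it to a dead-letter exchange if the queue has one configured.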

4.3 channel.basicReject

Rejects a single message; it behaves like basicNack with multiple=false. Example: channel.basicReject(deliveryTag, true) re‑queues the message, while passing false discards it.
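
The three methods differ only in their flags, which a small in-memory model can show. This sketch is not the RabbitMQ client: UnackedModel, deliver, outstanding, and the requeued list are hypothetical names for illustration, and only the method names basicAck, basicNack, and basicReject mirror the real Channel interface. It assumes delivery tags count up from 1 on a channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy model of one channel's unacknowledged deliveries. Not the RabbitMQ client API. */
class UnackedModel {
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    final List<String> requeued = new ArrayList<>();
    private long nextTag = 1; // delivery tags increase per channel

    /** Hands a message to the consumer and returns its delivery tag. */
    long deliver(String body) {
        unacked.put(nextTag, body);
        return nextTag++;
    }

    /** multiple=true settles every outstanding tag up to and including deliveryTag. */
    void basicAck(long deliveryTag, boolean multiple) {
        settle(deliveryTag, multiple, false);
    }

    void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
        settle(deliveryTag, multiple, requeue);
    }

    /** Like basicNack for exactly one message: there is no multiple flag. */
    void basicReject(long deliveryTag, boolean requeue) {
        settle(deliveryTag, false, requeue);
    }

    private void settle(long deliveryTag, boolean multiple, boolean requeue) {
        List<Long> tags = new ArrayList<>();
        if (multiple) {
            tags.addAll(unacked.headMap(deliveryTag, true).keySet());
        } else {
            tags.add(deliveryTag);
        }
        for (Long tag : tags) {
            String body = unacked.remove(tag);
            if (body != null && requeue) {
                requeued.add(body); // goes back to the queue for redelivery
            }
        }
    }

    int outstanding() {
        return unacked.size();
    }

    public static void main(String[] args) {
        UnackedModel channel = new UnackedModel();
        channel.deliver("msg0");
        long second = channel.deliver("msg1");
        long third = channel.deliver("msg2");
        channel.basicAck(second, true);    // settles msg0 and msg1 together
        channel.basicReject(third, true);  // msg2 is re-queued
        System.out.println(channel.outstanding()); // 0
        System.out.println(channel.requeued);      // [msg2]
    }
}
```

The consumers in this article always pass multiple=false, so each call settles exactly one message.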

5. Summary

The article consolidates configuration steps for RabbitMQ in Spring Boot, explains three acknowledgment methods (ACK, NACK, Reject), and demonstrates how to prevent message loss when a consumer fails, ensuring reliable delivery across multiple consumers.
