Implementing RabbitMQ Delayed Queues with the Delayed Message Plugin in Spring Boot
This article explains why traditional approaches like Redis expiration, database polling, or JVM DelayQueue are inefficient for delayed messaging, and demonstrates how to use RabbitMQ's delayed‑message‑exchange plugin with Spring Boot to create, configure, send, and consume delayed messages, including full code examples and a test showing a 6‑second delay.
The author introduces delayed queues through real‑world scenarios such as Taobao's automatic receipt confirmation after seven days and 12306's cancellation of unpaid orders after 30 minutes, and argues that the common alternatives (Redis key expiration, database polling, and the JVM's DelayQueue) are too inefficient for this kind of work.
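For comparison, the JVM DelayQueue alternative can be sketched with the standard library alone. This is a minimal illustration, not code from the article; the class name InMemoryDelayDemo, the DelayedTask type, and the payload strings are all made up. It works inside a single process, but the pending tasks live only in that JVM's heap, so they are lost on restart and are invisible to other application instances.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class InMemoryDelayDemo {

    /** A task that becomes available once its delay has elapsed (hypothetical type). */
    static final class DelayedTask implements Delayed {
        final String payload;
        final long readyAtNanos;

        DelayedTask(String payload, long delayMillis) {
            this.payload = payload;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until the task is due; zero or negative means "ready".
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Queues two tasks and returns their payloads in the order they become available. */
    public static String drainOrder() {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("cancel-order", 200));  // due later
        queue.put(new DelayedTask("send-reminder", 50));  // due sooner
        try {
            // take() blocks until the task with the shortest remaining delay is due.
            String first = queue.take().payload;
            String second = queue.take().payload;
            return first + "," + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a delayed task", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(drainOrder()); // prints send-reminder,cancel-order
    }
}
```

The queue orders tasks by due time rather than insertion order, which is the behavior a delayed exchange provides on the broker side, with persistence and sharing across consumers added.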
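According to the author, RabbitMQ versions prior to 3.6 implemented delays by combining a message TTL with a dead‑letter queue, while from 3.6 onward the dedicated delayed‑message‑exchange plugin (rabbitmq_delayed_message_exchange) can be used instead. The plugin is not bundled with the broker: download the release matching your RabbitMQ version from GitHub, place the .ez file in the RabbitMQ plugins directory, and enable it with the command rabbitmq-plugins enable rabbitmq_delayed_message_exchange.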
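The article does not spell out why the plugin is preferable to the TTL plus dead‑letter approach. One well‑known limitation, when the TTL is set per message, is that RabbitMQ only dead‑letters an expired message once it reaches the head of the queue, so a short delay queued behind a long one fires late. The following is a toy model of that behavior in plain Java, not RabbitMQ code; the class and method names are hypothetical and all messages are assumed to be published at time 0.

```java
import java.util.ArrayList;
import java.util.List;

public class TtlHeadOfLineDemo {

    /**
     * Simplified model of a FIFO queue with per-message TTLs: a message is
     * released (dead-lettered) only when it has expired AND every message
     * published before it has already left the queue.
     *
     * @param ttlsMillis per-message TTLs in publish order, all published at t=0
     * @return release time in milliseconds for each message, in publish order
     */
    public static List<Long> releaseTimesFifo(long[] ttlsMillis) {
        List<Long> released = new ArrayList<>();
        long headFreeAt = 0; // time at which the previous message left the queue
        for (long ttl : ttlsMillis) {
            long releaseAt = Math.max(ttl, headFreeAt);
            released.add(releaseAt);
            headFreeAt = releaseAt;
        }
        return released;
    }

    public static void main(String[] args) {
        // A 1-second message stuck behind a 10-second message is released at 10 s.
        System.out.println(releaseTimesFifo(new long[] {10_000, 1_000})); // prints [10000, 10000]
        // In the opposite order each message is released on time.
        System.out.println(releaseTimesFifo(new long[] {1_000, 10_000})); // prints [1000, 10000]
    }
}
```

A delayed exchange avoids this ordering problem because each message is held according to its own x-delay value before it is routed to the queue.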
In a Spring Boot project the exchange, queue, and binding are defined in a configuration class. The relevant Java code is:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {

    public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
    public static final String LAZY_QUEUE = "MQ.LazyQueue";
    public static final String LAZY_KEY = "lazy.#";

    @Bean
    public TopicExchange lazyExchange() {
        // No delay-related arguments need to be set by hand: setDelayed(true)
        // makes Spring AMQP declare the exchange with type x-delayed-message
        // and the argument x-delayed-type=topic.
        Map<String, Object> pros = new HashMap<>();
        TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Queue lazyQueue() {
        // Durable queue that receives messages once their delay has elapsed.
        return new Queue(LAZY_QUEUE, true);
    }

    @Bean
    public Binding lazyBinding() {
        return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
    }
}

To send a delayed message, a MessagePostProcessor sets the delay on the message properties, either through the higher‑level setDelay method or by writing the x-delay header directly. The value is in milliseconds. Example sender code:
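import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;

@Component
public class MQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // The article's snippet references these two callbacks without showing them;
    // the bodies below are minimal placeholders that only log.
    private final RabbitTemplate.ConfirmCallback confirmCallback =
            (correlationData, ack, cause) ->
                    System.out.println("confirm: " + correlationData + ", ack=" + ack + ", cause=" + cause);

    private final RabbitTemplate.ReturnCallback returnCallback =
            (returned, replyCode, replyText, exchange, routingKey) ->
                    System.out.println("returned: " + replyCode + " " + replyText + " (" + exchange + ", " + routingKey + ")");

    public void sendLazy(Object message) {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // In Spring AMQP 2.3 and later, setReturnCallback is deprecated in favor of setReturnsCallback.
        rabbitTemplate.setReturnCallback(returnCallback);
        CorrelationData correlationData = new CorrelationData("12345678909" + new Date());
        rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message msg) throws AmqpException {
                        msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        msg.getMessageProperties().setDelay(6000); // delay in milliseconds (6 seconds)
                        return msg;
                    }
                }, correlationData);
    }
}

The underlying setDelay(Integer delay) method in MessageProperties writes the x-delay header, and removes the header when the delay is null or negative: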
/**
 * Set the x-delay header.
 * @param delay the delay.
 */
public void setDelay(Integer delay) {
    if (delay == null || delay < 0) {
        this.headers.remove(X_DELAY);
    } else {
        this.headers.put(X_DELAY, delay);
    }
}

On the consumer side, a listener receives the delayed messages and acknowledges them:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class MQReceiver {

    // Calling basicAck by hand assumes the listener container runs in manual
    // acknowledge mode (for example spring.rabbitmq.listener.simple.acknowledge-mode=manual);
    // in the default auto mode the container acknowledges on its own.
    @RabbitListener(queues = "MQ.LazyQueue")
    public void onLazyMessage(Message msg, Channel channel) throws IOException {
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        System.out.println("lazy receive " + new String(msg.getBody()));
        // Acknowledge only this delivery, and only after it has been processed.
        channel.basicAck(deliveryTag, false);
    }
}

A simple JUnit test sends a message; the console output shows that the consumer receives it after the configured 6‑second delay, confirming that the delayed‑message plugin works as expected.
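Rather than comparing console timestamps by eye, the observed delay can be computed from a send time carried in the message body. The helper below is a sketch with hypothetical names (DelayProbe, stamp, elapsedMillis, withinTolerance) and is not part of the original article. It assumes the body is sent as a plain string and that sender and consumer share a clock, as they do in a single‑machine test.

```java
public class DelayProbe {

    /** Prefixes the payload with the send time so the consumer can compute the delay. */
    public static String stamp(String payload, long sentAtMillis) {
        return sentAtMillis + "|" + payload;
    }

    /** Returns how long a stamped message was in flight, in milliseconds. */
    public static long elapsedMillis(String stampedBody, long receivedAtMillis) {
        int sep = stampedBody.indexOf('|');
        if (sep < 0) {
            throw new IllegalArgumentException("body is not stamped: " + stampedBody);
        }
        long sentAt = Long.parseLong(stampedBody.substring(0, sep));
        return receivedAtMillis - sentAt;
    }

    /** True when the observed delay is at least the configured one and within the tolerance. */
    public static boolean withinTolerance(long elapsedMillis, long expectedMillis, long toleranceMillis) {
        return elapsedMillis >= expectedMillis && elapsedMillis - expectedMillis <= toleranceMillis;
    }

    public static void main(String[] args) {
        String body = stamp("hello", 1_000L);       // sent at t = 1000 ms
        long elapsed = elapsedMillis(body, 7_050L); // received at t = 7050 ms
        System.out.println(elapsed + " ms, ok=" + withinTolerance(elapsed, 6_000L, 500L)); // prints 6050 ms, ok=true
    }
}
```

In the test, the sender would pass DelayProbe.stamp(text, System.currentTimeMillis()) to sendLazy, and onLazyMessage would call DelayProbe.elapsedMillis(new String(msg.getBody()), System.currentTimeMillis()) and log the result. The 500 ms tolerance is an arbitrary choice for illustration.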
Source: https://cnblogs.com/haixiang/p/10966985.html