Implementing Delayed Message Queues with RabbitMQ Plugin in Spring Boot
This article explains how to replace inefficient traditional delay mechanisms with RabbitMQ's delayed‑queue plugin, showing step‑by‑step configuration, producer and consumer code in Spring Boot, and demonstrates successful delayed message delivery using real‑world examples such as auto‑confirm orders and ticket reservations.
Many common applications use delayed message push, such as Taobao's automatic seven‑day order confirmation and 12306's 30‑minute ticket reservation timeout, which are typically implemented with message middleware.
Traditional solutions like setting Redis expiration, polling database tables, or using JVM's DelayQueue suffer from low performance, high memory pressure, and lack of persistence.
Since RabbitMQ 3.6.x, a delayed‑queue plugin is provided. By installing the plugin in the RabbitMQ plugins directory, you can create an exchange and queue that support delayed messages.
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class MQConfig { public static final String LAZY_EXCHANGE = "Ex.LazyExchange"; public static final String LAZY_QUEUE = "MQ.LazyQueue"; public static final String LAZY_KEY = "lazy.#"; @Bean public TopicExchange lazyExchange() { //Map pros = new HashMap<>(); //pros.put("x-delayed-message", "topic"); TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros); exchange.setDelayed(true); return exchange; } @Bean public Queue lazyQueue() { return new Queue(LAZY_QUEUE, true); } @Bean public Binding lazyBinding() { return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY); } }
In the exchange declaration you can enable delayed queues by calling exchange.setDelayed(true) or by passing the x-delayed-message argument directly.
When sending a message, specify the delay via a MessagePostProcessor that sets the x-delay header (or uses message.getMessageProperties().setDelay(6000) ), and mark the message as persistent.
import com.anqi.mq.config.MQConfig; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; @Component public class MQSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendLazy(Object message) { rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); CorrelationData correlationData = new CorrelationData("12345678909" + new Date()); rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); message.getMessageProperties().setDelay(6000); return message; } }, correlationData); } }
The underlying implementation of setDelay(Integer i) simply sets the x-delay header, which is equivalent to manually adding the header.
public void setDelay(Integer delay) { if (delay == null || delay < 0) { this.headers.remove(X_DELAY); } else { this.headers.put(X_DELAY, delay); } }
On the consumer side, a listener receives messages from the delayed queue and acknowledges them.
import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class MQReceiver { @RabbitListener(queues = "MQ.LazyQueue") @RabbitHandler public void onLazyMessage(Message msg, Channel channel) throws IOException { long deliveryTag = msg.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag, true); System.out.println("lazy receive " + new String(msg.getBody())); } }
A simple JUnit test sends a message and, after a 6‑second delay, the consumer prints lazy receive hello spring boot: , confirming that the delayed queue works as expected.
Architecture Digest
Focusing on Java backend development, covering application architecture from top-tier internet companies (high availability, high performance, high stability), big data, machine learning, Java architecture, and other popular fields.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.