Backend Development 8 min read

Implementing RabbitMQ Delayed Queues with the Delayed Message Plugin in Spring Boot

This article explains why traditional approaches like Redis expiration, database polling, or JVM DelayQueue are inefficient for delayed messaging, and demonstrates how to use RabbitMQ's delayed‑message‑exchange plugin with Spring Boot to create, configure, send, and consume delayed messages, including full code examples and a test showing a 6‑second delay.

Architect's Tech Stack
Architect's Tech Stack
Architect's Tech Stack
Implementing RabbitMQ Delayed Queues with the Delayed Message Plugin in Spring Boot

The author introduces the concept of delayed queues in RabbitMQ, citing real‑world scenarios such as Taobao's seven‑day automatic receipt confirmation and 12306's 30‑minute order cancellation timer, and points out the drawbacks of common alternatives like Redis expiration, database polling, and JVM DelayQueue.

RabbitMQ versions prior to 3.6 used dead‑letter queues with TTL for delays, but since 3.6 a dedicated delayed‑message‑exchange plugin is provided. The plugin can be downloaded from GitHub and placed in the RabbitMQ plugins directory.

In a Spring Boot project the exchange, queue, and binding are defined in a configuration class. The relevant Java code is:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {
    public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
    public static final String LAZY_QUEUE = "MQ.LazyQueue";
    public static final String LAZY_KEY = "lazy.#";

    @Bean
    public TopicExchange lazyExchange() {
        //Map
pros = new HashMap<>();
        //pros.put("x-delayed-message", "topic");
        TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Queue lazyQueue() {
        return new Queue(LAZY_QUEUE, true);
    }

    @Bean
    public Binding lazyBinding() {
        return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
    }
}

To send a delayed message, the MessagePostProcessor is used to set the x-delay header (or the higher‑level setDelay method). Example sender code:

import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;

@Component
public class MQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendLazy(Object message) {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        CorrelationData correlationData = new CorrelationData("12345678909" + new Date());
        rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,
            new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    message.getMessageProperties().setDelay(6000); // 6 seconds
                    return message;
                }
            }, correlationData);
    }
}

The underlying setDelay(Integer i) method simply adds the x-delay header to the message properties:

/**
 * Set the x-delay header.
 * @param delay the delay.
 */
public void setDelay(Integer delay) {
    if (delay == null || delay < 0) {
        this.headers.remove(X_DELAY);
    } else {
        this.headers.put(X_DELAY, delay);
    }
}

On the consumer side a listener receives the delayed messages and acknowledges them:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class MQReceiver {
    @RabbitListener(queues = "MQ.LazyQueue")
    @RabbitHandler
    public void onLazyMessage(Message msg, Channel channel) throws IOException {
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, true);
        System.out.println("lazy receive " + new String(msg.getBody()));
    }
}

A simple JUnit test sends a message and, as shown in the console output, the consumer receives it after the configured 6‑second delay, confirming that the delayed‑queue plugin works as expected.

Source: https://cnblogs.com/haixiang/p/10966985.html

backendJavaSpring BootMessage QueueRabbitMQDelayed Queue
Architect's Tech Stack
Written by

Architect's Tech Stack

Java backend, microservices, distributed systems, containerized programming, and more.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.