Five Implementation Approaches for Delayed Messages in Message Queues
This article explains the concept and common use cases of delayed messages and compares five practical implementation methods—using Redis sorted sets, RabbitMQ delayed‑message exchange plugin, ActiveMQ scheduled messages, RocketMQ fixed delay levels, and a custom RocketMQ solution—detailing their workflows, advantages, and drawbacks.
After a producer sends a message to a queue, it may not need to be consumed immediately; such messages are called delayed messages. Typical scenarios include notifying students 15 minutes before an online class, closing unpaid orders after an hour, sending re‑engagement pushes after 15 days of inactivity, or reminding responsible personnel about overdue tickets.
1. Redis
Redis sorted sets (ZSET) can store messages with the expected consumption timestamp as the score. A scheduled task repeatedly reads elements whose score is less than the current time and moves the corresponding keys to a list for consumption.
Typical steps:
1. Call API with execution time and payload.
2. Generate a unique key and store the serialized payload in a Redis string.
3. Store the key and timestamp in a Redis sorted set.
4. Periodically fetch the smallest timestamp.
5. If the timestamp is due, push the key to a Redis list.
6. Another task reads keys from the list, retrieves the payload, processes it, and deletes or retries the key.Pros: Simple, practical, quick to implement.
Cons: Single sorted set cannot handle large data volumes; frequent polling may cause unnecessary load. Suitable only for small‑scale scenarios.
2. RabbitMQ
RabbitMQ does not natively support delayed messages, but the combination of TTL (Time‑To‑Live) and dead‑letter exchanges can simulate the behavior. Since version 3.5.8, the official rabbitmq_delayed_message_exchange plugin provides native delayed delivery.
Installation :
rabbitmq-plugins enable rabbitmq_delayed_message_exchangeUsage example (Java): declare an exchange of type x-delayed-message , set the x-delay header with the desired delay in milliseconds, and publish the message.
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("one-more-exchange", "x-delayed-message", true, false, args);
byte[] body = "This is a delayed message".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
Map<String, Object> headers = new HashMap<>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("one-more-exchange", "", props.build(), body);Pros: Reliable, widely used middleware.
Cons: Single‑node master can become a performance bottleneck.
3. ActiveMQ
Since version 5.4, ActiveMQ supports persistent delayed messages and even Cron expressions. Enable it by setting schedulerSupport="true" in activemq.xml :
<broker xmlns="http://activemq.apache.org/schema/core" schedulerSupport="true">
</broker>Client‑side properties:
AMQ_SCHEDULED_DELAY – delay in ms.
AMQ_SCHEDULED_PERIOD – repeat interval.
AMQ_SCHEDULED_REPEAT – repeat count.
AMQ_SCHEDULED_CRON – Cron expression.
Example (delay 60 s, repeat 5 times every 10 s):
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a delayed message");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60*1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 10*1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 4);
producer.send(message);Pros: Stable, supports Cron scheduling.
Cons: Single‑node master may limit throughput.
4. RocketMQ (standard)
RocketMQ provides delayed messages only for 18 predefined levels (e.g., 1 s, 5 s, …, 2 h). The delay level is set on the message; values above the maximum are capped.
Consumer example (Java) reads from the scheduled topic, and the broker’s ScheduleMessageService moves the message to the original topic when the delay expires.
public class Consumer {
public static void main(String[] args) throws MQClientException {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OneMoreGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("OneMoreTopic", "*");
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.printf("%s %s Receive New Messages:%n", sdf.format(new Date()), Thread.currentThread().getName());
System.out.printf("\tMsg Id: %s%n", msg.getMsgId());
System.out.printf("\tBody: %s%n", new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("Consumer Started.");
}
}Producer sets the delay level (e.g., msg.setDelayTimeLevel(3) for a 10 s delay).
public class DelayProducer {
public static void main(String[] args) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
DefaultMQProducer producer = new DefaultMQProducer("OneMoreGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("OneMoreTopic", "DelayMessage", "This is a delay message.".getBytes());
msg.setDelayTimeLevel(3); // 10 seconds
SendResult result = producer.send(msg);
System.out.printf("%s Send Status: %s, Msg Id: %s %n", sdf.format(new Date()), result.getSendStatus(), result.getMsgId());
producer.shutdown();
}
}Pros: Distributed, high‑throughput, high‑performance, high reliability, supports custom delay via level mapping.
Cons: Only the fixed 18 levels are available; custom delays require source modification.
5. Custom RocketMQ
By modifying the open‑source RocketMQ code, one can compute a suitable delay level for any arbitrary delay, adjust the scheduled topic/queue mapping, and re‑inject the message after the calculated wait time. The core steps are:
When msg.getDelayTimeLevel() > 0 , change the topic to SCHEDULE_TOPIC_XXXX and set queueId = level‑1 .
Store the real topic and queue ID in message properties.
Schedule tasks per level in ScheduleMessageService.start() to poll the corresponding queue.
When a message’s delivery timestamp arrives, rebuild the original message, clear the delay level, restore the real topic/queue, and write it back to the normal store.
The source code snippets in CommitLog and ScheduleMessageService illustrate how the broker determines whether a message is ready and how it performs the re‑delivery.
Conclusion
The article presented five delayed‑message solutions—Redis sorted set, RabbitMQ delayed‑message exchange plugin, ActiveMQ scheduled messages, RocketMQ fixed‑level delays, and a customized RocketMQ approach—each with its own strengths and limitations. Selecting the appropriate method depends on data volume, latency requirements, and operational constraints.
New Oriental Technology
Practical internet development experience, tech sharing, knowledge consolidation, and forward-thinking insights.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.