Consumer‑Side Rate Limiting, TTL, and Dead Letter Queues in RabbitMQ
This article explains why consumer‑side flow control is needed in RabbitMQ, demonstrates how to use the QoS API and basicQos settings, shows practical Java code for limiting consumption, and covers TTL and dead‑letter queue configurations to improve message reliability and system stability.
Consumer‑Side Rate Limiting
When a RabbitMQ server has accumulated thousands of pending messages, a newly started consumer can be hit by a sudden flood of deliveries. Limiting the producer is usually not feasible because traffic patterns vary, so the limit is applied on the consumer side, which keeps the consumer stable and prevents resource exhaustion.
1. Why limit the consumer side
In high‑volume scenarios the consumer cannot process all messages instantly; applying flow control on the consumer prevents crashes and performance degradation.
2. Rate‑limit API
RabbitMQ provides a QoS (quality-of-service) feature: when automatic acknowledgments are disabled, the broker stops delivering new messages to a consumer once a configured number of messages are outstanding (delivered but not yet acknowledged), and resumes delivery as they are acknowledged.
/**
* Request specific "quality of service" settings.
* These settings impose limits on the amount of data the server
* will deliver to consumers before requiring acknowledgements.
* Thus they provide a means of consumer‑initiated flow control.
* @param prefetchSize maximum amount of content (measured in octets) that the server will deliver, 0 if unlimited
* @param prefetchCount maximum number of messages that the server will deliver, 0 if unlimited
* @param global true if the settings should be applied to the entire channel rather than each consumer
* @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
prefetchSize: maximum amount of unacknowledged content, in octets, that the server will deliver; 0 means no size limit.
prefetchCount: limits the number of unacknowledged messages that can be outstanding; when the limit is reached the broker stops delivering until some messages are acked.
global: false applies the limit separately to each consumer on the channel; true shares one limit across all consumers on the channel.
Note: RabbitMQ does not implement prefetchSize, so pass 0; prefetchCount takes effect only when auto-ack is false.
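To make the effect of prefetchCount concrete, here is a minimal in-memory sketch of a prefetch window. It is a toy model and not part of the RabbitMQ client: the class PrefetchWindow and its methods are hypothetical names chosen for illustration, and it only imitates how a broker might hold back deliveries while the number of unacknowledged messages sits at the limit.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeSet;

/** Toy model of a broker-side prefetch window; hypothetical, not a RabbitMQ API. */
class PrefetchWindow {
    private final int prefetchCount;                       // 0 means unlimited, as in basicQos
    private final Deque<String> ready = new ArrayDeque<>();
    private final TreeSet<Long> unacked = new TreeSet<>(); // delivery tags awaiting an ack
    private long nextTag = 1;

    PrefetchWindow(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    void enqueue(String message) {
        ready.addLast(message);
    }

    /** Delivers ready messages until the window is full; returns how many were sent. */
    int deliver() {
        int sent = 0;
        while (!ready.isEmpty() && (prefetchCount == 0 || unacked.size() < prefetchCount)) {
            ready.removeFirst();
            unacked.add(nextTag++);
            sent++;
        }
        return sent;
    }

    /** Imitates basicAck(tag, multiple): multiple=true settles every tag up to and including tag. */
    void ack(long tag, boolean multiple) {
        if (multiple) {
            unacked.headSet(tag, true).clear();
        } else {
            unacked.remove(tag);
        }
    }

    int readyCount() { return ready.size(); }

    int unackedCount() { return unacked.size(); }

    public static void main(String[] args) {
        PrefetchWindow window = new PrefetchWindow(3);
        for (int i = 0; i < 10; i++) {
            window.enqueue("msg " + i);
        }
        System.out.println("delivered: " + window.deliver() + ", unacked: " + window.unackedCount());
        window.ack(1, false);
        System.out.println("delivered after one ack: " + window.deliver());
    }
}
```

With a limit of 3 and ten queued messages, the first call to deliver() sends three messages and the rest wait; each ack frees one slot.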
3. How to apply consumer‑side limiting
Step 1: Disable automatic ack: channel.basicConsume(queueName, false, consumer);
Step 2: Set the limit, e.g., channel.basicQos(0, 15, false);
Step 3: In the consumer’s handleDelivery method, manually ack messages, optionally in batch: channel.basicAck(envelope.getDeliveryTag(), true);
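The batch ack mentioned in Step 3 needs a rule for when to send the cumulative ack. The sketch below is one possible approach, not something the client library provides: BatchAckPolicy and batchSize are hypothetical names. It rejects a batch size larger than the prefetch limit, because the broker would stop delivering before the batch ever fills and the consumer would stall.

```java
/** Hypothetical helper that decides when a consumer should send a cumulative ack. */
class BatchAckPolicy {
    private final int batchSize;
    private int pending = 0;

    BatchAckPolicy(int batchSize, int prefetchCount) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        // With a non-zero prefetch limit, a batch larger than the window can never fill up.
        if (prefetchCount > 0 && batchSize > prefetchCount) {
            throw new IllegalArgumentException("batchSize must not exceed prefetchCount");
        }
        this.batchSize = batchSize;
    }

    /** Call once per processed delivery; true means "ack now with multiple=true". */
    boolean shouldAck() {
        pending++;
        if (pending == batchSize) {
            pending = 0;
            return true;
        }
        return false;
    }

    /** Deliveries processed since the last ack; these should be acked before shutdown. */
    int pending() {
        return pending;
    }

    public static void main(String[] args) {
        BatchAckPolicy policy = new BatchAckPolicy(5, 15);
        for (long tag = 1; tag <= 12; tag++) {
            if (policy.shouldAck()) {
                // Inside handleDelivery this would be channel.basicAck(tag, true).
                System.out.println("ack up to tag " + tag);
            }
        }
        System.out.println("left unacked: " + policy.pending());
    }
}
```

Acking in batches trades fewer network round trips for more redelivered messages if the consumer dies between acks, so the batch size is a judgment call.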
The producer needs no changes for rate limiting; it simply sends ten messages:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class QosProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // The exchange must already exist; QosConsumer declares it, so run that first.
        String exchangeName = "test_qos_exchange";
        String routingKey = "item.add";
        String msg = "this is qos msg";
        for (int i = 0; i < 10; i++) {
            String tem = msg + " : " + i;
            channel.basicPublish(exchangeName, routingKey, null, tem.getBytes(StandardCharsets.UTF_8));
            System.out.println("Send message : " + tem);
        }
        channel.close();
        connection.close();
    }
}
Consumer code verifies the limit (global = false) and adds a 5-second sleep to make processing visible:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class QosConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "item.#";
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        // At most 3 unacknowledged messages per consumer (global = false)
        channel.basicQos(0, 3, false);
        channel.queueBind(queueName, exchangeName, routingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                // Simulate slow processing so the limit is visible in the management UI
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.println("[x] Received '" + message + "'");
                // multiple = true acknowledges this tag and every earlier unacked tag
                channel.basicAck(envelope.getDeliveryTag(), true);
            }
        };
        // autoAck = false, otherwise the prefetch limit has no effect
        channel.basicConsume(queueName, false, consumer);
    }
}
In the management UI the Unacked count stays at 3 while more messages are waiting, confirming that the broker never leaves more than three unacknowledged messages outstanding for this consumer.
TTL
TTL (Time To Live) defines how long a message or a queue can exist before it is automatically discarded. RabbitMQ supports both message‑level and queue‑level TTL, similar to Redis expiration, helping to clean up stale messages and reduce server load.
Message TTL can be set when publishing:
/**
 * deliveryMode 2 means persistent message
 * expiration sets the message lifetime in milliseconds, passed as a string;
 * after 100000 ms the message is removed if not consumed
 */
Map<String, Object> headers = new HashMap<>();
headers.put("myhead1", "111");
headers.put("myhead2", "222");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)
        .contentEncoding("UTF-8")
        .expiration("100000")
        .headers(headers)
        .build();
String msg = "test message";
channel.basicPublish("", queueName, properties, msg.getBytes());
Queue TTL can be configured in the management UI; messages older than the configured time are removed automatically.
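Queue-level TTL can also be set in code through optional queue arguments passed to queueDeclare: x-message-ttl sets a TTL in milliseconds for every message in the queue, and x-expires sets how long an unused queue may live. When a message carries its own expiration and the queue also has a message TTL, the lower of the two applies. The sketch below only builds the argument map and works out the effective TTL; TtlSettings and its method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for TTL-related settings; only the argument keys come from RabbitMQ. */
class TtlSettings {

    /** Builds a map suitable as the last argument of channel.queueDeclare(...). */
    static Map<String, Object> queueArguments(int messageTtlMs, int queueExpiresMs) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", messageTtlMs); // TTL for every message in the queue
        arguments.put("x-expires", queueExpiresMs);   // lifetime of the queue itself while unused
        return arguments;
    }

    /**
     * Returns the TTL that takes effect for one message, or null if none is set.
     * expiration is the string from BasicProperties (null when absent);
     * queueTtlMs is the queue's x-message-ttl (null when absent).
     */
    static Long effectiveTtlMs(String expiration, Long queueTtlMs) {
        Long messageTtlMs = (expiration == null) ? null : Long.valueOf(expiration);
        if (messageTtlMs == null) {
            return queueTtlMs;
        }
        if (queueTtlMs == null) {
            return messageTtlMs;
        }
        return Math.min(messageTtlMs, queueTtlMs); // the lower value wins
    }

    public static void main(String[] args) {
        System.out.println(queueArguments(60000, 1800000));
        System.out.println(effectiveTtlMs("100000", 60000L));
    }
}
```

With the message above (expiration "100000") published to a queue whose x-message-ttl is 60000, the message would be dropped after 60 seconds, not 100.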
Dead Letter Queue and Dead Letter Exchange (DLX)
A dead-letter queue stores messages that were not successfully consumed. A message becomes a dead letter in any of the following cases:
The message is rejected with basic.reject or basic.nack and requeue=false.
The message TTL expires.
The queue length limit is exceeded.
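The three cases above can be illustrated with a small in-memory model. It is a toy and does not use the RabbitMQ client: DeadLetterModel and its methods are hypothetical. It assumes the default overflow behaviour, where the oldest message is displaced when the length limit is exceeded, and it only expires messages once they reach the head of the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory model of dead-lettering; none of these types exist in the RabbitMQ client. */
class DeadLetterModel {
    static final class Message {
        final String body;
        final long expiresAtMs;

        Message(String body, long expiresAtMs) {
            this.body = body;
            this.expiresAtMs = expiresAtMs;
        }
    }

    private final int maxLength;
    private final Deque<Message> queue = new ArrayDeque<>();
    final List<String> deadLetters = new ArrayList<>(); // entries look like "reason:body"

    DeadLetterModel(int maxLength) {
        this.maxLength = maxLength;
    }

    /** Case 3: exceeding the length limit displaces the oldest message. */
    void publish(String body, long expiresAtMs) {
        queue.addLast(new Message(body, expiresAtMs));
        while (queue.size() > maxLength) {
            deadLetters.add("maxlen:" + queue.removeFirst().body);
        }
    }

    /** Case 2: expired messages are dead-lettered once they are at the head of the queue. */
    void expire(long nowMs) {
        while (!queue.isEmpty() && queue.peekFirst().expiresAtMs <= nowMs) {
            deadLetters.add("expired:" + queue.removeFirst().body);
        }
    }

    /** Case 1: a rejected message is dead-lettered only when requeue is false. */
    void rejectHead(boolean requeue) {
        Message m = queue.pollFirst();
        if (m == null) {
            return;
        }
        if (requeue) {
            queue.addFirst(m);
        } else {
            deadLetters.add("rejected:" + m.body);
        }
    }

    int size() { return queue.size(); }

    public static void main(String[] args) {
        DeadLetterModel model = new DeadLetterModel(2);
        model.publish("a", 1000);
        model.publish("b", 50);
        model.publish("c", 1000); // displaces "a"
        model.expire(100);        // "b" has expired
        model.rejectHead(false);  // "c" is rejected without requeue
        System.out.println(model.deadLetters);
    }
}
```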
Implementation steps
Define a dead-letter exchange and queue, then bind them:
Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey: # // on a topic exchange, matches all routing keys
Declare the normal queue with the dead-letter exchange argument:
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "dlx.exchange");
channel.queueDeclare("normal.queue", true, false, false, arguments);
Producer that sets a short expiration so the message becomes a dead letter:
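import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class DlxProducer {
    public static void main(String[] args) throws Exception {
        // Connection setup mirrors QosProducer (local broker, default guest account)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // The exchange and queues are declared by DlxConsumer, so run that first.
        String exchangeName = "test_dlx_exchange";
        String routingKey = "item.update";
        String msg = "this is dlx msg";
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)
                .expiration("10000") // 10 seconds
                .build();
        channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes(StandardCharsets.UTF_8));
        System.out.println("Send message : " + msg);
        channel.close();
        connection.close();
    }
}
Consumer that reads from the dead-letter queue: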
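import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class DlxConsumer {
    public static void main(String[] args) throws Exception {
        // Connection setup mirrors QosConsumer (local broker, default guest account)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        String exchangeName = "test_dlx_exchange";
        String queueName = "test_dlx_queue";
        String routingKey = "item.#";

        // Normal queue: dead letters are republished to dlx.exchange
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "dlx.exchange");
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, arguments);
        channel.queueBind(queueName, exchangeName, routingKey);

        // Dead-letter exchange and queue; "#" matches every routing key
        channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
        channel.queueDeclare("dlx.queue", true, false, false, null);
        channel.queueBind("dlx.queue", "dlx.exchange", "#");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.println("[x] Received '" + message + "'");
            }
        };
        // Nothing consumes the normal queue, so messages expire there and arrive in dlx.queue
        channel.basicConsume("dlx.queue", true, consumer);
    }
}
Summary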
DLX is a regular exchange; when a queue has dead‑letter messages, RabbitMQ republishes them to the configured exchange, routing them to a designated dead‑letter queue where they can be inspected and processed accordingly.
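When inspecting dead letters, it helps to know why each one was dead-lettered. RabbitMQ adds an x-death header to dead-lettered messages: a list of tables with fields such as reason, queue, and count. The sketch below reads the reason from a header map shaped like the one returned by properties.getHeaders(). XDeath and latestReason are hypothetical names, and the code assumes the most recent entry comes first in the list. It calls toString() on the value because the Java client may hand back string header values as a LongString, not a String.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper for reading the x-death header of a dead-lettered message. */
class XDeath {

    /**
     * Returns the reason of the first x-death entry (for example "rejected",
     * "expired" or "maxlen"), or null if the header is absent or malformed.
     */
    static String latestReason(Map<String, Object> headers) {
        if (headers == null) {
            return null;
        }
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List) || ((List<?>) xDeath).isEmpty()) {
            return null;
        }
        Object first = ((List<?>) xDeath).get(0);
        if (!(first instanceof Map)) {
            return null;
        }
        Object reason = ((Map<?, ?>) first).get("reason");
        return (reason == null) ? null : reason.toString();
    }
}
```

Inside handleDelivery of the dead-letter consumer, this could be called as XDeath.latestReason(properties.getHeaders()) and the result logged next to the message body.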