Comprehensive Guide to RocketMQ Message Types, Production, and Consumption
This article provides an in‑depth tutorial on RocketMQ, covering normal, ordered, delayed, transactional, and batch messages, their production and consumption patterns, retry mechanisms, message filtering, and dead‑letter handling, complete with Java code examples for each scenario.
RocketMQ supports various message models, each with distinct sending and consumption semantics. Normal messages can be sent synchronously, asynchronously, or one‑way, with examples such as:
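import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_A");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        // Send 100 messages synchronously; send() blocks until the broker replies.
        for (int i = 0; i < 100; i++) {
            byte[] bytes = ("hi " + i).getBytes(StandardCharsets.UTF_8);
            Message msg = new Message("topic_A", "tag_A", bytes);
            msg.setKeys("key-" + i);
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }
        producer.shutdown();
    }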
}

Asynchronous sending uses a callback to handle success or failure, while one‑way sending does not wait for any acknowledgment.
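The difference between the three calling styles can be modelled without the RocketMQ client. The sketch below is a library-free illustration, not the RocketMQ API: `SendModes`, `deliver`, and the `"SEND_OK:"` result string are hypothetical names chosen for this example, and the "broker" is a local method that rejects empty payloads.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;

/** Library-free model of sync, async, and one-way sending (hypothetical names). */
final class SendModes {
    /** Stand-in for a broker round trip: rejects empty payloads. */
    static String deliver(String payload) {
        if (payload.isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
        return "SEND_OK:" + payload;
    }

    /** Synchronous: the caller blocks and gets the result or the exception directly. */
    static String sendSync(String payload) {
        return deliver(payload);
    }

    /** Asynchronous: returns at once; the callback later receives a result or an error. */
    static CompletableFuture<Void> sendAsync(String payload,
                                             BiConsumer<String, Throwable> callback) {
        return CompletableFuture.supplyAsync(() -> deliver(payload))
                .handle((result, error) -> {
                    callback.accept(result, unwrap(error));
                    return null;
                });
    }

    /** One-way: fire and forget; the caller learns neither success nor failure. */
    static void sendOneWay(String payload) {
        CompletableFuture.runAsync(() -> {
            try {
                deliver(payload);
            } catch (RuntimeException ignored) {
                // No acknowledgment and no error report, by design.
            }
        });
    }

    /** Strips the CompletionException wrapper added by CompletableFuture, if any. */
    private static Throwable unwrap(Throwable error) {
        if (error instanceof CompletionException && error.getCause() != null) {
            return error.getCause();
        }
        return error;
    }
}
```

The trade-off is visible in the signatures: only the synchronous call returns a result to the caller, the asynchronous call reports through the callback, and the one-way call reports nothing, which is why it suits data where an occasional loss is acceptable.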
Ordered messages guarantee FIFO processing by routing all related messages to the same queue. The article explains global versus partition ordering and shows how to implement a custom MessageQueueSelector to achieve ordered delivery:
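import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_A");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 100; i++) {
            Integer orderId = i;
            byte[] bytes = ("hi " + i).getBytes(StandardCharsets.UTF_8);
            Message msg = new Message("topic_A", "tag_A", bytes);
            // The lambda acts as the MessageQueueSelector: the same orderId
            // always maps to the same queue, which preserves per-order FIFO.
            SendResult sendResult = producer.send(msg, (list, message, arg) -> {
                int index = ((Integer) arg) % list.size();
                return list.get(index);
            }, orderId);
            System.out.println(sendResult);
        }
        producer.shutdown();
    }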
}

Delayed messages are stored in a special schedule topic and become visible after a configured delay level. The following snippet demonstrates setting a delay level of 3 (10 seconds):
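import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class DelayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_A");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 100; i++) {
            byte[] bytes = ("hi " + i).getBytes(StandardCharsets.UTF_8);
            Message msg = new Message("topic_A", "tag_A", bytes);
            msg.setDelayTimeLevel(3); // level 3 = 10 seconds delay
            SendResult sendResult = producer.send(msg);
            // Print the send time so it can be compared with the consume time.
            System.out.println(new SimpleDateFormat("mm:ss").format(new Date()) + " , " + sendResult);
        }
        producer.shutdown();
    }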
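}

Transactional messages keep a local transaction and the sending of a message consistent: the message becomes visible to consumers only if the local transaction commits. This gives eventual consistency across distributed systems rather than exactly‑once delivery, so consumers should still be idempotent. The guide outlines the two‑phase flow (half message, then commit or rollback), the local transaction state enumeration, and shows a sample transaction listener implementation: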
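// For reference: this enum ships with the RocketMQ client
// (org.apache.rocketmq.client.producer.LocalTransactionState).
// "UNKNOW" is the library's own spelling.
public enum LocalTransactionState {
    COMMIT_MESSAGE,
    ROLLBACK_MESSAGE,
    UNKNOW
}

// Sample listener (a separate source file).
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class ICBCTransactionListener implements TransactionListener {
    // Runs the local transaction after the half message has been stored.
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        if ("TAGA".equals(msg.getTags())) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if ("TAGB".equals(msg.getTags())) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            return LocalTransactionState.UNKNOW;
        }
    }

    // Called back by the broker when the transaction state is still unknown.
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("checking transaction for " + msg.getTags());
        return LocalTransactionState.COMMIT_MESSAGE;
    }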
}

Batch sending aggregates multiple messages into a single request, improving throughput. The article provides a MessageListSplitter utility that respects the 4 MB size limit and shows how to use it in a producer loop.
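The idea behind such a splitter can be sketched without the RocketMQ client. The code below is not the article's MessageListSplitter: `BatchSplitter`, the raw `byte[]` payloads, and the 20-byte per-message overhead are assumptions made for illustration, and a real splitter would also count the topic and message properties.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical size-bounded splitter over raw payloads (illustration only). */
final class BatchSplitter {
    /** Assumed per-message bookkeeping overhead in bytes (illustrative value). */
    static final int OVERHEAD_BYTES = 20;

    /** Splits payloads, in order, into batches whose estimated size stays within limitBytes. */
    static List<List<byte[]>> split(List<byte[]> payloads, int limitBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentSize = 0;
        for (byte[] payload : payloads) {
            int size = payload.length + OVERHEAD_BYTES;
            if (size > limitBytes) {
                // A single oversized message can never fit into any batch.
                throw new IllegalArgumentException("message exceeds batch limit: " + size);
            }
            if (currentSize + size > limitBytes) {
                // Close the current batch and start a new one.
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(payload);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<byte[]> payloads = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            payloads.add(("hi " + i).getBytes(StandardCharsets.UTF_8));
        }
        // A tiny limit makes the split visible; the limit named in the text is 4 * 1024 * 1024.
        for (List<byte[]> batch : split(payloads, 100)) {
            System.out.println("batch of " + batch.size() + " messages");
        }
    }
}
```

In a producer loop, each inner list would be converted to messages and passed to a single send call, so no request exceeds the limit.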
Message filtering can be performed by tag or SQL expression. Tag filtering uses the subscribe method with a tag expression, while SQL filtering requires enabling enablePropertyFilter=true on the broker and using MessageSelector.bySql on the consumer side.
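To make the tag expression concrete, the following library-free sketch models how an expression such as `tag_A || tag_B` selects messages. It assumes the `||`-separated form with `*` meaning "all tags"; `TagFilter` is a hypothetical class for illustration and is not part of the RocketMQ client.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of tag-expression matching (illustration only). */
final class TagFilter {
    private final boolean matchAll;
    private final Set<String> tags = new HashSet<>();

    /** Accepts expressions like "tag_A", "tag_A || tag_B", or "*" for everything. */
    TagFilter(String expression) {
        String trimmed = expression == null ? "" : expression.trim();
        matchAll = trimmed.isEmpty() || trimmed.equals("*");
        if (!matchAll) {
            for (String part : trimmed.split("\\|\\|")) {
                String tag = part.trim();
                if (!tag.isEmpty()) {
                    tags.add(tag);
                }
            }
        }
    }

    /** Returns true if a message carrying this tag would pass the filter. */
    boolean matches(String messageTag) {
        return matchAll || (messageTag != null && tags.contains(messageTag));
    }
}
```

For example, `new TagFilter("tag_A || tag_B").matches("tag_C")` is false, while a filter built from `"*"` accepts every tag. Conditions on message properties (ranges, comparisons) are beyond what tags can express, which is where SQL filtering comes in.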
Both producers and consumers have configurable retry mechanisms. By default, synchronous sending retries up to two times (retryTimesWhenSendFailed) and may switch to a different broker, while asynchronous sending retries on the same broker. Setting retryAnotherBrokerWhenNotStoreOK=true additionally lets the producer try another broker when a message was sent but the store result is not SEND_OK. On the consumer side, the retry interval grows with each attempt, and messages exceeding the maximum retry count are moved to a dead‑letter queue whose topic is named %DLQ% followed by the consumer group name.
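The growing consumer retry intervals can be illustrated with a small lookup over the broker's delay-level table. This is a sketch under two assumptions that are not stated in the text: that the broker uses the default level string shown in `DEFAULT_LEVELS`, and that the first consumer retry starts at level 3. `RetrySchedule` and its methods are hypothetical names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical lookup of delays by level and by consumer retry attempt. */
final class RetrySchedule {
    /** Assumed broker default; a broker may be configured with different levels. */
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Parses a space-separated level string such as "1s 5s 10s" into durations. */
    static List<Duration> parse(String levels) {
        List<Duration> result = new ArrayList<>();
        for (String token : levels.trim().split("\\s+")) {
            long amount = Long.parseLong(token.substring(0, token.length() - 1));
            char unit = token.charAt(token.length() - 1);
            switch (unit) {
                case 's': result.add(Duration.ofSeconds(amount)); break;
                case 'm': result.add(Duration.ofMinutes(amount)); break;
                case 'h': result.add(Duration.ofHours(amount)); break;
                case 'd': result.add(Duration.ofDays(amount)); break;
                default: throw new IllegalArgumentException("unknown unit in " + token);
            }
        }
        return result;
    }

    /** Delay for a 1-based delay level, as passed to setDelayTimeLevel. */
    static Duration delayForLevel(int level) {
        return parse(DEFAULT_LEVELS).get(level - 1);
    }

    /** Delay before the n-th consumer retry (1-based), assuming retries start at level 3. */
    static Duration delayBeforeRetry(int attempt) {
        return delayForLevel(attempt + 2);
    }
}
```

Under these assumptions, level 3 corresponds to the 10-second delay used in the delayed-message example, and the wait before each consumer retry lengthens from seconds to hours before the message is finally dead-lettered.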
Overall, the guide equips developers with the knowledge to correctly choose message types, implement reliable production and consumption patterns, and handle failure scenarios in RocketMQ‑based backend systems.
Top Architect
Top Architect focuses on sharing practical architecture knowledge, covering enterprise, system, website, large‑scale distributed, and high‑availability architectures, plus architecture adjustments using internet technologies. We welcome idea‑driven, sharing‑oriented architects to exchange and learn together.