Design and Implementation of HunterConsumer and HunterProducer Components for RocketMQ Integration
This article explains the background of RocketMQ clusters, demonstrates typical Java consumer and producer usage, and introduces the AOP‑based HunterConsumer and HunterProducer components that simplify lifecycle management, configuration, and code reuse for backend developers working with messaging systems.
Background: A RocketMQ cluster consists of Producers, Brokers, Consumers, and Name Servers; business developers mainly interact with the Producer and Consumer Java clients.
Typical usage of Consumer and Producer is illustrated with Java code examples.
To eliminate repetitive lifecycle code, a new component based on AOP annotations (@HunterConsumer and @HunterProducer) is introduced, allowing developers to specify topic, tags, and group via annotations while Spring manages bean creation, initialization, and shutdown.
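The annotation-driven style described above might look like the following minimal sketch. The article names the attributes (topic, tags, group) but does not show the annotation source, so the declaration of `HunterConsumer`, the `"*"` default for tags, and the `OrderListener` and `AnnotationDemo` classes are all assumptions made for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical declaration: attribute names follow the article, the rest is assumed.
@Retention(RetentionPolicy.RUNTIME) // must stay visible to reflection at startup
@Target(ElementType.TYPE)
@interface HunterConsumer {
    String topic();
    String tags() default "*"; // assumed default: subscribe to every tag
    String group();
}

// Business code only declares what to consume; it has no start/shutdown methods.
@HunterConsumer(topic = "testTopic", tags = "testTags", group = "testConsumerGroup")
class OrderListener {
    String handle(String body) {
        return "handled:" + body; // business logic placeholder
    }
}

class AnnotationDemo {
    // Reads the subscription metadata the way a framework could at startup.
    static String describe(Class<?> type) {
        HunterConsumer meta = type.getAnnotation(HunterConsumer.class);
        if (meta == null) {
            return "not a consumer";
        }
        return meta.group() + " -> " + meta.topic() + ":" + meta.tags();
    }

    public static void main(String[] args) {
        // prints "testConsumerGroup -> testTopic:testTags"
        System.out.println(describe(OrderListener.class));
    }
}
```

Because the metadata lives on the class, the framework can create and stop the underlying client without the listener holding a reference to it.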
HunterConsumer works by reading Spring configuration, scanning the container for @HunterConsumer annotations, performing pre‑start checks, creating and starting DefaultMQPushConsumer instances, and handling shutdown through a DisposableBean implementation.
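The check, start, and shutdown steps listed above can be sketched without Spring or RocketMQ on the classpath. Everything here is hypothetical: `ManagedClient` stands in for DefaultMQPushConsumer, `Subscription` stands in for the values read from @HunterConsumer, and the specific pre-start rules (required topic and group, one listener per group) are assumed, since the article does not list them.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for an MQ client that can be started and stopped.
interface ManagedClient {
    void start();
    void shutdown();
}

// Hypothetical subscription metadata, as it would be read from the annotation.
final class Subscription {
    final String topic;
    final String tags;
    final String group;

    Subscription(String topic, String tags, String group) {
        this.topic = topic;
        this.tags = tags;
        this.group = group;
    }
}

class ConsumerRegistry {
    private final List<ManagedClient> started = new ArrayList<>();
    private final Set<String> seenGroups = new HashSet<>();

    // Pre-start checks (assumed rules): required fields present, one listener per group.
    void check(Subscription s) {
        if (s.topic == null || s.topic.trim().isEmpty()) {
            throw new IllegalStateException("topic is required");
        }
        if (s.group == null || s.group.trim().isEmpty()) {
            throw new IllegalStateException("group is required");
        }
        if (!seenGroups.add(s.group)) {
            throw new IllegalStateException("duplicate consumer group: " + s.group);
        }
    }

    // Validate, start, and remember the client so it can be stopped later.
    void register(Subscription s, ManagedClient client) {
        check(s);
        client.start();
        started.add(client);
    }

    // What a DisposableBean.destroy() implementation could delegate to.
    void shutdownAll() {
        for (int i = started.size() - 1; i >= 0; i--) {
            started.get(i).shutdown(); // stop in reverse start order
        }
        started.clear();
    }

    int runningCount() {
        return started.size();
    }
}
```

Failing fast in `check` surfaces a misconfigured annotation at application startup rather than at the first message.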
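A typical hand-written consumer, including the lifecycle code that the annotation removes, looks like this:

    // Package names below are those of Apache RocketMQ 4.x;
    // older releases use com.alibaba.rocketmq instead.
    import java.util.List;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;

    public class TestListener implements MessageListenerConcurrently {
        private DefaultMQPushConsumer consumer;

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            // business logic
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        // Lifecycle boilerplate: configure, subscribe, register the listener, start.
        public void startConsumer() throws Exception {
            consumer = new DefaultMQPushConsumer();
            consumer.setConsumerGroup("testConsumerGroup");
            consumer.setNamesrvAddr("name server address");
            // subscribe() sets both the topic and the tag filter
            consumer.subscribe("testTopic", "testTags");
            consumer.registerMessageListener(this);
            consumer.start();
        }

        // Lifecycle boilerplate: release client resources on application exit.
        public void shutdown() {
            if (consumer != null) {
                consumer.shutdown();
            }
        }
    }

Key configuration utilities (PropertyTool) provide typed access to environment and property values, supporting defaults and type conversion.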
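As an illustration of typed access with defaults and type conversion, here is a minimal sketch. The article does not show PropertyTool's real API, so the class name `PropertyToolSketch`, its method names, and the fall-back-on-malformed-value policy are assumptions; only `java.util.Properties` is modelled, not environment variables.

```java
import java.util.Properties;
import java.util.function.Function;

// Hypothetical sketch; not the actual PropertyTool implementation.
class PropertyToolSketch {
    private final Properties props;

    PropertyToolSketch(Properties props) {
        this.props = props;
    }

    // Looks up a key, converts it, and falls back to the default
    // when the value is missing, blank, or cannot be parsed.
    <T> T get(String key, T defaultValue, Function<String, T> parser) {
        String raw = props.getProperty(key);
        if (raw == null || raw.trim().isEmpty()) {
            return defaultValue;
        }
        try {
            return parser.apply(raw.trim());
        } catch (RuntimeException e) {
            return defaultValue; // assumed policy: bad values do not abort startup
        }
    }

    String getString(String key, String defaultValue) {
        return get(key, defaultValue, s -> s);
    }

    int getInt(String key, int defaultValue) {
        return get(key, defaultValue, Integer::parseInt);
    }

    boolean getBoolean(String key, boolean defaultValue) {
        return get(key, defaultValue, Boolean::parseBoolean);
    }
}
```

With a property such as `consumer.threads= 8 ` loaded, `getInt("consumer.threads", 1)` yields 8, while a missing or non-numeric value yields the supplied default.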
HunterProducer follows a similar lifecycle, exposing IProducer, IHunterProducer, and IHunterDelayProducer interfaces for sending normal and delayed messages, with Spring‑managed bean creation and shutdown.
    // Base contract: synchronous sends and callback-based sends.
    public interface IProducer {
        SendResult send(Message msg);
        SendResult send(String tag, Object body);
        void sendCallback(Message msg, HunterSendCallback callback) throws HunterProducerException;
        void sendCallback(String tag, Object body, HunterSendCallback callback) throws HunterProducerException;
    }

    // Producer for normal (non-delayed) messages.
    public interface IHunterProducer extends IProducer {}

    // Producer that adds delayed delivery, by relative delay or by absolute end time.
    public interface IHunterDelayProducer extends IProducer {
        SendResult send(String tag, Object body, int delay, TimeUnit timeUnit);
        SendResult send(String tag, Object body, Date delayEndDate);
        void sendCallback(String tag, Object body, int delay, TimeUnit timeUnit, HunterSendCallback callback);
        void sendCallback(String tag, Object body, Date delayEndDate, HunterSendCallback callback);
    }

Both components simplify business code, improve development efficiency, and enhance maintainability by centralizing MQ client management and reducing boilerplate.
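The two delayed-send forms of IHunterDelayProducer, a relative `(delay, timeUnit)` pair and an absolute `delayEndDate`, can be reduced to a single delay in milliseconds. The helper below is a hypothetical sketch of that reduction; the class name `DelayMath` and the choice to clamp past dates to zero are assumptions, not behavior the article documents.

```java
import java.util.Date;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of the Hunter components shown in the article.
class DelayMath {
    // Relative form: (delay, unit) converted to milliseconds.
    static long toMillis(int delay, TimeUnit unit) {
        if (delay < 0) {
            throw new IllegalArgumentException("delay must be >= 0");
        }
        return unit.toMillis(delay);
    }

    // Absolute form: time remaining until the end date, clamped at zero
    // so that an end date in the past means "deliver immediately" (assumed policy).
    static long toMillis(Date delayEndDate, Date now) {
        return Math.max(0L, delayEndDate.getTime() - now.getTime());
    }
}
```

Passing `now` explicitly keeps the calculation deterministic and easy to test.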
Zhuanzhuan Tech
A platform for Zhuanzhuan R&D and industry peers to learn and exchange technology, regularly sharing frontline experience and cutting‑edge topics. We welcome practical discussions and sharing; contact waterystone with any questions.