Integrating ActiveMQ with Spring: Configuration, Producers, Consumers, Listeners, and Transaction Management
This article explains how to integrate ActiveMQ with Spring using XML configuration. It covers connection factories, producers, consumers, the different MessageListener types, message conversion, and Spring's JMS transaction management, with code examples throughout.
1. Spring integration with ActiveMQ – XML configuration
1.1 Configuring ConnectionFactory
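Spring provides several ConnectionFactory implementations, such as SingleConnectionFactory and CachingConnectionFactory. SingleConnectionFactory returns the same shared Connection on every createConnection() call; CachingConnectionFactory extends it and additionally caches sessions and producers. Neither creates connections itself: the actual JMS ConnectionFactory must be supplied by the JMS provider (here ActiveMQ) and injected into Spring's wrapper through the targetConnectionFactory property. The example in section 1.5 uses SingleConnectionFactory.
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"/>
ActiveMQ also ships its own PooledConnectionFactory, which wraps an ActiveMQConnectionFactory and pools connections, sessions, and producers. The configuration below instead wraps the ActiveMQ factory in Spring's CachingConnectionFactory. The amq: element it uses requires the ActiveMQ namespace (http://activemq.apache.org/schema/core) to be declared on the root beans element.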
<!-- ActiveMQ connection factory -->
<amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://192.168.3.3:61616" userName="admin" password="admin" />
<!-- Spring Caching connection factory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
    <property name="sessionCacheSize" value="100"/>
</bean>
1.2 Configuring the producer
The producer uses Spring's JmsTemplate to send messages. The JmsTemplate bean must reference the previously defined ConnectionFactory.
<!-- Spring JMS template -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>
Destinations (queues or topics) are defined as beans. A default destination can be set on the JmsTemplate through either its defaultDestination property (a reference to a Destination bean) or its defaultDestinationName property (a name resolved when the message is sent).
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg><value>queue</value></constructor-arg>
</bean>
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="topic"/>
</bean>
1.3 Configuring the consumer
Consumers are set up using a MessageListenerContainer, which requires a ConnectionFactory, a Destination, and a MessageListener implementation.
<bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="queueDestination"/>
    <property name="messageListener" ref="consumerMessageListener"/>
</bean>
1.4 Defining a MessageListener
Implement the JMS MessageListener interface and its onMessage(Message) method to process incoming messages. Section 2.1 shows a minimal implementation.
1.5 Example analysis
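This example defines a second queue, sessionAwareQueue, consumed by a custom ConsumerSessionAwareMessageListener. When that listener receives a message, it sends a reply to queueDestination, where ConsumerMessageListener picks it up. The XML configuration follows; the emailMessageConverter bean it references, and the matching messageConverter property on ConsumerMessageListener, are not shown in this article.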
<!-- JmsTemplate bean -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://192.168.210.128:61616"/>
</bean>
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg><value>queue</value></constructor-arg>
</bean>
<bean id="sessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg><value>sessionAwareQueue</value></constructor-arg>
</bean>
<bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener">
    <property name="messageConverter" ref="emailMessageConverter"/>
</bean>
<bean id="consumerSessionAwareMessageListener" class="com.tiantian.springintejms.listener.ConsumerSessionAwareMessageListener">
    <property name="destination" ref="queueDestination"/>
</bean>
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="queueDestination"/>
    <property name="messageListener" ref="consumerMessageListener"/>
</bean>
<bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="sessionAwareQueue"/>
    <property name="messageListener" ref="consumerSessionAwareMessageListener"/>
</bean>
Java code for the producer service:
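import java.io.Serializable;
import javax.jms.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

@Component
public class ProducerServiceImpl implements ProducerService {
    @Autowired
    private JmsTemplate jmsTemplate;
    // Not used in this excerpt
    private Destination responseDestination;

    // Sends a plain text message to the given destination.
    public void sendMessage(Destination destination, final String message) {
        System.out.println("---------------Producer sending message-----------------");
        System.out.println("Message: " + message);
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }

    // Sends an object; the template's MessageConverter turns it into a JMS message.
    public void sendMessage(final Destination destination, final Serializable obj) {
        jmsTemplate.convertAndSend(destination, obj);
    }
}
Java code for a SessionAwareMessageListener that replies after receiving a message: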
import javax.jms.*;
import org.springframework.jms.listener.SessionAwareMessageListener;

public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener<TextMessage> {
    // Reply destination, injected from the XML configuration
    private Destination destination;

    public void onMessage(TextMessage message, Session session) throws JMSException {
        System.out.println("Received a message");
        System.out.println("Content: " + message.getText());
        // Reuse the listener's own Session to send the reply
        MessageProducer producer = session.createProducer(destination);
        try {
            System.out.println("Sending a reply message");
            Message textMessage = session.createTextMessage("Reply from ConsumerSessionAwareMessageListener");
            producer.send(textMessage);
        } finally {
            // Close the producer so one is not left open per received message
            producer.close();
        }
    }

    public Destination getDestination() { return destination; }
    public void setDestination(Destination destination) { this.destination = destination; }
}
JUnit test to trigger the session-aware listener:
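import javax.jms.Destination;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/applicationContext.xml")
public class ProducerConsumerTest {
    @Autowired
    private ProducerService producerService;
    @Autowired @Qualifier("queueDestination")
    private Destination destination;
    @Autowired @Qualifier("sessionAwareQueue")
    private Destination sessionAwareQueue;

    // The listeners run asynchronously, so their output may appear after this method returns.
    @Test
    public void testSessionAwareMessageListener() {
        producerService.sendMessage(sessionAwareQueue, "Test SessionAwareMessageListener");
    }
}
2. MessageListener types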
Spring supports three listener types: MessageListener, SessionAwareMessageListener, and MessageListenerAdapter. Their differences and usage are described below.
2.1 MessageListener – the standard JMS interface with a single onMessage(Message) method.
import javax.jms.*;
public class ConsumerMessageListener implements MessageListener {
    public void onMessage(Message message) {
        TextMessage textMsg = (TextMessage) message;
        try {
            System.out.println("Received a text message: " + textMsg.getText());
        } catch (JMSException e) {
            // onMessage cannot declare JMSException, so wrap it in an unchecked exception
            throw new IllegalStateException("Could not read message text", e);
        }
    }
}
2.2 SessionAwareMessageListener – allows access to the Session so the listener can send a reply message.
2.3 MessageListenerAdapter – adapts a plain POJO by converting messages to appropriate types (String, byte[], Map, Serializable) before invoking the target method.
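The dispatch step of an adapter can be illustrated without any JMS dependency. The sketch below is a simplified model, not Spring's MessageListenerAdapter: PojoAdapter and GreetingHandler are hypothetical names, and the payload is assumed to have been converted from the JMS message already. The method name handleMessage matches the default listener method name that Spring's adapter looks for.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

// Hypothetical POJO listener: its signatures contain no JMS types.
class GreetingHandler {
    public String handleMessage(String text) {
        return "text:" + text;
    }

    public String handleMessage(Map<?, ?> fields) {
        return "map:" + fields.size();
    }
}

// Simplified stand-in for the adapter idea; this is NOT Spring's MessageListenerAdapter.
class PojoAdapter {
    private final Object delegate;
    private final String methodName;

    PojoAdapter(Object delegate, String methodName) {
        this.delegate = delegate;
        this.methodName = methodName;
    }

    // Invokes the public one-argument method named methodName whose parameter
    // type accepts the (already converted) payload, and returns its result.
    Object dispatch(Object payload) {
        for (Method m : delegate.getClass().getMethods()) {
            if (m.getName().equals(methodName)
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].isInstance(payload)) {
                try {
                    return m.invoke(delegate, payload);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new IllegalStateException("Listener method failed", e);
                }
            }
        }
        throw new IllegalArgumentException(
                "No " + methodName + " method accepts " + payload.getClass().getName());
    }
}
```

Calling new PojoAdapter(new GreetingHandler(), "handleMessage").dispatch("hi") selects the String overload, while passing a Map selects the Map overload. The handler class never sees a JMS type, which is the point of the adapter approach.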
3. MessageConverter
MessageConverter transforms between JMS messages and Java objects. Spring’s SimpleMessageConverter is used by default, but custom converters can be implemented for special needs.
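As a rough guide to what the default converter does when sending, the sketch below models how SimpleMessageConverter picks a JMS message type from the payload's Java type. ConverterRules is a hypothetical helper written for this illustration; it returns only the name of the message type and uses no JMS classes.

```java
import java.io.Serializable;
import java.util.Map;

// Models which JMS message type SimpleMessageConverter chooses for a payload.
// Hypothetical helper for illustration; it is not part of Spring.
class ConverterRules {
    static String messageTypeFor(Object payload) {
        // Order matters: String, byte[] and the common Map implementations
        // are all Serializable as well, so the specific checks come first.
        if (payload instanceof String) {
            return "TextMessage";
        }
        if (payload instanceof byte[]) {
            return "BytesMessage";
        }
        if (payload instanceof Map) {
            return "MapMessage";
        }
        if (payload instanceof Serializable) {
            return "ObjectMessage";
        }
        throw new IllegalArgumentException("Cannot convert payload of type "
                + (payload == null ? "null" : payload.getClass().getName()));
    }
}
```

A payload that fits none of these cases, such as a plain object that is not Serializable, is where a custom MessageConverter becomes necessary.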
// Sending an object by hand: the caller builds the ObjectMessage itself.
// TestMqBean must implement java.io.Serializable; producer and session are
// an already-created javax.jms.MessageProducer and javax.jms.Session.
TestMqBean bean = new TestMqBean();
bean.setAge(13);
for (int i = 0; i < 10; i++) {
    bean.setName("send to data -" + i);
    producer.send(session.createObjectMessage(bean));
}
// Using JmsTemplate.convertAndSend: the template's MessageConverter builds the message.
jmsTemplate.convertAndSend(destination, obj);
// Receiving an object
public void receiveMessage(ObjectMessage message) throws JMSException {
    System.out.println(message.getObject());
}
4. Spring JMS transaction management
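Spring provides JmsTransactionManager to manage local transactions for a single JMS ConnectionFactory. For a listener container, the simplest option is to set the sessionTransacted property to true: each message is then received in a local JMS transaction, which is committed when the listener returns normally and rolled back, so that the message is redelivered, when the listener throws an exception.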
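The commit and rollback behavior of a transacted receive can be pictured with a small in-memory model. This is a toy sketch under stated assumptions, not JMS or Spring code: TransactedQueue is a hypothetical class, and a real broker also applies a redelivery policy (delays, a maximum number of attempts) that the model ignores.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Toy model of a transacted receive; NOT a JMS or Spring class.
class TransactedQueue {
    private final Deque<String> messages = new ArrayDeque<>();

    void send(String message) {
        messages.addLast(message);
    }

    int size() {
        return messages.size();
    }

    // Delivers the head message to the listener.
    // Returns true on commit (listener returned normally, message is gone).
    // Returns false on rollback (listener threw, message is put back at the
    // head for redelivery) or when the queue is empty.
    boolean deliverNext(Consumer<String> listener) {
        String message = messages.pollFirst();
        if (message == null) {
            return false; // nothing to deliver
        }
        try {
            listener.accept(message);
            return true;
        } catch (RuntimeException e) {
            messages.addFirst(message);
            return false;
        }
    }
}
```

A listener that throws leaves the message on the queue, and a later delivery attempt with a working listener consumes it. The container configuration that enables the corresponding behavior in Spring follows.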
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="queueDestination"/>
    <property name="messageListener" ref="consumerMessageListener"/>
    <property name="sessionTransacted" value="true"/>
</bean>
For distributed transactions, configure a JtaTransactionManager and pass it to the listener container through its transactionManager property. The underlying ConnectionFactory must be XA-capable.
<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
<tx:annotation-driven transaction-manager="jtaTransactionManager"/>
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="queueDestination"/>
    <property name="messageListener" ref="consumerMessageListener"/>
    <property name="transactionManager" ref="jtaTransactionManager"/>
</bean>
5. References
Spring JMS integration – transaction management: http://elim.iteye.com/blog/1983532
Spring JMS integration – three listener types: http://elim.iteye.com/blog/1893676
Spring JMS integration – ActiveMQ basics: http://elim.iteye.com/blog/1893038
6. Source code download
The complete project can be cloned from GitHub: https://github.com/wuya11/SpringinteJMSonActiveMQ
Original source: https://www.cnblogs.com/wlandwl/p/activemqtwo.html