Apache ActiveMQ Best Practices Tutorial

Apache ActiveMQ is an open source messaging server written in Java that implements the JMS 1.1 specification. In this tutorial, you will learn how to develop a few Java applications that integrate with ActiveMQ to send messages to and receive messages from destinations. If you already know how to install and configure ActiveMQ, you can skip the first four chapters.

1. Introduction

Apache ActiveMQ (AMQ) is a JMS 1.1 implementation from the Apache Software Foundation.

AMQ is a message broker that relays messages from the sender to the receiver. Message brokers are the building blocks of a message-oriented middleware (MOM) architecture.

AMQ is a widely used open source messaging and Integration Patterns server. It provides communication between applications and fulfills both the notification and the inter-operation needs among them.

2. Install an Apache ActiveMQ Server

Most business applications treat AMQ as an infrastructure resource. In this tutorial, we will install AMQ as a standalone server. Following these instructions, we installed AMQ 5.15.0.

3. Start the Apache ActiveMQ Server

Navigate to the \apache-activemq-5.15.0\bin\win64 directory and click on activemq.bat to start the server.

The output below demonstrates that the server started successfully.

server.log

jvm 1 | INFO | Apache ActiveMQ 5.15.0 (localhost, ID:SL2LS431841-57319-1512184574307-0:1) started
jvm 1 | INFO | For help or more information please see: http://activemq.apache.org

4. Monitor the Apache ActiveMQ Server

AMQ provides a web console application to monitor and administer the broker. After the AMQ server starts, follow the steps below to launch the web console.

Here you should see the “Welcome” page. Users can send, read, and delete messages via the web console.

5. Business Use Cases

Company X provides services to customers. Each new customer must be set up in the billing and support systems.

In this tutorial, we will demonstrate how to build the customer on-boarding process, the billing system, and the support application, and how to integrate them via AMQ:

6. Define JMS Message

6.1 Message Destination

For this business use case, both the billing and support systems get notified when a new customer joins. We choose the publish/subscribe message pattern to build the OnBoardNewCustomerApp, which publishes the customer event to an AMQ broker topic: VirtualTopic.Customer.Topic.

There are three special characters reserved by AMQ when naming the destination:
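As documented for ActiveMQ wildcards, the reserved characters are `.` (separates the names in a path), `*` (matches any single name in a path) and `>` (recursively matches any destination starting from that name). The sketch below illustrates these matching rules in plain Java. The class DestinationWildcardSketch is hypothetical and is not the broker's implementation; in particular, it assumes that `>` must match at least one remaining name.

```java
final class DestinationWildcardSketch {

    // Returns true when the destination name matches the wildcard pattern.
    static boolean matches(String pattern, String destination) {
        String[] p = pattern.split("\\.");
        String[] d = destination.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                // Assumption of this sketch: ">" is only valid as the last
                // element and must cover at least one remaining name.
                return i == p.length - 1 && d.length > i;
            }
            if (i >= d.length) {
                return false; // pattern is longer than the destination
            }
            if (!p[i].equals("*") && !p[i].equals(d[i])) {
                return false; // literal name differs
            }
        }
        // Without ">", both paths must have the same number of names.
        return p.length == d.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.group2.*", "test.group2.queue1"));
        System.out.println(matches("*.group2.*", "test.group1.queue1"));
        System.out.println(matches("VirtualTopic.>", "VirtualTopic.Customer.Topic"));
    }
}
```

The pattern `*.group2.*` is the one the consumer application in section 9.2 uses to read from every destination in group2.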

6.2 Message Header

The message header provides metadata about the message that is used by both the clients and the AMQ brokers. JMS defines a set of standard message headers. Two examples are given below:

6.3 Message Body

The message body is the actual message that integrates the applications. For this example, the message is the JSON representation of a CustomerEvent.

CustomerEvent

package jcg.demo.model;

public class CustomerEvent {

private String type;
private Integer customerId;

public CustomerEvent(String type, Integer customerId) {
    this.type = type;
    this.customerId = customerId;
}

public String getType() {
    return type;
}

public Integer getCustomerId() {
    return customerId;
}

public String toString() {
    return "CustomerEvent: type(" + type + "), customerId(" + customerId + ")";
}

public String getCustomerDetailUri() {
    return "https://localhost:8080/support/customer/" + customerId;
}

}
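The consumer output later in this tutorial shows message bodies such as {"type":"NEWCUSTOMER","customerId":79}. The tutorial's own code imports Gson for the conversion; the sketch below is only a hand-rolled, JDK-only stand-in that shows the shape of that payload. The class CustomerEventJsonSketch and its methods are hypothetical, and the code does no JSON escaping, so it is not a replacement for a real JSON library.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class CustomerEventJsonSketch {

    private static final Pattern CUSTOMER_ID =
            Pattern.compile("\"customerId\"\\s*:\\s*(\\d+)");

    // Builds the JSON body for an event. Assumes 'type' needs no escaping.
    static String toJson(String type, int customerId) {
        return "{\"type\":\"" + type + "\",\"customerId\":" + customerId + "}";
    }

    // Extracts the numeric customerId field from a JSON body.
    static int customerIdOf(String json) {
        Matcher m = CUSTOMER_ID.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("no customerId in: " + json);
        }
        return Integer.parseInt(m.group(1));
    }

    public static void main(String[] args) {
        String body = toJson("NEWCUSTOMER", 79);
        System.out.println(body);
        System.out.println(customerIdOf(body));
    }
}
```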

6.4 Configure Virtual Topic

The AMQ server installation comes with a ready-to-use configuration file. Add the configuration below to activemq.xml so that the AMQ broker forwards messages from any topic named VirtualTopic.*.Topic to any virtual topic destination whose name starts with Consumer.*.

activemq.xml

Restart the AMQ server after updating the configuration file.
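To make the forwarding rule concrete, here is a minimal in-memory sketch of the naming convention this tutorial relies on: a message published to a virtual topic is copied into every queue named Consumer.<name>.<topic> that already exists, and a message published before such a queue exists is not delivered to it. The class VirtualTopicFanOutSketch and its methods are hypothetical and use only the JDK. The real routing happens inside the broker.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

final class VirtualTopicFanOutSketch {

    // queue name -> pending messages
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();
    // queue name -> virtual topic it is fed from
    private final Map<String, String> topicOfQueue = new LinkedHashMap<>();

    static String consumerQueueName(String consumerName, String virtualTopic) {
        return "Consumer." + consumerName + "." + virtualTopic;
    }

    // Creates the consumer queue; corresponds to a consumer connecting for the first time.
    void subscribe(String consumerName, String virtualTopic) {
        String queueName = consumerQueueName(consumerName, virtualTopic);
        queues.putIfAbsent(queueName, new ArrayDeque<>());
        topicOfQueue.putIfAbsent(queueName, virtualTopic);
    }

    // Copies the message into every consumer queue that exists for this topic.
    void publish(String virtualTopic, String message) {
        for (Map.Entry<String, String> entry : topicOfQueue.entrySet()) {
            if (entry.getValue().equals(virtualTopic)) {
                queues.get(entry.getKey()).addLast(message);
            }
        }
    }

    // Returns the pending messages of a queue, or null if the queue does not exist.
    Deque<String> queue(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        VirtualTopicFanOutSketch broker = new VirtualTopicFanOutSketch();
        broker.subscribe("Billing", "VirtualTopic.Customer.Topic");
        broker.subscribe("Support", "VirtualTopic.Customer.Topic");
        broker.publish("VirtualTopic.Customer.Topic", "new customer");
        System.out.println(broker.queue("Consumer.Billing.VirtualTopic.Customer.Topic"));
        System.out.println(broker.queue("Consumer.Support.VirtualTopic.Customer.Topic"));
    }
}
```

Because each consumer reads from its own queue, the billing and support applications each receive their own copy of every event.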

7. Apache ActiveMQ Java Client Library

Add the ActiveMQ Java client library to the project's pom.xml.

pom.xml

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.15.0</version>
</dependency>

8. Publish Message Application

In this example, you will see how to create ActiveMQMessageProducer to send messages.

8.1 ActiveMQMessageProducer

A Java class that wraps the ActiveMQ Java API to send messages.

ActiveMQMessageProducer

package jcg.demo.activemq;

import java.util.Random;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;

import com.google.gson.Gson;

import jcg.demo.jms.util.DataUtil;

/**

}

8.2 ActiveMQMessageProducerTest

This JUnit test sends messages to various destinations. It is a convenient way to send a message to a destination.

ActiveMQMessageProducerTest

package jcg.demo.activemq;

import javax.jms.JMSException;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import jcg.demo.jms.util.DataUtil;

public class ActiveMQMessageProducerTest {

private ActiveMQMessageProducer msgQueueSender;

@Before
public void setup() {
    msgQueueSender = new ActiveMQMessageProducer("tcp://localhost:61616", "admin", "admin");
}

@After
public void cleanup() throws JMSException {
    msgQueueSender.close();
}

@Test
public void send_msg_to_no_transaction_Queue() throws JMSException {
    msgQueueSender.setup(false, false, DataUtil.TEST_GROUP1_QUEUE_1);
    msgQueueSender.sendMessage("JCG");
}

@Test
public void send_msg_to_Group2_Queue1() throws JMSException {
    msgQueueSender.setup(false, false, DataUtil.TEST_GROUP2_QUEUE_1);
    msgQueueSender.sendMessage("JCG");
}

@Test
public void send_msg_to_transaction_Group1_Queue2() throws JMSException {
    msgQueueSender.setup(true, false, DataUtil.TEST_GROUP1_QUEUE_2);
    msgQueueSender.sendMessage("DEMO");
    msgQueueSender.commit(true);
}

@Test
public void send_msg_to_no_transaction_Group1_Topic() throws JMSException {
    msgQueueSender.setup(false, true, DataUtil.TEST_GROUP1_TOPIC);
    msgQueueSender.sendMessage("MZHENG");
}

@Test
public void send_msg_to_Virtual_Topic() throws JMSException {
    msgQueueSender.setup(false, true, DataUtil.CUSTOMER_VTC_TOPIC);
    msgQueueSender.sendMessage("MZHENG");
}

@Test
public void send_msg_to_Virtual_Topic_WithSelector() throws JMSException {
    msgQueueSender.setup(false, true, DataUtil.TEST_VTC_TOPIC_SELECTOR);
    msgQueueSender.sendMessage("DZONE");
}

}

8.3 Execution Output

We ran the ActiveMQMessageProducerTest to send messages to three queues and three topics. You can verify this by viewing the AMQ web console. There is one pending message in each of the three queues: test.group1.queue1, test.group1.queue2, and test.group2.queue1.

There is one message in each of the three topics: JCG.Mary.Topic, test.group1.topic and VirtualTopic.Customer.Topic.

8.4 OnBoardNewCustomerApp

OnBoardNewCustomerApp sends the new customer message to the VirtualTopic.Customer.Topic.

OnBoardNewCustomerApp

package jcg.demo.activemq.app;

import jcg.demo.activemq.ActiveMQMessageProducer;
import jcg.demo.jms.util.DataUtil;

public class OnBoardNewCustomerApp {

public static void main(String[] args) {
    ActiveMQMessageProducer msgQueueSender = new ActiveMQMessageProducer("tcp://localhost:61616", "admin", "admin");
    try {
        msgQueueSender.setup(false, true, DataUtil.CUSTOMER_VTC_TOPIC);
        msgQueueSender.sendMessage("CUSTOMER");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

}

Executing OnBoardNewCustomerApp sends a customer message to VirtualTopic.Customer.Topic. However, since there is no consumer yet, the AMQ broker will not forward any message to the virtual topic queue.

9. Consume Message Application

9.1 ActiveMQMessageConsumer

A message consumer that uses the AMQ Java API.

ActiveMQMessageConsumer

package jcg.demo.activemq;

import java.util.Enumeration;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

/**

}

9.2 ActiveMQMessageConsumerMainApp

Create ActiveMQMessageConsumerMainApp to consume from various destinations.

ActiveMQMessageConsumerMainApp

package jcg.demo.activemq.app;

import javax.jms.JMSException;

import jcg.demo.activemq.ActiveMQMessageConsumer;
import jcg.demo.jms.util.DataUtil;

public class ActiveMQMessageConsumerMainApp {

public static void main(String[] args) {

    consumeCustomerVTCQueue();
    consumerVTCQueueWithSelector();
    consumeGroup1Topic();
    consumeAllGroup2();
    consume_queue_with_prefetchsize();

}

private static void consumeCustomerVTCQueue() {
    // the message in the topic before this subscriber starts will not be
    // picked up.
    ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
            "admin");
    queueMsgListener.setDestinationName("Consumer.zheng." + DataUtil.CUSTOMER_VTC_TOPIC);

    try {
        queueMsgListener.run();
    } catch (JMSException e) {

        e.printStackTrace();
    }
}

private static void consumerVTCQueueWithSelector() {
    ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
            "admin");
    queueMsgListener.setDestinationName("VTC.DZONE." + DataUtil.TEST_VTC_TOPIC_SELECTOR);
    queueMsgListener.setSelector("action='DZONE'");
    try {
        queueMsgListener.run();
    } catch (JMSException e) {

        e.printStackTrace();
    }
}

private static void consumeGroup1Topic() {
    ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
            "admin");
    queueMsgListener.setDestinationName(DataUtil.TEST_GROUP1_TOPIC);
    queueMsgListener.setDestinationTopic(true);

    try {
        queueMsgListener.run();
    } catch (JMSException e) {

        e.printStackTrace();
    }
}

private static void consumeAllGroup2() {
    ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
            "admin");
    queueMsgListener.setDestinationName("*.group2.*");

    try {
        queueMsgListener.run();
    } catch (JMSException e) {

        e.printStackTrace();
    }
}

private static void exclusive_queue_Consumer() {
    ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
            "admin");
    queueMsgListener.setDestinationName(DataUtil.TEST_GROUP2_QUEUE_2 + "?consumer.exclusive=true");

    try {
        queueMsgListener.run();
    } catch (JMSException e) {

        e.printStackTrace();
    }
}

private static void consume_queue_with_prefetchsize() {
    ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
            "admin");
    queueMsgListener.setDestinationName(DataUtil.TEST_GROUP1_QUEUE_2 + "?consumer.prefetchSize=10");

    try {
        queueMsgListener.run();
    } catch (JMSException e) {

        e.printStackTrace();
    }
}

}
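The consumerVTCQueueWithSelector method above passes the selector action='DZONE', which asks for only those messages whose action property equals DZONE. The following sketch shows what such an equality test amounts to. The class EqualitySelectorSketch is hypothetical and handles only the single form name='value'; real JMS selectors support a much richer SQL-like syntax and are evaluated by the JMS provider, not by code like this.

```java
import java.util.HashMap;
import java.util.Map;

final class EqualitySelectorSketch {

    // Evaluates a selector of the form name='value' against string properties.
    static boolean matches(String selector, Map<String, String> properties) {
        int eq = selector.indexOf('=');
        if (eq < 0) {
            throw new IllegalArgumentException("unsupported selector: " + selector);
        }
        String name = selector.substring(0, eq).trim();
        String literal = selector.substring(eq + 1).trim();
        if (literal.length() < 2 || !literal.startsWith("'") || !literal.endsWith("'")) {
            throw new IllegalArgumentException("expected a quoted value in: " + selector);
        }
        String expected = literal.substring(1, literal.length() - 1);
        // A message without the property never matches.
        return expected.equals(properties.get(name));
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("action", "DZONE");
        System.out.println(matches("action='DZONE'", headers));
        headers.put("action", "JCG");
        System.out.println(matches("action='DZONE'", headers));
    }
}
```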

9.3 Execution Output

Now start the ActiveMQMessageConsumerMainApp. Here is the application output:

ActiveMQMessageConsumerMainApp Output

main: ActiveMQMessageConsumer Waiting for messages at Consumer.zheng.VirtualTopic.Customer.Topic
main: ActiveMQMessageConsumer Waiting for messages at VTC.DZONE.JCG.Mary.Topic
main: ActiveMQMessageConsumer Waiting for messages at test.group1.topic
main: ActiveMQMessageConsumer Waiting for messages at *.group2.*
[ActiveMQ Session Task-1]: ActiveMQMessageConsumer Received message from [ *.group2.*] - Headers: [ action=JCG actionId=40 ] Message: [ {"type":"NEWCUSTOMER","customerId":79} ]
main: ActiveMQMessageConsumer Waiting for messages at test.group1.queue2?consumer.prefetchSize=10
[ActiveMQ Session Task-1]: ActiveMQMessageConsumer Received message from [ test.group1.queue2?consumer.prefetchSize=10] - Headers: [ action=DEMO actionId=84 ] Message: [ {"type":"NEWCUSTOMER","customerId":28} ]

Now execute OnBoardNewCustomerApp a couple of times. You will see two lines printed in the running consumer application's console, as shown in the output below.

ActiveMQMessageConsumerMainApp Output (Continued)

[ActiveMQ Session Task-1]: ActiveMQMessageConsumer Received message from [ Consumer.zheng.VirtualTopic.Customer.Topic] - Headers: [ action=CUSTOMER actionId=15 ] Message: [ {"type":"NEWCUSTOMER","customerId":51} ]
[ActiveMQ Session Task-2]: ActiveMQMessageConsumer Received message from [ Consumer.zheng.VirtualTopic.Customer.Topic] - Headers: [ action=CUSTOMER actionId=75 ] Message: [ {"type":"NEWCUSTOMER","customerId":73} ]

Always verify and confirm via the AMQ web console.

10. Integration with Spring JMS

Spring JMS provides a JMS integration framework that simplifies the use of the JMS API.

10.1 Add Spring JMS dependency

Add the Spring JMS libraries to the project's pom.xml.

pom.xml

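<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>4.1.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.1.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.1.5.RELEASE</version>
</dependency>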

10.2 Configure Spring Beans

Add Spring JMS Beans to the context.

JmsConfig

package jcg.demo.spring.jms.config;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.jms.support.destination.DynamicDestinationResolver;

import jcg.demo.spring.jms.component.JmsExceptionListener;

@Configuration
@EnableJms
@ComponentScan(basePackages = "jcg.demo.spring.jms.component, jcg.demo.spring.service")
public class JmsConfig {

private String concurrency = "1-10";
private String brokerURI = "tcp://localhost:61616";

@Autowired
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(JmsExceptionListener jmsExceptionListener) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(jmsConnectionFactory(jmsExceptionListener));
    factory.setDestinationResolver(destinationResolver());
    factory.setConcurrency(concurrency);
    factory.setPubSubDomain(false);
    return factory;
}

@Bean
@Autowired
public ConnectionFactory jmsConnectionFactory(JmsExceptionListener jmsExceptionListener) {
    return createJmsConnectionFactory(brokerURI, jmsExceptionListener);
}

private ConnectionFactory createJmsConnectionFactory(String brokerURI, JmsExceptionListener jmsExceptionListener) {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURI);
    activeMQConnectionFactory.setExceptionListener(jmsExceptionListener);

    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(activeMQConnectionFactory);
    return connectionFactory;
}

@Bean(name = "jmsQueueTemplate")
@Autowired
public JmsTemplate createJmsQueueTemplate(ConnectionFactory jmsConnectionFactory) {
    return new JmsTemplate(jmsConnectionFactory);
}

@Bean(name = "jmsTopicTemplate")
@Autowired
public JmsTemplate createJmsTopicTemplate(ConnectionFactory jmsConnectionFactory) {
    JmsTemplate template = new JmsTemplate(jmsConnectionFactory);
    template.setPubSubDomain(true);
    return template;
}

@Bean
public DestinationResolver destinationResolver() {
    return new DynamicDestinationResolver();
}

}

As you can see, the order in which these beans are created is managed by Spring's dependency injection.

10.3 MessageSender

A class to send messages based on Spring JMS framework.

MessageSender

package jcg.demo.spring.jms.component;

import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

@Component
public class MessageSender {

@Autowired
private JmsTemplate jmsQueueTemplate;

@Autowired
private JmsTemplate jmsTopicTemplate;

public void postToQueue(final String queueName, final String message) {

    MessageCreator messageCreator = new MessageCreator() {

        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage(message);
        }
    };

    jmsQueueTemplate.send(queueName, messageCreator);

}

public void postToQueue(final String queueName, Map<String, String> headers, final String message) {

    jmsQueueTemplate.send(queueName, new MessageCreator() {

        @Override
        public Message createMessage(Session session) throws JMSException {
            Message msg = session.createTextMessage(message);
            headers.forEach((k, v) -> {
                try {
                    msg.setStringProperty(k, v);
                } catch (JMSException e) {
                    System.out.println(
                            String.format("JMS fails to set the Header value '%s' to property '%s'", v, k));
                }
            });
            return msg;
        }
    });
}

public void postToTopic(final String topicName, Map<String, String> headers, final String message) {

    jmsTopicTemplate.send(topicName, new MessageCreator() {

        @Override
        public Message createMessage(Session session) throws JMSException {
            Message msg = session.createTextMessage(message);
            headers.forEach((k, v) -> {
                try {
                    msg.setStringProperty(k, v);
                } catch (JMSException e) {
                    System.out.println(
                            String.format("JMS fails to set the Header value '%s' to property '%s'", v, k));
                }
            });
            return msg;
        }
    });
}

}

As you can see, the MessageSender is simpler than the ActiveMQMessageProducer created in step 8.1.

10.4 BillingAppListener

A listener that listens for new customer events and integrates with the billing system.

BillingAppListener

package jcg.demo.spring.jms.component;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import jcg.demo.jms.util.DataUtil;
import jcg.demo.model.CustomerEvent;
import jcg.demo.spring.service.BillingService;
import jcg.demo.spring.service.MessageTransformer;

@Component
public class BillingAppListener {

@Autowired
private JmsTemplate jmsQueueTemplate;

@Autowired
private BillingService billingService;

@Autowired
private MessageTransformer msgTransformer;

private String queueName = "Consumer.Billing." + DataUtil.CUSTOMER_VTC_TOPIC;

public String receiveMessage() throws JMSException {
    System.out.println(Thread.currentThread().getName() + ": BillingAppListener receiveMessage.");

    Destination destination = new ActiveMQQueue(queueName);
    TextMessage textMessage = (TextMessage) jmsQueueTemplate.receive(destination);

    CustomerEvent customerEvt = msgTransformer.fromJson(textMessage.getText(), CustomerEvent.class);
    return billingService.handleNewCustomer(customerEvt);
}

}

As you can see, this class is simpler than the ActiveMQMessageConsumer created in step 9.1.

10.5 SupportAppListener

A listener that listens for new customer events and integrates with the support system.

SupportAppListener

package jcg.demo.spring.jms.component;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import jcg.demo.jms.util.DataUtil;
import jcg.demo.model.CustomerEvent;
import jcg.demo.spring.service.MessageTransformer;
import jcg.demo.spring.service.SupportService;

@Component
public class SupportAppListener {

@Autowired
private JmsTemplate jmsQueueTemplate;

@Autowired
private SupportService supportService;

@Autowired
private MessageTransformer msgTransformer;

private String queueName = "Consumer.Support." + DataUtil.CUSTOMER_VTC_TOPIC;

public String receiveMessage() throws JMSException {
    System.out.println(Thread.currentThread().getName() + ": SupportAppListener receiveMessage." );

    Destination destination = new ActiveMQQueue(queueName);
    TextMessage textMessage = (TextMessage) jmsQueueTemplate.receive(destination);
    
    CustomerEvent customerEvt = msgTransformer.fromJson(textMessage.getText(), CustomerEvent.class);
    return supportService.handleNewCustomer(customerEvt);
}

}

10.6 ConfigBillingForNewCustomerApp

Configure a Spring context that consumes the new customer events and integrates with the billing system.

ConfigBillingForNewCustomerApp

package jcg.demo.spring.jms.app;

import java.net.URISyntaxException;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;

import com.google.gson.Gson;

import jcg.demo.spring.jms.component.BillingAppListener;
import jcg.demo.spring.jms.config.JmsConfig;

@Configuration
public class ConfigBillingForNewCustomerApp {

public static void main(String[] args) throws URISyntaxException, Exception {
    Gson gson = new Gson();

    AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(JmsConfig.class);
    context.register(ConfigBillingForNewCustomerApp.class);

    try {

        BillingAppListener billingAppListener = (BillingAppListener) context.getBean("billingAppListener");

        System.out.println("ConfigBillingForewCustomerApp receives " + billingAppListener.receiveMessage());

    } finally {
        context.close();
    }
}

}

10.7 ConfigSupportForNewCustomerApp

Configure a Spring context that consumes the new customer events and integrates with the support system.

ConfigSupportForNewCustomerApp

package jcg.demo.spring.jms.app;

import java.net.URISyntaxException;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;

import com.google.gson.Gson;

import jcg.demo.spring.jms.component.SupportAppListener;
import jcg.demo.spring.jms.config.JmsConfig;

@Configuration
public class ConfigSupportForNewCustomerApp {

public static void main(String[] args) throws URISyntaxException, Exception {
    Gson gson = new Gson();

    AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(JmsConfig.class);
    context.register(ConfigSupportForNewCustomerApp.class);

    try {
        SupportAppListener supportAppListener = (SupportAppListener) context.getBean("supportAppListener");
        System.out.println("supportAppListener receives " + supportAppListener.receiveMessage());

    } finally {
        context.close();
    }
}

}

10.8 Run as Distributed Systems

So far, we have built one Java JMS application, OnBoardNewCustomerApp, and two Spring JMS applications: ConfigBillingForNewCustomerApp and ConfigSupportForNewCustomerApp. Now it is time to run them together so that the customer on-boarding process integrates with both the billing and support systems.

ConfigBillingForNewCustomerApp Output

main: ConfigBillingForNewCustomerApp receiveMessage.

ConfigSupportForNewCustomerApp Output

main: ConfigSupportForNewCustomerApp receiveMessage.

Execute the OnBoardNewCustomerApp. You will see that both consumers received the customer message and processed it.

ConfigBillingForNewCustomerApp Output (Continued)

ConfigBillingForewCustomerApp receives BillingService handleNewCustomer CustomerEvent: type(NEWCUSTOMER), customerId(41)

ConfigSupportForNewCustomerApp Output (Continued)

ConfigSupportForNewCustomerApp receives SupportService handleNewCustomer CustomerEvent: type(NEWCUSTOMER), customerId(41)

You just witnessed a working distributed system.

11. Integrating with Tomcat

11.1 Configure Tomcat Resource

Configure Tomcat's context.xml with the AMQ resource as shown below.

context.xml

11.2 Look up JNDI Resource

Use jndiContext.lookup to look up the ActiveMQConnectionFactory from the JNDI resource.

JmsConfig

private ConnectionFactory createJmsConnectionFactory(String jndiName, JMSExceptionListener exceptionListener) {
    CachingConnectionFactory connectionFactory = null;
    try {
        Context jndiContext = new InitialContext();
        Context envContext = (Context) jndiContext.lookup("java:comp/env");
        ActiveMQConnectionFactory activeMQConnectionFactory = (ActiveMQConnectionFactory) envContext.lookup(jndiName);
        connectionFactory = new CachingConnectionFactory(activeMQConnectionFactory);
        connectionFactory.setExceptionListener(exceptionListener);
    } catch (NamingException e) {
        String msg = String.format("Unable to get JMS container with name %s ", jndiName);
        throw new RuntimeException(msg, e);
    }
    return connectionFactory;
}

12. Common Problems

There are three common problems when developing an ActiveMQ application.

12.1 Slow Consumer Application

When the AMQ console shows a growing number of pending messages, it indicates that the consumer application processes messages more slowly than the producer publishes them. There are several ways to address this issue:
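One commonly used option is to run more consumers concurrently; the JmsConfig in section 10.2 already sets a listener concurrency of "1-10" for this purpose. The sketch below illustrates the idea with a plain in-memory queue and a thread pool instead of a broker. The class ConcurrentConsumersSketch, the drain method and the simulated 2 ms handling time are assumptions made for illustration only.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ConcurrentConsumersSketch {

    // Drains the pending messages with the given number of worker threads
    // and returns how many messages were processed.
    static int drain(BlockingQueue<String> pending, int consumers) {
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                String message;
                // poll() returns null once the backlog is empty
                while ((message = pending.poll()) != null) {
                    handle(message);
                    processed.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    // Stands in for slow business logic.
    private static void handle(String message) {
        try {
            Thread.sleep(2);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> pending = new LinkedBlockingQueue<>();
        for (int i = 0; i < 100; i++) {
            pending.add("msg-" + i);
        }
        System.out.println("processed " + drain(pending, 4) + " messages");
    }
}
```

Adding consumers only helps when messages can be processed independently; if ordering matters, the handling logic itself needs to be made faster instead.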

12.2 ActiveMQ Sends Unwanted Messages to Virtual Topic Queue

There is a bug in the AMQ broker that sends unwanted messages to the virtual topic queue when a selector is defined. Our solution is to let the applications handle the selector by setting selectorAware to false.

12.3 Exception Handler

Some applications redeliver a message back to its destination when they encounter an exception. This may jam up the destination if the message fails again. A better solution is to have a separate exception handler that deals with any exceptions.
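A minimal sketch of that idea, under the assumption that failed messages are parked on a separate error queue instead of being put back on the source: the consumer keeps draining the source, and a dedicated handler can inspect the failures later. The class ErrorQueueSketch and its in-memory queues are hypothetical stand-ins for real destinations.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

final class ErrorQueueSketch {

    final Deque<String> source = new ArrayDeque<>();
    final Deque<String> errors = new ArrayDeque<>();

    // Processes every message once; a failing message goes to the error
    // queue rather than back to the source, so it cannot block the others.
    void consumeAll(Consumer<String> handler) {
        String message;
        while ((message = source.pollFirst()) != null) {
            try {
                handler.accept(message);
            } catch (RuntimeException e) {
                errors.addLast(message);
            }
        }
    }

    public static void main(String[] args) {
        ErrorQueueSketch sketch = new ErrorQueueSketch();
        sketch.source.add("ok-1");
        sketch.source.add("bad");
        sketch.source.add("ok-2");
        sketch.consumeAll(m -> {
            if (m.startsWith("bad")) {
                throw new IllegalStateException("cannot process " + m);
            }
        });
        System.out.println("source=" + sketch.source + " errors=" + sketch.errors);
    }
}
```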

13. Summary

In this tutorial, we outlined the steps to install and configure the AMQ server and demonstrated:

We also described three common problems when developing an AMQ application.

14. References

    1. ActiveMQ in Action
    2. Apache ActiveMQ

15. Download the Source Code

This example builds several Java applications that send and receive messages via the AMQ broker.