Apache ActiveMQ Distributed Queue Tutorial

Apache ActiveMQ (AMQ) is an open source messaging server written in Java that implements the JMS 1.1 specification. In this article, I will demonstrate how to use a distributed queue within a group of AMQ brokers.

1. Introduction

Apache ActiveMQ (AMQ) is a message broker which transfers the messages from the sender to the receiver.

A distributed queue is a set of Java Message Service (JMS) queues that a client accesses as a single logical queue. The members of the set are usually distributed across multiple servers within a cluster, with each queue member belonging to a separate JMS server.

AMQ provides network connectors to connect AMQ servers as a cluster. In a network of AMQ servers, the messages in a queue at Broker A can be consumed by a client from a different broker.
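To make the forwarding idea concrete, here is a toy in-memory model. It is not ActiveMQ code: the class ToyBroker and all of its methods are hypothetical names invented for this illustration. It assumes one-way links and that a broker only hands messages to a peer that has a consumer attached, which is the general idea behind forwarding on demand.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory model of forwarding between brokers; not ActiveMQ code. */
public class ToyBrokerNetwork {

    /** Hypothetical broker holding one queue and a list of networked peers. */
    static class ToyBroker {
        final String name;
        final Deque<String> queue = new ArrayDeque<>();
        final List<ToyBroker> peers = new ArrayList<>();
        boolean hasLocalConsumer;

        ToyBroker(String name) {
            this.name = name;
        }

        /** One-way link: this broker may forward its messages to the peer. */
        void connectTo(ToyBroker peer) {
            peers.add(peer);
        }

        /** A producer sends to this broker; forwarding happens only on demand. */
        void send(String message) {
            queue.add(message);
            forwardOnDemand();
        }

        /** Moves queued messages to the first peer with a consumer, unless one is local. */
        void forwardOnDemand() {
            if (hasLocalConsumer) {
                return;
            }
            for (ToyBroker peer : peers) {
                if (peer.hasLocalConsumer) {
                    while (!queue.isEmpty()) {
                        peer.queue.add(queue.poll());
                    }
                    return;
                }
            }
        }

        /** A consumer attached to this broker takes the next message, or null if none. */
        String receive() {
            return queue.poll();
        }
    }

    public static void main(String[] args) {
        ToyBroker brokerA = new ToyBroker("Broker A");
        ToyBroker brokerB = new ToyBroker("Broker B");
        brokerA.connectTo(brokerB);       // A can forward to B
        brokerB.hasLocalConsumer = true;  // the only consumer sits on B

        brokerA.send("hello");
        System.out.println(brokerB.receive()); // prints hello
    }
}
```

The producer only ever talks to Broker A and the consumer only to Broker B, yet the message arrives. That is the behaviour the rest of this article reproduces with real brokers.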

In this example, I will demonstrate how a distributed queue works in AMQ brokers.

2. Apache ActiveMQ Server Installation

Follow these instructions to install an AMQ server. Then use the AMQ admin command: activemq-admin create ${brokerName} to create a server instance.

Click here for details.

3. Producer Java Application

3.1 MessageProducerApp

Create MessageProducerApp.

MessageProducerApp.java

package jcg.demo.activemq;

import jcg.demo.util.DataUtils;
import jcg.demo.util.InputData;

public class MessageProducerApp {

public static void main(String[] args) {
    InputData brokerTestData = DataUtils.readTestData();

    if (brokerTestData == null) {
        System.out.println("Wrong input");
    } else {
        QueueMessageProducer queProducer = new QueueMessageProducer(brokerTestData.getBrokerUrl(), DataUtils.ADMIN,
                DataUtils.ADMIN);
        queProducer.sendDummyMessages(brokerTestData.getQueueName());
    }
}

}

3.2 QueueMessageProducer

Create QueueMessageProducer.

QueueMessageProducer.java

package jcg.demo.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

import jcg.demo.util.DataUtils;

/**

}

3.3 Export MessageProducerApp as a Jar

Export MessageProducerApp as activemq-msgproducerApp.jar

4. Consumer Java Application

4.1 MessageConsumerApp

Create MessageConsumerApp.

MessageConsumerApp.java

package jcg.demo.activemq;

import javax.jms.JMSException;

import jcg.demo.util.DataUtils;
import jcg.demo.util.InputData;

public class MessageConsumerApp {

public static void main(String[] args) {

    InputData brokerTestData = DataUtils.readTestData();
    if (brokerTestData == null) {
        System.out.println("Wrong input");
    } else {
        QueueMessageConsumer queueMsgListener = new QueueMessageConsumer(brokerTestData.getBrokerUrl(), DataUtils.ADMIN,
                DataUtils.ADMIN);
        queueMsgListener.setDestinationName(brokerTestData.getQueueName());

        try {
            queueMsgListener.run();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

}

4.2 QueueMessageConsumer

Create QueueMessageConsumer.

QueueMessageConsumer.java

package jcg.demo.activemq;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;

/**

}

4.3 Common Utils

Create DataUtils.

DataUtils.java

package jcg.demo.util;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Scanner;

import org.springframework.util.StringUtils;

/**

}

Create InputData to hold the testing data.

InputData.java

package jcg.demo.util;

/**

}
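The utility listings above are incomplete in this copy of the article, so here is a minimal sketch of what they might look like. It is based only on what the rest of the article shows: the getters getBrokerUrl() and getQueueName(), the null check in the two main methods, and the prompts visible in the console output. The wrapper class InputDataSketch, the InputStream parameter, and the blank-input rule are assumptions made so the example can be run without a keyboard.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

/** Sketch of the input helpers; names other than the getters and prompts are assumptions. */
public class InputDataSketch {

    /** Holds the two values the applications ask for. */
    static class InputData {
        private final String brokerUrl;
        private final String queueName;

        InputData(String brokerUrl, String queueName) {
            this.brokerUrl = brokerUrl;
            this.queueName = queueName;
        }

        String getBrokerUrl() {
            return brokerUrl;
        }

        String getQueueName() {
            return queueName;
        }
    }

    /** Reads the broker URL, then the queue name; returns null if either is blank or missing. */
    static InputData readTestData(InputStream in) {
        Scanner scanner = new Scanner(in, StandardCharsets.UTF_8.name());
        System.out.print("Enter Broker URL(tcp://$host:$port): ");
        String brokerUrl = scanner.hasNextLine() ? scanner.nextLine().trim() : "";
        System.out.print("Enter Queue Name: ");
        String queueName = scanner.hasNextLine() ? scanner.nextLine().trim() : "";
        if (brokerUrl.isEmpty() || queueName.isEmpty()) {
            return null; // callers print "Wrong input" in this case
        }
        return new InputData(brokerUrl, queueName);
    }

    public static void main(String[] args) {
        // Simulated keyboard input; pass System.in for interactive use.
        InputStream simulated = new ByteArrayInputStream(
                "tcp://localhost:61816\nqueue.1\n".getBytes(StandardCharsets.UTF_8));
        InputData data = readTestData(simulated);
        System.out.println();
        System.out.println(data.getBrokerUrl() + " / " + data.getQueueName());
    }
}
```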

4.4 Export MessageConsumerApp as a Jar

Export MessageConsumerApp as activemq-msgConsumerApp.jar

5. Distributed Queue in a Static Network of Brokers

In this example, Producer-1 sends messages to queue.1 at Broker-3, and Consumer-1 receives them from queue.1 at Broker-1, matching the broker URLs entered in sections 5.3 and 5.4. queue.1 is the distributed queue. This setup is useful when the producer and consumer applications cannot connect to the same AMQ server.

The image below shows a distributed queue (queue.1) across Broker-1 and Broker-3.

Figure 5 distributed queue 1

5.1 Configure a Static Network of Brokers

Configure a network of Broker-1 and Broker-3:

Broker name   Home Path             Openwire Port   Web Port   Data Path
broker-1      ..\cluster\broker-1   61816           8861       ..\data
broker-3      ..\cluster\broker-3   61516           5161       \broker-3\data

Click here for the configuration details.
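As section 5.2 notes, Broker-3 has a network connector to Broker-1. ActiveMQ addresses a static network connector with a URI of the form static:(tcp://host:port,...), which is set as the uri of the connector in the broker configuration. The small helper below only builds that string from the Openwire ports in the table. The class and method names are hypothetical, and localhost is assumed because every broker in this article runs on the same machine.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Builds a static network connector URI; the helper itself is illustrative only. */
public class StaticConnectorUri {

    /** Joins the remote broker addresses into static:(tcp://host:port,...). */
    static String staticUri(String host, List<Integer> openwirePorts) {
        return openwirePorts.stream()
                .map(port -> "tcp://" + host + ":" + port)
                .collect(Collectors.joining(",", "static:(", ")"));
    }

    public static void main(String[] args) {
        // Broker-3 connects to Broker-1, whose Openwire port is 61816.
        System.out.println(staticUri("localhost", Arrays.asList(61816)));
        // prints static:(tcp://localhost:61816)
    }
}
```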

5.2 Verify the AMQ Brokers – Part I

Start both Broker-1 and Broker-3.

Navigate to the AMQ web console to view the connection details.

Image 5.2.1 Broker-1(8861) Connections

Figure 5.2.1 Broker-1 connection

Note: The client name of the Broker-1 connection is defined in step 4.2.

Image 5.2.2 Broker-3 (5161) Connections

Figure 5.2.2 Broker-3 connection

Note: Broker-3 has a network connector to Broker-1.

5.3 Execute the Consumer Application

Enter java -jar activemq-msgConsumerApp.jar to start MessageConsumerApp.

MessageConsumerApp Output

C:\Users\shu.shan\Desktop>java -jar activemq-msgConsumerApp.jar
Enter Broker URL(tcp://$host:$port): tcp://localhost:61816
Enter Queue Name: queue.1
QueueMessageConsumer Waiting for messages at queue='queue.1' broker='tcp://localhost:61816'

5.4 Execute the Publisher Application

While MessageConsumerApp is running, enter java -jar activemq-msgproducerApp.jar to start MessageProducerApp.

MessageProducerApp output

C:\Users\shu.shan\Desktop>java -jar activemq-msgproducerApp.jar
Enter Broker URL(tcp://$host:$port): tcp://localhost:61516
Enter Queue Name: queue.1
QueueMessageProducer started tcp://localhost:61516

The image below shows both applications running.

Figure 5.5 Execution of Application

5.5 Verify the AMQ Brokers – Part II

Navigate to the Broker-1 web console, click Queues to find queue.1, and then click its active consumers link.

The image below shows the active consumer of queue.1 at Broker-1: Mzhengclient-queue.1_61816.

Figure 5.5.1. Broker-1 queue.1 consumer
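The consumer name Mzhengclient-queue.1_61816 appears to combine a fixed prefix, the queue name, and the port of the broker URL. The QueueMessageConsumer listing is not reproduced in full here, so the helper below is only a guess at how such a name could be derived. The class name, method name, and the use of java.net.URI are all assumptions.

```java
import java.net.URI;

/** Hypothetical reconstruction of the client name seen in the web console. */
public class ClientIdSketch {

    /** Combines the prefix seen in the console with the queue name and broker port. */
    static String clientId(String brokerUrl, String queueName) {
        int port = URI.create(brokerUrl).getPort();
        return "Mzhengclient-" + queueName + "_" + port;
    }

    public static void main(String[] args) {
        System.out.println(clientId("tcp://localhost:61816", "queue.1"));
        // prints Mzhengclient-queue.1_61816
    }
}
```

A name that encodes the queue and port makes it easy to tell in the console which broker a consumer is attached to, which is how it is used in the verification steps here.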

The image below shows the active consumer of queue.1 at Broker-3: nc:61516-61816_broker-1_inbound_broker-3.

Figure 5.5.2 Broker-3 Consumer

Note: queue.1 is the distributed queue, made available through the broker's network connector.

6. Distributed Queue in a Dynamic Network of Brokers

In this example, Producer-1 sends messages to queue.1 at Dynamic-Broker1, Producer-2 sends messages to queue.1 at Dynamic-Broker3, and Consumer-1 receives the messages from queue.1 at Dynamic-Broker2, matching the runs in sections 6.3 and 6.4. queue.1 is the distributed queue. This setup is useful for sharing the load among multiple producers and for supporting in-order delivery when processing the messages.

The diagram below shows a distributed queue (queue.1) among three brokers.

Figure 6 distributed queue

6.1 Configure a Dynamic Network of Brokers

Configure a dynamic network of brokers with three brokers:

Broker name       Home Path                    Openwire Port   Web Port   Data Path
dynamic-broker1   ..\cluster\dynamic-broker1   61626           8166       ..\dynamic-broker1\data
dynamic-broker2   ..\cluster\dynamic-broker2   61636           8164       ..\dynamic-broker2\data
dynamic-broker3   ..\cluster\dynamic-broker3   61646           8165       ..\dynamic-broker3\data

Click here for the configuration details.
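In a dynamic network the brokers are not listed by hand. Each one discovers the others, typically through a shared discovery address. Assuming all three brokers above join the same discovery group (the actual configuration is behind the link, so this is an assumption), every broker ends up bridged to the other two. The sketch below only computes that peer list. It does not talk to any broker, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Computes the peers each broker would discover in a fully connected group. */
public class DynamicMeshSketch {

    /** Maps every broker address to the list of all other broker addresses. */
    static Map<String, List<String>> peersOf(List<String> brokers) {
        Map<String, List<String>> mesh = new LinkedHashMap<>();
        for (String broker : brokers) {
            List<String> peers = new ArrayList<>(brokers);
            peers.remove(broker); // a broker does not bridge to itself
            mesh.put(broker, peers);
        }
        return mesh;
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList(
                "tcp://localhost:61626",  // dynamic-broker1
                "tcp://localhost:61636",  // dynamic-broker2
                "tcp://localhost:61646"); // dynamic-broker3
        peersOf(brokers).forEach((broker, peers) ->
                System.out.println(broker + " -> " + peers));
    }
}
```

With three brokers each one has two peers, which is consistent with the two network consumers that the Dynamic-Broker3 console shows for queue.1 in section 6.5.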

6.2 Verify the AMQ Brokers – Part I

Start all three dynamic brokers. Navigate to the AMQ web console to view the connection details.

The image below shows the Dynamic-Broker1 (8166) connections.

Figure 6.2 Dynamic-Broker1 Connections

6.3 Execute the Consumer Application

Enter java -jar activemq-msgConsumerApp.jar to start MessageConsumerApp at Dynamic-Broker2.

MessageConsumerApp Output

C:\Users\shu.shan\Desktop>java -jar activemq-msgConsumerApp.jar
Enter Broker URL(tcp://$host:$port): tcp://localhost:61636
Enter Queue Name: queue.1
QueueMessageConsumer Waiting for messages at queue='queue.1' broker='tcp://localhost:61636'

6.4 Execute the Publisher Application

While MessageConsumerApp is running, enter java -jar activemq-msgproducerApp.jar twice to start two instances of MessageProducerApp, one connected to Dynamic-Broker1 and the other to Dynamic-Broker3.

Figure 6.4 Application Execution Output

Note: The consumer listens to queue.1 at Dynamic-Broker2 while two publishers publish messages to queue.1 at Dynamic-Broker1 and Dynamic-Broker3. The consumer processed the messages in order of their creation time.

6.5 Verify the AMQ Brokers – Part II

Navigate to the Dynamic-Broker2 web console, click Queues to find queue.1, and then click its active consumers link.

The image below shows the active consumer of queue.1 at Dynamic-Broker2: Mzhengclient-queue.1_61636.

Figure 6.5.1 Dynamic-Broker2 Consumer

The image below shows that queue.1 at Dynamic-Broker3 has two active consumers via the network of brokers.

Figure 6.5.2 Dynamic-Broker3 consumers

Note: queue.1 is the distributed queue, made available through the brokers' network connectors.

7. Summary

In this article, I demonstrated two cases of a distributed queue using AMQ with a network of brokers. An AMQ network of brokers also provides high availability to clients. Click here for more details on high availability.

A distributed queue supports deliveries in which subscribers receive messages in the same order in which they were published. Besides Apache ActiveMQ, IBM MQ, RabbitMQ, HornetQ, and Apache Kafka also support distributed queues.

8. References

9. Download the Source Code

This example builds two Java applications to send and receive messages via the AMQ broker.