Apache ActiveMQ Failover Example

Apache ActiveMQ is an open source messaging server written in Java that implements the JMS 1.1 specification. In this example, I will demonstrate how to configure a group of AMQ brokers to make the system fault-tolerant.

Tip
If you already know how to install ActiveMQ, you can skip the first two chapters.

1. Introduction

Apache ActiveMQ (AMQ) is a message broker which transfers the messages from the sender to the receiver.

Failover is a procedure by which a system automatically transfers control to a duplicate system when it detects a fault or failure.

The AMQ failover transport enables an application to reconnect automatically to a broker when a failure is detected, either while establishing a connection or on a connection that is already open.
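A client selects the failover transport through its connection URI, which wraps a comma-separated list of broker URIs. The sketch below only assembles such a string; the class and method names are hypothetical, and the two broker addresses and the timeout option are the ones that appear in the demo output later in this example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

public class FailoverUriBuilder {

    /** Joins broker URIs into failover:(uri1,uri2,...)?key=value&key=value */
    static String build(List<String> brokerUris, Map<String, String> options) {
        if (brokerUris.isEmpty()) {
            throw new IllegalArgumentException("at least one broker URI is required");
        }
        StringBuilder uri = new StringBuilder("failover:(")
                .append(String.join(",", brokerUris))
                .append(")");
        if (!options.isEmpty()) {
            // Options are appended as a query string after the closing parenthesis.
            StringJoiner query = new StringJoiner("&", "?", "");
            options.forEach((key, value) -> query.add(key + "=" + value));
            uri.append(query.toString());
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("timeout", "30000");
        // prints failover:(tcp://localhost:61616,tcp://localhost:61716)?timeout=30000
        System.out.println(build(
                Arrays.asList("tcp://localhost:61616", "tcp://localhost:61716"), options));
    }
}
```

The resulting string is what a client would pass to its connection factory in place of a single tcp:// address.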

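In this example, I will demonstrate how to configure standalone, Master/Slave, and networked AMQ brokers, and how Java client applications use the failover transport to reconnect when a broker goes down.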

2. Apache ActiveMQ Server Installation

Follow these instructions to install an AMQ server. Then use the AMQ admin command activemq-admin create ${brokerName} to create a server instance.

Create eight AMQ server instances:

activemq-admin create ..\standalone\broker1
activemq-admin create ..\standalone\broker2
activemq-admin create ..\cluster\broker-1
activemq-admin create ..\cluster\broker-2
activemq-admin create ..\cluster\broker-3
activemq-admin create ..\cluster\dynamic-broker1
activemq-admin create ..\cluster\dynamic-broker2
activemq-admin create ..\cluster\dynamic-broker3

3. Apache ActiveMQ Server Configuration

The activemq-admin command copies the AMQ server to the user-defined location. Each instance then needs its default values changed with the steps below:

  1. Navigate to the AMQ instance folder. Ex: standalone\broker1
  2. Edit the activemq.xml at transportConnector and networkConnector
  3. Edit the jetty.xml with a different web port number
  4. Edit the Windows batch file at ACTIVEMQ_CONF and ACTIVEMQ_DATA
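Because every instance in this example runs on the same machine, no two brokers may listen on the same port. A small check like the following can catch a clash before the brokers are started. It is only a sketch: the class and method names are hypothetical, and the OpenWire ports are the ones used in the tables of sections 3.1 to 3.3.2 (broker-1 and broker-2 appear in both the Master/Slave and the static network setup, so they are listed once).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PortPlanCheck {

    /** Returns every port claimed by more than one broker, with the brokers claiming it. */
    static Map<Integer, List<String>> findConflicts(Map<String, Integer> portByBroker) {
        Map<Integer, List<String>> brokersByPort = new TreeMap<>();
        portByBroker.forEach((broker, port) ->
                brokersByPort.computeIfAbsent(port, p -> new ArrayList<>()).add(broker));
        // Keep only the ports that have two or more owners.
        brokersByPort.values().removeIf(brokers -> brokers.size() < 2);
        return brokersByPort;
    }

    public static void main(String[] args) {
        Map<String, Integer> openwire = new LinkedHashMap<>();
        openwire.put("standalone/broker1", 61616);
        openwire.put("standalone/broker2", 61716);
        openwire.put("cluster/broker-1", 61816);
        openwire.put("cluster/broker-2", 61826);
        openwire.put("cluster/broker-3", 61516);
        openwire.put("cluster/dynamic-broker1", 61626);
        openwire.put("cluster/dynamic-broker2", 61636);
        openwire.put("cluster/dynamic-broker3", 61646);

        System.out.println("Conflicts: " + findConflicts(openwire));
    }
}
```

The same check can be run against the web ports chosen in jetty.xml.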

3.1 Two Standalone ActiveMQ Brokers

Using the steps above, configure two standalone AMQ brokers:

| Broker Name | Home Path | Openwire Port | Web Port | Data Path |
|---|---|---|---|---|
| broker1 | ..\standalone\broker1 | 61616 | 8161 | broker1\data |
| broker2 | ..\standalone\broker2 | 61716 | 7161 | broker2\data |

Two Standalone AMQ Brokers

Here is an example of broker1.bat file.

broker1.bat

@echo off

set ACTIVEMQ_HOME="C:/MaryZheng/tools/apache-activemq-5.15.0"
set ACTIVEMQ_BASE="C:/MaryZheng/tools/apache-activemq-5.15.0/standalone/broker1"
set ACTIVEMQ_CONF=%ACTIVEMQ_BASE%/conf
set ACTIVEMQ_DATA=%ACTIVEMQ_BASE%/data

set PARAM=%1
:getParam
shift
if "%1"=="" goto end
set PARAM=%PARAM% %1
goto getParam
:end

%ACTIVEMQ_HOME%/bin/activemq %PARAM%

Start broker1 with the command: broker1.bat start.

broker1 server log

C:\MaryZheng\tools\apache-activemq-5.15.0\standalone\broker1\bin>broker1.bat start
Java Runtime: Oracle Corporation 1.8.0_31 C:\MaryZheng\tools\java\jdk1.8.0_31\jre
Heap sizes: current=1005056k free=984084k max=1005056k
JVM args: -Dcom.sun.management.jmxremote -Xms1G -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=C:/MaryZheng/tools/apache-activemq-5.15.0/standalone/broker1/conf\login.config -Dactivemq.classpath=C:/MaryZheng/tools/apache-activemq-5.15.0/standalone/broker1/conf;C:/MaryZheng/tools/apache-activemq-5.15.0/standalone/broker1/conf;C:/MaryZheng/tools/apache-activemq-5.15.0/conf; -Dactivemq.home=C:/MaryZheng/tools/apache-activemq-5.15.0 -Dactivemq.base=C:/MaryZheng/tools/apache-activemq-5.15.0/standalone/broker1 -Dactivemq.conf=C:/MaryZheng/tools/apache-activemq-5.15.0/standalone/broker1/conf -Dactivemq.data=C:/MaryZheng/tools/apache-activemq-5.15.0/standalone/broker1/data -Djava.io.tmpdir=C:/MaryZheng/tools/apache-activemq-5.15.0/standalone/broker1/data\tmp
Extensions classpath: [C:\MaryZheng\tools\apache-activemq-5.15.0\standalone\broker1\lib,C:\MaryZheng\tools\apache-activemq-5.15.0\lib,C:\MaryZheng\tools\apache-activemq-5.15.0\standalone\broker1\lib\camel,C:\MaryZheng\tools\apache-activemq-5.15.0\standalone\broker1\lib\optional,C:\MaryZheng\tools\apache-activemq-5.15.0\standalone\broker1\lib\web,C:\MaryZheng\tools\apache-activemq-5.15.0\standalone\broker1\lib\extra,C:\MaryZheng\tools\apache-activemq-5.15.0\lib\camel,C:\MaryZheng\tools\apache-activemq-5.15.0\lib\optional,C:\MaryZheng\tools\apache-activemq-5.15.0\lib\web,C:\MaryZheng\tools\apache-activemq-5.15.0\lib\extra]
ACTIVEMQ_HOME: C:\MaryZheng\tools\apache-activemq-5.15.0
ACTIVEMQ_BASE: C:\MaryZheng\tools\apache-activemq-5.15.0\standalone\broker1
ACTIVEMQ_CONF: C:\MaryZheng\tools\apache-activemq-5.15.0\standalone\broker1\conf
ACTIVEMQ_DATA: C:\MaryZheng\tools\apache-activemq-5.15.0\standalone\broker1\data
Loading message broker from: xbean:activemq.xml
INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@1753acfe: startup date [Sat Dec 16 07:05:53 CST 2017]; root of context hierarchy
INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\MaryZheng\tools\apache-activemq-5.15.0\standalone\broker1\data\kahadb]
INFO | KahaDB is version 6
INFO | Recovering from the journal @1:65574
INFO | Recovery replayed 15 operations from the journal in 0.032 seconds.
INFO | PListStore:[C:\MaryZheng\tools\apache-activemq-5.15.0\standalone\broker1\data\broker1\tmp_storage] started
INFO | Apache ActiveMQ 5.15.0 (broker1, ID:SL2LS431841-50062-1513429555523-0:1) is starting
INFO | Listening for connections at: tcp://SL2LS431841:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600
INFO | Connector openwire started
INFO | Apache ActiveMQ 5.15.0 (broker1, ID:SL2LS431841-50062-1513429555523-0:1) started
INFO | For help or more information please see: http://activemq.apache.org
WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\MaryZheng\tools\apache-activemq-5.15.0\standalone\broker1\data\kahadb only has 1146 mb of usable space. - resetting to maximum available disk space: 1146 mb
WARN | Temporary Store limit is 51200 mb (current store usage is 0 mb). The data directory: C:\MaryZheng\tools\apache-activemq-5.15.0\standalone\broker1\data only has 1146 mb of usable space. - resetting to maximum available disk space: 1146 mb
INFO | No Spring WebApplicationInitializer types detected on classpath
INFO | ActiveMQ WebConsole available at http://0.0.0.0:8161/
INFO | ActiveMQ Jolokia REST API available at http://0.0.0.0:8161/api/jolokia/
INFO | Initializing Spring FrameworkServlet 'dispatcher'
INFO | No Spring WebApplicationInitializer types detected on classpath
INFO | jolokia-agent: Using policy access restrictor classpath:/jolokia-access.xml

Repeat the steps to configure other brokers.

Note: Pay special attention to the highlighted lines. Verify that each broker is running via the AMQ web console.
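Besides the web console, a quick way to confirm that a broker came up is to check that its ports accept TCP connections. The following is a minimal sketch with a hypothetical class name; it assumes the brokers run on localhost and uses the broker1 ports from the table above. It only proves that something is listening on the port, not that the listener is ActiveMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortProbe {

    /** Returns true if something accepts a TCP connection on host:port within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("openwire 61616 listening: " + isListening("localhost", 61616, 1000));
        System.out.println("web console 8161 listening: " + isListening("localhost", 8161, 1000));
    }
}
```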

3.2. Master/Slave ActiveMQ Brokers

In a Master/Slave topology, the Master provides services to clients while the Slave stays on standby and is promoted when the Master fails. There are three kinds of Master/Slave configurations: shared file system, JDBC, and replicated LevelDB store.

Configure Master/Slave brokers with “Shared File System”:

| Broker Name | Home Path | Openwire Port | Web Port | Data Path |
|---|---|---|---|---|
| broker-1 | ..\cluster\broker-1 | 61816 | 8861 | ..\data |
| broker-2 | ..\cluster\broker-2 | 61826 | 8862 | ..\data |

Note: The configuration steps for Master/Slave are the same as for the standalone server. The difference is that the Master and the Slave must share the same data directory.
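The Slave log in this section reports that the kahadb lock file "is locked by another server", which suggests how the shared directory decides who is Master: whichever broker holds an exclusive lock on that file. The sketch below illustrates the idea with java.nio file locks. It is not ActiveMQ's implementation; the Broker class is hypothetical and a temporary file stands in for the lock file.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class SharedLockDemo {

    /** A stand-in broker that is Master only while it holds the exclusive file lock. */
    static final class Broker implements AutoCloseable {
        private final FileChannel channel;
        private FileLock lock;

        Broker(Path lockFile) throws IOException {
            channel = FileChannel.open(lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        }

        /** Tries once to take the lock; returns true if this broker is now the Master. */
        boolean tryBecomeMaster() throws IOException {
            if (lock == null) {
                try {
                    lock = channel.tryLock(); // null when another process holds the lock
                } catch (OverlappingFileLockException e) {
                    // Thrown instead of returning null when the holder lives in this same JVM.
                }
            }
            return lock != null;
        }

        /** Stopping the broker releases the lock so a waiting Slave can take over. */
        @Override
        public void close() throws IOException {
            if (lock != null) {
                lock.release();
                lock = null;
            }
            channel.close();
        }
    }

    public static void main(String[] args) throws IOException {
        Path lockFile = Files.createTempFile("kahadb", ".lock");
        Broker broker1 = new Broker(lockFile);
        Broker broker2 = new Broker(lockFile);

        System.out.println("broker-1 is master: " + broker1.tryBecomeMaster());
        System.out.println("broker-2 is master: " + broker2.tryBecomeMaster());

        broker1.close(); // stop the Master
        System.out.println("broker-2 is master after broker-1 stops: " + broker2.tryBecomeMaster());

        broker2.close();
        Files.deleteIfExists(lockFile);
    }
}
```

A real Slave keeps retrying until the lock becomes free; the sketch makes a single attempt per call to keep the flow visible.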

Slave server log

C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\broker-2\bin>broker-2.bat start
Java Runtime: Oracle Corporation 1.8.0_31 C:\MaryZheng\tools\java\jdk1.8.0_31\jre
Heap sizes: current=1005056k free=984084k max=1005056k
JVM args: -Dcom.sun.management.jmxremote -Xms1G -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=C:/MaryZheng/tools/apache-activemq-5.15.0/cluster/broker-2/conf\login.config -Dactivemq.classpath=C:/MaryZheng/tools/apache-activemq-5.15.0/cluster/broker-2/conf;C:/MaryZheng/tools/apache-activemq-5.15.0/cluster/broker-2/conf;C:/MaryZheng/tools/apache-activemq-5.15.0/conf; -Dactivemq.home=C:/MaryZheng/tools/apache-activemq-5.15.0 -Dactivemq.base=C:/MaryZheng/tools/apache-activemq-5.15.0/cluster/broker-2 -Dactivemq.conf=C:/MaryZheng/tools/apache-activemq-5.15.0/cluster/broker-2/conf -Dactivemq.data=C:/MaryZheng/tools/apache-activemq-5.15.0/data -Djava.io.tmpdir=C:/MaryZheng/tools/apache-activemq-5.15.0/data\tmp
Extensions classpath: [C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\broker-2\lib,C:\MaryZheng\tools\apache-activemq-5.15.0\lib,C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\broker-2\lib\camel,C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\broker-2\lib\optional,C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\broker-2\lib\web,C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\broker-2\lib\extra,C:\MaryZheng\tools\apache-activemq-5.15.0\lib\camel,C:\MaryZheng\tools\apache-activemq-5.15.0\lib\optional,C:\MaryZheng\tools\apache-activemq-5.15.0\lib\web,C:\MaryZheng\tools\apache-activemq-5.15.0\lib\extra]
ACTIVEMQ_HOME: C:\MaryZheng\tools\apache-activemq-5.15.0
ACTIVEMQ_BASE: C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\broker-2
ACTIVEMQ_CONF: C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\broker-2\conf
ACTIVEMQ_DATA: C:\MaryZheng\tools\apache-activemq-5.15.0\data
Loading message broker from: xbean:activemq.xml
INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@1753acfe: startup date [Sat Dec 16 08:05:25 CST 2017]; root of context hierarchy
INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\MaryZheng\tools\apache-activemq-5.15.0\data\kahadb]
INFO | Database C:\MaryZheng\tools\apache-activemq-5.15.0\data\kahadb\lock is locked by another server. This broker is now in slave mode waiting a lock to be acquired

Stop the Master and watch the Slave get promoted to the Master.

Slave broker promotes to Master log

INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\MaryZheng\tools\apache-activemq-5.15.0\data\kahadb]
INFO | Database C:\MaryZheng\tools\apache-activemq-5.15.0\data\kahadb\lock is locked by another server. This broker is now in slave mode waiting a lock to be acquired
INFO | KahaDB is version 6
INFO | Recovering from the journal @1:31536026
INFO | Recovery replayed 97 operations from the journal in 0.042 seconds.
INFO | PListStore:[C:\MaryZheng\tools\apache-activemq-5.15.0\data\broker-2\tmp_storage] started
INFO | Apache ActiveMQ 5.15.0 (broker-2, ID:SL2LS431841-50406-1513433598677-0:1) is starting
INFO | Listening for connections at: tcp://SL2LS431841:61826?maximumConnections=1000&wireFormat.maxFrameSize=104857600
INFO | Connector openwire started
INFO | Apache ActiveMQ 5.15.0 (broker-2, ID:SL2LS431841-50406-1513433598677-0:1) started
INFO | For help or more information please see: http://activemq.apache.org
WARN | Store limit is 102400 mb (current store usage is 31 mb). The data directory: C:\MaryZheng\tools\apache-activemq-5.15.0\data\kahadb only has 1173 mb of usable space. - resetting to maximum available disk space: 1173 mb
WARN | Temporary Store limit is 51200 mb (current store usage is 0 mb). The data directory: C:\MaryZheng\tools\apache-activemq-5.15.0\data only has 1141 mb of usable space. - resetting to maximum available disk space: 1141 mb
INFO | No Spring WebApplicationInitializer types detected on classpath
INFO | ActiveMQ WebConsole available at http://0.0.0.0:8961/
INFO | ActiveMQ Jolokia REST API available at http://0.0.0.0:8961/api/jolokia/
INFO | Initializing Spring FrameworkServlet 'dispatcher'
INFO | No Spring WebApplicationInitializer types detected on classpath
INFO | jolokia-agent: Using policy access restrictor classpath:/jolokia-access.xml

3.3 Network of Brokers

AMQ provides a networkConnector to connect two brokers together, and three transportConnector options that keep connected clients informed of changes in the cluster: updateClusterClients, rebalanceClusterClients, and updateClusterClientsOnRemove.

3.3.1 Static Network of Brokers

In a static network of brokers, the networkConnector connects the broker to a list of brokers.

Configure a network of three brokers:

| Broker Name | Home Path | Openwire Port | Web Port | Data Path |
|---|---|---|---|---|
| broker-1 | ..\cluster\broker-1 | 61816 | 8162 | ..\data |
| broker-2 | ..\cluster\broker-2 | 61826 | 8961 | ..\data |
| broker-3 | ..\cluster\broker-3 | 61516 | 5161 | \broker-3\data |

The static network of brokers’ activemq.xml.

activemq.xml

<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <value>file:${activemq.conf}/credentials.properties</value>
    </property>
</bean>
<bean class="io.fabric8.insight.log.log4j.Log4jLogQuery" destroy-method="stop" id="logQuery" init-method="start" lazy-init="false" scope="singleton">
</bean>

<!--
    The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker-3" dataDirectory="${activemq.data}">

<destinationInterceptors>
        <virtualDestinationInterceptor>
            <virtualDestinations>
                 <virtualTopic name="VirtualTopic.>" prefix="Consumer.*." selectorAware="false"/>
                <virtualTopic name="JCG.>" prefix="VTC.*." selectorAware="true"/>
            </virtualDestinations>
        </virtualDestinationInterceptor>
    </destinationInterceptors>

    <destinationPolicy>
        <policyMap>
          <policyEntries>
            <policyEntry topic=">">
                <!-- The constantPendingMessageLimitStrategy is used to prevent
                     slow topic consumers to block producers and affect other consumers
                     by limiting the number of messages that are retained
                     For more information, see:

                     http://activemq.apache.org/slow-consumer-handling.html

                -->
              <pendingMessageLimitStrategy>
                <constantPendingMessageLimitStrategy limit="1000"/>
              </pendingMessageLimitStrategy>
            </policyEntry>
          </policyEntries>
        </policyMap>
    </destinationPolicy>


    <!--
        The managementContext is used to configure how ActiveMQ is exposed in
        JMX. By default, ActiveMQ uses the MBean server that is started by
        the JVM. For more information, see:

        http://activemq.apache.org/jmx.html
    -->
    <managementContext>
        <managementContext createConnector="false"/>
    </managementContext> 

    <!--
        Configure message persistence for the broker. The default persistence
        mechanism is the KahaDB store (identified by the kahaDB tag).
        For more information, see:

        http://activemq.apache.org/persistence.html
    -->
    <persistenceAdapter>
        <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>


      <!--
        The systemUsage controls the maximum amount of space the broker will
        use before disabling caching and/or slowing down producers. For more information, see:
        http://activemq.apache.org/producer-flow-control.html
      -->
      <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage percentOfJvmHeap="70"/>
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="100 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="50 gb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>

 <networkConnectors>
  <networkConnector name="amq3-nc" 
    uri="static:(failover:(tcp://0.0.0.0:61816,tcp://0.0.0.0:61826))" 
    dynamicOnly="true" 
    networkTTL="3" 
    duplex="true"/>
</networkConnectors>

    <!--
        The transport connectors expose ActiveMQ over a given protocol to
        clients and other brokers. For more information, see:

        http://activemq.apache.org/configuring-transports.html
    -->
    <transportConnectors>
        <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
        <transportConnector name="openwire" rebalanceClusterClients="true" updateClusterClients="true" updateClusterClientsOnRemove="true" uri="tcp://0.0.0.0:61516?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    </transportConnectors>


    <!-- destroy the spring context on shutdown to stop jetty -->
    <shutdownHooks>
        <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook"/>
    </shutdownHooks>

</broker>

<!--
    Enable web consoles, REST and Ajax APIs and demos
    The web consoles requires by default login, you can disable this in the jetty.xml file

    Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>

Keep the Master/Slave servers running and start Broker 3.
The Master broker's server log shows that it has connected to Broker 3.

master log

INFO | Connector vm://broker-2 started
INFO | Started responder end of duplex bridge amq3-nc@ID:SL2LS431841-64674-1513436259188-0:1
INFO | Network connection between vm://broker-2#0 and tcp:///192.168.1.109:64676@61826 (broker-3) has been established.

Line 3: the network bridge is established between broker-3 and broker-2.

Note: Verify the setup via the AMQ web console. You should see the network connector details under the Connection tab.

3.3.2. Dynamic Network of Brokers

A dynamic network of brokers auto-detects the other brokers within the network. Configure three brokers:

| Broker Name | Home Path | Openwire Port | Web Port | Data Path |
|---|---|---|---|---|
| dynamic-broker1 | ..\cluster\dynamic-broker1 | 61626 | 8163 | ..\dynamic-broker1\data |
| dynamic-broker2 | ..\cluster\dynamic-broker2 | 61636 | 8164 | ..\dynamic-broker2\data |
| dynamic-broker3 | ..\cluster\dynamic-broker3 | 61646 | 8165 | ..\dynamic-broker3\data |

The image below shows six brokers under the cluster directory after steps 3.2 and 3.3.

Cluster of AMQ Brokers

Configuration file example for dynamic Broker 1.

dynamic broker1 activemq.xml

<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <value>file:${activemq.conf}/credentials.properties</value>
    </property>
</bean>
<bean class="io.fabric8.insight.log.log4j.Log4jLogQuery" destroy-method="stop" id="logQuery" init-method="start" lazy-init="false" scope="singleton">
</bean>

<!--
    The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="dynamic-broker1" dataDirectory="${activemq.data}">

<destinationInterceptors>
        <virtualDestinationInterceptor>
            <virtualDestinations>
                 <virtualTopic name="VirtualTopic.>" prefix="Consumer.*." selectorAware="false"/>
                <virtualTopic name="JCG.>" prefix="VTC.*." selectorAware="true"/>
            </virtualDestinations>
        </virtualDestinationInterceptor>
    </destinationInterceptors>

    <destinationPolicy>
        <policyMap>
          <policyEntries>
            <policyEntry topic=">">
                <!-- The constantPendingMessageLimitStrategy is used to prevent
                     slow topic consumers to block producers and affect other consumers
                     by limiting the number of messages that are retained
                     For more information, see:

                     http://activemq.apache.org/slow-consumer-handling.html

                -->
              <pendingMessageLimitStrategy>
                <constantPendingMessageLimitStrategy limit="1000"/>
              </pendingMessageLimitStrategy>
            </policyEntry>
          </policyEntries>
        </policyMap>
    </destinationPolicy>


    <!--
        The managementContext is used to configure how ActiveMQ is exposed in
        JMX. By default, ActiveMQ uses the MBean server that is started by
        the JVM. For more information, see:

        http://activemq.apache.org/jmx.html
    -->
    <managementContext>
        <managementContext createConnector="false"/>
    </managementContext> 

    <!--
        Configure message persistence for the broker. The default persistence
        mechanism is the KahaDB store (identified by the kahaDB tag).
        For more information, see:

        http://activemq.apache.org/persistence.html
    -->
    <persistenceAdapter>
        <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>


      <!--
        The systemUsage controls the maximum amount of space the broker will
        use before disabling caching and/or slowing down producers. For more information, see:
        http://activemq.apache.org/producer-flow-control.html
      -->
      <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage percentOfJvmHeap="70"/>
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="100 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="50 gb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>

<networkConnectors>
      <networkConnector uri="multicast://default"
        dynamicOnly="true"
        networkTTL="3"
        prefetchSize="1"
        decreaseNetworkConsumerPriority="true" />
 </networkConnectors>
  
    <!--
        The transport connectors expose ActiveMQ over a given protocol to
        clients and other brokers. For more information, see:

        http://activemq.apache.org/configuring-transports.html
    -->
    <transportConnectors>
        <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
        <transportConnector name="openwire"  rebalanceClusterClients="true" updateClusterClients="true" updateClusterClientsOnRemove="true" uri="tcp://0.0.0.0:61626?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" discoveryUri="multicast://default" />
    </transportConnectors>


    <!-- destroy the spring context on shutdown to stop jetty -->
    <shutdownHooks>
        <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook"/>
    </shutdownHooks>

</broker>

<!--
    Enable web consoles, REST and Ajax APIs and demos
    The web consoles requires by default login, you can disable this in the jetty.xml file

    Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>

Start three dynamic brokers.

Dynamic broker3 server.log

C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\dynamic-broker3\bin>dynamic-broker3.bat start
Java Runtime: Oracle Corporation 1.8.0_31 C:\MaryZheng\tools\java\jdk1.8.0_31\jre
Heap sizes: current=1005056k free=984084k max=1005056k
JVM args: -Dcom.sun.management.jmxremote -Xms1G -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=C:/MaryZheng/tools/apache-activemq-5.15.0/cluster/dynamic-broker3/conf\login.config -Dactivemq.classpath=C:/MaryZheng/tools/apache-activemq-5.15.0/cluster/dynamic-broker3/conf;C:/MaryZheng/tools/apache-activemq-5.15.0/cluster/dynamic-broker3/conf;C:/MaryZheng/tools/apache-activemq-5.15.0/conf; -Dactivemq.home=C:/MaryZheng/tools/apache-activemq-5.15.0 -Dactivemq.base=C:/MaryZheng/tools/apache-activemq-5.15.0/cluster/dynamic-broker3 -Dactivemq.conf=C:/MaryZheng/tools/apache-activemq-5.15.0/cluster/dynamic-broker3/conf -Dactivemq.data=C:/MaryZheng/tools/apache-activemq-5.15.0/cluster/dynamic-broker3/data -Djava.io.tmpdir=C:/MaryZheng/tools/apache-activemq-5.15.0/cluster/dynamic-broker3/data\tmp
Extensions classpath: [C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\dynamic-broker3\lib,C:\MaryZheng\tools\apache-activemq-5.15.0\lib,C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\dynamic-broker3\lib\camel,C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\dynamic-broker3\lib\optional,C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\dynamic-broker3\lib\web,C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\dynamic-broker3\lib\extra,C:\MaryZheng\tools\apache-activemq-5.15.0\lib\camel,C:\MaryZheng\tools\apache-activemq-5.15.0\lib\optional,C:\MaryZheng\tools\apache-activemq-5.15.0\lib\web,C:\MaryZheng\tools\apache-activemq-5.15.0\lib\extra]
ACTIVEMQ_HOME: C:\MaryZheng\tools\apache-activemq-5.15.0
ACTIVEMQ_BASE: C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\dynamic-broker3
ACTIVEMQ_CONF: C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\dynamic-broker3\conf
ACTIVEMQ_DATA: C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\dynamic-broker3\data
Loading message broker from: xbean:activemq.xml
INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@7c16905e: startup date [Sat Dec 16 09:48:42 CST 2017]; root of context hierarchy
INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\dynamic-broker3\data\kahadb]
INFO | PListStore:[C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\dynamic-broker3\data\dynamic-broker3\tmp_storage] started
INFO | Apache ActiveMQ 5.15.0 (dynamic-broker3, ID:SL2LS431841-65244-1513439325237-0:1) is starting
INFO | Listening for connections at: tcp://SL2LS431841:61646?maximumConnections=1000&wireFormat.maxFrameSize=104857600
INFO | Connector openwire started
INFO | Network Connector DiscoveryNetworkConnector:NC:BrokerService[dynamic-broker3] started
INFO | Apache ActiveMQ 5.15.0 (dynamic-broker3, ID:SL2LS431841-65244-1513439325237-0:1) started
INFO | For help or more information please see: http://activemq.apache.org
WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\dynamic-broker3\data\kahadb only has 1154 mb of usable space. - resetting to maximum available disk space: 1154 mb
WARN | Temporary Store limit is 51200 mb (current store usage is 0 mb). The data directory: C:\MaryZheng\tools\apache-activemq-5.15.0\cluster\dynamic-broker3\data only has 1154 mb of usable space. - resetting to maximum available disk space: 1154 mb
INFO | Establishing network connection from vm://dynamic-broker3 to tcp://SL2LS431841:61646
INFO | Connector vm://dynamic-broker3 started
INFO | dynamic-broker3 Shutting down NC
INFO | dynamic-broker3 bridge to Unknown stopped
WARN | Transport Connection to: tcp://192.168.1.109:65245 failed: java.io.EOFException
INFO | Connector vm://dynamic-broker3 stopped
INFO | No Spring WebApplicationInitializer types detected on classpath
INFO | ActiveMQ WebConsole available at http://0.0.0.0:8164/
INFO | ActiveMQ Jolokia REST API available at http://0.0.0.0:8164/api/jolokia/
INFO | Initializing Spring FrameworkServlet 'dispatcher'
INFO | No Spring WebApplicationInitializer types detected on classpath
INFO | jolokia-agent: Using policy access restrictor classpath:/jolokia-access.xml
INFO | Establishing network connection from vm://dynamic-broker3 to tcp://SL2LS431841:61636
INFO | Connector vm://dynamic-broker3 started
INFO | Network connection between vm://dynamic-broker3#2 and tcp://SL2LS431841/192.168.1.109:61636@65254 (dynamic-broker2) has been established.
INFO | Establishing network connection from vm://dynamic-broker3 to tcp://SL2LS431841:61626
INFO | Network connection between vm://dynamic-broker3#4 and tcp://SL2LS431841/192.168.1.109:61626@65266 (dynamic-broker1) has been established.

Note: Try stopping any of these dynamic brokers and watch the other brokers' server logs. Verify the connections via the AMQ web console.
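When watching several server logs, it can help to pull out only the lines that report an established bridge. The sketch below does that with a regular expression. The class name is hypothetical, and the pattern is written against the "Network connection between ... has been established." lines shown in the dynamic-broker3 log above, so it is an assumption that other ActiveMQ versions word the message the same way.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BridgeLogScanner {

    // Groups: 1 = local end, 2 = remote address, 3 = remote broker name.
    private static final Pattern ESTABLISHED = Pattern.compile(
            "Network connection between (\\S+) and (\\S+) \\(([^)]+)\\) has been established\\.");

    /** Returns the remote broker name from every "has been established" line, in log order. */
    static List<String> remoteBrokers(List<String> logLines) {
        List<String> names = new ArrayList<>();
        for (String line : logLines) {
            Matcher matcher = ESTABLISHED.matcher(line);
            if (matcher.find()) {
                names.add(matcher.group(3));
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList(
                "INFO | Connector vm://dynamic-broker3 started",
                "INFO | Network connection between vm://dynamic-broker3#2 and "
                        + "tcp://SL2LS431841/192.168.1.109:61636@65254 (dynamic-broker2) has been established.",
                "INFO | Network connection between vm://dynamic-broker3#4 and "
                        + "tcp://SL2LS431841/192.168.1.109:61626@65266 (dynamic-broker1) has been established.");
        // prints [dynamic-broker2, dynamic-broker1]
        System.out.println(remoteBrokers(log));
    }
}
```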

4. Create Java Client Applications

Create two Java AMQ client applications. One is a producer application which sends ten dummy messages to a test.queue. The other is a consumer application which consumes the messages from the test.queue.

4.1 Common Data

Create a common data class to hold the data used in the Demo.

DemoDataUtils

package jcg.demo.util;

import java.util.Random;
import java.util.Scanner;

/**

}
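The body of DemoDataUtils is not reproduced above. The following is a hypothetical reconstruction based only on how the class is used in sections 4.3, 4.5, and 5: a DESTINATION constant, a readFailoverURL() method that returns null on bad input, the prompt text, and the failover URLs for options 1 and 2 as they appear in the demo output. The URLs for options 3 and 4 are assumptions derived from the OpenWire ports in section 3.3, and the class is named DemoDataUtilsSketch to keep it apart from the original.

```java
import java.util.Scanner;

public class DemoDataUtilsSketch {

    public static final String DESTINATION = "test.queue";

    private static final String PROMPT = "Enter Demo Type for Failover: 1 - Stand Alone Brokers "
            + "2 - Master-Slave Brokers 3 - Network of Brokers(Static) 4 - Network of Brokers(Dynamic): ";

    /** Maps the menu choice to a failover URL, or returns null for anything else. */
    static String failoverUrlFor(String choice) {
        switch (choice.trim()) {
            case "1": return failover("61616", "61716");          // from the demo output
            case "2": return failover("61816", "61826");          // from the demo output
            case "3": return failover("61816", "61826", "61516"); // assumption: static network ports
            case "4": return failover("61626", "61636", "61646"); // assumption: dynamic network ports
            default:  return null;
        }
    }

    private static String failover(String... ports) {
        StringBuilder brokers = new StringBuilder();
        for (String port : ports) {
            if (brokers.length() > 0) {
                brokers.append(',');
            }
            brokers.append("tcp://localhost:").append(port);
        }
        return "failover:(" + brokers + ")?timeout=30000";
    }

    /** Prompts on the console and returns the chosen failover URL, or null on bad input. */
    public static String readFailoverURL() {
        System.out.print(PROMPT);
        Scanner in = new Scanner(System.in);
        return in.hasNextLine() ? failoverUrlFor(in.nextLine()) : null;
    }

    public static void main(String[] args) {
        // prints failover:(tcp://localhost:61816,tcp://localhost:61826)?timeout=30000
        System.out.println(failoverUrlFor("2"));
    }
}
```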

4.2 QueueMessageProducer

Create QueueMessageProducer.

QueueMessageProducer

package jcg.demo.activemq.failover;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

import jcg.demo.util.DemoDataUtils;

/**

}

4.3 MessageProducerApp

A Java application which sends ten messages at 10-second intervals.

MessageProducerApp

package jcg.demo.activemq.failover;

import jcg.demo.util.DemoDataUtils;

public class MessageProducerApp {

public static void main(String[] args) {
    String failoverUrl = DemoDataUtils.readFailoverURL();
    
    if (failoverUrl == null) {
        System.out.println("Wrong input");
    } else {
        QueueMessageProducer queProducer = new QueueMessageProducer(failoverUrl, "admin", "admin");
        queProducer.sendDummyMessages(DemoDataUtils.DESTINATION);
    }
}

}

4.4 QueueMessageConsumer

Create QueueMessageConsumer.

QueueMessageConsumer

package jcg.demo.activemq.failover;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**

}

4.5 MessageConsumerApp

A Java application which consumes messages at 10-second intervals.

MessageConsumerApp

package jcg.demo.activemq.failover;

import javax.jms.JMSException;

import jcg.demo.util.DemoDataUtils;

public class MessageConsumerApp {

public static void main(String[] args) {

    String failoverUrl = DemoDataUtils.readFailoverURL();

    if (failoverUrl == null) {
        System.out.println("Wrong input");
    } else {
        QueueMessageConsumer queueMsgListener = new QueueMessageConsumer(failoverUrl, "admin", "admin");
        queueMsgListener.setDestinationName(DemoDataUtils.DESTINATION);

        try {
            queueMsgListener.run();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

}

5. Demo Time

This is the most fun part of this example. I will show that the Java applications built in step 4 continue to function when the connected AMQ broker goes down.
Start the eight brokers configured in step 3, then start the MessageConsumerApp and MessageProducerApp. While both programs are running, stop the connected AMQ broker. Both applications auto-detect the failure and then reconnect to a different broker.
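Conceptually, the reconnect works by walking the list of broker URIs in the failover URL until one of them accepts a connection. The sketch below simulates only that loop. It is not the ActiveMQ implementation: the class and method names are hypothetical, reachability is faked with a set of "running" brokers, and the real transport additionally randomizes the broker order by default and backs off between attempts, neither of which is modelled here.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

public class FailoverLoopSketch {

    /**
     * Tries each broker URI in order, for up to maxRounds passes over the list,
     * and returns the first one reported as reachable, or null if none is.
     */
    static String connect(List<String> brokerUris, Predicate<String> isReachable, int maxRounds) {
        for (int round = 0; round < maxRounds; round++) {
            for (String uri : brokerUris) {
                if (isReachable.test(uri)) {
                    return uri;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("tcp://localhost:61616", "tcp://localhost:61716");
        Set<String> running = new HashSet<>(brokers);

        // prints "connected to tcp://localhost:61616"
        System.out.println("connected to " + connect(brokers, running::contains, 3));

        running.remove("tcp://localhost:61616"); // simulate stopping broker1

        // prints "reconnected to tcp://localhost:61716"
        System.out.println("reconnected to " + connect(brokers, running::contains, 3));
    }
}
```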

5.1 Two Standalone Brokers

Below you can find the MessageProducerApp output.

MessageProducerApp output

Enter Demo Type for Failover: 1 - Stand Alone Brokers 2 - Master-Slave Brokers 3 - Network of Brokers(Static) 4 - Network of Brokers(Dynamic): 1
QueueMessageProducer started failover:(tcp://localhost:61616,tcp://localhost:61716)?timeout=30000
INFO | Successfully connected to tcp://localhost:61616
WARN | Transport (tcp://localhost:61616) failed , attempting to automatically reconnect: {}
java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    at java.lang.Thread.run(Thread.java:745)
INFO | Successfully reconnected to tcp://localhost:61716
QueueMessageProducer completed

Verify via the AMQ web console: there is one message at broker 1 and nine messages at broker 2 (the numbers can vary depending on when the broker is shut down).

Below you can find the MessageConsumerApp output.

MessageConsumerApp output

Enter Demo Type for Failover: 1 - Stand Alone Brokers 2 - Master-Slave Brokers 3 - Network of Brokers(Static) 4 - Network of Brokers(Dynamic): 1
INFO | Successfully connected to tcp://localhost:61716
QueueMessageConsumer Waiting for messages at test.queue failover:(tcp://localhost:61616,tcp://localhost:61716)?timeout=30000
QueueMessageConsumer Received message [ dummy message 1 ]
QueueMessageConsumer Received message [ dummy message 2 ]
QueueMessageConsumer Received message [ dummy message 3 ]
QueueMessageConsumer Received message [ dummy message 4 ]
QueueMessageConsumer Received message [ dummy message 5 ]
QueueMessageConsumer Received message [ dummy message 6 ]
QueueMessageConsumer Received message [ dummy message 7 ]
QueueMessageConsumer Received message [ dummy message 8 ]
QueueMessageConsumer Received message [ dummy message 9 ]
QueueMessageConsumer Received message [ dummy message 0 ]
WARN | Transport (tcp://localhost:61616) failed , attempting to automatically reconnect: {}
java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    at java.lang.Thread.run(Thread.java:745)
INFO | Successfully reconnected to tcp://localhost:61716

5.2 Master/Slave Brokers

Repeat the steps above for the Master/Slave brokers.

Execution for Master/Slave brokers

Enter Demo Type for Failover: 1 - Stand Alone Brokers 2 - Master-Slave Brokers 3 - Network of Brokers(Static) 4 - Network of Brokers(Dynamic): 2
QueueMessageProducer started failover:(tcp://localhost:61816,tcp://localhost:61826)?timeout=30000
INFO | Successfully connected to tcp://localhost:61826
WARN | Transport (tcp://localhost:61826) failed , attempting to automatically reconnect: {}
java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    at java.lang.Thread.run(Thread.java:745)
INFO | Successfully reconnected to tcp://localhost:61816
QueueMessageProducer completed

  *****

Enter Demo Type for Failover: 1 - Stand Alone Brokers 2 - Master-Slave Brokers 3 - Network of Brokers(Static) 4 - Network of Brokers(Dynamic): 2
INFO | Successfully connected to tcp://localhost:61826
QueueMessageConsumer Waiting for messages at test.queue failover:(tcp://localhost:61816,tcp://localhost:61826)?timeout=30000
QueueMessageConsumer Received message [ dummy message 0 ]
WARN | Transport (tcp://localhost:61826) failed , attempting to automatically reconnect: {}
java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    at java.lang.Thread.run(Thread.java:745)
QueueMessageConsumer Received message [ dummy message 1 ]
INFO | Successfully reconnected to tcp://localhost:61816
WARN | ID:SL2LS431841-51754-1513648383975-1:1:1:1 suppressing duplicate delivery on connection, poison acking: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:SL2LS431841-51754-1513648383975-1:1:1:1, destination = queue://test.queue, message = ActiveMQTextMessage {commandId = 6, responseRequired = true, messageId = ID:SL2LS431841-50044-1513371045088-1:1:1:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:SL2LS431841-50044-1513371045088-1:1:1:1, destination = queue://test.queue, transactionId = null, expiration = 0, timestamp = 1513371056504, arrival = 0, brokerInTime = 1513371056505, brokerOutTime = 1513648412924, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@59b92659, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = dummy message 1}, redeliveryCounter = 0}
QueueMessageConsumer Received message [ dummy message 2 ]
QueueMessageConsumer Received message [ dummy message 3 ]
QueueMessageConsumer Received message [ dummy message 4 ]
QueueMessageConsumer Received message [ dummy message 5 ]
QueueMessageConsumer Received message [ dummy message 6 ]
QueueMessageConsumer Received message [ dummy message 7 ]
QueueMessageConsumer Received message [ dummy message 8 ]
QueueMessageConsumer Received message [ dummy message 9 ]
QueueMessageConsumer Received message [ dummy message 0 ]
QueueMessageConsumer Received message [ dummy message 1 ]
QueueMessageConsumer Received message [ dummy message 2 ]
QueueMessageConsumer Received message [ dummy message 3 ]
QueueMessageConsumer Received message [ dummy message 4 ]
QueueMessageConsumer Received message [ dummy message 5 ]
QueueMessageConsumer Received message [ dummy message 6 ]
QueueMessageConsumer Received message [ dummy message 7 ]
QueueMessageConsumer Received message [ dummy message 8 ]

5.3 Static Network of Brokers

Repeat the steps above for the static network of brokers.

Execution for static network of brokers

Enter Demo Type for Failover: 1 - Stand Alone Brokers 2 - Master-Slave Brokers 3 - Network of Brokers(Static) 4 - Network of Brokers(Dynamic): 3
QueueMessageProducer started failover:(tcp://localhost:61516)?timeout=30000
INFO | Successfully connected to tcp://localhost:61516
INFO | Successfully reconnected to tcp://SL2LS431841:61516
WARN | Transport (tcp://SL2LS431841:61516) failed , attempting to automatically reconnect: {}
java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    at java.lang.Thread.run(Thread.java:745)
INFO | Successfully reconnected to tcp://SL2LS431841:61816
QueueMessageProducer completed

Enter Demo Type for Failover: 1 - Stand Alone Brokers 2 - Master-Slave Brokers 3 - Network of Brokers(Static) 4 - Network of Brokers(Dynamic): 3
INFO | Successfully connected to tcp://localhost:61516
INFO | Successfully reconnected to tcp://SL2LS431841:61816
QueueMessageConsumer Waiting for messages at test.queue failover:(tcp://localhost:61516)?timeout=30000
QueueMessageConsumer Received message [ dummy message 0 ]
QueueMessageConsumer Received message [ dummy message 0 ]
QueueMessageConsumer Received message [ dummy message 1 ]
QueueMessageConsumer Received message [ dummy message 2 ]
QueueMessageConsumer Received message [ dummy message 3 ]
QueueMessageConsumer Received message [ dummy message 4 ]
QueueMessageConsumer Received message [ dummy message 5 ]
QueueMessageConsumer Received message [ dummy message 6 ]
QueueMessageConsumer Received message [ dummy message 7 ]
QueueMessageConsumer Received message [ dummy message 8 ]
QueueMessageConsumer Received message [ dummy message 9 ]

5.4 Dynamic Network of Brokers

Repeat the steps above for the dynamic network of brokers.

Execution output for dynamic network of brokers

Enter Demo Type for Failover: 1 - Stand Alone Brokers 2 - Master-Slave Brokers 3 - Network of Brokers(Static) 4 - Network of Brokers(Dynamic): 4
QueueMessageProducer started failover:(tcp://localhost:61646)?timeout=30000
INFO | Successfully connected to tcp://localhost:61646
INFO | Successfully reconnected to tcp://SL2LS431841:61626
WARN | Transport (tcp://SL2LS431841:61626) failed , attempting to automatically reconnect: {}
java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    at java.lang.Thread.run(Thread.java:745)
INFO | Successfully reconnected to tcp://SL2LS431841:61646
QueueMessageProducer completed

 *****

Enter Demo Type for Failover: 1 - Stand Alone Brokers 2 - Master-Slave Brokers 3 - Network of Brokers(Static) 4 - Network of Brokers(Dynamic): 4
INFO | Successfully connected to tcp://localhost:61636
INFO | Successfully reconnected to tcp://SL2LS431841:61626
QueueMessageConsumer Waiting for messages at test.queue failover:(tcp://localhost:61636)?timeout=30000
QueueMessageConsumer Received message [ dummy message 0 ]
QueueMessageConsumer Received message [ dummy message 1 ]
QueueMessageConsumer Received message [ dummy message 2 ]
QueueMessageConsumer Received message [ dummy message 3 ]
QueueMessageConsumer Received message [ dummy message 4 ]
QueueMessageConsumer Received message [ dummy message 5 ]
INFO | Successfully reconnected to tcp://SL2LS431841:61636
QueueMessageConsumer Received message [ dummy message 6 ]
QueueMessageConsumer Received message [ dummy message 7 ]
QueueMessageConsumer Received message [ dummy message 4 ]
QueueMessageConsumer Received message [ dummy message 9 ]
QueueMessageConsumer Received message [ dummy message 5 ]
QueueMessageConsumer Received message [ dummy message 7 ]
QueueMessageConsumer Received message [ dummy message 6 ]
QueueMessageConsumer Received message [ dummy message 8 ]
QueueMessageConsumer Received message [ dummy message 9 ]
QueueMessageConsumer Received message [ dummy message 8 ]
WARN | ID:SL2LS431841-54111-1513676901862-1:1:1:1 suppressing duplicate delivery on connection, poison acking: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:SL2LS431841-54111-1513676901862-1:1:1:1, destination = queue://test.queue, message = ActiveMQTextMessage {commandId = 11, responseRequired = true, messageId = ID:SL2LS431841-54039-1513676753374-1:1:1:1:7, originalDestination = null, originalTransactionId = null, producerId = dynamic-broker1->dynamic-broker2-54099-1513676883550-5:2:1:1, destination = queue://test.queue, transactionId = null, expiration = 0, timestamp = 1513676813826, arrival = 0, brokerInTime = 1513676968988, brokerOutTime = 1513676968989, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@6fb0f2ba, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = dummy message 6}, redeliveryCounter = 1}
WARN | ID:SL2LS431841-54111-1513676901862-1:1:1:1 suppressing duplicate delivery on connection, poison acking: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:SL2LS431841-54111-1513676901862-1:1:1:1, destination = queue://test.queue, message = ActiveMQTextMessage {commandId = 12, responseRequired = true, messageId = ID:SL2LS431841-54039-1513676753374-1:1:1:1:8, originalDestination = null, originalTransactionId = null, producerId = dynamic-broker1->dynamic-broker2-54099-1513676883550-5:2:1:1, destination = queue://test.queue, transactionId = null, expiration = 0, timestamp = 1513676823835, arrival = 0, brokerInTime = 1513676968999, brokerOutTime = 1513676969000, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@1133bfa0, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = dummy message 7}, redeliveryCounter = 1}
WARN | ID:SL2LS431841-54111-1513676901862-1:1:1:1 suppressing duplicate delivery on connection, poison acking: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:SL2LS431841-54111-1513676901862-1:1:1:1, destination = queue://test.queue, message = ActiveMQTextMessage {commandId = 13, responseRequired = true, messageId = ID:SL2LS431841-54039-1513676753374-1:1:1:1:10, originalDestination = null, originalTransactionId = null, producerId = dynamic-broker1->dynamic-broker2-54099-1513676883550-5:2:1:1, destination = queue://test.queue, transactionId = null, expiration = 0, timestamp = 1513676843872, arrival = 0, brokerInTime = 1513676969006, brokerOutTime = 1513676969007, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@6c74ff09, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = dummy message 9}, redeliveryCounter = 1}
INFO | Successfully reconnected to tcp://SL2LS431841:61626
WARN | ID:SL2LS431841-54111-1513676901862-1:1:1:1 suppressing duplicate delivery on connection, poison acking: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:SL2LS431841-54111-1513676901862-1:1:1:1, destination = queue://test.queue, message = ActiveMQTextMessage {commandId = 7, responseRequired = true, messageId = ID:SL2LS431841-53789-1513676183711-1:1:1:1:5, originalDestination = null, originalTransactionId = null, producerId = dynamic-broker2->dynamic-broker1-54081-1513676839955-7:2:1:1, destination = queue://test.queue, transactionId = null, expiration = 0, timestamp = 1513676224129, arrival = 0, brokerInTime = 1513676902269, brokerOutTime = 1513677166569, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@4ec525c9, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = dummy message 4}, redeliveryCounter = 1}
WARN | ID:SL2LS431841-54111-1513676901862-1:1:1:1 suppressing duplicate delivery on connection, poison acking: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:SL2LS431841-54111-1513676901862-1:1:1:1, destination = queue://test.queue, message = ActiveMQTextMessage {commandId = 9, responseRequired = true, messageId = ID:SL2LS431841-53789-1513676183711-1:1:1:1:6, originalDestination = null, originalTransactionId = null, producerId = dynamic-broker3->dynamic-broker1-54012-1513676733269-11:2:1:1, destination = queue://test.queue, transactionId = null, expiration = 0, timestamp = 1513676234150, arrival = 0, brokerInTime = 1513676902309, brokerOutTime = 1513677166570, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@4002d261, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = dummy message 5}, redeliveryCounter = 1}
WARN | ID:SL2LS431841-54111-1513676901862-1:1:1:1 suppressing duplicate delivery on connection, poison acking: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:SL2LS431841-54111-1513676901862-1:1:1:1, destination = queue://test.queue, message = ActiveMQTextMessage {commandId = 8, responseRequired = true, messageId = ID:SL2LS431841-53789-1513676183711-1:1:1:1:8, originalDestination = null, originalTransactionId = null, producerId = dynamic-broker2->dynamic-broker1-54081-1513676839955-7:2:1:1, destination = queue://test.queue, transactionId = null, expiration = 0, timestamp = 1513676254168, arrival = 0, brokerInTime = 1513676902311, brokerOutTime = 1513677166570, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@443d4eb9, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = dummy message 7}, redeliveryCounter = 1}
WARN | ID:SL2LS431841-54111-1513676901862-1:1:1:1 suppressing duplicate delivery on connection, poison acking: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:SL2LS431841-54111-1513676901862-1:1:1:1, destination = queue://test.queue, message = ActiveMQTextMessage {commandId = 10, responseRequired = true, messageId = ID:SL2LS431841-53789-1513676183711-1:1:1:1:7, originalDestination = null, originalTransactionId = null, producerId = dynamic-broker3->dynamic-broker1-54012-1513676733269-11:2:1:1, destination = queue://test.queue, transactionId = null, expiration = 0, timestamp = 1513676244161, arrival = 0, brokerInTime = 1513676902317, brokerOutTime = 1513677166570, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@13fd85a, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = dummy message 6}, redeliveryCounter = 1}
WARN | ID:SL2LS431841-54111-1513676901862-1:1:1:1 suppressing duplicate delivery on connection, poison acking: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:SL2LS431841-54111-1513676901862-1:1:1:1, destination = queue://test.queue, message = ActiveMQTextMessage {commandId = 9, responseRequired = true, messageId = ID:SL2LS431841-53789-1513676183711-1:1:1:1:9, originalDestination = null, originalTransactionId = null, producerId = dynamic-broker2->dynamic-broker1-54081-1513676839955-7:2:1:1, destination = queue://test.queue, transactionId = null, expiration = 0, timestamp = 1513676264175, arrival = 0, brokerInTime = 1513676902319, brokerOutTime = 1513677166571, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@66b34ed9, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = dummy message 8}, redeliveryCounter = 1}
WARN | ID:SL2LS431841-54111-1513676901862-1:1:1:1 suppressing duplicate delivery on connection, poison acking: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:SL2LS431841-54111-1513676901862-1:1:1:1, destination = queue://test.queue, message = ActiveMQTextMessage {commandId = 11, responseRequired = true, messageId = ID:SL2LS431841-53789-1513676183711-1:1:1:1:10, originalDestination = null, originalTransactionId = null, producerId = dynamic-broker3->dynamic-broker1-54012-1513676733269-11:2:1:1, destination = queue://test.queue, transactionId = null, expiration = 0, timestamp = 1513676274182, arrival = 0, brokerInTime = 1513676902323, brokerOutTime = 1513677166577, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@30f419b4, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = dummy message 9}, redeliveryCounter = 1}
WARN | ID:SL2LS431841-54111-1513676901862-1:1:1:1 suppressing duplicate delivery on connection, poison acking: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:SL2LS431841-54111-1513676901862-1:1:1:1, destination = queue://test.queue, message = ActiveMQTextMessage {commandId = 10, responseRequired = true, messageId = ID:SL2LS431841-54039-1513676753374-1:1:1:1:9, originalDestination = null, originalTransactionId = null, producerId = dynamic-broker2->dynamic-broker1-54081-1513676839955-7:2:1:1, destination = queue://test.queue, transactionId = null, expiration = 0, timestamp = 1513676833866, arrival = 0, brokerInTime = 1513676902327, brokerOutTime = 1513677166585, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@1e92d195, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = dummy message 8}, redeliveryCounter = 1}

6. Summary

AMQ provides a network connector to bridge any two brokers and a failover transport that lets a client application connect to a list of AMQ brokers. If the connection from the client to one broker fails, the failover transport automatically tries to connect to the next broker and keeps trying until a connection is established or the retry limit is reached.
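As a minimal sketch of how such a broker list is expressed, the helper below assembles a failover URI of the same shape as the ones printed in the demo output. The class `FailoverUriSketch` and its `build` method are hypothetical names introduced for illustration. `timeout` appears in the demo output; `maxReconnectAttempts` is a failover transport option added here as an assumption to show where a retry limit would go. In a real client the resulting string would be handed to the ActiveMQ connection factory, which is left out so the example runs without the ActiveMQ libraries.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that composes a failover transport URI from broker URIs and options. */
final class FailoverUriSketch {

    /** Builds "failover:(uri1,uri2,...)?key=value&..." from the given parts. */
    static String build(List<String> brokerUris, Map<String, String> options) {
        if (brokerUris.isEmpty()) {
            throw new IllegalArgumentException("at least one broker URI is required");
        }
        StringBuilder uri = new StringBuilder("failover:(")
                .append(String.join(",", brokerUris))
                .append(')');
        if (!options.isEmpty()) {
            // Options are appended as a query string in insertion order.
            StringJoiner query = new StringJoiner("&", "?", "");
            options.forEach((key, value) -> query.add(key + "=" + value));
            uri.append(query.toString());
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("timeout", "30000");            // value used in the demo output
        options.put("maxReconnectAttempts", "5");   // assumption: illustrative retry limit
        System.out.println(build(
                Arrays.asList("tcp://localhost:61616", "tcp://localhost:61716"), options));
    }
}
```

Keeping the broker list and the options separate makes it easy to switch between the four demo topologies by changing only the list of broker URIs.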

The Java client applications demonstrate that, in the standalone topology, a failed broker can sometimes cause a message to be lost. They also demonstrate that a dynamic network of brokers not only provides failover but also reduces the load on each broker.

| Topology | Message Loss | Client Friendly | Load Balanced |
| --- | --- | --- | --- |
| Standalone | Possible | No | No |
| Master/Slave | No | No | No |
| Static Network of Brokers | No | Yes | No |
| Dynamic Network of Brokers | No | Yes | Yes |

7. References

8. Download the Source Code

This example consists of two Java AMQ client applications along with the configuration files for four AMQ broker topologies.