Apache ActiveMQ SSL Example

1. Introduction

Secure Sockets Layer (SSL) is a standard security protocol for establishing encrypted links between a web server and a browser. Netscape originally developed SSL in 1994 to secure communication between web browsers and servers. The Internet Engineering Task Force (IETF) later adopted the protocol and renamed it Transport Layer Security (TLS) in RFC 2246, published in 1999. SSL/TLS addresses three security considerations: authentication of the communicating parties, confidentiality of the data in transit, and integrity of the messages exchanged.

Java Secure Socket Extension (JSSE) is the Java implementation of the SSL/TLS protocols. It includes functionality for data encryption, server authentication, message integrity, and optional client authentication.
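As a quick illustration of JSSE on its own, the following sketch lists the protocol versions that the local JVM supports. The class name JsseProtocols is hypothetical; the calls are standard javax.net.ssl APIs, and the printed list depends on the JDK version and its security configuration.

```java
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import javax.net.ssl.SSLContext;

class JsseProtocols {

    // Returns the protocol names that the default JSSE context can support
    static List<String> supportedProtocols() {
        try {
            SSLContext context = SSLContext.getDefault();
            return Arrays.asList(context.getSupportedSSLParameters().getProtocols());
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("No default SSLContext available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Supported protocols: " + supportedProtocols());
    }
}
```

If TLSv1.2 is missing from the list, the ssl connector configured later in this article could not negotiate a connection with this JVM.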

Apache ActiveMQ (AMQ) is an open source message broker from the Apache Software Foundation. It is written in Java and implements the JMS 1.1 specification. ActiveMQ uses JSSE to support SSL.

This example assumes that you understand both AMQ and SSL. I will demonstrate how to configure an AMQ broker to support SSL and how to create a simple Java application that connects to it securely.

2. Technologies Used

The example code in this article was built and run using Apache ActiveMQ 5.15.3 and Java 1.8 on a Windows PC.

3. Configure ActiveMQ Server

In this section, we will configure an AMQ server to support the SSL transport in three steps:

  1. Install an AMQ server.
  2. Alter the configuration file to support SSL.
  3. Start the AMQ server and verify the SSL connector is started.

3.1 Activemq.xml

Install AMQ 5.15.3 on a Windows PC. Please check out my other article for more details. If you want to generate a new security key, please follow this article to do so.

In this step, we will use the security certificate that came with the installation and update activemq.xml to enable the SSL transport connector.

activemq.xml

<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <value>file:${activemq.conf}/credentials.properties</value>
    </property>
</bean>
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
      lazy-init="false" scope="singleton"
      init-method="start" destroy-method="stop">
</bean>

<!--
    The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

    <destinationPolicy>
        <policyMap>
          <policyEntries>
            <policyEntry topic=">" >
                <!-- The constantPendingMessageLimitStrategy is used to prevent
                     slow topic consumers to block producers and affect other consumers
                     by limiting the number of messages that are retained
                     For more information, see:

                     http://activemq.apache.org/slow-consumer-handling.html

                -->
              <pendingMessageLimitStrategy>
                <constantPendingMessageLimitStrategy limit="1000"/>
              </pendingMessageLimitStrategy>
            </policyEntry>
          </policyEntries>
        </policyMap>
    </destinationPolicy>


    <!--
        The managementContext is used to configure how ActiveMQ is exposed in
        JMX. By default, ActiveMQ uses the MBean server that is started by
        the JVM. For more information, see:

        http://activemq.apache.org/jmx.html
    -->
    <managementContext>
        <managementContext createConnector="false"/>
    </managementContext>
    <sslContext> 
        <sslContext keyStore="file:${activemq.base}/conf/broker.ks" 
          keyStorePassword="password" trustStore="file:${activemq.base}/conf/broker.ts" 
          trustStorePassword="password"/> 
    </sslContext> 


    <!--
        Configure message persistence for the broker. The default persistence
        mechanism is the KahaDB store (identified by the kahaDB tag).
        For more information, see:

        http://activemq.apache.org/persistence.html
    -->
    <persistenceAdapter>
        <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>


    <!--
        The systemUsage controls the maximum amount of space the broker will
        use before disabling caching and/or slowing down producers. For more information, see:
        http://activemq.apache.org/producer-flow-control.html
    -->
    <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage percentOfJvmHeap="70" />
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="100 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="50 gb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>

    <!--
        The transport connectors expose ActiveMQ over a given protocol to
        clients and other brokers. For more information, see:

        http://activemq.apache.org/configuring-transports.html
    -->
    <transportConnectors>
        <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB 
       
        <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> 
         -->
        
         <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="ssl" uri="ssl://0.0.0.0:61714?transport.enabledProtocols=TLSv1.2"/>
         
    </transportConnectors>

    <!-- destroy the spring context on shutdown to stop jetty -->
    <shutdownHooks>
        <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
    </shutdownHooks>

</broker>

<!--
    Enable web consoles, REST and Ajax APIs and demos
    The web consoles requires by default login, you can disable this in the jetty.xml file

    Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>
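The sslContext element above points the broker at conf/broker.ks with the password "password". Before starting the broker, it can help to confirm that the keystore opens with that password. The sketch below is one way to do so with the standard java.security.KeyStore API. The class name KeyStoreInspector is hypothetical, and it assumes the shipped keystore is in JKS format.

```java
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class KeyStoreInspector {

    // Collects the alias names stored in an already loaded keystore, sorted by name
    static List<String> aliases(KeyStore keyStore) throws Exception {
        List<String> names = new ArrayList<>(Collections.list(keyStore.aliases()));
        Collections.sort(names);
        return names;
    }

    // Loads a keystore file; assumption: the file is in JKS format
    static KeyStore load(String path, char[] password) throws Exception {
        KeyStore keyStore = KeyStore.getInstance("JKS");
        try (InputStream in = new FileInputStream(path)) {
            keyStore.load(in, password);
        }
        return keyStore;
    }

    public static void main(String[] args) throws Exception {
        // Assumption: run from the ActiveMQ installation directory, or pass the path as an argument
        String path = args.length > 0 ? args[0] : "conf/broker.ks";
        KeyStore keyStore = load(path, "password".toCharArray());
        System.out.println("Aliases in " + path + ": " + aliases(keyStore));
    }
}
```

A wrong password or a missing file surfaces here as an exception, which is easier to diagnose than a broker that fails during startup.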

3.2 Server log

Start the AMQ server.

server.log

wrapper | --> Wrapper Started as Console
wrapper | Launching a JVM...
jvm 1 | Wrapper (Version 3.2.3) http://wrapper.tanukisoftware.org
jvm 1 | Copyright 1999-2006 Tanuki Software, Inc. All Rights Reserved.
jvm 1 |
jvm 1 | Java Runtime: Oracle Corporation 1.8.0_40 C:\Program Files\Java\jre1.8.0_40
jvm 1 | Heap sizes: current=251392k free=235655k max=932352k
jvm 1 | JVM args: -Dactivemq.home=../.. -Dactivemq.base=../.. -Djavax.net.ssl.keyStorePassword=password -javax.net.ssl.trustStorePassword=password -Djavax.net.ssl.keyStore=../../conf/broker.ks -Djavax.net.ssl.trustStore=../../conf/broker.ts -Dcom.sun.management.jmxremote -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties -Dactivemq.conf=../../conf -Dactivemq.data=../../data -Djava.security.auth.login.config=../../conf/login.config -Xmx1024m -Djava.library.path=../../bin/win64 -Dwrapper.key=V4xc5qXB92bkkPap -Dwrapper.port=32000 -Dwrapper.jvm.port.min=31000 -Dwrapper.jvm.port.max=31999 -Dwrapper.pid=19168 -Dwrapper.version=3.2.3 -Dwrapper.native_library=wrapper -Dwrapper.cpu.timeout=10 -Dwrapper.jvmid=1
jvm 1 | Extensions classpath:
jvm 1 | [....\lib,....\lib\camel,....\lib\optional,....\lib\web,....\lib\extra]
jvm 1 | ACTIVEMQ_HOME: ....
jvm 1 | ACTIVEMQ_BASE: ....
jvm 1 | ACTIVEMQ_CONF: ....\conf
jvm 1 | ACTIVEMQ_DATA: ....\data
jvm 1 | Loading message broker from: xbean:activemq.xml
jvm 1 | INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@244a02d0: startup date [Mon May 14 19:42:09 CDT 2018]; root of context hierarchy
jvm 1 | INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\MaryZheng\tools\apache-activemq-5.15.3\bin\win64....\data\kahadb]
jvm 1 | INFO | KahaDB is version 6
jvm 1 | INFO | PListStore:[C:\MaryZheng\tools\apache-activemq-5.15.3\bin\win64....\data\localhost\tmp_storage] started
jvm 1 | INFO | Apache ActiveMQ 5.15.3 (localhost, ID:SL2LS431841-55107-1526344932236-0:1) is starting
jvm 1 | INFO | Listening for connections at: tcp://SL2LS431841:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1 | INFO | Connector openwire started
jvm 1 | INFO | Listening for connections at: ssl://SL2LS431841:61714?transport.enabledProtocols=TLSv1.2
jvm 1 | INFO | Connector ssl started
jvm 1 | INFO | Apache ActiveMQ 5.15.3 (localhost, ID:SL2LS431841-55107-1526344932236-0:1) started
jvm 1 | INFO | For help or more information please see: http://activemq.apache.org
jvm 1 | WARN | Store limit is 102400 mb (current store usage is 3 mb). The data directory: C:\MaryZheng\tools\apache-activemq-5.15.3\bin\win64....\data\kahadb only has 58614 mb of usable space. - resetting to maximum available disk space: 58614 mb
jvm 1 | INFO | No Spring WebApplicationInitializer types detected on classpath
jvm 1 | INFO | ActiveMQ WebConsole available at http://0.0.0.0:8161/
jvm 1 | INFO | ActiveMQ Jolokia REST API available at http://0.0.0.0:8161/api/jolokia/
jvm 1 | INFO | Initializing Spring FrameworkServlet 'dispatcher'
jvm 1 | INFO | No Spring WebApplicationInitializer types detected on classpath
jvm 1 | INFO | jolokia-agent: Using policy access restrictor classpath:/jolokia-access.xml

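Note: The log shows the JVM started with the javax.net.ssl key store and trust store options, the openwire connector listening on port 61616, and the ssl connector listening on port 61714 with TLSv1.2 enabled.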

3.3 AMQ Management Console

We can verify the AMQ server’s ssl connector via the AMQ management web console. Go to http://localhost:8161/admin/connections.jsp and confirm the ssl connector.

Figure 1, AMQ connection
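Alongside the web console, a quick programmatic check can confirm that both connector ports accept connections. The following is a minimal sketch using plain java.net sockets; the class name ConnectorProbe is hypothetical. It only shows that something is listening on the port, not that a TLS handshake succeeds.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class ConnectorProbe {

    // Returns true if a TCP connection to host:port succeeds within the timeout
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports taken from the transportConnectors section of activemq.xml
        System.out.println("openwire (61616) listening: " + isListening("localhost", 61616, 2000));
        System.out.println("ssl (61714) listening: " + isListening("localhost", 61714, 2000));
    }
}
```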

4. Connect to ActiveMQ Server

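In this step, we will build two Java applications: QueueMessageProducer, which publishes messages to a queue over the SSL connector, and QueueMessageConsumer, which consumes them over the non-secured TCP connector.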

4.1 Publish Messages via a Secured Port

Imagine a customer sends sensitive data to your AMQ server. We need to secure that data in transit by enabling the SSL connection. In this step, we will build a QueueMessageProducer class to publish messages into a queue via a secured connection.

QueueMessageProducer.java

package jcg.demo.activemq.ssl;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

/**

}
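A minimal sketch of such a producer is shown here. It is not the original class: it assumes the queue name test.queue, the "dummy message N" text, and the ten-message count seen in the demo output in section 5, and the class name QueueMessageProducerSketch is hypothetical. The trust store path and password in the commented lines are placeholders. It needs the activemq-client library on the classpath and a running broker.

```java
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

class QueueMessageProducerSketch {

    // Broker URL from the ssl transportConnector; queue name assumed from the demo output
    private static final String BROKER_URL = "ssl://localhost:61714";
    private static final String QUEUE_NAME = "test.queue";

    // Builds the text of the message with the given index
    static String messageText(int index) {
        return "dummy message " + index;
    }

    public static void main(String[] args) throws Exception {
        ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory(BROKER_URL);
        // Optional: use a dedicated trust store instead of the JVM default
        // (hypothetical path and password).
        // factory.setTrustStore("client.ts");
        // factory.setTrustStorePassword("password");

        System.out.println("QueueMessageProducer started " + BROKER_URL);
        Connection connection = factory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
            for (int i = 0; i < 10; i++) {
                TextMessage message = session.createTextMessage(messageText(i));
                producer.send(message);
            }
        } finally {
            // Closing the connection also closes its sessions and producers
            connection.close();
        }
        System.out.println("QueueMessageProducer completed");
    }
}
```

Without a trusted broker certificate, createConnection typically fails during the SSL handshake, which is why the certificate installation in section 4.3 comes before the demo.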

4.2 Consume Messages via a Non-Secured Port

Once the data is in the AMQ server, internal processes, which are protected by the IT firewall, can consume these messages via a non-secured connector for better performance. In this step, we will build a QueueMessageConsumer class to consume the messages from a queue via a non-secured connection.

QueueMessageConsumer.java

package jcg.demo.activemq.ssl;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**

}
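A minimal sketch of such a consumer is shown here. It is not the original class: the class name QueueMessageConsumerSketch is hypothetical, the queue name test.queue and the output format are assumed from the demo output in section 5, and the ten-second wait is an arbitrary choice for illustration. It needs the activemq-client library on the classpath and a running broker.

```java
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

class QueueMessageConsumerSketch {

    // Broker URL from the openwire transportConnector; queue name assumed from the demo output
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "test.queue";

    // Formats one received message the way the demo output shows it
    static String describe(String text) {
        return "QueueMessageConsumer Received message [ " + text + " ]";
    }

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = factory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));

        // The listener is called on an ActiveMQ thread for each delivered message
        consumer.setMessageListener((Message message) -> {
            try {
                if (message instanceof TextMessage) {
                    System.out.println(describe(((TextMessage) message).getText()));
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });

        connection.start();
        System.out.println("QueueMessageConsumer Waiting for messages at " + QUEUE_NAME + " " + BROKER_URL);

        // Assumption: wait an arbitrary ten seconds for deliveries, then shut down
        Thread.sleep(10_000);
        connection.close();
    }
}
```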

4.3 Install Security Certificate

The client needs to install the security certificate to establish the secured connection. There are several ways to get the security certificate installed for a Java program. As a Java developer, I use the InstallCert class to do so. The source code is credited to Andreas Sterbenz.

InstallCert.java

package jcg.demo.activemq.ssl.util;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.security.KeyStore;
import java.security.MessageDigest;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;

public class InstallCert {

public static void main(final String[] args) {
    InstallCert installCert = new InstallCert();

    try {
        installCert.generateCert("localhost", 61714);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public void generateCert(String host, int port) throws Exception {

    File file = getJsSecCertsFile();

    System.out.println("Loading KeyStore " + file + "...");
    final InputStream in = new FileInputStream(file);
    final KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
    ks.load(in, passphrase);
    in.close();

    final SSLContext context = SSLContext.getInstance("TLS");
    final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    tmf.init(ks);
    final X509TrustManager defaultTrustManager = (X509TrustManager) tmf.getTrustManagers()[0];
    final SavingTrustManager tm = new SavingTrustManager(defaultTrustManager);
    context.init(null, new TrustManager[] { tm }, null);
    final SSLSocketFactory factory = context.getSocketFactory();

    System.out.println("Opening connection to " + host + ":" + port + "...");
    final SSLSocket socket = (SSLSocket) factory.createSocket(host, port);
    socket.setSoTimeout(10000);
    try {
        System.out.println("Starting SSL handshake...");
        socket.startHandshake();
        socket.close();
        System.out.println();
        System.out.println("No errors, certificate is already trusted");
    } catch (final SSLException e) {
        System.out.println();
        e.printStackTrace(System.out);
    }

    final X509Certificate[] chain = tm.chain;
    if (chain == null) {
        System.out.println("Could not obtain server certificate chain");
        return;
    }

    final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

    System.out.println();
    System.out.println("Server sent " + chain.length + " certificate(s):");
    System.out.println();
    final MessageDigest sha1 = MessageDigest.getInstance("SHA1");
    final MessageDigest md5 = MessageDigest.getInstance("MD5");
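    // The rest of this method and the helpers below follow the widely circulated
    // InstallCert utility; details of the original listing may differ.
    for (int i = 0; i < chain.length; i++) {
        final X509Certificate cert = chain[i];
        System.out.println(" " + (i + 1) + " Subject " + cert.getSubjectDN());
        System.out.println("   Issuer  " + cert.getIssuerDN());
        sha1.update(cert.getEncoded());
        System.out.println("   sha1    " + toHexString(sha1.digest()));
        md5.update(cert.getEncoded());
        System.out.println("   md5     " + toHexString(md5.digest()));
        System.out.println();
    }

    System.out.println("Enter certificate to add to trusted keystore or 'q' to quit: [1]");
    final String line = reader.readLine().trim();
    int k;
    try {
        // An empty answer selects the first certificate
        k = (line.length() == 0) ? 0 : Integer.parseInt(line) - 1;
    } catch (final NumberFormatException e) {
        System.out.println("KeyStore not changed");
        return;
    }

    final X509Certificate cert = chain[k];
    final String alias = host + "-" + (k + 1);
    ks.setCertificateEntry(alias, cert);

    // Assumption: the updated keystore is written back to the file it was loaded from
    final OutputStream out = new FileOutputStream(file);
    ks.store(out, passphrase);
    out.close();

    System.out.println();
    System.out.println(cert);
    System.out.println();
    System.out.println("Added certificate to keystore '" + file + "' using alias '" + alias + "'");
}

// Assumption: the default JDK keystore password
private static final char[] passphrase = "changeit".toCharArray();

private static final char[] HEXDIGITS = "0123456789abcdef".toCharArray();

// Locates jssecacerts, falling back to cacerts, under <java.home>/lib/security
private File getJsSecCertsFile() {
    final File dir = new File(System.getProperty("java.home"), "lib" + File.separator + "security");
    File file = new File(dir, "jssecacerts");
    if (!file.isFile()) {
        file = new File(dir, "cacerts");
    }
    return file;
}

// Formats bytes as space-separated lowercase hex pairs
private static String toHexString(final byte[] bytes) {
    final StringBuilder sb = new StringBuilder(bytes.length * 3);
    for (int b : bytes) {
        b &= 0xff;
        sb.append(HEXDIGITS[b >> 4]);
        sb.append(HEXDIGITS[b & 15]);
        sb.append(' ');
    }
    return sb.toString();
}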

private static class SavingTrustManager implements X509TrustManager {

    private final X509TrustManager tm;
    private X509Certificate[] chain;

    SavingTrustManager(final X509TrustManager tm) {
        this.tm = tm;
    }

    @Override
    public X509Certificate[] getAcceptedIssuers() {
        return new X509Certificate[0];
    }

    @Override
    public void checkClientTrusted(final X509Certificate[] chain, final String authType)
            throws CertificateException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void checkServerTrusted(final X509Certificate[] chain, final String authType)
            throws CertificateException {
        this.chain = chain;
        this.tm.checkServerTrusted(chain, authType);
    }
}

}
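InstallCert changes a keystore on disk. As one alternative, a client can point JSSE at a dedicated trust store through the standard javax.net.ssl system properties, the same properties that appear in the broker's JVM arguments in the server log. The sketch below assumes a client.ts trust store that already trusts the broker certificate. The path, the password, and the class name TrustStoreProperties are hypothetical placeholders.

```java
class TrustStoreProperties {

    // Points the JVM-wide JSSE defaults at a specific trust store file
    static void useTrustStore(String path, String password) {
        System.setProperty("javax.net.ssl.trustStore", path);
        System.setProperty("javax.net.ssl.trustStorePassword", password);
    }

    public static void main(String[] args) {
        // Assumption: a trust store that contains the broker certificate exists at this path
        useTrustStore("conf/client.ts", "password");
        System.out.println("Trust store: " + System.getProperty("javax.net.ssl.trustStore"));
    }
}
```

These properties should be set before the first SSL connection is opened in the JVM, because the default SSL context is initialized only once.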

5. Demo

First, run InstallCert to install the certificate into the Java keystore. You can run the program a second time to confirm that the certificate was installed correctly.

InstallCert output

Loading KeyStore C:\MaryZheng\tools\java\jdk1.8.0_31\jre\lib\security\cacerts...
Opening connection to localhost:61714...
Starting SSL handshake...

No errors, certificate is already trusted

Server sent 1 certificate(s):

 1 Subject CN=localhost, OU=broker, O=Unknown, L=Unknown, ST=Unknown, C=Unknown
   Issuer  CN=localhost, OU=broker, O=Unknown, L=Unknown, ST=Unknown, C=Unknown
   sha1    f0 79 0d 04 38 5a 46 ce 86 e1 8a 20 1f 7b ab 3a 46 e4 34 5c
   md5     3f 6c 0c 89 a8 80 29 cc f5 2d da 5c d7 3f ab 37

Enter certificate to add to trusted keystore or 'q' to quit: [1]
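The sha1 and md5 lines in this output are digests of the encoded certificate, printed as hex pairs. To compare a fingerprint independently, a helper along these lines can be used. This is a sketch with a hypothetical class name, Fingerprint, built only on java.security.MessageDigest.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class Fingerprint {

    // Computes a digest of the data and formats it as space-separated lowercase hex pairs
    static String of(String algorithm, byte[] data) {
        try {
            byte[] digest = MessageDigest.getInstance(algorithm).digest(data);
            StringBuilder sb = new StringBuilder(digest.length * 3);
            for (byte b : digest) {
                sb.append(String.format("%02x ", b & 0xff));
            }
            return sb.toString().trim();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalArgumentException("Unknown digest algorithm: " + algorithm, e);
        }
    }

    public static void main(String[] args) {
        // For a certificate, pass cert.getEncoded() as the data
        System.out.println(Fingerprint.of("SHA-1", new byte[0]));
    }
}
```

Checking that the printed fingerprint matches the one from the broker keystore is a reasonable precaution before answering the prompt, since accepting a certificate makes the client trust whoever presented it.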

5.1 Execute both Applications

Start QueueMessageProducer as a Java application and capture the output:

QueueMessageProducer output

QueueMessageProducer started ssl://localhost:61714
QueueMessageProducer completed

Start QueueMessageConsumer as a Java application and capture the output:

QueueMessageConsumer output

QueueMessageConsumer Waiting for messages at test.queue tcp://localhost:61616
QueueMessageConsumer Received message [ dummy message 0 ]
QueueMessageConsumer Received message [ dummy message 1 ]
QueueMessageConsumer Received message [ dummy message 2 ]
QueueMessageConsumer Received message [ dummy message 3 ]
QueueMessageConsumer Received message [ dummy message 4 ]
QueueMessageConsumer Received message [ dummy message 5 ]
QueueMessageConsumer Received message [ dummy message 6 ]
QueueMessageConsumer Received message [ dummy message 7 ]
QueueMessageConsumer Received message [ dummy message 8 ]
QueueMessageConsumer Received message [ dummy message 9 ]

Note: The QueueMessageProducer connects through the ssl connector, while the QueueMessageConsumer uses the tcp connector.
You can verify that the ssl connector is in use via the management web console.

Figure 2, AMQ connection SSL

6. Summary

In this tutorial, we outlined the steps to configure an AMQ server to enable the SSL/TLS transport. SSL/TLS must be enabled for Payment Card Industry (PCI) applications. Please check out this article for the pros and cons of SSL for other types of applications.

7. Download the Source Code

This example builds two Java applications to send and receive messages via the AMQ broker. One connects over the secured SSL connector; the other uses the non-secured TCP connector.