ActiveMQ Network of Brokers


In this article, we will see how to create a network of brokers. A broker uses a network connector to connect to another broker. By default, a network connection is one-way only, and this article looks at that default behavior.

The broker that establishes the connection forwards the messages it receives to the broker it is connected to. This can improve performance, because clients can connect to a local broker instead of each client connecting to a remote broker.

Once the remote broker has established a network connection to the local broker, any messages sent to the remote broker are automatically forwarded to the local broker, and clients can consume them from the local broker.

Forwarding Bridge


A network connector is unidirectional by default, so the broker only forwards the messages it receives to the brokers it is connected to. The network connector can also be configured for bi-directional communication, so that the broker not only sends messages to other brokers but also receives messages from them over the same channel.

Static network connector

For a broker to connect to another broker, it needs to know that broker’s IP address and the port its TCP listener is listening on. The brokers to connect to can either be discovered dynamically or configured statically using pre-defined URIs.

The dynamic discovery of brokers is done using either multicast or rendezvous URIs. In this article, we will use statically configured URIs. To use static discovery, we must already know the addresses of all the remote brokers the broker is going to connect to.

The static protocol uses a composite URI, that is, a URI that contains other URIs.
Broker3 listens on port 61618 and connects to broker1 and broker2, which we will define in the following sections.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

	<broker id="myBroker" xmlns="http://activemq.apache.org/schema/core"
		brokerName="broker3" persistent="true" useJmx="true" start="true"
		dataDirectory="./activemqdata3">
		<transportConnectors>
			<transportConnector uri="tcp://localhost:61618" />
		</transportConnectors>
		<networkConnectors>
			<networkConnector name="from_broker3" uri="static:(tcp://localhost:61616,tcp://localhost:61617)"
				 />
		</networkConnectors>
		<managementContext>
			<managementContext connectorPort="2013"/>
		</managementContext>		
	</broker>

</beans>
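To make the composite URI a little more concrete, here is a minimal sketch that splits the `static:(...)` value from the configuration above into its member URIs. It is written against the JDK only. The class name `StaticUriParts` and the method `members` are made up for this illustration, it ignores any options that may be appended after the closing parenthesis, and it is not how ActiveMQ itself parses the URI.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only: not part of ActiveMQ.
class StaticUriParts {

    // Splits "static:(uri1,uri2,...)" into its member URIs.
    static List<URI> members(String compositeUri) {
        String prefix = "static:(";
        if (!compositeUri.startsWith(prefix) || !compositeUri.endsWith(")")) {
            throw new IllegalArgumentException(
                    "Not a plain static composite URI: " + compositeUri);
        }
        String inner = compositeUri.substring(prefix.length(),
                compositeUri.length() - 1);
        List<URI> result = new ArrayList<>();
        for (String part : inner.split(",")) {
            result.add(URI.create(part.trim()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Same value as the uri attribute of the from_broker3 connector.
        String uri = "static:(tcp://localhost:61616,tcp://localhost:61617)";
        for (URI member : members(uri)) {
            System.out.println(member.getHost() + ":" + member.getPort());
        }
    }
}
```

Run against the connector URI used by broker3, this lists the two remote brokers, localhost:61616 (broker1) and localhost:61617 (broker2).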

Important Queue Attributes

We will connect broker3 to broker1 and broker2, send some messages to broker3, and then create consumers that connect to broker1 or broker2 to receive those messages.

We will review the queue attributes using the following JMX URLs:

  1. broker1 – service:jmx:rmi:///jndi/rmi://localhost:2011/jmxrmi
  2. broker2 – service:jmx:rmi:///jndi/rmi://localhost:2012/jmxrmi
  3. broker3 – service:jmx:rmi:///jndi/rmi://localhost:2013/jmxrmi

Queue attributes we are interested in are:

  1. ConsumerCount – Number of consumers subscribed to this destination.
  2. ProducerCount – Number of producers publishing to this destination.
  3. DequeueCount – Number of messages that have been acknowledged (and removed from the destination).
  4. InFlightCount – Number of messages that have been dispatched to, but not acknowledged by, consumers.
  5. ForwardCount – Number of messages that have been forwarded (to a networked broker) from the destination.
  6. QueueSize – Number of messages in the destination that are yet to be consumed, including messages that may have been dispatched but not yet acknowledged.
  7. EnqueueCount – Number of messages that have been sent to the destination.
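The same attributes can also be read from code instead of a JMX console. The sketch below uses only the JDK's `javax.management` API. It assumes that broker3 is running with the JMX URL listed above, that the queue MBean is registered under the object name pattern used by recent ActiveMQ 5.x releases, and that the attribute names match the list above. The class name `QueueStats` and its helper methods are hypothetical.

```java
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical helper, for illustration only.
class QueueStats {

    static final String[] ATTRIBUTES = { "ConsumerCount", "ProducerCount",
            "EnqueueCount", "DequeueCount", "InFlightCount", "ForwardCount",
            "QueueSize" };

    // JMX URL in the same form as the ones listed for broker1..broker3.
    static String jmxUrl(int port) {
        return "service:jmx:rmi:///jndi/rmi://localhost:" + port + "/jmxrmi";
    }

    // Assumed object name pattern for a queue MBean.
    static ObjectName queueName(String brokerName, String queue) {
        try {
            return new ObjectName("org.apache.activemq:type=Broker,brokerName="
                    + brokerName + ",destinationType=Queue,destinationName="
                    + queue);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        // Requires broker3 to be running with connectorPort 2013.
        JMXServiceURL url = new JMXServiceURL(jmxUrl(2013));
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbeans = connector.getMBeanServerConnection();
            ObjectName name = queueName("broker3", "test_from_broker3");
            for (String attribute : ATTRIBUTES) {
                System.out.println(attribute + " = "
                        + mbeans.getAttribute(name, attribute));
            }
        }
    }
}
```

The queue MBean only exists once the destination has been created, for example after the producer has sent its first message, so running this earlier would likely fail with an `InstanceNotFoundException`.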

Start Brokers

We will start broker1, broker2 and broker3 on the same machine.

Broker1 is listening on port 61616.

broker1.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">


	<broker id="myBroker" xmlns="http://activemq.apache.org/schema/core"
		brokerName="broker1" persistent="true" useJmx="true" start="true"
		dataDirectory="./activemqdata1">
		<transportConnectors>
			<transportConnector uri="tcp://localhost:61616" />
		</transportConnectors>
		<managementContext>
			<managementContext connectorPort="2011"/>
		</managementContext>
	</broker>

</beans>

Broker2 is listening on port 61617.

broker2.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

	<broker id="myBroker" xmlns="http://activemq.apache.org/schema/core"
		brokerName="broker2" persistent="true" useJmx="true" start="true"
		dataDirectory="./activemqdata2">
		<transportConnectors>
			<transportConnector uri="tcp://localhost:61617" />
		</transportConnectors>
		<managementContext>
			<managementContext connectorPort="2012" />
		</managementContext>
	</broker>

</beans>

Start broker1.

Broker1:

package com.javarticles.activemq;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Broker1 {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "broker1.xml");        
    }

}

Output:

 INFO | PListStore:[C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata1\broker1\tmp_storage] started
 INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata1\broker1\KahaDB]
 INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:2011/jmxrmi
 INFO | KahaDB is version 6
 INFO | Recovering from the journal @1:266315
 INFO | Recovery replayed 975 operations from the journal in 0.085 seconds.
 INFO | Apache ActiveMQ 5.12.0 (broker1, ID:INMAA1-L1005-56861-1480601399422-0:1) is starting
 INFO | Listening for connections at: tcp://127.0.0.1:61616
 INFO | Connector tcp://localhost:61616 started
 INFO | Apache ActiveMQ 5.12.0 (broker1, ID:INMAA1-L1005-56861-1480601399422-0:1) started
 INFO | For help or more information please see: http://activemq.apache.org
 WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata1\broker1\KahaDB only has 15743 mb of usable space - resetting to maximum available disk space: 15744 mb
 WARN | Temporary Store limit is 51200 mb, whilst the temporary data directory: C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata1\broker1\tmp_storage only has 15743 mb of usable space - resetting to maximum available 15743 mb.

Start broker2.

Broker2:

package com.javarticles.activemq;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Broker2 {
    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("broker2.xml");
    }

}

Output:

 INFO | PListStore:[C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata2\broker2\tmp_storage] started
 INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata2\broker2\KahaDB]
 INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:2012/jmxrmi
 INFO | KahaDB is version 6
 INFO | Recovering from the journal @1:223507
 INFO | Recovery replayed 976 operations from the journal in 0.029 seconds.
 INFO | Apache ActiveMQ 5.12.0 (broker2, ID:INMAA1-L1005-56890-1480601446024-0:1) is starting
 INFO | Listening for connections at: tcp://127.0.0.1:61617
 INFO | Connector tcp://localhost:61617 started
 INFO | Apache ActiveMQ 5.12.0 (broker2, ID:INMAA1-L1005-56890-1480601446024-0:1) started
 INFO | For help or more information please see: http://activemq.apache.org
 WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata2\broker2\KahaDB only has 15743 mb of usable space - resetting to maximum available disk space: 15744 mb
 WARN | Temporary Store limit is 51200 mb, whilst the temporary data directory: C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata2\broker2\tmp_storage only has 15743 mb of usable space - resetting to maximum available 15743 mb.

Start broker3.

Broker3:

package com.javarticles.activemq;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.network.NetworkConnector;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Broker3 {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "broker3.xml");
        BrokerService brokerService = (BrokerService) context
                .getBean("myBroker");
        NetworkConnector networkConnector = brokerService
                .getNetworkConnectorByName("from_broker3");
        System.out.println("Is from_broker3 network connection up? "
                + (networkConnector != null));
    }

}

Output:

 INFO | PListStore:[C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata3\broker3\tmp_storage] started
 INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata3\broker3\KahaDB]
 INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:2013/jmxrmi
 INFO | KahaDB is version 6
 INFO | Recovering from the journal @1:317985
 INFO | Recovery replayed 975 operations from the journal in 0.033 seconds.
 INFO | Apache ActiveMQ 5.12.0 (broker3, ID:INMAA1-L1005-56905-1480601502666-0:1) is starting
 INFO | Listening for connections at: tcp://127.0.0.1:61618
 INFO | Connector tcp://localhost:61618 started
 INFO | Establishing network connection from vm://broker3?async=false to tcp://localhost:61616
 INFO | Connector vm://broker3 started
 INFO | Establishing network connection from vm://broker3?async=false to tcp://localhost:61617
 INFO | Network Connector DiscoveryNetworkConnector:from_broker3:BrokerService[broker3] started
 INFO | Apache ActiveMQ 5.12.0 (broker3, ID:INMAA1-L1005-56905-1480601502666-0:1) started
 INFO | For help or more information please see: http://activemq.apache.org
 WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata3\broker3\KahaDB only has 15742 mb of usable space - resetting to maximum available disk space: 15743 mb
 WARN | Temporary Store limit is 51200 mb, whilst the temporary data directory: C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata3\broker3\tmp_storage only has 15742 mb of usable space - resetting to maximum available 15742 mb.
Is from_broker3 network connection up? true
 INFO | Network connection between vm://broker3#0 and tcp://localhost/127.0.0.1:61616@56906 (broker1) has been established.
 INFO | Network connection between vm://broker3#2 and tcp://localhost/127.0.0.1:61617@56907 (broker2) has been established.
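Before running the producer, it can help to confirm that all three transport connectors are accepting TCP connections. The following is a small sketch using plain JDK sockets. The class name `BrokerPortCheck` and the one-second timeout are arbitrary choices for this example, and a successful connect only shows that something is listening on the port, not that it is an ActiveMQ broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, for illustration only.
class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports of broker1, broker2 and broker3 as configured in this article.
        int[] ports = { 61616, 61617, 61618 };
        for (int i = 0; i < ports.length; i++) {
            System.out.println("broker" + (i + 1) + " on port " + ports[i]
                    + " listening: "
                    + isListening("localhost", ports[i], 1000));
        }
    }
}
```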

Producer

Start the Producer, which sends 10 messages to the queue test_from_broker3 on broker3.

Producer:

package com.javarticles.activemq;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class Producer {
    public static void main(String[] args) throws Exception {
        // send a message
        ActiveMQQueue destination = new ActiveMQQueue("test_from_broker3");

        ActiveMQConnectionFactory localFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61618");
        Connection localConnection = localFactory.createConnection();
        localConnection.start();
        Session localSession = localConnection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        try {
            MessageProducer localProducer = localSession
                    .createProducer(destination);

            for (int i = 0; i < 10; i++) {
                Message message = localSession.createTextMessage("test" + i);
                System.out.println("Send " + message);
                localProducer.send(message);
            }
        } finally {
            localSession.close();
            localConnection.close();
        }

    }

}

Output:

Send ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = test0}
...

Open broker3 JMX admin console. Enqueue count is 10.

Broker3 Console


Consumer Task

We will now consume the messages from the queue. The polling task below connects to the broker URI passed to its constructor. The number of messages to consume is also passed to the constructor.

PollingTask:

package com.javarticles.activemq;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class PollingTask implements Runnable {
    private final String brokerUri;
    private final int batchSize;

    public PollingTask(String brokerUri, int batchSize) {
        this.brokerUri = brokerUri;
        this.batchSize = batchSize;
    }

    public void run() {
        ActiveMQConnectionFactory remoteFactory = new ActiveMQConnectionFactory(
                brokerUri);
        Connection remoteConnection = null;
        try {
            remoteConnection = remoteFactory.createConnection();
            remoteConnection.start();
            Session remoteSession = remoteConnection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            Queue destination = remoteSession.createQueue("test_from_broker3");
            MessageConsumer remoteConsumer = remoteSession
                    .createConsumer(destination);
            System.out.println(Thread.currentThread().getName()
                    + ": waiting for message...");
            for (int i = 0; i < batchSize; i++) {
                // receive() blocks until a message arrives
                TextMessage message = (TextMessage) remoteConsumer.receive();
                if (message != null) {
                    System.out.println(Thread.currentThread().getName()
                            + ": Message: " + message.getText());
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // closing the connection also closes its sessions and consumers
            if (remoteConnection != null) {
                try {
                    remoteConnection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

Single Consumer from Broker1

Below client will connect to broker1 and will consume 2 messages.

SingleConsumerFromBroker1:

package com.javarticles.activemq;

public class SingleConsumerFromBroker1 {
    public static void main(String args[]) {
        PollingTask consumer1 = new PollingTask("tcp://localhost:61616", 2);
        Thread consumerThread1 = new Thread(consumer1, "consumer");
        consumerThread1.start();
    }
}

Output:

consumer: waiting for message...
consumer: Message: test8
consumer: Message: test9

We will now revisit the JMX console for broker3. Although the client above, which connected to broker1, consumed only 2 messages, broker3 dequeued all 10 messages.
The current queue size is 0.

Broker3 after Consume


Below is broker1’s JMX console after the messages were consumed. The client connected to broker1 and received 2 messages. Broker3 sent all 10 messages to broker1 because it knows there is demand for those messages from a consumer connected to broker1. Broker1 still has 8 messages in its queue. Its consumer count is 1, its dequeue count is 2 and its enqueue count is 10.

Broker1 after consumption


Messages sent to the local broker are forwarded only to those brokers that currently have demand for them from a consumer. Since there is a consumer attached to the destination on broker1, all the messages were forwarded from broker3 to broker1.
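The counter values seen so far can be reproduced with a toy model. This is only a sketch of the bookkeeping, not of ActiveMQ's implementation: the class `QueueCounters` is hypothetical, and it assumes that forwarding a message counts as a dequeue on the source broker and an enqueue on the target broker, which is consistent with the numbers shown in the consoles above.

```java
// Toy model of the queue counters, for illustration only.
class QueueCounters {
    final String broker;
    long enqueueCount;
    long dequeueCount;

    QueueCounters(String broker) {
        this.broker = broker;
    }

    void enqueue(long n) {
        enqueueCount += n;
    }

    void dequeue(long n) {
        if (n > queueSize()) {
            throw new IllegalStateException("not enough messages on " + broker);
        }
        dequeueCount += n;
    }

    // In this model, QueueSize is what was enqueued minus what was dequeued.
    long queueSize() {
        return enqueueCount - dequeueCount;
    }

    // Forwarding removes messages from this broker and adds them to the target.
    void forwardTo(QueueCounters target, long n) {
        dequeue(n);
        target.enqueue(n);
    }

    @Override
    public String toString() {
        return broker + ": EnqueueCount=" + enqueueCount + ", DequeueCount="
                + dequeueCount + ", QueueSize=" + queueSize();
    }

    public static void main(String[] args) {
        QueueCounters broker3 = new QueueCounters("broker3");
        QueueCounters broker1 = new QueueCounters("broker1");
        broker3.enqueue(10);            // producer sends 10 messages
        broker3.forwardTo(broker1, 10); // demand on broker1 pulls all 10 across
        broker1.dequeue(2);             // the client consumes 2 messages
        System.out.println(broker3);
        System.out.println(broker1);
    }
}
```

With these steps the model ends with broker3 at EnqueueCount=10, DequeueCount=10, QueueSize=0 and broker1 at EnqueueCount=10, DequeueCount=2, QueueSize=8, matching the console values described above.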

Dual Consumers from Broker1 and Broker2

Run ConsumerFromBroker1Broker2 to consume the remaining messages in the queue. It starts two consumers, so we now have one client connected to broker1 and another client connected to broker2.

ConsumerFromBroker1Broker2:

package com.javarticles.activemq;

public class ConsumerFromBroker1Broker2 {
    public static void main(String args[]) {
        PollingTask consumer1 = new PollingTask("tcp://localhost:61616", 5);
        Thread consumerThread1 = new Thread(consumer1, "consumer1");
        consumerThread1.start();
        
        PollingTask consumer2 = new PollingTask("tcp://localhost:61617", 3);
        Thread consumerThread2 = new Thread(consumer2, "consumer2");
        consumerThread2.start();
    }
}

Output:

consumer1: waiting for message...
consumer2: waiting for message...
consumer1: Message: test0
consumer1: Message: test2
consumer2: Message: test1
consumer1: Message: test4
consumer2: Message: test3
consumer2: Message: test5
consumer1: Message: test6
consumer1: Message: test7

Rerun the producer so that it posts another set of 10 messages. This time, the Broker3 console shows that 10 messages (EnqueueCount=10) have been sent to it, of which 9 messages have been forwarded to the connected brokers (DequeueCount=9). There is still one message in its queue (QueueSize=1).

Broker3 Console


Broker1 Console shows that it received 6 messages (EnqueueCount=6) from broker3, of which 5 messages have been consumed by the connected client (DequeueCount=5). There is still one message in its queue (QueueSize=1).

Broker1 Console


The Broker2 console shows that it received 3 messages (EnqueueCount=3) from broker3, all 3 of which have been consumed by the connected client (DequeueCount=3). There are no messages in its queue (QueueSize=0).


Broker2 Console

Download the source code

This was an example of how to create a network of ActiveMQ brokers and of their default behavior.

You can download the source code here: activemqnetworkconnectorexample.zip

About Author

Ram's expertise lies in test-driven development and refactoring. He is passionate about open source technologies and loves blogging on various Java and open-source technologies like Spring. You can reach him at rsatish.m@gmail.com
