In this article, we will see how to create a network of brokers. A broker connects to another broker using a network connector. By default, a network connection is one-way only. This article looks at that default behavior.
The broker that establishes the connection forwards the messages it received to the other broker it is connected to. This improves the performance as the clients can now connect to a local broker instead of each client connecting to a remote broker.
Through the network connection, the remote broker connects to the local broker. Any messages sent to the remote broker are automatically forwarded to the local broker, and clients can consume them from the local broker.
A network connector is unidirectional by default, so the broker only forwards the messages it receives to the connected brokers. You can configure the network connector for bi-directional communication so that the broker can both send messages to and receive messages from the other brokers over the same channel.
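As a hedged illustration of the bi-directional option, the fragment below marks a connector as duplex. It reuses the connector name and static URI from the broker3 configuration used later in this article; the duplex attribute is the only addition, and it is not part of the article's example setup.

```xml
<networkConnectors>
    <!-- duplex="true" lets messages flow in both directions over this one connection -->
    <networkConnector name="from_broker3"
        uri="static:(tcp://localhost:61616,tcp://localhost:61617)"
        duplex="true" />
</networkConnectors>
```

The rest of this article keeps the default one-way setting, so every count discussed later reflects one-way forwarding only.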
Static network connector
In order for a broker to connect to another broker, it needs to know that broker’s IP address and the port its TCP listener is listening on. The brokers to connect to can either be discovered dynamically or configured statically using pre-defined URIs.
The dynamic discovery of brokers is done using either multicast or rendezvous URIs. In this article, we will use statically configured URIs. In order to use static discovery, we must already know the addresses of all the remote brokers the broker is going to connect to.
The static protocol uses a composite URI, that is, a URI that contains other URIs.
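To make the shape of the composite URI concrete, here is a minimal sketch that assembles one from a list of broker addresses. The class `StaticNetworkUri` and its `of` method are hypothetical helpers written for this illustration; they are not part of ActiveMQ.

```java
import java.util.StringJoiner;

// Hypothetical helper (not an ActiveMQ class): builds the composite
// "static:(uri1,uri2,...)" string that wraps the remote broker URIs.
final class StaticNetworkUri {

    private StaticNetworkUri() {
    }

    static String of(String... brokerUris) {
        if (brokerUris.length == 0) {
            throw new IllegalArgumentException("at least one broker URI is required");
        }
        // Prefix, separator and suffix together produce static:(a,b,c)
        StringJoiner joiner = new StringJoiner(",", "static:(", ")");
        for (String uri : brokerUris) {
            joiner.add(uri);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // prints static:(tcp://localhost:61616,tcp://localhost:61617)
        System.out.println(of("tcp://localhost:61616", "tcp://localhost:61617"));
    }
}
```

The resulting string is what goes into the uri attribute of a network connector.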
Broker3 listens on port 61618 and connects to broker1 and broker2, which we will define in the following sections.
broker3.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
    http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <broker id="myBroker" xmlns="http://activemq.apache.org/schema/core"
        brokerName="broker3" persistent="true" useJmx="true" start="true"
        dataDirectory="./activemqdata3">
        <transportConnectors>
            <transportConnector uri="tcp://localhost:61618" />
        </transportConnectors>
        <networkConnectors>
            <networkConnector name="from_broker3"
                uri="static:(tcp://localhost:61616,tcp://localhost:61617)" />
        </networkConnectors>
        <managementContext>
            <managementContext connectorPort="2013"/>
        </managementContext>
    </broker>
</beans>
Important Queue Attributes
We will connect broker3 to broker1 and broker2, send some messages to broker3, and then create consumers that connect to broker1 or broker2 to receive those messages.
We will review the queue attributes using the following JMX URLs:
- broker1 – service:jmx:rmi:///jndi/rmi://localhost:2011/jmxrmi
- broker2 – service:jmx:rmi:///jndi/rmi://localhost:2012/jmxrmi
- broker3 – service:jmx:rmi:///jndi/rmi://localhost:2013/jmxrmi
Queue attributes we are interested in are:
- Consumer count – Number of consumers subscribed to this destination.
- Producer Count – Number of producers publishing to this destination.
- Dequeue count – Number of messages that have been acknowledged (and removed from the destination).
- InFlightCount – Number of messages that have been dispatched to, but not acknowledged by, consumers.
- ForwardCount – Number of messages that have been forwarded (to a networked broker) from the destination.
- QueueSize – Number of messages in the destination which are yet to be consumed. Potentially dispatched but unacknowledged.
- Enqueue Count – Number of messages that have been sent to the destination.
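The attributes above can also be read programmatically instead of through a JMX console. The following is a minimal sketch using only the JDK's `javax.management` API. The class `QueueStats` is a hypothetical helper, and the MBean object name pattern is an assumption based on the naming scheme used by ActiveMQ 5.8 and later; verify it against your broker in a JMX console before relying on it.

```java
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical helper: prints the queue attributes discussed above.
final class QueueStats {

    static final String[] ATTRIBUTES = { "ConsumerCount", "ProducerCount", "EnqueueCount",
            "DequeueCount", "InFlightCount", "ForwardCount", "QueueSize" };

    // Same URL pattern as the three JMX URLs listed in this article.
    static String jmxUrl(String host, int port) {
        return "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi";
    }

    // Assumption: object name layout of ActiveMQ 5.8+ queue MBeans.
    static ObjectName queueName(String brokerName, String queue) {
        try {
            return new ObjectName("org.apache.activemq:type=Broker,brokerName=" + brokerName
                    + ",destinationType=Queue,destinationName=" + queue);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    static void print(String host, int port, String brokerName, String queue) throws Exception {
        JMXServiceURL url = new JMXServiceURL(jmxUrl(host, port));
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbeans = connector.getMBeanServerConnection();
            ObjectName name = queueName(brokerName, queue);
            for (String attribute : ATTRIBUTES) {
                System.out.println(brokerName + " " + attribute + " = "
                        + mbeans.getAttribute(name, attribute));
            }
        }
    }

    public static void main(String[] args) {
        try {
            // broker3 exposes JMX on port 2013 in this article's setup
            print("localhost", 2013, "broker3", "test_from_broker3");
        } catch (Exception e) {
            System.out.println("Could not read queue statistics: " + e);
        }
    }
}
```

The queue MBean only exists once the queue has been created on that broker, so run this after the producer has sent at least one message.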
Start Brokers
We will start broker1, broker2 and broker3 on the same machine.
Broker1 is listening on port 61616.
broker1.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
    http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <broker id="myBroker" xmlns="http://activemq.apache.org/schema/core"
        brokerName="broker1" persistent="true" useJmx="true" start="true"
        dataDirectory="./activemqdata1">
        <transportConnectors>
            <transportConnector uri="tcp://localhost:61616" />
        </transportConnectors>
        <managementContext>
            <managementContext connectorPort="2011"/>
        </managementContext>
    </broker>
</beans>
Broker2 is listening on port 61617.
broker2.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
    http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <broker id="myBroker" xmlns="http://activemq.apache.org/schema/core"
        brokerName="broker2" persistent="true" useJmx="true" start="true"
        dataDirectory="./activemqdata2">
        <transportConnectors>
            <transportConnector uri="tcp://localhost:61617" />
        </transportConnectors>
        <managementContext>
            <managementContext connectorPort="2012" />
        </managementContext>
    </broker>
</beans>
Start broker1.
Broker1:
package com.javarticles.activemq;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Broker1 {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "broker1.xml");
    }
}
Output:
INFO | PListStore:[C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata1\broker1\tmp_storage] started
INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata1\broker1\KahaDB]
INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:2011/jmxrmi
INFO | KahaDB is version 6
INFO | Recovering from the journal @1:266315
INFO | Recovery replayed 975 operations from the journal in 0.085 seconds.
INFO | Apache ActiveMQ 5.12.0 (broker1, ID:INMAA1-L1005-56861-1480601399422-0:1) is starting
INFO | Listening for connections at: tcp://127.0.0.1:61616
INFO | Connector tcp://localhost:61616 started
INFO | Apache ActiveMQ 5.12.0 (broker1, ID:INMAA1-L1005-56861-1480601399422-0:1) started
INFO | For help or more information please see: http://activemq.apache.org
WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata1\broker1\KahaDB only has 15743 mb of usable space - resetting to maximum available disk space: 15744 mb
WARN | Temporary Store limit is 51200 mb, whilst the temporary data directory: C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata1\broker1\tmp_storage only has 15743 mb of usable space - resetting to maximum available 15743 mb.
Start broker2.
Broker2:
package com.javarticles.activemq;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Broker2 {
    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("broker2.xml");
    }
}
Output:
INFO | PListStore:[C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata2\broker2\tmp_storage] started
INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata2\broker2\KahaDB]
INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:2012/jmxrmi
INFO | KahaDB is version 6
INFO | Recovering from the journal @1:223507
INFO | Recovery replayed 976 operations from the journal in 0.029 seconds.
INFO | Apache ActiveMQ 5.12.0 (broker2, ID:INMAA1-L1005-56890-1480601446024-0:1) is starting
INFO | Listening for connections at: tcp://127.0.0.1:61617
INFO | Connector tcp://localhost:61617 started
INFO | Apache ActiveMQ 5.12.0 (broker2, ID:INMAA1-L1005-56890-1480601446024-0:1) started
INFO | For help or more information please see: http://activemq.apache.org
WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata2\broker2\KahaDB only has 15743 mb of usable space - resetting to maximum available disk space: 15744 mb
WARN | Temporary Store limit is 51200 mb, whilst the temporary data directory: C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata2\broker2\tmp_storage only has 15743 mb of usable space - resetting to maximum available 15743 mb.
Start broker3.
Broker3:
package com.javarticles.activemq;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.network.NetworkConnector;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Broker3 {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "broker3.xml");
        BrokerService brokerService = (BrokerService) context
                .getBean("myBroker");
        NetworkConnector networkConnector = brokerService
                .getNetworkConnectorByName("from_broker3");
        System.out.println("Is from_broker3 network connection up? "
                + (networkConnector != null));
    }
}
Output:
INFO | PListStore:[C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata3\broker3\tmp_storage] started
INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata3\broker3\KahaDB]
INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:2013/jmxrmi
INFO | KahaDB is version 6
INFO | Recovering from the journal @1:317985
INFO | Recovery replayed 975 operations from the journal in 0.033 seconds.
INFO | Apache ActiveMQ 5.12.0 (broker3, ID:INMAA1-L1005-56905-1480601502666-0:1) is starting
INFO | Listening for connections at: tcp://127.0.0.1:61618
INFO | Connector tcp://localhost:61618 started
INFO | Establishing network connection from vm://broker3?async=false to tcp://localhost:61616
INFO | Connector vm://broker3 started
INFO | Establishing network connection from vm://broker3?async=false to tcp://localhost:61617
INFO | Network Connector DiscoveryNetworkConnector:from_broker3:BrokerService[broker3] started
INFO | Apache ActiveMQ 5.12.0 (broker3, ID:INMAA1-L1005-56905-1480601502666-0:1) started
INFO | For help or more information please see: http://activemq.apache.org
WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata3\broker3\KahaDB only has 15742 mb of usable space - resetting to maximum available disk space: 15743 mb
WARN | Temporary Store limit is 51200 mb, whilst the temporary data directory: C:\javarticles_ws\ActiveMQStaticDestinationExample\.\activemqdata3\broker3\tmp_storage only has 15742 mb of usable space - resetting to maximum available 15742 mb.
Is from_broker3 network connection up? true
INFO | Network connection between vm://broker3#0 and tcp://localhost/127.0.0.1:[email protected] (broker1) has been established.
INFO | Network connection between vm://broker3#2 and tcp://localhost/127.0.0.1:[email protected] (broker2) has been established.
Producer
Start the producer, which sends 10 messages to the queue test_from_broker3 on broker3.
Producer:
package com.javarticles.activemq;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class Producer {
    public static void main(String[] args) throws Exception {
        // send a message
        ActiveMQQueue destination = new ActiveMQQueue("test_from_broker3");
        ActiveMQConnectionFactory localFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61618");
        Connection localConnection = localFactory.createConnection();
        localConnection.start();
        Session localSession = localConnection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        try {
            MessageProducer localProducer = localSession
                    .createProducer(destination);
            for (int i = 0; i < 10; i++) {
                Message message = localSession.createTextMessage("test" + i);
                System.out.println("Send " + message);
                localProducer.send(message);
            }
        } finally {
            localSession.close();
            localConnection.close();
        }
    }
}
Output:
Send ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = test0} ...
Open the JMX admin console for broker3. The enqueue count is 10.
Consumer Task
We will now consume the messages from the queue. The polling task below connects to the broker URI passed to its constructor. The number of messages to consume is also passed to the constructor.
PollingTask:
package com.javarticles.activemq;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class PollingTask implements Runnable {
    private String brokerUri;
    private int batchSize;

    public PollingTask(String brokerUri, int batchSize) {
        this.brokerUri = brokerUri;
        this.batchSize = batchSize;
    }

    public void run() {
        ActiveMQConnectionFactory remoteFactory = new ActiveMQConnectionFactory(
                brokerUri);
        Connection remoteConnection;
        try {
            remoteConnection = remoteFactory.createConnection();
        } catch (JMSException e) {
            e.printStackTrace();
            return;
        }
        try {
            remoteConnection.start();
        } catch (JMSException e) {
            e.printStackTrace();
            return;
        }
        Queue destination = new ActiveMQQueue("test_from_broker3");
        Session remoteSession;
        try {
            remoteSession = remoteConnection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            destination = remoteSession.createQueue("test_from_broker3");
        } catch (JMSException e1) {
            e1.printStackTrace();
            return;
        }
        try {
            MessageConsumer remoteConsumer;
            try {
                remoteConsumer = remoteSession.createConsumer(destination);
            } catch (JMSException e1) {
                e1.printStackTrace();
                return;
            }
            System.out.println(Thread.currentThread().getName()
                    + ": waiting for message...");
            for (int i = 0; i < batchSize; i++) {
                try {
                    TextMessage message = (TextMessage) remoteConsumer
                            .receive();
                    if (message != null) {
                        System.out.println(Thread.currentThread().getName()
                                + ": Message: " + message.getText());
                    }
                } catch (final JMSException e) {
                    e.printStackTrace();
                    break;
                }
            }
        } finally {
            try {
                remoteSession.close();
                remoteConnection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
Single Consumer from Broker1
The client below connects to broker1 and consumes 2 messages.
SingleConsumerFromBroker1:
package com.javarticles.activemq;

public class SingleConsumerFromBroker1 {
    public static void main(String args[]) {
        PollingTask consumer1 = new PollingTask("tcp://localhost:61616", 2);
        Thread consumerThread1 = new Thread(consumer1, "consumer");
        consumerThread1.start();
    }
}
Output:
consumer: waiting for message...
consumer: Message: test8
consumer: Message: test9
We will now revisit the JMX console for broker3. Although the client above, which connected to broker1, consumed only 2 messages, broker3 dequeued all 10 messages.
The current queue size is 0.
Next, look at broker1’s JMX console after the messages were consumed. The client connected to broker1 and received 2 messages. Broker3 sent all 10 messages to broker1 because it knows that a consumer connected to broker1 has demand for them. Broker1 still has 8 messages in its queue. Its consumer count is 1, its dequeue count is 2 and its enqueue count is 10.
Messages sent to the local broker are forwarded only to those brokers that currently have demand for them from a consumer. Since a consumer was attached to the destination on broker1, all the messages were forwarded from broker3 to broker1.
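The bookkeeping behind these counters can be replayed with a small toy model. This is only a sketch of the counting, assuming that a broker hands over every queued message as soon as one networked broker has a consumer. `ToyBroker` and all of its members are hypothetical and are not ActiveMQ classes. The model ignores prefetch, acknowledgements and message ordering.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: none of these types belong to ActiveMQ.
final class ToyBroker {
    final String name;
    final Deque<String> queue = new ArrayDeque<>();
    final List<ToyBroker> networkedBrokers = new ArrayList<>();
    int enqueueCount;
    int dequeueCount;
    int forwardCount;
    int consumerCount;

    ToyBroker(String name) {
        this.name = name;
    }

    // A producer (or a forwarding broker) puts a message on the queue.
    void send(String message) {
        enqueueCount++;
        queue.add(message);
    }

    // Forward queued messages to the first networked broker that has a
    // consumer. Without any demand, the messages stay where they are.
    void forwardOnDemand() {
        for (ToyBroker remote : networkedBrokers) {
            if (remote.consumerCount == 0) {
                continue;
            }
            while (!queue.isEmpty()) {
                remote.send(queue.poll());
                dequeueCount++;
                forwardCount++;
            }
        }
    }

    // A local consumer takes up to 'count' messages off the queue.
    void consume(int count) {
        for (int i = 0; i < count && !queue.isEmpty(); i++) {
            queue.poll();
            dequeueCount++;
        }
    }

    // Replays the single-consumer scenario; returns {broker3, broker1, broker2}.
    static ToyBroker[] scenario() {
        ToyBroker broker1 = new ToyBroker("broker1");
        ToyBroker broker2 = new ToyBroker("broker2");
        ToyBroker broker3 = new ToyBroker("broker3");
        broker3.networkedBrokers.add(broker1);
        broker3.networkedBrokers.add(broker2);

        for (int i = 0; i < 10; i++) {
            broker3.send("test" + i);
        }
        broker3.forwardOnDemand(); // no consumers yet, nothing moves

        broker1.consumerCount = 1; // a client subscribes on broker1
        broker3.forwardOnDemand(); // all 10 messages move to broker1
        broker1.consume(2);        // the client reads 2 messages
        return new ToyBroker[] { broker3, broker1, broker2 };
    }

    public static void main(String[] args) {
        for (ToyBroker b : scenario()) {
            System.out.println(b.name + ": enqueue=" + b.enqueueCount + " dequeue=" + b.dequeueCount
                    + " forward=" + b.forwardCount + " size=" + b.queue.size());
        }
    }
}
```

In this model broker3 ends with a dequeue count of 10 and an empty queue, while broker1 ends with an enqueue count of 10, a dequeue count of 2 and 8 messages left. These match the counts described above.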
Dual Consumers from Broker1 and Broker2
Run ConsumerFromBroker1Broker2 to consume the remaining messages in the queue. It uses two consumers, so we now have one client connected to broker1 and another client connected to broker2.
ConsumerFromBroker1Broker2:
package com.javarticles.activemq;

public class ConsumerFromBroker1Broker2 {
    public static void main(String args[]) {
        PollingTask consumer1 = new PollingTask("tcp://localhost:61616", 5);
        Thread consumerThread1 = new Thread(consumer1, "consumer1");
        consumerThread1.start();

        PollingTask consumer2 = new PollingTask("tcp://localhost:61617", 3);
        Thread consumerThread2 = new Thread(consumer2, "consumer2");
        consumerThread2.start();
    }
}
Output:
consumer1: waiting for message...
consumer2: waiting for message...
consumer1: Message: test0
consumer1: Message: test2
consumer2: Message: test1
consumer1: Message: test4
consumer2: Message: test3
consumer2: Message: test5
consumer1: Message: test6
consumer1: Message: test7
Rerun the producer so that it posts another set of 10 messages. This time, the broker3 console shows that 10 messages (EnqueueCount=10) have been sent to it, of which 9 messages have been sent to the connected brokers (DequeueCount=9). There is still one message in its queue (QueueSize=1).
Broker1 Console shows that it received 6 messages (EnqueueCount=6) from broker3, of which 5 messages have been consumed by the connected client (DequeueCount=5). There is still one message in its queue (QueueSize=1).
Broker2 Console shows that it received 3 messages (EnqueueCount=3) from broker3, of which 3 messages have been consumed by the connected client (DequeueCount=3). There are no messages in its queue (QueueSize=0).
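As a quick sanity check, the reported counters can be verified with plain arithmetic. The sketch below assumes that, in this simple scenario with no expired or discarded messages, QueueSize equals EnqueueCount minus DequeueCount. The reading with EnqueueCount=3 is taken to belong to broker2, since the consumer that reads 3 messages connects to port 61617. `CounterCheck` is a hypothetical helper, not part of ActiveMQ.

```java
// Bookkeeping check on the counters reported above; plain arithmetic, no ActiveMQ API.
final class CounterCheck {

    private CounterCheck() {
    }

    // Messages still on a queue: everything enqueued minus everything dequeued.
    static int remaining(int enqueueCount, int dequeueCount) {
        return enqueueCount - dequeueCount;
    }

    public static void main(String[] args) {
        // The values are the JMX readings quoted in the article.
        System.out.println("broker3 QueueSize: " + remaining(10, 9)); // prints broker3 QueueSize: 1
        System.out.println("broker1 QueueSize: " + remaining(6, 5));  // prints broker1 QueueSize: 1
        System.out.println("broker2 QueueSize: " + remaining(3, 3));  // prints broker2 QueueSize: 0
        // The 9 messages that left broker3 equal the 6 + 3 received downstream.
        System.out.println("forwarded matches received: " + (9 == 6 + 3)); // prints forwarded matches received: true
    }
}
```

The numbers agree: the 9 messages that left broker3 are the 6 received by broker1 plus the 3 received by broker2, and each reported QueueSize matches the difference between its enqueue and dequeue counts.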
Download the source code
This was an example of how to create a network of ActiveMQ brokers and of how such a network behaves by default.