ActiveMQ Standalone Broker Example

0

In this article, we will show an example of producer sending message to a consumer using ActiveMQ’s standalone broker.

ActiveMQ is a Java Message Service (JMS) 1.1 compliant, open-source, messaging system from the Apache Software Foundation.

Producer  runs in its own JVM, consumer in its own and the message broker facilitates the communications between producer and consumer. The producer sends a message to a queue and the message remains in the queue until another the consumer retrieves the message from that queue. This happens asynchronously so the client acting as consumer of the message does not have to be available when the message is sent to the queue and can instead retrieve the message at a later point of time.

Maven Dependencies

Include activeMQ dependencies to pom.xml.

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.javarticles.spring.jms</groupId>
	<artifactId>springJms</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<dependencies>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<version>5.12.0</version>
			<artifactId>activemq-all</artifactId>
		</dependency>
	</dependencies>
</project>

ActiveMQ Components Involved

The components involved are producer, consumer, the message itself, destination, and the message broker.

The producer and consumer both act as clients of the broker. A producer creates a message, connects to the broker, dispatches the message to the broker for routing and delivery to a particular destination. A destination is a logical channel through which the clients communicate with one another.

Once the message is delivered to the destination, it sits in there till the consumer connects to the broker and retrieves the message from the same destination.

Let’s now look into an example of standalone broker.

Standalone Broker

There are two basic types of ActiveMQ brokers: embedded and standalone. An embedded broker executes within the same JVM process as the Java clients that are using its services. Whereas a standalone broker is one that does not have its clients co-residing in its JVM and communicates with its clients through network-based transport connectors like TCP/IP based transport connector.

The broker service is created for URI tcp://localhost:61616 which means the broker is listening on port 61616 for incoming transport and network connection requests.

JmsBroker:

package com.javarticles.spring.jms;

import java.net.URI;
import java.net.URISyntaxException;

import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;

public class JmsBroker {
    public static void main(String[] args) throws URISyntaxException, Exception {
        BrokerService broker = BrokerFactory.createBroker(new URI(
                "broker:(tcp://localhost:61616)"));
        broker.start();
    }
}

Output:

 INFO | Using Persistence Adapter: AMQPersistenceAdapter(activemq-data\localhost)
 INFO | AMQStore starting using directory: activemq-data\localhost
 INFO | Kaha Store using data directory activemq-data\localhost\kr-store\state
 INFO | Active data files: []
 WARN | The ReferenceStore is not valid - recovering ...
 INFO | Kaha Store successfully deleted data directory activemq-data\localhost\kr-store\data
 INFO | Journal Recovery Started from: DataManager:(data-)
 INFO | Kaha Store using data directory activemq-data\localhost\kr-store\data
 INFO | Recovered 4 operations from redo log in 0.075 seconds.
 INFO | Finished recovering the ReferenceStore
 INFO | ActiveMQ 5.2.0 JMS Message Broker (localhost) is starting
 INFO | For help or more information please see: http://activemq.apache.org/
 INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
 INFO | Listening for connections at: tcp://INMAA1-L1005:61616
 INFO | Connector tcp://INMAA1-L1005:61616 Started
 INFO | ActiveMQ JMS Message Broker (localhost, ID:INMAA1-L1005-59627-1444994347153-0:0) started

Producer

In order to send the message, producer has to do the following:

  1. Create a JMS connection factory on a message broker.
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    

    The URL passed to ActiveMQConnectionFactory is based on TCP/IP transport connector. The IP address:port should be the one where the broker is running and listening to.

  2. Producer needs to send message to some logical channel that the broker is aware of, which we call destination. There can be many destinations within a broker. A destination can be either a queue or a topic.
    Destination destination = new ActiveMQQueue("someQueue");
  3. In order for the producer to connect to the broker, we need to open a JMS connection using the connection factory.
    Connection connection = connectionFactory.createConnection();
  4. Next, obtain a JMS session from the connection.
    Session session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
  5. Create a message. Here we are creating a simple text message.
    Message msg = session.createTextMessage(payload);
  6. Create a producer using the session and the destination that we want to deliver.
    MessageProducer producer = session.createProducer(destination);
  7. Send the JMS message with a message producer.
    producer.send(msg);
  8. Close the session and the connection

JmsMessageProducer:

package com.javarticles.spring.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class JmsMessageProducer {
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616");
        Destination destination = new ActiveMQQueue("someQueue");
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        try {
            String payload = "Hi";
            Message msg = session.createTextMessage(payload);
            MessageProducer producer = session.createProducer(destination);
            System.out.println("Send text '" + payload + "'");
            producer.send(msg);
            session.close();
        } finally {
            if (connection != null) {
                connection.close();
            }
        }

    }

}

Output:

Send text 'Hi'

Consumer

The consumer also follows a similar path as that of the producer. It also needs a connection factory, connection and a session object.
Instead of creating a producer, we create a consumer.

MessageConsumer consumer = session.createConsumer(destination);

Since its a consumer, it connects to broker to retrieve the message.

TextMessage msg = (TextMessage) consumer.receive();
System.out.println(msg);
System.out.println("Received: " + msg.getText());

JmsMessageConsumer:

package com.javarticles.spring.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class JmsMessageConsumer {
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616");
        Destination destination = new ActiveMQQueue("someQueue");
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        try {
            MessageConsumer consumer = session.createConsumer(destination);
            connection.start();
            TextMessage msg = (TextMessage) consumer.receive();
            System.out.println(msg);
            System.out.println("Received: " + msg.getText());
            session.close();
        } finally {
            if (connection != null) {
                connection.close();
            }
        }

    }

}

Output:

ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:INMAA1-L1005-59910-1444994617331-0:0:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-59910-1444994617331-0:0:1:1, destination = queue://someQueue, transactionId = null, expiration = 0, timestamp = 1444994617497, arrival = 0, brokerInTime = 1444994617498, brokerOutTime = 1444994668742, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = Hi}
Received: Hi

Download the source code

This was an example about ActiveMQ producer/consumer using standalone broker.

You can download the source code here: ActiveMQStandloneExample.zip
Share.

Comments are closed.