Example of message-driven-channel-adapter

0

In one of my previous article, I showed you an example of JMS inbound-channel-adapter for receiving messages from a JMS server. In the inbound-channel-adapter case, the consumer client polls the destination for messages at regular intervals.
This is not an efficient way of receiving messages as a message might arrive between two triggering events, in which case, the message might just be sitting in the queue without anyone consuming it.
The ideal way would be to consume it as soon as it arrives. message-driven-channel-adapter represents this second type of consumer.

One can configure more than one consumer continuously listening on the destination for the messages. Moment a message arrives, the JMS consumer receives it and delegates it to the default MessageListener which posts the message to the configured channel.

Example of message-driven-channel-adapter

Let’s go through message-driven-channel-adapter element. This defines the JMS endpoint to receive the messages from the JMS destination. Once a message is received, it will post to the configured inbound channel processEmpChannel.

Since JMS is involved, we need to let the endpoint know about the connection factory, destination and the channel id where it will post the message.

connection-factory defines the connection factory. If your connection factory’s bean id is “connectionFactory” then you don’t have to specify the attribute, spring will automatically inject the connection factory else you need to set the connection-factory to connection factory’s bean reference.
destination-name defines the destination and input-channel defines the outbound channel.
Note that the channel specified should already be declared. In our example processEmpChannel is declared as a queue.

We also have defined another endpoint service-activator to consume the message posted to the channel, once the message is received, the  method configured is called and the employee message is passed in as the argument to the method.
applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:jms="http://www.springframework.org/schema/integration/jms"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
		http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">

	<int:channel id="processEmpChannel">
		<int:queue />
	</int:channel>

	<jms:message-driven-channel-adapter
		channel="processEmpChannel" 
		destination-name="empQueue" />

	<int:service-activator input-channel="processEmpChannel"
		ref="springIntExample" method="processEmployee">
		<int:poller fixed-delay="500" />
	</int:service-activator>

	<bean id="connectionFactory"
		class="org.springframework.jms.connection.CachingConnectionFactory">
		<property name="targetConnectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL" value="vm://localhost?broker.persistent=false" />
			</bean>
		</property>
		<property name="sessionCacheSize" value="10" />
		<property name="cacheProducers" value="false" />
	</bean>

	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory" />
	</bean>

	<bean id="springIntExample"
		class="com.javarticles.spring.integration.jms.SpringIntegrationJmsMsgDrivenAdapterExample">
		<property name="jmsTemplate" ref="jmsTemplate" />
	</bean>
</beans>

 

In this example, we post couple of employee beans to the JMS destination. The messages posted are consumed by the service-activator and method processEmployee gets called for each employee.
SpringIntegrationJmsExample:

package com.javarticles.spring.integration.jms;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

public class SpringIntegrationJmsMsgDrivenAdapterExample {
    private JmsTemplate jmsTemplate;
    
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
        SpringIntegrationJmsMsgDrivenAdapterExample springIntExample = (SpringIntegrationJmsMsgDrivenAdapterExample) context.getBean("springIntExample");
        springIntExample.sendEmployee();
    }
    
    public void sendEmployee() {
        Employee emp = new Employee(1, "Joe", 37);
        System.out.println("Queue employee " + emp + " for processing");
        getJmsTemplate().convertAndSend("empQueue", emp);
        getJmsTemplate().convertAndSend("empQueue", new Employee(2, "Sam", 25));
    }
    
    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void processEmployee(Employee emp) {
        System.out.println("Employee: " + emp + " processed");
    }
}

Output:

Queue employee Employee: [Joe, ID: 1, AGE 37] for processing
Employee: Employee: [Joe, ID: 1, AGE 37] processed
Employee: Employee: [Sam, ID: 2, AGE 25] processed

Download Source Code

In this article, I have shown you example of JMS endpoint message-driven-channel-adapter. You can download the source code here: springIntegrationJmsMsgDrivenChnAdapter

Share.

Leave A Reply