Spring Integration Message Bridge

0

A spring integration bridge is an endpoint that simply passes the message directly from the input channel to the output channel without modifying it. The input channel can be either a pollable channel or a subscribable channel and likewise the output channel can be either a pollable or subscribable channel. If the output channel is missing then probably the bridge acts as a stopper.

We can also use bridge to join two different messaging systems. When a message is delivered on a channel of interest in one messaging system, the bridge consumes the message and sends another with the same contents on the corresponding channel in the other messaging system.

In our article, we will see an example for each case. The first one will use spring integration bridge to deliver the message from a subscribable channel to the pollable channel. The second one will deliver the message to a second messaging system.

Before we start with the example, lets first add module dependencies to our pom.xml.

Dependencies

Add the following dependencies:

  1. spring-core
  2. spring-context
  3. spring-integration-core
  4. spring-integration-spring
  5. activemq-core
  6. spring-jms

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.javarticles.spring.integration.jms</groupId>
	<artifactId>springintegrationjms</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<dependencies>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-core</artifactId>
			<version>4.1.2.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-jms</artifactId>
			<version>4.1.2.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.2.0</version>
		</dependency>
	</dependencies>

	<properties>
		<spring.version>4.1.4.RELEASE</spring.version>
	</properties>

</project>

Bridge Channels

In this example, we will bridge a publish/subscribe model channel with a pollable channel. Our example is about two categories of customer. The customers who pay for annual maintenance are given more priority than the customers who opt for demand service.

If one is privileged customer, then complaint posted is sent through highPriorityCustomerChannel channel which marks the message as ‘High Priority’ message by setting header customerPriority to ‘high’.

The routing is initiated through a call to the gateway adapter.

We have different gateways for each category of customer.

For priority customer:

<int:gateway id="privilegedCustomer" service-interface="com.javarticles.spring.integration.PrivilegedCustomer"
		default-request-channel="privilegedCustomerChannel" />

For the on-demand service customers:

<int:gateway id="baseCustomer" service-interface="com.javarticles.spring.integration.BaseCustomer"
		default-request-channel="basicCustomerChannel" />

Here is the gate way interface for privileged customer.

PrivilegedCustomer:

package com.javarticles.spring.integration;

public interface PrivilegedCustomer {
    String processRequest(String complain);
}

and for the usual customers.

BaseCustomer:

package com.javarticles.spring.integration;

public interface BaseCustomer {
    String processRequest(String complain);
}

We post the complaint using processRequest() method. The complaint posted is sent to channel privilegedCustomerChannel for the privileged customers and to basicCustomerChannel for the non-privileged customers.

The privileged customer’s complaint goes through an additional channel as we want the priority to be marked as ‘high’. We simply pass the message from channel privilegedCustomerChannel to channel highPriorityCustomerChannel using the message bridge which is declared using the <bridge> element in the integration namespace.

<int:bridge id="bridgeHighPriority" input-channel="privilegedCustomerChannel"
		output-channel="highPriorityCustomerChannel">
    <int:poller fixed-delay="500" />
</int:bridge>

Marking of the header is done using the header enricher.

<int:header-enricher id="markAsHighPriority"
		input-channel="highPriorityCustomerChannel" output-channel="processComplaintChannel">
    <int:header name="customerPriority" value="high" />
</int:header-enricher>

Both privileged customer’s complaint and the non-privileged customer’s complaint reach channel processComplaintChannel for the actual processing of the complaint.
A service activator processComplaint is hooked onto the output channel. As soon as the message arrives at the output channel via the bridge endpoint, the customerComplaintProcessor bean is invoked for action. The privileged customer’s complaint is given immediate attention whereas the non-privileged customer’s complaint is queued.

<int:service-activator id="processComplaint"
      ref="customerComplaintProcessor" method="process" input-channel="processComplaintChannel" />

<bean id="customerComplaintProcessor" class="com.javarticles.spring.integration.ComplaintProcessor" />

ComplaintProcessor:

package com.javarticles.spring.integration;

import org.springframework.messaging.Message;

public class ComplaintProcessor {
    public String process(Message complaint) {
        String priority = complaint.getHeaders().get("customerPriority", String.class);
        if ("high".equals(priority)) {
            return "Complaint logged '" + complaint + "'";
        } else {
            return "Complaint queued '" + complaint + "'";
        }
    }
}

You can imagine this as complaint being assigned to dedicated engineers for immediate action versus complaint assigned to a specific date for action.

Here is the entire application context XML.

complaintApplicationContext.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
			http://www.springframework.org/schema/beans/spring-beans.xsd
			http://www.springframework.org/schema/integration
			http://www.springframework.org/schema/integration/spring-integration.xsd
			http://www.springframework.org/schema/context 
			http://www.springframework.org/schema/context/spring-context.xsd
			http://www.springframework.org/schema/integration/jms 
			http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">

	<int:gateway id="privilegedCustomer"
		service-interface="com.javarticles.spring.integration.PrivilegedCustomer"
		default-request-channel="privilegedCustomerChannel" />

	<int:gateway id="baseCustomer"
		service-interface="com.javarticles.spring.integration.BaseCustomer"
		default-request-channel="basicCustomerChannel" />

	<int:channel id="privilegedCustomerChannel">
		<int:queue />
	</int:channel>

	<int:channel id="highPriorityCustomerChannel" />

	<int:channel id="basicCustomerChannel">
		<int:queue />
	</int:channel>

	<int:channel id="processComplaintChannel" />

	<int:bridge id="bridgeHighPriority" input-channel="privilegedCustomerChannel"
		output-channel="highPriorityCustomerChannel">
		<int:poller fixed-delay="500" />
	</int:bridge>	

	<int:bridge id="bridgeBasicCustomer" input-channel="basicCustomerChannel"
		output-channel="processComplaintChannel">
		<int:poller fixed-delay="500" />
	</int:bridge>

	<int:header-enricher id="markAsHighPriority"
		input-channel="highPriorityCustomerChannel" output-channel="processComplaintChannel">
		<int:header name="customerPriority" value="high" />
	</int:header-enricher>

	<int:service-activator id="processComplaint"
		ref="customerComplaintProcessor" method="process" input-channel="processComplaintChannel" />

	<bean id="customerComplaintProcessor" class="com.javarticles.spring.integration.ComplaintProcessor" />

</beans>

Here is the flow:

Bridge subscribable to p2p channel

Bridge subscribable to p2p channel

Now let’s post some complaint.

SpringIntegrationBridgeExample:

package com.javarticles.spring.integration;

import java.io.IOException;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringIntegrationBridgeExample {

    public static void main(String[] args) throws InterruptedException,
            IOException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "complaintApplicationContext.xml");
        try {
            PrivilegedCustomer privilegedCust = (PrivilegedCustomer) context
                    .getBean("privilegedCustomer");
            String complaint = "Machine is not working!";
            System.out.println("Post high priority complaint '" + complaint +"'");
            System.out.println("Response: " + privilegedCust.processRequest(complaint));
            
            System.out.println("Post normal priority complaint '" + complaint +"'");
            BaseCustomer baseCust = (BaseCustomer) context
                    .getBean("baseCustomer");
            System.out.println("Response: " + baseCust.processRequest("General maintenance!"));
        } finally {
            context.close();
        }
    }

}
Output:
Post high priority complaint 'Machine is not working!'
Response: Complaint logged 'GenericMessage [payload=Machine is not working!, headers={replyChannel=org.springframework.messaging.core.GenericM[email protected], [email protected][email protected], id=54fc623d-b995-9d11-9771-cfed6d590369, customerPriority=high, timestamp=1435582630755}]'
Post normal priority complaint 'Machine is not working!'
Response: Complaint queued 'GenericMessage [payload=General maintenance!, headers={replyChannel=org.springframework.messaging.core.GenericM[email protected], [email protected][email protected], id=cab82e73-2132-4df4-4e33-743327afb0d2, timestamp=1435582630755}]'

Using Bridge to connect Messaging Systems

In this example, we will make use bridge to deliver messages from one message system to another. For example, consider this scenario, after the order s processed, we want the order confirmation email to be handled by a separate system so that the actual order processing and related components is unaffected by the process sending emails.

First we need to start the email messaging system’s JMS broker.

BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
broker.setTransportConnectorURIs(new String[] { "tcp://localhost:61623" });
broker.setDeleteAllMessagesOnStartup(true);
broker.start();

Th JMS inbound adapter int-jms:inbound-channel-adapter will receive the message the inbound queue and post the message to the channel orderEmailChannel.

<int-jms:inbound-channel-adapter channel="orderEmailChannel" connection-factory="connectionFactory" 
		destination="orderEmailQueue" auto-startup="true"> 
	<int:poller fixed-delay="500"/> 
</int-jms:inbound-channel-adapter>

The order message will be picked up by the service activator to send the order confirmation by email.

<int:service-activator id="orderEmail" ref="emailOrder" input-channel="orderEmailChannel">
		<int:poller fixed-delay="500" />
	</int:service-activator>

	<bean id="emailOrder" class="com.javarticles.spring.integration.OrderEmailSender" />

We have simplified the class which sends the email.

OrderEmailSender:

package com.javarticles.spring.integration;

public class OrderEmailSender {
    public void sendEmail(String order) {
        System.out.println("Send email for " + order);
    }
}

orderEmailApplicationContext.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
			http://www.springframework.org/schema/beans/spring-beans.xsd
			http://www.springframework.org/schema/integration
			http://www.springframework.org/schema/integration/spring-integration.xsd
			http://www.springframework.org/schema/context 
			http://www.springframework.org/schema/context/spring-context.xsd
			http://www.springframework.org/schema/integration/jms 
			http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">

	<int-jms:inbound-channel-adapter channel="orderEmailChannel" connection-factory="connectionFactory" 
		destination="orderEmailQueue" auto-startup="true"> 
		<int:poller fixed-delay="500"/> 
	</int-jms:inbound-channel-adapter>

	<int:channel id="orderEmailChannel">
		<int:queue />
	</int:channel>

	<bean id="orderEmailQueue" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="orderEmailQueue" />
	</bean>

	<bean id="connectionFactory"
		class="org.springframework.jms.connection.CachingConnectionFactory">
		<property name="targetConnectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL" value="tcp://localhost:61623" />
			</bean>
		</property>
		<property name="sessionCacheSize" value="10" />
		<property name="cacheProducers" value="false" />
	</bean>

	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory" />
	</bean>

	<int:service-activator id="orderEmail" ref="emailOrder" input-channel="orderEmailChannel">
		<int:poller fixed-delay="500" />
	</int:service-activator>

	<bean id="emailOrder" class="com.javarticles.spring.integration.OrderEmailSender" />

</beans>

Now let’s start the email messaging system.

OrderEmailSystem:

package com.javarticles.spring.integration;

import org.apache.activemq.broker.BrokerService;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class OrderEmailSystem {
    public static final void main(String[] args) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.setTransportConnectorURIs(new String[] { "tcp://localhost:61623" });
        broker.setDeleteAllMessagesOnStartup(true);
        broker.start();

        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "orderEmailApplicationContext.xml");
        context.start();
    }
}

We have seen the configuration of the messaging system that is going to send email. Now we will see the configuration of the messaging system that is going to deliver the processed order message to the email system.

We have three channels orderChannel, orderProcessedChannel and orderEmailChannel. We will send the order message to orderChannel for order processing.

<int:channel id="orderChannel">
    <int:queue />
</int:channel>

<int:channel id="orderEmailChannel"/>
Once the order is processed, the message will be sent to orderProcessedChannel.

<int:service-activator id="processOrder"
		ref="orderProcessor" method="process" input-channel="orderChannel" 
		output-channel="orderProcessedChannel">
		<int:poller fixed-delay="500"/>
</int:service-activator>		

Next, we will configure a bridge endpoint to move the message from orderProcessedChannel to orderEmailChannel so that the message is sent to the email messaging system.

<int:bridge id="bridgeProcessedOrder2Email" input-channel="orderProcessedChannel" output-channel="orderEmailChannel" />		

An outbound JMS adapter is configured to pickup the message from orderEmailChannel and deliver to a JMS destination.

       <int-jms:outbound-channel-adapter
		channel="orderEmailChannel" connection-factory="connectionFactory"
		destination-name="orderEmailQueue"/>


	<bean id="connectionFactory"
		class="org.springframework.jms.connection.CachingConnectionFactory">
		<property name="targetConnectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL" value="tcp://localhost:61623" />
			</bean>
		</property>
		<property name="sessionCacheSize" value="10" />
		<property name="cacheProducers" value="false" />
	</bean>

	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory" />
	</bean>

Here is the entire application context XML.

orderProcessingApplicationContext.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
			http://www.springframework.org/schema/beans/spring-beans.xsd
			http://www.springframework.org/schema/integration
			http://www.springframework.org/schema/integration/spring-integration.xsd
			http://www.springframework.org/schema/context 
			http://www.springframework.org/schema/context/spring-context.xsd
			http://www.springframework.org/schema/integration/jms 
			http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">
	
	<int:channel id="orderChannel">
		<int:queue />
	</int:channel>

    <int:channel id="orderEmailChannel"/>

	<int:service-activator id="processOrder"
		ref="orderProcessor" method="process" input-channel="orderChannel" 
		output-channel="orderProcessedChannel">
		<int:poller fixed-delay="500"/>
    </int:service-activator>		

	<bean id="orderProcessor" class="com.javarticles.spring.integration.OrderProcessor" />
	
	<int:bridge id="bridgeProcessedOrder2Email" input-channel="orderProcessedChannel" output-channel="orderEmailChannel" />		

	<int-jms:outbound-channel-adapter
		channel="orderEmailChannel" connection-factory="connectionFactory"
		destination-name="orderEmailQueue"/>


	<bean id="connectionFactory"
		class="org.springframework.jms.connection.CachingConnectionFactory">
		<property name="targetConnectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL" value="tcp://localhost:61623" />
			</bean>
		</property>
		<property name="sessionCacheSize" value="10" />
		<property name="cacheProducers" value="false" />
	</bean>

	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory" />
	</bean>
</beans>

Now let’s do some testing.

We will first run the email messaging system OrderEmailSystem.

Next, we will initiate the order processing.

SpringIntegrationBridgeMessagingSystemsExample:

package com.javarticles.spring.integration;

import java.io.IOException;

import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;

public class SpringIntegrationBridgeMessagingSystemsExample {

    public static void main(String[] args) throws InterruptedException,
            IOException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "orderProcessingApplicationContext.xml");
        try {
            context.start();
            PollableChannel orderChannel = (PollableChannel) context
                    .getBean("orderChannel");
            Message<String> orderMsg = MessageBuilder.withPayload(
                    "Order 3 Laptops").build();
            orderChannel.send(orderMsg);
            Thread.sleep(1000);
        } finally {
            context.close();
        }
    }

}

Output:

Output from Order Processing System:
Processed order Order 3 Laptops

Output From Email Messaging System:
Send email for Processed order Order 3 Laptops
Order Processing Messaging System

Order Processing Messaging System

Messaging System for mailing the Order

Messaging System for mailing the Order

Download the source code

This was an example about spring integration bridge endpoint.

You can download the source code here: springintegrationMessageBridge.zip

About Author

Ram's expertise lies in test driven development and re-factoring. He is passionate about open source technologies and loves blogging on various java and open-source technologies like spring. You can reach him at [email protected]

Comments are closed.