A Spring Integration bridge is an endpoint that passes a message from the input channel to the output channel without modifying it. Both the input channel and the output channel can be either a pollable channel or a subscribable channel. If no output channel is configured, the bridge sends the message to the reply channel named in the message's replyChannel header, if one is present.
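To make the idea concrete, here is a minimal plain-Java sketch of what a bridge does. It does not use Spring Integration at all: QueueChannelSketch, SubscribableChannelSketch and BridgeSketch are hypothetical stand-ins invented for illustration, and the subscribable channel simply hands each message to every subscriber, which is a simplification.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical pollable channel: messages wait until someone asks for them. */
class QueueChannelSketch {
    private final Queue<String> messages = new ArrayDeque<>();

    void send(String message) {
        messages.add(message);
    }

    /** Returns the next message, or null when the channel is empty. */
    String receive() {
        return messages.poll();
    }
}

/** Hypothetical subscribable channel: messages are pushed to the subscribers. */
class SubscribableChannelSketch {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void send(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

/** The bridge: whatever it receives on the input goes to the output unchanged. */
class BridgeSketch {

    /** Forwards every waiting message and returns how many were moved. */
    static int forwardAll(QueueChannelSketch input, SubscribableChannelSketch output) {
        int moved = 0;
        String message;
        while ((message = input.receive()) != null) {
            output.send(message);
            moved++;
        }
        return moved;
    }

    public static void main(String[] args) {
        QueueChannelSketch input = new QueueChannelSketch();
        SubscribableChannelSketch output = new SubscribableChannelSketch();
        output.subscribe(m -> System.out.println("Received " + m));
        input.send("first");
        input.send("second");
        forwardAll(input, output);
    }
}
```

The bridge itself contains no business logic; it only connects a channel that must be polled to one that pushes.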
We can also use a bridge to join two different messaging systems. When a message is delivered on a channel of interest in one messaging system, the bridge consumes the message and sends another with the same contents on the corresponding channel in the other messaging system.
In this article, we will see an example of each case. The first one uses a Spring Integration bridge to deliver a message from a pollable channel to a subscribable channel. The second one delivers a message to a second messaging system.
Before we start with the examples, let's first add the module dependencies to our pom.xml.
Dependencies
Add the following dependencies:
spring-core
spring-context
spring-integration-core
spring-integration-jms
activemq-core
spring-jms
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.javarticles.spring.integration.jms</groupId>
  <artifactId>springintegrationjms</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
      <version>4.1.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-jms</artifactId>
      <version>4.1.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-core</artifactId>
      <version>5.2.0</version>
    </dependency>
  </dependencies>
  <properties>
    <spring.version>4.1.4.RELEASE</spring.version>
  </properties>
</project>
Bridge Channels
In this example, we will bridge a pollable channel with a subscribable channel. Our example is about two categories of customers. Customers who pay for annual maintenance are given higher priority than customers who opt for on-demand service.
If the customer is privileged, the posted complaint is sent through the highPriorityCustomerChannel channel, where the message is marked as a ‘High Priority’ message by setting the header customerPriority to ‘high’.
The routing is initiated through a call to a gateway.
We have a separate gateway for each category of customer.
For privileged customers:
<int:gateway id="privilegedCustomer" service-interface="com.javarticles.spring.integration.PrivilegedCustomer" default-request-channel="privilegedCustomerChannel" />
For the on-demand service customers:
<int:gateway id="baseCustomer" service-interface="com.javarticles.spring.integration.BaseCustomer" default-request-channel="basicCustomerChannel" />
Here is the gateway interface for privileged customers.
PrivilegedCustomer:
package com.javarticles.spring.integration;

public interface PrivilegedCustomer {
    String processRequest(String complaint);
}
And here is the one for regular customers.
BaseCustomer:
package com.javarticles.spring.integration;

public interface BaseCustomer {
    String processRequest(String complaint);
}
We post the complaint using the processRequest() method. The posted complaint is sent to the channel privilegedCustomerChannel for privileged customers and to basicCustomerChannel for non-privileged customers.
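Spring Integration supplies the implementation of a gateway interface as a proxy, so the caller never touches a channel directly. The following plain-Java sketch imitates that idea with java.lang.reflect.Proxy. It is not how the framework is implemented; CustomerGatewaySketch and GatewayProxySketch are hypothetical names, and a simple Function stands in for the request channel plus the reply that comes back.

```java
import java.lang.reflect.Proxy;
import java.util.function.Function;

/** Hypothetical service interface, shaped like PrivilegedCustomer and BaseCustomer. */
interface CustomerGatewaySketch {
    String processRequest(String complaint);
}

class GatewayProxySketch {

    /**
     * Builds an implementation of the interface whose single-argument methods
     * pass the argument to requestChannel and return whatever comes back.
     */
    static <T> T create(Class<T> serviceInterface, Function<String, String> requestChannel) {
        Object proxy = Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[] { serviceInterface },
                (self, method, args) -> {
                    // Only one-argument interface methods are supported in this sketch.
                    if (method.getDeclaringClass() == Object.class
                            || args == null || args.length != 1) {
                        throw new UnsupportedOperationException(method.getName());
                    }
                    // The method argument plays the role of the message payload.
                    return requestChannel.apply(String.valueOf(args[0]));
                });
        return serviceInterface.cast(proxy);
    }

    public static void main(String[] args) {
        CustomerGatewaySketch gateway = create(CustomerGatewaySketch.class,
                payload -> "Complaint logged '" + payload + "'");
        System.out.println(gateway.processRequest("Machine is not working!"));
    }
}
```

The calling code depends only on the interface, which is why the two customer categories can be routed to different channels without any change to the caller.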
The privileged customer’s complaint goes through an additional channel because we want its priority to be marked as ‘high’. We simply pass the message from channel privilegedCustomerChannel to channel highPriorityCustomerChannel using the message bridge, which is declared using the <bridge> element in the integration namespace.
<int:bridge id="bridgeHighPriority"
            input-channel="privilegedCustomerChannel"
            output-channel="highPriorityCustomerChannel">
  <int:poller fixed-delay="500" />
</int:bridge>
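The bridge needs a poller because its input channel is backed by a queue, and a queue does not push messages to its consumers. As a rough plain-Java illustration of a fixed-delay poller, the sketch below checks a queue on a schedule and forwards what it finds. PollingBridgeSketch is a hypothetical class, and draining everything that is waiting on each poll is an assumption of this sketch rather than a statement about the framework's defaults.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical polling bridge: checks the input queue after each fixed delay. */
class PollingBridgeSketch implements AutoCloseable {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    PollingBridgeSketch(BlockingQueue<String> input, Consumer<String> output,
            long fixedDelayMillis) {
        // The delay is measured from the end of one poll to the start of the next.
        scheduler.scheduleWithFixedDelay(() -> {
            String message;
            while ((message = input.poll()) != null) {
                output.accept(message); // forwarded unchanged
            }
        }, 0, fixedDelayMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> input = new LinkedBlockingQueue<>();
        try (PollingBridgeSketch bridge = new PollingBridgeSketch(
                input, m -> System.out.println("Forwarded " + m), 500)) {
            input.add("Machine is not working!");
            Thread.sleep(1000); // leave time for at least one poll
        }
    }
}
```

A shorter delay lowers latency at the cost of more frequent wake-ups; the 500 ms used in the article is a reasonable middle ground for a demo.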
Marking of the header is done using the header enricher.
<int:header-enricher id="markAsHighPriority"
                     input-channel="highPriorityCustomerChannel"
                     output-channel="processComplaintChannel">
  <int:header name="customerPriority" value="high" />
</int:header-enricher>
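Conceptually, enriching a header means producing a new message that carries the same payload plus one extra header. The plain-Java sketch below illustrates only that idea; MessageSketch and HeaderEnricherSketch are hypothetical classes and are not part of Spring Integration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical immutable message: a payload plus read-only headers. */
final class MessageSketch {
    final String payload;
    final Map<String, Object> headers;

    MessageSketch(String payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
    }
}

class HeaderEnricherSketch {

    /** Returns a new message with the extra header; the input is left untouched. */
    static MessageSketch enrich(MessageSketch message, String name, Object value) {
        Map<String, Object> headers = new HashMap<>(message.headers);
        headers.put(name, value);
        return new MessageSketch(message.payload, headers);
    }

    public static void main(String[] args) {
        MessageSketch original =
                new MessageSketch("Machine is not working!", new HashMap<>());
        MessageSketch marked = enrich(original, "customerPriority", "high");
        System.out.println(original.headers + " -> " + marked.headers);
    }
}
```

Because the payload is untouched, any component downstream that ignores the header behaves the same for both customer categories.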
Complaints from both privileged and non-privileged customers reach the channel processComplaintChannel for the actual processing.
A service activator, processComplaint, is hooked onto this channel. As soon as a message arrives there via a bridge or the header enricher, the customerComplaintProcessor bean is invoked. The privileged customer’s complaint is given immediate attention whereas the non-privileged customer’s complaint is queued.
<int:service-activator id="processComplaint"
                       ref="customerComplaintProcessor"
                       method="process"
                       input-channel="processComplaintChannel" />
<bean id="customerComplaintProcessor"
      class="com.javarticles.spring.integration.ComplaintProcessor" />
ComplaintProcessor:
package com.javarticles.spring.integration;

import org.springframework.messaging.Message;

public class ComplaintProcessor {

    // The reply depends on the customerPriority header set by the header enricher.
    public String process(Message<?> complaint) {
        String priority = complaint.getHeaders().get("customerPriority", String.class);
        if ("high".equals(priority)) {
            return "Complaint logged '" + complaint + "'";
        } else {
            return "Complaint queued '" + complaint + "'";
        }
    }
}
You can think of this as a complaint being assigned to dedicated engineers for immediate action versus a complaint being scheduled for action on a specific date.
Here is the entire application context XML.
complaintApplicationContext.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">
  <int:gateway id="privilegedCustomer"
               service-interface="com.javarticles.spring.integration.PrivilegedCustomer"
               default-request-channel="privilegedCustomerChannel" />
  <int:gateway id="baseCustomer"
               service-interface="com.javarticles.spring.integration.BaseCustomer"
               default-request-channel="basicCustomerChannel" />
  <int:channel id="privilegedCustomerChannel">
    <int:queue />
  </int:channel>
  <int:channel id="highPriorityCustomerChannel" />
  <int:channel id="basicCustomerChannel">
    <int:queue />
  </int:channel>
  <int:channel id="processComplaintChannel" />
  <int:bridge id="bridgeHighPriority"
              input-channel="privilegedCustomerChannel"
              output-channel="highPriorityCustomerChannel">
    <int:poller fixed-delay="500" />
  </int:bridge>
  <int:bridge id="bridgeBasicCustomer"
              input-channel="basicCustomerChannel"
              output-channel="processComplaintChannel">
    <int:poller fixed-delay="500" />
  </int:bridge>
  <int:header-enricher id="markAsHighPriority"
                       input-channel="highPriorityCustomerChannel"
                       output-channel="processComplaintChannel">
    <int:header name="customerPriority" value="high" />
  </int:header-enricher>
  <int:service-activator id="processComplaint"
                         ref="customerComplaintProcessor"
                         method="process"
                         input-channel="processComplaintChannel" />
  <bean id="customerComplaintProcessor"
        class="com.javarticles.spring.integration.ComplaintProcessor" />
</beans>
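Here is the flow:
privilegedCustomer gateway -> privilegedCustomerChannel (queue) -> bridgeHighPriority -> highPriorityCustomerChannel -> markAsHighPriority (header enricher) -> processComplaintChannel -> processComplaint
baseCustomer gateway -> basicCustomerChannel (queue) -> bridgeBasicCustomer -> processComplaintChannel -> processComplaint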
Now let’s post some complaints.
SpringIntegrationBridgeExample:
package com.javarticles.spring.integration;

import java.io.IOException;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringIntegrationBridgeExample {

    public static void main(String[] args) throws InterruptedException, IOException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "complaintApplicationContext.xml");
        try {
            PrivilegedCustomer privilegedCust = (PrivilegedCustomer) context
                    .getBean("privilegedCustomer");
            String complaint = "Machine is not working!";
            System.out.println("Post high priority complaint '" + complaint + "'");
            System.out.println("Response: " + privilegedCust.processRequest(complaint));
            System.out.println("Post normal priority complaint '" + complaint + "'");
            BaseCustomer baseCust = (BaseCustomer) context.getBean("baseCustomer");
            System.out.println("Response: " + baseCust.processRequest("General maintenance!"));
        } finally {
            context.close();
        }
    }
}
Output:
Post high priority complaint 'Machine is not working!'
Response: Complaint logged 'GenericMessage [payload=Machine is not working!, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@11ca1dcf, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@11ca1dcf, id=54fc623d-b995-9d11-9771-cfed6d590369, customerPriority=high, timestamp=1435582630755}]'
Post normal priority complaint 'Machine is not working!'
Response: Complaint queued 'GenericMessage [payload=General maintenance!, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@24a75ab2, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@24a75ab2, id=cab82e73-2132-4df4-4e33-743327afb0d2, timestamp=1435582630755}]'
Using Bridge to connect Messaging Systems
In this example, we will use a bridge to deliver messages from one messaging system to another. Consider this scenario: after an order is processed, we want the order confirmation email to be handled by a separate system, so that order processing and its related components are unaffected by the process that sends emails.
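The benefit of this separation can be sketched in plain Java with an in-memory queue standing in for the JMS queue between the two systems. Everything here is hypothetical and only illustrates the decoupling: OrderSideSketch, EmailSideSketch and DecouplingDemo are invented names, and a real JMS broker adds networking and delivery guarantees that this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical order side: it knows only the shared queue, not the email code. */
class OrderSideSketch {
    private final BlockingQueue<String> emailQueue;

    OrderSideSketch(BlockingQueue<String> emailQueue) {
        this.emailQueue = emailQueue;
    }

    /** Hands the finished order to the queue without waiting for any email. */
    boolean complete(String order) {
        return emailQueue.offer(order);
    }
}

/** Hypothetical email side: consumes whatever has accumulated in the queue. */
class EmailSideSketch {
    private final BlockingQueue<String> emailQueue;

    EmailSideSketch(BlockingQueue<String> emailQueue) {
        this.emailQueue = emailQueue;
    }

    List<String> sendPending() {
        List<String> pending = new ArrayList<>();
        emailQueue.drainTo(pending);
        List<String> sent = new ArrayList<>();
        for (String order : pending) {
            sent.add("Email sent for " + order);
        }
        return sent;
    }
}

class DecouplingDemo {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        OrderSideSketch orders = new OrderSideSketch(queue);
        orders.complete("order-1");
        orders.complete("order-2");
        // The email side starts later and still finds both orders waiting.
        System.out.println(new EmailSideSketch(queue).sendPending());
    }
}
```

The order side returns as soon as the hand-off succeeds, so a slow or temporarily stopped email side does not hold up order processing.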
First we need to start the email messaging system’s JMS broker.
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
broker.setTransportConnectorURIs(new String[] { "tcp://localhost:61623" });
broker.setDeleteAllMessagesOnStartup(true);
broker.start();
The JMS inbound adapter int-jms:inbound-channel-adapter receives the message from the inbound queue and posts it to the channel orderEmailChannel.
<int-jms:inbound-channel-adapter channel="orderEmailChannel"
                                 connection-factory="connectionFactory"
                                 destination="orderEmailQueue"
                                 auto-startup="true">
  <int:poller fixed-delay="500" />
</int-jms:inbound-channel-adapter>
The order message will be picked up by the service activator to send the order confirmation by email.
<int:service-activator id="orderEmail" ref="emailOrder" input-channel="orderEmailChannel">
  <int:poller fixed-delay="500" />
</int:service-activator>
<bean id="emailOrder" class="com.javarticles.spring.integration.OrderEmailSender" />
We have simplified the class which sends the email.
OrderEmailSender:
package com.javarticles.spring.integration;

public class OrderEmailSender {

    public void sendEmail(String order) {
        System.out.println("Send email for " + order);
    }
}
orderEmailApplicationContext.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">
  <int-jms:inbound-channel-adapter channel="orderEmailChannel"
                                   connection-factory="connectionFactory"
                                   destination="orderEmailQueue"
                                   auto-startup="true">
    <int:poller fixed-delay="500" />
  </int-jms:inbound-channel-adapter>
  <int:channel id="orderEmailChannel">
    <int:queue />
  </int:channel>
  <bean id="orderEmailQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="orderEmailQueue" />
  </bean>
  <bean id="connectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61623" />
      </bean>
    </property>
    <property name="sessionCacheSize" value="10" />
    <property name="cacheProducers" value="false" />
  </bean>
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
  </bean>
  <int:service-activator id="orderEmail" ref="emailOrder" input-channel="orderEmailChannel">
    <int:poller fixed-delay="500" />
  </int:service-activator>
  <bean id="emailOrder" class="com.javarticles.spring.integration.OrderEmailSender" />
</beans>
Now let’s start the email messaging system.
OrderEmailSystem:
package com.javarticles.spring.integration;

import org.apache.activemq.broker.BrokerService;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class OrderEmailSystem {

    public static final void main(String[] args) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.setTransportConnectorURIs(new String[] { "tcp://localhost:61623" });
        broker.setDeleteAllMessagesOnStartup(true);
        broker.start();
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "orderEmailApplicationContext.xml");
        context.start();
    }
}
We have seen the configuration of the messaging system that is going to send email. Now we will see the configuration of the messaging system that is going to deliver the processed order message to the email system.
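We have three channels: orderChannel, orderProcessedChannel and orderEmailChannel. We will send the order message to orderChannel for order processing. Note that orderProcessedChannel is not declared explicitly; Spring Integration creates it as a direct channel when an endpoint refers to it by name.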
<int:channel id="orderChannel">
  <int:queue />
</int:channel>
<int:channel id="orderEmailChannel" />
Once the order is processed, the message will be sent to orderProcessedChannel.
<int:service-activator id="processOrder"
                       ref="orderProcessor"
                       method="process"
                       input-channel="orderChannel"
                       output-channel="orderProcessedChannel">
  <int:poller fixed-delay="500" />
</int:service-activator>
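The orderProcessor bean referenced by this service activator is the class com.javarticles.spring.integration.OrderProcessor, which the article does not list. A minimal sketch of what it might look like follows, assuming it prints a line and returns the order text with a "Processed order " prefix, which would be consistent with the output shown at the end of this example. The body is a guess, not the original source, and the class is declared here without a package or public modifier only so that it compiles on its own.

```java
/**
 * Hypothetical OrderProcessor; the article does not list this class.
 * In the project it would be a public class in com.javarticles.spring.integration.
 */
class OrderProcessor {

    /** Returns the text that the service activator sends on as the reply payload. */
    public String process(String order) {
        String processed = "Processed order " + order;
        System.out.println(processed);
        return processed;
    }
}
```

Because the method returns a value, the service activator has something to send to its output channel; a void method would end the flow at this point.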
Next, we will configure a bridge endpoint to move the message from orderProcessedChannel to orderEmailChannel so that the message is sent to the email messaging system.
<int:bridge id="bridgeProcessedOrder2Email"
            input-channel="orderProcessedChannel"
            output-channel="orderEmailChannel" />
An outbound JMS adapter is configured to pick up the message from orderEmailChannel and deliver it to a JMS destination.
<int-jms:outbound-channel-adapter channel="orderEmailChannel"
                                  connection-factory="connectionFactory"
                                  destination-name="orderEmailQueue" />
<bean id="connectionFactory"
      class="org.springframework.jms.connection.CachingConnectionFactory">
  <property name="targetConnectionFactory">
    <bean class="org.apache.activemq.ActiveMQConnectionFactory">
      <property name="brokerURL" value="tcp://localhost:61623" />
    </bean>
  </property>
  <property name="sessionCacheSize" value="10" />
  <property name="cacheProducers" value="false" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory" ref="connectionFactory" />
</bean>
Here is the entire application context XML.
orderProcessingApplicationContext.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">
  <int:channel id="orderChannel">
    <int:queue />
  </int:channel>
  <int:channel id="orderEmailChannel" />
  <int:service-activator id="processOrder"
                         ref="orderProcessor"
                         method="process"
                         input-channel="orderChannel"
                         output-channel="orderProcessedChannel">
    <int:poller fixed-delay="500" />
  </int:service-activator>
  <bean id="orderProcessor" class="com.javarticles.spring.integration.OrderProcessor" />
  <int:bridge id="bridgeProcessedOrder2Email"
              input-channel="orderProcessedChannel"
              output-channel="orderEmailChannel" />
  <int-jms:outbound-channel-adapter channel="orderEmailChannel"
                                    connection-factory="connectionFactory"
                                    destination-name="orderEmailQueue" />
  <bean id="connectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61623" />
      </bean>
    </property>
    <property name="sessionCacheSize" value="10" />
    <property name="cacheProducers" value="false" />
  </bean>
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
  </bean>
</beans>
Now let’s do some testing.
We will first run the email messaging system, OrderEmailSystem.
Next, we will initiate the order processing.
SpringIntegrationBridgeMessagingSystemsExample:
package com.javarticles.spring.integration;

import java.io.IOException;

import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;

public class SpringIntegrationBridgeMessagingSystemsExample {

    public static void main(String[] args) throws InterruptedException, IOException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "orderProcessingApplicationContext.xml");
        try {
            context.start();
            PollableChannel orderChannel = (PollableChannel) context
                    .getBean("orderChannel");
            Message<String> orderMsg = MessageBuilder.withPayload(
                    "Order 3 Laptops").build();
            orderChannel.send(orderMsg);
            Thread.sleep(1000);
        } finally {
            context.close();
        }
    }
}
Output:
Output from Order Processing System:
Processed order Order 3 Laptops
Output From Email Messaging System:
Send email for Processed order Order 3 Laptops
This was an example of the Spring Integration bridge endpoint.