Spring Integration Feed Adapter Example

0

Web Syndication refers to the websites providing information and the websites displaying it.

Web feeds available from a site provide help other people with a summary or update of the website’s recently added content.

Spring Integration provides support for Web Syndication by supplying a feed adapter that enables subscribing to an RSS or ATOM feed.

In this example, we will listen to an online feed URL and get the latest post details. Let’s first add the feed adapter dependency.

Dependency

Add the following dependencies:

  1. spring-core
  2. spring-context
  3. spring-integration-core
  4. spring-integration-feed - This is necessary to access the feed adapter

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.javarticles.spring.integration.jms</groupId>
	<artifactId>springintegrationjms</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<dependencies>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-core</artifactId>
			<version>4.1.2.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-feed</artifactId>
			<version>4.1.2.RELEASE</version>
			<scope>compile</scope>
		</dependency>
	</dependencies>

	<properties>
		<spring.version>4.1.4.RELEASE</spring.version>
	</properties>

</project>

Feed Adapter Example

Few points to note about:

  1. feed:inbound-channel-adapter is your feed adapter
  2. Specify the feed url in url attribute
  3. Feed polled will be sent to the channel specified in channel

The feed adapter will listen to the RSS feed at http://feeds.feedburner.com/javarticles/ieht and will send a message to the channel articlesChannel.

pollableChannelFeedContext.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-feed="http://www.springframework.org/schema/integration/feed"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
			http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
			http://www.springframework.org/schema/integration/feed http://www.springframework.org/schema/integration/feed/spring-integration-feed.xsd">
	<int-feed:inbound-channel-adapter id="feedAdapter"
		channel="articleChannel" auto-startup="true"
		url="http://feeds.feedburner.com/javarticles/ieht">
		<int:poller fixed-rate="10000" max-messages-per-poll="100" />
	</int-feed:inbound-channel-adapter>

	<int:channel id="articleChannel">
		<int:priority-queue/>
	</int:channel>
</beans>

The payload will be of type com.sun.syndication.feed.syn.SyndEntry, which encapsulates information about the news item including content, published date, and title. A poller element is required for
the feed adapter, because it is a poller consumer.

SpringIntegrationPollableChannelFeedExample:

package com.javarticles.spring.integration.feed;

import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;

import com.rometools.rome.feed.synd.SyndEntry;

public class SpringIntegrationPollableChannelFeedExample {

    public static void main(String[] args) throws InterruptedException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "pollableChannelFeedContext.xml");
        try {
           PollableChannel feedChannel = context.getBean("articleChannel", PollableChannel.class);
            for (int i = 0; i < 10; i++) {
                Message message = (Message) feedChannel.receive(10000);
                if (message != null){
                    SyndEntry entry = message.getPayload();
                    System.out.println(entry.getPublishedDate() + " - " + entry.getTitle());
                }
                else {
                    break;
                }
            }            
        } finally {
            context.close();
        }
    }
    
}

Output:

Tue Apr 28 09:26:44 IST 2015 - Example of registering MBean
Wed Apr 29 10:08:19 IST 2015 - Guava ImmutableList Examples
Thu Apr 30 22:27:28 IST 2015 - Guava Predicate Examples
Fri May 01 14:23:22 IST 2015 - Spring Integration ‘Hello World’ Example
Sat May 02 10:09:37 IST 2015 - Spring Integration Managing Lifecycle using ControlBus
Sat May 02 16:22:46 IST 2015 - Spring Integration Gateway Example
Sun May 03 11:42:23 IST 2015 - Spring Integration Header Enricher Example
Sun May 03 20:20:08 IST 2015 - Spring Integration Content Enricher Example
Tue May 05 08:23:14 IST 2015 - Spring Integration Feed Adapter Example
Tue May 05 10:20:21 IST 2015 - Spring Integration File Adapter Example

In our configuration auto-start is set to true so the feed adapter will start automatically as soon the context is loaded. When the feed adapter starts, it will do its first polling, a com.sun.syndication.feed.synd.SyndEntryFeed instance will be created. It will contain multiple SyndEntry objects and each entry is stored in a local entry queue. The queue will be refreshed if it becomes empty and there are additional new entries available.

How to skip duplicate entries?

Spring Integration also provides a mechanism to prevent duplicate entries. Each feed entry has an associated published date field. Each time the feed adapter creates a new message object, the feed
adapter stores the latest published date in an instance of org.springframework.integration.core.store.MetadataStore. The feed adapter will look at this data store before creating and sending a message to insure that no duplicate entries are sent.

skipDuplicateMessagesContext.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-feed="http://www.springframework.org/schema/integration/feed"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
			http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
			http://www.springframework.org/schema/integration/feed http://www.springframework.org/schema/integration/feed/spring-integration-feed.xsd">
	<int-feed:inbound-channel-adapter id="feedAdapter"
		channel="articleChannel" auto-startup="true"
		url="http://feeds.feedburner.com/javarticles/ieht">
		<int:poller fixed-rate="10000" max-messages-per-poll="100" />
	</int-feed:inbound-channel-adapter>

	<bean id="metadataStore"
		class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore">
		<property name="baseDirectory" value="./" />
	</bean>

	<int:channel id="articleChannel">
		<int:queue />
	</int:channel>
</beans>

You need to point bean id metadataStore to class PropertiesPersistingMetadataStore. With this change properties will be persisted.

SpringIntegrationSkipDuplicatelFeedExample:

package com.javarticles.spring.integration.feed;

import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;

import com.rometools.rome.feed.synd.SyndEntry;

public class SpringIntegrationSkipDuplicatelFeedExample {

    public static void main(String[] args) throws InterruptedException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "skipDuplicateMessagesFeedContext.xml");
        try {
           PollableChannel feedChannel = context.getBean("articleChannel", PollableChannel.class);
            for (int i = 0; i < 10; i++) {
                Message message = (Message) feedChannel.receive(10000);
                if (message != null){
                    SyndEntry entry = message.getPayload();
                    System.out.println(entry.getPublishedDate() + " - " + entry.getTitle());
                }
                else {
                    break;
                }
            }            
        } finally {
            context.close();
        }
    }
    
}

Output:

Tue Apr 28 09:26:44 IST 2015 - Example of registering MBean
Wed Apr 29 10:08:19 IST 2015 - Guava ImmutableList Examples
Thu Apr 30 22:27:28 IST 2015 - Guava Predicate Examples
Fri May 01 14:23:22 IST 2015 - Spring Integration ‘Hello World’ Example
Sat May 02 10:09:37 IST 2015 - Spring Integration Managing Lifecycle using ControlBus
Sat May 02 16:22:46 IST 2015 - Spring Integration Gateway Example
Sun May 03 11:42:23 IST 2015 - Spring Integration Header Enricher Example
Sun May 03 20:20:08 IST 2015 - Spring Integration Content Enricher Example
Tue May 05 08:23:14 IST 2015 - Spring Integration Feed Adapter Example
Tue May 05 10:20:21 IST 2015 - Spring Integration File Adapter Example

When you run this, a properties file will be generated.

metadata-source.properties:

#Last entry
#Tue May 05 00:31:36 IST 2015
feedAdapter.http\://feeds.feedburner.com/javarticles/ieht=1430664608000

When you run this second time, you won’t see any news feed if there are no new new articles published since its last entry.

Example of feed using DirectChannel

directChannelFeedContext.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-feed="http://www.springframework.org/schema/integration/feed"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
			http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
			http://www.springframework.org/schema/integration/feed http://www.springframework.org/schema/integration/feed/spring-integration-feed.xsd">
	<int-feed:inbound-channel-adapter id="feedAdapter"
		channel="articleChannel" auto-startup="true"
		url="http://feeds.feedburner.com/javarticles/ieht">
		<int:poller fixed-rate="10000" max-messages-per-poll="100" />
	</int-feed:inbound-channel-adapter>

	<int:channel id="articleChannel"/>
</beans>

Its very similar to the previous examples, except that we will have a message handler listening on the direct channel. The messages will be received in its callback.

SpringIntegrationDirectChannelFeedExample:

package com.javarticles.spring.integration.feed;

import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

import com.rometools.rome.feed.synd.SyndEntry;

public class SpringIntegrationDirectChannelFeedExample {

    public static void main(String[] args) throws InterruptedException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "directChannelFeedContext.xml");
        try {            
            DirectChannel channel = context.getBean("articleChannel", DirectChannel.class);
            channel.subscribe(new MessageHandler() {
                
                public void handleMessage(Message<?> message) throws MessagingException {
                    SyndEntry entry = (SyndEntry) message.getPayload();
                    System.out.println(entry.getPublishedDate() + " - " + entry.getTitle());
                }
            });
            Thread.sleep(5000);
        } finally {
            context.close();
        }
    }    
}

Output:

Tue Apr 28 09:26:44 IST 2015 - Example of registering MBean
Wed Apr 29 10:08:19 IST 2015 - Guava ImmutableList Examples
Thu Apr 30 22:27:28 IST 2015 - Guava Predicate Examples
Fri May 01 14:23:22 IST 2015 - Spring Integration ‘Hello World’ Example
Sat May 02 10:09:37 IST 2015 - Spring Integration Managing Lifecycle using ControlBus
Sat May 02 16:22:46 IST 2015 - Spring Integration Gateway Example
Sun May 03 11:42:23 IST 2015 - Spring Integration Header Enricher Example
Sun May 03 20:20:08 IST 2015 - Spring Integration Content Enricher Example
Tue May 05 08:23:14 IST 2015 - Spring Integration Feed Adapter Example
Tue May 05 10:20:21 IST 2015 - Spring Integration File Adapter Example

Download the source code

This was an example about spring integration feed adapter. You can download the source code here: springintegrationFeed.zip/a>

About Author

Ram's expertise lies in test driven development and re-factoring. He is passionate about open source technologies and loves blogging on various java and open-source technologies like spring. You can reach him at [email protected]

Comments are closed.