Apache Camel Filter Examples

0

There are times when we are interested in messages with certain criteria. If the criteria matches, we want the message to pass through else filtered.
To achieve this we use a special kind of Message Router, called Message Filter, to eliminate undesired messages from a channel based on a set of criteria.
In this article, we will see several examples of creating a Message Filter. We will filter based on the below criteria:

  1. POJO Method
  2. Based on a header property
  3. Based on a text in body
  4. XPath
  5. XQuery
  6. Using Camel’s Predicate
  7. Using Compound Predicate
Camel Filter

Camel Filter

Before we start with the example, let’s look into the setup details.

This example uses the following frameworks:

  1. Maven 3.2.3
  2. Apache Camel 2.15.1
  3. Spring 4.1.5.RELEASE
  4. Eclipse  as the IDE, version Luna 4.4.1.

Dependencies

We are just relying camel’s core components and the logger component so our pom.xml consists of:

  1. camel-core – camel core components like timer, bean etc
  2. slf4j-api – in case you want to use log
  3. slf4j-log4j12 – if you want to use log4j as the slf4j implementation
  4. camel-stream – for printing the messages to console
  5. spring-context – for spring support
  6. camel-spring – include it if you want to define route in spring
  7. camel-saxon – Saxon with the XPathBuilder
  8. camel-atom – used for polling Atom feeds.

Filtering using POJO

We want to filter the atom feed XML by camel based articles. Camel will poll the feed every 60 seconds by default.
feed.atom:

<?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">

	<title>Java Articles</title>
	<link href="http://www.javarticles.com/" />
	<updated>2015-05-27T15:22:12Z</updated>
	<author>
		<name>Ram Satish</name>
	</author>
	<id>urn:uuid:50a76c30-d335-21d9-b93C-0003311e0af6</id>

	<entry>
		<title>Spring Integration</title>
	</entry>

	<entry>
		<title>Spring Security</title>
	</entry>

	<entry>
		<title>Java 8</title>
	</entry>

	<entry>
		<title>Camel Multitask</title>
	</entry>

	<entry>
		<title>Camel Wiretap</title>
	</entry>

	<entry>
		<title>Camel Logger</title>
	</entry>

</feed>

The <atom: uri is atom:file:src/main/resources/feed.atom?splitEntries=true&consumer.delay=1000. We are using a static file contents as the feed source.
Let’s review the properties:

  1. splitEntries – splits the entries, each poll will select one entry in serial order.
  2. consumer.delay – there is a delay of 1000 ms between each poll

The Entry object represents one feed entry that camel has polled. This is sent to destination seda:feeds.
We have another routing starting from seda:feeds, the entry object is filtered based on title criteria. We only allow the titles that start with ‘Camel’.
FeedFilterExample:

package com.javarticles.camel;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class FeedFilterExample {
    public static void main(String[] args) throws Exception {
        CamelContext camelContext = new DefaultCamelContext();
        try {
            camelContext.addRoutes(new RouteBuilder() {
                public void configure() {
                    from("atom:file:src/main/resources/feed.atom?splitEntries=true&consumer.delay=1000").to("seda:feeds");
                    
                    from("seda:feeds")
                    .filter()
                    .method(new CamelArticles(), "filter")
                    .to("seda:filteredArticles");
                    
                    from("seda:filteredArticles").to("stream:out");
            }});
            camelContext.start();
            Thread.sleep(5000);
        } finally {
            camelContext.stop();
        }
    }
}

We will allow only if the title starts with ‘camel’.
CamelArticles:

package com.javarticles.camel;

import org.apache.abdera.model.Entry;
import org.apache.camel.Exchange;


public class CamelArticles {
    public boolean filter(Exchange exchange) {
        Entry entry = exchange.getIn().getBody(Entry.class);
        String title = entry.getTitle();
        boolean camelArticles = title.toLowerCase().startsWith("camel");
        if (camelArticles) {
            System.out.println("allow " + title);
        }
        return camelArticles;
    }
}

Output:

allow Camel LoggerCamel Logger
allow Camel WiretapCamel Wiretap
allow Camel MultitaskCamel Multitask
	

Filter Using Spring

The above filter can be achieved by defining the route in spring. If you want to filter the messages you need to use <filter> element.

applicationContext.xml:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
       ">
	<bean id="camelArticles" class="com.javarticles.camel.CamelArticles" />

	<camelContext xmlns="http://camel.apache.org/schema/spring">
		<route>
			<from uri="atom:file:src/main/resources/feed.atom?splitEntries=true&consumer.delay=1000" />
			<to uri="seda:feeds"/>
		</route>
		<route>
			<from uri="seda:feeds" />
			<filter>
				<method ref="camelArticles" method="filter" />
				<to uri="seda:filteredArticles" />
			</filter>
		</route>
		<route>
			<from uri="seda:filteredArticles"/>
			<to uri="stream:out"/>
		</route>
	</camelContext>
</beans>

The spring context is loaded. Based on spring context, we create camel context and then start it.
FeedFilterExampleUsingSpring:

package com.javarticles.camel;

import org.apache.camel.CamelContext;
import org.apache.camel.spring.SpringCamelContext;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class FeedFilterExampleUsingSpring {
    public static void main(String[] args) throws Exception {
        ApplicationContext appContext = new ClassPathXmlApplicationContext(
                "applicationContext.xml");
        CamelContext camelContext = SpringCamelContext.springCamelContext(
                appContext, false);
        try {
            camelContext.start();          
            Thread.sleep(4000);
        } finally {
            camelContext.stop();
        }
    }

}

Output:

allow Camel LoggerCamel Logger
allow Camel WiretapCamel Wiretap
allow Camel MultitaskCamel Multitask
	

Filter by header

In this example, we will filter the messages by header value. We want to allow only high priority message to pass through the route.
Below is the routing which filters messages based on header. PrioritySetter sets the header highPriority to true or false based on the payload object sent.
If header highPriority is set to true, we allow it else filter it out.

from("direct:customer")
.process(new PrioritySetter())
.filter(header("highPriority").isEqualTo(true))
.to("direct:highPriority");

FilterByHeaderExample:

package com.javarticles.camel;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.DefaultProducerTemplate;

public class FilterByHeaderExample {
    public static void main(String[] args) throws Exception {
        CamelContext camelContext = new DefaultCamelContext();
        try {
            camelContext.addRoutes(new RouteBuilder() {
                public void configure() {
                    from("direct:customer")
                    .process(new PrioritySetter())
                    .filter(header("highPriority")                           
                            .isEqualTo(true))
                        .to("direct:highPriority");
                    from("direct:highPriority").to("stream:out");
            }});
            camelContext.start();
            ProducerTemplate template = new DefaultProducerTemplate(camelContext);
            template.start();
            System.out.println("Post high priority message");
            template.sendBody("direct:customer", new PriorityPayload(true, "This is high priority message"));
            System.out.println("Post low priority message");
            template.sendBody("direct:customer", new PriorityPayload(false, "This is low priority message"));
            Thread.sleep(5000);
        } finally {
            camelContext.stop();
        }
    }
    
    private static class PrioritySetter implements Processor {
        public void process(Exchange exchange) throws Exception {
            PriorityPayload payload = (PriorityPayload) exchange.getIn().getBody();
            exchange.getIn().setHeader("highPriority", payload.isHighPriority);
            exchange.getIn().setBody(payload.payload);
        }
    }
    
    private static class PriorityPayload {
        private boolean isHighPriority;
        private String payload;
        public PriorityPayload(boolean isHigh, String payload) {
            isHighPriority = isHigh;
            this.payload = payload;
        }
    }
}

Output:

Post high priority message
This is high priority message
Post low priority message

Filter by Predicate

In this example, we will filter using Predicate object. Predicate is an interface with one method matches(Exchange exchange). If the message is of our interest we return true else false. We verify the body contents to decide whether to allow or not. If the body starts with ‘Camel’, we allow the message to pass through.

FilterByPredicateExample:

package com.javarticles.camel;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.DefaultProducerTemplate;

public class FilterByPredicateExample {
    public static void main(String[] args) throws Exception {
        CamelContext camelContext = new DefaultCamelContext();
        try {
            camelContext.addRoutes(new RouteBuilder() {
                public void configure() {
                    from("direct:start")
                    .filter(new Predicate() {

                        public boolean matches(Exchange exchange) {
                            final String body = exchange.getIn().getBody(String.class);
                            return ((body != null) && body.startsWith("Camel"));
                        }})
                      .to("stream:out");                       
                    }});
            camelContext.start();
            ProducerTemplate template = new DefaultProducerTemplate(camelContext);
            template.start();            
            template.sendBody("direct:start", "Camel Multicast");
            template.sendBody("direct:start", "Camel Components");
            template.sendBody("direct:start", "Spring Integration");
            Thread.sleep(5000);
        } finally {
            camelContext.stop();
        }
    }
    
}

Output:

Camel Multicast
Camel Components

Filter by Compound Predicate Example

In this example, we combine predicates. We create a compound predicate. The body should start with ‘Camel’ and it should contain text ‘Components’. Both the predicates should hold true. Agree :-), the example look a bit dummy but you know now how to use the compound predicates:-)

FilterByCompoundPredicateExample:

package com.javarticles.camel;

import static org.apache.camel.builder.PredicateBuilder.and;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.DefaultProducerTemplate;

public class FilterByCompoundPredicateExample {
    public static void main(String[] args) throws Exception {
        CamelContext camelContext = new DefaultCamelContext();
        try {
            camelContext.addRoutes(new RouteBuilder() {
                public void configure() {
                    from("direct:start")
                    .log("Allow if '${body}' is a camel component")
                    .filter(and(body().startsWith("Camel"), method(new CamelArticles(), "filterOnlyCamelComponents")))
                    .to("stream:out");
                }
            });
            camelContext.start();
            ProducerTemplate template = new DefaultProducerTemplate(
                    camelContext);
            template.start();
            template.sendBody("direct:start", "Camel Multicast");
            template.sendBody("direct:start", "Camel Component: Stream");
            template.sendBody("direct:start", "Spring Integration");
            Thread.sleep(5000);
        } finally {
            camelContext.stop();
        }
    }

}

CamelArticles:

package com.javarticles.camel;

import org.apache.abdera.model.Entry;
import org.apache.camel.Exchange;


public class CamelArticles {
    public boolean filter(Exchange exchange) {
        Entry entry = exchange.getIn().getBody(Entry.class);
        String title = entry.getTitle();
        boolean camelArticles = title.toLowerCase().startsWith("camel");
        if (camelArticles) {
            System.out.println("allow " + title);
        }
        return camelArticles;
    }
    
    public boolean filterOnlyCamelComponents(String body) {       
        boolean camelArticles = body.toLowerCase().contains("component");
        return camelArticles;
    }
}

Output:

09:41| INFO | MarkerIgnoringBase.java 95 | Allow if 'Camel Multicast' is a camel component
09:41| INFO | MarkerIgnoringBase.java 95 | Allow if 'Camel Component: Stream' is a camel component
Camel Component: Stream

Filter by XPath

In this example, we will use XPath to filter the camel titles.
We have two routings, one splits the XML based on camel category. The second routing filters further based on ‘Camel Component’ title.
FilterByXPathExample:

package com.javarticles.camel;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.builder.xml.XPathBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.DefaultProducerTemplate;

public class FilterByXPathExample {
    public static void main(String[] args) throws Exception {
        final String articlesXml = 
                "<blog><article><category>spring integration</category><title>SpringInt Splitter</title></article>" +
                "<article><category>java</category><title>Lambda</title></article>" +
                "<article><category>camel</category><title>Camel Multicast</title></article>" +
                "<article><category>camel</category><title>Camel Component: ActiveMQ </title></article>" +
                "<article><category>camel</category><title>Camel Component: Timer</title></article>" +
                "<article><category>camel</category><title>Camel Component: Logger</title></article>" +
                "<article><category>camel</category><title>Camel DSL</title></article></blog>";
        CamelContext camelContext = new DefaultCamelContext();
        try {
            camelContext.addRoutes(new RouteBuilder() {
                public void configure() {
                    XPathBuilder splitter = new XPathBuilder("//blog/article");
                    from("direct:xpath")
                         .split(splitter).filter().xquery("//article[category='camel']")
                         .to("direct:camel");
                    
                    from("direct:camel")
                    .filter().xpath("//article/title[contains(.,'Camel Component')]")
                    .to("stream:out");

            }});
            camelContext.start();
            ProducerTemplate template = new DefaultProducerTemplate(camelContext);
            template.start();
            template.sendBody("direct:xpath", articlesXml);
            Thread.sleep(1000);
        } finally {
            camelContext.stop();
        }
    }    
}

Output:

<article><category>camel</category><title>Camel Component: ActiveMQ </title></article><article><category>camel</category><title>Camel Component: Timer</title></article><article><category>camel</category><title>Camel Component: Logger</title></article>10:01| INFO | DefaultCamelContext.java 2660 | Apache Camel 2.15.1 (CamelContext: camel-1) is shutting down

Filter Stream by comparing body

In this example, we will create a binary file of titles. In the route, we filter based on the body contents. We allow one that starts with ‘camel’.
FilterStreamExample:

package com.javarticles.camel;

import java.io.File;
import java.io.FileOutputStream;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class FilterStreamExample {
    public static void main(String[] args) throws Exception {
        deleteDir("target/stream");
        createDir("target/stream");
        File file = new File("target/stream/articlesStream.txt");
        file.createNewFile();
        
        FileOutputStream fos = new FileOutputStream(file);
        fos.write("spring core\n".getBytes());
        fos.write("spring batch\n".getBytes());
        fos.write("camel components\n".getBytes());
        fos.write("spring integration\n".getBytes());
        fos.write("camel dsl\n".getBytes());
        fos.write("java 8\n".getBytes());
        fos.close();
        
        CamelContext camelContext = new DefaultCamelContext();
        try {
            camelContext.addRoutes(new RouteBuilder() {
                public void configure() {
                    from("stream:file?fileName=target/stream/articlesStream.txt&scanStream=true&scanStreamDelay=100")
                    .filter(body().startsWith("camel"))
                    .to("stream:out");
            }});
            camelContext.start();
            Thread.sleep(1000);
        } finally {
            camelContext.stop();
        }
    }    
    
    public static void createDir(String fileName) {
        File dir = new File(fileName);
        dir.mkdirs();
    }    
    
    private static void deleteDir(String fileName) {
        File file = new File(fileName);
        deleteDir(file);
    }
    
    private static void deleteDir(File file) {
        if (!file.exists()) {
            return;
        }
        
        if (file.isDirectory()) {
            File[] files = file.listFiles();
            for (File child : files) {
                deleteDir(child);
            }
        }
        file.delete();
    }
}

Output:

camel components
camel dsl

Download the source code

This was an example about Camel Filters. You can download the source code here: camelFilterExamples.zip

Share.

Comments are closed.