Apache Camel Splitter Example

In this article, we will see an example of how to use Apache Camel’s Splitter to split an order into its items and process each item individually. A Splitter breaks a composite message into a series of individual messages so that each one can be processed separately; the results can then be aggregated back into a single message.

Let’s look into the setup details and then we will start with the example.

This example uses the following frameworks:

  1. Maven 3.2.3
  2. Apache Camel 2.15.1
  3. Spring 4.1.5.RELEASE
  4. Eclipse as the IDE, version Luna 4.4.1.

Dependencies

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.javarticles.camel</groupId>
	<artifactId>camelHelloWorld</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<dependencies>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-core</artifactId>
			<version>2.15.1</version>
		</dependency>				
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.7.12</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.7.12</version>
		</dependency>		
	</dependencies>
</project>

Splitter Example

The Splitter can optionally aggregate the processed message fragments using an AggregationStrategy. In this example we don’t specify one, so the default behavior applies and the Splitter returns the original, unsplit exchange.
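
To make the role of an AggregationStrategy more concrete, here is a minimal plain-Java sketch of the idea that uses no Camel classes. Camel calls the strategy once per split result, passing the previously aggregated exchange (null on the first call) and the new one; the sketch models that contract with a BinaryOperator. The class SplitAggregateSketch and the method splitAndAggregate are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

// Hypothetical model of "split, process, aggregate"; this is not Camel API.
class SplitAggregateSketch {

    // Processes every part, then folds the results pairwise.
    // As with Camel's AggregationStrategy, 'old' is null on the first call.
    static <T> T splitAndAggregate(List<T> parts,
                                   UnaryOperator<T> processor,
                                   BinaryOperator<T> strategy) {
        T aggregated = null;
        for (T part : parts) {
            T result = processor.apply(part);
            aggregated = strategy.apply(aggregated, result);
        }
        return aggregated;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("Coke", "Banana", "Tab");
        String joined = splitAndAggregate(
                items,
                String::toUpperCase,
                (old, next) -> old == null ? next : old + "+" + next);
        System.out.println(joined); // prints COKE+BANANA+TAB
    }
}
```

When no strategy is supplied, as in this article, there is no such fold and the caller simply gets the original message back.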

Order:

package com.javarticles.camel;

import java.util.ArrayList;
import java.util.List;

public class Order {

    private List<OrderItem> orderItems = new ArrayList<OrderItem>();

    private int number;

    public Order(int number) {
        this.number = number;
    }

    public void addItem(String itemName, int quantity, int price) {
        this.orderItems.add(new OrderItem(this,itemName, quantity, price));
    }

    public int getNumber() {
        return number;
    }

    public List<OrderItem> getItems() {
        return this.orderItems;
    }
    
    public String toString() {
        return "Order: " + number + ", Items: " + orderItems;
    }
}

OrderItem:

package com.javarticles.camel;

public class OrderItem {
    private Order order;
    private String itemName;
    private int quantity;
    private int price;
    private boolean processed;
    
    public OrderItem(Order order, String itemName, int quantity, int price) {
        this.order = order;
        this.itemName = itemName;
        this.quantity = quantity;
        this.price = price;
    }

    public Order getOrder() {
        return order;
    }

    public String getItemName() {
        return itemName;
    }
    
    public void process() {
        processed = true;
    }

    public boolean isProcessed() {
        return processed;
    }

    public int getQuantity() {
        return quantity;
    }

    public int getPrice() {
        return price;
    }
    
    public String toString() {
        return "Item[" + itemName + ", total price: " + (quantity * price) + ", processed: " + isProcessed() + "]";
    }
    
}

OrderItemProcessor:

package com.javarticles.camel;

public class OrderItemProcessor {
    public void process(OrderItem item) {
        item.process();
    }
}

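I have used a timer component to simulate the generation of orders. The timer is configured with a period of 10 seconds (period=10000), and each firing generates and processes a new order. Because the example stops the Camel context after 5 seconds, a single run produces only one order; reduce the period to 1000 to see several orders generated in one run.

First Routing: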

  1. timer://generateOrders - A timer component that triggers the generation of orders
  2. process - The timer triggers a custom processor, which creates an order and sets it as the message body
  3. direct:processOrder - The generated order is sent to this endpoint, which invokes the second route synchronously

Second Routing:

  1. direct:processOrder - The order arrives on this endpoint and is passed to the splitter
  2. split(simple("${body.items}")) - Splits the order into its items
  3. direct:processOrderItem - Each order item is sent to this endpoint for further processing
  4. end() - Ends the splitter block
  5. log - After all the items are processed, the order is logged

Third Routing:

  1. direct:processOrderItem - The order item arrives on this endpoint and is passed to the OrderItemProcessor bean
  2. bean:processOrderItem - The OrderItemProcessor bean, bound in the registry under the name processOrderItem, processes the order item
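
The expression ${body.items} in the second route works because the Simple language resolves the property name items to a call of getItems() on the message body, and the splitter then creates one message per element of the returned list. The following plain-Java sketch shows that name-to-getter mapping with reflection. It is an illustration of the idea, not Camel’s implementation; PropertyPathSketch, SampleOrder and resolve are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that models the idea behind ${body.items}.
class PropertyPathSketch {

    // Stand-in for the article's Order class, reduced to what the sketch needs.
    public static class SampleOrder {
        public List<String> getItems() {
            return Arrays.asList("Coke", "Banana", "Tab");
        }
    }

    // Maps a property name such as "items" to a call of getItems() on the body.
    static Object resolve(Object body, String property) {
        String getter = "get" + Character.toUpperCase(property.charAt(0))
                + property.substring(1);
        try {
            Method method = body.getClass().getMethod(getter);
            return method.invoke(body);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("No usable getter " + getter, e);
        }
    }

    public static void main(String[] args) {
        Object items = resolve(new SampleOrder(), "items");
        System.out.println(items); // prints [Coke, Banana, Tab]
    }
}
```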

CamelSplitterExample:

package com.javarticles.camel;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.util.jndi.JndiContext;

public class CamelSplitterExample {
    public static void main(String[] args) throws Exception {
        JndiContext jndiContext = new JndiContext();
        jndiContext.bind("processOrderItem", new OrderItemProcessor());
        CamelContext camelContext = new DefaultCamelContext(jndiContext);
        try {
            camelContext.addRoutes(new RouteBuilder() {
                public void configure() {
                    from("timer://generateOrders?fixedRate=true&period=10000")
                    .log("Generate order items")
                    .process(new Processor() {
                        
                        public void process(Exchange exchange) throws Exception {
                            Order order = new Order(1);
                            order.addItem("Coke", 20, 2);
                            order.addItem("Banana", 12, 1);
                            order.addItem("Tab", 5, 250);
                            exchange.getIn().setBody(order);
                        }
                    })
                    .to("direct:processOrder");
                    
                    from("direct:processOrder")
                    .log("Process order ${body}")
                    .split(simple("${body.items}"))
                    .to("direct:processOrderItem")
                    .log("Processing done ${body}")
                    .end()
                    .log("Order processed: ${body}");
                    
                    from("direct:processOrderItem")
                    .log("Processing item ${body.itemName}")
                    .to("bean:processOrderItem");
                    
                }
            });
            camelContext.start();
            // Keep the context running for 5 seconds so the timer can fire
            Thread.sleep(5000);
        } finally {
            camelContext.stop();
        }
    }
}
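
Note that the timer period is 10000 ms while main stops the context after Thread.sleep(5000), so a single run produces one order. The arithmetic can be sketched as below. It assumes the first event fires after an initial delay of 1000 ms (the documented default of the timer’s delay option in this Camel version) and ignores startup time and scheduling jitter. TimerWindowSketch and firings are hypothetical names.

```java
// Hypothetical arithmetic model of a fixed-rate timer; not Camel API.
class TimerWindowSketch {

    // Counts firings at delay, delay + period, delay + 2 * period, ...
    // that happen strictly before the run window ends.
    static int firings(long delayMillis, long periodMillis, long windowMillis) {
        if (windowMillis <= delayMillis) {
            return 0;
        }
        return (int) ((windowMillis - delayMillis - 1) / periodMillis) + 1;
    }

    public static void main(String[] args) {
        // Settings used in the article: period=10000, run window 5000 ms
        System.out.println(firings(1000, 10000, 5000)); // prints 1
        // With period=1000 the same window sees several orders
        System.out.println(firings(1000, 1000, 5000));  // prints 4
    }
}
```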

Output:

22:14| INFO | MarkerIgnoringBase.java 95 | Generate order items
22:14| INFO | MarkerIgnoringBase.java 95 | Process order Order: 1, Items: [Item[Coke, total price: 40, processed: false], Item[Banana, total price: 12, processed: false], Item[Tab, total price: 1250, processed: false]]
22:14| INFO | MarkerIgnoringBase.java 95 | Processing item Coke
22:14| INFO | MarkerIgnoringBase.java 95 | Processing done Item[Coke, total price: 40, processed: true]
22:14| INFO | MarkerIgnoringBase.java 95 | Processing item Banana
22:14| INFO | MarkerIgnoringBase.java 95 | Processing done Item[Banana, total price: 12, processed: true]
22:14| INFO | MarkerIgnoringBase.java 95 | Processing item Tab
22:14| INFO | MarkerIgnoringBase.java 95 | Processing done Item[Tab, total price: 1250, processed: true]
22:14| INFO | MarkerIgnoringBase.java 95 | Order processed: Order: 1, Items: [Item[Coke, total price: 40, processed: true], Item[Banana, total price: 12, processed: true], Item[Tab, total price: 1250, processed: true]]
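
The last log line shows every item as processed even though no aggregation strategy was configured. This follows from plain Java reference semantics: each split message carries one of the same OrderItem objects that the Order still holds, so marking an item as processed is visible through the original order. Here is a minimal sketch of that effect, using hypothetical stand-in classes (SharedReferenceSketch, Item) rather than the article’s Order and OrderItem.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins that show why the original order sees the changes.
class SharedReferenceSketch {

    static class Item {
        final String name;
        boolean processed;

        Item(String name) {
            this.name = name;
        }
    }

    // "Splits" by iterating: every element is the same object the list holds.
    static int processAll(List<Item> orderItems) {
        int handled = 0;
        for (Item item : orderItems) {
            item.processed = true;
            handled++;
        }
        return handled;
    }

    static boolean allProcessed(List<Item> orderItems) {
        for (Item item : orderItems) {
            if (!item.processed) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Item> order = new ArrayList<>();
        order.add(new Item("Coke"));
        order.add(new Item("Banana"));
        System.out.println(allProcessed(order)); // prints false
        processAll(order);
        System.out.println(allProcessed(order)); // prints true
    }
}
```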

Download the source code

This was an example about Apache Camel Splitter. You can download the source code here: camelSplitterExample.zip
