Splitter
Page edited by Claus Ibsen
Changes (2)
Full Content

Splitter

The Splitter from the EIP patterns allows you to split a message into a number of pieces and process them individually.

As of Camel 2.0, you need to specify a Splitter as split(). In earlier versions of Camel, you need to use splitter().

Options

Exchange properties

The following properties are set on each Exchange that is split:
Examples

The following example shows how to take a request from the seda:a endpoint, split it into pieces using an expression, and then forward each piece to seda:b.

Using the Fluent Builders

    RouteBuilder builder = new RouteBuilder() {
        public void configure() {
            errorHandler(deadLetterChannel("mock:error"));

            from("seda:a")
                .split(body(String.class).tokenize("\n"))
                .to("seda:b");
        }
    };

The splitter can use any expression language, so you could use any of the Languages Supported, such as XPath, XQuery, SQL or one of the Scripting Languages, to perform the split. For example:

    from("activemq:my.queue")
        .split(xpath("//foo/bar"))
        .convertBodyTo(String.class)
        .to("file://some/directory");

Using the Spring XML Extensions

    <camelContext errorHandlerRef="errorHandler" xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="seda:a"/>
            <split>
                <xpath>/invoice/lineItems</xpath>
                <to uri="seda:b"/>
            </split>
        </route>
    </camelContext>

For further examples of this pattern in use, you could look at one of the JUnit test cases.

Using Tokenizer from Spring XML Extensions

Available as of Camel 2.0

You can use the tokenizer expression in the Spring DSL to split bodies or headers using a token. This is a common use case, so we provide a special tokenize tag for it.

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="direct:start"/>
            <split>
                <tokenize token="@"/>
                <to uri="mock:result"/>
            </split>
        </route>
    </camelContext>

Splitting the body in Spring XML is a bit harder, as you need to use the Simple language to dictate this:

    <split>
        <simple>${body}</simple>
        <to uri="mock:result"/>
    </split>

What does the splitter return?

Camel 2.2 or older:
Camel 2.3 and newer
For all versions

Parallel execution of distinct 'parts'

If you want to execute all parts in parallel, you can use a special notation of split() with two arguments, where the second one is a boolean flag indicating whether processing should be parallel. For example:

    XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar");
    from("activemq:my.queue").split(xPathBuilder, true).to("activemq:my.parts");

In Camel 2.0 the boolean option has been refactored into a builder method, parallelProcessing, so it is easier to understand what the route does when we use a method instead of true|false.

    XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar");
    from("activemq:my.queue").split(xPathBuilder).parallelProcessing().to("activemq:my.parts");

Stream based
You can split streams by enabling the streaming mode using the streaming builder method.

    from("direct:streaming").split(body().tokenize(",")).streaming().to("activemq:my.parts");

You can also supply your own custom splitter to use with streaming, like this:

    import static org.apache.camel.builder.ExpressionBuilder.beanExpression;

    from("direct:streaming")
        .split(beanExpression(new MyCustomIteratorFactory(), "iterator"))
        .streaming().to("activemq:my.parts");

Specifying a custom aggregation strategy

Available as of Camel 2.0

This is specified in a similar way to the Aggregator.

Specifying a custom ThreadPoolExecutor

You can customize the underlying ThreadPoolExecutor used in the parallel splitter. In the Java DSL try something like this:

    XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar");
    ExecutorService pool = ...

    from("activemq:my.queue")
        .split(xPathBuilder).parallelProcessing().executorService(pool)
        .to("activemq:my.parts");

Using a POJO to do the splitting

As the Splitter can use any expression to do the actual splitting, we leverage this fact and use a method expression to invoke a Bean that returns the split parts. In the route we define the expression as a method call to invoke our Bean, which we have registered with the id mySplitterBean in the Registry.

    from("direct:body")
        // here we use a POJO bean mySplitterBean to do the split of the payload
        .split().method("mySplitterBean", "splitBody")
        .to("mock:result");

    from("direct:message")
        // here we use a POJO bean mySplitterBean to do the split of the message
        // with a certain header value
        .split().method("mySplitterBean", "splitMessage")
        .to("mock:result");

The logic for our Bean is as simple as the following. Notice that we use Camel Bean Binding to pass in the message body as a String object.

    public class MySplitterBean {

        /**
         * The split body method returns something that is iterable, such as a java.util.List.
         *
         * @param body the payload of the incoming message
         * @return a list containing each split part
         */
        public List<String> splitBody(String body) {
            // since this is based on a unit test you can of course
            // use different logic for splitting, as Camel has out
            // of the box support for splitting a String based on comma,
            // but this is for show and tell; since this is Java code
            // you have full control over how you split your messages
            List<String> answer = new ArrayList<String>();
            String[] parts = body.split(",");
            for (String part : parts) {
                answer.add(part);
            }
            return answer;
        }

        /**
         * The split message method returns something that is iterable, such as a java.util.List.
         *
         * @param header the header of the incoming message with the name user
         * @param body the payload of the incoming message
         * @return a list containing each split part
         */
        public List<Message> splitMessage(@Header(value = "user") String header, @Body String body) {
            // we can leverage the Parameter Binding Annotations
            // http://camel.apache.org/parameter-binding-annotations.html
            // to access the message header and body at the same time,
            // then create the messages that we want; the splitter will
            // take care of the rest.
            // *NOTE* this feature requires Camel version >= 1.6.1
            List<Message> answer = new ArrayList<Message>();
            String[] parts = header.split(",");
            for (String part : parts) {
                DefaultMessage message = new DefaultMessage();
                message.setHeader("user", part);
                message.setBody(body);
                answer.add(message);
            }
            return answer;
        }
    }

Split aggregate request/reply sample

This sample shows how you can split an Exchange, process each split message, aggregate the results and return a combined response to the original caller using request/reply.
The route below illustrates this and how the split supports an aggregationStrategy to hold the in-progress processed messages:

    // this route starts from the direct:start endpoint
    // the body is then split based on the @ separator
    // the splitter in Camel supports InOut as well, and for that we need
    // to be able to aggregate what response we need to send back, so we provide our
    // own strategy with the class MyOrderStrategy.
    from("direct:start")
        .split(body().tokenize("@"), new MyOrderStrategy())
            // each split message is then sent to this bean where we can process it
            .to("bean:MyOrderService?method=handleOrder")
            // it is important to end the splitter route, as we do not want to do more routing
            // on each split message
        .end()
        // after we have split and handled each message we want to send a single combined
        // response back to the original caller, so we let this bean build it for us
        // this bean will receive the result of the aggregate strategy: MyOrderStrategy
        .to("bean:MyOrderService?method=buildCombinedResponse");

And the OrderService bean is as follows:

    public static class MyOrderService {

        private static int counter;

        /**
         * We just handle the order by returning an id line for the order
         */
        public String handleOrder(String line) {
            LOG.debug("HandleOrder: " + line);
            return "(id=" + ++counter + ",item=" + line + ")";
        }

        /**
         * We use the same bean for building the combined response to send
         * back to the original caller
         */
        public String buildCombinedResponse(String line) {
            LOG.debug("BuildCombinedResponse: " + line);
            return "Response[" + line + "]";
        }
    }

Our custom aggregationStrategy is responsible for holding the in-progress aggregated message. After the splitter has ended, that message is sent to the buildCombinedResponse method for final processing before the combined response is returned to the waiting caller.

    /**
     * This is our own order aggregation strategy where we can control
     * how each split message should be combined. As we do not want to
     * lose any message, we copy from the new to the old to preserve the
     * order lines as we process them
     */
    public static class MyOrderStrategy implements AggregationStrategy {

        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            // put order together in old exchange by adding the order from new exchange

            if (oldExchange == null) {
                // the first time we aggregate we only have the new exchange,
                // so we just return it
                return newExchange;
            }

            String orders = oldExchange.getIn().getBody(String.class);
            String newLine = newExchange.getIn().getBody(String.class);

            LOG.debug("Aggregate old orders: " + orders);
            LOG.debug("Aggregate new order: " + newLine);

            // put orders together, separated by semicolons
            orders = orders + ";" + newLine;
            // put combined order back on old to preserve it
            oldExchange.getIn().setBody(orders);

            // return old as this is the one that has all the orders gathered until now
            return oldExchange;
        }
    }

So let's run the sample and see how it works.

    HandleOrder: A
    HandleOrder: B
    Aggregate old orders: (id=1,item=A)
    Aggregate new order: (id=2,item=B)
    HandleOrder: C
    Aggregate old orders: (id=1,item=A);(id=2,item=B)
    Aggregate new order: (id=3,item=C)
    BuildCombinedResponse: (id=1,item=A);(id=2,item=B);(id=3,item=C)
    Response to caller: Response[(id=1,item=A);(id=2,item=B);(id=3,item=C)]

Stop processing in case of exception

Available as of Camel 2.1

By default, the Splitter continues to process the entire Exchange even if one of the split messages throws an exception during routing. Sometimes, however, you want Camel to stop, let the exception propagate back, and let the Camel error handler handle it. In Camel 2.1 you can do this by telling the Splitter to stop when an exception occurs.
This is done by the stopOnException option as shown below:

    from("direct:start")
        .split(body().tokenize(",")).stopOnException()
            .process(new MyProcessor())
            .to("mock:split");

And using XML DSL you specify it as follows:

    <route>
        <from uri="direct:start"/>
        <split stopOnException="true">
            <tokenize token=","/>
            <process ref="myProcessor"/>
            <to uri="mock:split"/>
        </split>
    </route>

Using onPrepare to execute custom logic when preparing messages

Available as of Camel 2.8

See details at Multicast

Sharing unit of work

Available as of Camel 2.8

By default, the Splitter does not share a unit of work between the parent exchange and each split exchange. This means each sub exchange has its own individual unit of work.

For example, you may have a use case where you want to split a big message and regard that process as an atomic, isolated operation that either succeeds or fails. In case of a failure you want the big message to be moved into a dead letter queue. To support this use case, you have to share the unit of work on the Splitter.

Here is an example in Java DSL:

    errorHandler(deadLetterChannel("mock:dead").useOriginalMessage()
        .maximumRedeliveries(3).redeliveryDelay(0));

    from("direct:start")
        .to("mock:a")
        // share unit of work in the splitter, which tells Camel to propagate failures from
        // processing the split messages back to the result of the splitter, which allows
        // it to act as a combined unit of work
        .split(body().tokenize(",")).shareUnitOfWork()
            .to("mock:b")
            .to("direct:line")
        .end()
        .to("mock:result");

    from("direct:line")
        .to("log:line")
        .process(new MyProcessor())
        .to("mock:line");

In this example, if there is a problem processing a sub message, the error handler kicks in (error handling still applies to the sub messages). However, if a sub message fails all redelivery attempts (it is exhausted), it is not moved into the dead letter queue. The reason is that we have shared the unit of work, so the sub message reports the error on the shared unit of work. When the Splitter is done, it checks the state of the shared unit of work to see whether any errors occurred. If an error occurred, it sets the exception on the Exchange and marks it for rollback. The error handler then kicks in again, as the Exchange has been marked for rollback and carries an exception. No redelivery attempts are performed (as it was marked for rollback) and the Exchange is moved into the dead letter queue.

Using this from XML DSL is just as easy, as you only have to set the shareUnitOfWork attribute to true:

    <camelContext errorHandlerRef="dlc" xmlns="http://camel.apache.org/schema/spring">

        <!-- define error handler as DLC, with use original message enabled -->
        <errorHandler id="dlc" type="DeadLetterChannel" deadLetterUri="mock:dead" useOriginalMessage="true">
            <redeliveryPolicy maximumRedeliveries="3" redeliveryDelay="0"/>
        </errorHandler>

        <route>
            <from uri="direct:start"/>
            <to uri="mock:a"/>
            <!-- share unit of work in the splitter, which tells Camel to propagate failures from
                 processing the split messages back to the result of the splitter, which allows
                 it to act as a combined unit of work -->
            <split shareUnitOfWork="true">
                <tokenize token=","/>
                <to uri="mock:b"/>
                <to uri="direct:line"/>
            </split>
            <to uri="mock:result"/>
        </route>

        <!-- route for processing each split line -->
        <route>
            <from uri="direct:line"/>
            <to uri="log:line"/>
            <process ref="myProcessor"/>
            <to uri="mock:line"/>
        </route>

    </camelContext>
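
The shared unit of work flow described above can be easier to follow as a small model. The sketch below is plain Java and deliberately does not use the Camel API: SharedUnitOfWork, Outcome, process and route are hypothetical names invented for illustration, and the poison parameter stands in for whatever makes MyProcessor fail. It also assumes the remaining sub messages are still processed after one of them fails (that is, stopOnException is not set).

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the behaviour described above; this is not Camel API. */
class SharedUnitOfWorkSketch {

    /** Hypothetical stand-in for the unit of work shared by all sub messages. */
    static final class SharedUnitOfWork {
        private Exception failure;

        void reportFailure(Exception e) {
            if (failure == null) {
                failure = e; // remember the first failure only
            }
        }

        boolean hasFailed() {
            return failure != null;
        }
    }

    /** Records where messages ended up (loosely mirrors mock:line, mock:result, mock:dead). */
    static final class Outcome {
        final List<String> line = new ArrayList<>();
        final List<String> result = new ArrayList<>();
        final List<String> dead = new ArrayList<>();
    }

    static final int MAX_REDELIVERIES = 3;

    /** Hypothetical processor: fails whenever it sees the poison part. */
    static void process(String part, String poison) {
        if (part.equals(poison)) {
            throw new IllegalArgumentException("Cannot process " + part);
        }
    }

    static Outcome route(String body, String poison) {
        Outcome out = new Outcome();
        SharedUnitOfWork unitOfWork = new SharedUnitOfWork();

        for (String part : body.split(",")) {
            Exception lastError = null;
            // one initial attempt plus up to MAX_REDELIVERIES redeliveries per sub message
            for (int attempt = 0; attempt <= MAX_REDELIVERIES; attempt++) {
                try {
                    process(part, poison);
                    lastError = null;
                    break;
                } catch (RuntimeException e) {
                    lastError = e;
                }
            }
            if (lastError == null) {
                out.line.add(part);
            } else {
                // exhausted: report on the shared unit of work instead of dead-lettering the part
                unitOfWork.reportFailure(lastError);
            }
        }

        // once the split is done, the shared state decides the fate of the original message
        if (unitOfWork.hasFailed()) {
            out.dead.add(body); // whole original message is dead-lettered
        } else {
            out.result.add(body);
        }
        return out;
    }

    public static void main(String[] args) {
        Outcome bad = route("A,Kaboom,C", "Kaboom");
        System.out.println("line=" + bad.line + " result=" + bad.result + " dead=" + bad.dead);
    }
}
```

In this model, the body A,Kaboom,C with Kaboom as the failing part leaves A and C in line, nothing in result, and the whole original body in dead, which matches the description: the failing sub message is never dead-lettered on its own.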
Using This Pattern

If you would like to use this EIP Pattern, please read the Getting Started guide. You may also find the Architecture useful, particularly the description of Endpoint and URIs. You could then try out some of the Examples before trying this pattern out.