...
Note: In the Java DSL you use the tokenizer
to achieve the same. The route above in the Java DSL:
Code Block |
from("direct:a").recipientList(header("myHeader").tokenize(","));
|
In Camel 2.1 it is a bit easier, as you can pass in the delimiter as the second parameter:
Code Block |
from("direct:a").recipientList(header("myHeader"), "#");
|
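To make the effect of the delimiter concrete, the following plain-Java sketch models how a delimited header value turns into a list of recipient URIs. It does not use Camel: the class `DelimiterSketch` and the method `splitRecipients` are hypothetical, and trimming whitespace and dropping empty entries are assumptions of this sketch, not documented Camel behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Camel: models how a delimited header
// value becomes a list of recipient endpoint URIs.
class DelimiterSketch {

    static List<String> splitRecipients(String headerValue, String delimiter) {
        List<String> recipients = new ArrayList<>();
        if (headerValue == null) {
            return recipients;
        }
        // Quote the delimiter so that characters such as "|" are taken literally.
        for (String part : headerValue.split(Pattern.quote(delimiter))) {
            String uri = part.trim(); // assumption: surrounding whitespace is ignored
            if (!uri.isEmpty()) {
                recipients.add(uri);
            }
        }
        return recipients;
    }

    public static void main(String[] args) {
        // prints [direct:b, direct:c, direct:d]
        System.out.println(splitRecipients("direct:b#direct:c#direct:d", "#"));
    }
}
```

With "," as the delimiter the same helper models the tokenize(",") variant.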
...
The Recipient List now supports parallelProcessing,
which the Splitter, for example, also supports. It uses a thread pool so that the Exchange is sent to multiple recipients concurrently.
Code Block |
from("direct:a").recipientList(header("myHeader")).parallelProcessing();
|
In Spring XML it is an attribute on the recipientList tag.
Code Block |
<route>
  <from uri="direct:a"/>
  <recipientList parallelProcessing="true">
    <header>myHeader</header>
  </recipientList>
</route>
|
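As a rough illustration of what sending concurrently through a thread pool means, here is a plain-Java sketch that hands one delivery task per recipient to an ExecutorService. It is a model only, not Camel's implementation: `ParallelSendSketch`, `sendToAll`, the pool size of 4, and the string that stands in for a delivery are all assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model, not Camel code: one task per recipient on a thread pool.
class ParallelSendSketch {

    static Set<String> sendToAll(List<String> recipients, String body) {
        // Thread-safe set, because several pool threads record deliveries at once.
        Set<String> delivered = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(4); // assumed pool size
        for (String uri : recipients) {
            pool.execute(() -> delivered.add(uri + " <- " + body));
        }
        pool.shutdown(); // accept no new tasks, let the queued ones finish
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("deliveries did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return delivered;
    }

    public static void main(String[] args) {
        Set<String> delivered = sendToAll(Arrays.asList("direct:b", "direct:c", "direct:d"), "hello");
        System.out.println(delivered.size() + " deliveries completed");
    }
}
```

The set has no defined order, which reflects that concurrent deliveries may complete in any order.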
...
The Recipient List now supports stopOnException,
which the Splitter, for example, also supports. You can use it to stop sending to any further recipients if a recipient fails.
Code Block |
from("direct:a").recipientList(header("myHeader")).stopOnException();
|
In Spring XML it is an attribute on the recipientList tag.
Code Block |
<route>
  <from uri="direct:a"/>
  <recipientList stopOnException="true">
    <header>myHeader</header>
  </recipientList>
</route>
|
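The difference the flag makes can be modelled in a few lines of plain Java. This is a sketch under assumptions, not Camel's implementation: `StopOnExceptionSketch`, `send`, and `FAILING_SENDER` are hypothetical names, and a failure is modelled as a RuntimeException thrown by the sender.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model, not Camel code.
class StopOnExceptionSketch {

    // Made-up sender that fails for any URI containing "bad".
    static final Consumer<String> FAILING_SENDER = uri -> {
        if (uri.contains("bad")) {
            throw new IllegalStateException("delivery failed: " + uri);
        }
    };

    // Returns the recipients that were attempted, in order.
    static List<String> send(List<String> recipients, Consumer<String> sender, boolean stopOnException) {
        List<String> attempted = new ArrayList<>();
        for (String uri : recipients) {
            attempted.add(uri);
            try {
                sender.accept(uri);
            } catch (RuntimeException e) {
                if (stopOnException) {
                    break; // do not send to any further recipients
                }
                // otherwise carry on with the remaining recipients
            }
        }
        return attempted;
    }

    public static void main(String[] args) {
        List<String> uris = Arrays.asList("direct:b", "direct:bad", "direct:c");
        System.out.println(send(uris, FAILING_SENDER, true));  // [direct:b, direct:bad]
        System.out.println(send(uris, FAILING_SENDER, false)); // [direct:b, direct:bad, direct:c]
    }
}
```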
...
The Recipient List now supports ignoreInvalidEndpoints,
which the Routing Slip also supports. You can use it to skip endpoints that are invalid.
Code Block |
from("direct:a").recipientList(header("myHeader")).ignoreInvalidEndpoints();
|
In Spring XML it is an attribute on the recipientList tag.
Code Block |
<route>
  <from uri="direct:a"/>
  <recipientList ignoreInvalidEndpoints="true">
    <header>myHeader</header>
  </recipientList>
</route>
|
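The skipping behavior can be pictured with the plain-Java model below. It is only a sketch: `IgnoreInvalidSketch`, `resolve`, and `KNOWN_SCHEMES` are hypothetical, and treating an endpoint as invalid when its URI scheme is not in a fixed set is an assumption made for illustration, not Camel's actual resolution logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model, not Camel code.
class IgnoreInvalidSketch {

    // Assumption: an endpoint counts as valid when its scheme is listed here.
    static final Set<String> KNOWN_SCHEMES = new HashSet<>(Arrays.asList("direct", "activemq"));

    static boolean isValid(String uri) {
        int colon = uri.indexOf(':');
        return colon > 0 && KNOWN_SCHEMES.contains(uri.substring(0, colon));
    }

    static List<String> resolve(List<String> uris, boolean ignoreInvalidEndpoints) {
        List<String> resolved = new ArrayList<>();
        for (String uri : uris) {
            if (isValid(uri)) {
                resolved.add(uri);
            } else if (!ignoreInvalidEndpoints) {
                throw new IllegalArgumentException("Cannot resolve endpoint: " + uri);
            }
            // with the flag set, the invalid URI is simply skipped
        }
        return resolved;
    }

    public static void main(String[] args) {
        // prints [direct:b, direct:d]
        System.out.println(resolve(Arrays.asList("direct:b", "xxx:c", "direct:d"), true));
    }
}
```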
...
You can now use your own AggregationStrategy
with the Recipient List, although this is not needed very often. It is useful when you are using Request Reply messaging, because the replies from the recipients can then be aggregated. By default Camel uses UseLatestAggregationStrategy,
which keeps only the last received reply. If you must remember the bodies that all the recipients send back, you can use your own custom aggregator that keeps them. It is the same principle as with the Aggregator EIP, so check it out for details.
Code Block |
from("direct:a")
    .recipientList(header("myHeader")).aggregationStrategy(new MyOwnAggregationStrategy())
    .to("direct:b");
|
In Spring XML it is an attribute on the recipientList tag.
Code Block |
<route>
  <from uri="direct:a"/>
  <recipientList strategyRef="myStrategy">
    <header>myHeader</header>
  </recipientList>
  <to uri="direct:b"/>
</route>
<bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>
|
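The contrast between keeping only the latest reply and keeping all of them can be sketched without Camel as a fold over the replies. The class `AggregationSketch` and the two strategies below are hypothetical stand-ins that work on plain String bodies rather than Exchanges; the "+" separator is an arbitrary choice.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical model, not Camel code: strategies combine an old and a new reply body.
class AggregationSketch {

    // Models the "latest reply wins" default described above.
    static final BinaryOperator<String> USE_LATEST = (oldBody, newBody) -> newBody;

    // Keeps every reply by appending it to what was collected so far.
    static final BinaryOperator<String> KEEP_ALL =
            (oldBody, newBody) -> oldBody == null ? newBody : oldBody + "+" + newBody;

    static String aggregate(List<String> replies, BinaryOperator<String> strategy) {
        String result = null; // nothing has been aggregated before the first reply
        for (String reply : replies) {
            result = strategy.apply(result, reply);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> replies = Arrays.asList("A", "B", "C");
        System.out.println(aggregate(replies, USE_LATEST)); // C
        System.out.println(aggregate(replies, KEEP_ALL));   // A+B+C
    }
}
```

A real custom strategy would do the same kind of combining inside its aggregate method, on the message bodies of the two Exchanges.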
...
When using a custom AggregationStrategy,
the aggregate
method is always invoked sequentially, in the order of the endpoints the Recipient List is using, even if parallel processing is enabled. From Camel 2.12 it is also easier to tell which endpoint a reply came from, as the newExchange
Exchange has a property stored (the key is Exchange.RECIPIENT_LIST_ENDPOINT)
with the URI of the Endpoint.
Code Block |
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    String uri = newExchange.getProperty(Exchange.RECIPIENT_LIST_ENDPOINT, String.class);
    ...
}
|
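The idea that replies are aggregated in endpoint order even when the calls run in parallel can be demonstrated with the JDK alone. The sketch below is not how Camel implements it: `OrderedAggregationSketch` and `callAll` are hypothetical, and the artificial delays exist only to make earlier endpoints finish last. It relies on ExecutorService.invokeAll returning its futures in the order of the submitted tasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical model, not Camel code.
class OrderedAggregationSketch {

    static List<String> callAll(List<String> endpoints) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, endpoints.size()));
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (int i = 0; i < endpoints.size(); i++) {
                final String uri = endpoints.get(i);
                // Earlier endpoints sleep longer, so they complete last.
                final long delay = 20L * (endpoints.size() - i);
                tasks.add(() -> {
                    Thread.sleep(delay);
                    return "reply from " + uri;
                });
            }
            List<String> aggregated = new ArrayList<>();
            // invokeAll returns futures in task order, not completion order.
            for (Future<String> future : pool.invokeAll(tasks)) {
                aggregated.add(future.get());
            }
            return aggregated;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints [reply from direct:b, reply from direct:c, reply from direct:d]
        System.out.println(callAll(Arrays.asList("direct:b", "direct:c", "direct:d")));
    }
}
```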
...
You can use a Bean to provide the recipients, for example:
Code Block |
from("activemq:queue:test").recipientList().method(MessageRouter.class, "routeTo");
|
And then MessageRouter:
Code Block |
public class MessageRouter {
    public String routeTo() {
        String queueName = "activemq:queue:test2";
        return queueName;
    }
}
|
When you use a Bean, do not also use the @RecipientList
annotation, as this adds yet another recipient list, so you end up with two. Do not do this:
Code Block |
public class MessageRouter {
    @RecipientList
    public String routeTo() {
        String queueName = "activemq:queue:test2";
        return queueName;
    }
}
|
You should only use @RecipientList
as shown above if you route to a Bean that you then want to act as a recipient list.
So the original route can be changed to:
Code Block |
from("activemq:queue:test").bean(MessageRouter.class, "routeTo");
|
...
By default, if a timeout occurs the AggregationStrategy
is not invoked. However, you can implement a specialized version, TimeoutAwareAggregationStrategy:
Code Block |
public interface TimeoutAwareAggregationStrategy extends AggregationStrategy {
    /**
     * A timeout occurred
     *
     * @param oldExchange the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange)
     * @param index the index
     * @param total the total
     * @param timeout the timeout value in millis
     */
    void timeout(Exchange oldExchange, int index, int total, long timeout);
}
|
...
Available as of Camel 2.8
See details at Multicast