Aggregator2
Page edited by Ben O'Day
Comment:
added info for CAMEL-4118
Changes (1)
Full Content

Aggregator

This applies for Camel version 2.3 or newer. If you use an older version then use this Aggregator link instead.

The Aggregator from the EIP patterns allows you to combine a number of messages together into a single message.

A correlation expression is used to determine the messages which should be aggregated together. If you want to aggregate all messages into a single message, just use a constant expression. An AggregationStrategy is used to combine all the message exchanges for a single correlation key into a single message exchange.

Aggregator options

The aggregator supports the following options:
Exchange Properties

The following properties are set on each aggregated Exchange:
About AggregationStrategy

The AggregationStrategy is used for aggregating the old exchange (looked up by its correlation id) and the new exchange together into a single exchange. Possible implementations include performing some kind of combining or delta processing, such as adding line items together into an invoice, or just using the newest exchange and discarding old exchanges, as for state tracking or market data prices where old values are of little use.

Notice that the aggregation strategy is a mandatory option and must be provided to the aggregator.

Here are a few example AggregationStrategy implementations that should help you create your own custom strategy.

    import java.util.ArrayList;

    import org.apache.camel.Exchange;
    import org.apache.camel.processor.aggregate.AggregationStrategy;

    // simply combines Exchange String body values using '+' as a delimiter
    class StringAggregationStrategy implements AggregationStrategy {

        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            // first message for this correlation key: nothing to combine yet
            if (oldExchange == null) {
                return newExchange;
            }

            String oldBody = oldExchange.getIn().getBody(String.class);
            String newBody = newExchange.getIn().getBody(String.class);
            oldExchange.getIn().setBody(oldBody + "+" + newBody);
            return oldExchange;
        }
    }

    // simply combines Exchange body values into an ArrayList<Object>
    class ArrayListAggregationStrategy implements AggregationStrategy {

        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            Object newBody = newExchange.getIn().getBody();
            ArrayList<Object> list = null;
            if (oldExchange == null) {
                // first message for this correlation key: start a new list
                list = new ArrayList<Object>();
                list.add(newBody);
                newExchange.getIn().setBody(list);
                return newExchange;
            } else {
                // later messages: append to the list built so far
                list = oldExchange.getIn().getBody(ArrayList.class);
                list.add(newBody);
                return oldExchange;
            }
        }
    }

About completion

When aggregating Exchanges, at some point you need to indicate that the aggregated exchanges are complete, so they can be sent out of the aggregator. Camel allows you to indicate completion in various ways as follows:
Notice that all the completion conditions are per correlation key, and you can combine them in any way you like; the first one that triggers wins. So you can, for example, use a completion size together with a completion timeout. Only completionTimeout and completionInterval cannot be used at the same time.

Notice that completion is a mandatory option and must be provided to the aggregator. If it is not provided, Camel will throw an Exception on startup.

Persistent AggregationRepository

The aggregator provides a pluggable repository, for which you can implement your own org.apache.camel.spi.AggregationRepository.

Examples

See some examples from the old Aggregator, which is somewhat similar to this new aggregator.
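As a rough illustration of the "per correlation key, first trigger wins" rule described in the completion notes, the following plain-Java sketch models an aggregator that completes a group on either a size threshold or a predicate, whichever fires first. It does not use Camel at all: the class MiniAggregator and its add method are hypothetical names invented for this illustration, and timeouts are left out to keep the behaviour deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical plain-Java model of per-key completion; this is not Camel code.
class MiniAggregator {
    private final int completionSize;
    private final Predicate<List<String>> completionPredicate;
    // One open group of message bodies per correlation key.
    private final Map<String, List<String>> groups = new HashMap<>();

    MiniAggregator(int completionSize, Predicate<List<String>> completionPredicate) {
        this.completionSize = completionSize;
        this.completionPredicate = completionPredicate;
    }

    // Adds a body to the group for the given key. Returns the completed group
    // if any completion condition fired, otherwise null (the group stays open).
    List<String> add(String correlationKey, String body) {
        List<String> group = groups.computeIfAbsent(correlationKey, k -> new ArrayList<>());
        group.add(body);
        boolean bySize = group.size() >= completionSize;
        boolean byPredicate = completionPredicate.test(group);
        if (bySize || byPredicate) {
            // Completion only closes the group of this key; other keys are untouched.
            groups.remove(correlationKey);
            return group;
        }
        return null;
    }

    public static void main(String[] args) {
        MiniAggregator agg = new MiniAggregator(3, g -> g.contains("END"));
        agg.add("a", "A1");
        agg.add("b", "B1");
        System.out.println(agg.add("b", "END")); // prints [B1, END] (predicate fired first)
        agg.add("a", "A2");
        System.out.println(agg.add("a", "A3")); // prints [A1, A2, A3] (size reached)
    }
}
```

In this model the group for key b completes after two messages because the predicate matches, while the group for key a is unaffected and completes only when its third message arrives.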
Using completionTimeout

In this example we want to aggregate all incoming messages and after 3 seconds of inactivity we want the aggregation to complete. This is done using the completionTimeout option as shown:

    from("direct:start")
        // aggregate all exchanges correlated by the id header.
        // Aggregate them using the BodyInAggregatingStrategy strategy,
        // and after 3 seconds of inactivity time out and complete the aggregation
        // and send it to mock:aggregated
        .aggregate(header("id"), new BodyInAggregatingStrategy()).completionTimeout(3000)
            .to("mock:aggregated");

And the same example using Spring XML:

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="direct:start"/>
            <aggregate strategyRef="aggregatorStrategy" completionTimeout="3000">
                <correlationExpression>
                    <simple>header.id</simple>
                </correlationExpression>
                <to uri="mock:aggregated"/>
            </aggregate>
        </route>
    </camelContext>

    <bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/>

Using completionSize

In this example we want to aggregate all incoming messages and when we have 3 messages aggregated (in the same correlation group) we want the aggregation to complete. This is done using the completionSize option as shown:

    from("direct:start")
        // aggregate all exchanges correlated by the id header.
        // Aggregate them using the BodyInAggregatingStrategy strategy,
        // and after 3 messages have been aggregated complete the aggregation
        // and send it to mock:aggregated
        .aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(3)
            .to("mock:aggregated");

And the same example using Spring XML:

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="direct:start"/>
            <aggregate strategyRef="aggregatorStrategy" completionSize="3">
                <correlationExpression>
                    <simple>header.id</simple>
                </correlationExpression>
                <to uri="mock:aggregated"/>
            </aggregate>
        </route>
    </camelContext>

    <bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/>

Using completionPredicate

In this example we want to aggregate all incoming messages and use a Predicate to determine when we are complete. The Predicate can be evaluated using either the aggregated exchange (default) or the incoming exchange. We will show both situations as examples. We start with the default situation as shown:

    from("direct:start")
        // aggregate all exchanges correlated by the id header.
        // Aggregate them using the BodyInAggregatingStrategy strategy,
        // and when the aggregated body contains A+B+C complete the aggregation
        // and send it to mock:aggregated
        .aggregate(header("id"), new BodyInAggregatingStrategy()).completionPredicate(body().contains("A+B+C"))
            .to("mock:aggregated");

And the same example using Spring XML:

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="direct:start"/>
            <aggregate strategyRef="aggregatorStrategy">
                <correlationExpression>
                    <simple>header.id</simple>
                </correlationExpression>
                <completionPredicate>
                    <simple>${body} contains 'A+B+C'</simple>
                </completionPredicate>
                <to uri="mock:aggregated"/>
            </aggregate>
        </route>
    </camelContext>

    <bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/>

In the other situation we use the eagerCheckCompletion option to tell Camel to evaluate the predicate against the incoming Exchange. Notice how the completion predicate can simply test whether the incoming message is the END message:

    from("direct:start")
        // aggregate all exchanges correlated by the id header.
        // Aggregate them using the BodyInAggregatingStrategy strategy.
        // Do eager checking, which means the completion predicate will use the incoming exchange;
        // this allows us to trigger completion when a certain exchange arrives, which is the
        // END message
        .aggregate(header("id"), new BodyInAggregatingStrategy())
            .eagerCheckCompletion().completionPredicate(body().isEqualTo("END"))
            .to("mock:aggregated");

And the same example using Spring XML:

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="direct:start"/>
            <aggregate strategyRef="aggregatorStrategy" eagerCheckCompletion="true">
                <correlationExpression>
                    <simple>header.id</simple>
                </correlationExpression>
                <completionPredicate>
                    <simple>${body} == 'END'</simple>
                </completionPredicate>
                <to uri="mock:aggregated"/>
            </aggregate>
        </route>
    </camelContext>

    <bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/>

Using dynamic completionTimeout

In this example we want to aggregate all incoming messages and after a period of inactivity we want the aggregation to complete. The period should be computed at runtime based on the timeout header in the incoming messages. This is done using the completionTimeout option as shown:

    from("direct:start")
        // aggregate all exchanges correlated by the id header.
        // Aggregate them using the BodyInAggregatingStrategy strategy.
        // The timeout header contains the inactivity timeout in millis, after which
        // the aggregation times out and completes
        // and is sent to mock:aggregated
        .aggregate(header("id"), new BodyInAggregatingStrategy()).completionTimeout(header("timeout"))
            .to("mock:aggregated");

And the same example using Spring XML:

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="direct:start"/>
            <aggregate strategyRef="aggregatorStrategy">
                <correlationExpression>
                    <simple>header.id</simple>
                </correlationExpression>
                <completionTimeout>
                    <header>timeout</header>
                </completionTimeout>
                <to uri="mock:aggregated"/>
            </aggregate>
        </route>
    </camelContext>

    <bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/>

Note: You can also add a fixed timeout value and Camel will fall back to this value if the dynamic value is null or 0.

Using dynamic completionSize

In this example we want to aggregate all incoming messages based on a dynamic size per correlation key. The size is computed at runtime based on the mySize header in the incoming messages. This is done using the completionSize option as shown:

    from("direct:start")
        // aggregate all exchanges correlated by the id header.
        // Aggregate them using the BodyInAggregatingStrategy strategy.
        // The header mySize determines how many aggregated messages trigger the completion,
        // after which the result is sent to mock:aggregated
        .aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(header("mySize"))
            .to("mock:aggregated");

And the same example using Spring XML:

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="direct:start"/>
            <aggregate strategyRef="aggregatorStrategy">
                <correlationExpression>
                    <simple>header.id</simple>
                </correlationExpression>
                <completionSize>
                    <header>mySize</header>
                </completionSize>
                <to uri="mock:aggregated"/>
            </aggregate>
        </route>
    </camelContext>

    <bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/>

Note: You can also add a fixed size value and Camel will fall back to this value if the dynamic value is null or 0.

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started; you may also find the Architecture useful, particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Manually Force the Completion of All Aggregated Exchanges Immediately

Available as of Camel 2.9

See also