Aggregator

Page edited by Claus Ibsen

This applies to Camel version 2.2 or older. The Aggregator was rewritten in Camel 2.3, so if you use Camel 2.3 or newer you should use the Aggregator2 page instead.

The Aggregator from the EIP patterns allows you to combine a number of messages into a single message.

A correlation expression determines which messages are aggregated together. If you want to aggregate all messages into a single message, just use a constant expression. An AggregationStrategy combines all the message exchanges for a single correlation key into a single message exchange. The default strategy just chooses the latest message, so it is ideal for throttling messages.

For example, imagine a stock market data system in which you are receiving 30,000 messages per second. You may want to throttle down the updates because, say, a GUI cannot cope with such massive update rates. You can aggregate these messages so that, within a window (defined by a maximum number of messages or a timeout), messages for the same stock are aggregated together by choosing the latest message and discarding the older prices. (You could apply a delta processing algorithm if you prefer to capture some of the history.)
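To make the idea of a correlation expression plus an aggregation strategy concrete, here is a minimal sketch in plain Java. It does not use the Camel API: the class AggregatorSketch, the Quote type and the aggregate helper are hypothetical names invented for this illustration. It only shows how messages in one window collapse to a single message per correlation key when the strategy is "keep the latest".

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class AggregatorSketch {

    /** Hypothetical message type: a stock symbol and its price. */
    public static final class Quote {
        public final String symbol;
        public final int price;

        public Quote(String symbol, int price) {
            this.symbol = symbol;
            this.price = price;
        }
    }

    /**
     * Aggregates one window of messages.
     * correlation: computes the correlation key of a message.
     * strategy: combines the previously aggregated message with the new one.
     */
    public static <K, M> Map<K, M> aggregate(List<M> window,
                                             Function<M, K> correlation,
                                             BinaryOperator<M> strategy) {
        Map<K, M> aggregated = new LinkedHashMap<>();
        for (M message : window) {
            // first message for a key is stored as-is, later ones go through the strategy
            aggregated.merge(correlation.apply(message), message, strategy);
        }
        return aggregated;
    }

    public static void main(String[] args) {
        List<Quote> window = List.of(
                new Quote("APACHE", 100),
                new Quote("IBM", 50),
                new Quote("APACHE", 105));

        // "use latest" strategy: always prefer the newer message
        Map<String, Quote> latest = aggregate(window, q -> q.symbol, (older, newer) -> newer);

        for (Quote q : latest.values()) {
            System.out.println(q.symbol + " " + q.price);
        }
    }
}
```

Swapping the lambda passed as the strategy is the only change needed to keep, for instance, the highest price instead of the latest one.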
Using the Fluent Builders

The following example shows how to aggregate messages so that only the latest message for a specific value of the cheese header is sent.

    // in this route we aggregate all from direct:start based on the header id cheese
    from("direct:start").aggregate(header("cheese")).to("mock:result");

    from("seda:header").setHeader("visited", constant(true)).aggregate(header("cheese")).to("mock:result");

    // in this sample we aggregate using our own strategy with a completion predicate
    // stating that the aggregated header is equal to 5.
    from("direct:predicate").aggregate(header("cheese"), new MyAggregationStrategy())
        .completionPredicate(header("aggregated").isEqualTo(5)).to("mock:result");

    // this sample is similar to the one above but it also illustrates the use of outBatchSize
    // to send exchanges to mock:result in batches of 10.
    from("direct:outBatchPredicate").aggregate(header("cheese"), new MyAggregationStrategy())
        .completionPredicate(header("aggregated").isEqualTo(5)).outBatchSize(10).to("mock:result");

If you were using JMS then you may wish to use the JMSDestination header as the correlation key, or some custom header for the stock symbol (using the above stock market example).

    from("activemq:someReallyFastTopic").aggregate(header("JMSDestination")).to("activemq:someSlowTopicForGuis");

You can of course use many different expression languages such as XPath, XQuery, SQL or various Scripting Languages.

    // aggregate based on the message content using an XPath expression
    // example assumes an XML document starting with <stockQuote symbol='...'>
    // aggregate messages based on their symbol attribute within the <stockQuote> element
    from("seda:start").aggregate().xpath("/stockQuote/@symbol", String.class).batchSize(5).to("mock:result");

    // this example will aggregate all messages starting with <stockQuote symbol='APACHE'> into
    // one exchange and all the other messages (different symbol or different root element) into another exchange.
    from("seda:start").aggregate().xpath("name(/stockquo...@symbol='APACHE'])", String.class).batchSize(5).to("mock:result");

For further examples of this pattern in use you could look at the JUnit test case.

Using the Spring XML Extensions
The following example shows how to create a simple aggregator using the XML notation, using an expression for the correlation value used to aggregate messages together.

    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
      <route>
        <from uri="direct:start"/>
        <aggregate>
          <correlationExpression>
            <simple>header.cheese</simple>
          </correlationExpression>
          <to uri="mock:result"/>
        </aggregate>
      </route>

      <route>
        <from uri="seda:header"/>
        <process ref="setHeaderProcessor"/>
        <to uri="direct:temp"/>
      </route>

      <route>
        <from uri="direct:temp"/>
        <aggregate>
          <correlationExpression>
            <simple>header.cheese</simple>
          </correlationExpression>
          <to uri="mock:result"/>
        </aggregate>
      </route>

      <route>
        <from uri="direct:predicate"/>
        <aggregate strategyRef="myAggregatorStrategy">
          <correlationExpression>
            <simple>header.cheese</simple>
          </correlationExpression>
          <to uri="mock:result"/>
          <completionPredicate>
            <method bean="myAggregatorStrategy" method="isCompleted"/>
          </completionPredicate>
        </aggregate>
      </route>

      <route>
        <from uri="direct:outBatchPredicate"/>
        <aggregate strategyRef="myAggregatorStrategy" outBatchSize="10">
          <correlationExpression>
            <simple>header.cheese</simple>
          </correlationExpression>
          <to uri="mock:result"/>
          <completionPredicate>
            <method bean="myAggregatorStrategy" method="isCompleted"/>
          </completionPredicate>
        </aggregate>
      </route>

      <!-- This route turns off in batching by setting batchSize to 1 to run the unit test for out batching.
           Normal use cases may not want to disable in batching -->
      <route>
        <from uri="direct:outBatchNoInBatching"/>
        <aggregate strategyRef="myAggregatorStrategy" batchSize="1" outBatchSize="10">
          <correlationExpression>
            <simple>header.cheese</simple>
          </correlationExpression>
          <to uri="mock:result"/>
          <completionPredicate>
            <method bean="myAggregatorStrategy" method="isCompleted"/>
          </completionPredicate>
        </aggregate>
      </route>
    </camelContext>

You can specify your own AggregationStrategy if you prefer, as shown in the following example:

    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
      <route>
        <from uri="direct:start"/>
        <aggregate strategyRef="aggregatorStrategy">
          <correlationExpression>
            <simple>header.cheese</simple>
          </correlationExpression>
          <to uri="mock:result"/>
        </aggregate>
      </route>
    </camelContext>

    <bean id="aggregatorStrategy" class="org.apache.camel.spring.processor.MyAggregator"/>

Notice how the strategyRef attribute is used on the <aggregate> element to refer to the custom strategy in Spring.

Exchange Properties

The following properties are set on each Exchange that is aggregated:
Batch options

The aggregator supports the following batch options:
AggregationCollection and AggregationStrategy

This aggregator uses an AggregationCollection to store the exchanges that are currently being aggregated. The AggregationCollection uses a correlation expression and an AggregationStrategy.
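The relationship between the collection, the correlation expression and the strategy can be sketched in plain Java as follows. This is not Camel's AggregationCollection interface: KeyedCollection and Bid are hypothetical stand-ins written for this illustration, and the sketch assumes the collection keeps exactly one aggregated entry per correlation key.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class CollectionSketch {

    /** Hypothetical message type: a bid with a correlation id and a price. */
    public static final class Bid {
        public final String id;
        public final int price;

        public Bid(String id, int price) {
            this.id = id;
            this.price = price;
        }
    }

    /** Hypothetical collection holding one aggregated message per correlation key. */
    public static final class KeyedCollection<K, M> implements Iterable<M> {
        private final Map<K, M> store = new LinkedHashMap<>();
        private final Function<M, K> correlation;
        private final BinaryOperator<M> strategy;

        public KeyedCollection(Function<M, K> correlation, BinaryOperator<M> strategy) {
            this.correlation = correlation;
            this.strategy = strategy;
        }

        /** Stores the first message for a key, or merges it with the stored one. */
        public void add(M message) {
            store.merge(correlation.apply(message), message, strategy);
        }

        @Override
        public Iterator<M> iterator() {
            return store.values().iterator();
        }

        public int size() {
            return store.size();
        }

        public void clear() {
            store.clear();
        }
    }

    public static void main(String[] args) {
        // correlate on the bid id and keep the highest bid per id
        KeyedCollection<String, Bid> bids = new KeyedCollection<>(
                bid -> bid.id,
                (oldBid, newBid) -> newBid.price > oldBid.price ? newBid : oldBid);

        bids.add(new Bid("1", 100));
        bids.add(new Bid("2", 150));
        bids.add(new Bid("2", 130));
        bids.add(new Bid("1", 200));
        bids.add(new Bid("1", 190));

        for (Bid bid : bids) {
            System.out.println(bid.id + " " + bid.price);
        }
    }
}
```

In this sketch the collection decides how exchanges are stored and iterated, while the strategy only decides how two exchanges with the same key are combined.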
Camel provides these implementations:
Examples

Default example

By default Camel uses DefaultAggregationCollection and UseLatestAggregationStrategy, so this simple example will just keep the latest received exchange for the given correlation expression:

    // our route is aggregating from the direct queue and sending the response to the mock
    from("direct:start")
        // aggregated by header id
        // as we have not configured more on the aggregator it will default to aggregate the
        // latest exchange only
        .aggregate().header("id")
        // wait for 0.5 seconds to aggregate
        .batchTimeout(500L)
        .to("mock:result");

Using PredicateAggregationCollection

The PredicateAggregationCollection is an extension to DefaultAggregationCollection that also uses a Predicate to determine completion. For instance, the Predicate can test for a special header value, the number of exchanges aggregated so far, etc. To use this the routing is a bit more complex, as we need to create our AggregationCollection object as follows:

    // create the aggregation collection we will use.
    // - we will correlate the received message based on the id header
    // - as we will just keep the latest message we use the latest strategy
    // - and finally we complete the aggregation once 3 messages have been received
    AggregationCollection ag = new PredicateAggregationCollection(header("id"),
        new UseLatestAggregationStrategy(),
        property(Exchange.AGGREGATED_SIZE).isEqualTo(3));

    // our route is aggregating from the direct queue and sending the response to the mock
    from("direct:start")
        // we use the collection based aggregator we already have configured
        .aggregate(ag)
        // wait for 0.5 seconds to aggregate
        .batchTimeout(500L)
        .to("mock:result");

In this sample the predicate states that we want at most 3 exchanges aggregated for the same correlation id. This is defined as:

    property(Exchange.AGGREGATED_SIZE).isEqualTo(3)

With this predicate the aggregator completes when we have received 3 exchanges with the same correlation id or when the specified timeout of 500 milliseconds has elapsed, whichever happens first.

Using custom aggregation strategy

In this example we will aggregate incoming bids and want to keep the highest bid. So we provide our own strategy where we implement the code logic:

    private static class MyAggregationStrategy implements AggregationStrategy {

        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                // the first time we only have the new exchange so it wins the first round
                return newExchange;
            }
            int oldPrice = oldExchange.getIn().getBody(Integer.class);
            int newPrice = newExchange.getIn().getBody(Integer.class);
            // return the "winner" that has the highest price
            return newPrice > oldPrice ? newExchange : oldExchange;
        }
    }

Then we set up the routing as follows:

    // our route is aggregating from the direct queue and sending the response to the mock
    from("direct:start")
        // aggregated by header id and use our own strategy how to aggregate
        .aggregate(new MyAggregationStrategy()).header("id")
        // wait for 0.5 seconds to aggregate
        .batchTimeout(500L)
        .to("mock:result");

Since this is based on a unit test, we show the test code that sends the bids and what is expected as the winners:

    MockEndpoint result = getMockEndpoint("mock:result");

    // we expect to find the two winners with the highest bid
    result.expectedMessageCount(2);
    result.expectedBodiesReceived("200", "150");

    // then we send all the messages at once
    template.sendBodyAndHeader("direct:start", "100", "id", "1");
    template.sendBodyAndHeader("direct:start", "150", "id", "2");
    template.sendBodyAndHeader("direct:start", "130", "id", "2");
    template.sendBodyAndHeader("direct:start", "200", "id", "1");
    template.sendBodyAndHeader("direct:start", "190", "id", "1");

    assertMockEndpointsSatisfied();

Using custom aggregation collection

In this example we will aggregate incoming bids and want to aggregate the bids in reverse order (this is just an example).
So we provide our own collection where we implement the code logic:

    class MyReverseAggregationCollection extends AbstractCollection<Exchange> implements AggregationCollection {

        private List<Exchange> collection = new ArrayList<Exchange>();
        private Expression correlation;
        private AggregationStrategy strategy;

        public Expression getCorrelationExpression() {
            return correlation;
        }

        public void setCorrelationExpression(Expression correlationExpression) {
            this.correlation = correlationExpression;
        }

        public AggregationStrategy getAggregationStrategy() {
            return strategy;
        }

        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
            this.strategy = aggregationStrategy;
        }

        public boolean add(Exchange exchange) {
            return collection.add(exchange);
        }

        public Iterator<Exchange> iterator() {
            // demonstrate that we can do something with this collection, so we reverse it
            Collections.reverse(collection);
            return collection.iterator();
        }

        public int size() {
            return collection.size();
        }

        public void clear() {
            collection.clear();
        }

        public void onAggregation(Object correlationKey, Exchange newExchange) {
            add(newExchange);
        }
    }

Then we set up the routing as follows:

    // our route is aggregating from the direct queue and sending the response to the mock
    from("direct:start")
        // use our own collection for aggregation
        .aggregate(new MyReverseAggregationCollection())
        // wait for 0.5 seconds to aggregate
        .batchTimeout(500L)
        .to("mock:result");

Since this is based on a unit test, we show the test code that sends the bids and the expected reverse order:

    MockEndpoint result = getMockEndpoint("mock:result");

    // we expect 5 messages since our custom aggregation collection just gets it all
    // but returns them in reverse order
    result.expectedMessageCount(5);
    result.expectedBodiesReceived("190", "200", "130", "150", "100");

    // then we send all the messages at once
    template.sendBodyAndHeader("direct:start", "100", "id", "1");
    template.sendBodyAndHeader("direct:start", "150", "id", "2");
    template.sendBodyAndHeader("direct:start", "130", "id", "2");
    template.sendBodyAndHeader("direct:start", "200", "id", "1");
    template.sendBodyAndHeader("direct:start", "190", "id", "1");

    assertMockEndpointsSatisfied();

Custom aggregation collection in Spring DSL

You can also specify a custom aggregation collection in the Spring DSL. Here is an example for Camel 2.0:

    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
      <route>
        <from uri="direct:start"/>
        <aggregate batchTimeout="500" collectionRef="aggregatorCollection">
          <to uri="mock:result"/>
        </aggregate>
      </route>
    </camelContext>

    <bean id="aggregatorCollection" class="org.apache.camel.processor.aggregator.MyReverseAggregationCollection"/>

In Camel 1.5.1 you will need to specify the aggregator as:

    <aggregator batchTimeout="500" collectionRef="aggregatorCollection">
      <expression/>
      <to uri="mock:result"/>
    </aggregator>

Using Grouped Exchanges

Available as of Camel 2.0

You can enable grouped exchanges to combine all aggregated exchanges into a single org.apache.camel.impl.GroupedExchange holder class that contains all the individual aggregated exchanges. This allows you to process a single Exchange containing all the aggregated exchanges. Let's start with how to configure this in the router:

    // our route is aggregating from the direct queue and sending the response to the mock
    from("direct:start")
        // aggregate all using same expression
        .aggregate().constant(true)
        // wait for 0.5 seconds to aggregate
        .batchTimeout(500L)
        // group the exchanges so we get one single exchange containing all the others
        .groupExchanges()
        .to("mock:result");

The next part is taken from a unit test that demonstrates this feature; we send in 5 exchanges, each with a different value in the body.
    MockEndpoint result = getMockEndpoint("mock:result");

    // we expect 1 message since we group all we get in using the same correlation key
    result.expectedMessageCount(1);

    // then we send all the messages at once
    template.sendBody("direct:start", "100");
    template.sendBody("direct:start", "150");
    template.sendBody("direct:start", "130");
    template.sendBody("direct:start", "200");
    template.sendBody("direct:start", "190");

    assertMockEndpointsSatisfied();

    Exchange out = result.getExchanges().get(0);
    List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);

    assertEquals(5, grouped.size());

    assertEquals("100", grouped.get(0).getIn().getBody(String.class));
    assertEquals("150", grouped.get(1).getIn().getBody(String.class));
    assertEquals("130", grouped.get(2).getIn().getBody(String.class));
    assertEquals("200", grouped.get(3).getIn().getBody(String.class));
    assertEquals("190", grouped.get(4).getIn().getBody(String.class));

Using Batch Consumer

Available as of Camel 2.0

The Aggregator can work together with the Batch Consumer to aggregate the total number of messages that the Batch Consumer has reported. This allows you, for instance, to aggregate all files polled using the File consumer.

For example:

    from("file://inbox")
        .aggregate(xpath("//order/@customerId"), new AggregateCustomerOrderStrategy())
        .batchConsumer().batchTimeout(60000).to("bean:processOrder");

When using batchConsumer, Camel automatically adjusts the batchSize to the size reported by the Batch Consumer, in this case the file consumer.

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started guide. You may also find the Architecture useful, particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

See also