Author: davsclaus Date: Fri Dec 18 13:12:21 2009 New Revision: 892244 URL: http://svn.apache.org/viewvc?rev=892244&view=rev Log: CAMEL-2303: Fixed groupedExchang now working properly on Aggregator.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java - copied, changed from r892224, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java - copied, changed from r892224, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=892244&r1=892243&r2=892244&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Fri Dec 18 13:12:21 2009 @@ -35,6 +35,7 @@ import org.apache.camel.processor.Aggregator; import org.apache.camel.processor.aggregate.AggregationCollection; import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; import org.apache.camel.spi.RouteContext; @@ -190,9 +191,15 @@ if (strategy == null && strategyRef != null) { strategy = routeContext.lookup(strategyRef, AggregationStrategy.class); } + // pick a default strategy if (strategy == null) { - // fallback to use latest - strategy = new UseLatestAggregationStrategy(); + if (groupExchanges != null && groupExchanges) { + // if grouped exchange is enabled then use special strategy for that + strategy = new GroupedExchangeAggregationStrategy(); + } else { + // fallback to use latest + strategy = new UseLatestAggregationStrategy(); + } } return strategy; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=892244&r1=892243&r2=892244&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Fri Dec 18 13:12:21 2009 @@ -30,7 +30,6 @@ import org.apache.camel.Exchange; import org.apache.camel.Navigate; import org.apache.camel.Processor; -import org.apache.camel.impl.DefaultExchange; import org.apache.camel.impl.LoggingExceptionHandler; import org.apache.camel.impl.ServiceSupport; import org.apache.camel.spi.ExceptionHandler; @@ -350,32 +349,11 @@ @SuppressWarnings("unchecked") private void sendExchanges() throws Exception { - Exchange grouped = null; - Iterator<Exchange> iter = collection.iterator(); while (iter.hasNext()) { Exchange exchange = iter.next(); iter.remove(); - if (!groupExchanges) { - // non grouped so process the exchange one at a time - processExchange(exchange); - } else { - // grouped so add all exchanges into one group - if (grouped == null) { - grouped = new DefaultExchange(exchange); - } - List<Exchange> list = grouped.getProperty(Exchange.GROUPED_EXCHANGE, List.class); - if (list == null) { - list = new ArrayList<Exchange>(); - grouped.setProperty(Exchange.GROUPED_EXCHANGE, list); - } - list.add(exchange); - } - } - - // and after adding process the single grouped exchange - if (grouped != null) { - processExchange(grouped); + processExchange(exchange); } } } Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java (from r892224, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java&r1=892224&r2=892244&rev=892244&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java Fri Dec 18 13:12:21 2009 @@ -26,10 +26,9 @@ /** * Unit test for aggregate grouped exchanges. */ -public class AggregateGroupedExchangeTest extends ContextTestSupport { +public class AggregateGroupedExchangeBatchSizeTest extends ContextTestSupport { public void testGrouped() throws Exception { - // START SNIPPET: e2 MockEndpoint result = getMockEndpoint("mock:result"); // we expect 1 messages since we group all we get in using the same correlation key @@ -40,20 +39,27 @@ template.sendBody("direct:start", "150"); template.sendBody("direct:start", "130"); template.sendBody("direct:start", "200"); - template.sendBody("direct:start", "190"); assertMockEndpointsSatisfied(); Exchange out = result.getExchanges().get(0); List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); - assertEquals(5, grouped.size()); + assertEquals(2, grouped.size()); assertEquals("100", grouped.get(0).getIn().getBody(String.class)); assertEquals("150", grouped.get(1).getIn().getBody(String.class)); - assertEquals("130", grouped.get(2).getIn().getBody(String.class)); - assertEquals("200", grouped.get(3).getIn().getBody(String.class)); - assertEquals("190", grouped.get(4).getIn().getBody(String.class)); + + // wait a bit for the remainder to come in + Thread.sleep(1000); + + out = result.getExchanges().get(1); + grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); + + assertEquals(2, grouped.size()); + + assertEquals("130", grouped.get(0).getIn().getBody(String.class)); + assertEquals("200", grouped.get(1).getIn().getBody(String.class)); // END SNIPPET: e2 } @@ -64,8 +70,8 @@ // START SNIPPET: e1 // our route is aggregating from the direct queue and sending the response to the mock from("direct:start") - // aggregated using id as correlation so each is unqiue and thus we batch everything - .aggregate().simple("id") + // aggregated all use same expression + .aggregate().constant(true).batchSize(2) // wait for 0.5 seconds to aggregate .batchTimeout(500L) // group the exchanges so we get one single exchange containing all the others @@ -75,4 +81,4 @@ } }; } -} +} \ No newline at end of file Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java (from r892224, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java&r1=892224&r2=892244&rev=892244&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java Fri Dec 18 13:12:21 2009 @@ -26,34 +26,42 @@ /** * Unit test for aggregate grouped exchanges. */ -public class AggregateGroupedExchangeTest extends ContextTestSupport { +public class AggregateGroupedExchangeMultipleCorrelationTest extends ContextTestSupport { public void testGrouped() throws Exception { // START SNIPPET: e2 MockEndpoint result = getMockEndpoint("mock:result"); - // we expect 1 messages since we group all we get in using the same correlation key - result.expectedMessageCount(1); + // we expect 2 messages since we group using A or B keys + result.expectedMessageCount(2); // then we sent all the message at once - template.sendBody("direct:start", "100"); - template.sendBody("direct:start", "150"); - template.sendBody("direct:start", "130"); - template.sendBody("direct:start", "200"); - template.sendBody("direct:start", "190"); + template.sendBodyAndHeader("direct:start", "100", "foo", "A"); + template.sendBodyAndHeader("direct:start", "130", "foo", "B"); + template.sendBodyAndHeader("direct:start", "150", "foo", "A"); + template.sendBodyAndHeader("direct:start", "200", "foo", "B"); + template.sendBodyAndHeader("direct:start", "180", "foo", "B"); + template.sendBodyAndHeader("direct:start", "120", "foo", "A"); assertMockEndpointsSatisfied(); Exchange out = result.getExchanges().get(0); List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); - assertEquals(5, grouped.size()); + assertEquals(3, grouped.size()); assertEquals("100", grouped.get(0).getIn().getBody(String.class)); assertEquals("150", grouped.get(1).getIn().getBody(String.class)); - assertEquals("130", grouped.get(2).getIn().getBody(String.class)); - assertEquals("200", grouped.get(3).getIn().getBody(String.class)); - assertEquals("190", grouped.get(4).getIn().getBody(String.class)); + assertEquals("120", grouped.get(2).getIn().getBody(String.class)); + + out = result.getExchanges().get(1); + grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); + + assertEquals(3, grouped.size()); + + assertEquals("130", grouped.get(0).getIn().getBody(String.class)); + assertEquals("200", grouped.get(1).getIn().getBody(String.class)); + assertEquals("180", grouped.get(2).getIn().getBody(String.class)); // END SNIPPET: e2 } @@ -64,10 +72,10 @@ // START SNIPPET: e1 // our route is aggregating from the direct queue and sending the response to the mock from("direct:start") - // aggregated using id as correlation so each is unqiue and thus we batch everything - .aggregate().simple("id") - // wait for 0.5 seconds to aggregate - .batchTimeout(500L) + // aggregate all using the foo header + .aggregate().header("foo") + // wait for 1 seconds to aggregate + .batchTimeout(1000L) // group the exchanges so we get one single exchange containing all the others .groupExchanges() .to("mock:result"); @@ -75,4 +83,4 @@ } }; } -} +} \ No newline at end of file Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java?rev=892244&r1=892243&r2=892244&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java Fri Dec 18 13:12:21 2009 @@ -64,8 +64,8 @@ // START SNIPPET: e1 // our route is aggregating from the direct queue and sending the response to the mock from("direct:start") - // aggregated using id as correlation so each is unqiue and thus we batch everything - .aggregate().simple("id") + // aggregate all using same expression + .aggregate().constant(true) // wait for 0.5 seconds to aggregate .batchTimeout(500L) // group the exchanges so we get one single exchange containing all the others