Updated Branches: refs/heads/camel-2.12.x 78de11e39 -> ed6e772b0 refs/heads/master 745a85abb -> 71b216526
CAMEL-6744: Aggregator - Using groupExchanges should store them on body by default Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f9760f7b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f9760f7b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f9760f7b Branch: refs/heads/master Commit: f9760f7bf518cecfc74c0aa17a3e66ecdaf366a5 Parents: 745a85a Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Sep 13 13:21:31 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Sep 13 13:21:31 2013 +0200 ---------------------------------------------------------------------- .../apache/camel/model/AggregateDefinition.java | 3 +- .../AbstractListAggregationStrategy.java | 15 +++- .../GroupedExchangeAggregationStrategy.java | 29 +++++-- ...gregateGroupedExchangeBackwardsCompTest.java | 79 ++++++++++++++++++++ .../AggregateGroupedExchangeBatchSizeTest.java | 4 +- ...eGroupedExchangeMultipleCorrelationTest.java | 4 +- ...gregateGroupedExchangeSizePredicateTest.java | 4 +- .../AggregateGroupedExchangeSizeTest.java | 4 +- .../AggregateGroupedExchangeTest.java | 2 +- 9 files changed, 126 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f9760f7b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java index 0af2206..061ba21 100644 --- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java @@ -775,8 +775,7 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition /** * Enables grouped exchanges, so the aggregator will group all aggregated exchanges into a single - * combined Exchange holding all the aggregated exchanges in a {@link java.util.List} as a exchange - * property with the key {@link org.apache.camel.Exchange#GROUPED_EXCHANGE}. + * combined Exchange holding all the aggregated exchanges in a {@link java.util.List}. * * @return the builder */ http://git-wip-us.apache.org/repos/asf/camel/blob/f9760f7b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java index a19bdbc..ae382dc 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.List; import org.apache.camel.Exchange; -import org.apache.camel.impl.DefaultExchange; /** * Aggregate all exchanges into a {@link List} of values defined by the {@link #getValue(Exchange)} call. @@ -101,10 +100,22 @@ public abstract class AbstractListAggregationStrategy<V> implements CompletionAw private List<V> getList(Exchange exchange) { List<V> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class); if (list == null) { - list = new ArrayList<V>(); + list = new GroupedExchangeList(); exchange.setProperty(Exchange.GROUPED_EXCHANGE, list); } return list; } + /** + * A list to contains grouped {@link Exchange}s. + */ + private static final class GroupedExchangeList extends ArrayList { + + @Override + public String toString() { + // lets override toString so we don't write data for all the Exchanges by default + return "List<Exchange>(" + size() + " elements)"; + } + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/f9760f7b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java index 8a748b5..587746c 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java @@ -16,21 +16,40 @@ */ package org.apache.camel.processor.aggregate; +import java.util.List; + import org.apache.camel.Exchange; /** * Aggregate all exchanges into a single combined Exchange holding all the aggregated exchanges - * in a {@link java.util.List} as a exchange property with the key - * {@link org.apache.camel.Exchange#GROUPED_EXCHANGE}. + * in a {@link java.util.List<Exchange>} as the message body. * * @version */ public class GroupedExchangeAggregationStrategy extends AbstractListAggregationStrategy<Exchange> { @Override - public boolean isStoreAsBodyOnCompletion() { - // keep the list as a property to be compatible with old behavior - return false; + @SuppressWarnings("unchecked") + public void onCompletion(Exchange exchange) { + if (isStoreAsBodyOnCompletion()) { + // lets be backwards compatible + // TODO: Remove this method in Camel 3.0 + List list = (List) exchange.getProperty(Exchange.GROUPED_EXCHANGE); + if (list != null) { + exchange.getIn().setBody(list); + } + } + } + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + Exchange answer = super.aggregate(oldExchange, newExchange); + if (oldExchange == null) { + // for the first time we must do a copy as the answer, so the outgoing + // exchange is not one of the grouped exchanges, as that causes a endless circular reference + answer = answer.copy(); + } + return answer; } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/f9760f7b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBackwardsCompTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBackwardsCompTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBackwardsCompTest.java new file mode 100644 index 0000000..c27d55f --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBackwardsCompTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import java.util.List; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * Unit test for aggregate grouped exchanges. + */ +public class AggregateGroupedExchangeBackwardsCompTest extends ContextTestSupport { + + @SuppressWarnings("unchecked") + public void testGrouped() throws Exception { + // START SNIPPET: e2 + MockEndpoint result = getMockEndpoint("mock:result"); + + // we expect 1 messages since we group all we get in using the same correlation key + result.expectedMessageCount(1); + + // then we sent all the message at once + template.sendBody("direct:start", "100"); + template.sendBody("direct:start", "150"); + template.sendBody("direct:start", "130"); + template.sendBody("direct:start", "200"); + template.sendBody("direct:start", "190"); + + assertMockEndpointsSatisfied(); + + Exchange out = result.getExchanges().get(0); + List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); + + assertEquals(5, grouped.size()); + + assertEquals("100", grouped.get(0).getIn().getBody(String.class)); + assertEquals("150", grouped.get(1).getIn().getBody(String.class)); + assertEquals("130", grouped.get(2).getIn().getBody(String.class)); + assertEquals("200", grouped.get(3).getIn().getBody(String.class)); + assertEquals("190", grouped.get(4).getIn().getBody(String.class)); + // END SNIPPET: e2 + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + // START SNIPPET: e1 + // our route is aggregating from the direct queue and sending the response to the mock + from("direct:start") + // aggregate all using same expression + .aggregate(constant(true)) + // wait for 0.5 seconds to aggregate + .completionTimeout(500L) + // group the exchanges so we get one single exchange containing all the others + .groupExchanges() + .to("mock:result"); + // END SNIPPET: e1 + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/f9760f7b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java index 31e0912..f958547 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java @@ -44,7 +44,7 @@ public class AggregateGroupedExchangeBatchSizeTest extends ContextTestSupport { assertMockEndpointsSatisfied(); Exchange out = result.getExchanges().get(0); - List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); + List<Exchange> grouped = out.getIn().getBody(List.class); assertTrue("Should be either 2 or 4, was " + grouped.size(), grouped.size() == 2 || grouped.size() == 4); @@ -57,7 +57,7 @@ public class AggregateGroupedExchangeBatchSizeTest extends ContextTestSupport { if (result.getReceivedCounter() == 2) { out = result.getExchanges().get(1); - grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); + grouped = out.getIn().getBody(List.class); assertEquals(2, grouped.size()); http://git-wip-us.apache.org/repos/asf/camel/blob/f9760f7b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java index 2f0b329..b1a10b7 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java @@ -50,7 +50,7 @@ public class AggregateGroupedExchangeMultipleCorrelationTest extends ContextTest assertMockEndpointsSatisfied(); Exchange out = result.getExchanges().get(0); - List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); + List<Exchange> grouped = out.getIn().getBody(List.class); assertEquals(3, grouped.size()); @@ -60,7 +60,7 @@ public class AggregateGroupedExchangeMultipleCorrelationTest extends ContextTest assertEquals("180", grouped.get(2).getIn().getBody(String.class)); out = result.getExchanges().get(1); - grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); + grouped = out.getIn().getBody(List.class); assertEquals(3, grouped.size()); http://git-wip-us.apache.org/repos/asf/camel/blob/f9760f7b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java index 701cdb9..9e386fb 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java @@ -44,14 +44,14 @@ public class AggregateGroupedExchangeSizePredicateTest extends ContextTestSuppor assertMockEndpointsSatisfied(); Exchange out = result.getExchanges().get(0); - List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); + List<Exchange> grouped = out.getIn().getBody(List.class); assertEquals(3, grouped.size()); assertEquals("100", grouped.get(0).getIn().getBody(String.class)); assertEquals("150", grouped.get(1).getIn().getBody(String.class)); assertEquals("130", grouped.get(2).getIn().getBody(String.class)); out = result.getExchanges().get(1); - grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); + grouped = out.getIn().getBody(List.class); assertEquals(2, grouped.size()); assertEquals("200", grouped.get(0).getIn().getBody(String.class)); http://git-wip-us.apache.org/repos/asf/camel/blob/f9760f7b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java index 346dcbc..ca8911a 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java @@ -45,14 +45,14 @@ public class AggregateGroupedExchangeSizeTest extends ContextTestSupport { assertMockEndpointsSatisfied(); Exchange out = result.getExchanges().get(0); - List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); + List<Exchange> grouped = out.getIn().getBody(List.class); assertEquals(3, grouped.size()); assertEquals("100", grouped.get(0).getIn().getBody(String.class)); assertEquals("150", grouped.get(1).getIn().getBody(String.class)); assertEquals("130", grouped.get(2).getIn().getBody(String.class)); out = result.getExchanges().get(1); - grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); + grouped = out.getIn().getBody(List.class); assertEquals(3, grouped.size()); assertEquals("200", grouped.get(0).getIn().getBody(String.class)); http://git-wip-us.apache.org/repos/asf/camel/blob/f9760f7b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java index 18be5d3..4628462 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java @@ -46,7 +46,7 @@ public class AggregateGroupedExchangeTest extends ContextTestSupport { assertMockEndpointsSatisfied(); Exchange out = result.getExchanges().get(0); - List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); + List<Exchange> grouped = out.getIn().getBody(List.class); assertEquals(5, grouped.size());