Updated Branches:
  refs/heads/camel-2.12.x 78de11e39 -> ed6e772b0
  refs/heads/master 745a85abb -> 71b216526


CAMEL-6744: Aggregator - Using groupExchanges should store them on body by 
default


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f9760f7b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f9760f7b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f9760f7b

Branch: refs/heads/master
Commit: f9760f7bf518cecfc74c0aa17a3e66ecdaf366a5
Parents: 745a85a
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Sep 13 13:21:31 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Sep 13 13:21:31 2013 +0200

----------------------------------------------------------------------
 .../apache/camel/model/AggregateDefinition.java |  3 +-
 .../AbstractListAggregationStrategy.java        | 15 +++-
 .../GroupedExchangeAggregationStrategy.java     | 29 +++++--
 ...gregateGroupedExchangeBackwardsCompTest.java | 79 ++++++++++++++++++++
 .../AggregateGroupedExchangeBatchSizeTest.java  |  4 +-
 ...eGroupedExchangeMultipleCorrelationTest.java |  4 +-
 ...gregateGroupedExchangeSizePredicateTest.java |  4 +-
 .../AggregateGroupedExchangeSizeTest.java       |  4 +-
 .../AggregateGroupedExchangeTest.java           |  2 +-
 9 files changed, 126 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f9760f7b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 0af2206..061ba21 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -775,8 +775,7 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
 
     /**
      * Enables grouped exchanges, so the aggregator will group all aggregated 
exchanges into a single
-     * combined Exchange holding all the aggregated exchanges in a {@link 
java.util.List} as a exchange
-     * property with the key {@link 
org.apache.camel.Exchange#GROUPED_EXCHANGE}.
+     * combined Exchange holding all the aggregated exchanges in a {@link 
java.util.List}.
      *
      * @return the builder
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/f9760f7b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java
 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java
index a19bdbc..ae382dc 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultExchange;
 
 /**
  * Aggregate all exchanges into a {@link List} of values defined by the {@link 
#getValue(Exchange)} call.
@@ -101,10 +100,22 @@ public abstract class AbstractListAggregationStrategy<V> 
implements CompletionAw
     private List<V> getList(Exchange exchange) {
         List<V> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, 
List.class);
         if (list == null) {
-            list = new ArrayList<V>();
+            list = new GroupedExchangeList();
             exchange.setProperty(Exchange.GROUPED_EXCHANGE, list);
         }
         return list;
     }
 
+    /**
+     * A list that contains grouped {@link Exchange}s.
+     */
+    private static final class GroupedExchangeList extends ArrayList {
+
+        @Override
+        public String toString() {
+            // lets override toString so we don't write data for all the 
Exchanges by default
+            return "List<Exchange>(" + size() + " elements)";
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/f9760f7b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
index 8a748b5..587746c 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
@@ -16,21 +16,40 @@
  */
 package org.apache.camel.processor.aggregate;
 
+import java.util.List;
+
 import org.apache.camel.Exchange;
 
 /**
  * Aggregate all exchanges into a single combined Exchange holding all the 
aggregated exchanges
- * in a {@link java.util.List} as a exchange property with the key
- * {@link org.apache.camel.Exchange#GROUPED_EXCHANGE}.
+ * in a {@link java.util.List} of {@link Exchange}s as the message body.
  *
  * @version 
  */
 public class GroupedExchangeAggregationStrategy extends 
AbstractListAggregationStrategy<Exchange> {
 
     @Override
-    public boolean isStoreAsBodyOnCompletion() {
-        // keep the list as a property to be compatible with old behavior
-        return false;
+    @SuppressWarnings("unchecked")
+    public void onCompletion(Exchange exchange) {
+        if (isStoreAsBodyOnCompletion()) {
+            // lets be backwards compatible
+            // TODO: Remove this method in Camel 3.0
+            List list = (List) exchange.getProperty(Exchange.GROUPED_EXCHANGE);
+            if (list != null) {
+                exchange.getIn().setBody(list);
+            }
+        }
+    }
+
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        Exchange answer = super.aggregate(oldExchange, newExchange);
+        if (oldExchange == null) {
+            // for the first time we must do a copy as the answer, so the 
outgoing
+            // exchange is not one of the grouped exchanges, as that causes an 
endless circular reference
+            answer = answer.copy();
+        }
+        return answer;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/f9760f7b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBackwardsCompTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBackwardsCompTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBackwardsCompTest.java
new file mode 100644
index 0000000..c27d55f
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBackwardsCompTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test for aggregate grouped exchanges.
+ */
+public class AggregateGroupedExchangeBackwardsCompTest extends 
ContextTestSupport {
+
+    @SuppressWarnings("unchecked")
+    public void testGrouped() throws Exception {
+        // START SNIPPET: e2
+        MockEndpoint result = getMockEndpoint("mock:result");
+
+        // we expect 1 message since we group all we get in using the same 
correlation key
+        result.expectedMessageCount(1);
+
+        // then we send all the messages at once
+        template.sendBody("direct:start", "100");
+        template.sendBody("direct:start", "150");
+        template.sendBody("direct:start", "130");
+        template.sendBody("direct:start", "200");
+        template.sendBody("direct:start", "190");
+
+        assertMockEndpointsSatisfied();
+
+        Exchange out = result.getExchanges().get(0);
+        List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, 
List.class);
+
+        assertEquals(5, grouped.size());
+
+        assertEquals("100", grouped.get(0).getIn().getBody(String.class));
+        assertEquals("150", grouped.get(1).getIn().getBody(String.class));
+        assertEquals("130", grouped.get(2).getIn().getBody(String.class));
+        assertEquals("200", grouped.get(3).getIn().getBody(String.class));
+        assertEquals("190", grouped.get(4).getIn().getBody(String.class));
+        // END SNIPPET: e2
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                // our route is aggregating from the direct queue and sending 
the response to the mock
+                from("direct:start")
+                    // aggregate all using same expression
+                    .aggregate(constant(true))
+                    // wait for 0.5 seconds to aggregate
+                    .completionTimeout(500L)
+                    // group the exchanges so we get one single exchange 
containing all the others
+                    .groupExchanges()
+                    .to("mock:result");
+                // END SNIPPET: e1
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/f9760f7b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java
index 31e0912..f958547 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java
@@ -44,7 +44,7 @@ public class AggregateGroupedExchangeBatchSizeTest extends 
ContextTestSupport {
         assertMockEndpointsSatisfied();
 
         Exchange out = result.getExchanges().get(0);
-        List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, 
List.class);
+        List<Exchange> grouped = out.getIn().getBody(List.class);
 
         assertTrue("Should be either 2 or 4, was " + grouped.size(), 
grouped.size() == 2 || grouped.size() == 4);
 
@@ -57,7 +57,7 @@ public class AggregateGroupedExchangeBatchSizeTest extends 
ContextTestSupport {
         if (result.getReceivedCounter() == 2) {
 
             out = result.getExchanges().get(1);
-            grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
+            grouped = out.getIn().getBody(List.class);
 
             assertEquals(2, grouped.size());
 

http://git-wip-us.apache.org/repos/asf/camel/blob/f9760f7b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java
index 2f0b329..b1a10b7 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java
@@ -50,7 +50,7 @@ public class AggregateGroupedExchangeMultipleCorrelationTest 
extends ContextTest
         assertMockEndpointsSatisfied();
 
         Exchange out = result.getExchanges().get(0);
-        List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, 
List.class);
+        List<Exchange> grouped = out.getIn().getBody(List.class);
 
         assertEquals(3, grouped.size());
 
@@ -60,7 +60,7 @@ public class AggregateGroupedExchangeMultipleCorrelationTest 
extends ContextTest
         assertEquals("180", grouped.get(2).getIn().getBody(String.class));
 
         out = result.getExchanges().get(1);
-        grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
+        grouped = out.getIn().getBody(List.class);
 
         assertEquals(3, grouped.size());
 

http://git-wip-us.apache.org/repos/asf/camel/blob/f9760f7b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java
index 701cdb9..9e386fb 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java
@@ -44,14 +44,14 @@ public class AggregateGroupedExchangeSizePredicateTest 
extends ContextTestSuppor
         assertMockEndpointsSatisfied();
 
         Exchange out = result.getExchanges().get(0);
-        List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, 
List.class);
+        List<Exchange> grouped = out.getIn().getBody(List.class);
         assertEquals(3, grouped.size());
         assertEquals("100", grouped.get(0).getIn().getBody(String.class));
         assertEquals("150", grouped.get(1).getIn().getBody(String.class));
         assertEquals("130", grouped.get(2).getIn().getBody(String.class));
 
         out = result.getExchanges().get(1);
-        grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
+        grouped = out.getIn().getBody(List.class);
         assertEquals(2, grouped.size());
 
         assertEquals("200", grouped.get(0).getIn().getBody(String.class));

http://git-wip-us.apache.org/repos/asf/camel/blob/f9760f7b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java
index 346dcbc..ca8911a 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java
@@ -45,14 +45,14 @@ public class AggregateGroupedExchangeSizeTest extends 
ContextTestSupport {
         assertMockEndpointsSatisfied();
 
         Exchange out = result.getExchanges().get(0);
-        List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, 
List.class);
+        List<Exchange> grouped = out.getIn().getBody(List.class);
         assertEquals(3, grouped.size());
         assertEquals("100", grouped.get(0).getIn().getBody(String.class));
         assertEquals("150", grouped.get(1).getIn().getBody(String.class));
         assertEquals("130", grouped.get(2).getIn().getBody(String.class));
 
         out = result.getExchanges().get(1);
-        grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
+        grouped = out.getIn().getBody(List.class);
         assertEquals(3, grouped.size());
 
         assertEquals("200", grouped.get(0).getIn().getBody(String.class));

http://git-wip-us.apache.org/repos/asf/camel/blob/f9760f7b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
index 18be5d3..4628462 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
@@ -46,7 +46,7 @@ public class AggregateGroupedExchangeTest extends 
ContextTestSupport {
         assertMockEndpointsSatisfied();
 
         Exchange out = result.getExchanges().get(0);
-        List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, 
List.class);
+        List<Exchange> grouped = out.getIn().getBody(List.class);
 
         assertEquals(5, grouped.size());
 

Reply via email to