Repository: camel
Updated Branches:
  refs/heads/master 1950e7785 -> f3c4fc2a2


CAMEL-9252 add GroupedMessageAggregationStrategy


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/37ef29c0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/37ef29c0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/37ef29c0

Branch: refs/heads/master
Commit: 37ef29c05b41743ab2bb04ac1f14404454332424
Parents: 1950e77
Author: khaing211 <khaing...@gmail.com>
Authored: Sat Oct 24 10:39:10 2015 -0700
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Dec 18 17:56:53 2015 +0100

----------------------------------------------------------------------
 .../GroupedMessageAggregationStrategy.java      | 36 ++++++++++++++
 .../aggregator/AggregateGroupMessageTest.java   | 51 ++++++++++++++++++++
 2 files changed, 87 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/37ef29c0/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedMessageAggregationStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedMessageAggregationStrategy.java
 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedMessageAggregationStrategy.java
new file mode 100644
index 0000000..b04dbf0
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedMessageAggregationStrategy.java
@@ -0,0 +1,36 @@
+package org.apache.camel.processor.aggregate;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.impl.DefaultExchange;
+
+/**
+ * Aggregate all {@link Message} into a single combined Exchange holding all 
the
+ * aggregated messages in a {@link List} of {@link Message} as the message 
body.
+ * 
+ * This aggregation strategy can used in combination with @{link
+ * org.apache.camel.processor.Splitter} to batch messages
+ * 
+ * @version
+ */
+public class GroupedMessageAggregationStrategy extends 
AbstractListAggregationStrategy<Message> {
+
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        if (oldExchange == null) {
+            // for the first time we must create a new empty exchange as the
+            // holder, as the outgoing exchange
+            // must not be one of the grouped exchanges, as that causes a
+            // endless circular reference
+            oldExchange = new DefaultExchange(newExchange);
+        }
+        return super.aggregate(oldExchange, newExchange);
+    }
+
+    @Override
+    public Message getValue(Exchange exchange) {
+        return exchange.getIn();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/37ef29c0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupMessageTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupMessageTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupMessageTest.java
new file mode 100644
index 0000000..4d7b41c
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupMessageTest.java
@@ -0,0 +1,51 @@
+package org.apache.camel.processor.aggregator;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.GroupedMessageAggregationStrategy;
+
+public class AggregateGroupMessageTest extends ContextTestSupport {
+
+    @SuppressWarnings("unchecked")
+    public void testGrouped() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+
+        result.expectedMessageCount(1);
+
+        template.sendBody("direct:start", "100");
+        template.sendBody("direct:start", "150");
+        template.sendBody("direct:start", "130");
+        template.sendBody("direct:start", "200");
+        template.sendBody("direct:start", "190");
+
+        assertMockEndpointsSatisfied();
+
+        Exchange out = result.getExchanges().get(0);
+        List<Message> grouped = out.getIn().getBody(List.class);
+
+        assertEquals(5, grouped.size());
+
+        assertEquals("100", grouped.get(0).getBody(String.class));
+        assertEquals("150", grouped.get(1).getBody(String.class));
+        assertEquals("130", grouped.get(2).getBody(String.class));
+        assertEquals("200", grouped.get(3).getBody(String.class));
+        assertEquals("190", grouped.get(4).getBody(String.class));
+    }
+    
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(constant(true), new 
GroupedMessageAggregationStrategy())
+                    .completionTimeout(500L)
+                    .to("mock:result");
+            }
+        };
+    }
+}

Reply via email to