Author: boday
Date: Sun Mar  3 05:26:31 2013
New Revision: 1452002

URL: http://svn.apache.org/r1452002
Log:
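CAMEL-6119: Added Aggregator support for forcing completion of all groups
after the current message has been processed (i.e. inclusive of that message),
triggered by the CamelAggregationCompleteAllGroupsInclusive header.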

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
    
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
    
camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
    
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1452002&r1=1452001&r2=1452002&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Sun Mar 
 3 05:26:31 2013
@@ -79,6 +79,7 @@ public interface Exchange {
     String AGGREGATED_CORRELATION_KEY       = "CamelAggregatedCorrelationKey";
     String AGGREGATION_STRATEGY             = "CamelAggregationStrategy";
     String AGGREGATION_COMPLETE_ALL_GROUPS  = 
"CamelAggregationCompleteAllGroups";
+    String AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE  = 
"CamelAggregationCompleteAllGroupsInclusive";
     String ASYNC_WAIT                       = "CamelAsyncWait";
 
     String BATCH_INDEX                = "CamelBatchIndex";

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1452002&r1=1452001&r2=1452002&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 Sun Mar  3 05:26:31 2013
@@ -224,6 +224,12 @@ public class AggregateProcessor extends 
                 lock.unlock();
             }
         }
+
+        // check for the special header to force completion of all groups (inclusive of the message)
+        boolean completeAllGroupsInclusive = 
exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, 
false, boolean.class);
+        if (completeAllGroupsInclusive) {
+            forceCompletionOfAllGroups();
+        }
     }
 
     /**

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java?rev=1452002&r1=1452001&r2=1452002&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
 Sun Mar  3 05:26:31 2013
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.processor.aggregator;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
@@ -28,7 +31,7 @@ import org.junit.Test;
 public class AggregateForceCompletionHeaderTest extends ContextTestSupport {
 
     @Test
-    public void testForceCompletionTrue() throws Exception {
+    public void testForceCompletionTrueExclusive() throws Exception {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(0);
 
@@ -40,7 +43,7 @@ public class AggregateForceCompletionHea
         assertMockEndpointsSatisfied();
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(2);
-        
getMockEndpoint("mock:aggregated").expectedBodiesReceived("test1test3", 
"test2test4");
+        
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4");
         
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
 
         //now send the signal message to trigger completion of all groups, 
message should NOT be aggregated
@@ -49,6 +52,31 @@ public class AggregateForceCompletionHea
         assertMockEndpointsSatisfied();
     }
 
+    @Test
+    public void testForceCompletionTrueInclusive() throws Exception {
+
+        getMockEndpoint("mock:aggregated").expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "test1", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test2", "id", "2");
+        template.sendBodyAndHeader("direct:start", "test3", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test4", "id", "2");
+
+        assertMockEndpointsSatisfied();
+
+        getMockEndpoint("mock:aggregated").expectedMessageCount(3);
+        
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4", "test5");
+        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
+
+        //now send a message to trigger completion of all groups, message should be aggregated
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put("id", "3");
+        headers.put(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, true);
+        template.sendBodyAndHeaders("direct:start", "test5", headers);
+
+        assertMockEndpointsSatisfied();
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {

Modified: 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java?rev=1452002&r1=1452001&r2=1452002&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
 Sun Mar  3 05:26:31 2013
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.hawtdb;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -34,7 +37,7 @@ public class HawtDBAggregateForceComplet
     }
 
     @Test
-    public void testForceCompletionTrue() throws Exception {
+    public void testForceCompletionTrueExclusive() throws Exception {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(0);
 
@@ -46,7 +49,7 @@ public class HawtDBAggregateForceComplet
         assertMockEndpointsSatisfied();
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(2);
-        
getMockEndpoint("mock:aggregated").expectedBodiesReceived("test1test3", 
"test2test4");
+        
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4");
         
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
 
         //now send the signal message to trigger completion of all groups, 
message should NOT be aggregated
@@ -55,6 +58,31 @@ public class HawtDBAggregateForceComplet
         assertMockEndpointsSatisfied();
     }
 
+    @Test
+    public void testForceCompletionTrueInclusive() throws Exception {
+
+        getMockEndpoint("mock:aggregated").expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "test1", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test2", "id", "2");
+        template.sendBodyAndHeader("direct:start", "test3", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test4", "id", "2");
+
+        assertMockEndpointsSatisfied();
+
+        getMockEndpoint("mock:aggregated").expectedMessageCount(3);
+        
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4", "test5");
+        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
+
+        //now send a message to trigger completion of all groups, message should be aggregated
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put("id", "3");
+        headers.put(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, true);
+        template.sendBodyAndHeaders("direct:start", "test5", headers);
+
+        assertMockEndpointsSatisfied();
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {

Modified: 
camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java?rev=1452002&r1=1452001&r2=1452002&view=diff
==============================================================================
--- 
camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
 (original)
+++ 
camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
 Sun Mar  3 05:26:31 2013
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.leveldb;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -34,7 +37,7 @@ public class LevelDBAggregateForceComple
     }
 
     @Test
-    public void testForceCompletionTrue() throws Exception {
+    public void testForceCompletionTrueExclusive() throws Exception {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(0);
 
@@ -46,7 +49,7 @@ public class LevelDBAggregateForceComple
         assertMockEndpointsSatisfied();
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(2);
-        
getMockEndpoint("mock:aggregated").expectedBodiesReceived("test1test3", 
"test2test4");
+        
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4");
         
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
 
         //now send the signal message to trigger completion of all groups, 
message should NOT be aggregated
@@ -55,6 +58,31 @@ public class LevelDBAggregateForceComple
         assertMockEndpointsSatisfied();
     }
 
+    @Test
+    public void testForceCompletionTrueInclusive() throws Exception {
+
+        getMockEndpoint("mock:aggregated").expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "test1", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test2", "id", "2");
+        template.sendBodyAndHeader("direct:start", "test3", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test4", "id", "2");
+
+        assertMockEndpointsSatisfied();
+
+        getMockEndpoint("mock:aggregated").expectedMessageCount(3);
+        
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4", "test5");
+        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
+
+        //now send a message to trigger completion of all groups, message should be aggregated
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put("id", "3");
+        headers.put(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, true);
+        template.sendBodyAndHeaders("direct:start", "test5", headers);
+
+        assertMockEndpointsSatisfied();
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {

Modified: 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java?rev=1452002&r1=1452001&r2=1452002&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
 (original)
+++ 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
 Sun Mar  3 05:26:31 2013
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.processor.aggregate.jdbc;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.Test;
@@ -26,7 +29,7 @@ import org.junit.Test;
 public class JdbcAggregateForceCompletionHeaderTest extends 
AbstractJdbcAggregationTestSupport {
 
     @Test
-    public void testForceCompletionTrue() throws Exception {
+    public void testForceCompletionTrueExclusive() throws Exception {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(0);
 
@@ -38,7 +41,7 @@ public class JdbcAggregateForceCompletio
         assertMockEndpointsSatisfied();
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(2);
-        
getMockEndpoint("mock:aggregated").expectedBodiesReceived("test1test3", 
"test2test4");
+        
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4");
         
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
 
         //now send the signal message to trigger completion of all groups, 
message should NOT be aggregated
@@ -47,6 +50,31 @@ public class JdbcAggregateForceCompletio
         assertMockEndpointsSatisfied();
     }
 
+    @Test
+    public void testForceCompletionTrueInclusive() throws Exception {
+
+        getMockEndpoint("mock:aggregated").expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "test1", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test2", "id", "2");
+        template.sendBodyAndHeader("direct:start", "test3", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test4", "id", "2");
+
+        assertMockEndpointsSatisfied();
+
+        getMockEndpoint("mock:aggregated").expectedMessageCount(3);
+        
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4", "test5");
+        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
+
+        //now send a message to trigger completion of all groups, message should be aggregated
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put("id", "3");
+        headers.put(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, true);
+        template.sendBodyAndHeaders("direct:start", "test5", headers);
+
+        assertMockEndpointsSatisfied();
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {


Reply via email to