Author: boday Date: Sun Mar 3 05:26:31 2013 New Revision: 1452002 URL: http://svn.apache.org/r1452002 Log: CAMEL-6119 added Aggregator support for flushing all groups after processing the current message based on a header value
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1452002&r1=1452001&r2=1452002&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Sun Mar 3 05:26:31 2013 @@ -79,6 +79,7 @@ public interface Exchange { String AGGREGATED_CORRELATION_KEY = "CamelAggregatedCorrelationKey"; String AGGREGATION_STRATEGY = "CamelAggregationStrategy"; String AGGREGATION_COMPLETE_ALL_GROUPS = "CamelAggregationCompleteAllGroups"; + String AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE = "CamelAggregationCompleteAllGroupsInclusive"; String ASYNC_WAIT = "CamelAsyncWait"; String BATCH_INDEX = "CamelBatchIndex"; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1452002&r1=1452001&r2=1452002&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sun Mar 3 05:26:31 2013 @@ -224,6 +224,12 @@ public class AggregateProcessor extends lock.unlock(); } } + + // check for the special header to force completion of all groups (inclusive of the message) + boolean completeAllGroupsInclusive = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, false, boolean.class); + if (completeAllGroupsInclusive) { + forceCompletionOfAllGroups(); + } } /** Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java?rev=1452002&r1=1452001&r2=1452002&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java Sun Mar 3 05:26:31 2013 @@ -16,6 +16,9 @@ */ package org.apache.camel.processor.aggregator; +import java.util.HashMap; +import java.util.Map; + import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; @@ -28,7 +31,7 @@ import org.junit.Test; public class AggregateForceCompletionHeaderTest extends ContextTestSupport { @Test - public void testForceCompletionTrue() throws Exception { + public void testForceCompletionTrueExclusive() throws Exception { getMockEndpoint("mock:aggregated").expectedMessageCount(0); @@ -40,7 +43,7 @@ public class AggregateForceCompletionHea assertMockEndpointsSatisfied(); getMockEndpoint("mock:aggregated").expectedMessageCount(2); - getMockEndpoint("mock:aggregated").expectedBodiesReceived("test1test3", "test2test4"); + getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4"); getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); //now send the signal message to trigger completion of all groups, message should NOT be aggregated @@ -49,6 +52,31 @@ public class AggregateForceCompletionHea assertMockEndpointsSatisfied(); } + @Test + public void testForceCompletionTrueInclusive() throws Exception { + + getMockEndpoint("mock:aggregated").expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "test1", "id", "1"); + template.sendBodyAndHeader("direct:start", "test2", "id", "2"); + template.sendBodyAndHeader("direct:start", "test3", "id", "1"); + template.sendBodyAndHeader("direct:start", "test4", "id", "2"); + + assertMockEndpointsSatisfied(); + + getMockEndpoint("mock:aggregated").expectedMessageCount(3); + getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4", "test5"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + + //now send a message to trigger completion of all groups, message should be aggregated + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put("id", "3"); + headers.put(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, true); + template.sendBodyAndHeaders("direct:start", "test5", headers); + + assertMockEndpointsSatisfied(); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java?rev=1452002&r1=1452001&r2=1452002&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java Sun Mar 3 05:26:31 2013 @@ -16,6 +16,9 @@ */ package org.apache.camel.component.hawtdb; +import java.util.HashMap; +import java.util.Map; + import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.processor.aggregate.AggregationStrategy; @@ -34,7 +37,7 @@ public class HawtDBAggregateForceComplet } @Test - public void testForceCompletionTrue() throws Exception { + public void testForceCompletionTrueExclusive() throws Exception { getMockEndpoint("mock:aggregated").expectedMessageCount(0); @@ -46,7 +49,7 @@ public class HawtDBAggregateForceComplet assertMockEndpointsSatisfied(); getMockEndpoint("mock:aggregated").expectedMessageCount(2); - getMockEndpoint("mock:aggregated").expectedBodiesReceived("test1test3", "test2test4"); + getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4"); getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); //now send the signal message to trigger completion of all groups, message should NOT be aggregated @@ -55,6 +58,31 @@ public class HawtDBAggregateForceComplet assertMockEndpointsSatisfied(); } + @Test + public void testForceCompletionTrueInclusive() throws Exception { + + getMockEndpoint("mock:aggregated").expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "test1", "id", "1"); + template.sendBodyAndHeader("direct:start", "test2", "id", "2"); + template.sendBodyAndHeader("direct:start", "test3", "id", "1"); + template.sendBodyAndHeader("direct:start", "test4", "id", "2"); + + assertMockEndpointsSatisfied(); + + getMockEndpoint("mock:aggregated").expectedMessageCount(3); + getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4", "test5"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + + //now send a message to trigger completion of all groups, message should be aggregated + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put("id", "3"); + headers.put(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, true); + template.sendBodyAndHeaders("direct:start", "test5", headers); + + assertMockEndpointsSatisfied(); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { Modified: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java?rev=1452002&r1=1452001&r2=1452002&view=diff ============================================================================== --- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java (original) +++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java Sun Mar 3 05:26:31 2013 @@ -16,6 +16,9 @@ */ package org.apache.camel.component.leveldb; +import java.util.HashMap; +import java.util.Map; + import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.processor.aggregate.AggregationStrategy; @@ -34,7 +37,7 @@ public class LevelDBAggregateForceComple } @Test - public void testForceCompletionTrue() throws Exception { + public void testForceCompletionTrueExclusive() throws Exception { getMockEndpoint("mock:aggregated").expectedMessageCount(0); @@ -46,7 +49,7 @@ public class LevelDBAggregateForceComple assertMockEndpointsSatisfied(); getMockEndpoint("mock:aggregated").expectedMessageCount(2); - getMockEndpoint("mock:aggregated").expectedBodiesReceived("test1test3", "test2test4"); + getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4"); getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); //now send the signal message to trigger completion of all groups, message should NOT be aggregated @@ -55,6 +58,31 @@ public class LevelDBAggregateForceComple assertMockEndpointsSatisfied(); } + @Test + public void testForceCompletionTrueInclusive() throws Exception { + + getMockEndpoint("mock:aggregated").expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "test1", "id", "1"); + template.sendBodyAndHeader("direct:start", "test2", "id", "2"); + template.sendBodyAndHeader("direct:start", "test3", "id", "1"); + template.sendBodyAndHeader("direct:start", "test4", "id", "2"); + + assertMockEndpointsSatisfied(); + + getMockEndpoint("mock:aggregated").expectedMessageCount(3); + getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4", "test5"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + + //now send a message to trigger completion of all groups, message should be aggregated + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put("id", "3"); + headers.put(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, true); + template.sendBodyAndHeaders("direct:start", "test5", headers); + + assertMockEndpointsSatisfied(); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { Modified: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java?rev=1452002&r1=1452001&r2=1452002&view=diff ============================================================================== --- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java (original) +++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java Sun Mar 3 05:26:31 2013 @@ -16,6 +16,9 @@ */ package org.apache.camel.processor.aggregate.jdbc; +import java.util.HashMap; +import java.util.Map; + import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.junit.Test; @@ -26,7 +29,7 @@ import org.junit.Test; public class JdbcAggregateForceCompletionHeaderTest extends AbstractJdbcAggregationTestSupport { @Test - public void testForceCompletionTrue() throws Exception { + public void testForceCompletionTrueExclusive() throws Exception { getMockEndpoint("mock:aggregated").expectedMessageCount(0); @@ -38,7 +41,7 @@ public class JdbcAggregateForceCompletio assertMockEndpointsSatisfied(); getMockEndpoint("mock:aggregated").expectedMessageCount(2); - getMockEndpoint("mock:aggregated").expectedBodiesReceived("test1test3", "test2test4"); + getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4"); getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); //now send the signal message to trigger completion of all groups, message should NOT be aggregated @@ -47,6 +50,31 @@ public class JdbcAggregateForceCompletio assertMockEndpointsSatisfied(); } + @Test + public void testForceCompletionTrueInclusive() throws Exception { + + getMockEndpoint("mock:aggregated").expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "test1", "id", "1"); + template.sendBodyAndHeader("direct:start", "test2", "id", "2"); + template.sendBodyAndHeader("direct:start", "test3", "id", "1"); + template.sendBodyAndHeader("direct:start", "test4", "id", "2"); + + assertMockEndpointsSatisfied(); + + getMockEndpoint("mock:aggregated").expectedMessageCount(3); + getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4", "test5"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + + //now send a message to trigger completion of all groups, message should be aggregated + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put("id", "3"); + headers.put(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, true); + template.sendBodyAndHeaders("direct:start", "test5", headers); + + assertMockEndpointsSatisfied(); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() {