Author: boday Date: Thu Aug 4 03:30:08 2011 New Revision: 1153737 URL: http://svn.apache.org/viewvc?rev=1153737&view=rev Log: CAMEL-4118 added support for manually completing all message groups with a signal message
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1153737&r1=1153736&r2=1153737&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Thu Aug 4 03:30:08 2011 @@ -78,6 +78,7 @@ public interface Exchange { String AGGREGATED_COMPLETED_BY = "CamelAggregatedCompletedBy"; String AGGREGATED_CORRELATION_KEY = "CamelAggregatedCorrelationKey"; String AGGREGATION_STRATEGY = "CamelAggregationStrategy"; + String AGGREGATION_COMPLETE_ALL_GROUPS = "CamelAggregationCompleteAllGroups"; String ASYNC_WAIT = "CamelAsyncWait"; String BATCH_INDEX = "CamelBatchIndex"; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1153737&r1=1153736&r2=1153737&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Thu Aug 4 03:30:08 2011 @@ -157,6 +157,14 @@ public class AggregateProcessor extends } public void process(Exchange exchange) throws Exception { + + //check for the special header to force completion of all groups (and ignore the exchange otherwise) + boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class); + if (completeAllGroups) { + forceCompletionOfAllGroups(); + return; + } + // compute correlation expression String key = correlationExpression.evaluate(exchange, String.class); if (ObjectHelper.isEmpty(key)) { @@ -879,4 +887,37 @@ public class AggregateProcessor extends super.doShutdown(); } + public void forceCompletionOfAllGroups() { + + // only run if CamelContext has been fully started + if (!camelContext.getStatus().isStarted()) { + LOG.warn("cannot start force completion because CamelContext({}) has not been started yet", camelContext.getName()); + return; + } + + LOG.trace("Starting force completion of all groups task"); + + // trigger completion for all in the repository + Set<String> keys = aggregationRepository.getKeys(); + + if (keys != null && !keys.isEmpty()) { + // must acquire the shared aggregation lock to be able to trigger force completion + lock.lock(); + try { + for (String key : keys) { + Exchange exchange = aggregationRepository.get(camelContext, key); + if (exchange != null) { + LOG.trace("force completion triggered for correlation key: {}", key); + // indicate it was completed by a force completion request + exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + onCompletion(key, exchange, false); + } + } + } finally { + lock.unlock(); + } + } + + LOG.trace("Completed force completion of all groups task"); + } } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java?rev=1153737&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java Thu Aug 4 03:30:08 2011 @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.BodyInAggregatingStrategy; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.junit.Test; + +/** + * To test CAMEL-4118 support for completing all aggregation groups with a signal message + */ +public class AggregateForceCompletionHeaderTest extends ContextTestSupport { + + @Test + public void testForceCompletionTrue() throws Exception { + + getMockEndpoint("mock:aggregated").expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "test1", "id", "1"); + template.sendBodyAndHeader("direct:start", "test2", "id", "2"); + template.sendBodyAndHeader("direct:start", "test3", "id", "1"); + template.sendBodyAndHeader("direct:start", "test4", "id", "2"); + + assertMockEndpointsSatisfied(); + + getMockEndpoint("mock:aggregated").expectedMessageCount(2); + getMockEndpoint("mock:aggregated").expectedBodiesReceived("test1test3", "test2test4"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + + //now send the signal message to trigger completion of all groups, message should NOT be aggregated + template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + + @Override + public void configure() throws Exception { + + from("direct:start") + .aggregate(header("id"), new MyAggregationStrategy()) + .completionSize(10) + .to("mock:aggregated"); + } + }; + } + + public static class MyAggregationStrategy implements AggregationStrategy { + + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } + String body1 = oldExchange.getIn().getBody(String.class); + String body2 = newExchange.getIn().getBody(String.class); + + oldExchange.getIn().setBody(body1 + body2); + return oldExchange; + } + } +} \ No newline at end of file Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=1153737&r1=1153736&r2=1153737&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Thu Aug 4 03:30:08 2011 @@ -559,4 +559,50 @@ public class AggregateProcessorTest exte ap.stop(); } + public void testAggregateForceCompletion() throws Exception { + // camel context must be started + context.start(); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("B+END", "A+END"); + mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + + Processor done = new SendProcessor(context.getEndpoint("mock:result")); + Expression corr = header("id"); + AggregationStrategy as = new BodyInAggregatingStrategy(); + + AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService); + ap.setCompletionSize(10); + ap.start(); + + Exchange e1 = new DefaultExchange(context); + e1.getIn().setBody("A"); + e1.getIn().setHeader("id", 123); + + Exchange e2 = new DefaultExchange(context); + e2.getIn().setBody("B"); + e2.getIn().setHeader("id", 456); + + Exchange e3 = new DefaultExchange(context); + e3.getIn().setBody("END"); + e3.getIn().setHeader("id", 123); + + Exchange e4 = new DefaultExchange(context); + e4.getIn().setBody("END"); + e4.getIn().setHeader("id", 456); + + ap.process(e1); + ap.process(e2); + ap.process(e3); + ap.process(e4); + + assertEquals("should not have completed yet", 0, mock.getExchanges().size()); + + ap.forceCompletionOfAllGroups(); + + assertMockEndpointsSatisfied(); + + ap.stop(); + } + } Added: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java?rev=1153737&view=auto ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java (added) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java Thu Aug 4 03:30:08 2011 @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hawtdb; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * To test CAMEL-4118 support for completing all aggregation groups with a signal message + */ +public class HawtDBAggregateForceCompletionHeaderTest extends CamelTestSupport { + + @Override + public void setUp() throws Exception { + deleteDirectory("target/data"); + super.setUp(); + } + + @Test + public void testForceCompletionTrue() throws Exception { + + getMockEndpoint("mock:aggregated").expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "test1", "id", "1"); + template.sendBodyAndHeader("direct:start", "test2", "id", "2"); + template.sendBodyAndHeader("direct:start", "test3", "id", "1"); + template.sendBodyAndHeader("direct:start", "test4", "id", "2"); + + assertMockEndpointsSatisfied(); + + getMockEndpoint("mock:aggregated").expectedMessageCount(2); + getMockEndpoint("mock:aggregated").expectedBodiesReceived("test1test3", "test2test4"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + + //now send the signal message to trigger completion of all groups, message should NOT be aggregated + template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // create the hawtdb repo + HawtDBAggregationRepository repo = new HawtDBAggregationRepository("repo1", "target/data/hawtdb.dat"); + + // here is the Camel route where we aggregate + from("direct:start") + .aggregate(header("id"), new MyAggregationStrategy()) + // use our created hawtdb repo as aggregation repository + .completionSize(10).aggregationRepository(repo) + .to("mock:aggregated"); + } + }; + } + + public static class MyAggregationStrategy implements AggregationStrategy { + + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } + String body1 = oldExchange.getIn().getBody(String.class); + String body2 = newExchange.getIn().getBody(String.class); + + oldExchange.getIn().setBody(body1 + body2); + return oldExchange; + } + } +} \ No newline at end of file Added: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java?rev=1153737&view=auto ============================================================================== --- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java (added) +++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java Thu Aug 4 03:30:08 2011 @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate.jdbc; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +/** + * To test CAMEL-4118 support for completing all aggregation groups with a signal message + */ +public class JdbcAggregateForceCompletionHeaderTest extends AbstractJdbcAggregationTestSupport { + + @Test + public void testForceCompletionTrue() throws Exception { + + getMockEndpoint("mock:aggregated").expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "test1", "id", "1"); + template.sendBodyAndHeader("direct:start", "test2", "id", "2"); + template.sendBodyAndHeader("direct:start", "test3", "id", "1"); + template.sendBodyAndHeader("direct:start", "test4", "id", "2"); + + assertMockEndpointsSatisfied(); + + getMockEndpoint("mock:aggregated").expectedMessageCount(2); + getMockEndpoint("mock:aggregated").expectedBodiesReceived("test1test3", "test2test4"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + + //now send the signal message to trigger completion of all groups, message should NOT be aggregated + template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // here is the Camel route where we aggregate + from("direct:start") + .aggregate(header("id"), new MyAggregationStrategy()) + // use our created jdbc repo as aggregation repository + .completionSize(10).aggregationRepository(repo) + .to("mock:aggregated"); + } + }; + } +} \ No newline at end of file