Author: boday Date: Wed Sep 21 19:45:44 2011 New Revision: 1173810 URL: http://svn.apache.org/viewvc?rev=1173810&view=rev Log: CAMEL-4097 updated Aggregator to support forcing completion of groups when context is stopped
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.java camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=1173810&r1=1173809&r2=1173810&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Wed Sep 21 19:45:44 2011 @@ -93,6 +93,8 @@ public class AggregateDefinition extends private Integer closeCorrelationKeyOnCompletion; @XmlAttribute private Boolean discardOnCompletionTimeout; + @XmlAttribute + private Boolean forceCompletionOnStop; public AggregateDefinition() { } @@ -209,6 +211,9 @@ public class AggregateDefinition extends if (getDiscardOnCompletionTimeout() != null) { answer.setDiscardOnCompletionTimeout(isDiscardOnCompletionTimeout()); } + if (getForceCompletionOnStop() != null) { + answer.setForceCompletionOnStop(getForceCompletionOnStop()); + } return answer; } @@ -648,6 +653,29 @@ public class AggregateDefinition extends return this; } + /** + * Sets the force completion on stop flag, which considers the current group as complete + * and sends out the aggregated exchange when the stop event is executed + * + * @return builder + */ + public AggregateDefinition forceCompletionOnStop() { + setForceCompletionOnStop(true); + return this; + } + + public Boolean getForceCompletionOnStop() { + return forceCompletionOnStop; + } + + public boolean isForceCompletionOnStop() { + return forceCompletionOnStop != null && forceCompletionOnStop; + } + + public void setForceCompletionOnStop(Boolean forceCompletionOnStop) { + this.forceCompletionOnStop = forceCompletionOnStop; + } + /** * Sending the aggregated output in parallel * Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1173810&r1=1173809&r2=1173810&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Wed Sep 21 19:45:44 2011 @@ -115,6 +115,7 @@ public class AggregateProcessor extends private boolean completionFromBatchConsumer; private AtomicInteger batchConsumerCounter = new AtomicInteger(); private boolean discardOnCompletionTimeout; + private boolean forceCompletionOnStop; private ProducerTemplate deadLetterProducerTemplate; @@ -567,6 +568,10 @@ public class AggregateProcessor extends this.discardOnCompletionTimeout = discardOnCompletionTimeout; } + public void setForceCompletionOnStop(boolean forceCompletionOnStop) { + this.forceCompletionOnStop = forceCompletionOnStop; + } + /** * On completion task which keeps the booking of the in progress up to date */ @@ -859,6 +864,16 @@ public class AggregateProcessor extends @Override protected void doStop() throws Exception { + + if (forceCompletionOnStop) { + forceCompletionOfAllGroups(); + + while (inProgressCompleteExchanges.size() > 0) { + LOG.trace("waiting for {} in progress exchanges to complete", inProgressCompleteExchanges.size()); + Thread.sleep(100); + } + } + if (recoverService != null) { camelContext.getExecutorServiceManager().shutdownNow(recoverService); } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java?rev=1173810&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java Wed Sep 21 19:45:44 2011 @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.BodyInAggregatingStrategy; + +/** + * @version + */ +public class AggregateForceCompletionOnStopTest extends ContextTestSupport { + + MyCompletionProcessor myCompletionProcessor = new MyCompletionProcessor(); + + public void testForceCompletionTrue() throws Exception { + myCompletionProcessor.reset(); + context.getShutdownStrategy().setShutdownNowOnTimeout(true); + context.getShutdownStrategy().setTimeout(5); + + template.sendBodyAndHeader("direct:forceCompletionTrue", "test1", "id", "1"); + template.sendBodyAndHeader("direct:forceCompletionTrue", "test2", "id", "2"); + template.sendBodyAndHeader("direct:forceCompletionTrue", "test3", "id", "1"); + template.sendBodyAndHeader("direct:forceCompletionTrue", "test4", "id", "2"); + assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount()); + context.stop(); + assertEquals("aggregation should have completed", 2, myCompletionProcessor.getAggregationCount()); + } + + public void testForceCompletionFalse() throws Exception { + myCompletionProcessor.reset(); + context.getShutdownStrategy().setShutdownNowOnTimeout(true); + context.getShutdownStrategy().setTimeout(5); + + template.sendBodyAndHeader("direct:forceCompletionFalse", "test1", "id", "1"); + template.sendBodyAndHeader("direct:forceCompletionFalse", "test2", "id", "2"); + template.sendBodyAndHeader("direct:forceCompletionFalse", "test3", "id", "1"); + template.sendBodyAndHeader("direct:forceCompletionFalse", "test4", "id", "2"); + assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount()); + context.stop(); + assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + + @Override + public void configure() throws Exception { + + from("direct:forceCompletionTrue") + .aggregate(header("id"), new BodyInAggregatingStrategy()).forceCompletionOnStop().completionSize(10) + .delay(100) + .process(myCompletionProcessor); + + from("direct:forceCompletionFalse") + .aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(10) + .delay(100) + .process(myCompletionProcessor); + } + }; + } +} Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java?rev=1173810&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java Wed Sep 21 19:45:44 2011 @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; + +public class MyCompletionProcessor implements Processor { + private static int aggregationCount; + + public int getAggregationCount() { + return aggregationCount; + } + + @Override + public void process(Exchange exchange) throws Exception { + aggregationCount++; + } + + public void reset() { + aggregationCount = 0; + } +} Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.java?rev=1173810&view=auto ============================================================================== --- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.java (added) +++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.java Wed Sep 21 19:45:44 2011 @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spring.processor.aggregator; + +import org.apache.camel.CamelContext; +import org.apache.camel.processor.aggregator.AggregateForceCompletionOnStopTest; + +import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; + +/** + * @version + */ +public class SpringAggregateForceCompletionOnStopTest extends AggregateForceCompletionOnStopTest { + + protected CamelContext createCamelContext() throws Exception { + return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml"); + } + +} \ No newline at end of file Added: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml?rev=1173810&view=auto ============================================================================== --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml (added) +++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml Wed Sep 21 19:45:44 2011 @@ -0,0 +1,47 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd + "> + + <camelContext xmlns="http://camel.apache.org/schema/spring"> + <route> + <from uri="direct:forceCompletionTrue"/> + <aggregate strategyRef="aggregatorStrategy" forceCompletionOnStop="true" completionSize="10"> + <correlationExpression><header>id</header></correlationExpression> + <process ref="myCompletionProcessor"/> + </aggregate> + </route> + + <route> + <from uri="direct:forceCompletionFalse"/> + <aggregate strategyRef="aggregatorStrategy" completionSize="10"> + <correlationExpression><header>id</header></correlationExpression> + <process ref="myCompletionProcessor"/> + </aggregate> + </route> + + </camelContext> + + <bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/> + <bean id="myCompletionProcessor" class="org.apache.camel.processor.aggregator.MyCompletionProcessor"/> + +</beans>