Author: davsclaus Date: Sun Sep 26 14:30:47 2010 New Revision: 1001436 URL: http://svn.apache.org/viewvc?rev=1001436&view=rev Log: CAMEL-3159: Added discardOnCompletionTimeout to aggregator EIP.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java - copied, changed from r1001415, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.java - copied, changed from r1001415, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.xml - copied, changed from r1001415, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=1001436&r1=1001435&r2=1001436&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Sun Sep 26 14:30:47 2010 @@ -90,6 +90,8 @@ public class AggregateDefinition extends private Boolean ignoreInvalidCorrelationKeys; @XmlAttribute private Integer closeCorrelationKeyOnCompletion; + @XmlAttribute + private Boolean discardOnCompletionTimeout; public AggregateDefinition() { } @@ -203,6 +205,9 @@ public class AggregateDefinition extends if (getCloseCorrelationKeyOnCompletion() != null) { answer.setCloseCorrelationKeyOnCompletion(getCloseCorrelationKeyOnCompletion()); } + if (isDiscardOnCompletionTimeout() != null) { + answer.setDiscardOnCompletionTimeout(isDiscardOnCompletionTimeout()); + } return answer; } @@ -390,6 +395,14 @@ public class AggregateDefinition extends this.aggregationRepositoryRef = aggregationRepositoryRef; } + public Boolean isDiscardOnCompletionTimeout() { + return discardOnCompletionTimeout; + } + + public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) { + this.discardOnCompletionTimeout = discardOnCompletionTimeout; + } + // Fluent API //------------------------------------------------------------------------- @@ -430,6 +443,18 @@ public class AggregateDefinition extends } /** + * Discards the aggregated message on completion timeout. + * <p/> + * This means on timeout the aggregated message is dropped and not sent out of the aggregator. + * + * @return builder + */ + public AggregateDefinition discardOnCompletionTimeout() { + setDiscardOnCompletionTimeout(true); + return this; + } + + /** * Enables the batch completion mode where we aggregate from a {...@link org.apache.camel.BatchConsumer} * and aggregate the total number of exchanges the {...@link org.apache.camel.BatchConsumer} has reported * as total by checking the exchange property {...@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete. @@ -641,4 +666,5 @@ public class AggregateDefinition extends public void setOutputs(List<ProcessorDefinition> outputs) { this.outputs = outputs; } + } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1001436&r1=1001435&r2=1001436&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sun Sep 26 14:30:47 2010 @@ -118,6 +118,7 @@ public class AggregateProcessor extends private Expression completionSizeExpression; private boolean completionFromBatchConsumer; private AtomicInteger batchConsumerCounter = new AtomicInteger(); + private boolean discardOnCompletionTimeout; public AggregateProcessor(CamelContext camelContext, Processor processor, Expression correlationExpression, AggregationStrategy aggregationStrategy, @@ -359,6 +360,14 @@ public class AggregateProcessor extends closedCorrelationKeys.put(key, key); } + if (fromTimeout && isDiscardOnCompletionTimeout()) { + // discard due timeout + if (LOG.isDebugEnabled()) { + LOG.debug("Aggregation for correlation key " + key + " discarding aggregated exchange: " + exchange); + } + return; + } + onSubmitCompletion(key, exchange); } @@ -503,6 +512,14 @@ public class AggregateProcessor extends this.aggregationRepository = aggregationRepository; } + public boolean isDiscardOnCompletionTimeout() { + return discardOnCompletionTimeout; + } + + public void setDiscardOnCompletionTimeout(boolean discardOnCompletionTimeout) { + this.discardOnCompletionTimeout = discardOnCompletionTimeout; + } + /** * On completion task which keeps the booking of the in progress up to date */ Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java (from r1001415, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java&r1=1001415&r2=1001436&rev=1001436&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java Sun Sep 26 14:30:47 2010 @@ -16,23 +16,40 @@ */ package org.apache.camel.processor.aggregator; +import java.util.concurrent.TimeUnit; + import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.processor.BodyInAggregatingStrategy; /** * @version $Revision$ */ -public class AggregateSimpleTimeoutTest extends ContextTestSupport { +public class AggregateDiscardOnTimeoutTest extends ContextTestSupport { + + public void testAggregateDiscardOnTimeout() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:aggregated"); + mock.expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "A", "id", 123); + template.sendBodyAndHeader("direct:start", "B", "id", 123); + + // wait 3 seconds + Thread.sleep(3000); + + mock.assertIsSatisfied(); - public void testAggregateSimpleTimeout() throws Exception { - getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C"); + // now send 3 which does not timeout + mock.reset(); + mock.expectedBodiesReceived("C+D+E"); template.sendBodyAndHeader("direct:start", "A", "id", 123); template.sendBodyAndHeader("direct:start", "B", "id", 123); template.sendBodyAndHeader("direct:start", "C", "id", 123); - assertMockEndpointsSatisfied(); + // should complete before timeout + mock.await(1500, TimeUnit.MILLISECONDS); } @Override @@ -42,11 +59,12 @@ public class AggregateSimpleTimeoutTest public void configure() throws Exception { // START SNIPPET: e1 from("direct:start") - // aggregate all exchanges correlated by the id header. - // Aggregate them using the BodyInAggregatingStrategy strategy which - // and after 3 seconds of inactivity them timeout and complete the aggregation - // and send it to mock:aggregated - .aggregate(header("id"), new BodyInAggregatingStrategy()).completionTimeout(3000) + .aggregate(header("id"), new BodyInAggregatingStrategy()) + .completionSize(3) + // use a 3 second timeout + .completionTimeout(2000) + // and if timeout occurred then just discard the aggregated message + .discardOnCompletionTimeout() .to("mock:aggregated"); // END SNIPPET: e1 } Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.java (from r1001415, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java&r1=1001415&r2=1001436&rev=1001436&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java (original) +++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.java Sun Sep 26 14:30:47 2010 @@ -17,14 +17,14 @@ package org.apache.camel.spring.processor.aggregator; import org.apache.camel.CamelContext; -import org.apache.camel.processor.aggregator.AggregateSimpleTimeoutTest; +import org.apache.camel.processor.aggregator.AggregateDiscardOnTimeoutTest; import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; /** * @version $Revision$ */ -public class SpringAggregateSimpleTimeoutTest extends AggregateSimpleTimeoutTest { +public class SpringAggregateDiscardOnTimeoutTest extends AggregateDiscardOnTimeoutTest { protected CamelContext createCamelContext() throws Exception { return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml"); Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.xml (from r1001415, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml&r1=1001415&r2=1001436&rev=1001436&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml (original) +++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.xml Sun Sep 26 14:30:47 2010 @@ -26,7 +26,7 @@ <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> - <aggregate strategyRef="aggregatorStrategy" completionTimeout="3000"> + <aggregate strategyRef="aggregatorStrategy" completionTimeout="3000" discardOnCompletionTimeout="true"> <correlationExpression> <simple>header.id</simple> </correlationExpression>