Author: davsclaus Date: Fri Nov 11 08:49:59 2011 New Revision: 1200771 URL: http://svn.apache.org/viewvc?rev=1200771&view=rev Log: CAMEL-4660: Aggregate EIP should not use 1s as the first timeout, but instead the configured value. Thanks to Ole Hofstad for the patch.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1200771&r1=1200770&r2=1200771&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Nov 11 08:49:59 2011 @@ -859,7 +859,7 @@ public class AggregateProcessor extends setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1)); } // trigger completion based on interval - getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(), 1000L, getCompletionInterval(), TimeUnit.MILLISECONDS); + getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(), getCompletionInterval(), getCompletionInterval(), TimeUnit.MILLISECONDS); } // start timeout service if its in use Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=1200771&r1=1200770&r2=1200771&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Fri Nov 11 08:49:59 2011 @@ -279,6 +279,52 @@ public class AggregateProcessorTest exte ap.stop(); } + + public void testAggregateInitialCompletionInterval() throws Exception { + // camel context must be started + context.start(); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("A+B", "C+D"); + mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "interval"); + + Processor done = new SendProcessor(context.getEndpoint("mock:result")); + Expression corr = header("id"); + AggregationStrategy as = new BodyInAggregatingStrategy(); + + AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService); + ap.setCompletionInterval(2000); + ap.start(); + + Exchange e1 = new DefaultExchange(context); + e1.getIn().setBody("A"); + e1.getIn().setHeader("id", 123); + + Exchange e2 = new DefaultExchange(context); + e2.getIn().setBody("B"); + e2.getIn().setHeader("id", 123); + + Exchange e3 = new DefaultExchange(context); + e3.getIn().setBody("C"); + e3.getIn().setHeader("id", 123); + + Exchange e4 = new DefaultExchange(context); + e4.getIn().setBody("D"); + e4.getIn().setHeader("id", 123); + + ap.process(e1); + + Thread.sleep(1500L); + ap.process(e2); + + Thread.sleep(500L); + ap.process(e3); + ap.process(e4); + + assertMockEndpointsSatisfied(); + + ap.stop(); + } public void testAggregateIgnoreInvalidCorrelationKey() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result");