Author: davsclaus
Date: Fri Nov 11 09:12:33 2011
New Revision: 1200781

URL: http://svn.apache.org/viewvc?rev=1200781&view=rev
Log:
CAMEL-4660: Aggregate EIP should not use 1s as the first timeout, but instead the configured value. Thanks to Ole Hofstad for the patch.

Modified: camel/branches/camel-2.8.x/ (props changed) camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Propchange: camel/branches/camel-2.8.x/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Nov 11 09:12:33 2011 @@ -1 +1 @@ -/camel/trunk:1186106,1186625,1186772,1187221,1187485,1187882,1187893,1188070-1188085,1188642,1188674,1188879,1188881,1189139,1189600,1189681,1189693,1189737,1190212-1190213,1190246,1190303,1195317,1195616,1196210,1197450,1197933,1197948,1198199,1198338,1198340,1199123,1199137,1199654,1199683,1199703,1199739,1199804,1200214 +/camel/trunk:1186106,1186625,1186772,1187221,1187485,1187882,1187893,1188070-1188085,1188642,1188674,1188879,1188881,1189139,1189600,1189681,1189693,1189737,1190212-1190213,1190246,1190303,1195317,1195616,1196210,1197450,1197933,1197948,1198199,1198338,1198340,1199123,1199137,1199654,1199683,1199703,1199739,1199804,1200214,1200771 Propchange: camel/branches/camel-2.8.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. 
Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1200781&r1=1200780&r2=1200781&view=diff ============================================================================== --- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Nov 11 09:12:33 2011 @@ -844,7 +844,7 @@ public class AggregateProcessor extends LOG.info("Using CompletionInterval to run every " + getCompletionInterval() + " millis."); ScheduledExecutorService scheduler = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateTimeoutChecker", 1); // trigger completion based on interval - scheduler.scheduleAtFixedRate(new AggregationIntervalTask(), 1000L, getCompletionInterval(), TimeUnit.MILLISECONDS); + scheduler.scheduleAtFixedRate(new AggregationIntervalTask(), getCompletionInterval(), getCompletionInterval(), TimeUnit.MILLISECONDS); } // start timeout service if its in use Modified: camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=1200781&r1=1200780&r2=1200781&view=diff ============================================================================== --- camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original) +++ camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Fri Nov 11 09:12:33 2011 @@ -279,6 +279,52 @@ 
public class AggregateProcessorTest exte ap.stop(); } + + public void testAggregateInitialCompletionInterval() throws Exception { + // camel context must be started + context.start(); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("A+B", "C+D"); + mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "interval"); + + Processor done = new SendProcessor(context.getEndpoint("mock:result")); + Expression corr = header("id"); + AggregationStrategy as = new BodyInAggregatingStrategy(); + + AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService); + ap.setCompletionInterval(2000); + ap.start(); + + Exchange e1 = new DefaultExchange(context); + e1.getIn().setBody("A"); + e1.getIn().setHeader("id", 123); + + Exchange e2 = new DefaultExchange(context); + e2.getIn().setBody("B"); + e2.getIn().setHeader("id", 123); + + Exchange e3 = new DefaultExchange(context); + e3.getIn().setBody("C"); + e3.getIn().setHeader("id", 123); + + Exchange e4 = new DefaultExchange(context); + e4.getIn().setBody("D"); + e4.getIn().setHeader("id", 123); + + ap.process(e1); + + Thread.sleep(1500L); + ap.process(e2); + + Thread.sleep(500L); + ap.process(e3); + ap.process(e4); + + assertMockEndpointsSatisfied(); + + ap.stop(); + } public void testAggregateIgnoreInvalidCorrelationKey() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result");