CAMEL-9193: Aggregator in preCompletion mode should also timeout if the new group does not receive further messages
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/925265c4 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/925265c4 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/925265c4 Branch: refs/heads/camel-2.16.x Commit: 925265c4c3ce251a1bf4d00811b1d0c9e0a42af8 Parents: ff51e64 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Oct 6 10:16:55 2015 +0200 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Thu Oct 8 11:24:32 2015 +0800 ---------------------------------------------------------------------- .../processor/aggregate/AggregateProcessor.java | 5 ++++- ...gatePreCompleteAwareStrategyTimeoutTest.java | 22 ++++++++++++++++++++ camel-core/src/test/resources/log4j.properties | 4 ++-- 3 files changed, 28 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/925265c4/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index 5c400f6..822e831 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -438,6 +438,8 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor originalExchange = null; // and reset the size to 1 size = 1; + // make sure to track timeout as we just restart the correlation group when we are in pre completion mode + trackTimeout(key, newExchange); } // aggregate the exchanges @@ -495,7 +497,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor batchConsumerCorrelationKeys.clear(); // we have already submitted to completion, so answer should be null answer = null; - } else { + } else if (answer != null) { // we are complete for this exchange answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete); answer = onCompletion(key, originalExchange, answer, false); @@ -650,6 +652,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor if (!fromTimeout && timeoutMap != null) { // cleanup timeout map if it was a incoming exchange which triggered the timeout (and not the timeout checker) + LOG.trace("Removing correlation key {} from timeout", key); timeoutMap.remove(key); } http://git-wip-us.apache.org/repos/asf/camel/blob/925265c4/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java index abfda10..ffcb03f 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java @@ -40,6 +40,28 @@ public class AggregatePreCompleteAwareStrategyTimeoutTest extends ContextTestSup assertMockEndpointsSatisfied(); } + public void testAggregatePreCompleteTimeoutOnlyOneInLastGroup() throws Exception { + getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+D+E", "X"); + + template.sendBodyAndHeader("direct:start", "A", "id", 123); + template.sendBodyAndHeader("direct:start", "B", "id", 123); + template.sendBodyAndHeader("direct:start", "C", "id", 123); + template.sendBodyAndHeader("direct:start", "X", "id", 123); + template.sendBodyAndHeader("direct:start", "D", "id", 123); + template.sendBodyAndHeader("direct:start", "E", "id", 123); + template.sendBodyAndHeader("direct:start", "X", "id", 123); + + assertMockEndpointsSatisfied(); + } + + public void testAggregatePreCompleteTimeoutOnlyOneInFirstGroup() throws Exception { + getMockEndpoint("mock:aggregated").expectedBodiesReceived("X"); + + template.sendBodyAndHeader("direct:start", "X", "id", 123); + + assertMockEndpointsSatisfied(); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { http://git-wip-us.apache.org/repos/asf/camel/blob/925265c4/camel-core/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/camel-core/src/test/resources/log4j.properties b/camel-core/src/test/resources/log4j.properties index e91928c..f8ecded 100644 --- a/camel-core/src/test/resources/log4j.properties +++ b/camel-core/src/test/resources/log4j.properties @@ -23,7 +23,7 @@ log4j.logger.org.apache.camel.customlogger=TRACE, file2 #log4j.logger.org.apache.camel.impl.converter=WARN #log4j.logger.org.apache.camel.management=DEBUG -log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN +#log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN #log4j.logger.org.apache.camel.impl.converter.DefaultTypeConverter=TRACE #log4j.logger.org.apache.camel.impl.converter=DEBUG @@ -49,7 +49,7 @@ log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN #log4j.logger.org.apache.camel.processor.loadbalancer=TRACE #log4j.logger.org.apache.camel.processor.Delayer=TRACE #log4j.logger.org.apache.camel.processor.Throttler=TRACE -#log4j.logger.org.apache.camel.processor.aggregate.AggregateProcessor=DEBUG +#log4j.logger.org.apache.camel.processor.aggregate.AggregateProcessor=TRACE #log4j.logger.org.apache.camel.impl=TRACE #log4j.logger.org.apache.camel.util.FileUtil=TRACE #log4j.logger.org.apache.camel.util.AsyncProcessorHelper=TRACE