Repository: camel
Updated Branches:
  refs/heads/master bcaddc146 -> c483de770


CAMEL-9017: Camel-Hazelcast: HazelcastAggregationRepository::confirm should check useRecovery before using persistedCache


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c483de77
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c483de77
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c483de77

Branch: refs/heads/master
Commit: c483de7707f82ec59a1fc06c481754cecffe502f
Parents: bcaddc1
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Sun Jul 26 12:11:13 2015 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Sun Jul 26 12:11:13 2015 +0200

----------------------------------------------------------------------
 .../HazelcastAggregationRepository.java         |  4 +-
 ...azelcastAggregationRepositoryRoutesTest.java | 55 ++++++++++++++++++++
 2 files changed, 58 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c483de77/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
index 8b85b3a..65207c2 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
@@ -358,7 +358,9 @@ public class HazelcastAggregationRepository extends ServiceSupport
     @Override
     public void confirm(CamelContext camelContext, String exchangeId) {
         LOG.trace("Confirming an exchange with ID {}.", exchangeId);
-        persistedCache.remove(exchangeId);
+        if (useRecovery) {
+            persistedCache.remove(exchangeId);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/c483de77/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
index 8becc17..3d56cf5 100644
--- a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
@@ -91,5 +91,60 @@ public class HazelcastAggregationRepositoryRoutesTest extends HazelcastAggregati
 
         mock.assertIsSatisfied();
     }
+
+    @Test
+    public void checkAggregationFromTwoRoutesNoRecovery() throws Exception {
+        final HazelcastAggregationRepository repoOne =
+                new HazelcastAggregationRepository(REPO_NAME, false, getFirstInstance());
+
+
+        final HazelcastAggregationRepository repoTwo =
+                new HazelcastAggregationRepository(REPO_NAME, false, getSecondInstance());
+
+        repoOne.setUseRecovery(false);
+        repoTwo.setUseRecovery(false);
+
+        final int completionSize = 4;
+        final String correlator = "CORRELATOR";
+        RouteBuilder rbOne = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                from(DIRECT_ONE).routeId("AggregatingRouteOne")
+                        .aggregate(header(correlator))
+                        .aggregationRepository(repoOne)
+                        .aggregationStrategy(new SumOfIntsAggregationStrategy())
+                        .completionSize(completionSize)
+                        .to(MOCK_GOTCHA);
+            }
+        };
+
+        RouteBuilder rbTwo = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                from(DIRECT_TWO).routeId("AggregatingRouteTwo")
+                        .aggregate(header(correlator))
+                        .aggregationRepository(repoTwo)
+                        .aggregationStrategy(new SumOfIntsAggregationStrategy())
+                        .completionSize(completionSize)
+                        .to(MOCK_GOTCHA);
+            }
+        };
+
+        context().addRoutes(rbOne);
+        context().addRoutes(rbTwo);
+        context().start();
+
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived(1 + 2 + 3 + 4);
+
+        produceOne.sendBodyAndHeader(1, correlator, correlator);
+        produceTwo.sendBodyAndHeader(2, correlator, correlator);
+        produceOne.sendBodyAndHeader(3, correlator, correlator);
+        produceOne.sendBodyAndHeader(4, correlator, correlator);
+
+        mock.assertIsSatisfied();
+    }
 
 }