Repository: camel
Updated Branches:
  refs/heads/camel-2.15.x e07b08a01 -> ee12b67fa


CAMEL-9017: Camel-Hazelcast: HazelcastAggregationRepository::confirm should 
check useRecovery before using persistedCache


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ee12b67f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ee12b67f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ee12b67f

Branch: refs/heads/camel-2.15.x
Commit: ee12b67fa2c208b2f2c4cc9c4098c2d5a5bf8646
Parents: e07b08a
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Sun Jul 26 12:11:13 2015 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Sun Jul 26 12:22:19 2015 +0200

----------------------------------------------------------------------
 .../HazelcastAggregationRepository.java         |  4 +-
 ...azelcastAggregationRepositoryRoutesTest.java | 55 ++++++++++++++++++++
 2 files changed, 58 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ee12b67f/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
index 66ba725..e67be62 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
@@ -359,7 +359,9 @@ public final class HazelcastAggregationRepository extends 
ServiceSupport
     @Override
     public void confirm(CamelContext camelContext, String exchangeId) {
         LOG.trace("Confirming an exchange with ID {}.", exchangeId);
-        persistedCache.remove(exchangeId);
+        if (useRecovery) {
+            persistedCache.remove(exchangeId);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/ee12b67f/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
 
b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
index 8becc17..3d56cf5 100644
--- 
a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
+++ 
b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
@@ -91,5 +91,60 @@ public class HazelcastAggregationRepositoryRoutesTest 
extends HazelcastAggregati
 
         mock.assertIsSatisfied();
     }
+    
+    @Test
+    public void checkAggregationFromTwoRoutesNoRecovery() throws Exception {
+        final HazelcastAggregationRepository repoOne =
+                new HazelcastAggregationRepository(REPO_NAME, false, 
getFirstInstance());
+        
+
+        final HazelcastAggregationRepository repoTwo =
+                new HazelcastAggregationRepository(REPO_NAME, false, 
getSecondInstance());
+        
+        repoOne.setUseRecovery(false);
+        repoTwo.setUseRecovery(false);
+        
+        final int completionSize = 4;
+        final String correlator = "CORRELATOR";
+        RouteBuilder rbOne = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                from(DIRECT_ONE).routeId("AggregatingRouteOne")
+                        .aggregate(header(correlator))
+                        .aggregationRepository(repoOne)
+                        .aggregationStrategy(new 
SumOfIntsAggregationStrategy())
+                        .completionSize(completionSize)
+                        .to(MOCK_GOTCHA);
+            }
+        };
+
+        RouteBuilder rbTwo = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                from(DIRECT_TWO).routeId("AggregatingRouteTwo")
+                        .aggregate(header(correlator))
+                        .aggregationRepository(repoTwo)
+                        .aggregationStrategy(new 
SumOfIntsAggregationStrategy())
+                        .completionSize(completionSize)
+                        .to(MOCK_GOTCHA);
+            }
+        };
+
+        context().addRoutes(rbOne);
+        context().addRoutes(rbTwo);
+        context().start();
+
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived(1 + 2 + 3 + 4);
+
+        produceOne.sendBodyAndHeader(1, correlator, correlator);
+        produceTwo.sendBodyAndHeader(2, correlator, correlator);
+        produceOne.sendBodyAndHeader(3, correlator, correlator);
+        produceOne.sendBodyAndHeader(4, correlator, correlator);
+
+        mock.assertIsSatisfied();
+    }
 
 }

Reply via email to