Repository: camel
Updated Branches:
  refs/heads/master bcaddc146 -> c483de770


CAMEL-9017: Camel-Hazelcast: HazelcastAggregationRepository::confirm should check useRecovery before using persistedCache


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c483de77
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c483de77
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c483de77

Branch: refs/heads/master
Commit: c483de7707f82ec59a1fc06c481754cecffe502f
Parents: bcaddc1
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Sun Jul 26 12:11:13 2015 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Sun Jul 26 12:11:13 2015 +0200

----------------------------------------------------------------------
 .../HazelcastAggregationRepository.java         |  4 +-
 ...azelcastAggregationRepositoryRoutesTest.java | 55 ++++++++++++++++++++
 2 files changed, 58 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c483de77/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
index 8b85b3a..65207c2 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
@@ -358,7 +358,9 @@ public class HazelcastAggregationRepository extends ServiceSupport
     @Override
     public void confirm(CamelContext camelContext, String exchangeId) {
         LOG.trace("Confirming an exchange with ID {}.", exchangeId);
-        persistedCache.remove(exchangeId);
+        if (useRecovery) {
+            persistedCache.remove(exchangeId);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/c483de77/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
index 8becc17..3d56cf5 100644
--- a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
@@ -91,5 +91,60 @@ public class HazelcastAggregationRepositoryRoutesTest extends HazelcastAggregati
 
         mock.assertIsSatisfied();
     }
+    
+    @Test
+    public void checkAggregationFromTwoRoutesNoRecovery() throws Exception {
+        final HazelcastAggregationRepository repoOne =
+                new HazelcastAggregationRepository(REPO_NAME, false, getFirstInstance());
+        
+
+        final HazelcastAggregationRepository repoTwo =
+                new HazelcastAggregationRepository(REPO_NAME, false, getSecondInstance());
+        
+        repoOne.setUseRecovery(false);
+        repoTwo.setUseRecovery(false);
+        
+        final int completionSize = 4;
+        final String correlator = "CORRELATOR";
+        RouteBuilder rbOne = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                from(DIRECT_ONE).routeId("AggregatingRouteOne")
+                        .aggregate(header(correlator))
+                        .aggregationRepository(repoOne)
+                        .aggregationStrategy(new SumOfIntsAggregationStrategy())
+                        .completionSize(completionSize)
+                        .to(MOCK_GOTCHA);
+            }
+        };
+
+        RouteBuilder rbTwo = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                from(DIRECT_TWO).routeId("AggregatingRouteTwo")
+                        .aggregate(header(correlator))
+                        .aggregationRepository(repoTwo)
+                        .aggregationStrategy(new SumOfIntsAggregationStrategy())
+                        .completionSize(completionSize)
+                        .to(MOCK_GOTCHA);
+            }
+        };
+
+        context().addRoutes(rbOne);
+        context().addRoutes(rbTwo);
+        context().start();
+
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived(1 + 2 + 3 + 4);
+
+        produceOne.sendBodyAndHeader(1, correlator, correlator);
+        produceTwo.sendBodyAndHeader(2, correlator, correlator);
+        produceOne.sendBodyAndHeader(3, correlator, correlator);
+        produceOne.sendBodyAndHeader(4, correlator, correlator);
+
+        mock.assertIsSatisfied();
+    }
 
 }

Reply via email to