CAMEL-7853: Implement AggregationRepository based on Infinispan data grid - 
Fixed checkstyle (CS) violations and applied minor fixes


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/55ab6e16
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/55ab6e16
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/55ab6e16

Branch: refs/heads/master
Commit: 55ab6e164e366d866c803eb2d35540a6192b9071
Parents: 13ccd90
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Sat Apr 23 12:14:05 2016 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Sat Apr 23 12:14:41 2016 +0200

----------------------------------------------------------------------
 .../InfinispanLocalAggregationRepository.java   | 156 ++++++++------
 ...spanAggregationRepositoryOperationsTest.java | 208 ------------------
 .../InfinispanAggregationRepositoryTest.java    |  97 ---------
 ...ocalAggregationRepositoryOperationsTest.java | 210 +++++++++++++++++++
 ...nfinispanLocalAggregationRepositoryTest.java |  96 +++++++++
 5 files changed, 393 insertions(+), 374 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/55ab6e16/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java
 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java
index 13374b9..b6bd872 100644
--- 
a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java
+++ 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.component.infinispan.processor.aggregate;
 
 import java.util.Collections;
@@ -47,7 +63,7 @@ public class InfinispanLocalAggregationRepository extends 
ServiceSupport impleme
         this.cacheName = cacheName;
     }
 
-       @Override
+    @Override
     public Exchange add(final CamelContext camelContext, final String key, 
final Exchange exchange) {
         LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe 
manner.", exchange.getExchangeId(), key);
         DefaultExchangeHolder newHolder = 
DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders);
@@ -55,41 +71,41 @@ public class InfinispanLocalAggregationRepository extends 
ServiceSupport impleme
         return unmarshallExchange(camelContext, oldHolder);
     }
 
-       @Override
-       public Exchange get(CamelContext camelContext, String key) {
+    @Override
+    public Exchange get(CamelContext camelContext, String key) {
         return unmarshallExchange(camelContext, (DefaultExchangeHolder) 
cache.get(key));
-       }
+    }
 
-       @Override
-       public void remove(CamelContext camelContext, String key, Exchange 
exchange) {
+    @Override
+    public void remove(CamelContext camelContext, String key, Exchange 
exchange) {
         LOG.trace("Removing an exchange with ID {} for key {} ", 
exchange.getExchangeId(), key);
-        cache.remove(key);             
-       }
+        cache.remove(key);
+    }
 
-       @Override
-       public void confirm(CamelContext camelContext, String exchangeId) {
+    @Override
+    public void confirm(CamelContext camelContext, String exchangeId) {
         LOG.trace("Confirming an exchange with ID {}.", exchangeId);
-        cache.remove(exchangeId);      
-       }
+        cache.remove(exchangeId);
+    }
 
-       @Override
-       public Set<String> getKeys() {
+    @Override
+    public Set<String> getKeys() {
         return Collections.unmodifiableSet(cache.keySet());
-       }
+    }
 
-       @Override
-       public Set<String> scan(CamelContext camelContext) {
+    @Override
+    public Set<String> scan(CamelContext camelContext) {
         LOG.trace("Scanning for exchanges to recover in {} context", 
camelContext.getName());
         Set<String> scanned = Collections.unmodifiableSet(cache.keySet());
         LOG.trace("Found {} keys for exchanges to recover in {} context", 
scanned.size(), camelContext.getName());
         return scanned;
-       }
+    }
 
-       @Override
-       public Exchange recover(CamelContext camelContext, String exchangeId) {
+    @Override
+    public Exchange recover(CamelContext camelContext, String exchangeId) {
         LOG.trace("Recovering an Exchange with ID {}.", exchangeId);
         return useRecovery ? unmarshallExchange(camelContext, 
cache.get(exchangeId)) : null;
-       }
+    }
 
     @Override
     public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
@@ -101,10 +117,10 @@ public class InfinispanLocalAggregationRepository extends 
ServiceSupport impleme
         this.recoveryInterval = interval;
     }
     
-       @Override
-       public long getRecoveryIntervalInMillis() {
+    @Override
+    public long getRecoveryIntervalInMillis() {
         return recoveryInterval;
-       }
+    }
 
     @Override
     public void setUseRecovery(boolean useRecovery) {
@@ -136,8 +152,8 @@ public class InfinispanLocalAggregationRepository extends 
ServiceSupport impleme
         return maximumRedeliveries;
     }
 
-       @Override
-       protected void doStart() throws Exception {
+    @Override
+    protected void doStart() throws Exception {
         if (maximumRedeliveries < 0) {
             throw new IllegalArgumentException("Maximum redelivery retries 
must be zero or a positive integer.");
         }
@@ -146,20 +162,22 @@ public class InfinispanLocalAggregationRepository extends 
ServiceSupport impleme
         }
         if (ObjectHelper.isEmpty(configuration)) {
             manager = new DefaultCacheManager();
+            manager.start();
         } else {
-               manager = new DefaultCacheManager(configuration);               
+            manager = new DefaultCacheManager(configuration);
+            manager.start();
         }        
         if (ObjectHelper.isEmpty(cacheName)) {
             cache = manager.getCache();
         } else {
-            cache = manager.getCache(cacheName);               
+            cache = manager.getCache(cacheName);
         }
-       }
+    }
 
-       @Override
-       protected void doStop() throws Exception {
-               
-       }
+    @Override
+    protected void doStop() throws Exception {
+        manager.stop();
+    }
 
     protected Exchange unmarshallExchange(CamelContext camelContext, 
DefaultExchangeHolder holder) {
         Exchange exchange = null;
@@ -170,51 +188,51 @@ public class InfinispanLocalAggregationRepository extends 
ServiceSupport impleme
         return exchange;
     }
     
-       public DefaultCacheManager getManager() {
-               return manager;
-       }
+    public DefaultCacheManager getManager() {
+        return manager;
+    }
 
-       public void setManager(DefaultCacheManager manager) {
-               this.manager = manager;
-       }
+    public void setManager(DefaultCacheManager manager) {
+        this.manager = manager;
+    }
 
-       public String getCacheName() {
-               return cacheName;
-       }
+    public String getCacheName() {
+        return cacheName;
+    }
 
-       public void setCacheName(String cacheName) {
-               this.cacheName = cacheName;
-       }
+    public void setCacheName(String cacheName) {
+        this.cacheName = cacheName;
+    }
 
-       public String getDeadLetterChannel() {
-               return deadLetterChannel;
-       }
+    public String getDeadLetterChannel() {
+        return deadLetterChannel;
+    }
 
-       public void setDeadLetterChannel(String deadLetterChannel) {
-               this.deadLetterChannel = deadLetterChannel;
-       }
+    public void setDeadLetterChannel(String deadLetterChannel) {
+        this.deadLetterChannel = deadLetterChannel;
+    }
 
-       public boolean isAllowSerializedHeaders() {
-               return allowSerializedHeaders;
-       }
+    public boolean isAllowSerializedHeaders() {
+        return allowSerializedHeaders;
+    }
 
-       public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
-               this.allowSerializedHeaders = allowSerializedHeaders;
-       }
+    public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
+        this.allowSerializedHeaders = allowSerializedHeaders;
+    }
 
-       public Cache<String, DefaultExchangeHolder> getCache() {
-               return cache;
-       }
+    public Cache<String, DefaultExchangeHolder> getCache() {
+        return cache;
+    }
 
-       public void setCache(Cache<String, DefaultExchangeHolder> cache) {
-               this.cache = cache;
-       }
+    public void setCache(Cache<String, DefaultExchangeHolder> cache) {
+        this.cache = cache;
+    }
 
-       public Configuration getConfiguration() {
-               return configuration;
-       }
+    public Configuration getConfiguration() {
+        return configuration;
+    }
 
-       public void setConfiguration(Configuration configuration) {
-               this.configuration = configuration;
-       }
+    public void setConfiguration(Configuration configuration) {
+        this.configuration = configuration;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/55ab6e16/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryOperationsTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryOperationsTest.java
 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryOperationsTest.java
deleted file mode 100644
index 11312c5..0000000
--- 
a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryOperationsTest.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.infinispan.processor.aggregate;
-
-import java.util.Set;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.DefaultExchange;
-import org.apache.camel.impl.DefaultExchangeHolder;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Unite test for {@link InfinispanLocalAggregationRepository}
- */
-public class InfinispanAggregationRepositoryOperationsTest {
-
-    private static InfinispanLocalAggregationRepository aggregationRepository;
-    private CamelContext camelContext = new DefaultCamelContext();
-
-    @BeforeClass
-    public static void starting() throws Exception {
-        aggregationRepository = new 
InfinispanLocalAggregationRepository("pippo");
-        aggregationRepository.start();
-    }
-
-    @AfterClass
-    public static void stopping() throws Exception {
-        aggregationRepository.stop();
-    }
-
-    private boolean exists(String key) {
-        DefaultExchangeHolder holder = 
aggregationRepository.getCache().get(key);
-        if (holder == null) return false;
-        else return true;
-    }
-
-    @Test
-    public void testAdd() {
-        // Given
-        String key = "Add";
-        assertFalse(exists(key));
-        Exchange exchange = new DefaultExchange(camelContext);
-        // When
-        aggregationRepository.add(camelContext, key, exchange);
-        // Then
-        assertTrue(exists(key));
-    }
-
-    @Test
-    public void testGetExists() {
-        // Given
-        String key = "Get_Exists";
-        Exchange exchange = new DefaultExchange(camelContext);
-        aggregationRepository.add(camelContext, key, exchange);
-        assertTrue(exists(key));
-        
-        // When
-        Exchange exchange2 = aggregationRepository.get(camelContext, key);
-        // Then
-        assertNotNull(exchange2);
-        assertEquals(exchange.getExchangeId(), exchange2.getExchangeId());
-    }
-
-    @Test
-    public void testGetNotExists() {
-        // Given
-        String key = "Get_NotExists";
-        assertFalse(exists(key));
-        // When
-        Exchange exchange2 = aggregationRepository.get(camelContext, key);
-        // Then
-        assertNull(exchange2);
-    }
-
-    @Test
-    public void testRemoveExists() {
-        // Given
-        String key = "Remove_Exists";
-        Exchange exchange = new DefaultExchange(camelContext);
-        aggregationRepository.add(camelContext, key, exchange);
-        assertTrue(exists(key));
-        // When
-        aggregationRepository.remove(camelContext, key, exchange);
-        // Then
-        assertFalse(exists(key));
-    }
-
-    @Test
-    public void testRemoveNotExists() {
-        // Given
-        String key = "RemoveNotExists";
-        Exchange exchange = new DefaultExchange(camelContext);
-        assertFalse(exists(key));
-        // When
-        aggregationRepository.remove(camelContext, key, exchange);
-        // Then
-        assertFalse(exists(key));
-    }
-
-    @Test
-    public void testGetKeys() {
-        // Given
-        String[] keys = {"GetKeys1", "GetKeys2"};
-        addExchanges(keys);
-        // When
-        Set<String> keySet = aggregationRepository.getKeys();
-        // Then
-        for (String key : keys) {
-            assertTrue(keySet.contains(key));
-        }
-    }
-
-    @Test
-    public void testConfirmExist() {
-        // Given
-        for (int i = 1; i < 4; i++) {
-            String key = "Confirm_" + i;
-            Exchange exchange = new DefaultExchange(camelContext);
-            exchange.setExchangeId("Exchange_" + i);
-            aggregationRepository.add(camelContext, key, exchange);
-            assertTrue(exists(key));
-        }
-        // When
-        aggregationRepository.confirm(camelContext, "Confirm_2");
-        // Then
-        assertTrue(exists("Confirm_1"));
-        assertFalse(exists("Confirm_2"));
-        assertTrue(exists("Confirm_3"));
-    }
-
-    @Test
-    public void testConfirmNotExist() {
-        // Given
-        String[] keys = new String[3];
-        for (int i = 1; i < 4; i++) {
-            keys[i - 1] = "Confirm" + i;
-        }
-        addExchanges(keys);
-        for (String key : keys) {
-            assertTrue(exists(key));
-        }
-        // When
-        aggregationRepository.confirm(camelContext, "Exchange-Confirm5");
-        // Then
-        for (String key : keys) {
-            assertTrue(exists(key));
-        }
-    }
-
-    private void addExchanges(String... keys) {
-        for (String key : keys) {
-            Exchange exchange = new DefaultExchange(camelContext);
-            exchange.setExchangeId("Exchange-" + key);
-            aggregationRepository.add(camelContext, key, exchange);
-        }
-    }
-
-    @Test
-    public void testScan() {
-        // Given
-        String[] keys = {"Scan1", "Scan2"};
-        addExchanges(keys);
-        // When
-        Set<String> exchangeIdSet = aggregationRepository.scan(camelContext);
-        // Then
-        for (String key : keys) {
-            assertTrue(exchangeIdSet.contains(key));
-        }
-    }
-
-    @Test
-    public void testRecover() {
-        // Given
-        String[] keys = {"Recover1", "Recover2"};
-        addExchanges(keys);
-        // When
-        Exchange exchange2 = aggregationRepository.recover(camelContext, 
"Recover2");
-        Exchange exchange3 = aggregationRepository.recover(camelContext, 
"Recover3");
-        // Then
-        assertNotNull(exchange2);
-        assertNull(exchange3);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/55ab6e16/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryTest.java
 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryTest.java
deleted file mode 100644
index b656662..0000000
--- 
a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.infinispan.processor.aggregate;
-
-import org.apache.camel.EndpointInject;
-import org.apache.camel.Exchange;
-import org.apache.camel.Produce;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.builder.RouteBuilder;
-import 
org.apache.camel.component.infinispan.processor.aggregate.InfinispanLocalAggregationRepository;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.aggregate.AggregationStrategy;
-import org.apache.camel.test.junit4.CamelTestSupport;
-import org.junit.Test;
-
-
-public class InfinispanAggregationRepositoryTest extends CamelTestSupport {
-
-    private static final String MOCK_GOTCHA = "mock:gotcha";
-    private static final String DIRECT_ONE = "direct:one";
-
-    @EndpointInject(uri = MOCK_GOTCHA)
-    private MockEndpoint mock;
-
-    @Produce(uri = DIRECT_ONE)
-    private ProducerTemplate produceOne;
-
-
-    @Test
-    public void checkAggregationFromOneRoute() throws Exception {
-        final InfinispanLocalAggregationRepository repoOne =
-                new InfinispanLocalAggregationRepository("pippo");
-
-        final int completionSize = 4;
-        final String correlator = "CORRELATOR";
-        RouteBuilder rbOne = new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-
-                from(DIRECT_ONE).routeId("AggregatingRouteOne")
-                        .aggregate(header(correlator))
-                        .aggregationRepository(repoOne)
-                        .aggregationStrategy(new 
SumOfIntsAggregationStrategy())
-                        .completionSize(completionSize)
-                        .to(MOCK_GOTCHA);
-            }
-        };
-
-        context().addRoutes(rbOne);
-        context().start();
-
-        mock.expectedMessageCount(2);
-        mock.expectedBodiesReceived(1 + 3 + 4 + 5, 6 + 7 + 20 + 21);
-
-        produceOne.sendBodyAndHeader(1, correlator, correlator);
-        produceOne.sendBodyAndHeader(3, correlator, correlator);
-        produceOne.sendBodyAndHeader(4, correlator, correlator);
-        produceOne.sendBodyAndHeader(5, correlator, correlator);
-        
-        produceOne.sendBodyAndHeader(6, correlator, correlator);
-        produceOne.sendBodyAndHeader(7, correlator, correlator);
-        produceOne.sendBodyAndHeader(20, correlator, correlator);
-        produceOne.sendBodyAndHeader(21, correlator, correlator);
-
-        mock.assertIsSatisfied();
-    }
-    
-    class SumOfIntsAggregationStrategy implements AggregationStrategy {
-        @Override
-        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
-            if (oldExchange == null) {
-                return newExchange;
-            } else {
-                Integer n = newExchange.getIn().getBody(Integer.class);
-                Integer o = oldExchange.getIn().getBody(Integer.class);
-                Integer v = (o == null ? 0 : o) + (n == null ? 0 : n);
-                oldExchange.getIn().setBody(v, Integer.class);
-                return oldExchange;
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/55ab6e16/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepositoryOperationsTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepositoryOperationsTest.java
 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepositoryOperationsTest.java
new file mode 100644
index 0000000..f1265c7
--- /dev/null
+++ 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepositoryOperationsTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.processor.aggregate;
+
+import java.util.Set;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for {@link InfinispanLocalAggregationRepository}
+ */
+public class InfinispanLocalAggregationRepositoryOperationsTest {
+
+    private static InfinispanLocalAggregationRepository aggregationRepository;
+    private CamelContext camelContext = new DefaultCamelContext();
+
+    @BeforeClass
+    public static void starting() throws Exception {
+        aggregationRepository = new 
InfinispanLocalAggregationRepository("pippo");
+        aggregationRepository.start();
+    }
+
+    @AfterClass
+    public static void stopping() throws Exception {
+        aggregationRepository.stop();
+    }
+
+    private boolean exists(String key) {
+        DefaultExchangeHolder holder = 
aggregationRepository.getCache().get(key);
+        if (holder == null) {
+            return false;
+        } 
+        return true;
+    }
+
+    @Test
+    public void testAdd() {
+        // Given
+        String key = "Add";
+        assertFalse(exists(key));
+        Exchange exchange = new DefaultExchange(camelContext);
+        // When
+        aggregationRepository.add(camelContext, key, exchange);
+        // Then
+        assertTrue(exists(key));
+    }
+
+    @Test
+    public void testGetExists() {
+        // Given
+        String key = "Get_Exists";
+        Exchange exchange = new DefaultExchange(camelContext);
+        aggregationRepository.add(camelContext, key, exchange);
+        assertTrue(exists(key));
+        
+        // When
+        Exchange exchange2 = aggregationRepository.get(camelContext, key);
+        // Then
+        assertNotNull(exchange2);
+        assertEquals(exchange.getExchangeId(), exchange2.getExchangeId());
+    }
+
+    @Test
+    public void testGetNotExists() {
+        // Given
+        String key = "Get_NotExists";
+        assertFalse(exists(key));
+        // When
+        Exchange exchange2 = aggregationRepository.get(camelContext, key);
+        // Then
+        assertNull(exchange2);
+    }
+
+    @Test
+    public void testRemoveExists() {
+        // Given
+        String key = "Remove_Exists";
+        Exchange exchange = new DefaultExchange(camelContext);
+        aggregationRepository.add(camelContext, key, exchange);
+        assertTrue(exists(key));
+        // When
+        aggregationRepository.remove(camelContext, key, exchange);
+        // Then
+        assertFalse(exists(key));
+    }
+
+    @Test
+    public void testRemoveNotExists() {
+        // Given
+        String key = "RemoveNotExists";
+        Exchange exchange = new DefaultExchange(camelContext);
+        assertFalse(exists(key));
+        // When
+        aggregationRepository.remove(camelContext, key, exchange);
+        // Then
+        assertFalse(exists(key));
+    }
+
+    @Test
+    public void testGetKeys() {
+        // Given
+        String[] keys = {"GetKeys1", "GetKeys2"};
+        addExchanges(keys);
+        // When
+        Set<String> keySet = aggregationRepository.getKeys();
+        // Then
+        for (String key : keys) {
+            assertTrue(keySet.contains(key));
+        }
+    }
+
+    @Test
+    public void testConfirmExist() {
+        // Given
+        for (int i = 1; i < 4; i++) {
+            String key = "Confirm_" + i;
+            Exchange exchange = new DefaultExchange(camelContext);
+            exchange.setExchangeId("Exchange_" + i);
+            aggregationRepository.add(camelContext, key, exchange);
+            assertTrue(exists(key));
+        }
+        // When
+        aggregationRepository.confirm(camelContext, "Confirm_2");
+        // Then
+        assertTrue(exists("Confirm_1"));
+        assertFalse(exists("Confirm_2"));
+        assertTrue(exists("Confirm_3"));
+    }
+
+    @Test
+    public void testConfirmNotExist() {
+        // Given
+        String[] keys = new String[3];
+        for (int i = 1; i < 4; i++) {
+            keys[i - 1] = "Confirm" + i;
+        }
+        addExchanges(keys);
+        for (String key : keys) {
+            assertTrue(exists(key));
+        }
+        // When
+        aggregationRepository.confirm(camelContext, "Exchange-Confirm5");
+        // Then
+        for (String key : keys) {
+            assertTrue(exists(key));
+        }
+    }
+
+    private void addExchanges(String... keys) {
+        for (String key : keys) {
+            Exchange exchange = new DefaultExchange(camelContext);
+            exchange.setExchangeId("Exchange-" + key);
+            aggregationRepository.add(camelContext, key, exchange);
+        }
+    }
+
+    @Test
+    public void testScan() {
+        // Given
+        String[] keys = {"Scan1", "Scan2"};
+        addExchanges(keys);
+        // When
+        Set<String> exchangeIdSet = aggregationRepository.scan(camelContext);
+        // Then
+        for (String key : keys) {
+            assertTrue(exchangeIdSet.contains(key));
+        }
+    }
+
+    @Test
+    public void testRecover() {
+        // Given
+        String[] keys = {"Recover1", "Recover2"};
+        addExchanges(keys);
+        // When
+        Exchange exchange2 = aggregationRepository.recover(camelContext, 
"Recover2");
+        Exchange exchange3 = aggregationRepository.recover(camelContext, 
"Recover3");
+        // Then
+        assertNotNull(exchange2);
+        assertNull(exchange3);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/55ab6e16/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepositoryTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepositoryTest.java
 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepositoryTest.java
new file mode 100644
index 0000000..5090c20
--- /dev/null
+++ 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepositoryTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.processor.aggregate;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+
+public class InfinispanLocalAggregationRepositoryTest extends CamelTestSupport 
{
+
+    private static final String MOCK_GOTCHA = "mock:gotcha";
+    private static final String DIRECT_ONE = "direct:one";
+
+    @EndpointInject(uri = MOCK_GOTCHA)
+    private MockEndpoint mock;
+
+    @Produce(uri = DIRECT_ONE)
+    private ProducerTemplate produceOne;
+
+
+    @Test
+    public void checkAggregationFromOneRoute() throws Exception {
+        final InfinispanLocalAggregationRepository repoOne =
+                new InfinispanLocalAggregationRepository("pippo");
+
+        final int completionSize = 4;
+        final String correlator = "CORRELATOR";
+        RouteBuilder rbOne = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                from(DIRECT_ONE).routeId("AggregatingRouteOne")
+                        .aggregate(header(correlator))
+                        .aggregationRepository(repoOne)
+                        .aggregationStrategy(new 
SumOfIntsAggregationStrategy())
+                        .completionSize(completionSize)
+                        .to(MOCK_GOTCHA);
+            }
+        };
+
+        context().addRoutes(rbOne);
+        context().start();
+
+        mock.expectedMessageCount(2);
+        mock.expectedBodiesReceived(1 + 3 + 4 + 5, 6 + 7 + 20 + 21);
+
+        produceOne.sendBodyAndHeader(1, correlator, correlator);
+        produceOne.sendBodyAndHeader(3, correlator, correlator);
+        produceOne.sendBodyAndHeader(4, correlator, correlator);
+        produceOne.sendBodyAndHeader(5, correlator, correlator);
+        
+        produceOne.sendBodyAndHeader(6, correlator, correlator);
+        produceOne.sendBodyAndHeader(7, correlator, correlator);
+        produceOne.sendBodyAndHeader(20, correlator, correlator);
+        produceOne.sendBodyAndHeader(21, correlator, correlator);
+
+        mock.assertIsSatisfied();
+    }
+    
+    class SumOfIntsAggregationStrategy implements AggregationStrategy {
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            } else {
+                Integer n = newExchange.getIn().getBody(Integer.class);
+                Integer o = oldExchange.getIn().getBody(Integer.class);
+                Integer v = (o == null ? 0 : o) + (n == null ? 0 : n);
+                oldExchange.getIn().setBody(v, Integer.class);
+                return oldExchange;
+            }
+        }
+    }
+
+}

Reply via email to