CAMEL-7853: Implement AggregationRepository based on Infinispan data grid - 
Local aggregation Repository


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/13ccd90b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/13ccd90b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/13ccd90b

Branch: refs/heads/master
Commit: 13ccd90bc5b2f5f06eecbc767f2f131ecf3b8364
Parents: 15404ce
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Sat Apr 23 12:14:00 2016 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Sat Apr 23 12:14:41 2016 +0200

----------------------------------------------------------------------
 .../InfinispanLocalAggregationRepository.java   | 220 +++++++++++++++++++
 ...spanAggregationRepositoryOperationsTest.java | 208 ++++++++++++++++++
 .../InfinispanAggregationRepositoryTest.java    |  97 ++++++++
 3 files changed, 525 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/13ccd90b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java
 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java
new file mode 100644
index 0000000..13374b9
--- /dev/null
+++ 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java
@@ -0,0 +1,220 @@
+package org.apache.camel.component.infinispan.processor.aggregate;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
+import org.apache.camel.spi.RecoverableAggregationRepository;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.infinispan.Cache;
+import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.manager.DefaultCacheManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InfinispanLocalAggregationRepository extends ServiceSupport 
implements RecoverableAggregationRepository {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(InfinispanLocalAggregationRepository.class.getName());
+    
+    private boolean useRecovery = true;
+    private DefaultCacheManager manager;
+    private String cacheName;
+    private String deadLetterChannel;
+    private long recoveryInterval = 5000;
+    private int maximumRedeliveries = 3;
+    private boolean allowSerializedHeaders;
+    private Cache<String, DefaultExchangeHolder> cache;
+    private Configuration configuration;
+
+    /**
+     * Creates new {@link InfinispanLocalAggregationRepository} that defaults 
to non-optimistic locking
+     * with recoverable behavior and a local Infinispan cache. 
+     */
+    public InfinispanLocalAggregationRepository() {
+    }
+    
+    /**
+     * Creates new {@link InfinispanLocalAggregationRepository} that defaults 
to non-optimistic locking
+     * with recoverable behavior and a local Infinispan cache. 
+     * @param cacheName cache name
+     */
+    public InfinispanLocalAggregationRepository(final String cacheName) {
+        this.cacheName = cacheName;
+    }
+
+       @Override
+    public Exchange add(final CamelContext camelContext, final String key, 
final Exchange exchange) {
+        LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe 
manner.", exchange.getExchangeId(), key);
+        DefaultExchangeHolder newHolder = 
DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders);
+        DefaultExchangeHolder oldHolder = (DefaultExchangeHolder) 
cache.put(key, newHolder);
+        return unmarshallExchange(camelContext, oldHolder);
+    }
+
+       @Override
+       public Exchange get(CamelContext camelContext, String key) {
+        return unmarshallExchange(camelContext, (DefaultExchangeHolder) 
cache.get(key));
+       }
+
+       @Override
+       public void remove(CamelContext camelContext, String key, Exchange 
exchange) {
+        LOG.trace("Removing an exchange with ID {} for key {} ", 
exchange.getExchangeId(), key);
+        cache.remove(key);             
+       }
+
+       @Override
+       public void confirm(CamelContext camelContext, String exchangeId) {
+        LOG.trace("Confirming an exchange with ID {}.", exchangeId);
+        cache.remove(exchangeId);      
+       }
+
+       @Override
+       public Set<String> getKeys() {
+        return Collections.unmodifiableSet(cache.keySet());
+       }
+
+       @Override
+       public Set<String> scan(CamelContext camelContext) {
+        LOG.trace("Scanning for exchanges to recover in {} context", 
camelContext.getName());
+        Set<String> scanned = Collections.unmodifiableSet(cache.keySet());
+        LOG.trace("Found {} keys for exchanges to recover in {} context", 
scanned.size(), camelContext.getName());
+        return scanned;
+       }
+
+       @Override
+       public Exchange recover(CamelContext camelContext, String exchangeId) {
+        LOG.trace("Recovering an Exchange with ID {}.", exchangeId);
+        return useRecovery ? unmarshallExchange(camelContext, 
cache.get(exchangeId)) : null;
+       }
+
+    @Override
+    public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
+        this.recoveryInterval = timeUnit.toMillis(interval);
+    }
+
+    @Override
+    public void setRecoveryInterval(long interval) {
+        this.recoveryInterval = interval;
+    }
+    
+       @Override
+       public long getRecoveryIntervalInMillis() {
+        return recoveryInterval;
+       }
+
+    @Override
+    public void setUseRecovery(boolean useRecovery) {
+        this.useRecovery = useRecovery;
+    }
+
+    @Override
+    public boolean isUseRecovery() {
+        return useRecovery;
+    }
+
+    @Override
+    public void setDeadLetterUri(String deadLetterUri) {
+        this.deadLetterChannel = deadLetterUri;
+    }
+
+    @Override
+    public String getDeadLetterUri() {
+        return deadLetterChannel;
+    }
+
+    @Override
+    public void setMaximumRedeliveries(int maximumRedeliveries) {
+        this.maximumRedeliveries = maximumRedeliveries;
+    }
+
+    @Override
+    public int getMaximumRedeliveries() {
+        return maximumRedeliveries;
+    }
+
+       @Override
+       protected void doStart() throws Exception {
+        if (maximumRedeliveries < 0) {
+            throw new IllegalArgumentException("Maximum redelivery retries 
must be zero or a positive integer.");
+        }
+        if (recoveryInterval < 0) {
+            throw new IllegalArgumentException("Recovery interval must be zero 
or a positive integer.");
+        }
+        if (ObjectHelper.isEmpty(configuration)) {
+            manager = new DefaultCacheManager();
+        } else {
+               manager = new DefaultCacheManager(configuration);               
+        }        
+        if (ObjectHelper.isEmpty(cacheName)) {
+            cache = manager.getCache();
+        } else {
+            cache = manager.getCache(cacheName);               
+        }
+       }
+
+       @Override
+       protected void doStop() throws Exception {
+               
+       }
+
+    protected Exchange unmarshallExchange(CamelContext camelContext, 
DefaultExchangeHolder holder) {
+        Exchange exchange = null;
+        if (holder != null) {
+            exchange = new DefaultExchange(camelContext);
+            DefaultExchangeHolder.unmarshal(exchange, holder);
+        }
+        return exchange;
+    }
+    
+       public DefaultCacheManager getManager() {
+               return manager;
+       }
+
+       public void setManager(DefaultCacheManager manager) {
+               this.manager = manager;
+       }
+
+       public String getCacheName() {
+               return cacheName;
+       }
+
+       public void setCacheName(String cacheName) {
+               this.cacheName = cacheName;
+       }
+
+       public String getDeadLetterChannel() {
+               return deadLetterChannel;
+       }
+
+       public void setDeadLetterChannel(String deadLetterChannel) {
+               this.deadLetterChannel = deadLetterChannel;
+       }
+
+       public boolean isAllowSerializedHeaders() {
+               return allowSerializedHeaders;
+       }
+
+       public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
+               this.allowSerializedHeaders = allowSerializedHeaders;
+       }
+
+       public Cache<String, DefaultExchangeHolder> getCache() {
+               return cache;
+       }
+
+       public void setCache(Cache<String, DefaultExchangeHolder> cache) {
+               this.cache = cache;
+       }
+
+       public Configuration getConfiguration() {
+               return configuration;
+       }
+
+       public void setConfiguration(Configuration configuration) {
+               this.configuration = configuration;
+       }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/13ccd90b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryOperationsTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryOperationsTest.java
 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryOperationsTest.java
new file mode 100644
index 0000000..11312c5
--- /dev/null
+++ 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryOperationsTest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.processor.aggregate;
+
+import java.util.Set;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unite test for {@link InfinispanLocalAggregationRepository}
+ */
+public class InfinispanAggregationRepositoryOperationsTest {
+
+    private static InfinispanLocalAggregationRepository aggregationRepository;
+    private CamelContext camelContext = new DefaultCamelContext();
+
+    @BeforeClass
+    public static void starting() throws Exception {
+        aggregationRepository = new 
InfinispanLocalAggregationRepository("pippo");
+        aggregationRepository.start();
+    }
+
+    @AfterClass
+    public static void stopping() throws Exception {
+        aggregationRepository.stop();
+    }
+
+    private boolean exists(String key) {
+        DefaultExchangeHolder holder = 
aggregationRepository.getCache().get(key);
+        if (holder == null) return false;
+        else return true;
+    }
+
+    @Test
+    public void testAdd() {
+        // Given
+        String key = "Add";
+        assertFalse(exists(key));
+        Exchange exchange = new DefaultExchange(camelContext);
+        // When
+        aggregationRepository.add(camelContext, key, exchange);
+        // Then
+        assertTrue(exists(key));
+    }
+
+    @Test
+    public void testGetExists() {
+        // Given
+        String key = "Get_Exists";
+        Exchange exchange = new DefaultExchange(camelContext);
+        aggregationRepository.add(camelContext, key, exchange);
+        assertTrue(exists(key));
+        
+        // When
+        Exchange exchange2 = aggregationRepository.get(camelContext, key);
+        // Then
+        assertNotNull(exchange2);
+        assertEquals(exchange.getExchangeId(), exchange2.getExchangeId());
+    }
+
+    @Test
+    public void testGetNotExists() {
+        // Given
+        String key = "Get_NotExists";
+        assertFalse(exists(key));
+        // When
+        Exchange exchange2 = aggregationRepository.get(camelContext, key);
+        // Then
+        assertNull(exchange2);
+    }
+
+    @Test
+    public void testRemoveExists() {
+        // Given
+        String key = "Remove_Exists";
+        Exchange exchange = new DefaultExchange(camelContext);
+        aggregationRepository.add(camelContext, key, exchange);
+        assertTrue(exists(key));
+        // When
+        aggregationRepository.remove(camelContext, key, exchange);
+        // Then
+        assertFalse(exists(key));
+    }
+
+    @Test
+    public void testRemoveNotExists() {
+        // Given
+        String key = "RemoveNotExists";
+        Exchange exchange = new DefaultExchange(camelContext);
+        assertFalse(exists(key));
+        // When
+        aggregationRepository.remove(camelContext, key, exchange);
+        // Then
+        assertFalse(exists(key));
+    }
+
+    @Test
+    public void testGetKeys() {
+        // Given
+        String[] keys = {"GetKeys1", "GetKeys2"};
+        addExchanges(keys);
+        // When
+        Set<String> keySet = aggregationRepository.getKeys();
+        // Then
+        for (String key : keys) {
+            assertTrue(keySet.contains(key));
+        }
+    }
+
+    @Test
+    public void testConfirmExist() {
+        // Given
+        for (int i = 1; i < 4; i++) {
+            String key = "Confirm_" + i;
+            Exchange exchange = new DefaultExchange(camelContext);
+            exchange.setExchangeId("Exchange_" + i);
+            aggregationRepository.add(camelContext, key, exchange);
+            assertTrue(exists(key));
+        }
+        // When
+        aggregationRepository.confirm(camelContext, "Confirm_2");
+        // Then
+        assertTrue(exists("Confirm_1"));
+        assertFalse(exists("Confirm_2"));
+        assertTrue(exists("Confirm_3"));
+    }
+
+    @Test
+    public void testConfirmNotExist() {
+        // Given
+        String[] keys = new String[3];
+        for (int i = 1; i < 4; i++) {
+            keys[i - 1] = "Confirm" + i;
+        }
+        addExchanges(keys);
+        for (String key : keys) {
+            assertTrue(exists(key));
+        }
+        // When
+        aggregationRepository.confirm(camelContext, "Exchange-Confirm5");
+        // Then
+        for (String key : keys) {
+            assertTrue(exists(key));
+        }
+    }
+
+    private void addExchanges(String... keys) {
+        for (String key : keys) {
+            Exchange exchange = new DefaultExchange(camelContext);
+            exchange.setExchangeId("Exchange-" + key);
+            aggregationRepository.add(camelContext, key, exchange);
+        }
+    }
+
+    @Test
+    public void testScan() {
+        // Given
+        String[] keys = {"Scan1", "Scan2"};
+        addExchanges(keys);
+        // When
+        Set<String> exchangeIdSet = aggregationRepository.scan(camelContext);
+        // Then
+        for (String key : keys) {
+            assertTrue(exchangeIdSet.contains(key));
+        }
+    }
+
+    @Test
+    public void testRecover() {
+        // Given
+        String[] keys = {"Recover1", "Recover2"};
+        addExchanges(keys);
+        // When
+        Exchange exchange2 = aggregationRepository.recover(camelContext, 
"Recover2");
+        Exchange exchange3 = aggregationRepository.recover(camelContext, 
"Recover3");
+        // Then
+        assertNotNull(exchange2);
+        assertNull(exchange3);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/13ccd90b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryTest.java
 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryTest.java
new file mode 100644
index 0000000..b656662
--- /dev/null
+++ 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.processor.aggregate;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import 
org.apache.camel.component.infinispan.processor.aggregate.InfinispanLocalAggregationRepository;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+
+public class InfinispanAggregationRepositoryTest extends CamelTestSupport {
+
+    private static final String MOCK_GOTCHA = "mock:gotcha";
+    private static final String DIRECT_ONE = "direct:one";
+
+    @EndpointInject(uri = MOCK_GOTCHA)
+    private MockEndpoint mock;
+
+    @Produce(uri = DIRECT_ONE)
+    private ProducerTemplate produceOne;
+
+
+    @Test
+    public void checkAggregationFromOneRoute() throws Exception {
+        final InfinispanLocalAggregationRepository repoOne =
+                new InfinispanLocalAggregationRepository("pippo");
+
+        final int completionSize = 4;
+        final String correlator = "CORRELATOR";
+        RouteBuilder rbOne = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                from(DIRECT_ONE).routeId("AggregatingRouteOne")
+                        .aggregate(header(correlator))
+                        .aggregationRepository(repoOne)
+                        .aggregationStrategy(new 
SumOfIntsAggregationStrategy())
+                        .completionSize(completionSize)
+                        .to(MOCK_GOTCHA);
+            }
+        };
+
+        context().addRoutes(rbOne);
+        context().start();
+
+        mock.expectedMessageCount(2);
+        mock.expectedBodiesReceived(1 + 3 + 4 + 5, 6 + 7 + 20 + 21);
+
+        produceOne.sendBodyAndHeader(1, correlator, correlator);
+        produceOne.sendBodyAndHeader(3, correlator, correlator);
+        produceOne.sendBodyAndHeader(4, correlator, correlator);
+        produceOne.sendBodyAndHeader(5, correlator, correlator);
+        
+        produceOne.sendBodyAndHeader(6, correlator, correlator);
+        produceOne.sendBodyAndHeader(7, correlator, correlator);
+        produceOne.sendBodyAndHeader(20, correlator, correlator);
+        produceOne.sendBodyAndHeader(21, correlator, correlator);
+
+        mock.assertIsSatisfied();
+    }
+    
+    class SumOfIntsAggregationStrategy implements AggregationStrategy {
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            } else {
+                Integer n = newExchange.getIn().getBody(Integer.class);
+                Integer o = oldExchange.getIn().getBody(Integer.class);
+                Integer v = (o == null ? 0 : o) + (n == null ? 0 : n);
+                oldExchange.getIn().setBody(v, Integer.class);
+                return oldExchange;
+            }
+        }
+    }
+
+}

Reply via email to