Repository: camel
Updated Branches:
  refs/heads/master 15404ced2 -> 6cfd38798


CAMEL-7853: Implement AggregationRepository based on Infinispan data grid - 
Added Infinispan Remote Aggregation Repository


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6cfd3879
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6cfd3879
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6cfd3879

Branch: refs/heads/master
Commit: 6cfd38798d18d3d9e06873f9d86cd118f4c28f09
Parents: 55ab6e1
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Sat Apr 23 12:14:10 2016 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Sat Apr 23 12:14:41 2016 +0200

----------------------------------------------------------------------
 .../InfinispanLocalAggregationRepository.java   |   8 +-
 .../InfinispanRemoteAggregationRepository.java  | 239 +++++++++++++++++++
 ...finispanRemoteAggregationRepositoryTest.java |  97 ++++++++
 3 files changed, 340 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6cfd3879/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java
 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java
index b6bd872..cb47bfa 100644
--- 
a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java
+++ 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java
@@ -27,7 +27,7 @@ import org.apache.camel.impl.DefaultExchangeHolder;
 import org.apache.camel.spi.RecoverableAggregationRepository;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
-import org.infinispan.Cache;
+import org.infinispan.commons.api.BasicCache;
 import org.infinispan.configuration.cache.Configuration;
 import org.infinispan.manager.DefaultCacheManager;
 import org.slf4j.Logger;
@@ -44,7 +44,7 @@ public class InfinispanLocalAggregationRepository extends 
ServiceSupport impleme
     private long recoveryInterval = 5000;
     private int maximumRedeliveries = 3;
     private boolean allowSerializedHeaders;
-    private Cache<String, DefaultExchangeHolder> cache;
+    private BasicCache<String, DefaultExchangeHolder> cache;
     private Configuration configuration;
 
     /**
@@ -220,11 +220,11 @@ public class InfinispanLocalAggregationRepository extends 
ServiceSupport impleme
         this.allowSerializedHeaders = allowSerializedHeaders;
     }
 
-    public Cache<String, DefaultExchangeHolder> getCache() {
+    public BasicCache<String, DefaultExchangeHolder> getCache() {
         return cache;
     }
 
-    public void setCache(Cache<String, DefaultExchangeHolder> cache) {
+    public void setCache(BasicCache<String, DefaultExchangeHolder> cache) {
         this.cache = cache;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/6cfd3879/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanRemoteAggregationRepository.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanRemoteAggregationRepository.java
 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanRemoteAggregationRepository.java
new file mode 100644
index 0000000..e2e7a34
--- /dev/null
+++ 
b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanRemoteAggregationRepository.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.processor.aggregate;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
+import org.apache.camel.spi.RecoverableAggregationRepository;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.Configuration;
+import org.infinispan.commons.api.BasicCache;
+import org.infinispan.manager.DefaultCacheManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InfinispanRemoteAggregationRepository extends ServiceSupport 
implements RecoverableAggregationRepository {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(InfinispanRemoteAggregationRepository.class.getName());
+    
+    private boolean useRecovery = true;
+    private RemoteCacheManager manager;
+    private String cacheName;
+    private String deadLetterChannel;
+    private long recoveryInterval = 5000;
+    private int maximumRedeliveries = 3;
+    private boolean allowSerializedHeaders;
+    private BasicCache<String, DefaultExchangeHolder> cache;
+    private Configuration configuration;
+
+    /**
+     * Creates a new {@link InfinispanRemoteAggregationRepository} that defaults 
to non-optimistic locking
+     * with recoverable behavior and a remote Infinispan cache. 
+     */
+    public InfinispanRemoteAggregationRepository() {
+    }
+    
+    /**
+     * Creates a new {@link InfinispanRemoteAggregationRepository} that defaults 
to non-optimistic locking
+     * with recoverable behavior and a remote Infinispan cache. 
+     * @param cacheName cache name
+     */
+    public InfinispanRemoteAggregationRepository(final String cacheName) {
+        this.cacheName = cacheName;
+    }
+
+    @Override
+    public Exchange add(final CamelContext camelContext, final String key, 
final Exchange exchange) {
+        LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe 
manner.", exchange.getExchangeId(), key);
+        DefaultExchangeHolder newHolder = 
DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders);
+        DefaultExchangeHolder oldHolder = (DefaultExchangeHolder) 
cache.put(key, newHolder);
+        return unmarshallExchange(camelContext, oldHolder);
+    }
+
+    @Override
+    public Exchange get(CamelContext camelContext, String key) {
+        return unmarshallExchange(camelContext, (DefaultExchangeHolder) 
cache.get(key));
+    }
+
+    @Override
+    public void remove(CamelContext camelContext, String key, Exchange 
exchange) {
+        LOG.trace("Removing an exchange with ID {} for key {} ", 
exchange.getExchangeId(), key);
+        cache.remove(key);
+    }
+
+    @Override
+    public void confirm(CamelContext camelContext, String exchangeId) {
+        LOG.trace("Confirming an exchange with ID {}.", exchangeId);
+        cache.remove(exchangeId);
+    }
+
+    @Override
+    public Set<String> getKeys() {
+        return Collections.unmodifiableSet(cache.keySet());
+    }
+
+    @Override
+    public Set<String> scan(CamelContext camelContext) {
+        LOG.trace("Scanning for exchanges to recover in {} context", 
camelContext.getName());
+        Set<String> scanned = Collections.unmodifiableSet(cache.keySet());
+        LOG.trace("Found {} keys for exchanges to recover in {} context", 
scanned.size(), camelContext.getName());
+        return scanned;
+    }
+
+    @Override
+    public Exchange recover(CamelContext camelContext, String exchangeId) {
+        LOG.trace("Recovering an Exchange with ID {}.", exchangeId);
+        return useRecovery ? unmarshallExchange(camelContext, 
cache.get(exchangeId)) : null;
+    }
+
+    @Override
+    public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
+        this.recoveryInterval = timeUnit.toMillis(interval);
+    }
+
+    @Override
+    public void setRecoveryInterval(long interval) {
+        this.recoveryInterval = interval;
+    }
+    
+    @Override
+    public long getRecoveryIntervalInMillis() {
+        return recoveryInterval;
+    }
+
+    @Override
+    public void setUseRecovery(boolean useRecovery) {
+        this.useRecovery = useRecovery;
+    }
+
+    @Override
+    public boolean isUseRecovery() {
+        return useRecovery;
+    }
+
+    @Override
+    public void setDeadLetterUri(String deadLetterUri) {
+        this.deadLetterChannel = deadLetterUri;
+    }
+
+    @Override
+    public String getDeadLetterUri() {
+        return deadLetterChannel;
+    }
+
+    @Override
+    public void setMaximumRedeliveries(int maximumRedeliveries) {
+        this.maximumRedeliveries = maximumRedeliveries;
+    }
+
+    @Override
+    public int getMaximumRedeliveries() {
+        return maximumRedeliveries;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (maximumRedeliveries < 0) {
+            throw new IllegalArgumentException("Maximum redelivery retries 
must be zero or a positive integer.");
+        }
+        if (recoveryInterval < 0) {
+            throw new IllegalArgumentException("Recovery interval must be zero 
or a positive integer.");
+        }
+        if (ObjectHelper.isEmpty(configuration)) {
+            manager = new RemoteCacheManager();
+            manager.start();
+        } else {
+            manager = new RemoteCacheManager(configuration);
+            manager.start();
+        }        
+        if (ObjectHelper.isEmpty(cacheName)) {
+            cache = manager.getCache();
+        } else {
+            cache = manager.getCache(cacheName);
+        }
+    }
+
+    /**
+     * Stops the Hot Rod cache manager held by this repository.
+     * <p>
+     * The manager is only assigned by {@code doStart()} or {@code setManager(...)},
+     * so it may still be {@code null} when the repository is stopped without a
+     * successful start; in that case there is nothing to stop.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        if (manager != null) {
+            manager.stop();
+        }
+    }
+
+    protected Exchange unmarshallExchange(CamelContext camelContext, 
DefaultExchangeHolder holder) {
+        Exchange exchange = null;
+        if (holder != null) {
+            exchange = new DefaultExchange(camelContext);
+            DefaultExchangeHolder.unmarshal(exchange, holder);
+        }
+        return exchange;
+    }
+    
+    public RemoteCacheManager getManager() {
+        return manager;
+    }
+
+    public void setManager(RemoteCacheManager manager) {
+        this.manager = manager;
+    }
+
+    public String getCacheName() {
+        return cacheName;
+    }
+
+    public void setCacheName(String cacheName) {
+        this.cacheName = cacheName;
+    }
+
+    public String getDeadLetterChannel() {
+        return deadLetterChannel;
+    }
+
+    public void setDeadLetterChannel(String deadLetterChannel) {
+        this.deadLetterChannel = deadLetterChannel;
+    }
+
+    public boolean isAllowSerializedHeaders() {
+        return allowSerializedHeaders;
+    }
+
+    public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
+        this.allowSerializedHeaders = allowSerializedHeaders;
+    }
+
+    public BasicCache<String, DefaultExchangeHolder> getCache() {
+        return cache;
+    }
+
+    public void setCache(BasicCache<String, DefaultExchangeHolder> cache) {
+        this.cache = cache;
+    }
+
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(Configuration configuration) {
+        this.configuration = configuration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6cfd3879/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanRemoteAggregationRepositoryTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanRemoteAggregationRepositoryTest.java
 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanRemoteAggregationRepositoryTest.java
new file mode 100644
index 0000000..6566e4f
--- /dev/null
+++ 
b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanRemoteAggregationRepositoryTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.processor.aggregate;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore("start a local server with: ./bin/standalone.sh")
+public class InfinispanRemoteAggregationRepositoryTest extends 
CamelTestSupport {
+
+    private static final String MOCK_GOTCHA = "mock:gotcha";
+    private static final String DIRECT_ONE = "direct:one";
+
+    @EndpointInject(uri = MOCK_GOTCHA)
+    private MockEndpoint mock;
+
+    @Produce(uri = DIRECT_ONE)
+    private ProducerTemplate produceOne;
+
+
+    @Test
+    public void checkAggregationFromOneRoute() throws Exception {
+        final InfinispanRemoteAggregationRepository repoOne =
+                new InfinispanRemoteAggregationRepository();
+
+        final int completionSize = 4;
+        final String correlator = "CORRELATOR";
+        RouteBuilder rbOne = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                from(DIRECT_ONE).routeId("AggregatingRouteOne")
+                        .aggregate(header(correlator))
+                        .aggregationRepository(repoOne)
+                        .aggregationStrategy(new 
SumOfIntsAggregationStrategy())
+                        .completionSize(completionSize)
+                        .to(MOCK_GOTCHA);
+            }
+        };
+
+        context().addRoutes(rbOne);
+        context().start();
+
+        mock.expectedMessageCount(2);
+        mock.expectedBodiesReceived(1 + 3 + 4 + 5, 6 + 7 + 20 + 21);
+
+        produceOne.sendBodyAndHeader(1, correlator, correlator);
+        produceOne.sendBodyAndHeader(3, correlator, correlator);
+        produceOne.sendBodyAndHeader(4, correlator, correlator);
+        produceOne.sendBodyAndHeader(5, correlator, correlator);
+        
+        produceOne.sendBodyAndHeader(6, correlator, correlator);
+        produceOne.sendBodyAndHeader(7, correlator, correlator);
+        produceOne.sendBodyAndHeader(20, correlator, correlator);
+        produceOne.sendBodyAndHeader(21, correlator, correlator);
+
+        mock.assertIsSatisfied();
+    }
+    
+    class SumOfIntsAggregationStrategy implements AggregationStrategy {
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            } else {
+                Integer n = newExchange.getIn().getBody(Integer.class);
+                Integer o = oldExchange.getIn().getBody(Integer.class);
+                Integer v = (o == null ? 0 : o) + (n == null ? 0 : n);
+                oldExchange.getIn().setBody(v, Integer.class);
+                return oldExchange;
+            }
+        }
+    }
+
+}

Reply via email to