Repository: camel Updated Branches: refs/heads/master 15404ced2 -> 6cfd38798
CAMEL-7853: Implement AggregationRepository based on Infinispan data grid - Added Infinispan Remote Aggregation Repository Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6cfd3879 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6cfd3879 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6cfd3879 Branch: refs/heads/master Commit: 6cfd38798d18d3d9e06873f9d86cd118f4c28f09 Parents: 55ab6e1 Author: Andrea Cosentino <anco...@gmail.com> Authored: Sat Apr 23 12:14:10 2016 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Sat Apr 23 12:14:41 2016 +0200 ---------------------------------------------------------------------- .../InfinispanLocalAggregationRepository.java | 8 +- .../InfinispanRemoteAggregationRepository.java | 239 +++++++++++++++++++ ...finispanRemoteAggregationRepositoryTest.java | 97 ++++++++ 3 files changed, 340 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/6cfd3879/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java index b6bd872..cb47bfa 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java @@ -27,7 +27,7 @@ import org.apache.camel.impl.DefaultExchangeHolder; import org.apache.camel.spi.RecoverableAggregationRepository; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.ObjectHelper; -import org.infinispan.Cache; +import org.infinispan.commons.api.BasicCache; import org.infinispan.configuration.cache.Configuration; import org.infinispan.manager.DefaultCacheManager; import org.slf4j.Logger; @@ -44,7 +44,7 @@ public class InfinispanLocalAggregationRepository extends ServiceSupport impleme private long recoveryInterval = 5000; private int maximumRedeliveries = 3; private boolean allowSerializedHeaders; - private Cache<String, DefaultExchangeHolder> cache; + private BasicCache<String, DefaultExchangeHolder> cache; private Configuration configuration; /** @@ -220,11 +220,11 @@ public class InfinispanLocalAggregationRepository extends ServiceSupport impleme this.allowSerializedHeaders = allowSerializedHeaders; } - public Cache<String, DefaultExchangeHolder> getCache() { + public BasicCache<String, DefaultExchangeHolder> getCache() { return cache; } - public void setCache(Cache<String, DefaultExchangeHolder> cache) { + public void setCache(BasicCache<String, DefaultExchangeHolder> cache) { this.cache = cache; } http://git-wip-us.apache.org/repos/asf/camel/blob/6cfd3879/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanRemoteAggregationRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanRemoteAggregationRepository.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanRemoteAggregationRepository.java new file mode 100644 index 0000000..e2e7a34 --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanRemoteAggregationRepository.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.processor.aggregate; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.DefaultExchangeHolder; +import org.apache.camel.spi.RecoverableAggregationRepository; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.ObjectHelper; +import org.infinispan.client.hotrod.RemoteCacheManager; +import org.infinispan.client.hotrod.configuration.Configuration; +import org.infinispan.commons.api.BasicCache; +import org.infinispan.manager.DefaultCacheManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InfinispanRemoteAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository { + + private static final Logger LOG = LoggerFactory.getLogger(InfinispanRemoteAggregationRepository.class.getName()); + + private boolean useRecovery = true; + private RemoteCacheManager manager; + private String cacheName; + private String deadLetterChannel; + private long recoveryInterval = 5000; + private int maximumRedeliveries = 3; + private boolean allowSerializedHeaders; + private BasicCache<String, DefaultExchangeHolder> cache; + private Configuration configuration; + + /** + * Creates new {@link InfinispanRemoteAggregationRepository} that defaults to non-optimistic locking + * with recoverable behavior and a local Infinispan cache. + */ + public InfinispanRemoteAggregationRepository() { + } + + /** + * Creates new {@link InfinispanRemoteAggregationRepository} that defaults to non-optimistic locking + * with recoverable behavior and a local Infinispan cache. + * @param cacheName cache name + */ + public InfinispanRemoteAggregationRepository(final String cacheName) { + this.cacheName = cacheName; + } + + @Override + public Exchange add(final CamelContext camelContext, final String key, final Exchange exchange) { + LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key); + DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders); + DefaultExchangeHolder oldHolder = (DefaultExchangeHolder) cache.put(key, newHolder); + return unmarshallExchange(camelContext, oldHolder); + } + + @Override + public Exchange get(CamelContext camelContext, String key) { + return unmarshallExchange(camelContext, (DefaultExchangeHolder) cache.get(key)); + } + + @Override + public void remove(CamelContext camelContext, String key, Exchange exchange) { + LOG.trace("Removing an exchange with ID {} for key {} ", exchange.getExchangeId(), key); + cache.remove(key); + } + + @Override + public void confirm(CamelContext camelContext, String exchangeId) { + LOG.trace("Confirming an exchange with ID {}.", exchangeId); + cache.remove(exchangeId); + } + + @Override + public Set<String> getKeys() { + return Collections.unmodifiableSet(cache.keySet()); + } + + @Override + public Set<String> scan(CamelContext camelContext) { + LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName()); + Set<String> scanned = Collections.unmodifiableSet(cache.keySet()); + LOG.trace("Found {} keys for exchanges to recover in {} context", scanned.size(), camelContext.getName()); + return scanned; + } + + @Override + public Exchange recover(CamelContext camelContext, String exchangeId) { + LOG.trace("Recovering an Exchange with ID {}.", exchangeId); + return useRecovery ? unmarshallExchange(camelContext, cache.get(exchangeId)) : null; + } + + @Override + public void setRecoveryInterval(long interval, TimeUnit timeUnit) { + this.recoveryInterval = timeUnit.toMillis(interval); + } + + @Override + public void setRecoveryInterval(long interval) { + this.recoveryInterval = interval; + } + + @Override + public long getRecoveryIntervalInMillis() { + return recoveryInterval; + } + + @Override + public void setUseRecovery(boolean useRecovery) { + this.useRecovery = useRecovery; + } + + @Override + public boolean isUseRecovery() { + return useRecovery; + } + + @Override + public void setDeadLetterUri(String deadLetterUri) { + this.deadLetterChannel = deadLetterUri; + } + + @Override + public String getDeadLetterUri() { + return deadLetterChannel; + } + + @Override + public void setMaximumRedeliveries(int maximumRedeliveries) { + this.maximumRedeliveries = maximumRedeliveries; + } + + @Override + public int getMaximumRedeliveries() { + return maximumRedeliveries; + } + + @Override + protected void doStart() throws Exception { + if (maximumRedeliveries < 0) { + throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer."); + } + if (recoveryInterval < 0) { + throw new IllegalArgumentException("Recovery interval must be zero or a positive integer."); + } + if (ObjectHelper.isEmpty(configuration)) { + manager = new RemoteCacheManager(); + manager.start(); + } else { + manager = new RemoteCacheManager(configuration); + manager.start(); + } + if (ObjectHelper.isEmpty(cacheName)) { + cache = manager.getCache(); + } else { + cache = manager.getCache(cacheName); + } + } + + @Override + protected void doStop() throws Exception { + manager.stop(); + } + + protected Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder holder) { + Exchange exchange = null; + if (holder != null) { + exchange = new DefaultExchange(camelContext); + DefaultExchangeHolder.unmarshal(exchange, holder); + } + return exchange; + } + + public RemoteCacheManager getManager() { + return manager; + } + + public void setManager(RemoteCacheManager manager) { + this.manager = manager; + } + + public String getCacheName() { + return cacheName; + } + + public void setCacheName(String cacheName) { + this.cacheName = cacheName; + } + + public String getDeadLetterChannel() { + return deadLetterChannel; + } + + public void setDeadLetterChannel(String deadLetterChannel) { + this.deadLetterChannel = deadLetterChannel; + } + + public boolean isAllowSerializedHeaders() { + return allowSerializedHeaders; + } + + public void setAllowSerializedHeaders(boolean allowSerializedHeaders) { + this.allowSerializedHeaders = allowSerializedHeaders; + } + + public BasicCache<String, DefaultExchangeHolder> getCache() { + return cache; + } + + public void setCache(BasicCache<String, DefaultExchangeHolder> cache) { + this.cache = cache; + } + + public Configuration getConfiguration() { + return configuration; + } + + public void setConfiguration(Configuration configuration) { + this.configuration = configuration; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/6cfd3879/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanRemoteAggregationRepositoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanRemoteAggregationRepositoryTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanRemoteAggregationRepositoryTest.java new file mode 100644 index 0000000..6566e4f --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanRemoteAggregationRepositoryTest.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.processor.aggregate; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Ignore; +import org.junit.Test; + +@Ignore("start a local server with: ./bin/standalone.sh") +public class InfinispanRemoteAggregationRepositoryTest extends CamelTestSupport { + + private static final String MOCK_GOTCHA = "mock:gotcha"; + private static final String DIRECT_ONE = "direct:one"; + + @EndpointInject(uri = MOCK_GOTCHA) + private MockEndpoint mock; + + @Produce(uri = DIRECT_ONE) + private ProducerTemplate produceOne; + + + @Test + public void checkAggregationFromOneRoute() throws Exception { + final InfinispanRemoteAggregationRepository repoOne = + new InfinispanRemoteAggregationRepository(); + + final int completionSize = 4; + final String correlator = "CORRELATOR"; + RouteBuilder rbOne = new RouteBuilder() { + @Override + public void configure() throws Exception { + + from(DIRECT_ONE).routeId("AggregatingRouteOne") + .aggregate(header(correlator)) + .aggregationRepository(repoOne) + .aggregationStrategy(new SumOfIntsAggregationStrategy()) + .completionSize(completionSize) + .to(MOCK_GOTCHA); + } + }; + + context().addRoutes(rbOne); + context().start(); + + mock.expectedMessageCount(2); + mock.expectedBodiesReceived(1 + 3 + 4 + 5, 6 + 7 + 20 + 21); + + produceOne.sendBodyAndHeader(1, correlator, correlator); + produceOne.sendBodyAndHeader(3, correlator, correlator); + produceOne.sendBodyAndHeader(4, correlator, correlator); + produceOne.sendBodyAndHeader(5, correlator, correlator); + + produceOne.sendBodyAndHeader(6, correlator, correlator); + produceOne.sendBodyAndHeader(7, correlator, correlator); + produceOne.sendBodyAndHeader(20, correlator, correlator); + produceOne.sendBodyAndHeader(21, correlator, correlator); + + mock.assertIsSatisfied(); + } + + class SumOfIntsAggregationStrategy implements AggregationStrategy { + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } else { + Integer n = newExchange.getIn().getBody(Integer.class); + Integer o = oldExchange.getIn().getBody(Integer.class); + Integer v = (o == null ? 0 : o) + (n == null ? 0 : n); + oldExchange.getIn().setBody(v, Integer.class); + return oldExchange; + } + } + } + +}