CAMEL-7853: Implement AggregationRepository based on Infinispan data grid - Local aggregation Repository
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/13ccd90b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/13ccd90b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/13ccd90b Branch: refs/heads/master Commit: 13ccd90bc5b2f5f06eecbc767f2f131ecf3b8364 Parents: 15404ce Author: Andrea Cosentino <anco...@gmail.com> Authored: Sat Apr 23 12:14:00 2016 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Sat Apr 23 12:14:41 2016 +0200 ---------------------------------------------------------------------- .../InfinispanLocalAggregationRepository.java | 220 +++++++++++++++++++ ...spanAggregationRepositoryOperationsTest.java | 208 ++++++++++++++++++ .../InfinispanAggregationRepositoryTest.java | 97 ++++++++ 3 files changed, 525 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/13ccd90b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java new file mode 100644 index 0000000..13374b9 --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java @@ -0,0 +1,220 @@ +package org.apache.camel.component.infinispan.processor.aggregate; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.DefaultExchangeHolder; +import org.apache.camel.spi.RecoverableAggregationRepository; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.ObjectHelper; +import org.infinispan.Cache; +import org.infinispan.configuration.cache.Configuration; +import org.infinispan.manager.DefaultCacheManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InfinispanLocalAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository { + + private static final Logger LOG = LoggerFactory.getLogger(InfinispanLocalAggregationRepository.class.getName()); + + private boolean useRecovery = true; + private DefaultCacheManager manager; + private String cacheName; + private String deadLetterChannel; + private long recoveryInterval = 5000; + private int maximumRedeliveries = 3; + private boolean allowSerializedHeaders; + private Cache<String, DefaultExchangeHolder> cache; + private Configuration configuration; + + /** + * Creates new {@link InfinispanLocalAggregationRepository} that defaults to non-optimistic locking + * with recoverable behavior and a local Infinispan cache. + */ + public InfinispanLocalAggregationRepository() { + } + + /** + * Creates new {@link InfinispanLocalAggregationRepository} that defaults to non-optimistic locking + * with recoverable behavior and a local Infinispan cache. + * @param cacheName cache name + */ + public InfinispanLocalAggregationRepository(final String cacheName) { + this.cacheName = cacheName; + } + + @Override + public Exchange add(final CamelContext camelContext, final String key, final Exchange exchange) { + LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key); + DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders); + DefaultExchangeHolder oldHolder = (DefaultExchangeHolder) cache.put(key, newHolder); + return unmarshallExchange(camelContext, oldHolder); + } + + @Override + public Exchange get(CamelContext camelContext, String key) { + return unmarshallExchange(camelContext, (DefaultExchangeHolder) cache.get(key)); + } + + @Override + public void remove(CamelContext camelContext, String key, Exchange exchange) { + LOG.trace("Removing an exchange with ID {} for key {} ", exchange.getExchangeId(), key); + cache.remove(key); + } + + @Override + public void confirm(CamelContext camelContext, String exchangeId) { + LOG.trace("Confirming an exchange with ID {}.", exchangeId); + cache.remove(exchangeId); + } + + @Override + public Set<String> getKeys() { + return Collections.unmodifiableSet(cache.keySet()); + } + + @Override + public Set<String> scan(CamelContext camelContext) { + LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName()); + Set<String> scanned = Collections.unmodifiableSet(cache.keySet()); + LOG.trace("Found {} keys for exchanges to recover in {} context", scanned.size(), camelContext.getName()); + return scanned; + } + + @Override + public Exchange recover(CamelContext camelContext, String exchangeId) { + LOG.trace("Recovering an Exchange with ID {}.", exchangeId); + return useRecovery ? unmarshallExchange(camelContext, cache.get(exchangeId)) : null; + } + + @Override + public void setRecoveryInterval(long interval, TimeUnit timeUnit) { + this.recoveryInterval = timeUnit.toMillis(interval); + } + + @Override + public void setRecoveryInterval(long interval) { + this.recoveryInterval = interval; + } + + @Override + public long getRecoveryIntervalInMillis() { + return recoveryInterval; + } + + @Override + public void setUseRecovery(boolean useRecovery) { + this.useRecovery = useRecovery; + } + + @Override + public boolean isUseRecovery() { + return useRecovery; + } + + @Override + public void setDeadLetterUri(String deadLetterUri) { + this.deadLetterChannel = deadLetterUri; + } + + @Override + public String getDeadLetterUri() { + return deadLetterChannel; + } + + @Override + public void setMaximumRedeliveries(int maximumRedeliveries) { + this.maximumRedeliveries = maximumRedeliveries; + } + + @Override + public int getMaximumRedeliveries() { + return maximumRedeliveries; + } + + @Override + protected void doStart() throws Exception { + if (maximumRedeliveries < 0) { + throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer."); + } + if (recoveryInterval < 0) { + throw new IllegalArgumentException("Recovery interval must be zero or a positive integer."); + } + if (ObjectHelper.isEmpty(configuration)) { + manager = new DefaultCacheManager(); + } else { + manager = new DefaultCacheManager(configuration); + } + if (ObjectHelper.isEmpty(cacheName)) { + cache = manager.getCache(); + } else { + cache = manager.getCache(cacheName); + } + } + + @Override + protected void doStop() throws Exception { + + } + + protected Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder holder) { + Exchange exchange = null; + if (holder != null) { + exchange = new DefaultExchange(camelContext); + DefaultExchangeHolder.unmarshal(exchange, holder); + } + return exchange; + } + + public DefaultCacheManager getManager() { + return manager; + } + + public void setManager(DefaultCacheManager manager) { + this.manager = manager; + } + + public String getCacheName() { + return cacheName; + } + + public void setCacheName(String cacheName) { + this.cacheName = cacheName; + } + + public String getDeadLetterChannel() { + return deadLetterChannel; + } + + public void setDeadLetterChannel(String deadLetterChannel) { + this.deadLetterChannel = deadLetterChannel; + } + + public boolean isAllowSerializedHeaders() { + return allowSerializedHeaders; + } + + public void setAllowSerializedHeaders(boolean allowSerializedHeaders) { + this.allowSerializedHeaders = allowSerializedHeaders; + } + + public Cache<String, DefaultExchangeHolder> getCache() { + return cache; + } + + public void setCache(Cache<String, DefaultExchangeHolder> cache) { + this.cache = cache; + } + + public Configuration getConfiguration() { + return configuration; + } + + public void setConfiguration(Configuration configuration) { + this.configuration = configuration; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/13ccd90b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryOperationsTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryOperationsTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryOperationsTest.java new file mode 100644 index 0000000..11312c5 --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryOperationsTest.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.processor.aggregate; + +import java.util.Set; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.DefaultExchangeHolder; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Unite test for {@link InfinispanLocalAggregationRepository} + */ +public class InfinispanAggregationRepositoryOperationsTest { + + private static InfinispanLocalAggregationRepository aggregationRepository; + private CamelContext camelContext = new DefaultCamelContext(); + + @BeforeClass + public static void starting() throws Exception { + aggregationRepository = new InfinispanLocalAggregationRepository("pippo"); + aggregationRepository.start(); + } + + @AfterClass + public static void stopping() throws Exception { + aggregationRepository.stop(); + } + + private boolean exists(String key) { + DefaultExchangeHolder holder = aggregationRepository.getCache().get(key); + if (holder == null) return false; + else return true; + } + + @Test + public void testAdd() { + // Given + String key = "Add"; + assertFalse(exists(key)); + Exchange exchange = new DefaultExchange(camelContext); + // When + aggregationRepository.add(camelContext, key, exchange); + // Then + assertTrue(exists(key)); + } + + @Test + public void testGetExists() { + // Given + String key = "Get_Exists"; + Exchange exchange = new DefaultExchange(camelContext); + aggregationRepository.add(camelContext, key, exchange); + assertTrue(exists(key)); + + // When + Exchange exchange2 = aggregationRepository.get(camelContext, key); + // Then + assertNotNull(exchange2); + assertEquals(exchange.getExchangeId(), exchange2.getExchangeId()); + } + + @Test + public void testGetNotExists() { + // Given + String key = "Get_NotExists"; + assertFalse(exists(key)); + // When + Exchange exchange2 = aggregationRepository.get(camelContext, key); + // Then + assertNull(exchange2); + } + + @Test + public void testRemoveExists() { + // Given + String key = "Remove_Exists"; + Exchange exchange = new DefaultExchange(camelContext); + aggregationRepository.add(camelContext, key, exchange); + assertTrue(exists(key)); + // When + aggregationRepository.remove(camelContext, key, exchange); + // Then + assertFalse(exists(key)); + } + + @Test + public void testRemoveNotExists() { + // Given + String key = "RemoveNotExists"; + Exchange exchange = new DefaultExchange(camelContext); + assertFalse(exists(key)); + // When + aggregationRepository.remove(camelContext, key, exchange); + // Then + assertFalse(exists(key)); + } + + @Test + public void testGetKeys() { + // Given + String[] keys = {"GetKeys1", "GetKeys2"}; + addExchanges(keys); + // When + Set<String> keySet = aggregationRepository.getKeys(); + // Then + for (String key : keys) { + assertTrue(keySet.contains(key)); + } + } + + @Test + public void testConfirmExist() { + // Given + for (int i = 1; i < 4; i++) { + String key = "Confirm_" + i; + Exchange exchange = new DefaultExchange(camelContext); + exchange.setExchangeId("Exchange_" + i); + aggregationRepository.add(camelContext, key, exchange); + assertTrue(exists(key)); + } + // When + aggregationRepository.confirm(camelContext, "Confirm_2"); + // Then + assertTrue(exists("Confirm_1")); + assertFalse(exists("Confirm_2")); + assertTrue(exists("Confirm_3")); + } + + @Test + public void testConfirmNotExist() { + // Given + String[] keys = new String[3]; + for (int i = 1; i < 4; i++) { + keys[i - 1] = "Confirm" + i; + } + addExchanges(keys); + for (String key : keys) { + assertTrue(exists(key)); + } + // When + aggregationRepository.confirm(camelContext, "Exchange-Confirm5"); + // Then + for (String key : keys) { + assertTrue(exists(key)); + } + } + + private void addExchanges(String... keys) { + for (String key : keys) { + Exchange exchange = new DefaultExchange(camelContext); + exchange.setExchangeId("Exchange-" + key); + aggregationRepository.add(camelContext, key, exchange); + } + } + + @Test + public void testScan() { + // Given + String[] keys = {"Scan1", "Scan2"}; + addExchanges(keys); + // When + Set<String> exchangeIdSet = aggregationRepository.scan(camelContext); + // Then + for (String key : keys) { + assertTrue(exchangeIdSet.contains(key)); + } + } + + @Test + public void testRecover() { + // Given + String[] keys = {"Recover1", "Recover2"}; + addExchanges(keys); + // When + Exchange exchange2 = aggregationRepository.recover(camelContext, "Recover2"); + Exchange exchange3 = aggregationRepository.recover(camelContext, "Recover3"); + // Then + assertNotNull(exchange2); + assertNull(exchange3); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/13ccd90b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryTest.java new file mode 100644 index 0000000..b656662 --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryTest.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.processor.aggregate; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.infinispan.processor.aggregate.InfinispanLocalAggregationRepository; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + + +public class InfinispanAggregationRepositoryTest extends CamelTestSupport { + + private static final String MOCK_GOTCHA = "mock:gotcha"; + private static final String DIRECT_ONE = "direct:one"; + + @EndpointInject(uri = MOCK_GOTCHA) + private MockEndpoint mock; + + @Produce(uri = DIRECT_ONE) + private ProducerTemplate produceOne; + + + @Test + public void checkAggregationFromOneRoute() throws Exception { + final InfinispanLocalAggregationRepository repoOne = + new InfinispanLocalAggregationRepository("pippo"); + + final int completionSize = 4; + final String correlator = "CORRELATOR"; + RouteBuilder rbOne = new RouteBuilder() { + @Override + public void configure() throws Exception { + + from(DIRECT_ONE).routeId("AggregatingRouteOne") + .aggregate(header(correlator)) + .aggregationRepository(repoOne) + .aggregationStrategy(new SumOfIntsAggregationStrategy()) + .completionSize(completionSize) + .to(MOCK_GOTCHA); + } + }; + + context().addRoutes(rbOne); + context().start(); + + mock.expectedMessageCount(2); + mock.expectedBodiesReceived(1 + 3 + 4 + 5, 6 + 7 + 20 + 21); + + produceOne.sendBodyAndHeader(1, correlator, correlator); + produceOne.sendBodyAndHeader(3, correlator, correlator); + produceOne.sendBodyAndHeader(4, correlator, correlator); + produceOne.sendBodyAndHeader(5, correlator, correlator); + + produceOne.sendBodyAndHeader(6, correlator, correlator); + produceOne.sendBodyAndHeader(7, correlator, correlator); + produceOne.sendBodyAndHeader(20, correlator, correlator); + produceOne.sendBodyAndHeader(21, correlator, correlator); + + mock.assertIsSatisfied(); + } + + class SumOfIntsAggregationStrategy implements AggregationStrategy { + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } else { + Integer n = newExchange.getIn().getBody(Integer.class); + Integer o = oldExchange.getIn().getBody(Integer.class); + Integer v = (o == null ? 0 : o) + (n == null ? 0 : n); + oldExchange.getIn().setBody(v, Integer.class); + return oldExchange; + } + } + } + +}