CAMEL-7853: Implement AggregationRepository based on Infinispan data grid - Fixed CS and minor fixes
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/55ab6e16 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/55ab6e16 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/55ab6e16 Branch: refs/heads/master Commit: 55ab6e164e366d866c803eb2d35540a6192b9071 Parents: 13ccd90 Author: Andrea Cosentino <anco...@gmail.com> Authored: Sat Apr 23 12:14:05 2016 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Sat Apr 23 12:14:41 2016 +0200 ---------------------------------------------------------------------- .../InfinispanLocalAggregationRepository.java | 156 ++++++++------ ...spanAggregationRepositoryOperationsTest.java | 208 ------------------ .../InfinispanAggregationRepositoryTest.java | 97 --------- ...ocalAggregationRepositoryOperationsTest.java | 210 +++++++++++++++++++ ...nfinispanLocalAggregationRepositoryTest.java | 96 +++++++++ 5 files changed, 393 insertions(+), 374 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/55ab6e16/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java index 13374b9..b6bd872 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepository.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.camel.component.infinispan.processor.aggregate; import java.util.Collections; @@ -47,7 +63,7 @@ public class InfinispanLocalAggregationRepository extends ServiceSupport impleme this.cacheName = cacheName; } - @Override + @Override public Exchange add(final CamelContext camelContext, final String key, final Exchange exchange) { LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key); DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders); @@ -55,41 +71,41 @@ public class InfinispanLocalAggregationRepository extends ServiceSupport impleme return unmarshallExchange(camelContext, oldHolder); } - @Override - public Exchange get(CamelContext camelContext, String key) { + @Override + public Exchange get(CamelContext camelContext, String key) { return unmarshallExchange(camelContext, (DefaultExchangeHolder) cache.get(key)); - } + } - @Override - public void remove(CamelContext camelContext, String key, Exchange exchange) { + @Override + public void remove(CamelContext camelContext, String key, Exchange exchange) { LOG.trace("Removing an exchange with ID {} for key {} ", exchange.getExchangeId(), key); - cache.remove(key); - } + cache.remove(key); + } - @Override - public void confirm(CamelContext camelContext, String exchangeId) { + @Override + public void confirm(CamelContext camelContext, String exchangeId) { LOG.trace("Confirming an exchange with ID {}.", exchangeId); - cache.remove(exchangeId); - } + cache.remove(exchangeId); + } - @Override - public Set<String> getKeys() { + @Override + public Set<String> getKeys() { return Collections.unmodifiableSet(cache.keySet()); - } + } - @Override - public Set<String> scan(CamelContext camelContext) { + @Override + public Set<String> scan(CamelContext camelContext) { LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName()); Set<String> scanned = Collections.unmodifiableSet(cache.keySet()); LOG.trace("Found {} keys for exchanges to recover in {} context", scanned.size(), camelContext.getName()); return scanned; - } + } - @Override - public Exchange recover(CamelContext camelContext, String exchangeId) { + @Override + public Exchange recover(CamelContext camelContext, String exchangeId) { LOG.trace("Recovering an Exchange with ID {}.", exchangeId); return useRecovery ? unmarshallExchange(camelContext, cache.get(exchangeId)) : null; - } + } @Override public void setRecoveryInterval(long interval, TimeUnit timeUnit) { @@ -101,10 +117,10 @@ public class InfinispanLocalAggregationRepository extends ServiceSupport impleme this.recoveryInterval = interval; } - @Override - public long getRecoveryIntervalInMillis() { + @Override + public long getRecoveryIntervalInMillis() { return recoveryInterval; - } + } @Override public void setUseRecovery(boolean useRecovery) { @@ -136,8 +152,8 @@ public class InfinispanLocalAggregationRepository extends ServiceSupport impleme return maximumRedeliveries; } - @Override - protected void doStart() throws Exception { + @Override + protected void doStart() throws Exception { if (maximumRedeliveries < 0) { throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer."); } @@ -146,20 +162,22 @@ public class InfinispanLocalAggregationRepository extends ServiceSupport impleme } if (ObjectHelper.isEmpty(configuration)) { manager = new DefaultCacheManager(); + manager.start(); } else { - manager = new DefaultCacheManager(configuration); + manager = new DefaultCacheManager(configuration); + manager.start(); } if (ObjectHelper.isEmpty(cacheName)) { cache = manager.getCache(); } else { - cache = manager.getCache(cacheName); + cache = manager.getCache(cacheName); } - } + } - @Override - protected void doStop() throws Exception { - - } + @Override + protected void doStop() throws Exception { + manager.stop(); + } protected Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder holder) { Exchange exchange = null; @@ -170,51 +188,51 @@ public class InfinispanLocalAggregationRepository extends ServiceSupport impleme return exchange; } - public DefaultCacheManager getManager() { - return manager; - } + public DefaultCacheManager getManager() { + return manager; + } - public void setManager(DefaultCacheManager manager) { - this.manager = manager; - } + public void setManager(DefaultCacheManager manager) { + this.manager = manager; + } - public String getCacheName() { - return cacheName; - } + public String getCacheName() { + return cacheName; + } - public void setCacheName(String cacheName) { - this.cacheName = cacheName; - } + public void setCacheName(String cacheName) { + this.cacheName = cacheName; + } - public String getDeadLetterChannel() { - return deadLetterChannel; - } + public String getDeadLetterChannel() { + return deadLetterChannel; + } - public void setDeadLetterChannel(String deadLetterChannel) { - this.deadLetterChannel = deadLetterChannel; - } + public void setDeadLetterChannel(String deadLetterChannel) { + this.deadLetterChannel = deadLetterChannel; + } - public boolean isAllowSerializedHeaders() { - return allowSerializedHeaders; - } + public boolean isAllowSerializedHeaders() { + return allowSerializedHeaders; + } - public void setAllowSerializedHeaders(boolean allowSerializedHeaders) { - this.allowSerializedHeaders = allowSerializedHeaders; - } + public void setAllowSerializedHeaders(boolean allowSerializedHeaders) { + this.allowSerializedHeaders = allowSerializedHeaders; + } - public Cache<String, DefaultExchangeHolder> getCache() { - return cache; - } + public Cache<String, DefaultExchangeHolder> getCache() { + return cache; + } - public void setCache(Cache<String, DefaultExchangeHolder> cache) { - this.cache = cache; - } + public void setCache(Cache<String, DefaultExchangeHolder> cache) { + this.cache = cache; + } - public Configuration getConfiguration() { - return configuration; - } + public Configuration getConfiguration() { + return configuration; + } - public void setConfiguration(Configuration configuration) { - this.configuration = configuration; - } + public void setConfiguration(Configuration configuration) { + this.configuration = configuration; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/55ab6e16/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryOperationsTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryOperationsTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryOperationsTest.java deleted file mode 100644 index 11312c5..0000000 --- a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryOperationsTest.java +++ /dev/null @@ -1,208 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.infinispan.processor.aggregate; - -import java.util.Set; - -import org.apache.camel.CamelContext; -import org.apache.camel.Exchange; -import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.impl.DefaultExchange; -import org.apache.camel.impl.DefaultExchangeHolder; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -/** - * Unite test for {@link InfinispanLocalAggregationRepository} - */ -public class InfinispanAggregationRepositoryOperationsTest { - - private static InfinispanLocalAggregationRepository aggregationRepository; - private CamelContext camelContext = new DefaultCamelContext(); - - @BeforeClass - public static void starting() throws Exception { - aggregationRepository = new InfinispanLocalAggregationRepository("pippo"); - aggregationRepository.start(); - } - - @AfterClass - public static void stopping() throws Exception { - aggregationRepository.stop(); - } - - private boolean exists(String key) { - DefaultExchangeHolder holder = aggregationRepository.getCache().get(key); - if (holder == null) return false; - else return true; - } - - @Test - public void testAdd() { - // Given - String key = "Add"; - assertFalse(exists(key)); - Exchange exchange = new DefaultExchange(camelContext); - // When - aggregationRepository.add(camelContext, key, exchange); - // Then - assertTrue(exists(key)); - } - - @Test - public void testGetExists() { - // Given - String key = "Get_Exists"; - Exchange exchange = new DefaultExchange(camelContext); - aggregationRepository.add(camelContext, key, exchange); - assertTrue(exists(key)); - - // When - Exchange exchange2 = aggregationRepository.get(camelContext, key); - // Then - assertNotNull(exchange2); - assertEquals(exchange.getExchangeId(), exchange2.getExchangeId()); - } - - @Test - public void testGetNotExists() { - // Given - String key = "Get_NotExists"; - assertFalse(exists(key)); - // When - Exchange exchange2 = aggregationRepository.get(camelContext, key); - // Then - assertNull(exchange2); - } - - @Test - public void testRemoveExists() { - // Given - String key = "Remove_Exists"; - Exchange exchange = new DefaultExchange(camelContext); - aggregationRepository.add(camelContext, key, exchange); - assertTrue(exists(key)); - // When - aggregationRepository.remove(camelContext, key, exchange); - // Then - assertFalse(exists(key)); - } - - @Test - public void testRemoveNotExists() { - // Given - String key = "RemoveNotExists"; - Exchange exchange = new DefaultExchange(camelContext); - assertFalse(exists(key)); - // When - aggregationRepository.remove(camelContext, key, exchange); - // Then - assertFalse(exists(key)); - } - - @Test - public void testGetKeys() { - // Given - String[] keys = {"GetKeys1", "GetKeys2"}; - addExchanges(keys); - // When - Set<String> keySet = aggregationRepository.getKeys(); - // Then - for (String key : keys) { - assertTrue(keySet.contains(key)); - } - } - - @Test - public void testConfirmExist() { - // Given - for (int i = 1; i < 4; i++) { - String key = "Confirm_" + i; - Exchange exchange = new DefaultExchange(camelContext); - exchange.setExchangeId("Exchange_" + i); - aggregationRepository.add(camelContext, key, exchange); - assertTrue(exists(key)); - } - // When - aggregationRepository.confirm(camelContext, "Confirm_2"); - // Then - assertTrue(exists("Confirm_1")); - assertFalse(exists("Confirm_2")); - assertTrue(exists("Confirm_3")); - } - - @Test - public void testConfirmNotExist() { - // Given - String[] keys = new String[3]; - for (int i = 1; i < 4; i++) { - keys[i - 1] = "Confirm" + i; - } - addExchanges(keys); - for (String key : keys) { - assertTrue(exists(key)); - } - // When - aggregationRepository.confirm(camelContext, "Exchange-Confirm5"); - // Then - for (String key : keys) { - assertTrue(exists(key)); - } - } - - private void addExchanges(String... keys) { - for (String key : keys) { - Exchange exchange = new DefaultExchange(camelContext); - exchange.setExchangeId("Exchange-" + key); - aggregationRepository.add(camelContext, key, exchange); - } - } - - @Test - public void testScan() { - // Given - String[] keys = {"Scan1", "Scan2"}; - addExchanges(keys); - // When - Set<String> exchangeIdSet = aggregationRepository.scan(camelContext); - // Then - for (String key : keys) { - assertTrue(exchangeIdSet.contains(key)); - } - } - - @Test - public void testRecover() { - // Given - String[] keys = {"Recover1", "Recover2"}; - addExchanges(keys); - // When - Exchange exchange2 = aggregationRepository.recover(camelContext, "Recover2"); - Exchange exchange3 = aggregationRepository.recover(camelContext, "Recover3"); - // Then - assertNotNull(exchange2); - assertNull(exchange3); - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/55ab6e16/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryTest.java deleted file mode 100644 index b656662..0000000 --- a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanAggregationRepositoryTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.infinispan.processor.aggregate; - -import org.apache.camel.EndpointInject; -import org.apache.camel.Exchange; -import org.apache.camel.Produce; -import org.apache.camel.ProducerTemplate; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.infinispan.processor.aggregate.InfinispanLocalAggregationRepository; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.processor.aggregate.AggregationStrategy; -import org.apache.camel.test.junit4.CamelTestSupport; -import org.junit.Test; - - -public class InfinispanAggregationRepositoryTest extends CamelTestSupport { - - private static final String MOCK_GOTCHA = "mock:gotcha"; - private static final String DIRECT_ONE = "direct:one"; - - @EndpointInject(uri = MOCK_GOTCHA) - private MockEndpoint mock; - - @Produce(uri = DIRECT_ONE) - private ProducerTemplate produceOne; - - - @Test - public void checkAggregationFromOneRoute() throws Exception { - final InfinispanLocalAggregationRepository repoOne = - new InfinispanLocalAggregationRepository("pippo"); - - final int completionSize = 4; - final String correlator = "CORRELATOR"; - RouteBuilder rbOne = new RouteBuilder() { - @Override - public void configure() throws Exception { - - from(DIRECT_ONE).routeId("AggregatingRouteOne") - .aggregate(header(correlator)) - .aggregationRepository(repoOne) - .aggregationStrategy(new SumOfIntsAggregationStrategy()) - .completionSize(completionSize) - .to(MOCK_GOTCHA); - } - }; - - context().addRoutes(rbOne); - context().start(); - - mock.expectedMessageCount(2); - mock.expectedBodiesReceived(1 + 3 + 4 + 5, 6 + 7 + 20 + 21); - - produceOne.sendBodyAndHeader(1, correlator, correlator); - produceOne.sendBodyAndHeader(3, correlator, correlator); - produceOne.sendBodyAndHeader(4, correlator, correlator); - produceOne.sendBodyAndHeader(5, correlator, correlator); - - produceOne.sendBodyAndHeader(6, correlator, correlator); - produceOne.sendBodyAndHeader(7, correlator, correlator); - produceOne.sendBodyAndHeader(20, correlator, correlator); - produceOne.sendBodyAndHeader(21, correlator, correlator); - - mock.assertIsSatisfied(); - } - - class SumOfIntsAggregationStrategy implements AggregationStrategy { - @Override - public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { - if (oldExchange == null) { - return newExchange; - } else { - Integer n = newExchange.getIn().getBody(Integer.class); - Integer o = oldExchange.getIn().getBody(Integer.class); - Integer v = (o == null ? 0 : o) + (n == null ? 0 : n); - oldExchange.getIn().setBody(v, Integer.class); - return oldExchange; - } - } - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/55ab6e16/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepositoryOperationsTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepositoryOperationsTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepositoryOperationsTest.java new file mode 100644 index 0000000..f1265c7 --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepositoryOperationsTest.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.processor.aggregate; + +import java.util.Set; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.DefaultExchangeHolder; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Unite test for {@link InfinispanLocalAggregationRepository} + */ +public class InfinispanLocalAggregationRepositoryOperationsTest { + + private static InfinispanLocalAggregationRepository aggregationRepository; + private CamelContext camelContext = new DefaultCamelContext(); + + @BeforeClass + public static void starting() throws Exception { + aggregationRepository = new InfinispanLocalAggregationRepository("pippo"); + aggregationRepository.start(); + } + + @AfterClass + public static void stopping() throws Exception { + aggregationRepository.stop(); + } + + private boolean exists(String key) { + DefaultExchangeHolder holder = aggregationRepository.getCache().get(key); + if (holder == null) { + return false; + } + return true; + } + + @Test + public void testAdd() { + // Given + String key = "Add"; + assertFalse(exists(key)); + Exchange exchange = new DefaultExchange(camelContext); + // When + aggregationRepository.add(camelContext, key, exchange); + // Then + assertTrue(exists(key)); + } + + @Test + public void testGetExists() { + // Given + String key = "Get_Exists"; + Exchange exchange = new DefaultExchange(camelContext); + aggregationRepository.add(camelContext, key, exchange); + assertTrue(exists(key)); + + // When + Exchange exchange2 = aggregationRepository.get(camelContext, key); + // Then + assertNotNull(exchange2); + assertEquals(exchange.getExchangeId(), exchange2.getExchangeId()); + } + + @Test + public void testGetNotExists() { + // Given + String key = "Get_NotExists"; + assertFalse(exists(key)); + // When + Exchange exchange2 = aggregationRepository.get(camelContext, key); + // Then + assertNull(exchange2); + } + + @Test + public void testRemoveExists() { + // Given + String key = "Remove_Exists"; + Exchange exchange = new DefaultExchange(camelContext); + aggregationRepository.add(camelContext, key, exchange); + assertTrue(exists(key)); + // When + aggregationRepository.remove(camelContext, key, exchange); + // Then + assertFalse(exists(key)); + } + + @Test + public void testRemoveNotExists() { + // Given + String key = "RemoveNotExists"; + Exchange exchange = new DefaultExchange(camelContext); + assertFalse(exists(key)); + // When + aggregationRepository.remove(camelContext, key, exchange); + // Then + assertFalse(exists(key)); + } + + @Test + public void testGetKeys() { + // Given + String[] keys = {"GetKeys1", "GetKeys2"}; + addExchanges(keys); + // When + Set<String> keySet = aggregationRepository.getKeys(); + // Then + for (String key : keys) { + assertTrue(keySet.contains(key)); + } + } + + @Test + public void testConfirmExist() { + // Given + for (int i = 1; i < 4; i++) { + String key = "Confirm_" + i; + Exchange exchange = new DefaultExchange(camelContext); + exchange.setExchangeId("Exchange_" + i); + aggregationRepository.add(camelContext, key, exchange); + assertTrue(exists(key)); + } + // When + aggregationRepository.confirm(camelContext, "Confirm_2"); + // Then + assertTrue(exists("Confirm_1")); + assertFalse(exists("Confirm_2")); + assertTrue(exists("Confirm_3")); + } + + @Test + public void testConfirmNotExist() { + // Given + String[] keys = new String[3]; + for (int i = 1; i < 4; i++) { + keys[i - 1] = "Confirm" + i; + } + addExchanges(keys); + for (String key : keys) { + assertTrue(exists(key)); + } + // When + aggregationRepository.confirm(camelContext, "Exchange-Confirm5"); + // Then + for (String key : keys) { + assertTrue(exists(key)); + } + } + + private void addExchanges(String... keys) { + for (String key : keys) { + Exchange exchange = new DefaultExchange(camelContext); + exchange.setExchangeId("Exchange-" + key); + aggregationRepository.add(camelContext, key, exchange); + } + } + + @Test + public void testScan() { + // Given + String[] keys = {"Scan1", "Scan2"}; + addExchanges(keys); + // When + Set<String> exchangeIdSet = aggregationRepository.scan(camelContext); + // Then + for (String key : keys) { + assertTrue(exchangeIdSet.contains(key)); + } + } + + @Test + public void testRecover() { + // Given + String[] keys = {"Recover1", "Recover2"}; + addExchanges(keys); + // When + Exchange exchange2 = aggregationRepository.recover(camelContext, "Recover2"); + Exchange exchange3 = aggregationRepository.recover(camelContext, "Recover3"); + // Then + assertNotNull(exchange2); + assertNull(exchange3); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/55ab6e16/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepositoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepositoryTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepositoryTest.java new file mode 100644 index 0000000..5090c20 --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/aggregate/InfinispanLocalAggregationRepositoryTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan.processor.aggregate; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + + +public class InfinispanLocalAggregationRepositoryTest extends CamelTestSupport { + + private static final String MOCK_GOTCHA = "mock:gotcha"; + private static final String DIRECT_ONE = "direct:one"; + + @EndpointInject(uri = MOCK_GOTCHA) + private MockEndpoint mock; + + @Produce(uri = DIRECT_ONE) + private ProducerTemplate produceOne; + + + @Test + public void checkAggregationFromOneRoute() throws Exception { + final InfinispanLocalAggregationRepository repoOne = + new InfinispanLocalAggregationRepository("pippo"); + + final int completionSize = 4; + final String correlator = "CORRELATOR"; + RouteBuilder rbOne = new RouteBuilder() { + @Override + public void configure() throws Exception { + + from(DIRECT_ONE).routeId("AggregatingRouteOne") + .aggregate(header(correlator)) + .aggregationRepository(repoOne) + .aggregationStrategy(new SumOfIntsAggregationStrategy()) + .completionSize(completionSize) + .to(MOCK_GOTCHA); + } + }; + + context().addRoutes(rbOne); + context().start(); + + mock.expectedMessageCount(2); + mock.expectedBodiesReceived(1 + 3 + 4 + 5, 6 + 7 + 20 + 21); + + produceOne.sendBodyAndHeader(1, correlator, correlator); + produceOne.sendBodyAndHeader(3, correlator, correlator); + produceOne.sendBodyAndHeader(4, correlator, correlator); + produceOne.sendBodyAndHeader(5, correlator, correlator); + + produceOne.sendBodyAndHeader(6, correlator, correlator); + produceOne.sendBodyAndHeader(7, correlator, correlator); + produceOne.sendBodyAndHeader(20, correlator, correlator); + produceOne.sendBodyAndHeader(21, correlator, correlator); + + mock.assertIsSatisfied(); + } + + class SumOfIntsAggregationStrategy implements AggregationStrategy { + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } else { + Integer n = newExchange.getIn().getBody(Integer.class); + Integer o = oldExchange.getIn().getBody(Integer.class); + Integer v = (o == null ? 0 : o) + (n == null ? 0 : n); + oldExchange.getIn().setBody(v, Integer.class); + return oldExchange; + } + } + } + +}