Repository: camel Updated Branches: refs/heads/master 13b79316b -> ccfa6c527
CAMEL-9959 : Create an ehcache based aggregation repository Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ccfa6c52 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ccfa6c52 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ccfa6c52 Branch: refs/heads/master Commit: ccfa6c5275d5a8acc098a9f3e561c2d3963e0341 Parents: 13b7931 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Wed May 11 23:00:08 2016 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Wed May 11 23:02:38 2016 +0200 ---------------------------------------------------------------------- .../aggregate/EhcacheAggregationRepository.java | 226 +++++++++++++++++++ .../component/ehcache/EhcacheTestSupport.java | 34 ++- ...cacheAggregationRepositoryOperationTest.java | 198 ++++++++++++++++ .../EhcacheAggregationRepositoryRoutesTest.java | 88 ++++++++ .../test/resources/ehcache/ehcache-config.xml | 4 + .../src/test/resources/log4j.properties | 2 +- 6 files changed, 546 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ccfa6c52/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/processor/aggregate/EhcacheAggregationRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/processor/aggregate/EhcacheAggregationRepository.java b/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/processor/aggregate/EhcacheAggregationRepository.java new file mode 100644 index 0000000..0219982 --- /dev/null +++ b/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/processor/aggregate/EhcacheAggregationRepository.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ehcache.processor.aggregate; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.DefaultExchangeHolder; +import org.apache.camel.spi.RecoverableAggregationRepository; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.ObjectHelper; +import org.ehcache.Cache; +import org.ehcache.CacheManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EhcacheAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository { + private static final Logger LOG = LoggerFactory.getLogger(EhcacheAggregationRepository.class); + + private CamelContext camelContext; + private CacheManager cacheManager; + private String cacheName; + private Cache<String, DefaultExchangeHolder> cache; + private boolean allowSerializedHeaders; + + private boolean useRecovery = true; + private String deadLetterChannel; + private long recoveryInterval = 5000; + private int maximumRedeliveries = 3; + + public CamelContext getCamelContext() { + return camelContext; + } + + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + public CacheManager getCacheManager() { + return cacheManager; + } + + public void setCacheManager(CacheManager cacheManager) { + this.cacheManager = cacheManager; + } + + public String getCacheName() { + return cacheName; + } + + public void setCacheName(String cacheName) { + this.cacheName = cacheName; + } + + public Cache<String, DefaultExchangeHolder> getCache() { + return cache; + } + + public void setCache(Cache<String, DefaultExchangeHolder> cache) { + this.cache = cache; + } + + public boolean isAllowSerializedHeaders() { + return allowSerializedHeaders; + } + + public void setAllowSerializedHeaders(boolean allowSerializedHeaders) { + this.allowSerializedHeaders = allowSerializedHeaders; + } + + @Override + public void setDeadLetterUri(String deadLetterUri) { + this.deadLetterChannel = deadLetterUri; + } + + @Override + public String getDeadLetterUri() { + return deadLetterChannel; + } + + @Override + public boolean isUseRecovery() { + return useRecovery; + } + + @Override + public void setUseRecovery(boolean useRecovery) { + this.useRecovery = useRecovery; + } + + public String getDeadLetterChannel() { + return deadLetterChannel; + } + + public void setDeadLetterChannel(String deadLetterChannel) { + this.deadLetterChannel = deadLetterChannel; + } + + public long getRecoveryInterval() { + return recoveryInterval; + } + + @Override + public long getRecoveryIntervalInMillis() { + return recoveryInterval; + } + + @Override + public void setRecoveryInterval(long recoveryInterval) { + this.recoveryInterval = recoveryInterval; + } + + @Override + public void setRecoveryInterval(long interval, TimeUnit timeUnit) { + this.recoveryInterval = timeUnit.toMillis(interval); + } + + @Override + public int getMaximumRedeliveries() { + return maximumRedeliveries; + } + + @Override + public void setMaximumRedeliveries(int maximumRedeliveries) { + this.maximumRedeliveries = maximumRedeliveries; + } + + @Override + public Exchange add(final CamelContext camelContext, final String key, final Exchange exchange) { + LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key); + + final DefaultExchangeHolder oldHolder = cache.get(key); + final DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders); + + cache.put(key, newHolder); + + return unmarshallExchange(camelContext, oldHolder); + } + + @Override + public Exchange get(CamelContext camelContext, String key) { + return unmarshallExchange(camelContext, cache.get(key)); + } + + @Override + public void remove(CamelContext camelContext, String key, Exchange exchange) { + LOG.trace("Removing an exchange with ID {} for key {} ", exchange.getExchangeId(), key); + cache.remove(key); + } + + @Override + public void confirm(CamelContext camelContext, String exchangeId) { + LOG.trace("Confirming an exchange with ID {}.", exchangeId); + cache.remove(exchangeId); + } + + @Override + public Set<String> getKeys() { + Set<String> keys = new HashSet<>(); + cache.forEach(e -> keys.add(e.getKey())); + + return Collections.unmodifiableSet(keys); + } + + @Override + public Set<String> scan(CamelContext camelContext) { + LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName()); + Set<String> scanned = Collections.unmodifiableSet(getKeys()); + LOG.trace("Found {} keys for exchanges to recover in {} context", scanned.size(), camelContext.getName()); + return scanned; + } + + @Override + public Exchange recover(CamelContext camelContext, String exchangeId) { + LOG.trace("Recovering an Exchange with ID {}.", exchangeId); + return useRecovery ? unmarshallExchange(camelContext, cache.get(exchangeId)) : null; + } + + @Override + protected void doStart() throws Exception { + if (maximumRedeliveries < 0) { + throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer."); + } + if (recoveryInterval < 0) { + throw new IllegalArgumentException("Recovery interval must be zero or a positive integer."); + } + + if (cache == null) { + ObjectHelper.notNull(cacheManager, "cacheManager"); + cache = cacheManager.getCache(cacheName, String.class, DefaultExchangeHolder.class); + } + } + + @Override + protected void doStop() throws Exception { + } + + public static Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder holder) { + Exchange exchange = null; + if (holder != null) { + exchange = new DefaultExchange(camelContext); + DefaultExchangeHolder.unmarshal(exchange, holder); + } + + return exchange; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ccfa6c52/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheTestSupport.java b/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheTestSupport.java index 3899ad4..3aec949 100644 --- a/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheTestSupport.java +++ b/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheTestSupport.java @@ -21,11 +21,14 @@ import java.net.URL; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.camel.builder.FluentProducerTemplate; +import org.apache.camel.component.ehcache.processor.aggregate.EhcacheAggregationRepository; +import org.apache.camel.impl.DefaultExchangeHolder; import org.apache.camel.impl.JndiRegistry; import org.apache.camel.test.junit4.CamelTestSupport; import org.ehcache.Cache; @@ -43,6 +46,7 @@ public class EhcacheTestSupport extends CamelTestSupport { public static final String EHCACHE_CONFIG = "/ehcache/ehcache-config.xml"; public static final String TEST_CACHE_NAME = "mycache"; public static final String IDEMPOTENT_TEST_CACHE_NAME = "idempotent"; + public static final String AGGREGATE_TEST_CACHE_NAME = "aggregate"; @Rule public final TestName testName = new TestName(); @@ -82,27 +86,47 @@ public class EhcacheTestSupport extends CamelTestSupport { return cacheManager.getCache(TEST_CACHE_NAME, Object.class, Object.class); } - protected Cache<String, Boolean> getIdempotentCache() { return cacheManager.getCache(IDEMPOTENT_TEST_CACHE_NAME, String.class, Boolean.class); } - protected String generateRandomString() { + protected Cache<String, DefaultExchangeHolder> getAggregateCache() { + return cacheManager.getCache(AGGREGATE_TEST_CACHE_NAME, String.class, DefaultExchangeHolder.class); + } + + protected EhcacheAggregationRepository createAggregateRepository() throws Exception { + EhcacheAggregationRepository repository = new EhcacheAggregationRepository(); + repository.setCache(getAggregateCache()); + repository.setCacheName("aggregate"); + + return repository; + } + + protected static int[] generateRandomArrayOfInt(int size, int lower, int upper) { + Random random = new Random(); + int[] array = new int[size]; + + Arrays.setAll(array, i -> random.nextInt(upper - lower) + lower); + + return array; + } + + protected static String generateRandomString() { return UUID.randomUUID().toString(); } - protected String[] generateRandomArrayOfStrings(int size) { + protected static String[] generateRandomArrayOfStrings(int size) { String[] array = new String[size]; Arrays.setAll(array, i -> generateRandomString()); return array; } - protected List<String> generateRandomListOfStrings(int size) { + protected static List<String> generateRandomListOfStrings(int size) { return Arrays.asList(generateRandomArrayOfStrings(size)); } - protected Map<String, String> generateRandomMapOfString(int size) { + protected static Map<String, String> generateRandomMapOfString(int size) { return IntStream.range(0, size).boxed().collect(Collectors.toMap( i -> i + "-" + generateRandomString(), i -> i + "-" + generateRandomString() http://git-wip-us.apache.org/repos/asf/camel/blob/ccfa6c52/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/processor/aggregate/EhcacheAggregationRepositoryOperationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/processor/aggregate/EhcacheAggregationRepositoryOperationTest.java b/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/processor/aggregate/EhcacheAggregationRepositoryOperationTest.java new file mode 100644 index 0000000..fbffcc9 --- /dev/null +++ b/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/processor/aggregate/EhcacheAggregationRepositoryOperationTest.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ehcache.processor.aggregate; + +import java.util.Set; + +import org.apache.camel.Exchange; +import org.apache.camel.component.ehcache.EhcacheTestSupport; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.DefaultExchangeHolder; +import org.junit.Test; + +public class EhcacheAggregationRepositoryOperationTest extends EhcacheTestSupport { + private EhcacheAggregationRepository aggregationRepository; + + @Override + protected void doPreSetup() throws Exception { + super.doPreSetup(); + + aggregationRepository = createAggregateRepository(); + aggregationRepository.start(); + } + + @Override + public void tearDown() throws Exception { + aggregationRepository.stop(); + super.tearDown(); + } + + private boolean exists(String key) { + DefaultExchangeHolder holder = aggregationRepository.getCache().get(key); + if (holder == null) { + return false; + } + return true; + } + + @Test + public void testAdd() { + // Given + String key = "Add"; + assertFalse(exists(key)); + Exchange exchange = new DefaultExchange(context()); + // When + aggregationRepository.add(context(), key, exchange); + // Then + assertTrue(exists(key)); + } + + @Test + public void testGetExists() { + // Given + String key = "Get_Exists"; + Exchange exchange = new DefaultExchange(context()); + aggregationRepository.add(context(), key, exchange); + assertTrue(exists(key)); + + // When + Exchange exchange2 = aggregationRepository.get(context(), key); + // Then + assertNotNull(exchange2); + assertEquals(exchange.getExchangeId(), exchange2.getExchangeId()); + } + + @Test + public void testGetNotExists() { + // Given + String key = "Get_NotExists"; + assertFalse(exists(key)); + // When + Exchange exchange2 = aggregationRepository.get(context(), key); + // Then + assertNull(exchange2); + } + + @Test + public void testRemoveExists() { + // Given + String key = "Remove_Exists"; + Exchange exchange = new DefaultExchange(context()); + aggregationRepository.add(context(), key, exchange); + assertTrue(exists(key)); + // When + aggregationRepository.remove(context(), key, exchange); + // Then + assertFalse(exists(key)); + } + + @Test + public void testRemoveNotExists() { + // Given + String key = "RemoveNotExists"; + Exchange exchange = new DefaultExchange(context()); + assertFalse(exists(key)); + // When + aggregationRepository.remove(context(), key, exchange); + // Then + assertFalse(exists(key)); + } + + @Test + public void testGetKeys() { + // Given + String[] keys = {"GetKeys1", "GetKeys2"}; + addExchanges(keys); + // When + Set<String> keySet = aggregationRepository.getKeys(); + // Then + for (String key : keys) { + assertTrue(keySet.contains(key)); + } + } + + @Test + public void testConfirmExist() { + // Given + for (int i = 1; i < 4; i++) { + String key = "Confirm_" + i; + Exchange exchange = new DefaultExchange(context()); + exchange.setExchangeId("Exchange_" + i); + aggregationRepository.add(context(), key, exchange); + assertTrue(exists(key)); + } + // When + aggregationRepository.confirm(context(), "Confirm_2"); + // Then + assertTrue(exists("Confirm_1")); + assertFalse(exists("Confirm_2")); + assertTrue(exists("Confirm_3")); + } + + @Test + public void testConfirmNotExist() { + // Given + String[] keys = new String[3]; + for (int i = 1; i < 4; i++) { + keys[i - 1] = "Confirm" + i; + } + addExchanges(keys); + for (String key : keys) { + assertTrue(exists(key)); + } + // When + aggregationRepository.confirm(context(), "Exchange-Confirm5"); + // Then + for (String key : keys) { + assertTrue(exists(key)); + } + } + + private void addExchanges(String... keys) { + for (String key : keys) { + Exchange exchange = new DefaultExchange(context()); + exchange.setExchangeId("Exchange-" + key); + aggregationRepository.add(context(), key, exchange); + } + } + + @Test + public void testScan() { + // Given + String[] keys = {"Scan1", "Scan2"}; + addExchanges(keys); + // When + Set<String> exchangeIdSet = aggregationRepository.scan(context()); + // Then + for (String key : keys) { + assertTrue(exchangeIdSet.contains(key)); + } + } + + @Test + public void testRecover() { + // Given + String[] keys = {"Recover1", "Recover2"}; + addExchanges(keys); + // When + Exchange exchange2 = aggregationRepository.recover(context(), "Recover2"); + Exchange exchange3 = aggregationRepository.recover(context(), "Recover3"); + // Then + assertNotNull(exchange2); + assertNull(exchange3); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ccfa6c52/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/processor/aggregate/EhcacheAggregationRepositoryRoutesTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/processor/aggregate/EhcacheAggregationRepositoryRoutesTest.java b/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/processor/aggregate/EhcacheAggregationRepositoryRoutesTest.java new file mode 100644 index 0000000..133af4e --- /dev/null +++ b/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/processor/aggregate/EhcacheAggregationRepositoryRoutesTest.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ehcache.processor.aggregate; + +import java.util.stream.IntStream; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.ehcache.EhcacheTestSupport; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.junit.Test; + + +public class EhcacheAggregationRepositoryRoutesTest extends EhcacheTestSupport { + private static final String ENDPOINT_MOCK = "mock:result"; + private static final String ENDPOINT_DIRECT = "direct:one"; + private static final int[] VALUES = generateRandomArrayOfInt(10, 0, 30); + private static final int SUM = IntStream.of(VALUES).reduce(0, (a, b) -> a + b); + private static final String CORRELATOR = "CORRELATOR"; + + @EndpointInject(uri = ENDPOINT_MOCK) + private MockEndpoint mock; + + @Produce(uri = ENDPOINT_DIRECT) + private ProducerTemplate producer; + + @Test + public void checkAggregationFromOneRoute() throws Exception { + mock.expectedMessageCount(VALUES.length); + mock.expectedBodiesReceived(SUM); + + IntStream.of(VALUES).forEach( + i -> producer.sendBodyAndHeader(i, CORRELATOR, CORRELATOR) + ); + + mock.assertIsSatisfied(); + } + + private Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } else { + Integer n = newExchange.getIn().getBody(Integer.class); + Integer o = oldExchange.getIn().getBody(Integer.class); + Integer v = (o == null ? 0 : o) + (n == null ? 0 : n); + + oldExchange.getIn().setBody(v, Integer.class); + + return oldExchange; + } + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from(ENDPOINT_DIRECT) + .routeId("AggregatingRouteOne") + .aggregate(header(CORRELATOR)) + .aggregationRepository(createAggregateRepository()) + .aggregationStrategy(EhcacheAggregationRepositoryRoutesTest.this::aggregate) + .completionSize(VALUES.length) + .to("log:org.apache.camel.component.ehcache.processor.aggregate^level=INFO%showAll=true&mulltiline=true") + .to(ENDPOINT_MOCK); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ccfa6c52/components/camel-ehcache/src/test/resources/ehcache/ehcache-config.xml ---------------------------------------------------------------------- diff --git a/components/camel-ehcache/src/test/resources/ehcache/ehcache-config.xml b/components/camel-ehcache/src/test/resources/ehcache/ehcache-config.xml index dd61f90..d5bf6a9 100644 --- a/components/camel-ehcache/src/test/resources/ehcache/ehcache-config.xml +++ b/components/camel-ehcache/src/test/resources/ehcache/ehcache-config.xml @@ -26,6 +26,10 @@ <ehcache:key-type>java.lang.String</ehcache:key-type> <ehcache:value-type>java.lang.Boolean</ehcache:value-type> </ehcache:cache> + <ehcache:cache alias="aggregate" uses-template="default-template"> + <ehcache:key-type>java.lang.String</ehcache:key-type> + <ehcache:value-type>org.apache.camel.impl.DefaultExchangeHolder</ehcache:value-type> + </ehcache:cache> <ehcache:cache-template name="default-template"> <ehcache:expiry> http://git-wip-us.apache.org/repos/asf/camel/blob/ccfa6c52/components/camel-ehcache/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/components/camel-ehcache/src/test/resources/log4j.properties b/components/camel-ehcache/src/test/resources/log4j.properties index 2cb2227..574952c 100644 --- a/components/camel-ehcache/src/test/resources/log4j.properties +++ b/components/camel-ehcache/src/test/resources/log4j.properties @@ -5,7 +5,7 @@ log4j.rootLogger=INFO, file # uncomment the following line to turn on Camel debugging #log4j.logger.org.apache.camel=DEBUG -log4j.logger.org.apache.camel.component.ehcache=DEBUG +log4j.logger.org.apache.camel.component.ehcache=TRACE # CONSOLE appender not used by default log4j.appender.out=org.apache.log4j.ConsoleAppender