Repository: camel Updated Branches: refs/heads/master 73410a0c7 -> d5e35c95b
CAMEL-11439 - Camel-Caffeine: Create an Aggregation Repository using Caffeine Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d5e35c95 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d5e35c95 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d5e35c95 Branch: refs/heads/master Commit: d5e35c95b6d751ed4bf7c97c5e5737f49959864a Parents: 73410a0 Author: Andrea Cosentino <anco...@gmail.com> Authored: Fri Jun 23 13:22:38 2017 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Fri Jun 23 13:22:38 2017 +0200 ---------------------------------------------------------------------- .../CaffeineAggregationRepository.java | 205 +++++++++++++++++++ ...feineAggregationRepositoryOperationTest.java | 198 ++++++++++++++++++ ...CaffeineAggregationRepositoryRoutesTest.java | 103 ++++++++++ 3 files changed, 506 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d5e35c95/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepository.java b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepository.java new file mode 100644 index 0000000..702a8e6 --- /dev/null +++ b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepository.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.caffeine.processor.aggregate; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.DefaultExchangeHolder; +import org.apache.camel.spi.RecoverableAggregationRepository; +import org.apache.camel.support.ServiceSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CaffeineAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository { + private static final Logger LOG = LoggerFactory.getLogger(CaffeineAggregationRepository.class); + + private CamelContext camelContext; + private Cache<String, DefaultExchangeHolder> cache; + private boolean allowSerializedHeaders; + + private boolean useRecovery = true; + private String deadLetterChannel; + private long recoveryInterval = 5000; + private int maximumRedeliveries = 3; + + public CamelContext getCamelContext() { + return camelContext; + } + + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + public Cache<String, DefaultExchangeHolder> getCache() { + return cache; + } + + public void setCache(Cache<String, DefaultExchangeHolder> cache) { + this.cache = cache; + } + + public boolean isAllowSerializedHeaders() { + return allowSerializedHeaders; + } + + public void setAllowSerializedHeaders(boolean allowSerializedHeaders) { + this.allowSerializedHeaders = allowSerializedHeaders; + } + + @Override + public void setDeadLetterUri(String deadLetterUri) { + this.deadLetterChannel = deadLetterUri; + } + + @Override + public String getDeadLetterUri() { + return deadLetterChannel; + } + + @Override + public boolean isUseRecovery() { + return useRecovery; + } + + @Override + public void setUseRecovery(boolean useRecovery) { + this.useRecovery = useRecovery; + } + + public String getDeadLetterChannel() { + return deadLetterChannel; + } + + public void setDeadLetterChannel(String deadLetterChannel) { + this.deadLetterChannel = deadLetterChannel; + } + + public long getRecoveryInterval() { + return recoveryInterval; + } + + @Override + public long getRecoveryIntervalInMillis() { + return recoveryInterval; + } + + @Override + public void setRecoveryInterval(long recoveryInterval) { + this.recoveryInterval = recoveryInterval; + } + + @Override + public void setRecoveryInterval(long interval, TimeUnit timeUnit) { + this.recoveryInterval = timeUnit.toMillis(interval); + } + + @Override + public int getMaximumRedeliveries() { + return maximumRedeliveries; + } + + @Override + public void setMaximumRedeliveries(int maximumRedeliveries) { + this.maximumRedeliveries = maximumRedeliveries; + } + + @Override + public Exchange add(final CamelContext camelContext, final String key, final Exchange exchange) { + LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key); + + final DefaultExchangeHolder oldHolder = cache.getIfPresent(key); + final DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders); + + cache.put(key, newHolder); + + return unmarshallExchange(camelContext, oldHolder); + } + + @Override + public Exchange get(CamelContext camelContext, String key) { + return unmarshallExchange(camelContext, cache.getIfPresent(key)); + } + + @Override + public void remove(CamelContext camelContext, String key, Exchange exchange) { + LOG.trace("Removing an exchange with ID {} for key {} ", exchange.getExchangeId(), key); + cache.invalidate(key); + } + + @Override + public void confirm(CamelContext camelContext, String exchangeId) { + LOG.trace("Confirming an exchange with ID {}.", exchangeId); + cache.invalidate(exchangeId); + } + + @Override + public Set<String> getKeys() { + Set<String> keys = cache.asMap().keySet(); + + return Collections.unmodifiableSet(keys); + } + + @Override + public Set<String> scan(CamelContext camelContext) { + LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName()); + Set<String> scanned = Collections.unmodifiableSet(getKeys()); + LOG.trace("Found {} keys for exchanges to recover in {} context", scanned.size(), camelContext.getName()); + return scanned; + } + + @Override + public Exchange recover(CamelContext camelContext, String exchangeId) { + LOG.trace("Recovering an Exchange with ID {}.", exchangeId); + return useRecovery ? unmarshallExchange(camelContext, cache.getIfPresent(exchangeId)) : null; + } + + @Override + protected void doStart() throws Exception { + if (maximumRedeliveries < 0) { + throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer."); + } + if (recoveryInterval < 0) { + throw new IllegalArgumentException("Recovery interval must be zero or a positive integer."); + } + + if (cache == null) { + Caffeine<Object, Object> builder = Caffeine.newBuilder(); + cache = builder.build(); + } + } + + @Override + protected void doStop() throws Exception { + } + + public static Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder holder) { + Exchange exchange = null; + if (holder != null) { + exchange = new DefaultExchange(camelContext); + DefaultExchangeHolder.unmarshal(exchange, holder); + } + + return exchange; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/d5e35c95/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryOperationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryOperationTest.java b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryOperationTest.java new file mode 100644 index 0000000..754f7f3 --- /dev/null +++ b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryOperationTest.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.caffeine.processor.aggregate; + +import java.util.Set; + +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.DefaultExchangeHolder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class CaffeineAggregationRepositoryOperationTest extends CamelTestSupport { + private CaffeineAggregationRepository aggregationRepository; + + @Override + protected void doPreSetup() throws Exception { + super.doPreSetup(); + + aggregationRepository = new CaffeineAggregationRepository(); + aggregationRepository.start(); + } + + @Override + public void tearDown() throws Exception { + aggregationRepository.stop(); + super.tearDown(); + } + + private boolean exists(String key) { + DefaultExchangeHolder holder = aggregationRepository.getCache().getIfPresent(key); + if (holder == null) { + return false; + } + return true; + } + + @Test + public void testAdd() { + // Given + String key = "Add"; + assertFalse(exists(key)); + Exchange exchange = new DefaultExchange(context()); + // When + aggregationRepository.add(context(), key, exchange); + // Then + assertTrue(exists(key)); + } + + @Test + public void testGetExists() { + // Given + String key = "Get_Exists"; + Exchange exchange = new DefaultExchange(context()); + aggregationRepository.add(context(), key, exchange); + assertTrue(exists(key)); + + // When + Exchange exchange2 = aggregationRepository.get(context(), key); + // Then + assertNotNull(exchange2); + assertEquals(exchange.getExchangeId(), exchange2.getExchangeId()); + } + + @Test + public void testGetNotExists() { + // Given + String key = "Get_NotExists"; + assertFalse(exists(key)); + // When + Exchange exchange2 = aggregationRepository.get(context(), key); + // Then + assertNull(exchange2); + } + + @Test + public void testRemoveExists() { + // Given + String key = "Remove_Exists"; + Exchange exchange = new DefaultExchange(context()); + aggregationRepository.add(context(), key, exchange); + assertTrue(exists(key)); + // When + aggregationRepository.remove(context(), key, exchange); + // Then + assertFalse(exists(key)); + } + + @Test + public void testRemoveNotExists() { + // Given + String key = "RemoveNotExists"; + Exchange exchange = new DefaultExchange(context()); + assertFalse(exists(key)); + // When + aggregationRepository.remove(context(), key, exchange); + // Then + assertFalse(exists(key)); + } + + @Test + public void testGetKeys() { + // Given + String[] keys = {"GetKeys1", "GetKeys2"}; + addExchanges(keys); + // When + Set<String> keySet = aggregationRepository.getKeys(); + // Then + for (String key : keys) { + assertTrue(keySet.contains(key)); + } + } + + @Test + public void testConfirmExist() { + // Given + for (int i = 1; i < 4; i++) { + String key = "Confirm_" + i; + Exchange exchange = new DefaultExchange(context()); + exchange.setExchangeId("Exchange_" + i); + aggregationRepository.add(context(), key, exchange); + assertTrue(exists(key)); + } + // When + aggregationRepository.confirm(context(), "Confirm_2"); + // Then + assertTrue(exists("Confirm_1")); + assertFalse(exists("Confirm_2")); + assertTrue(exists("Confirm_3")); + } + + @Test + public void testConfirmNotExist() { + // Given + String[] keys = new String[3]; + for (int i = 1; i < 4; i++) { + keys[i - 1] = "Confirm" + i; + } + addExchanges(keys); + for (String key : keys) { + assertTrue(exists(key)); + } + // When + aggregationRepository.confirm(context(), "Exchange-Confirm5"); + // Then + for (String key : keys) { + assertTrue(exists(key)); + } + } + + private void addExchanges(String... keys) { + for (String key : keys) { + Exchange exchange = new DefaultExchange(context()); + exchange.setExchangeId("Exchange-" + key); + aggregationRepository.add(context(), key, exchange); + } + } + + @Test + public void testScan() { + // Given + String[] keys = {"Scan1", "Scan2"}; + addExchanges(keys); + // When + Set<String> exchangeIdSet = aggregationRepository.scan(context()); + // Then + for (String key : keys) { + assertTrue(exchangeIdSet.contains(key)); + } + } + + @Test + public void testRecover() { + // Given + String[] keys = {"Recover1", "Recover2"}; + addExchanges(keys); + // When + Exchange exchange2 = aggregationRepository.recover(context(), "Recover2"); + Exchange exchange3 = aggregationRepository.recover(context(), "Recover3"); + // Then + assertNotNull(exchange2); + assertNull(exchange3); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/d5e35c95/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryRoutesTest.java ---------------------------------------------------------------------- diff --git a/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryRoutesTest.java b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryRoutesTest.java new file mode 100644 index 0000000..2c60215 --- /dev/null +++ b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryRoutesTest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.caffeine.processor.aggregate; + +import java.util.Arrays; +import java.util.Random; +import java.util.stream.IntStream; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class CaffeineAggregationRepositoryRoutesTest extends CamelTestSupport { + private static final String ENDPOINT_MOCK = "mock:result"; + private static final String ENDPOINT_DIRECT = "direct:one"; + private static final int[] VALUES = generateRandomArrayOfInt(10, 0, 30); + private static final int SUM = IntStream.of(VALUES).reduce(0, (a, b) -> a + b); + private static final String CORRELATOR = "CORRELATOR"; + + @EndpointInject(uri = ENDPOINT_MOCK) + private MockEndpoint mock; + + @Produce(uri = ENDPOINT_DIRECT) + private ProducerTemplate producer; + + @Test + public void checkAggregationFromOneRoute() throws Exception { + mock.expectedMessageCount(VALUES.length); + mock.expectedBodiesReceived(SUM); + + IntStream.of(VALUES).forEach( + i -> producer.sendBodyAndHeader(i, CORRELATOR, CORRELATOR) + ); + + mock.assertIsSatisfied(); + } + + private Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } else { + Integer n = newExchange.getIn().getBody(Integer.class); + Integer o = oldExchange.getIn().getBody(Integer.class); + Integer v = (o == null ? 0 : o) + (n == null ? 0 : n); + + oldExchange.getIn().setBody(v, Integer.class); + + return oldExchange; + } + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from(ENDPOINT_DIRECT) + .routeId("AggregatingRouteOne") + .aggregate(header(CORRELATOR)) + .aggregationRepository(createAggregateRepository()) + .aggregationStrategy(CaffeineAggregationRepositoryRoutesTest.this::aggregate) + .completionSize(VALUES.length) + .to("log:org.apache.camel.component.caffeine.processor.aggregate?level=INFO&showAll=true&multiline=true") + .to(ENDPOINT_MOCK); + } + }; + } + + protected static int[] generateRandomArrayOfInt(int size, int lower, int upper) { + Random random = new Random(); + int[] array = new int[size]; + + Arrays.setAll(array, i -> random.nextInt(upper - lower) + lower); + + return array; + } + + protected CaffeineAggregationRepository createAggregateRepository() throws Exception { + CaffeineAggregationRepository repository = new CaffeineAggregationRepository(); + + return repository; + } +}