This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new a402c15 [CAMEL-17481] Caffeine cache improvements (#6728) a402c15 is described below commit a402c1543eb4aeeedd80bab14983219b33a93f7a Author: Andreas Klug <andreas.kl...@de.bosch.com> AuthorDate: Fri Jan 14 08:38:00 2022 +0100 [CAMEL-17481] Caffeine cache improvements (#6728) * CAMEL-17481: Caffeine cache improvements * CAMEL-17481: Caffeine cache improvements (rework) Co-authored-by: Klug Andreas (CI/XDM1) <kga...@bosch.com> --- .../src/main/docs/caffeine-cache-component.adoc | 4 ++ .../component/caffeine/CaffeineConstants.java | 1 + .../caffeine/cache/CaffeineCacheEndpoint.java | 40 +++++++++------- .../caffeine/cache/CaffeineCacheProducer.java | 18 +++++++- .../caffeine/load/CaffeineLoadCacheEndpoint.java | 42 ++++++++++------- .../caffeine/load/CaffeineLoadCacheProducer.java | 17 ++++++- .../caffeine/cache/CaffeineCacheProducerTest.java | 46 +++++++++++++++++++ ... CaffeineCacheRemovalListenerProducerTest.java} | 2 +- .../caffeine/cache/MetricsStatsCounter.java | 8 ++-- .../loadcache/CaffeineLoadCacheProducerTest.java | 53 ++++++++++++++++++++++ 10 files changed, 191 insertions(+), 40 deletions(-) diff --git a/components/camel-caffeine/src/main/docs/caffeine-cache-component.adoc b/components/camel-caffeine/src/main/docs/caffeine-cache-component.adoc index 0d33c12..543824b 100644 --- a/components/camel-caffeine/src/main/docs/caffeine-cache-component.adoc +++ b/components/camel-caffeine/src/main/docs/caffeine-cache-component.adoc @@ -56,6 +56,10 @@ You can use your cache with the following code: [source,java] ------------------------------------------------------------ + +@BindToRegistry("cache") +Cache cache = Caffeine.newBuilder().recordStats().build(); + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { diff --git a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/CaffeineConstants.java b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/CaffeineConstants.java index e5d551e..e7ad766 100644 --- a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/CaffeineConstants.java +++ b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/CaffeineConstants.java @@ -32,4 +32,5 @@ public interface CaffeineConstants { String ACTION_GET_ALL = "GET_ALL"; String ACTION_INVALIDATE = "INVALIDATE"; String ACTION_INVALIDATE_ALL = "INVALIDATE_ALL"; + String ACTION_AS_MAP = "AS_MAP"; } diff --git a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/cache/CaffeineCacheEndpoint.java b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/cache/CaffeineCacheEndpoint.java index bf64db8..521bfe5 100644 --- a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/cache/CaffeineCacheEndpoint.java +++ b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/cache/CaffeineCacheEndpoint.java @@ -64,27 +64,33 @@ public class CaffeineCacheEndpoint extends DefaultEndpoint { @Override protected void doStart() throws Exception { + cache = CamelContextHelper.lookup(getCamelContext(), cacheName, Cache.class); if (cache == null) { - Caffeine<?, ?> builder = Caffeine.newBuilder(); - if (configuration.getEvictionType() == EvictionType.SIZE_BASED) { - builder.initialCapacity(configuration.getInitialCapacity()); - builder.maximumSize(configuration.getMaximumSize()); - } else if (configuration.getEvictionType() == EvictionType.TIME_BASED) { - builder.expireAfterAccess(configuration.getExpireAfterAccessTime(), TimeUnit.SECONDS); - builder.expireAfterWrite(configuration.getExpireAfterWriteTime(), TimeUnit.SECONDS); - } - if (configuration.isStatsEnabled()) { - if (ObjectHelper.isEmpty(configuration.getStatsCounter())) { - builder.recordStats(); - } else { - builder.recordStats(configuration::getStatsCounter); + if (configuration.isCreateCacheIfNotExist()) { + Caffeine<?, ?> builder = Caffeine.newBuilder(); + if (configuration.getEvictionType() == EvictionType.SIZE_BASED) { + builder.initialCapacity(configuration.getInitialCapacity()); + builder.maximumSize(configuration.getMaximumSize()); + } else if (configuration.getEvictionType() == EvictionType.TIME_BASED) { + builder.expireAfterAccess(configuration.getExpireAfterAccessTime(), TimeUnit.SECONDS); + builder.expireAfterWrite(configuration.getExpireAfterWriteTime(), TimeUnit.SECONDS); } + if (configuration.isStatsEnabled()) { + if (ObjectHelper.isEmpty(configuration.getStatsCounter())) { + builder.recordStats(); + } else { + builder.recordStats(configuration::getStatsCounter); + } + } + if (ObjectHelper.isNotEmpty(configuration.getRemovalListener())) { + builder.removalListener(configuration.getRemovalListener()); + } + cache = builder.build(); + } else { + throw new IllegalArgumentException( + "Cache instance '" + cacheName + "' not found and createCacheIfNotExist is set to false"); } - if (ObjectHelper.isNotEmpty(configuration.getRemovalListener())) { - builder.removalListener(configuration.getRemovalListener()); - } - cache = builder.build(); } super.doStart(); } diff --git a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/cache/CaffeineCacheProducer.java b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/cache/CaffeineCacheProducer.java index 8a61087..713b82e 100644 --- a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/cache/CaffeineCacheProducer.java +++ b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/cache/CaffeineCacheProducer.java @@ -87,11 +87,27 @@ public class CaffeineCacheProducer extends HeaderSelectorProducer { @InvokeOnHeader(CaffeineConstants.ACTION_INVALIDATE_ALL) public void onInvalidateAll(Message message) { - cache.invalidateAll(message.getHeader(CaffeineConstants.KEYS, Collections::emptySet, Set.class)); + + Set<?> keys = message.getHeader(CaffeineConstants.KEYS, Set.class); + /* Empty cache if no key set is provided + - implies no deletions at all if an empty key set is provided */ + if (keys == null) { + cache.invalidateAll(); + } else { + cache.invalidateAll(keys); + } setResult(message, true, null, null); } + @InvokeOnHeader(CaffeineConstants.ACTION_AS_MAP) + public void onAsMap(Message message) { + Map<?, ?> result = cache.asMap(); + + message.setHeader(CaffeineConstants.KEYS, result.keySet()); + setResult(message, true, result, null); + } + // **************************** // Helpers // **************************** diff --git a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/load/CaffeineLoadCacheEndpoint.java b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/load/CaffeineLoadCacheEndpoint.java index 6e09285..c15e89e 100644 --- a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/load/CaffeineLoadCacheEndpoint.java +++ b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/load/CaffeineLoadCacheEndpoint.java @@ -18,6 +18,7 @@ package org.apache.camel.component.caffeine.load; import java.util.concurrent.TimeUnit; +import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.camel.Category; @@ -64,27 +65,34 @@ public class CaffeineLoadCacheEndpoint extends DefaultEndpoint { @Override protected void doStart() throws Exception { + cache = CamelContextHelper.lookup(getCamelContext(), cacheName, LoadingCache.class); if (cache == null) { - Caffeine<Object, Object> builder = Caffeine.newBuilder(); - if (configuration.getEvictionType() == EvictionType.SIZE_BASED) { - builder.initialCapacity(configuration.getInitialCapacity()); - builder.maximumSize(configuration.getMaximumSize()); - } else if (configuration.getEvictionType() == EvictionType.TIME_BASED) { - builder.expireAfterAccess(configuration.getExpireAfterAccessTime(), TimeUnit.SECONDS); - builder.expireAfterWrite(configuration.getExpireAfterWriteTime(), TimeUnit.SECONDS); - } - if (configuration.isStatsEnabled()) { - if (ObjectHelper.isEmpty(configuration.getStatsCounter())) { - builder.recordStats(); - } else { - builder.recordStats(configuration::getStatsCounter); + if (configuration.isCreateCacheIfNotExist()) { + Caffeine<Object, Object> builder = Caffeine.newBuilder(); + if (configuration.getEvictionType() == EvictionType.SIZE_BASED) { + builder.initialCapacity(configuration.getInitialCapacity()); + builder.maximumSize(configuration.getMaximumSize()); + } else if (configuration.getEvictionType() == EvictionType.TIME_BASED) { + builder.expireAfterAccess(configuration.getExpireAfterAccessTime(), TimeUnit.SECONDS); + builder.expireAfterWrite(configuration.getExpireAfterWriteTime(), TimeUnit.SECONDS); } + if (configuration.isStatsEnabled()) { + if (ObjectHelper.isEmpty(configuration.getStatsCounter())) { + builder.recordStats(); + } else { + builder.recordStats(configuration::getStatsCounter); + } + } + if (ObjectHelper.isNotEmpty(configuration.getRemovalListener())) { + builder.removalListener(configuration.getRemovalListener()); + } + cache = builder.build(configuration.getCacheLoader()); + } else { + throw new IllegalArgumentException( + "Loading cache instance '" + cacheName + + "' not found and createCacheIfNotExist is set to false"); } - if (ObjectHelper.isNotEmpty(configuration.getRemovalListener())) { - builder.removalListener(configuration.getRemovalListener()); - } - cache = builder.build(configuration.getCacheLoader()); } super.doStart(); } diff --git a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/load/CaffeineLoadCacheProducer.java b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/load/CaffeineLoadCacheProducer.java index 51a7c5b..c2236c8 100644 --- a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/load/CaffeineLoadCacheProducer.java +++ b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/load/CaffeineLoadCacheProducer.java @@ -87,11 +87,26 @@ public class CaffeineLoadCacheProducer extends HeaderSelectorProducer { @InvokeOnHeader(CaffeineConstants.ACTION_INVALIDATE_ALL) public void onInvalidateAll(Message message) { - cache.invalidateAll(message.getHeader(CaffeineConstants.KEYS, Collections::emptySet, Set.class)); + Set<?> keys = message.getHeader(CaffeineConstants.KEYS, Set.class); + /* Empty cache if no key set is provided + - implies no deletions at all if an empty key set is provided */ + if (keys == null) { + cache.invalidateAll(); + } else { + cache.invalidateAll(keys); + } setResult(message, true, null, null); } + @InvokeOnHeader(CaffeineConstants.ACTION_AS_MAP) + public void onAsMap(Message message) { + Map<?, ?> result = cache.asMap(); + + message.setHeader(CaffeineConstants.KEYS, result.keySet()); + setResult(message, true, result, null); + } + // **************************** // Helpers // **************************** diff --git a/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/cache/CaffeineCacheProducerTest.java b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/cache/CaffeineCacheProducerTest.java index 98f3b52..db9ac7f 100644 --- a/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/cache/CaffeineCacheProducerTest.java +++ b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/cache/CaffeineCacheProducerTest.java @@ -21,6 +21,7 @@ import java.util.Set; import java.util.stream.Collectors; import com.github.benmanes.caffeine.cache.Cache; +import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.caffeine.CaffeineConstants; import org.apache.camel.component.mock.MockEndpoint; @@ -194,6 +195,51 @@ public class CaffeineCacheProducerTest extends CaffeineCacheTestSupport { } @Test + void testCacheInvalidateAllWithoutKeys() throws Exception { + final Cache<Object, Object> cache = getTestCache(); + final Map<String, String> map = generateRandomMapOfString(3); + + cache.putAll(map); + + final MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMinimumMessageCount(1); + mock.expectedHeaderReceived(CaffeineConstants.ACTION_HAS_RESULT, false); + mock.expectedHeaderReceived(CaffeineConstants.ACTION_SUCCEEDED, true); + + fluentTemplate().withHeader(CaffeineConstants.ACTION, CaffeineConstants.ACTION_INVALIDATE_ALL) + .to("direct://start").send(); + + assertMockEndpointsSatisfied(); + + assertTrue(getTestCache().asMap().keySet().isEmpty()); + } + + @Test + void testCacheAsMap() throws Exception { + final Cache<Object, Object> cache = getTestCache(); + final Map<String, String> map = generateRandomMapOfString(3); + final Set<String> keys = map.keySet(); + + cache.putAll(map); + + final MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMinimumMessageCount(1); + mock.expectedHeaderReceived(CaffeineConstants.ACTION_HAS_RESULT, true); + mock.expectedHeaderReceived(CaffeineConstants.ACTION_SUCCEEDED, true); + + final Exchange exchange = fluentTemplate().withHeader(CaffeineConstants.ACTION, CaffeineConstants.ACTION_AS_MAP) + .to("direct://start").send(); + + assertMockEndpointsSatisfied(); + + final Map<String, String> elements = exchange.getMessage().getBody(Map.class); + keys.forEach(k -> { + assertTrue(elements.containsKey(k)); + assertEquals(map.get(k), elements.get(k)); + }); + } + + @Test void testStats() { final Map<String, String> map = generateRandomMapOfString(3); final Set<String> keys = map.keySet().stream().limit(2).collect(Collectors.toSet()); diff --git a/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/cache/CaffeineCacheRemovaListenerProducerTest.java b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/cache/CaffeineCacheRemovalListenerProducerTest.java similarity index 99% rename from components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/cache/CaffeineCacheRemovaListenerProducerTest.java rename to components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/cache/CaffeineCacheRemovalListenerProducerTest.java index 3bd9656..accb622 100644 --- a/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/cache/CaffeineCacheRemovaListenerProducerTest.java +++ b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/cache/CaffeineCacheRemovalListenerProducerTest.java @@ -32,7 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -public class CaffeineCacheRemovaListenerProducerTest extends CaffeineCacheTestSupport { +public class CaffeineCacheRemovalListenerProducerTest extends CaffeineCacheTestSupport { // **************************** // Clear diff --git a/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/cache/MetricsStatsCounter.java b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/cache/MetricsStatsCounter.java index 6a576a4..bfe963d 100644 --- a/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/cache/MetricsStatsCounter.java +++ b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/cache/MetricsStatsCounter.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import com.codahale.metrics.Counter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; +import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.stats.CacheStats; import com.github.benmanes.caffeine.cache.stats.StatsCounter; @@ -66,19 +67,20 @@ public class MetricsStatsCounter implements StatsCounter { } @Override + @Deprecated public void recordEviction() { - recordEviction(1); + recordEviction(1, RemovalCause.EXPLICIT); } @Override - public void recordEviction(int weight) { + public void recordEviction(int weight, RemovalCause removalCause) { evictionCount.inc(); evictionWeight.inc(weight); } @Override public CacheStats snapshot() { - return new CacheStats( + return CacheStats.of( hitCount.getCount(), missCount.getCount(), loadSuccessCount.getCount(), loadFailureCount.getCount(), totalLoadTime.getCount(), evictionCount.getCount(), evictionWeight.getCount()); diff --git a/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/loadcache/CaffeineLoadCacheProducerTest.java b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/loadcache/CaffeineLoadCacheProducerTest.java index 8377f25..18507db 100644 --- a/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/loadcache/CaffeineLoadCacheProducerTest.java +++ b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/loadcache/CaffeineLoadCacheProducerTest.java @@ -22,6 +22,7 @@ import java.util.Set; import java.util.stream.Collectors; import com.github.benmanes.caffeine.cache.Cache; +import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.caffeine.CaffeineConstants; import org.apache.camel.component.mock.MockEndpoint; @@ -200,6 +201,58 @@ public class CaffeineLoadCacheProducerTest extends CaffeineLoadCacheTestSupport }); } + @Test + void testCacheInvalidateAllWithoutKeys() throws Exception { + final Cache<Object, Object> cache = getTestCache(); + final Map<Integer, Integer> map = new HashMap<>(); + map.put(1, 1); + map.put(2, 2); + map.put(3, 3); + + cache.putAll(map); + + final MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMinimumMessageCount(1); + mock.expectedHeaderReceived(CaffeineConstants.ACTION_HAS_RESULT, false); + mock.expectedHeaderReceived(CaffeineConstants.ACTION_SUCCEEDED, true); + + fluentTemplate().withHeader(CaffeineConstants.ACTION, CaffeineConstants.ACTION_INVALIDATE_ALL) + .to("direct://start").send(); + + assertMockEndpointsSatisfied(); + + assertTrue(getTestCache().asMap().keySet().isEmpty()); + } + + @Test + void testCacheAsMap() throws Exception { + final Cache<Object, Object> cache = getTestCache(); + final Map<String, String> map = new HashMap<>(); + map.put("A", "AA"); + map.put("B", "BB"); + map.put("C", "CC"); + + final Set<String> keys = map.keySet(); + + cache.putAll(map); + + final MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMinimumMessageCount(1); + mock.expectedHeaderReceived(CaffeineConstants.ACTION_HAS_RESULT, true); + mock.expectedHeaderReceived(CaffeineConstants.ACTION_SUCCEEDED, true); + + final Exchange exchange = fluentTemplate().withHeader(CaffeineConstants.ACTION, CaffeineConstants.ACTION_AS_MAP) + .to("direct://start").send(); + + assertMockEndpointsSatisfied(); + + final Map<String, String> elements = exchange.getMessage().getBody(Map.class); + keys.forEach(k -> { + assertTrue(elements.containsKey(k)); + assertEquals(map.get(k), elements.get(k)); + }); + } + // **************************** // Route // ****************************