This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch CAMEL-20039/add-soft-lru-cache-4.0 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 491cd91636298d927afb4c1d412070f32e1c0e90 Author: Nicolas Filotto <nfilo...@talend.com> AuthorDate: Tue Oct 31 14:46:16 2023 +0100 CAMEL-20039: camel-core - SimpleLRUCache add support for soft cache --- .../SimpleLRUCacheTest.java} | 10 +- .../camel/support/cache/SimpleSoftCacheTest.java | 317 ++++++++++++++++ .../camel/support/DefaultLRUCacheFactory.java | 330 ++--------------- .../apache/camel/support/cache/SimpleLRUCache.java | 289 +++++++++++++++ .../camel/support/cache/SimpleSoftCache.java | 397 +++++++++++++++++++++ 5 files changed, 1045 insertions(+), 298 deletions(-) diff --git a/core/camel-core/src/test/java/org/apache/camel/support/DefaultLRUCacheFactoryTest.java b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java similarity index 96% rename from core/camel-core/src/test/java/org/apache/camel/support/DefaultLRUCacheFactoryTest.java rename to core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java index c9f7fdda9b4..d7e9f4549a5 100644 --- a/core/camel-core/src/test/java/org/apache/camel/support/DefaultLRUCacheFactoryTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.support; +package org.apache.camel.support.cache; import java.util.ArrayList; import java.util.List; @@ -29,14 +29,12 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; /** - * The test class for {@link DefaultLRUCacheFactory}. + * The test class for {@link SimpleLRUCache}. 
*/ -class DefaultLRUCacheFactoryTest { +class SimpleLRUCacheTest { private final List<String> consumed = new ArrayList<>(); - private final DefaultLRUCacheFactory.SimpleLRUCache<String, String> map - = (DefaultLRUCacheFactory.SimpleLRUCache<String, String>) new DefaultLRUCacheFactory().<String, - String> createLRUCache(3, consumed::add); + private final SimpleLRUCache<String, String> map = new SimpleLRUCache<>(16, 3, consumed::add); @Test void forbiddenOperations() { diff --git a/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleSoftCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleSoftCacheTest.java new file mode 100644 index 00000000000..3dd3bdea625 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleSoftCacheTest.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.support.cache; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * The test class for {@link SimpleSoftCache}. + */ +class SimpleSoftCacheTest { + + private final SimpleSoftCache<Integer, Object> cache = new SimpleSoftCache<>(new ConcurrentHashMap<>()); + + @Test + void testSoftCacheGetAndPut() { + + cache.put(1, "foo"); + cache.put(2, "bar"); + + assertEquals("foo", cache.get(1)); + assertEquals("bar", cache.get(2)); + assertNull(cache.get(3)); + + assertEquals(2, cache.size()); + + cache.getInnerCache().get(1).clear(); + assertEquals(2, cache.size()); + assertNull(cache.get(1)); + assertEquals(1, cache.size()); + } + + @Test + void testSoftCacheContainsValue() { + cache.put(1, "foo"); + + assertTrue(cache.containsValue("foo")); + assertFalse(cache.containsValue("bar")); + + assertFalse(cache.isEmpty()); + cache.getInnerCache().get(1).clear(); + assertFalse(cache.containsValue("foo")); + assertTrue(cache.isEmpty()); + } + + @Test + void testSoftCacheForEach() { + cache.put(1, "foo"); + cache.put(2, "bar"); + + Map<Integer, Object> tmp = new HashMap<>(); + cache.forEach(tmp::put); + + assertEquals("foo", tmp.get(1)); + assertEquals("bar", tmp.get(2)); + assertNull(tmp.get(3)); + + assertEquals(2, tmp.size()); + + cache.getInnerCache().get(1).clear(); + + tmp = new HashMap<>(); + cache.forEach(tmp::put); + + assertNull(tmp.get(1)); + assertEquals("bar", tmp.get(2)); + assertNull(tmp.get(3)); + + assertEquals(1, tmp.size()); + } + + @Test + void testSoftCacheReplaceAll() { + cache.put(1, "foo"); + cache.put(2, "bar"); + + 
cache.replaceAll((k, v) -> v + "2"); + + assertEquals("foo2", cache.get(1)); + assertEquals("bar2", cache.get(2)); + + assertEquals(2, cache.size()); + } + + @Test + void testSoftCachePutIfAbsent() { + cache.put(1, "foo"); + + assertEquals("foo", cache.putIfAbsent(1, "bar")); + assertEquals("foo", cache.get(1)); + + assertNull(cache.putIfAbsent(2, "bar")); + assertEquals("bar", cache.get(2)); + } + + @Test + void testSoftCacheRemove() { + cache.put(1, "foo"); + assertFalse(cache.remove(2, "foo")); + assertFalse(cache.remove(1, "bar")); + assertEquals("foo", cache.get(1)); + assertFalse(cache.isEmpty()); + assertTrue(cache.remove(1, "foo")); + assertNull(cache.get(1)); + assertTrue(cache.isEmpty()); + } + + @Test + void testSoftCacheReplaceSpecific() { + cache.put(1, "foo"); + assertFalse(cache.replace(2, "foo", "bar")); + assertFalse(cache.replace(1, "bar", "foo")); + assertEquals("foo", cache.get(1)); + assertTrue(cache.replace(1, "foo", "bar")); + assertEquals("bar", cache.get(1)); + } + + @Test + void testSoftCacheReplace() { + cache.put(1, "foo"); + assertNull(cache.replace(2, "bar")); + assertEquals("foo", cache.get(1)); + assertEquals("foo", cache.replace(1, "bar")); + assertEquals("bar", cache.get(1)); + } + + @Test + void testSoftCacheComputeIfAbsent() { + cache.put(1, "foo"); + assertEquals("foo", cache.computeIfAbsent(1, k -> "bar")); + assertEquals("foo", cache.get(1)); + assertEquals("bar", cache.computeIfAbsent(2, k -> "bar")); + assertEquals("bar", cache.get(2)); + } + + @Test + void testSoftCacheComputeIfPresent() { + cache.put(1, "foo"); + assertEquals("bar", cache.computeIfPresent(1, (k, v) -> "bar")); + assertEquals("bar", cache.get(1)); + assertNull(cache.computeIfPresent(1, (k, v) -> null)); + assertNull(cache.get(1)); + assertNull(cache.computeIfPresent(1, (k, v) -> "bar")); + } + + @Test + void testSoftCacheCompute() { + cache.put(1, "foo"); + assertEquals("bar", cache.compute(1, (k, v) -> "bar")); + assertEquals("bar", cache.get(1)); + 
assertNull(cache.compute(1, (k, v) -> null)); + assertNull(cache.get(1)); + assertEquals("bar", cache.compute(1, (k, v) -> "bar")); + assertEquals("bar", cache.get(1)); + assertNull(cache.compute(2, (k, v) -> null)); + assertNull(cache.get(2)); + } + + @Test + void testSoftCacheMerge() { + cache.put(1, "foo"); + assertEquals("foo-2", cache.merge(1, "2", (v1, v2) -> v1 + "-" + v2)); + assertEquals("foo-2", cache.get(1)); + assertNull(cache.merge(1, "2", (v1, v2) -> null)); + assertNull(cache.get(1)); + assertEquals("2", cache.merge(1, "2", (v1, v2) -> "bar")); + assertEquals("2", cache.get(1)); + assertEquals("2", cache.merge(2, "2", (v1, v2) -> null)); + assertEquals("2", cache.get(2)); + } + + @Test + void testSimpleSoftCachePutOverride() { + Object old = cache.put(1, "foo"); + assertNull(old); + old = cache.put(2, "bar"); + assertNull(old); + + assertEquals("foo", cache.get(1)); + assertEquals("bar", cache.get(2)); + + old = cache.put(1, "changed"); + assertEquals("foo", old); + assertEquals("changed", cache.get(1)); + + assertEquals(2, cache.size()); + } + + @Test + void testSimpleSoftCachePutAll() { + Map<Integer, Object> map = new HashMap<>(); + map.put(1, "foo"); + map.put(2, "bar"); + + cache.putAll(map); + + assertEquals("foo", cache.get(1)); + assertEquals("bar", cache.get(2)); + assertNull(cache.get(3)); + assertEquals(2, cache.size()); + } + + @Test + void testSimpleSoftCacheRemove() { + cache.put(1, "foo"); + cache.put(2, "bar"); + + assertEquals("bar", cache.get(2)); + cache.remove(2); + assertNull(cache.get(2)); + } + + @Test + void testSimpleSoftCacheValues() { + cache.put(1, "foo"); + cache.put(2, "bar"); + + Collection<Object> col = cache.values(); + assertEquals(2, col.size()); + + Iterator<Object> it = col.iterator(); + assertEquals("foo", it.next()); + assertEquals("bar", it.next()); + } + + @Test + void testSimpleSoftCacheEmpty() { + assertTrue(cache.isEmpty()); + + cache.put(1, "foo"); + assertFalse(cache.isEmpty()); + + cache.put(2, "bar"); + 
assertFalse(cache.isEmpty()); + + cache.remove(2); + assertFalse(cache.isEmpty()); + + cache.clear(); + assertTrue(cache.isEmpty()); + + } + + @Test + void testSimpleSoftCacheContainsKey() { + assertFalse(cache.containsKey(1)); + cache.put(1, "foo"); + assertTrue(cache.containsKey(1)); + + assertFalse(cache.containsKey(2)); + cache.put(2, "foo"); + assertTrue(cache.containsKey(2)); + } + + @Test + void testSimpleSoftCacheKeySet() { + cache.put(1, "foo"); + cache.put(2, "foo"); + + Set<Integer> keys = cache.keySet(); + assertEquals(2, keys.size()); + + Iterator<Integer> it = keys.iterator(); + assertEquals(1, it.next().intValue()); + assertEquals(2, it.next().intValue()); + } + + @Test + void testSimpleSoftCacheNotRunOutOfMemory() { + // we should not run out of memory using the soft cache + // if you run this test with a regular cache then you will run out of memory + int maximumCacheSize = 1024; + for (int i = 0; i < maximumCacheSize; i++) { + Object data = new LargeObject(); + Integer key = Integer.valueOf(i); + cache.put(key, data); + } + + Map<Integer, Object> tmp = new HashMap<>(cache); + int size = tmp.size(); + assertTrue(size < maximumCacheSize, "Cache size should not be max, was: " + size); + } + + public static class LargeObject { + + byte[] data; + + public LargeObject() { + this.data = new byte[100 * 1024 * 1024]; // 100 MB + } + } +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultLRUCacheFactory.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultLRUCacheFactory.java index d5a21b23d1b..f42d3cf1726 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultLRUCacheFactory.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultLRUCacheFactory.java @@ -16,20 +16,13 @@ */ package org.apache.camel.support; -import java.util.Collections; +import java.lang.ref.SoftReference; import java.util.LinkedHashMap; import java.util.Map; -import java.util.Objects; -import 
java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.LongAdder; -import java.util.function.BiFunction; import java.util.function.Consumer; -import java.util.function.Function; +import org.apache.camel.support.cache.SimpleLRUCache; +import org.apache.camel.support.cache.SimpleSoftCache; import org.apache.camel.support.service.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,8 +42,7 @@ public class DefaultLRUCacheFactory extends LRUCacheFactory { */ @Override public <K, V> Map<K, V> createLRUCache(int maximumCacheSize) { - LOG.trace("Creating LRUCache with maximumCacheSize: {}", maximumCacheSize); - return new SimpleLRUCache<>(maximumCacheSize); + return createLRUCache(16, maximumCacheSize); } /** @@ -61,7 +53,7 @@ public class DefaultLRUCacheFactory extends LRUCacheFactory { */ @Override public <K, V> Map<K, V> createLRUCache(int maximumCacheSize, Consumer<V> onEvict) { - LOG.trace("Creating LRUCache with maximumCacheSize: {}", maximumCacheSize); + LOG.trace("Creating LRUCache with initialCapacity: {}, maximumCacheSize: {}, with onEvict", 16, maximumCacheSize); return new SimpleLRUCache<>(16, maximumCacheSize, onEvict); } @@ -75,8 +67,7 @@ public class DefaultLRUCacheFactory extends LRUCacheFactory { */ @Override public <K, V> Map<K, V> createLRUCache(int initialCapacity, int maximumCacheSize) { - LOG.trace("Creating LRUCache with initialCapacity: {}, maximumCacheSize: {}", initialCapacity, maximumCacheSize); - return new SimpleLRUCache<>(initialCapacity, maximumCacheSize); + return createLRUCache(initialCapacity, maximumCacheSize, maximumCacheSize > 0); } /** @@ -92,7 +83,9 @@ public class DefaultLRUCacheFactory extends LRUCacheFactory { public <K, V> Map<K, V> createLRUCache(int initialCapacity, int maximumCacheSize, boolean stopOnEviction) { LOG.trace("Creating 
LRUCache with initialCapacity: {}, maximumCacheSize: {}, stopOnEviction: {}", initialCapacity, maximumCacheSize, stopOnEviction); - return new SimpleLRUCache<>(initialCapacity, maximumCacheSize, stopOnEviction); + return new SimpleLRUCache<K, V>( + initialCapacity, maximumCacheSize, + stopOnEviction ? DefaultLRUCacheFactory.this::doStop : DefaultLRUCacheFactory.this::doNothing); } /** @@ -104,21 +97,36 @@ public class DefaultLRUCacheFactory extends LRUCacheFactory { */ @Override public <K, V> Map<K, V> createLRUSoftCache(int maximumCacheSize) { - LOG.trace("Creating LRUSoftCache with maximumCacheSize: {}", maximumCacheSize); - return new SimpleLRUCache<>(maximumCacheSize); + return createLRUSoftCache(16, maximumCacheSize); } @Override public <K, V> Map<K, V> createLRUSoftCache(int initialCapacity, int maximumCacheSize) { - LOG.trace("Creating LRUCache with initialCapacity: {}, maximumCacheSize: {}", initialCapacity, maximumCacheSize); - return new SimpleLRUCache<>(initialCapacity, maximumCacheSize); + return createLRUSoftCache(initialCapacity, maximumCacheSize, maximumCacheSize > 0); } @Override public <K, V> Map<K, V> createLRUSoftCache(int initialCapacity, int maximumCacheSize, boolean stopOnEviction) { - LOG.trace("Creating LRUCache with initialCapacity: {}, maximumCacheSize: {}, stopOnEviction: {}", initialCapacity, + LOG.trace("Creating LRUSoftCache with initialCapacity: {}, maximumCacheSize: {}, stopOnEviction: {}", initialCapacity, maximumCacheSize, stopOnEviction); - return new SimpleLRUCache<>(initialCapacity, maximumCacheSize, stopOnEviction); + return new SimpleSoftCache<>( + new SimpleLRUCache<K, SoftReference<V>>( + initialCapacity, maximumCacheSize, + asSoftReferenceConsumer(stopOnEviction + ? DefaultLRUCacheFactory.this::doStop : DefaultLRUCacheFactory.this::doNothing))); + } + + /** + * Converts a consumer of values of type {@code V} into a consumer of referent of {@code SoftReference} of type + * {@code V}. 
+ */ + private static <V> Consumer<SoftReference<V>> asSoftReferenceConsumer(Consumer<V> evicted) { + return ref -> { + V v = ref.get(); + if (v != null) { + evicted.accept(v); + } + }; } /** @@ -131,287 +139,25 @@ public class DefaultLRUCacheFactory extends LRUCacheFactory { @Override @Deprecated public <K, V> Map<K, V> createLRUWeakCache(int maximumCacheSize) { - LOG.trace("Creating LRUWeakCache with maximumCacheSize: {}", maximumCacheSize); - return new SimpleLRUCache<>(maximumCacheSize); + return createLRUWeakCache(16, maximumCacheSize); } @Override @Deprecated public <K, V> Map<K, V> createLRUWeakCache(int initialCapacity, int maximumCacheSize) { - LOG.trace("Creating LRUCache with initialCapacity: {}, maximumCacheSize: {}", initialCapacity, maximumCacheSize); - return new SimpleLRUCache<>(initialCapacity, maximumCacheSize); + return createLRUWeakCache(initialCapacity, maximumCacheSize, maximumCacheSize > 0); } @Override @Deprecated public <K, V> Map<K, V> createLRUWeakCache(int initialCapacity, int maximumCacheSize, boolean stopOnEviction) { - LOG.trace("Creating LRUCache with initialCapacity: {}, maximumCacheSize: {}, stopOnEviction: {}", initialCapacity, + LOG.trace("Creating LRUWeakCache with initialCapacity: {}, maximumCacheSize: {}, stopOnEviction: {}", initialCapacity, maximumCacheSize, stopOnEviction); - return new SimpleLRUCache<>(initialCapacity, maximumCacheSize, stopOnEviction); - } - - class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { - - static final float DEFAULT_LOAD_FACTOR = 0.75f; - /** - * The flag indicating that an eviction process is in progress. - */ - private final AtomicBoolean eviction = new AtomicBoolean(); - /** - * The maximum cache size. - */ - private final int maximumCacheSize; - /** - * The last changes recorded. - */ - private final Queue<Entry<K, V>> lastChanges = new ConcurrentLinkedQueue<>(); - /** - * The total amount of changes recorded. 
- */ - private final LongAdder totalChanges = new LongAdder(); - /** - * The function to call when an entry is evicted. - */ - private final Consumer<V> evict; - - public SimpleLRUCache(int maximumCacheSize) { - this(16, maximumCacheSize, maximumCacheSize > 0); - } - - public SimpleLRUCache(int initialCapacity, int maximumCacheSize) { - this(initialCapacity, maximumCacheSize, maximumCacheSize > 0); - } - - public SimpleLRUCache(int initialCapacity, int maximumCacheSize, boolean stopOnEviction) { - this(initialCapacity, maximumCacheSize, - stopOnEviction ? DefaultLRUCacheFactory.this::doStop : DefaultLRUCacheFactory.this::doNothing); - } - - public SimpleLRUCache(int initialCapacity, int maximumCacheSize, Consumer<V> evicted) { - super(initialCapacity, DEFAULT_LOAD_FACTOR); - this.maximumCacheSize = maximumCacheSize; - this.evict = Objects.requireNonNull(evicted); - } - - /** - * Adds a new change in case the mapping function doesn't return {@code null}. - * - * @param context the context of the write operation - * @param mappingFunction the mapping function to apply. - * @return the result of the mapping function. - */ - private V addChange(OperationContext<K, V> context, Function<? super K, ? 
extends V> mappingFunction) { - K key = context.key; - V value = mappingFunction.apply(key); - if (value == null) { - return null; - } - lastChanges.add(Map.entry(key, value)); - totalChanges.increment(); - return value; - } - - @Override - public V put(K key, V value) { - if (key == null || value == null) { - throw new NullPointerException(); - } - try (OperationContext<K, V> context = new OperationContext<>(this, key)) { - super.compute( - key, - (k, v) -> { - context.result = v; - return addChange(context, x -> value); - }); - return context.result; - } - } - - @Override - public V putIfAbsent(K key, V value) { - if (key == null || value == null) { - throw new NullPointerException(); - } - try (OperationContext<K, V> context = new OperationContext<>(this, key)) { - super.compute( - key, - (k, v) -> { - context.result = v; - if (v != null) { - return v; - } - return addChange(context, x -> value); - }); - return context.result; - } - } - - @Override - public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) { - if (key == null || mappingFunction == null) { - throw new NullPointerException(); - } - try (OperationContext<K, V> context = new OperationContext<>(this, key)) { - return super.computeIfAbsent(key, k -> addChange(context, mappingFunction)); - } - } - - @Override - public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { - if (key == null || remappingFunction == null) { - throw new NullPointerException(); - } - try (OperationContext<K, V> context = new OperationContext<>(this, key)) { - return super.computeIfPresent(key, (k, v) -> addChange(context, x -> remappingFunction.apply(x, v))); - } - } - - @Override - public V compute(K key, BiFunction<? super K, ? super V, ? 
extends V> remappingFunction) { - if (key == null || remappingFunction == null) { - throw new NullPointerException(); - } - try (OperationContext<K, V> context = new OperationContext<>(this, key)) { - return super.compute(key, (k, v) -> addChange(context, x -> remappingFunction.apply(x, v))); - } - } - - @Override - public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) { - if (key == null || value == null || remappingFunction == null) { - throw new NullPointerException(); - } - try (OperationContext<K, V> context = new OperationContext<>(this, key)) { - return super.compute( - key, - (k, oldValue) -> { - V newValue = (oldValue == null) ? value : remappingFunction.apply(oldValue, value); - return addChange(context, x -> newValue); - }); - } - } - - @Override - public boolean replace(K key, V oldValue, V newValue) { - if (key == null || oldValue == null || newValue == null) { - throw new NullPointerException(); - } - try (OperationContext<K, V> context = new OperationContext<>(this, key)) { - super.computeIfPresent( - key, - (k, v) -> { - if (Objects.equals(oldValue, v)) { - context.result = addChange(context, x -> newValue); - return context.result; - } - return v; - }); - return context.result != null && Objects.equals(context.result, newValue); - } - } - - @Override - public V replace(K key, V value) { - if (key == null || value == null) { - throw new NullPointerException(); - } - try (OperationContext<K, V> context = new OperationContext<>(this, key)) { - super.computeIfPresent( - key, - (k, v) -> { - context.result = v; - return addChange(context, x -> value); - }); - return context.result; - } - } - - @Override - public void putAll(Map<? extends K, ? extends V> m) { - for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) { - put(e.getKey(), e.getValue()); - } - } - - @Override - public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) { - for (Map.Entry<? extends K, ? 
extends V> e : entrySet()) { - replace(e.getKey(), e.getValue(), function.apply(e.getKey(), e.getValue())); - } - } - - @Override - public Set<Entry<K, V>> entrySet() { - return Collections.unmodifiableSet(super.entrySet()); - } - - /** - * @return the size of the queue of changes. - */ - int getQueueSize() { - return totalChanges.intValue(); - } - - /** - * Indicates whether an eviction is needed. An eviction can be triggered if the size of the map or the queue of - * changes exceeds the maximum allowed size which is respectively {@code maximumCacheSize} and - * {@code 2 * maximumCacheSize}. - * - * @return {@code true} if an eviction is needed, {@code false} otherwise. - */ - private boolean evictionNeeded() { - return size() > maximumCacheSize || getQueueSize() > 2 * maximumCacheSize; - } - - /** - * @return the oldest existing change. - */ - private Entry<K, V> nextOldestChange() { - Entry<K, V> oldest = lastChanges.poll(); - if (oldest != null) { - totalChanges.decrement(); - } - return oldest; - } - - /** - * The internal context of all write operations. - */ - private static class OperationContext<K, V> implements AutoCloseable { - /** - * The result of the corresponding operation when applicable. - */ - V result; - /** - * The key against which the operation is made. - */ - final K key; - /** - * The underlying cache. 
- */ - private final SimpleLRUCache<K, V> cache; - - OperationContext(SimpleLRUCache<K, V> cache, K key) { - this.cache = cache; - this.key = key; - } - - @Override - public void close() { - if (cache.evictionNeeded() && cache.eviction.compareAndSet(false, true)) { - try { - while (cache.evictionNeeded()) { - Entry<K, V> oldest = cache.nextOldestChange(); - if (oldest != null && cache.remove(oldest.getKey(), oldest.getValue())) { - cache.evict.accept(oldest.getValue()); - } - } - } finally { - cache.eviction.set(false); - } - } - } - } + return new SimpleSoftCache<>( + new SimpleLRUCache<K, SoftReference<V>>( + initialCapacity, maximumCacheSize, + asSoftReferenceConsumer(stopOnEviction + ? DefaultLRUCacheFactory.this::doStop : DefaultLRUCacheFactory.this::doNothing))); } <V> void doNothing(V value) { diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java new file mode 100644 index 00000000000..c36c1640259 --- /dev/null +++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.support.cache; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * {@code SimpleLRUCache} is a simple implementation of a cache of type Least Recently Used . The implementation doesn't + * accept null values. Generally speaking, the parameters of all the public methods must have a value otherwise a + * {@code NullPointerException} is thrown. + * + * @param <K> type of the key + * @param <V> type of the value + */ +public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { + + static final float DEFAULT_LOAD_FACTOR = 0.75f; + /** + * The flag indicating that an eviction process is in progress. + */ + private final AtomicBoolean eviction = new AtomicBoolean(); + /** + * The maximum cache size. + */ + private final int maximumCacheSize; + /** + * The last changes recorded. + */ + private final Queue<Entry<K, V>> lastChanges = new ConcurrentLinkedQueue<>(); + /** + * The total amount of changes recorded. + */ + private final LongAdder totalChanges = new LongAdder(); + /** + * The function to call when an entry is evicted. + */ + private final Consumer<V> evict; + + public SimpleLRUCache(int initialCapacity, int maximumCacheSize, Consumer<V> evicted) { + super(initialCapacity, DEFAULT_LOAD_FACTOR); + this.maximumCacheSize = maximumCacheSize; + this.evict = Objects.requireNonNull(evicted); + } + + /** + * Adds a new change in case the mapping function doesn't return {@code null}. + * + * @param context the context of the write operation + * @param mappingFunction the mapping function to apply. 
+ * @return the result of the mapping function. + */ + private V addChange(OperationContext<K, V> context, Function<? super K, ? extends V> mappingFunction) { + K key = context.key; + V value = mappingFunction.apply(key); + if (value == null) { + return null; + } + lastChanges.add(Map.entry(key, value)); + totalChanges.increment(); + return value; + } + + @Override + public V put(K key, V value) { + if (key == null || value == null) { + throw new NullPointerException(); + } + try (OperationContext<K, V> context = new OperationContext<>(this, key)) { + super.compute( + key, + (k, v) -> { + context.result = v; + return addChange(context, x -> value); + }); + return context.result; + } + } + + @Override + public V putIfAbsent(K key, V value) { + if (key == null || value == null) { + throw new NullPointerException(); + } + try (OperationContext<K, V> context = new OperationContext<>(this, key)) { + super.compute( + key, + (k, v) -> { + context.result = v; + if (v != null) { + return v; + } + return addChange(context, x -> value); + }); + return context.result; + } + } + + @Override + public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) { + if (key == null || mappingFunction == null) { + throw new NullPointerException(); + } + try (OperationContext<K, V> context = new OperationContext<>(this, key)) { + return super.computeIfAbsent(key, k -> addChange(context, mappingFunction)); + } + } + + @Override + public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { + if (key == null || remappingFunction == null) { + throw new NullPointerException(); + } + try (OperationContext<K, V> context = new OperationContext<>(this, key)) { + return super.computeIfPresent(key, (k, v) -> addChange(context, x -> remappingFunction.apply(x, v))); + } + } + + @Override + public V compute(K key, BiFunction<? super K, ? super V, ? 
extends V> remappingFunction) { + if (key == null || remappingFunction == null) { + throw new NullPointerException(); + } + try (OperationContext<K, V> context = new OperationContext<>(this, key)) { + return super.compute(key, (k, v) -> addChange(context, x -> remappingFunction.apply(x, v))); + } + } + + @Override + public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) { + if (key == null || value == null || remappingFunction == null) { + throw new NullPointerException(); + } + try (OperationContext<K, V> context = new OperationContext<>(this, key)) { + return super.compute( + key, + (k, oldValue) -> { + V newValue = (oldValue == null) ? value : remappingFunction.apply(oldValue, value); + return addChange(context, x -> newValue); + }); + } + } + + @Override + public boolean replace(K key, V oldValue, V newValue) { + if (key == null || oldValue == null || newValue == null) { + throw new NullPointerException(); + } + try (OperationContext<K, V> context = new OperationContext<>(this, key)) { + super.computeIfPresent( + key, + (k, v) -> { + if (Objects.equals(oldValue, v)) { + context.result = addChange(context, x -> newValue); + return context.result; + } + return v; + }); + return context.result != null && Objects.equals(context.result, newValue); + } + } + + @Override + public V replace(K key, V value) { + if (key == null || value == null) { + throw new NullPointerException(); + } + try (OperationContext<K, V> context = new OperationContext<>(this, key)) { + super.computeIfPresent( + key, + (k, v) -> { + context.result = v; + return addChange(context, x -> value); + }); + return context.result; + } + } + + @Override + public void putAll(Map<? extends K, ? extends V> m) { + for (Entry<? extends K, ? extends V> e : m.entrySet()) { + put(e.getKey(), e.getValue()); + } + } + + @Override + public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) { + for (Entry<? extends K, ? 
extends V> e : entrySet()) { + replace(e.getKey(), e.getValue(), function.apply(e.getKey(), e.getValue())); + } + } + + @Override + public Set<Entry<K, V>> entrySet() { + return Collections.unmodifiableSet(super.entrySet()); + } + + /** + * @return the size of the queue of changes. + */ + int getQueueSize() { + return totalChanges.intValue(); + } + + /** + * Indicates whether an eviction is needed. An eviction can be triggered if the size of the map or the queue of + * changes exceeds the maximum allowed size which is respectively {@code maximumCacheSize} and + * {@code 2 * maximumCacheSize}. + * + * @return {@code true} if an eviction is needed, {@code false} otherwise. + */ + private boolean evictionNeeded() { + return size() > maximumCacheSize || getQueueSize() > 2 * maximumCacheSize; + } + + /** + * @return the oldest existing change. + */ + private Entry<K, V> nextOldestChange() { + Entry<K, V> oldest = lastChanges.poll(); + if (oldest != null) { + totalChanges.decrement(); + } + return oldest; + } + + /** + * The internal context of all write operations. + */ + private static class OperationContext<K, V> implements AutoCloseable { + /** + * The result of the corresponding operation when applicable. + */ + V result; + /** + * The key against which the operation is made. + */ + final K key; + /** + * The underlying cache. 
+ */ + private final SimpleLRUCache<K, V> cache; + + OperationContext(SimpleLRUCache<K, V> cache, K key) { + this.cache = cache; + this.key = key; + } + + @Override + public void close() { + if (cache.evictionNeeded() && cache.eviction.compareAndSet(false, true)) { + try { + while (cache.evictionNeeded()) { + Entry<K, V> oldest = cache.nextOldestChange(); + if (oldest != null && cache.remove(oldest.getKey(), oldest.getValue())) { + cache.evict.accept(oldest.getValue()); + } + } + } finally { + cache.eviction.set(false); + } + } + } + } +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleSoftCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleSoftCache.java new file mode 100644 index 00000000000..ee2b8b70f82 --- /dev/null +++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleSoftCache.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
import java.lang.ref.Reference;
import java.lang.ref.SoftReference;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * {@code SimpleSoftCache} is a simple implementation of a cache where values are soft references, which allows the
 * Garbage Collector to clear the referents in response to memory demand to potentially prevent
 * {@code OutOfMemoryError}. The entries whose referent is missing are removed lazily when they are accessed directly
 * or indirectly through the {@code Map} API. The implementation doesn't accept null values. Generally speaking, the
 * parameters of all the public methods must have a value otherwise a {@code NullPointerException} is thrown.
 * <p>
 * The compound operations ({@code putIfAbsent}, {@code replace}, {@code compute}, {@code merge}...) are implemented
 * as retry loops on top of the conditional operations of the underlying map ({@code putIfAbsent},
 * {@code remove(key, value)} and {@code replace(key, old, new)}); the functions given to them can consequently be
 * invoked more than once when a retry is needed.
 *
 * @param <K> type of the key
 * @param <V> type of the value
 * @see       SimpleLRUCache
 */
public class SimpleSoftCache<K, V> implements Map<K, V> {

    /**
     * The underlying cache to which the modifications are applied.
     */
    private final Map<K, SoftReference<V>> delegate;

    /**
     * Constructs a {@code SimpleSoftCache} with the given underlying cache.
     *
     * @param  delegate             the underlying cache to which the modifications are applied. Be aware that the
     *                              implementation of the provided map must accept concurrent modifications to allow
     *                              lazy evictions of empty references.
     * @throws NullPointerException if {@code delegate} is {@code null}
     */
    public SimpleSoftCache(Map<K, SoftReference<V>> delegate) {
        this.delegate = Objects.requireNonNull(delegate, "delegate");
    }

    /**
     * @return the size of the cache without considering if the soft references still have a referent set for the sake
     *         of simplicity and efficiency.
     */
    @Override
    public int size() {
        return delegate.size();
    }

    /**
     * Returns true if this map contains no key-value mappings without considering if the soft references still have a
     * referent set for the sake of simplicity and efficiency.
     */
    @Override
    public boolean isEmpty() {
        return delegate.isEmpty();
    }

    /**
     * @return {@code true} if a value can still be retrieved for the given key, {@code false} if there is no mapping
     *         or if its referent is missing (in which case the empty mapping is removed, see {@link #get(Object)}).
     */
    @Override
    public boolean containsKey(Object key) {
        return get(key) != null;
    }

    /**
     * Scans all the entries looking for a referent equal to the given value. The entries whose referent is missing
     * are removed from the underlying cache while scanning.
     */
    @Override
    public boolean containsValue(Object value) {
        for (Entry<K, SoftReference<V>> entry : delegate.entrySet()) {
            SoftReference<V> ref = entry.getValue();
            V refVal = ref.get();
            if (refVal == null) {
                // Conditional removal: only drops the mapping if it still points to this empty reference
                delegate.remove(entry.getKey(), ref);
            } else if (Objects.equals(value, refVal)) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return the referent mapped to the given key, or {@code null} if there is no mapping or if the referent is
     *         missing. In the latter case, the empty mapping is removed from the underlying cache.
     */
    @Override
    public V get(Object key) {
        SoftReference<V> ref = delegate.get(key);
        if (ref == null) {
            return null;
        }
        V v = ref.get();
        if (v == null) {
            delegate.remove(key, ref);
        }
        return v;
    }

    /**
     * Maps the given key to a new soft reference to the given value.
     *
     * @return                      the referent of the previous mapping, or {@code null} if there was no mapping or
     *                              if its referent was missing.
     * @throws NullPointerException if the key or the value is {@code null}. A null value is rejected because it would
     *                              otherwise be stored as a soft reference without referent, which is counted by
     *                              {@link #size()} but cannot be retrieved.
     */
    @Override
    public V put(K key, V value) {
        Objects.requireNonNull(key, "key");
        Objects.requireNonNull(value, "value");
        SoftReference<V> prev = delegate.put(key, new SoftReference<>(value));
        return prev == null ? null : prev.get();
    }

    /**
     * Removes the mapping of the given key.
     *
     * @return the referent of the removed mapping, or {@code null} if there was no mapping or if its referent was
     *         missing.
     */
    @Override
    public V remove(Object key) {
        SoftReference<V> prev = delegate.remove(key);
        return prev == null ? null : prev.get();
    }

    /**
     * Adds all the entries of the given map, one entry at a time.
     *
     * @throws NullPointerException if the map, one of its keys or one of its values is {@code null}. The entries
     *                              located before the faulty one have already been added when the exception is
     *                              thrown.
     */
    @Override
    public void putAll(Map<? extends K, ? extends V> m) {
        for (Entry<? extends K, ? extends V> e : m.entrySet()) {
            K key = Objects.requireNonNull(e.getKey(), "key");
            V value = Objects.requireNonNull(e.getValue(), "value");
            delegate.put(key, new SoftReference<>(value));
        }
    }

    @Override
    public void clear() {
        delegate.clear();
    }

    /**
     * @return the key set of the underlying cache, which means that it can contain keys whose referent is missing.
     */
    @Override
    public Set<K> keySet() {
        return delegate.keySet();
    }

    /**
     * @return a new collection with the referents that are still set. The result is a copy, not a view of this cache.
     */
    @Override
    public Collection<V> values() {
        return delegate.values().stream().map(Reference::get).filter(Objects::nonNull).collect(Collectors.toList());
    }

    /**
     * @return a new set with the entries whose referent is still set. The result is a copy, not a view of this cache.
     *         The entries whose referent is missing are removed from the underlying cache while building the result.
     */
    @Override
    public Set<Entry<K, V>> entrySet() {
        Set<Entry<K, V>> result = new HashSet<>(delegate.size());
        for (Entry<K, SoftReference<V>> entry : delegate.entrySet()) {
            SoftReference<V> ref = entry.getValue();
            V v = ref.get();
            if (v == null) {
                delegate.remove(entry.getKey(), ref);
                continue;
            }
            result.add(Map.entry(entry.getKey(), v));
        }
        return result;
    }

    /**
     * Applies the given action to all the entries whose referent is still set. The entries whose referent is missing
     * are removed from the underlying cache instead.
     */
    @Override
    public void forEach(BiConsumer<? super K, ? super V> action) {
        delegate.forEach((k, ref) -> {
            V v = ref.get();
            if (v == null) {
                delegate.remove(k, ref);
            } else {
                action.accept(k, v);
            }
        });
    }

    /**
     * Replaces the value of each entry of {@link #entrySet()} with the result of the given function, using
     * {@link #replace(Object, Object, Object)}, so an entry modified in the meantime is left untouched.
     */
    @Override
    public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
        for (Entry<? extends K, ? extends V> e : entrySet()) {
            replace(e.getKey(), e.getValue(), function.apply(e.getKey(), e.getValue()));
        }
    }

    /**
     * Maps the given key to the given value if there is no mapping yet or if the referent of the existing mapping is
     * missing.
     *
     * @return                      the existing referent if any, {@code null} if the given value has been added.
     * @throws NullPointerException if the key or the value is {@code null}
     */
    @Override
    public V putIfAbsent(K key, V value) {
        if (key == null || value == null) {
            throw new NullPointerException();
        }
        for (;;) {
            SoftReference<V> ref = delegate.get(key);
            V prev = null;
            if (ref == null) {
                SoftReference<V> prevRef = delegate.putIfAbsent(key, new SoftReference<>(value));
                if (prevRef != null && (prev = prevRef.get()) == null) {
                    // The referent is missing let's try again
                    delegate.remove(key, prevRef);
                    continue;
                }
            } else {
                prev = ref.get();
                if (prev == null && !delegate.replace(key, ref, new SoftReference<>(value))) {
                    // The state has changed, let's try again
                    continue;
                }
            }
            return prev;
        }
    }

    /**
     * Removes the mapping of the given key if its referent is equal to the given value. A mapping whose referent is
     * missing is removed too, but it is not reported as a successful removal.
     *
     * @return                      {@code true} if a mapping with a referent equal to the given value has been
     *                              removed, {@code false} otherwise.
     * @throws NullPointerException if the key or the value is {@code null}
     */
    @Override
    public boolean remove(Object key, Object value) {
        if (key == null || value == null) {
            throw new NullPointerException();
        }
        for (;;) {
            SoftReference<V> ref = delegate.get(key);
            if (ref != null) {
                V v = ref.get();
                if (v == null || Objects.equals(v, value)) {
                    if (delegate.remove(key, ref)) {
                        return v != null;
                    }
                    // The state has changed, let's try again
                    continue;
                }
            }
            return false;
        }
    }

    /**
     * Replaces the value of the given key with {@code newValue} if its current referent is equal to {@code oldValue}.
     * A mapping whose referent is missing is removed.
     *
     * @return                      {@code true} if the value has been replaced, {@code false} otherwise.
     * @throws NullPointerException if one of the parameters is {@code null}
     */
    @Override
    public boolean replace(K key, V oldValue, V newValue) {
        if (key == null || oldValue == null || newValue == null) {
            throw new NullPointerException();
        }
        for (;;) {
            SoftReference<V> ref = delegate.get(key);
            if (ref != null) {
                V v = ref.get();
                if (v == null) {
                    if (!delegate.remove(key, ref)) {
                        // The state has changed, let's try again
                        continue;
                    }
                } else if (Objects.equals(v, oldValue)) {
                    if (!delegate.replace(key, ref, new SoftReference<>(newValue))) {
                        // The state has changed, let's try again
                        continue;
                    }
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Replaces the value of the given key if it is mapped to a referent that is still set. A mapping whose referent
     * is missing is removed.
     *
     * @return                      the replaced referent, or {@code null} if nothing has been replaced.
     * @throws NullPointerException if the key or the value is {@code null}
     */
    @Override
    public V replace(K key, V value) {
        if (key == null || value == null) {
            throw new NullPointerException();
        }
        for (;;) {
            SoftReference<V> ref = delegate.get(key);
            if (ref != null) {
                V v = ref.get();
                if (v == null) {
                    if (!delegate.remove(key, ref)) {
                        // The state has changed, let's try again
                        continue;
                    }
                } else if (!delegate.replace(key, ref, new SoftReference<>(value))) {
                    // The state has changed, let's try again
                    continue;
                }
                return v;
            }
            return null;
        }
    }

    /**
     * Returns the referent mapped to the given key, computing and adding it with the given function if there is no
     * mapping or if the referent of the existing mapping is missing. Nothing is added if the function returns
     * {@code null}. The function is invoked again each time the operation needs to be retried.
     *
     * @return                      the existing or computed value, {@code null} if the function returned
     *                              {@code null}.
     * @throws NullPointerException if the key or the function is {@code null}
     */
    @Override
    public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
        if (key == null || mappingFunction == null) {
            throw new NullPointerException();
        }
        for (;;) {
            SoftReference<V> ref = delegate.get(key);
            if (ref == null) {
                V newValue = mappingFunction.apply(key);
                if (newValue != null && delegate.putIfAbsent(key, new SoftReference<>(newValue)) != null) {
                    // The state has changed, let's try again
                    continue;
                }
                return newValue;
            } else {
                V v = ref.get();
                if (v == null) {
                    // The referent is missing let's try again
                    delegate.remove(key, ref);
                    continue;
                }
                return v;
            }
        }
    }

    /**
     * Replaces the value of the given key with the result of the given function if the key is mapped to a referent
     * that is still set; the mapping is removed if the function returns {@code null}. A mapping whose referent is
     * missing is removed without calling the function. The function is invoked again each time the operation needs
     * to be retried.
     *
     * @return                      the new value, or {@code null} if there is no mapping left for the key.
     * @throws NullPointerException if the key or the function is {@code null}
     */
    @Override
    public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
        if (key == null || remappingFunction == null) {
            throw new NullPointerException();
        }
        for (;;) {
            SoftReference<V> ref = delegate.get(key);
            if (ref != null) {
                V v = ref.get();
                if (v == null) {
                    if (delegate.remove(key, ref)) {
                        return null;
                    }
                    // The state has changed, let's try again
                    continue;
                }
                V newValue = remappingFunction.apply(key, v);
                if (newValue == null) {
                    if (!delegate.remove(key, ref)) {
                        // The state has changed, let's try again
                        continue;
                    }
                } else if (!delegate.replace(key, ref, new SoftReference<>(newValue))) {
                    // The state has changed, let's try again
                    continue;
                }
                return newValue;
            }
            return null;
        }
    }

    /**
     * Computes the new value of the given key from its current referent, which is {@code null} when there is no
     * mapping or when the referent is missing. The mapping is removed if the function returns {@code null}. The
     * function is invoked again each time the operation needs to be retried.
     *
     * @return                      the new value, or {@code null} if the function returned {@code null}.
     * @throws NullPointerException if the key or the function is {@code null}
     */
    @Override
    public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
        if (key == null || remappingFunction == null) {
            throw new NullPointerException();
        }
        for (;;) {
            SoftReference<V> ref = delegate.get(key);
            V oldValue = ref == null ? null : ref.get();
            V newValue = remappingFunction.apply(key, oldValue);
            if (newValue == null) {
                // delete mapping
                if (ref != null && !delegate.remove(key, ref)) {
                    // The state has changed, let's try again
                    continue;
                }
            } else if (ref == null) {
                if (delegate.putIfAbsent(key, new SoftReference<>(newValue)) != null) {
                    // The state has changed, let's try again
                    continue;
                }
            } else if (!delegate.replace(key, ref, new SoftReference<>(newValue))) {
                // The state has changed, let's try again
                continue;
            }
            return newValue;
        }
    }

    /**
     * Maps the given key to the given value if there is no mapping or if the referent of the existing mapping is
     * missing, otherwise maps it to the result of the given function applied to the current referent and the given
     * value. The mapping is removed if the function returns {@code null}. The function is invoked again each time the
     * operation needs to be retried.
     *
     * @return                      the new value, or {@code null} if the function returned {@code null}.
     * @throws NullPointerException if one of the parameters is {@code null}
     */
    @Override
    public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
        if (key == null || value == null || remappingFunction == null) {
            throw new NullPointerException();
        }
        for (;;) {
            SoftReference<V> ref = delegate.get(key);
            V oldValue = ref == null ? null : ref.get();
            V newValue = oldValue == null ? value : remappingFunction.apply(oldValue, value);
            if (newValue == null) {
                // Only reachable with an existing referent, so ref cannot be null here
                if (!delegate.remove(key, ref)) {
                    // The state has changed, let's try again
                    continue;
                }
            } else if (ref == null) {
                if (delegate.putIfAbsent(key, new SoftReference<>(newValue)) != null) {
                    // The state has changed, let's try again
                    continue;
                }
            } else if (!delegate.replace(key, ref, new SoftReference<>(newValue))) {
                // The state has changed, let's try again
                continue;
            }
            return newValue;
        }
    }

    /**
     * Only meant for testing purpose.
     *
     * @return the underlying cache.
     */
    Map<K, SoftReference<V>> getInnerCache() {
        return delegate;
    }
}