This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-4.8.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.8.x by this push: new ba956d2514e CAMEL-21467: camel-core - No longer compute cache changes size (#16387) ba956d2514e is described below commit ba956d2514e8946d072692fed3e98dbe50dcdf0b Author: Nicolas Filotto <essob...@users.noreply.github.com> AuthorDate: Tue Nov 26 21:39:54 2024 +0100 CAMEL-21467: camel-core - No longer compute cache changes size (#16387) ## Motivation In some specific use cases, the eviction of the entries never ends ## Modifications: * Use `AtomicReference` instead of `volatile` to keep the changes * No longer compute cache changes size --- .../camel/support/cache/SimpleLRUCacheTest.java | 10 ++------- .../apache/camel/support/cache/SimpleLRUCache.java | 24 ++++++---------------- 2 files changed, 8 insertions(+), 26 deletions(-) diff --git a/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java index 6c38fbd069a..73c09bed714 100644 --- a/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java @@ -336,10 +336,7 @@ class SimpleLRUCacheTest { @ParameterizedTest @ValueSource(ints = { 1, 2, 5, 10, 20, 50, 100, 1_000 }) void concurrentPut(int maximumCacheSize) throws Exception { - int threads = Runtime.getRuntime().availableProcessors() - 1; - if (threads < 1) { - threads = 1; - } + int threads = Runtime.getRuntime().availableProcessors(); int totalKeysPerThread = 1_000; AtomicInteger counter = new AtomicInteger(); SimpleLRUCache<String, String> cache = new SimpleLRUCache<>(16, maximumCacheSize, v -> counter.incrementAndGet()); @@ -366,10 +363,7 @@ class SimpleLRUCacheTest { @ParameterizedTest @ValueSource(ints = { 1, 2, 5, 10, 20, 50, 100, 500 }) void concurrentPutWithCollisions(int maximumCacheSize) throws Exception { - int threads = Runtime.getRuntime().availableProcessors() - 1; - if (threads < 1) { - threads = 1; - } + int threads = Runtime.getRuntime().availableProcessors(); int totalKeys = 1_000; AtomicInteger counter = new AtomicInteger(); SimpleLRUCache<String, String> cache = new SimpleLRUCache<>(16, maximumCacheSize, v -> counter.incrementAndGet()); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java index ea7e614c805..b6996a3f215 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java @@ -25,7 +25,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -56,11 +56,7 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { /** * The last changes recorded. */ - private volatile Deque<Entry<K, V>> lastChanges = new ConcurrentLinkedDeque<>(); - /** - * The total amount of changes recorded. - */ - private final LongAdder totalChanges = new LongAdder(); + private final AtomicReference<Deque<Entry<K, V>>> lastChanges = new AtomicReference<>(new ConcurrentLinkedDeque<>()); /** * The function to call when an entry is evicted. */ @@ -88,8 +84,7 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { if (value == null) { return null; } - lastChanges.add(Map.entry(key, value)); - totalChanges.increment(); + lastChanges.get().add(Map.entry(key, value)); return value; } @@ -231,7 +226,7 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { * @return the size of the queue of changes. */ int getQueueSize() { - return totalChanges.intValue(); + return lastChanges.get().size(); } /** @@ -266,27 +261,20 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { * @return the oldest existing change. */ private Entry<K, V> nextOldestChange() { - Entry<K, V> oldest = lastChanges.poll(); - if (oldest != null) { - totalChanges.decrement(); - } - return oldest; + return lastChanges.get().poll(); } /** * Removes duplicates from the queue of changes. */ private void compressChanges() { - Deque<Entry<K, V>> currentChanges = this.lastChanges; Deque<Entry<K, V>> newChanges = new ConcurrentLinkedDeque<>(); - this.lastChanges = newChanges; + Deque<Entry<K, V>> currentChanges = lastChanges.getAndSet(newChanges); Set<K> keys = new HashSet<>(); Entry<K, V> entry; while ((entry = currentChanges.pollLast()) != null) { if (keys.add(entry.getKey())) { newChanges.addFirst(entry); - } else { - totalChanges.decrement(); } } }