This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.8.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.8.x by this push:
     new ba956d2514e CAMEL-21467: camel-core - No longer compute cache changes size (#16387)
ba956d2514e is described below

commit ba956d2514e8946d072692fed3e98dbe50dcdf0b
Author: Nicolas Filotto <essob...@users.noreply.github.com>
AuthorDate: Tue Nov 26 21:39:54 2024 +0100

    CAMEL-21467: camel-core - No longer compute cache changes size (#16387)
    
    ## Motivation
    
    In some specific use cases, the eviction of entries never ends.
    
    ## Modifications:
    
    * Use `AtomicReference` instead of `volatile` to keep the changes
    * No longer compute cache changes size
---
 .../camel/support/cache/SimpleLRUCacheTest.java    | 10 ++-------
 .../apache/camel/support/cache/SimpleLRUCache.java | 24 ++++++----------------
 2 files changed, 8 insertions(+), 26 deletions(-)

diff --git a/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java
index 6c38fbd069a..73c09bed714 100644
--- a/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java
@@ -336,10 +336,7 @@ class SimpleLRUCacheTest {
     @ParameterizedTest
     @ValueSource(ints = { 1, 2, 5, 10, 20, 50, 100, 1_000 })
     void concurrentPut(int maximumCacheSize) throws Exception {
-        int threads = Runtime.getRuntime().availableProcessors() - 1;
-        if (threads < 1) {
-            threads = 1;
-        }
+        int threads = Runtime.getRuntime().availableProcessors();
         int totalKeysPerThread = 1_000;
         AtomicInteger counter = new AtomicInteger();
         SimpleLRUCache<String, String> cache = new SimpleLRUCache<>(16, maximumCacheSize, v -> counter.incrementAndGet());
@@ -366,10 +363,7 @@ class SimpleLRUCacheTest {
     @ParameterizedTest
     @ValueSource(ints = { 1, 2, 5, 10, 20, 50, 100, 500 })
     void concurrentPutWithCollisions(int maximumCacheSize) throws Exception {
-        int threads = Runtime.getRuntime().availableProcessors() - 1;
-        if (threads < 1) {
-            threads = 1;
-        }
+        int threads = Runtime.getRuntime().availableProcessors();
         int totalKeys = 1_000;
         AtomicInteger counter = new AtomicInteger();
         SimpleLRUCache<String, String> cache = new SimpleLRUCache<>(16, maximumCacheSize, v -> counter.incrementAndGet());
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
index ea7e614c805..b6996a3f215 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
@@ -25,7 +25,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -56,11 +56,7 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> {
     /**
      * The last changes recorded.
      */
-    private volatile Deque<Entry<K, V>> lastChanges = new ConcurrentLinkedDeque<>();
-    /**
-     * The total amount of changes recorded.
-     */
-    private final LongAdder totalChanges = new LongAdder();
+    private final AtomicReference<Deque<Entry<K, V>>> lastChanges = new AtomicReference<>(new ConcurrentLinkedDeque<>());
     /**
      * The function to call when an entry is evicted.
      */
@@ -88,8 +84,7 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> {
         if (value == null) {
             return null;
         }
-        lastChanges.add(Map.entry(key, value));
-        totalChanges.increment();
+        lastChanges.get().add(Map.entry(key, value));
         return value;
     }
 
@@ -231,7 +226,7 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> {
      * @return the size of the queue of changes.
      */
     int getQueueSize() {
-        return totalChanges.intValue();
+        return lastChanges.get().size();
     }
 
     /**
@@ -266,27 +261,20 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> {
      * @return the oldest existing change.
      */
     private Entry<K, V> nextOldestChange() {
-        Entry<K, V> oldest = lastChanges.poll();
-        if (oldest != null) {
-            totalChanges.decrement();
-        }
-        return oldest;
+        return lastChanges.get().poll();
     }
 
     /**
      * Removes duplicates from the queue of changes.
      */
     private void compressChanges() {
-        Deque<Entry<K, V>> currentChanges = this.lastChanges;
         Deque<Entry<K, V>> newChanges = new ConcurrentLinkedDeque<>();
-        this.lastChanges = newChanges;
+        Deque<Entry<K, V>> currentChanges = lastChanges.getAndSet(newChanges);
         Set<K> keys = new HashSet<>();
         Entry<K, V> entry;
         while ((entry = currentChanges.pollLast()) != null) {
             if (keys.add(entry.getKey())) {
                 newChanges.addFirst(entry);
-            } else {
-                totalChanges.decrement();
             }
         }
     }

Reply via email to