This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch CAMEL-20850/compress-changes
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b5b913d5295ff942ca8023ae5e2f1f052158456f
Author: Nicolas Filotto <nfilo...@talend.com>
AuthorDate: Fri Jun 7 22:50:07 2024 +0200

    CAMEL-20850: camel-core - compress changes in SimpleLRUCache
---
 .../camel/support/cache/SimpleLRUCacheTest.java    | 25 ++++++++-
 .../apache/camel/support/cache/SimpleLRUCache.java | 60 ++++++++++++++++++----
 2 files changed, 73 insertions(+), 12 deletions(-)

diff --git 
a/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java
index d7e9f4549a5..cee0ae019e1 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java
@@ -76,10 +76,10 @@ class SimpleLRUCacheTest {
         assertEquals(6, map.getQueueSize());
         map.put("1", "7");
         assertEquals(1, map.size());
-        assertEquals(6, map.getQueueSize());
+        assertEquals(1, map.getQueueSize());
         map.put("1", "8");
         assertEquals(1, map.size());
-        assertEquals(6, map.getQueueSize());
+        assertEquals(2, map.getQueueSize());
     }
 
     @Test
@@ -287,4 +287,25 @@ class SimpleLRUCacheTest {
         assertEquals(3, map.size());
         assertEquals(0, consumed.size());
     }
+
+    @Test
+    void ignoreDuplicates() {
+        assertEquals(0, map.size());
+        for (int i = 0; i < 100; i++) {
+            map.put("1", Integer.toString(i));
+            assertEquals(1, map.size(), String.format("The expected size is 1 
but it fails after %d puts", i + 1));
+        }
+        assertEquals("99", map.get("1"));
+        assertNull(map.put("2", "Two"));
+        assertEquals(2, map.size());
+        assertEquals("99", map.get("1"));
+        assertNull(map.put("3", "Three"));
+        assertEquals(3, map.size());
+        assertEquals(0, consumed.size());
+        assertEquals("99", map.get("1"));
+        assertNull(map.put("4", "Four"));
+        assertEquals(3, map.size());
+        assertEquals(1, consumed.size());
+        assertFalse(map.containsKey("1"));
+    }
 }
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
index c36c1640259..c232ee01512 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
@@ -17,12 +17,13 @@
 package org.apache.camel.support.cache;
 
 import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BiFunction;
@@ -51,7 +52,7 @@ public class SimpleLRUCache<K, V> extends 
ConcurrentHashMap<K, V> {
     /**
      * The last changes recorded.
      */
-    private final Queue<Entry<K, V>> lastChanges = new 
ConcurrentLinkedQueue<>();
+    private volatile Deque<Entry<K, V>> lastChanges = new 
ConcurrentLinkedDeque<>();
     /**
      * The total amount of changes recorded.
      */
@@ -227,14 +228,31 @@ public class SimpleLRUCache<K, V> extends 
ConcurrentHashMap<K, V> {
     }
 
     /**
-     * Indicates whether an eviction is needed. An eviction can be triggered 
if the size of the map or the queue of
-     * changes exceeds the maximum allowed size which is respectively {@code 
maximumCacheSize} and
-     * {@code 2 * maximumCacheSize}.
+     * Indicates whether an eviction is needed. An eviction can be triggered 
if either the cache or the queue is full.
      *
      * @return {@code true} if an eviction is needed, {@code false} otherwise.
      */
     private boolean evictionNeeded() {
-        return size() > maximumCacheSize || getQueueSize() > 2 * 
maximumCacheSize;
+        return isCacheFull() || isQueueFull();
+    }
+
+    /**
+     * Indicates whether the size of the map exceeds the maximum allowed size 
which is {@code maximumCacheSize}.
+     *
+     * @return {@code true} if the cache is full, {@code false} otherwise.
+     */
+    private boolean isCacheFull() {
+        return size() > maximumCacheSize;
+    }
+
+    /**
+     * Indicates whether the size of the queue of changes exceeds the maximum 
allowed size which is {@code 2 *
+     * maximumCacheSize}.
+     *
+     * @return {@code true} if the queue is full, {@code false} otherwise.
+     */
+    private boolean isQueueFull() {
+        return getQueueSize() > 2 * maximumCacheSize;
     }
 
     /**
@@ -248,6 +266,24 @@ public class SimpleLRUCache<K, V> extends 
ConcurrentHashMap<K, V> {
         return oldest;
     }
 
+    /**
+     * Removes duplicates from the queue of changes.
+     */
+    private void compressChanges() {
+        Deque<Entry<K, V>> currentChanges = this.lastChanges;
+        Deque<Entry<K, V>> newChanges = new ConcurrentLinkedDeque<>();
+        this.lastChanges = newChanges;
+        Set<K> keys = new HashSet<>();
+        Entry<K, V> entry;
+        while ((entry = currentChanges.pollLast()) != null) {
+            if (keys.add(entry.getKey())) {
+                newChanges.addFirst(entry);
+            } else {
+                totalChanges.decrement();
+            }
+        }
+    }
+
     /**
      * The internal context of all write operations.
      */
@@ -275,9 +311,13 @@ public class SimpleLRUCache<K, V> extends 
ConcurrentHashMap<K, V> {
             if (cache.evictionNeeded() && cache.eviction.compareAndSet(false, 
true)) {
                 try {
                     while (cache.evictionNeeded()) {
-                        Entry<K, V> oldest = cache.nextOldestChange();
-                        if (oldest != null && cache.remove(oldest.getKey(), 
oldest.getValue())) {
-                            cache.evict.accept(oldest.getValue());
+                        if (cache.isCacheFull()) {
+                            Entry<K, V> oldest = cache.nextOldestChange();
+                            if (oldest != null && 
cache.remove(oldest.getKey(), oldest.getValue())) {
+                                cache.evict.accept(oldest.getValue());
+                            }
+                        } else {
+                            cache.compressChanges();
                         }
                     }
                 } finally {

Reply via email to