This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch camel-4.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.4.x by this push:
     new 1b3d20ff0d5 CAMEL-20850: camel-core - compress changes in 
SimpleLRUCache (#14482)
1b3d20ff0d5 is described below

commit 1b3d20ff0d55d438040c9e2c084b4ca3f571ee23
Author: Nicolas Filotto <essob...@users.noreply.github.com>
AuthorDate: Tue Jun 11 19:16:48 2024 +0200

    CAMEL-20850: camel-core - compress changes in SimpleLRUCache (#14482)
    
    ## Motivation
    
    When the queue of changes is full, some entries can be evicted even though the cache itself is not full, which is not expected.
    
    ## Modifications:
    
    * Compress the changes by removing duplicates before the eviction
    * Add a minimum size for the queue of changes
---
 .../camel/support/cache/SimpleLRUCacheTest.java    | 131 +++++++++++++++++----
 .../apache/camel/support/cache/SimpleLRUCache.java |  69 +++++++++--
 2 files changed, 168 insertions(+), 32 deletions(-)

diff --git 
a/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java
index d7e9f4549a5..c6464c71d09 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java
@@ -19,11 +19,17 @@ package org.apache.camel.support.cache;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
+import static org.apache.camel.support.cache.SimpleLRUCache.MINIMUM_QUEUE_SIZE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -56,30 +62,17 @@ class SimpleLRUCacheTest {
     @Test
     void queueSize() {
         assertEquals(0, map.getQueueSize());
-        map.put("1", "1");
+        for (int i = 1; i <= MINIMUM_QUEUE_SIZE; i++) {
+            map.put("1", Integer.toString(i));
+            assertEquals(1, map.size());
+            assertEquals(i, map.getQueueSize());
+        }
+        map.put("1", "A");
         assertEquals(1, map.size());
         assertEquals(1, map.getQueueSize());
-        map.put("1", "2");
+        map.put("1", "B");
         assertEquals(1, map.size());
         assertEquals(2, map.getQueueSize());
-        map.put("1", "3");
-        assertEquals(1, map.size());
-        assertEquals(3, map.getQueueSize());
-        map.put("1", "4");
-        assertEquals(1, map.size());
-        assertEquals(4, map.getQueueSize());
-        map.put("1", "5");
-        assertEquals(1, map.size());
-        assertEquals(5, map.getQueueSize());
-        map.put("1", "6");
-        assertEquals(1, map.size());
-        assertEquals(6, map.getQueueSize());
-        map.put("1", "7");
-        assertEquals(1, map.size());
-        assertEquals(6, map.getQueueSize());
-        map.put("1", "8");
-        assertEquals(1, map.size());
-        assertEquals(6, map.getQueueSize());
     }
 
     @Test
@@ -287,4 +280,102 @@ class SimpleLRUCacheTest {
         assertEquals(3, map.size());
         assertEquals(0, consumed.size());
     }
+
+    @Test
+    void ignoreDuplicates() {
+        assertEquals(0, map.size());
+        for (int i = 0; i < 100; i++) {
+            map.put("1", Integer.toString(i));
+            assertEquals(1, map.size(), String.format("The expected size is 1 
but it fails after %d puts", i + 1));
+        }
+        assertEquals("99", map.get("1"));
+        assertNull(map.put("2", "Two"));
+        assertEquals(2, map.size());
+        assertEquals("99", map.get("1"));
+        assertNull(map.put("3", "Three"));
+        assertEquals(3, map.size());
+        assertEquals(0, consumed.size());
+        assertEquals("99", map.get("1"));
+        assertNull(map.put("4", "Four"));
+        assertEquals(3, map.size());
+        assertEquals(1, consumed.size());
+        assertFalse(map.containsKey("1"));
+        assertTrue(consumed.contains("99"));
+    }
+
+    @Test
+    void ensureEvictionOrdering() {
+        assertEquals(0, map.size());
+        assertNull(map.put("1", "One"));
+        assertNotNull(map.put("1", "One"));
+        assertNotNull(map.put("1", "One"));
+        assertNotNull(map.put("1", "One"));
+        assertNotNull(map.put("1", "One"));
+        assertNotNull(map.put("1", "One"));
+        assertNull(map.put("2", "Two"));
+        assertNotNull(map.put("1", "One"));
+        assertNull(map.put("3", "Three"));
+        assertEquals(3, map.size());
+        assertNull(map.put("4", "Four"));
+        assertEquals(3, map.size());
+        assertEquals(1, consumed.size());
+        assertFalse(map.containsKey("2"));
+        assertTrue(consumed.contains("Two"));
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = { 1, 2, 5, 10, 20, 50, 100, 1_000 })
+    void concurrentPut(int maximumCacheSize) throws Exception {
+        int threads = Runtime.getRuntime().availableProcessors();
+        int totalKeysPerThread = 1_000;
+        AtomicInteger counter = new AtomicInteger();
+        SimpleLRUCache<String, String> cache = new SimpleLRUCache<>(16, 
maximumCacheSize, v -> counter.incrementAndGet());
+        CountDownLatch latch = new CountDownLatch(threads);
+        for (int i = 0; i < threads; i++) {
+            int threadId = i;
+            new Thread(() -> {
+                try {
+                    for (int j = 0; j < totalKeysPerThread; j++) {
+                        cache.put(threadId + "-" + j, Integer.toString(j));
+                    }
+                } finally {
+                    latch.countDown();
+                }
+            }).start();
+        }
+        latch.await();
+        assertEquals(maximumCacheSize, cache.size());
+        assertEquals(totalKeysPerThread * threads - maximumCacheSize, 
counter.get());
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = { 1, 2, 5, 10, 20, 50, 100, 500 })
+    void concurrentPutWithCollisions(int maximumCacheSize) throws Exception {
+        int threads = Runtime.getRuntime().availableProcessors();
+        int totalKeys = 1_000;
+        AtomicInteger counter = new AtomicInteger();
+        SimpleLRUCache<String, String> cache = new SimpleLRUCache<>(16, 
maximumCacheSize, v -> counter.incrementAndGet());
+        CountDownLatch latch = new CountDownLatch(threads);
+        for (int i = 0; i < threads; i++) {
+            new Thread(() -> {
+                try {
+                    for (int j = 0; j < totalKeys; j++) {
+                        cache.put(Integer.toString(j), Integer.toString(j));
+                    }
+                } finally {
+                    latch.countDown();
+                }
+            }).start();
+        }
+        latch.await();
+        assertEquals(maximumCacheSize, cache.size());
+        counter.set(0);
+        for (int j = 0; j < maximumCacheSize; j++) {
+            cache.put(Integer.toString(j), "OK");
+        }
+        assertEquals(maximumCacheSize, counter.get());
+        for (int j = 0; j < maximumCacheSize; j++) {
+            assertEquals("OK", cache.get(Integer.toString(j)));
+        }
+    }
 }
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
index c36c1640259..c1fcb77d41b 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
@@ -17,12 +17,13 @@
 package org.apache.camel.support.cache;
 
 import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BiFunction;
@@ -40,6 +41,10 @@ import java.util.function.Function;
 public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> {
 
     static final float DEFAULT_LOAD_FACTOR = 0.75f;
+    /**
+     * The minimum size of the queue of changes.
+     */
+    static final int MINIMUM_QUEUE_SIZE = 128;
     /**
      * The flag indicating that an eviction process is in progress.
      */
@@ -51,7 +56,7 @@ public class SimpleLRUCache<K, V> extends 
ConcurrentHashMap<K, V> {
     /**
      * The last changes recorded.
      */
-    private final Queue<Entry<K, V>> lastChanges = new 
ConcurrentLinkedQueue<>();
+    private volatile Deque<Entry<K, V>> lastChanges = new 
ConcurrentLinkedDeque<>();
     /**
      * The total amount of changes recorded.
      */
@@ -227,14 +232,31 @@ public class SimpleLRUCache<K, V> extends 
ConcurrentHashMap<K, V> {
     }
 
     /**
-     * Indicates whether an eviction is needed. An eviction can be triggered 
if the size of the map or the queue of
-     * changes exceeds the maximum allowed size which is respectively {@code 
maximumCacheSize} and
-     * {@code 2 * maximumCacheSize}.
+     * Indicates whether an eviction is needed. An eviction can be triggered 
if either the cache or the queue is full.
      *
      * @return {@code true} if an eviction is needed, {@code false} otherwise.
      */
     private boolean evictionNeeded() {
-        return size() > maximumCacheSize || getQueueSize() > 2 * 
maximumCacheSize;
+        return isCacheFull() || isQueueFull();
+    }
+
+    /**
+     * Indicates whether the size of the map exceeds the maximum allowed size 
which is {@code maximumCacheSize}.
+     *
+     * @return {@code true} if the cache is full, {@code false} otherwise.
+     */
+    private boolean isCacheFull() {
+        return size() > maximumCacheSize;
+    }
+
+    /**
+     * Indicates whether the size of the queue of changes exceeds the maximum 
allowed size which is the max value
+     * between {@link #MINIMUM_QUEUE_SIZE} and {@code 2 * maximumCacheSize}.
+     *
+     * @return {@code true} if the queue is full, {@code false} otherwise.
+     */
+    private boolean isQueueFull() {
+        return getQueueSize() > Math.max(2 * maximumCacheSize, 
MINIMUM_QUEUE_SIZE);
     }
 
     /**
@@ -248,6 +270,24 @@ public class SimpleLRUCache<K, V> extends 
ConcurrentHashMap<K, V> {
         return oldest;
     }
 
+    /**
+     * Removes duplicates from the queue of changes.
+     */
+    private void compressChanges() {
+        Deque<Entry<K, V>> currentChanges = this.lastChanges;
+        Deque<Entry<K, V>> newChanges = new ConcurrentLinkedDeque<>();
+        this.lastChanges = newChanges;
+        Set<K> keys = new HashSet<>();
+        Entry<K, V> entry;
+        while ((entry = currentChanges.pollLast()) != null) {
+            if (keys.add(entry.getKey())) {
+                newChanges.addFirst(entry);
+            } else {
+                totalChanges.decrement();
+            }
+        }
+    }
+
     /**
      * The internal context of all write operations.
      */
@@ -274,12 +314,17 @@ public class SimpleLRUCache<K, V> extends 
ConcurrentHashMap<K, V> {
         public void close() {
             if (cache.evictionNeeded() && cache.eviction.compareAndSet(false, 
true)) {
                 try {
-                    while (cache.evictionNeeded()) {
-                        Entry<K, V> oldest = cache.nextOldestChange();
-                        if (oldest != null && cache.remove(oldest.getKey(), 
oldest.getValue())) {
-                            cache.evict.accept(oldest.getValue());
+                    do {
+                        cache.compressChanges();
+                        if (cache.isCacheFull()) {
+                            Entry<K, V> oldest = cache.nextOldestChange();
+                            if (oldest != null && 
cache.remove(oldest.getKey(), oldest.getValue())) {
+                                cache.evict.accept(oldest.getValue());
+                            }
+                        } else {
+                            break;
                         }
-                    }
+                    } while (cache.evictionNeeded());
                 } finally {
                     cache.eviction.set(false);
                 }

Reply via email to