This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch camel-4.0.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.0.x by this push: new f31747325ec CAMEL-20850: camel-core - compress changes in SimpleLRUCache (#14483) f31747325ec is described below commit f31747325ec7de8c88bb5123bf446dcd5fb8786a Author: Nicolas Filotto <essob...@users.noreply.github.com> AuthorDate: Tue Jun 11 19:16:05 2024 +0200 CAMEL-20850: camel-core - compress changes in SimpleLRUCache (#14483) ## Motivation When the queue of changes is full, some entries can be evicted while the cache is not, which is not expected. ## Modifications: * Compress the changes by removing duplicates before the eviction * Add a minimum size for the queue of changes --- .../camel/support/cache/SimpleLRUCacheTest.java | 131 +++++++++++++++++---- .../apache/camel/support/cache/SimpleLRUCache.java | 69 +++++++++-- 2 files changed, 168 insertions(+), 32 deletions(-) diff --git a/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java index d7e9f4549a5..c6464c71d09 100644 --- a/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java @@ -19,11 +19,17 @@ package org.apache.camel.support.cache; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import static org.apache.camel.support.cache.SimpleLRUCache.MINIMUM_QUEUE_SIZE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -56,30 +62,17 @@ class SimpleLRUCacheTest { @Test void queueSize() { assertEquals(0, map.getQueueSize()); - map.put("1", "1"); + for (int i = 1; i <= MINIMUM_QUEUE_SIZE; i++) { + map.put("1", Integer.toString(i)); + assertEquals(1, map.size()); + assertEquals(i, map.getQueueSize()); + } + map.put("1", "A"); assertEquals(1, map.size()); assertEquals(1, map.getQueueSize()); - map.put("1", "2"); + map.put("1", "B"); assertEquals(1, map.size()); assertEquals(2, map.getQueueSize()); - map.put("1", "3"); - assertEquals(1, map.size()); - assertEquals(3, map.getQueueSize()); - map.put("1", "4"); - assertEquals(1, map.size()); - assertEquals(4, map.getQueueSize()); - map.put("1", "5"); - assertEquals(1, map.size()); - assertEquals(5, map.getQueueSize()); - map.put("1", "6"); - assertEquals(1, map.size()); - assertEquals(6, map.getQueueSize()); - map.put("1", "7"); - assertEquals(1, map.size()); - assertEquals(6, map.getQueueSize()); - map.put("1", "8"); - assertEquals(1, map.size()); - assertEquals(6, map.getQueueSize()); } @Test @@ -287,4 +280,102 @@ class SimpleLRUCacheTest { assertEquals(3, map.size()); assertEquals(0, consumed.size()); } + + @Test + void ignoreDuplicates() { + assertEquals(0, map.size()); + for (int i = 0; i < 100; i++) { + map.put("1", Integer.toString(i)); + assertEquals(1, map.size(), String.format("The expected size is 1 but it fails after %d puts", i + 1)); + } + assertEquals("99", map.get("1")); + assertNull(map.put("2", "Two")); + assertEquals(2, map.size()); + assertEquals("99", map.get("1")); + assertNull(map.put("3", "Three")); + assertEquals(3, map.size()); + assertEquals(0, consumed.size()); + assertEquals("99", map.get("1")); + assertNull(map.put("4", "Four")); + assertEquals(3, map.size()); + assertEquals(1, consumed.size()); + assertFalse(map.containsKey("1")); + assertTrue(consumed.contains("99")); + } + + @Test + void ensureEvictionOrdering() { + assertEquals(0, map.size()); + assertNull(map.put("1", "One")); + assertNotNull(map.put("1", "One")); + assertNotNull(map.put("1", "One")); + assertNotNull(map.put("1", "One")); + assertNotNull(map.put("1", "One")); + assertNotNull(map.put("1", "One")); + assertNull(map.put("2", "Two")); + assertNotNull(map.put("1", "One")); + assertNull(map.put("3", "Three")); + assertEquals(3, map.size()); + assertNull(map.put("4", "Four")); + assertEquals(3, map.size()); + assertEquals(1, consumed.size()); + assertFalse(map.containsKey("2")); + assertTrue(consumed.contains("Two")); + } + + @ParameterizedTest + @ValueSource(ints = { 1, 2, 5, 10, 20, 50, 100, 1_000 }) + void concurrentPut(int maximumCacheSize) throws Exception { + int threads = Runtime.getRuntime().availableProcessors(); + int totalKeysPerThread = 1_000; + AtomicInteger counter = new AtomicInteger(); + SimpleLRUCache<String, String> cache = new SimpleLRUCache<>(16, maximumCacheSize, v -> counter.incrementAndGet()); + CountDownLatch latch = new CountDownLatch(threads); + for (int i = 0; i < threads; i++) { + int threadId = i; + new Thread(() -> { + try { + for (int j = 0; j < totalKeysPerThread; j++) { + cache.put(threadId + "-" + j, Integer.toString(j)); + } + } finally { + latch.countDown(); + } + }).start(); + } + latch.await(); + assertEquals(maximumCacheSize, cache.size()); + assertEquals(totalKeysPerThread * threads - maximumCacheSize, counter.get()); + } + + @ParameterizedTest + @ValueSource(ints = { 1, 2, 5, 10, 20, 50, 100, 500 }) + void concurrentPutWithCollisions(int maximumCacheSize) throws Exception { + int threads = Runtime.getRuntime().availableProcessors(); + int totalKeys = 1_000; + AtomicInteger counter = new AtomicInteger(); + SimpleLRUCache<String, String> cache = new SimpleLRUCache<>(16, maximumCacheSize, v -> counter.incrementAndGet()); + CountDownLatch latch = new CountDownLatch(threads); + for (int i = 0; i < threads; i++) { + new Thread(() -> { + try { + for (int j = 0; j < totalKeys; j++) { + cache.put(Integer.toString(j), Integer.toString(j)); + } + } finally { + latch.countDown(); + } + }).start(); + } + latch.await(); + assertEquals(maximumCacheSize, cache.size()); + counter.set(0); + for (int j = 0; j < maximumCacheSize; j++) { + cache.put(Integer.toString(j), "OK"); + } + assertEquals(maximumCacheSize, counter.get()); + for (int j = 0; j < maximumCacheSize; j++) { + assertEquals("OK", cache.get(Integer.toString(j))); + } + } } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java index c36c1640259..c1fcb77d41b 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java @@ -17,12 +17,13 @@ package org.apache.camel.support.cache; import java.util.Collections; +import java.util.Deque; +import java.util.HashSet; import java.util.Map; import java.util.Objects; -import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.LongAdder; import java.util.function.BiFunction; @@ -40,6 +41,10 @@ import java.util.function.Function; public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { static final float DEFAULT_LOAD_FACTOR = 0.75f; + /** + * The minimum size of the queue of changes. + */ + static final int MINIMUM_QUEUE_SIZE = 128; /** * The flag indicating that an eviction process is in progress. */ @@ -51,7 +56,7 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { /** * The last changes recorded. */ - private final Queue<Entry<K, V>> lastChanges = new ConcurrentLinkedQueue<>(); + private volatile Deque<Entry<K, V>> lastChanges = new ConcurrentLinkedDeque<>(); /** * The total amount of changes recorded. */ @@ -227,14 +232,31 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { } /** - * Indicates whether an eviction is needed. An eviction can be triggered if the size of the map or the queue of - * changes exceeds the maximum allowed size which is respectively {@code maximumCacheSize} and - * {@code 2 * maximumCacheSize}. + * Indicates whether an eviction is needed. An eviction can be triggered if either the cache or the queue is full. * * @return {@code true} if an eviction is needed, {@code false} otherwise. */ private boolean evictionNeeded() { - return size() > maximumCacheSize || getQueueSize() > 2 * maximumCacheSize; + return isCacheFull() || isQueueFull(); + } + + /** + * Indicates whether the size of the map exceeds the maximum allowed size which is {@code maximumCacheSize}. + * + * @return {@code true} if the cache is full, {@code false} otherwise. + */ + private boolean isCacheFull() { + return size() > maximumCacheSize; + } + + /** + * Indicates whether the size of the queue of changes exceeds the maximum allowed size which is the max value + * between {@link #MINIMUM_QUEUE_SIZE} and {@code 2 * maximumCacheSize}. + * + * @return {@code true} if the queue is full, {@code false} otherwise. + */ + private boolean isQueueFull() { + return getQueueSize() > Math.max(2 * maximumCacheSize, MINIMUM_QUEUE_SIZE); } /** @@ -248,6 +270,24 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { return oldest; } + /** + * Removes duplicates from the queue of changes. + */ + private void compressChanges() { + Deque<Entry<K, V>> currentChanges = this.lastChanges; + Deque<Entry<K, V>> newChanges = new ConcurrentLinkedDeque<>(); + this.lastChanges = newChanges; + Set<K> keys = new HashSet<>(); + Entry<K, V> entry; + while ((entry = currentChanges.pollLast()) != null) { + if (keys.add(entry.getKey())) { + newChanges.addFirst(entry); + } else { + totalChanges.decrement(); + } + } + } + /** * The internal context of all write operations. */ @@ -274,12 +314,17 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { public void close() { if (cache.evictionNeeded() && cache.eviction.compareAndSet(false, true)) { try { - while (cache.evictionNeeded()) { - Entry<K, V> oldest = cache.nextOldestChange(); - if (oldest != null && cache.remove(oldest.getKey(), oldest.getValue())) { - cache.evict.accept(oldest.getValue()); + do { + cache.compressChanges(); + if (cache.isCacheFull()) { + Entry<K, V> oldest = cache.nextOldestChange(); + if (oldest != null && cache.remove(oldest.getKey(), oldest.getValue())) { + cache.evict.accept(oldest.getValue()); + } + } else { + break; } - } + } while (cache.evictionNeeded()); } finally { cache.eviction.set(false); }