This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch CAMEL-20850/compress-changes in repository https://gitbox.apache.org/repos/asf/camel.git
commit b5b913d5295ff942ca8023ae5e2f1f052158456f Author: Nicolas Filotto <nfilo...@talend.com> AuthorDate: Fri Jun 7 22:50:07 2024 +0200 CAMEL-20850: camel-core - compress changes in SimpleLRUCache --- .../camel/support/cache/SimpleLRUCacheTest.java | 25 ++++++++- .../apache/camel/support/cache/SimpleLRUCache.java | 60 ++++++++++++++++++---- 2 files changed, 73 insertions(+), 12 deletions(-) diff --git a/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java index d7e9f4549a5..cee0ae019e1 100644 --- a/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java @@ -76,10 +76,10 @@ class SimpleLRUCacheTest { assertEquals(6, map.getQueueSize()); map.put("1", "7"); assertEquals(1, map.size()); - assertEquals(6, map.getQueueSize()); + assertEquals(1, map.getQueueSize()); map.put("1", "8"); assertEquals(1, map.size()); - assertEquals(6, map.getQueueSize()); + assertEquals(2, map.getQueueSize()); } @Test @@ -287,4 +287,25 @@ class SimpleLRUCacheTest { assertEquals(3, map.size()); assertEquals(0, consumed.size()); } + + @Test + void ignoreDuplicates() { + assertEquals(0, map.size()); + for (int i = 0; i < 100; i++) { + map.put("1", Integer.toString(i)); + assertEquals(1, map.size(), String.format("The expected size is 1 but it fails after %d puts", i + 1)); + } + assertEquals("99", map.get("1")); + assertNull(map.put("2", "Two")); + assertEquals(2, map.size()); + assertEquals("99", map.get("1")); + assertNull(map.put("3", "Three")); + assertEquals(3, map.size()); + assertEquals(0, consumed.size()); + assertEquals("99", map.get("1")); + assertNull(map.put("4", "Four")); + assertEquals(3, map.size()); + assertEquals(1, consumed.size()); + assertFalse(map.containsKey("1")); + } } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java index c36c1640259..c232ee01512 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java @@ -17,12 +17,13 @@ package org.apache.camel.support.cache; import java.util.Collections; +import java.util.Deque; +import java.util.HashSet; import java.util.Map; import java.util.Objects; -import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.LongAdder; import java.util.function.BiFunction; @@ -51,7 +52,7 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { /** * The last changes recorded. */ - private final Queue<Entry<K, V>> lastChanges = new ConcurrentLinkedQueue<>(); + private volatile Deque<Entry<K, V>> lastChanges = new ConcurrentLinkedDeque<>(); /** * The total amount of changes recorded. */ @@ -227,14 +228,31 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { } /** - * Indicates whether an eviction is needed. An eviction can be triggered if the size of the map or the queue of - * changes exceeds the maximum allowed size which is respectively {@code maximumCacheSize} and - * {@code 2 * maximumCacheSize}. + * Indicates whether an eviction is needed. An eviction can be triggered if either the cache or the queue is full. * * @return {@code true} if an eviction is needed, {@code false} otherwise. */ private boolean evictionNeeded() { - return size() > maximumCacheSize || getQueueSize() > 2 * maximumCacheSize; + return isCacheFull() || isQueueFull(); + } + + /** + * Indicates whether the size of the map exceeds the maximum allowed size which is {@code maximumCacheSize}. + * + * @return {@code true} if the cache is full, {@code false} otherwise. + */ + private boolean isCacheFull() { + return size() > maximumCacheSize; + } + + /** + * Indicates whether the size of the queue of changes exceeds the maximum allowed size which is {@code 2 * + * maximumCacheSize}. + * + * @return {@code true} if the queue is full, {@code false} otherwise. + */ + private boolean isQueueFull() { + return getQueueSize() > 2 * maximumCacheSize; } /** @@ -248,6 +266,24 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { return oldest; } + /** + * Removes duplicates from the queue of changes. + */ + private void compressChanges() { + Deque<Entry<K, V>> currentChanges = this.lastChanges; + Deque<Entry<K, V>> newChanges = new ConcurrentLinkedDeque<>(); + this.lastChanges = newChanges; + Set<K> keys = new HashSet<>(); + Entry<K, V> entry; + while ((entry = currentChanges.pollLast()) != null) { + if (keys.add(entry.getKey())) { + newChanges.addFirst(entry); + } else { + totalChanges.decrement(); + } + } + } + /** * The internal context of all write operations. */ @@ -275,9 +311,13 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { if (cache.evictionNeeded() && cache.eviction.compareAndSet(false, true)) { try { while (cache.evictionNeeded()) { - Entry<K, V> oldest = cache.nextOldestChange(); - if (oldest != null && cache.remove(oldest.getKey(), oldest.getValue())) { - cache.evict.accept(oldest.getValue()); + if (cache.isCacheFull()) { + Entry<K, V> oldest = cache.nextOldestChange(); + if (oldest != null && cache.remove(oldest.getKey(), oldest.getValue())) { + cache.evict.accept(oldest.getValue()); + } + } else { + cache.compressChanges(); } } } finally {