This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch camel-4.10.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.10.x by this push: new 7e47d618145 CAMEL-21888: camel-core - Compress changes only if needed - 4.10 (#17559) 7e47d618145 is described below commit 7e47d618145c4f3ffebc9338a66e66b054fbf919 Author: Nicolas Filotto <essob...@users.noreply.github.com> AuthorDate: Mon Mar 24 16:29:21 2025 +0100 CAMEL-21888: camel-core - Compress changes only if needed - 4.10 (#17559) ## Motivation When the cache size is big, compressing the changes is costly in terms of CPU so it needs to be avoided when possible ## Modifications: * Only compress changes when the queue is full * Add a value holder to uniquely identify each key/value pair to avoid evicting a key/value pair that has been added multiple times to the cache * Avoid an O(N) to get the size of the changes by counting them using an AtomicInteger. --- .../camel/support/cache/SimpleLRUCacheTest.java | 142 ++++++++++ .../apache/camel/support/cache/SimpleLRUCache.java | 294 +++++++++++++++++---- 2 files changed, 387 insertions(+), 49 deletions(-) diff --git a/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java index cb1a17ee60d..58e44a4b215 100644 --- a/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/support/cache/SimpleLRUCacheTest.java @@ -17,6 +17,7 @@ package org.apache.camel.support.cache; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -60,9 +61,12 @@ class SimpleLRUCacheTest { assertNull(map.put("1", "One")); assertEquals(1, map.size()); assertEquals(1, map.getQueueSize()); + assertEquals("One", map.get("1")); map.entrySet().iterator().next().setValue("bar"); assertEquals(1, map.size()); assertEquals(2, map.getQueueSize()); + assertEquals("bar", map.get("1")); + assertThrows(NullPointerException.class, () -> map.entrySet().iterator().next().setValue(null)); } @Test @@ -79,6 +83,80 @@ class SimpleLRUCacheTest { map.put("1", "B"); assertEquals(1, map.size()); assertEquals(2, map.getQueueSize()); + map.put("2", "A"); + assertEquals(2, map.size()); + assertEquals(3, map.getQueueSize()); + map.put("3", "A"); + assertEquals(3, map.size()); + assertEquals(4, map.getQueueSize()); + map.put("4", "A"); + assertEquals(3, map.size()); + assertEquals(3, map.getQueueSize()); + } + + @Test + void size() { + assertEquals(0, map.size()); + assertNull(map.put("1", "One")); + assertEquals(1, map.size()); + } + + @Test + void isEmpty() { + assertTrue(map.isEmpty()); + assertNull(map.put("1", "One")); + assertFalse(map.isEmpty()); + map.remove("1"); + assertTrue(map.isEmpty()); + } + + @Test + void containsKey() { + assertFalse(map.containsKey("1")); + assertNull(map.put("1", "One")); + assertTrue(map.containsKey("1")); + map.remove("1"); + assertFalse(map.containsKey("1")); + assertThrows(NullPointerException.class, () -> map.containsKey(null)); + } + + @Test + void containsValue() { + assertFalse(map.containsValue("One")); + assertNull(map.put("1", "One")); + assertTrue(map.containsValue("One")); + map.remove("1"); + assertFalse(map.containsValue("One")); + assertThrows(NullPointerException.class, () -> map.containsValue(null)); + } + + @Test + void remove() { + assertTrue(map.isEmpty()); + map.remove("1"); + assertTrue(map.isEmpty()); + assertNull(map.put("1", "One")); + assertFalse(map.isEmpty()); + map.remove("1"); + assertTrue(map.isEmpty()); + assertThrows(NullPointerException.class, () -> map.remove(null)); + } + + @Test + void removeWithValue() { + assertTrue(map.isEmpty()); + map.remove("1", "One"); + assertTrue(map.isEmpty()); + assertNull(map.put("1", "One")); + assertFalse(map.isEmpty()); + map.remove("1", "Two"); + assertFalse(map.isEmpty()); + map.remove("2", "One"); + assertFalse(map.isEmpty()); + map.remove("1", "One"); + assertTrue(map.isEmpty()); + assertThrows(NullPointerException.class, () -> map.remove(null, "A")); + assertThrows(NullPointerException.class, () -> map.remove("A", null)); } @Test @@ -101,6 +179,54 @@ class SimpleLRUCacheTest { assertEquals(1, consumed.size()); assertTrue(map.containsKey("2")); assertEquals("Two v2", map.get("2")); + assertThrows(NullPointerException.class, () -> map.put("A", null)); + assertThrows(NullPointerException.class, () -> map.put(null, "A")); + } + + @Test + void putAll() { + assertEquals(0, map.size()); + Map<String, String> data = new LinkedHashMap<>(); + data.put("1", "One"); + data.put("2", "Two"); + data.put("3", "Three"); + map.putAll(data); + assertEquals(3, map.size()); + assertEquals(0, consumed.size()); + data.clear(); + data.put("4", "Four"); + data.put("5", "Five"); + map.putAll(data); + assertEquals(3, map.size()); + assertEquals(2, consumed.size()); + assertFalse(map.containsKey("1")); + assertFalse(map.containsKey("2")); + assertTrue(consumed.contains("One")); + assertTrue(consumed.contains("Two")); + assertThrows(NullPointerException.class, () -> map.putAll(null)); + } + + @Test + void clear() { + assertEquals(0, map.size()); + map.putAll(Map.of("1", "One", "2", "Two", "3", "Three")); + assertEquals(3, map.size()); + map.clear(); + assertEquals(0, map.size()); + } + + @Test + void replaceAll() { + map.replaceAll((k, v) -> v + " v2"); + assertEquals(0, map.size()); + map.putAll(Map.of("1", "One", "2", "Two", "3", "Three")); + assertEquals(3, map.size()); + map.replaceAll((k, v) -> v + " v2"); + assertEquals(3, map.size()); + assertEquals("One v2", map.get("1")); + assertEquals("Two v2", map.get("2")); + assertEquals("Three v2", map.get("3")); + assertThrows(NullPointerException.class, () -> map.replaceAll(null)); } @Test @@ -128,6 +254,8 @@ class SimpleLRUCacheTest { assertEquals(2, consumed.size()); assertFalse(map.containsKey("2")); assertTrue(consumed.contains("Two")); + assertThrows(NullPointerException.class, () -> map.putIfAbsent("A", null)); + assertThrows(NullPointerException.class, () -> map.putIfAbsent(null, "A")); } @Test @@ -164,6 +292,8 @@ class SimpleLRUCacheTest { assertEquals("Five", map.computeIfAbsent("5", k -> null)); assertEquals(3, map.size()); assertEquals(2, consumed.size()); + assertThrows(NullPointerException.class, () -> map.computeIfAbsent(null, k -> null)); + assertThrows(NullPointerException.class, () -> map.computeIfAbsent("A", null)); } @Test @@ -189,6 +319,8 @@ class SimpleLRUCacheTest { assertEquals(2, map.size()); assertEquals(0, consumed.size()); assertFalse(map.containsKey("1")); + assertThrows(NullPointerException.class, () -> map.computeIfPresent(null, (k, v) -> null)); + assertThrows(NullPointerException.class, () -> map.computeIfPresent("A", null)); } @Test @@ -215,6 +347,8 @@ class SimpleLRUCacheTest { assertEquals(2, map.size()); assertEquals(1, consumed.size()); assertFalse(map.containsKey("2")); + assertThrows(NullPointerException.class, () -> map.compute(null, (k, v) -> null)); + assertThrows(NullPointerException.class, () -> map.compute("A", null)); } @Test @@ -238,6 +372,9 @@ class SimpleLRUCacheTest { assertNull(map.merge("2", "V2", (v1, v2) -> null)); assertEquals(2, map.size()); assertEquals(1, consumed.size()); + assertThrows(NullPointerException.class, () -> map.merge("A", "B", null)); + assertThrows(NullPointerException.class, () -> map.merge("A", null, (v1, v2) -> null)); + assertThrows(NullPointerException.class, () -> map.merge(null, "A", (v1, v2) -> null)); } @Test @@ -259,6 +396,8 @@ class SimpleLRUCacheTest { assertEquals("Three v2", map.get("3")); assertEquals(3, map.size()); assertEquals(0, consumed.size()); + assertThrows(NullPointerException.class, () -> map.replace("A", null)); + assertThrows(NullPointerException.class, () -> map.replace(null, "A")); } @Test @@ -285,6 +424,9 @@ class SimpleLRUCacheTest { assertEquals("Three v2", map.get("3")); assertEquals(3, map.size()); assertEquals(0, consumed.size()); + assertThrows(NullPointerException.class, () -> map.replace("A", "B", null)); + assertThrows(NullPointerException.class, () -> map.replace("A", null, "B")); + assertThrows(NullPointerException.class, () -> map.replace(null, "A", "B")); } @Test diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java index 7c1b76e601a..5c3e79be57f 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java @@ -16,7 +16,7 @@ */ package org.apache.camel.support.cache; -import java.util.Collections; +import java.util.Collection; import java.util.Deque; import java.util.HashSet; import java.util.Map; @@ -25,12 +25,15 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Collectors; /** * {@code SimpleLRUCache} is a simple implementation of a cache of type Least Recently Used . The implementation doesn't @@ -40,7 +43,7 @@ import java.util.function.Function; * @param <K> type of the key * @param <V> type of the value */ -public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { +public class SimpleLRUCache<K, V> implements Map<K, V> { static final float DEFAULT_LOAD_FACTOR = 0.75f; /** @@ -52,9 +55,9 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { */ private final AtomicBoolean eviction = new AtomicBoolean(); /** - * The lock to prevent the addition of changes during the swap of queue of changes. + * The lock to prevent the addition of changes during the swap of queue of changes or the cache cleaning. */ - private final ReadWriteLock swapLock = new ReentrantReadWriteLock(); + private final ReadWriteLock lock = new ReentrantReadWriteLock(); /** * The maximum cache size. */ @@ -62,17 +65,30 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { /** * The last changes recorded. */ - private final AtomicReference<Deque<Entry<K, V>>> lastChanges = new AtomicReference<>(new ConcurrentLinkedDeque<>()); + private final AtomicReference<Deque<Entry<K, ValueHolder<V>>>> lastChanges + = new AtomicReference<>(new ConcurrentLinkedDeque<>()); + /** + * The total amount of changes recorded. + */ + private final AtomicInteger totalChanges = new AtomicInteger(); /** * The function to call when an entry is evicted. */ private final Consumer<V> evict; + /** + * The sequence number used to generate a unique id for each cache change. + */ + private final AtomicLong sequence = new AtomicLong(); + /** + * The underlying map. + */ + private final Map<K, ValueHolder<V>> delegate; public SimpleLRUCache(int initialCapacity, int maximumCacheSize, Consumer<V> evicted) { - super(initialCapacity, DEFAULT_LOAD_FACTOR); if (maximumCacheSize <= 0) { throw new IllegalArgumentException("The maximum cache size must be greater than 0"); } + this.delegate = new ConcurrentHashMap<>(initialCapacity, DEFAULT_LOAD_FACTOR); this.maximumCacheSize = maximumCacheSize; this.evict = Objects.requireNonNull(evicted); } @@ -84,20 +100,67 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { * @param mappingFunction the mapping function to apply. * @return the result of the mapping function. */ - private V addChange(OperationContext<K, V> context, Function<? super K, ? extends V> mappingFunction) { + private ValueHolder<V> addChange(OperationContext<K, V> context, Function<? super K, ? extends V> mappingFunction) { K key = context.key; V value = mappingFunction.apply(key); if (value == null) { return null; } - Entry<K, V> entry = Map.entry(key, value); - swapLock.readLock().lock(); - try { - lastChanges.get().add(entry); - } finally { - swapLock.readLock().unlock(); + ValueHolder<V> holder = newValue(value); + lastChanges.get().add(Map.entry(key, holder)); + totalChanges.incrementAndGet(); + return holder; + } + + @Override + public int size() { + return delegate.size(); + } + + @Override + public boolean isEmpty() { + return delegate.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return delegate.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + if (value == null) + throw new NullPointerException(); + return delegate.values().stream() + .map(ValueHolder::get) + .anyMatch(v -> Objects.equals(v, value)); + } + + @Override + public V get(Object key) { + return extractValue(delegate.get(key)); + } + + @SuppressWarnings("unchecked") + @Override + public boolean remove(Object key, Object value) { + if (key == null || value == null) { + throw new NullPointerException(); + } + K keyK = (K) key; + try (OperationContext<K, V> context = new OperationContext<>(this, keyK)) { + delegate.compute( + keyK, + (k, v) -> { + V extractedValue = extractValue(v); + if (Objects.equals(value, extractedValue)) { + context.result = extractedValue; + return null; + } + return v; + }); + return context.result != null; } - return value; } @Override @@ -106,26 +169,31 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { throw new NullPointerException(); } try (OperationContext<K, V> context = new OperationContext<>(this, key)) { - super.compute( + delegate.compute( key, (k, v) -> { - context.result = v; + context.result = extractValue(v); return addChange(context, x -> value); }); return context.result; } } + @Override + public V remove(Object key) { + return extractValue(delegate.remove(key)); + } + @Override public V putIfAbsent(K key, V value) { if (key == null || value == null) { throw new NullPointerException(); } try (OperationContext<K, V> context = new OperationContext<>(this, key)) { - super.compute( + delegate.compute( key, (k, v) -> { - context.result = v; + context.result = extractValue(v); if (v != null) { return v; } @@ -141,7 +209,7 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { throw new NullPointerException(); } try (OperationContext<K, V> context = new OperationContext<>(this, key)) { - return super.computeIfAbsent(key, k -> addChange(context, mappingFunction)); + return extractValue(delegate.computeIfAbsent(key, k -> addChange(context, mappingFunction))); } } @@ -151,7 +219,8 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { throw new NullPointerException(); } try (OperationContext<K, V> context = new OperationContext<>(this, key)) { - return super.computeIfPresent(key, (k, v) -> addChange(context, x -> remappingFunction.apply(x, v))); + return extractValue(delegate.computeIfPresent(key, + (k, v) -> addChange(context, x -> remappingFunction.apply(x, extractValue(v))))); } } @@ -161,7 +230,8 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { throw new NullPointerException(); } try (OperationContext<K, V> context = new OperationContext<>(this, key)) { - return super.compute(key, (k, v) -> addChange(context, x -> remappingFunction.apply(x, v))); + return extractValue( + delegate.compute(key, (k, v) -> addChange(context, x -> remappingFunction.apply(x, extractValue(v))))); } } @@ -171,12 +241,12 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { throw new NullPointerException(); } try (OperationContext<K, V> context = new OperationContext<>(this, key)) { - return super.compute( + return extractValue(delegate.compute( key, (k, oldValue) -> { - V newValue = (oldValue == null) ? value : remappingFunction.apply(oldValue, value); + V newValue = (oldValue == null) ? value : remappingFunction.apply(oldValue.get(), value); return addChange(context, x -> newValue); - }); + })); } } @@ -186,12 +256,13 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { throw new NullPointerException(); } try (OperationContext<K, V> context = new OperationContext<>(this, key)) { - super.computeIfPresent( + delegate.computeIfPresent( key, (k, v) -> { - if (Objects.equals(oldValue, v)) { - context.result = addChange(context, x -> newValue); - return context.result; + if (Objects.equals(oldValue, extractValue(v))) { + ValueHolder<V> result = addChange(context, x -> newValue); + context.result = extractValue(result); + return result; } return v; }); @@ -205,10 +276,10 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { throw new NullPointerException(); } try (OperationContext<K, V> context = new OperationContext<>(this, key)) { - super.computeIfPresent( + delegate.computeIfPresent( key, (k, v) -> { - context.result = v; + context.result = extractValue(v); return addChange(context, x -> value); }); return context.result; @@ -222,23 +293,56 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { } } + @Override + public void clear() { + lock.writeLock().lock(); + try { + lastChanges.getAndSet(new ConcurrentLinkedDeque<>()); + totalChanges.set(0); + delegate.clear(); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public Set<K> keySet() { + return delegate.keySet(); + } + + @Override + public Collection<V> values() { + return delegate.values().stream().map(ValueHolder::get).toList(); + } + @Override public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) { + if (function == null) { + throw new NullPointerException(); + } for (Entry<? extends K, ? extends V> e : entrySet()) { - replace(e.getKey(), e.getValue(), function.apply(e.getKey(), e.getValue())); + K key = e.getKey(); + V value = e.getValue(); + try (OperationContext<K, V> context = new OperationContext<>(this, key)) { + delegate.computeIfPresent( + key, + (k, v) -> addChange(context, x -> function.apply(x, value))); + } } } @Override public Set<Entry<K, V>> entrySet() { - return Collections.unmodifiableSet(super.entrySet()); + return delegate.entrySet().stream() + .map(entry -> new CacheEntry<>(this, entry.getKey(), entry.getValue().get())) + .collect(Collectors.toUnmodifiableSet()); } /** * @return the size of the queue of changes. */ int getQueueSize() { - return lastChanges.get().size(); + return totalChanges.get(); } /** @@ -272,31 +376,48 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { /** * @return the oldest existing change. */ - private Entry<K, V> nextOldestChange() { - return lastChanges.get().poll(); + private Entry<K, ValueHolder<V>> nextOldestChange() { + Entry<K, ValueHolder<V>> oldestChange = lastChanges.get().poll(); + totalChanges.decrementAndGet(); + return oldestChange; } /** - * Removes duplicates from the queue of changes. + * Removes duplicates from the queue of changes if the queue is full. */ - private void compressChanges() { - Deque<Entry<K, V>> newChanges = new ConcurrentLinkedDeque<>(); - Deque<Entry<K, V>> currentChanges; - swapLock.writeLock().lock(); + private void compressChangesIfNeeded() { + Deque<Entry<K, ValueHolder<V>>> newChanges; + Deque<Entry<K, ValueHolder<V>>> currentChanges; + lock.writeLock().lock(); try { - currentChanges = lastChanges.getAndSet(newChanges); + if (isQueueFull()) { + newChanges = new ConcurrentLinkedDeque<>(); + totalChanges.set(0); + currentChanges = lastChanges.getAndSet(newChanges); + } else { + return; + } } finally { - swapLock.writeLock().unlock(); + lock.writeLock().unlock(); } Set<K> keys = new HashSet<>(); - Entry<K, V> entry; + Entry<K, ValueHolder<V>> entry; while ((entry = currentChanges.pollLast()) != null) { if (keys.add(entry.getKey())) { newChanges.addFirst(entry); + totalChanges.incrementAndGet(); } } } + private ValueHolder<V> newValue(V value) { + return new ValueHolder<>(sequence.incrementAndGet(), value); + } + + private V extractValue(ValueHolder<V> holder) { + return holder == null ? null : holder.get(); + } + /** * The internal context of all write operations. */ @@ -317,21 +438,21 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { OperationContext(SimpleLRUCache<K, V> cache, K key) { this.cache = cache; this.key = key; + cache.lock.readLock().lock(); } @Override public void close() { + cache.lock.readLock().unlock(); if (cache.evictionNeeded() && cache.eviction.compareAndSet(false, true)) { try { do { - cache.compressChanges(); + cache.compressChangesIfNeeded(); if (cache.isCacheFull()) { - Entry<K, V> oldest = cache.nextOldestChange(); - if (oldest != null && cache.remove(oldest.getKey(), oldest.getValue())) { - cache.evict.accept(oldest.getValue()); + Entry<K, ValueHolder<V>> oldest = cache.nextOldestChange(); + if (cache.delegate.remove(oldest.getKey(), oldest.getValue())) { + cache.evict.accept(oldest.getValue().get()); } - } else { - break; } } while (cache.evictionNeeded()); } finally { @@ -340,4 +461,79 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { } } } + + /** + * A cache value holder that leverages a revision id to be able to distinguish the same key value pair that has been + * added several times to the cache. + * + * @param <V> the type of the value + */ + private static class ValueHolder<V> { + private final long revision; + private final V value; + + ValueHolder(long revision, V value) { + this.revision = revision; + this.value = value; + } + + V get() { + return value; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) + return false; + ValueHolder<?> that = (ValueHolder<?>) o; + return revision == that.revision; + } + + @Override + public int hashCode() { + return Objects.hashCode(revision); + } + } + + /** + * A modifiable cache entry. + * + * @param <K> the type of the key + * @param <V> the type of the value + */ + private static class CacheEntry<K, V> implements Entry<K, V> { + + private final K key; + private V val; + /** + * The underlying cache. + */ + private final SimpleLRUCache<K, V> cache; + + CacheEntry(SimpleLRUCache<K, V> cache, K key, V value) { + this.cache = cache; + this.key = key; + this.val = value; + } + + @Override + public K getKey() { + return key; + } + + @Override + public V getValue() { + return val; + } + + @Override + public V setValue(V value) { + if (value == null) + throw new NullPointerException(); + V v = val; + val = value; + cache.put(key, value); + return v; + } + } }