This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch lru-sync in repository https://gitbox.apache.org/repos/asf/camel.git
commit dd30b1fe2487ee1a5e57dca947a79731aaa585a6 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri May 19 11:13:28 2023 +0200 CAMEL-19295: Experiment with sync LRUCache and test case that otherwise will OOME --- .../camel/impl/DefaultEndpointRegistryTest.java | 54 ++++++++++++++++++++++ .../camel/support/DefaultLRUCacheFactory.java | 21 +++++---- 2 files changed, 65 insertions(+), 10 deletions(-) diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultEndpointRegistryTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultEndpointRegistryTest.java index d70fa1d996b..4bcae075a55 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultEndpointRegistryTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultEndpointRegistryTest.java @@ -16,9 +16,17 @@ */ package org.apache.camel.impl; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.camel.ProducerTemplate; +import org.apache.camel.ServiceStatus; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.engine.DefaultEndpointRegistry; +import org.apache.camel.impl.engine.SimpleCamelContext; import org.apache.camel.spi.EndpointRegistry; +import org.apache.camel.support.NormalizedUri; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -68,4 +76,50 @@ public class DefaultEndpointRegistryTest { assertTrue(reg.isStatic("file:error")); } + //Testing the issue https://issues.apache.org/jira/browse/CAMEL-19295 + @Test + public void testConcurrency() throws InterruptedException { + + SimpleCamelContext context = new SimpleCamelContext(); + context.start(); + + ProducerTemplate producerTemplate = context.createProducerTemplate(); + EndpointRegistry<NormalizedUri> endpointRegistry = context.getEndpointRegistry(); + + int nThreads = 4; + ExecutorService executorService = Executors.newFixedThreadPool(nThreads); + int iterations = 500; + + for (int j = 0; j < iterations; j++) { + CountDownLatch allThreadCompletionSemaphore = new CountDownLatch(nThreads); + for (int i = 0; i < nThreads; i++) { + + executorService.submit(() -> { + + producerTemplate.requestBody("controlbus:route?routeId=route1&action=ACTION_STATUS&loggingLevel=off", null, + ServiceStatus.class); + producerTemplate.requestBody("controlbus:route?routeId=route2&action=ACTION_STATUS&loggingLevel=off", null, + ServiceStatus.class); + producerTemplate.requestBody("controlbus:route?routeId=route3&action=ACTION_STATUS&loggingLevel=off", null, + ServiceStatus.class); + producerTemplate.requestBody("controlbus:route?routeId=route4&action=ACTION_STATUS&loggingLevel=off", null, + ServiceStatus.class); + producerTemplate.requestBody("controlbus:route?routeId=route5&action=ACTION_STATUS&loggingLevel=off", null, + ServiceStatus.class); + + allThreadCompletionSemaphore.countDown(); + + }); + } + + allThreadCompletionSemaphore.await(); + + assertTrue(endpointRegistry.values().toArray() != null); + + } + + executorService.shutdown(); + + } + } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultLRUCacheFactory.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultLRUCacheFactory.java index 995a1405a2d..eefc3acbe7a 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultLRUCacheFactory.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultLRUCacheFactory.java @@ -16,6 +16,7 @@ */ package org.apache.camel.support; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; @@ -41,7 +42,7 @@ public class DefaultLRUCacheFactory extends LRUCacheFactory { @Override public <K, V> Map<K, V> createLRUCache(int maximumCacheSize) { LOG.trace("Creating LRUCache with maximumCacheSize: {}", maximumCacheSize); - return new SimpleLRUCache<>(maximumCacheSize); + return Collections.synchronizedMap(new SimpleLRUCache<>(maximumCacheSize)); } /** @@ -53,7 +54,7 @@ public class DefaultLRUCacheFactory extends LRUCacheFactory { @Override public <K, V> Map<K, V> createLRUCache(int maximumCacheSize, Consumer<V> onEvict) { LOG.trace("Creating LRUCache with maximumCacheSize: {}", maximumCacheSize); - return new SimpleLRUCache<>(16, maximumCacheSize, onEvict); + return Collections.synchronizedMap(new SimpleLRUCache<>(16, maximumCacheSize, onEvict)); } /** @@ -67,7 +68,7 @@ public class DefaultLRUCacheFactory extends LRUCacheFactory { @Override public <K, V> Map<K, V> createLRUCache(int initialCapacity, int maximumCacheSize) { LOG.trace("Creating LRUCache with initialCapacity: {}, maximumCacheSize: {}", initialCapacity, maximumCacheSize); - return new SimpleLRUCache<>(initialCapacity, maximumCacheSize); + return Collections.synchronizedMap(new SimpleLRUCache<>(initialCapacity, maximumCacheSize)); } /** @@ -83,7 +84,7 @@ public class DefaultLRUCacheFactory extends LRUCacheFactory { public <K, V> Map<K, V> createLRUCache(int initialCapacity, int maximumCacheSize, boolean stopOnEviction) { LOG.trace("Creating LRUCache with initialCapacity: {}, maximumCacheSize: {}, stopOnEviction: {}", initialCapacity, maximumCacheSize, stopOnEviction); - return new SimpleLRUCache<>(initialCapacity, maximumCacheSize, stopOnEviction); + return Collections.synchronizedMap(new SimpleLRUCache<>(initialCapacity, maximumCacheSize, stopOnEviction)); } /** @@ -96,20 +97,20 @@ public class DefaultLRUCacheFactory extends LRUCacheFactory { @Override public <K, V> Map<K, V> createLRUSoftCache(int maximumCacheSize) { LOG.trace("Creating LRUSoftCache with maximumCacheSize: {}", maximumCacheSize); - return new SimpleLRUCache<>(maximumCacheSize); + return Collections.synchronizedMap(new SimpleLRUCache<>(maximumCacheSize)); } @Override public <K, V> Map<K, V> createLRUSoftCache(int initialCapacity, int maximumCacheSize) { LOG.trace("Creating LRUCache with initialCapacity: {}, maximumCacheSize: {}", initialCapacity, maximumCacheSize); - return new SimpleLRUCache<>(initialCapacity, maximumCacheSize); + return Collections.synchronizedMap(new SimpleLRUCache<>(initialCapacity, maximumCacheSize)); } @Override public <K, V> Map<K, V> createLRUSoftCache(int initialCapacity, int maximumCacheSize, boolean stopOnEviction) { LOG.trace("Creating LRUCache with initialCapacity: {}, maximumCacheSize: {}, stopOnEviction: {}", initialCapacity, maximumCacheSize, stopOnEviction); - return new SimpleLRUCache<>(initialCapacity, maximumCacheSize, stopOnEviction); + return Collections.synchronizedMap(new SimpleLRUCache<>(initialCapacity, maximumCacheSize, stopOnEviction)); } /** @@ -122,20 +123,20 @@ public class DefaultLRUCacheFactory extends LRUCacheFactory { @Override public <K, V> Map<K, V> createLRUWeakCache(int maximumCacheSize) { LOG.trace("Creating LRUWeakCache with maximumCacheSize: {}", maximumCacheSize); - return new SimpleLRUCache<>(maximumCacheSize); + return Collections.synchronizedMap(new SimpleLRUCache<>(maximumCacheSize)); } @Override public <K, V> Map<K, V> createLRUWeakCache(int initialCapacity, int maximumCacheSize) { LOG.trace("Creating LRUCache with initialCapacity: {}, maximumCacheSize: {}", initialCapacity, maximumCacheSize); - return new SimpleLRUCache<>(initialCapacity, maximumCacheSize); + return Collections.synchronizedMap(new SimpleLRUCache<>(initialCapacity, maximumCacheSize)); } @Override public <K, V> Map<K, V> createLRUWeakCache(int initialCapacity, int maximumCacheSize, boolean stopOnEviction) { LOG.trace("Creating LRUCache with initialCapacity: {}, maximumCacheSize: {}, stopOnEviction: {}", initialCapacity, maximumCacheSize, stopOnEviction); - return new SimpleLRUCache<>(initialCapacity, maximumCacheSize, stopOnEviction); + return Collections.synchronizedMap(new SimpleLRUCache<>(initialCapacity, maximumCacheSize, stopOnEviction)); } private class SimpleLRUCache<K, V> extends LinkedHashMap<K, V> {