This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 67b444d8ddc CAMEL-20103: fix the currency problem acquiring a producer template from the DefaultProducerCache 67b444d8ddc is described below commit 67b444d8ddc46c89d54169ece756a503388b151c Author: pnowak85 <pnowa...@users.noreply.github.com> AuthorDate: Tue Nov 14 10:10:33 2023 +0100 CAMEL-20103: fix the currency problem acquiring a producer template from the DefaultProducerCache Fix the currency problem acquiring a producer template from the DefaultProducerCache which can lead to a wrong producer being returned under high load (#11971) --- .../camel/impl/DefaultProducerCacheTest.java | 47 ++++++++++++++++++++++ .../camel/support/cache/DefaultProducerCache.java | 12 +++--- 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java index e3b394a6bfd..bb82f409135 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java @@ -16,7 +16,14 @@ */ package org.apache.camel.impl; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -208,6 +215,46 @@ public class DefaultProducerCacheTest extends ContextTestSupport { await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(3, stopCounter.get())); } + @Test + public void testAcquireProducerConcurrency() throws InterruptedException, ExecutionException { + DefaultProducerCache cache = new DefaultProducerCache(this, context, 0); + cache.start(); + List<Endpoint> endpoints = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + Endpoint e = context.getEndpoint("direct:queue:" + i); + AsyncProducer p = cache.acquireProducer(e); + endpoints.add(e); + } + + assertEquals(3, cache.size()); + + ExecutorService ex = Executors.newFixedThreadPool(16); + + List<Callable<Boolean>> callables = new ArrayList<>(); + + for(int i = 0; i < 500; i++) { + int index = i % 3; + callables.add(() -> { + Producer producer = cache.acquireProducer(endpoints.get(index)); + boolean isEqual = producer.getEndpoint().getEndpointUri().equalsIgnoreCase(endpoints.get(index).getEndpointUri()); + + if(!isEqual) { + log.info("Endpoint uri to acquire: " + endpoints.get(index).getEndpointUri() + ", returned producer (uri): " + producer.getEndpoint().getEndpointUri()); + } + + return isEqual; + }); + } + + for (int i = 1; i <= 100; i++) { + log.info("Iteration: {}", i); + List<Future<Boolean>> results = ex.invokeAll(callables); + for (Future<Boolean> future : results) { + assertEquals(true, future.get()); + } + } + } + private static class MyProducerCache extends DefaultProducerCache { private MyServicePool myServicePool; diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java index 0cb817aa451..c156ef2b108 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java @@ -59,7 +59,6 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach private boolean extendedStatistics; private final int maxCacheSize; - private Endpoint lastUsedEndpoint; private AsyncProducer lastUsedProducer; public DefaultProducerCache(Object source, CamelContext camelContext, int cacheSize) { @@ -125,8 +124,10 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach public AsyncProducer acquireProducer(Endpoint endpoint) { // Try to favor thread locality as some data in the producer's cache may be shared among threads, // triggering cases of false sharing - if (endpoint == lastUsedEndpoint && endpoint.isSingletonProducer()) { - return lastUsedProducer; + // copy reference to avoid need for synchronization and be thread safe + AsyncProducer lastUsedProducerRef = lastUsedProducer; + if (lastUsedProducerRef != null && endpoint == lastUsedProducerRef.getEndpoint() && endpoint.isSingletonProducer()) { + return lastUsedProducerRef; } try { @@ -135,10 +136,7 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach statistics.onHit(endpoint.getEndpointUri()); } - synchronized (this) { - lastUsedEndpoint = endpoint; - lastUsedProducer = producer; - } + lastUsedProducer = producer; return producer; } catch (Exception e) {