This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 67b444d8ddc CAMEL-20103: fix the currency problem acquiring a producer 
template from the DefaultProducerCache
67b444d8ddc is described below

commit 67b444d8ddc46c89d54169ece756a503388b151c
Author: pnowak85 <pnowa...@users.noreply.github.com>
AuthorDate: Tue Nov 14 10:10:33 2023 +0100

    CAMEL-20103: fix the currency problem acquiring a producer template from 
the DefaultProducerCache
    
    Fix the currency problem acquiring a producer template from the 
DefaultProducerCache which can lead to a wrong producer being returned under 
high load (#11971)
---
 .../camel/impl/DefaultProducerCacheTest.java       | 47 ++++++++++++++++++++++
 .../camel/support/cache/DefaultProducerCache.java  | 12 +++---
 2 files changed, 52 insertions(+), 7 deletions(-)

diff --git 
a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
index e3b394a6bfd..bb82f409135 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
@@ -16,7 +16,14 @@
  */
 package org.apache.camel.impl;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
@@ -208,6 +215,46 @@ public class DefaultProducerCacheTest extends 
ContextTestSupport {
         await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> 
assertEquals(3, stopCounter.get()));
     }
 
+    @Test
+    public void testAcquireProducerConcurrency() throws InterruptedException, 
ExecutionException {
+        DefaultProducerCache cache = new DefaultProducerCache(this, context, 
0);
+        cache.start();
+        List<Endpoint> endpoints = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            Endpoint e = context.getEndpoint("direct:queue:" + i);
+            AsyncProducer p = cache.acquireProducer(e);
+            endpoints.add(e);
+        }
+
+        assertEquals(3, cache.size());
+
+        ExecutorService ex = Executors.newFixedThreadPool(16);
+
+        List<Callable<Boolean>> callables = new ArrayList<>();
+
+        for(int i = 0; i < 500; i++) {
+            int index = i % 3;
+            callables.add(() -> {
+                Producer producer = 
cache.acquireProducer(endpoints.get(index));
+                boolean isEqual = 
producer.getEndpoint().getEndpointUri().equalsIgnoreCase(endpoints.get(index).getEndpointUri());
+
+                if(!isEqual) {
+                    log.info("Endpoint uri to acquire: " + 
endpoints.get(index).getEndpointUri() + ", returned producer (uri): " + 
producer.getEndpoint().getEndpointUri());
+                }
+
+                return isEqual;
+            });
+        }
+
+        for (int i = 1; i <= 100; i++) {
+            log.info("Iteration: {}", i);
+            List<Future<Boolean>> results = ex.invokeAll(callables);
+            for (Future<Boolean> future : results) {
+                assertEquals(true, future.get());
+            }
+        }
+    }
+
     private static class MyProducerCache extends DefaultProducerCache {
 
         private MyServicePool myServicePool;
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
index 0cb817aa451..c156ef2b108 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
@@ -59,7 +59,6 @@ public class DefaultProducerCache extends ServiceSupport 
implements ProducerCach
     private boolean extendedStatistics;
     private final int maxCacheSize;
 
-    private Endpoint lastUsedEndpoint;
     private AsyncProducer lastUsedProducer;
 
     public DefaultProducerCache(Object source, CamelContext camelContext, int 
cacheSize) {
@@ -125,8 +124,10 @@ public class DefaultProducerCache extends ServiceSupport 
implements ProducerCach
     public AsyncProducer acquireProducer(Endpoint endpoint) {
         // Try to favor thread locality as some data in the producer's cache 
may be shared among threads,
         // triggering cases of false sharing
-        if (endpoint == lastUsedEndpoint && endpoint.isSingletonProducer()) {
-            return lastUsedProducer;
+        // copy reference to avoid need for synchronization and be thread safe
+        AsyncProducer lastUsedProducerRef = lastUsedProducer;
+        if (lastUsedProducerRef != null && endpoint == 
lastUsedProducerRef.getEndpoint() && endpoint.isSingletonProducer()) {
+            return lastUsedProducerRef;
         }
 
         try {
@@ -135,10 +136,7 @@ public class DefaultProducerCache extends ServiceSupport 
implements ProducerCach
                 statistics.onHit(endpoint.getEndpointUri());
             }
 
-            synchronized (this) {
-                lastUsedEndpoint = endpoint;
-                lastUsedProducer = producer;
-            }
+            lastUsedProducer = producer;
 
             return producer;
         } catch (Exception e) {

Reply via email to