This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 07bcafc7839 CAMEL-20199: Remove synchronized block from components A to C
07bcafc7839 is described below

commit 07bcafc783935293af4938ed8f5611a17c9f4f5c
Author: Nicolas Filotto <nfilo...@talend.com>
AuthorDate: Thu Aug 1 14:53:53 2024 +0200

    CAMEL-20199: Remove synchronized block from components A to C
---
 .../as2/api/AS2AsyncMDNServerConnection.java       |  14 ++-
 .../component/as2/api/AS2ServerConnection.java     |  14 ++-
 .../as2/internal/AS2ConnectionHelper.java          | 100 +++++++++++---------
 .../atmosphere/websocket/WebsocketComponent.java   |  12 +--
 .../component/aws2/kinesis/Kinesis2Consumer.java   |  16 ++--
 .../aws2/s3/stream/AWS2S3StreamUploadProducer.java |  24 ++++-
 .../component/bean/AbstractBeanProcessor.java      |  10 +-
 .../camel/component/bean/MethodInfoCache.java      |  18 +---
 .../component/braintree/BraintreeComponent.java    |  20 ++--
 .../camel/component/cometd/CometdComponent.java    |  36 ++++---
 .../consul/cluster/ConsulClusterView.java          |  20 ++--
 .../component/couchbase/CouchbaseConsumer.java     | 105 +++++++++++----------
 ...MarshalHeaderWithCustomMarshallFactoryTest.java |   8 +-
 .../camel/component/cxf/jaxrs/CxfRsProducer.java   |  28 +++---
 14 files changed, 234 insertions(+), 191 deletions(-)

diff --git a/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2AsyncMDNServerConnection.java b/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2AsyncMDNServerConnection.java
index 0f9660c3d95..29bc7ab8547 100644
--- a/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2AsyncMDNServerConnection.java
+++ b/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2AsyncMDNServerConnection.java
@@ -21,6 +21,8 @@ import java.io.InterruptedIOException;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLServerSocketFactory;
@@ -54,7 +56,7 @@ public class AS2AsyncMDNServerConnection {
     private static final String REQUEST_HANDLER_THREAD_NAME_PREFIX = 
"AS2AsyncMdnHdlr-";
     private static final int DEFAULT_BUFFER_SIZE = 8 * 1024;
     private RequestListenerThread listenerThread;
-    private final Object lock = new Object();
+    private final Lock lock = new ReentrantLock();
 
     public AS2AsyncMDNServerConnection(Integer portNumber, SSLContext 
sslContext)
                                                                                
   throws IOException {
@@ -66,7 +68,8 @@ public class AS2AsyncMDNServerConnection {
 
     public void close() {
         if (listenerThread != null) {
-            synchronized (lock) {
+            lock.lock();
+            try {
                 try {
                     listenerThread.serverSocket.close();
                 } catch (IOException e) {
@@ -74,14 +77,19 @@ public class AS2AsyncMDNServerConnection {
                 } finally {
                     listenerThread = null;
                 }
+            } finally {
+                lock.unlock();
             }
         }
     }
 
     public void receive(String requestUriPattern, HttpRequestHandler handler) {
         if (listenerThread != null) {
-            synchronized (lock) {
+            lock.lock();
+            try {
                 listenerThread.registerHandler(requestUriPattern, handler);
+            } finally {
+                lock.unlock();
             }
         }
     }
diff --git a/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2ServerConnection.java b/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2ServerConnection.java
index 930d0dfc05b..08976e9b122 100644
--- a/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2ServerConnection.java
+++ b/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2ServerConnection.java
@@ -23,6 +23,8 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.security.PrivateKey;
 import java.security.cert.Certificate;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLServerSocketFactory;
@@ -213,7 +215,7 @@ public class AS2ServerConnection {
     }
 
     private RequestListenerThread listenerThread;
-    private final Object lock = new Object();
+    private final Lock lock = new ReentrantLock();
     private final String as2Version;
     private final String originServer;
     private final String serverFqdn;
@@ -267,7 +269,8 @@ public class AS2ServerConnection {
 
     public void close() {
         if (listenerThread != null) {
-            synchronized (lock) {
+            lock.lock();
+            try {
                 try {
                     listenerThread.serversocket.close();
                 } catch (IOException e) {
@@ -275,14 +278,19 @@ public class AS2ServerConnection {
                 } finally {
                     listenerThread = null;
                 }
+            } finally {
+                lock.unlock();
             }
         }
     }
 
     public void listen(String requestUri, HttpRequestHandler handler) {
         if (listenerThread != null) {
-            synchronized (lock) {
+            lock.lock();
+            try {
                 listenerThread.registerHandler(requestUri, handler);
+            } finally {
+                lock.unlock();
             }
         }
     }
diff --git a/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/internal/AS2ConnectionHelper.java b/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/internal/AS2ConnectionHelper.java
index f19f81e6b53..c43297fa924 100644
--- a/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/internal/AS2ConnectionHelper.java
+++ b/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/internal/AS2ConnectionHelper.java
@@ -17,8 +17,9 @@
 package org.apache.camel.component.as2.internal;
 
 import java.io.IOException;
-import java.util.HashMap;
+import java.io.UncheckedIOException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.camel.component.as2.AS2Configuration;
 import org.apache.camel.component.as2.api.AS2AsyncMDNServerConnection;
@@ -34,9 +35,9 @@ public final class AS2ConnectionHelper {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AS2ConnectionHelper.class);
 
-    private static Map<Integer, AS2ServerConnection> serverConnections = new 
HashMap<>();
+    private static final Map<Integer, AS2ServerConnection> serverConnections = 
new ConcurrentHashMap<>();
 
-    private static Map<Integer, AS2AsyncMDNServerConnection> 
asyncMdnServerConnections = new HashMap<>();
+    private static final Map<Integer, AS2AsyncMDNServerConnection> 
asyncMdnServerConnections = new ConcurrentHashMap<>();
 
     /**
      * Prevent instantiation
@@ -69,17 +70,20 @@ public final class AS2ConnectionHelper {
      */
     public static AS2AsyncMDNServerConnection 
createAS2AsyncMDNServerConnection(AS2Configuration configuration)
             throws IOException {
-        AS2AsyncMDNServerConnection asyncMdnServerConnection
-                = 
asyncMdnServerConnections.get(configuration.getAsyncMdnPortNumber());
-        synchronized (asyncMdnServerConnections) {
-            if (asyncMdnServerConnection == null) {
-                asyncMdnServerConnection
-                        = new AS2AsyncMDNServerConnection(
-                                configuration.getAsyncMdnPortNumber(), 
configuration.getSslContext());
-                
asyncMdnServerConnections.put(configuration.getAsyncMdnPortNumber(), 
asyncMdnServerConnection);
-            }
+        try {
+            return asyncMdnServerConnections.computeIfAbsent(
+                    configuration.getAsyncMdnPortNumber(),
+                    key -> {
+                        try {
+                            return new AS2AsyncMDNServerConnection(
+                                    configuration.getAsyncMdnPortNumber(), 
configuration.getSslContext());
+                        } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                        }
+                    });
+        } catch (UncheckedIOException e) {
+            throw e.getCause();
         }
-        return asyncMdnServerConnection;
     }
 
     /**
@@ -90,18 +94,24 @@ public final class AS2ConnectionHelper {
      * @throws IOException
      */
     public static AS2ServerConnection 
createAS2ServerConnection(AS2Configuration configuration) throws IOException {
-        synchronized (serverConnections) {
-            AS2ServerConnection serverConnection = 
serverConnections.get(configuration.getServerPortNumber());
-            if (serverConnection == null) {
-                serverConnection = new AS2ServerConnection(
-                        configuration.getAs2Version(), 
configuration.getServer(),
-                        configuration.getServerFqdn(), 
configuration.getServerPortNumber(), configuration.getSigningAlgorithm(),
-                        configuration.getSigningCertificateChain(), 
configuration.getSigningPrivateKey(),
-                        configuration.getDecryptingPrivateKey(), 
configuration.getMdnMessageTemplate(),
-                        configuration.getValidateSigningCertificateChain(), 
configuration.getSslContext());
-                serverConnections.put(configuration.getServerPortNumber(), 
serverConnection);
-            }
-            return serverConnection;
+        try {
+            return serverConnections.computeIfAbsent(
+                    configuration.getServerPortNumber(),
+                    key -> {
+                        try {
+                            return new AS2ServerConnection(
+                                    configuration.getAs2Version(), 
configuration.getServer(),
+                                    configuration.getServerFqdn(), 
configuration.getServerPortNumber(),
+                                    configuration.getSigningAlgorithm(),
+                                    
configuration.getSigningCertificateChain(), 
configuration.getSigningPrivateKey(),
+                                    configuration.getDecryptingPrivateKey(), 
configuration.getMdnMessageTemplate(),
+                                    
configuration.getValidateSigningCertificateChain(), 
configuration.getSslContext());
+                        } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                        }
+                    });
+        } catch (UncheckedIOException e) {
+            throw e.getCause();
         }
     }
 
@@ -111,34 +121,30 @@ public final class AS2ConnectionHelper {
     }
 
     public static void closeAllServerConnections() {
-        synchronized (serverConnections) {
-            for (Map.Entry<Integer, AS2ServerConnection> entry : 
serverConnections.entrySet()) {
-                try {
-                    int port = entry.getKey();
-                    LOG.debug("Stopping and closing AS2ServerConnection on 
port: {}", port);
-                    AS2ServerConnection conn = entry.getValue();
-                    conn.close();
-                } catch (Exception e) {
-                    // ignore
-                    LOG.debug("Error stopping and closing AS2ServerConnection 
due to {}. This exception is ignored",
-                            e.getMessage(), e);
-                }
+        for (Map.Entry<Integer, AS2ServerConnection> entry : 
serverConnections.entrySet()) {
+            try {
+                int port = entry.getKey();
+                LOG.debug("Stopping and closing AS2ServerConnection on port: 
{}", port);
+                AS2ServerConnection conn = entry.getValue();
+                conn.close();
+            } catch (Exception e) {
+                // ignore
+                LOG.debug("Error stopping and closing AS2ServerConnection due 
to {}. This exception is ignored",
+                        e.getMessage(), e);
             }
         }
         serverConnections.clear();
     }
 
     public static void closeAllAsyncMdnServerConnections() {
-        synchronized (asyncMdnServerConnections) {
-            for (Map.Entry<Integer, AS2AsyncMDNServerConnection> entry : 
asyncMdnServerConnections.entrySet()) {
-                try {
-                    int port = entry.getKey();
-                    LOG.debug("Stopping and closing AsyncMdnServerConnection 
on port: {}", port);
-                    entry.getValue().close();
-                } catch (Exception e) {
-                    LOG.debug("Error stopping and closing 
AsyncMdnServerConnection due to {}. This exception is ignored",
-                            e.getMessage(), e);
-                }
+        for (Map.Entry<Integer, AS2AsyncMDNServerConnection> entry : 
asyncMdnServerConnections.entrySet()) {
+            try {
+                int port = entry.getKey();
+                LOG.debug("Stopping and closing AsyncMdnServerConnection on 
port: {}", port);
+                entry.getValue().close();
+            } catch (Exception e) {
+                LOG.debug("Error stopping and closing AsyncMdnServerConnection 
due to {}. This exception is ignored",
+                        e.getMessage(), e);
             }
         }
         asyncMdnServerConnections.clear();
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketComponent.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketComponent.java
index 7336776a45c..5d48a1067f4 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketComponent.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketComponent.java
@@ -17,8 +17,8 @@
 package org.apache.camel.component.atmosphere.websocket;
 
 import java.net.URI;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.camel.component.servlet.ServletComponent;
 import org.apache.camel.component.servlet.ServletEndpoint;
@@ -29,14 +29,14 @@ import org.apache.camel.spi.annotations.Component;
  */
 @Component("atmosphere-websocket")
 public class WebsocketComponent extends ServletComponent {
-    private Map<String, WebSocketStore> stores;
+    private final Map<String, WebSocketStore> stores;
 
     public WebsocketComponent() {
         // override the default servlet name of ServletComponent
         super(WebsocketEndpoint.class);
         setServletName("CamelWsServlet");
 
-        this.stores = new HashMap<>();
+        this.stores = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -46,10 +46,6 @@ public class WebsocketComponent extends ServletComponent {
     }
 
     WebSocketStore getWebSocketStore(String name) {
-        WebSocketStore store;
-        synchronized (stores) {
-            store = stores.computeIfAbsent(name, k -> new 
MemoryWebSocketStore());
-        }
-        return store;
+        return stores.computeIfAbsent(name, k -> new MemoryWebSocketStore());
     }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 2b94b9da034..c33cedab3ba 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -16,11 +16,7 @@
  */
 package org.apache.camel.component.aws2.kinesis;
 
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -56,9 +52,9 @@ public class Kinesis2Consumer extends 
ScheduledBatchPollingConsumer implements R
     private KinesisConnection connection;
     private ResumeStrategy resumeStrategy;
 
-    private Map<String, String> currentShardIterators = new 
java.util.HashMap<>();
+    private final Map<String, String> currentShardIterators = new 
java.util.HashMap<>();
 
-    private List<Shard> currentShardList = new ArrayList<>();
+    private volatile List<Shard> currentShardList = List.of();
     private static final String SHARD_MONITOR_EXECUTOR_NAME = 
"Kinesis_shard_monitor";
     private ScheduledExecutorService shardMonitorExecutor;
 
@@ -384,12 +380,12 @@ public class Kinesis2Consumer extends 
ScheduledBatchPollingConsumer implements R
         return getEndpoint().getConfiguration();
     }
 
-    protected synchronized List<Shard> getCurrentShardList() {
+    protected List<Shard> getCurrentShardList() {
         return this.currentShardList;
     }
 
-    private synchronized void setCurrentShardList(List<Shard> latestShardList) 
{
-        this.currentShardList = latestShardList;
+    private void setCurrentShardList(List<Shard> latestShardList) {
+        this.currentShardList = List.copyOf(latestShardList);
     }
 
     private class ShardMonitor implements Runnable {
diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
index 8d23db6f1a5..57c8459fb9c 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
@@ -25,6 +25,8 @@ import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -63,7 +65,7 @@ public class AWS2S3StreamUploadProducer extends 
DefaultProducer {
     private static final String TIMEOUT_CHECKER_EXECUTOR_NAME = 
"S3_Streaming_Upload_Timeout_Checker";
     private AtomicInteger part = new AtomicInteger();
     private UploadState uploadAggregate = null;
-    private final Object lock = new Object();
+    private final Lock lock = new ReentrantLock();
     private transient String s3ProducerToString;
     private ScheduledExecutorService timeoutCheckerExecutorService;
 
@@ -89,11 +91,14 @@ public class AWS2S3StreamUploadProducer extends 
DefaultProducer {
 
     @Override
     protected void doStop() throws Exception {
-        synchronized (lock) {
+        lock.lock();
+        try {
             if (ObjectHelper.isNotEmpty(uploadAggregate)) {
                 uploadPart(uploadAggregate);
                 completeUpload(uploadAggregate);
             }
+        } finally {
+            lock.unlock();
         }
         if (timeoutCheckerExecutorService != null) {
             
getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(timeoutCheckerExecutorService);
@@ -110,12 +115,15 @@ public class AWS2S3StreamUploadProducer extends 
DefaultProducer {
 
         @Override
         public void run() {
-            synchronized (lock) {
+            lock.lock();
+            try {
                 if (ObjectHelper.isNotEmpty(uploadAggregate)) {
                     uploadPart(uploadAggregate);
                     completeUpload(uploadAggregate);
                     uploadAggregate = null;
                 }
+            } finally {
+                lock.unlock();
             }
         }
     }
@@ -139,7 +147,8 @@ public class AWS2S3StreamUploadProducer extends 
DefaultProducer {
             totalSize += b.length;
             if (getConfiguration().isMultiPartUpload())
                 maxRead -= b.length;
-            synchronized (lock) {
+            lock.lock();
+            try {
                 // aggregate with previously received exchanges
                 if (ObjectHelper.isNotEmpty(uploadAggregate)) {
                     uploadAggregate.buffer.write(b);
@@ -165,6 +174,8 @@ public class AWS2S3StreamUploadProducer extends 
DefaultProducer {
                     }
                     continue;
                 }
+            } finally {
+                lock.unlock();
             }
             if (state == null) {
                 state = new UploadState();
@@ -242,13 +253,16 @@ public class AWS2S3StreamUploadProducer extends 
DefaultProducer {
 
         if (ObjectHelper.isNotEmpty(state)) {
             // exchange wasn't large enough to send, batch it with subsequent 
exchanges.
-            synchronized (lock) {
+            lock.lock();
+            try {
                 if (ObjectHelper.isEmpty(this.uploadAggregate)) {
                     this.uploadAggregate = state;
                 } else {
                     // handle potential race condition.
                     
this.uploadAggregate.buffer.write(state.buffer.toByteArray());
                 }
+            } finally {
+                lock.unlock();
             }
         }
     }
diff --git a/components/camel-bean/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java b/components/camel-bean/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java
index ff287aeba53..6f184389ea5 100644
--- a/components/camel-bean/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java
+++ b/components/camel-bean/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.bean;
 
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.BeanScope;
 import org.apache.camel.Exchange;
@@ -38,7 +41,7 @@ public abstract class AbstractBeanProcessor extends 
AsyncProcessorSupport {
     private transient Processor processor;
     private transient Object bean;
     private transient boolean lookupProcessorDone;
-    private final Object lock = new Object();
+    private final Lock lock = new ReentrantLock();
     private BeanScope scope;
     private String method;
     private boolean shorthandMethod;
@@ -130,11 +133,14 @@ public abstract class AbstractBeanProcessor extends 
AsyncProcessorSupport {
             boolean allowCache = scope == null || scope == BeanScope.Singleton;
             if (allowCache) {
                 if (!lookupProcessorDone) {
-                    synchronized (lock) {
+                    lock.lock();
+                    try {
                         lookupProcessorDone = true;
                         // so if there is a custom type converter for the bean 
to processor
                         target = 
exchange.getContext().getTypeConverter().tryConvertTo(Processor.class, 
exchange, beanTmp);
                         processor = target;
+                    } finally {
+                        lock.unlock();
                     }
                 }
             } else {
diff --git a/components/camel-bean/src/main/java/org/apache/camel/component/bean/MethodInfoCache.java b/components/camel-bean/src/main/java/org/apache/camel/component/bean/MethodInfoCache.java
index 896204b1121..3d52bebae7d 100644
--- a/components/camel-bean/src/main/java/org/apache/camel/component/bean/MethodInfoCache.java
+++ b/components/camel-bean/src/main/java/org/apache/camel/component/bean/MethodInfoCache.java
@@ -45,13 +45,8 @@ public class MethodInfoCache {
         this.methodCache = methodCache;
     }
 
-    public synchronized MethodInfo getMethodInfo(Method method) {
-        MethodInfo answer = methodCache.get(method);
-        if (answer == null) {
-            answer = createMethodInfo(method);
-            methodCache.put(method, answer);
-        }
-        return answer;
+    public MethodInfo getMethodInfo(Method method) {
+        return methodCache.computeIfAbsent(method, this::createMethodInfo);
     }
 
     protected MethodInfo createMethodInfo(Method method) {
@@ -60,13 +55,8 @@ public class MethodInfoCache {
         return info.getMethodInfo(method);
     }
 
-    protected synchronized BeanInfo getBeanInfo(Class<?> declaringClass) {
-        BeanInfo beanInfo = classCache.get(declaringClass);
-        if (beanInfo == null) {
-            beanInfo = createBeanInfo(declaringClass);
-            classCache.put(declaringClass, beanInfo);
-        }
-        return beanInfo;
+    protected BeanInfo getBeanInfo(Class<?> declaringClass) {
+        return classCache.computeIfAbsent(declaringClass, 
this::createBeanInfo);
     }
 
     protected BeanInfo createBeanInfo(Class<?> declaringClass) {
diff --git a/components/camel-braintree/src/main/java/org/apache/camel/component/braintree/BraintreeComponent.java b/components/camel-braintree/src/main/java/org/apache/camel/component/braintree/BraintreeComponent.java
index 509897b7700..09d77fb1df4 100644
--- a/components/camel-braintree/src/main/java/org/apache/camel/component/braintree/BraintreeComponent.java
+++ b/components/camel-braintree/src/main/java/org/apache/camel/component/braintree/BraintreeComponent.java
@@ -16,8 +16,8 @@
  */
 package org.apache.camel.component.braintree;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import com.braintreegateway.BraintreeGateway;
 import org.apache.camel.CamelContext;
@@ -41,12 +41,12 @@ public class BraintreeComponent extends 
AbstractApiComponent<BraintreeApiName, B
 
     public BraintreeComponent() {
         super(BraintreeApiName.class, BraintreeApiCollection.getCollection());
-        this.gateways = new HashMap<>();
+        this.gateways = new ConcurrentHashMap<>();
     }
 
     public BraintreeComponent(CamelContext context) {
         super(context, BraintreeApiName.class, 
BraintreeApiCollection.getCollection());
-        this.gateways = new HashMap<>();
+        this.gateways = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -63,20 +63,12 @@ public class BraintreeComponent extends 
AbstractApiComponent<BraintreeApiName, B
         return new BraintreeEndpoint(uri, this, apiName, methodName, 
endpointConfiguration);
     }
 
-    public synchronized BraintreeGateway getGateway(BraintreeConfiguration 
configuration) {
+    public BraintreeGateway getGateway(BraintreeConfiguration configuration) {
         BraintreeGateway gateway;
         if (configuration.getAccessToken() != null) {
-            gateway = gateways.get(configuration.getAccessToken());
-            if (gateway == null) {
-                gateway = configuration.newBraintreeGateway();
-                gateways.put(configuration.getAccessToken(), gateway);
-            }
+            gateway = gateways.computeIfAbsent(configuration.getAccessToken(), 
k -> configuration.newBraintreeGateway());
         } else {
-            gateway = gateways.get(configuration.getMerchantId());
-            if (gateway == null) {
-                gateway = configuration.newBraintreeGateway();
-                gateways.put(configuration.getMerchantId(), gateway);
-            }
+            gateway = gateways.computeIfAbsent(configuration.getMerchantId(), 
k -> configuration.newBraintreeGateway());
         }
         return gateway;
     }
diff --git a/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java b/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java
index 2f912d78bc5..6aa663fd688 100644
--- a/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java
+++ b/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java
@@ -22,6 +22,8 @@ import java.util.EnumSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import jakarta.servlet.DispatcherType;
 
@@ -56,6 +58,7 @@ import org.slf4j.LoggerFactory;
 public class CometdComponent extends DefaultComponent implements 
SSLContextParametersAware {
     private static final Logger LOG = 
LoggerFactory.getLogger(CometdComponent.class);
 
+    private final Lock connectorsLock = new ReentrantLock();
     private final Map<String, ConnectorRef> connectors = new LinkedHashMap<>();
 
     private List<BayeuxServer.BayeuxServerListener> serverListeners;
@@ -116,7 +119,8 @@ public class CometdComponent extends DefaultComponent 
implements SSLContextParam
         CometdEndpoint endpoint = prodcon.getEndpoint();
         String connectorKey = endpoint.getProtocol() + ":" + 
endpoint.getUri().getHost() + ":" + endpoint.getPort();
 
-        synchronized (connectors) {
+        connectorsLock.lock();
+        try {
             ConnectorRef connectorRef = connectors.get(connectorKey);
             if (connectorRef == null) {
                 ServerConnector connector;
@@ -160,6 +164,8 @@ public class CometdComponent extends DefaultComponent 
implements SSLContextParam
                 }
             }
             prodcon.setBayeux(bayeux);
+        } finally {
+            connectorsLock.unlock();
         }
     }
 
@@ -171,16 +177,17 @@ public class CometdComponent extends DefaultComponent 
implements SSLContextParam
 
         String connectorKey = endpoint.getProtocol() + ":" + 
endpoint.getUri().getHost() + ":" + endpoint.getPort();
 
-        synchronized (connectors) {
+        connectorsLock.lock();
+        try {
             ConnectorRef connectorRef = connectors.get(connectorKey);
-            if (connectorRef != null) {
-                if (connectorRef.decrement() == 0) {
-                    
connectorRef.server.removeConnector(connectorRef.connector);
-                    connectorRef.connector.stop();
-                    connectorRef.server.stop();
-                    connectors.remove(connectorKey);
-                }
+            if (connectorRef != null && connectorRef.decrement() == 0) {
+                connectorRef.server.removeConnector(connectorRef.connector);
+                connectorRef.connector.stop();
+                connectorRef.server.stop();
+                connectors.remove(connectorKey);
             }
+        } finally {
+            connectorsLock.unlock();
         }
     }
 
@@ -352,10 +359,15 @@ public class CometdComponent extends DefaultComponent 
implements SSLContextParam
 
     @Override
     protected void doStop() throws Exception {
-        for (ConnectorRef connectorRef : connectors.values()) {
-            connectorRef.connector.stop();
+        connectorsLock.lock();
+        try {
+            for (ConnectorRef connectorRef : connectors.values()) {
+                connectorRef.connector.stop();
+            }
+            connectors.clear();
+        } finally {
+            connectorsLock.unlock();
         }
-        connectors.clear();
 
         super.doStop();
     }
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/cluster/ConsulClusterView.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/cluster/ConsulClusterView.java
index eea1972290f..cfa1fae71f4 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/cluster/ConsulClusterView.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/cluster/ConsulClusterView.java
@@ -22,6 +22,8 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 import org.apache.camel.cluster.CamelClusterMember;
@@ -44,6 +46,7 @@ final class ConsulClusterView extends AbstractCamelClusterView {
 
     private final ConsulClusterConfiguration configuration;
     private final ConsulLocalMember localMember;
+    private final Lock sessionIdLock = new ReentrantLock();
     private final AtomicReference<String> sessionId;
     private final Watcher watcher;
 
@@ -113,21 +116,25 @@ final class ConsulClusterView extends AbstractCamelClusterView {
             if (keyValueClient.releaseLock(this.path, sessionId.get())) {
                 LOGGER.debug("Successfully released lock on path '{}' with id '{}'", path, sessionId.get());
             }
-
-            synchronized (sessionId) {
+            sessionIdLock.lock();
+            try {
                 sessionClient.destroySession(sessionId.getAndSet(null));
                 localMember.setMaster(false);
+            } finally {
+                sessionIdLock.unlock();
             }
         }
     }
 
     private boolean acquireLock() {
-        synchronized (sessionId) {
+        sessionIdLock.lock();
+        try {
             String sid = sessionId.get();
 
             return (sid != null)
-                    ? sessionClient.getSessionInfo(sid).map(si -> keyValueClient.acquireLock(path, sid)).orElse(Boolean.FALSE)
-                    : false;
+                    && sessionClient.getSessionInfo(sid).map(si -> keyValueClient.acquireLock(path, sid)).orElse(Boolean.FALSE);
+        } finally {
+            sessionIdLock.unlock();
         }
     }
 
@@ -136,7 +143,7 @@ final class ConsulClusterView extends AbstractCamelClusterView {
     // ***********************************************
 
     private final class ConsulLocalMember implements CamelClusterMember {
-        private AtomicBoolean master = new AtomicBoolean();
+        private final AtomicBoolean master = new AtomicBoolean();
 
         void setMaster(boolean master) {
             if (master && this.master.compareAndSet(false, true)) {
@@ -147,7 +154,6 @@ final class ConsulClusterView extends AbstractCamelClusterView {
             if (!master && this.master.compareAndSet(true, false)) {
                 LOGGER.debug("Leadership lost for session id {}", sessionId.get());
                 fireLeadershipChangedEvent(getLeader().orElse(null));
-                return;
             }
         }
 
diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
index 8b252244f4c..ed171b5fffc 100644
--- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
+++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.couchbase;
 
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.JsonNode;
 import com.couchbase.client.java.Bucket;
 import com.couchbase.client.java.Collection;
@@ -43,6 +46,7 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements R
 
     private static final Logger LOG = LoggerFactory.getLogger(CouchbaseConsumer.class);
 
+    private final Lock lock = new ReentrantLock();
     private final CouchbaseEndpoint endpoint;
     private final Bucket bucket;
     private final Collection collection;
@@ -109,64 +113,69 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements R
     }
 
     @Override
-    protected synchronized int poll() throws Exception {
-        ViewResult result = bucket.viewQuery(endpoint.getDesignDocumentName(), 
endpoint.getViewName(), this.viewOptions);
-
-        // okay we have some response from CouchBase so lets mark the consumer 
as ready
-        forceConsumerAsReady();
+    protected int poll() throws Exception {
+        lock.lock();
+        try {
+            ViewResult result = 
bucket.viewQuery(endpoint.getDesignDocumentName(), endpoint.getViewName(), 
this.viewOptions);
 
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("ViewResponse =  {}", result);
-        }
+            // okay we have some response from CouchBase so lets mark the 
consumer as ready
+            forceConsumerAsReady();
 
-        String consumerProcessedStrategy = 
endpoint.getConsumerProcessedStrategy();
-        for (ViewRow row : result.rows()) {
-            Object doc;
-            String id = row.id().get();
-            if (endpoint.isFullDocument()) {
-                doc = CouchbaseCollectionOperation.getDocument(collection, id, 
endpoint.getQueryTimeout());
-            } else {
-                doc = row.valueAs(Object.class);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("ViewResponse =  {}", result);
             }
 
-            String key = row.keyAs(JsonNode.class).get().asText();
-            String designDocumentName = endpoint.getDesignDocumentName();
-            String viewName = endpoint.getViewName();
-
-            Exchange exchange = createExchange(false);
-            try {
-                exchange.getIn().setBody(doc);
-                exchange.getIn().setHeader(HEADER_ID, id);
-                exchange.getIn().setHeader(HEADER_KEY, key);
-                exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME, 
designDocumentName);
-                exchange.getIn().setHeader(HEADER_VIEWNAME, viewName);
-
-                if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Deleting doc with ID {}", id);
-                    }
-                    CouchbaseCollectionOperation.removeDocument(collection, 
id, endpoint.getWriteQueryTimeout(),
-                            endpoint.getProducerRetryPause());
-                } else if 
("filter".equalsIgnoreCase(consumerProcessedStrategy)) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Filtering out ID {}", id);
-                    }
-                    // add filter for already processed docs
+            String consumerProcessedStrategy = 
endpoint.getConsumerProcessedStrategy();
+            for (ViewRow row : result.rows()) {
+                Object doc;
+                String id = row.id().get();
+                if (endpoint.isFullDocument()) {
+                    doc = CouchbaseCollectionOperation.getDocument(collection, 
id, endpoint.getQueryTimeout());
                 } else {
-                    LOG.trace("No strategy set for already processed docs, 
beware of duplicates!");
+                    doc = row.valueAs(Object.class);
                 }
 
-                logDetails(id, doc, key, designDocumentName, viewName, 
exchange);
+                String key = row.keyAs(JsonNode.class).get().asText();
+                String designDocumentName = endpoint.getDesignDocumentName();
+                String viewName = endpoint.getViewName();
+
+                Exchange exchange = createExchange(false);
+                try {
+                    exchange.getIn().setBody(doc);
+                    exchange.getIn().setHeader(HEADER_ID, id);
+                    exchange.getIn().setHeader(HEADER_KEY, key);
+                    exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME, 
designDocumentName);
+                    exchange.getIn().setHeader(HEADER_VIEWNAME, viewName);
+
+                    if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Deleting doc with ID {}", id);
+                        }
+                        
CouchbaseCollectionOperation.removeDocument(collection, id, 
endpoint.getWriteQueryTimeout(),
+                                endpoint.getProducerRetryPause());
+                    } else if 
("filter".equalsIgnoreCase(consumerProcessedStrategy)) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Filtering out ID {}", id);
+                        }
+                        // add filter for already processed docs
+                    } else {
+                        LOG.trace("No strategy set for already processed docs, 
beware of duplicates!");
+                    }
+
+                    logDetails(id, doc, key, designDocumentName, viewName, 
exchange);
 
-                getProcessor().process(exchange);
-            } catch (Exception e) {
-                this.getExceptionHandler().handleException("Error processing 
exchange.", exchange, e);
-            } finally {
-                releaseExchange(exchange, false);
+                    getProcessor().process(exchange);
+                } catch (Exception e) {
+                    this.getExceptionHandler().handleException("Error 
processing exchange.", exchange, e);
+                } finally {
+                    releaseExchange(exchange, false);
+                }
             }
-        }
 
-        return result.rows().size();
+            return result.rows().size();
+        } finally {
+            lock.unlock();
+        }
     }
 
     private void logDetails(String id, Object doc, String key, String designDocumentName, String viewName, Exchange exchange) {
diff --git a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvMarshalHeaderWithCustomMarshallFactoryTest.java b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvMarshalHeaderWithCustomMarshallFactoryTest.java
index eb16e14892d..6033d150f1d 100644
--- a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvMarshalHeaderWithCustomMarshallFactoryTest.java
+++ b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvMarshalHeaderWithCustomMarshallFactoryTest.java
@@ -27,6 +27,8 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Stream;
 
 import org.apache.camel.Exchange;
@@ -115,6 +117,7 @@ public class CsvMarshalHeaderWithCustomMarshallFactoryTest extends CamelTestSupp
 
     private static final class SinglePrinterCsvMarshaller extends CsvMarshaller {
 
+        private final Lock lock = new ReentrantLock();
         private final CSVPrinter printer;
 
         private SinglePrinterCsvMarshaller(CSVFormat format) {
@@ -134,7 +137,8 @@ public class CsvMarshalHeaderWithCustomMarshallFactoryTest extends CamelTestSupp
         @Override
         @SuppressWarnings("unchecked")
         public void marshal(Exchange exchange, Object object, OutputStream outputStream) throws IOException {
-            synchronized (printer) {
+            lock.lock();
+            try {
                 if (object instanceof Map) {
                     Map map = (Map) object;
                     printer.printRecord(getMapRecordValues(map));
@@ -149,6 +153,8 @@ public class CsvMarshalHeaderWithCustomMarshallFactoryTest extends CamelTestSupp
                 outputStream.write(stringBuilder.toString().getBytes());
                 // Reset the 'Appendable' for the next exchange.
                 stringBuilder.setLength(0);
+            } finally {
+                lock.unlock();
             }
         }
 
diff --git a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
index 809bd7c8801..d03dfcee08f 100644
--- a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
+++ b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
@@ -879,7 +879,7 @@ public class CxfRsProducer extends DefaultAsyncProducer {
      * Cache contains {@link org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean}
      */
     class ClientFactoryBeanCache {
-        private Map<String, JAXRSClientFactoryBean> cache;
+        private final Map<String, JAXRSClientFactoryBean> cache;
 
         ClientFactoryBeanCache(final int maxCacheSize) {
             this.cache = LRUCacheFactory.newLRUSoftCache(maxCacheSize);
@@ -896,22 +896,16 @@ public class CxfRsProducer extends DefaultAsyncProducer {
         }
 
         public JAXRSClientFactoryBean get(String address) {
-            JAXRSClientFactoryBean retVal = null;
-            synchronized (cache) {
-                retVal = cache.get(address);
-
-                if (retVal == null) {
-                    retVal = ((CxfRsEndpoint) getEndpoint()).createJAXRSClientFactoryBean(address);
-
-                    cache.put(address, retVal);
-
-                    LOG.trace("Created client factory bean and add to cache for address '{}'", address);
-
-                } else {
-                    LOG.trace("Retrieved client factory bean from cache for address '{}'", address);
-                }
-            }
-            return retVal;
+            return cache.compute(address,
+                    (key, value) -> {
+                        if (value == null) {
+                            value = ((CxfRsEndpoint) getEndpoint()).createJAXRSClientFactoryBean(address);
+                            LOG.trace("Created client factory bean and add to cache for address '{}'", address);
+                        } else {
+                            LOG.trace("Retrieved client factory bean from cache for address '{}'", address);
+                        }
+                        return value;
+                    });
         }
     }
 }


Reply via email to