This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 07bcafc7839 CAMEL-20199: Remove synchronized block from components A to C 07bcafc7839 is described below commit 07bcafc783935293af4938ed8f5611a17c9f4f5c Author: Nicolas Filotto <nfilo...@talend.com> AuthorDate: Thu Aug 1 14:53:53 2024 +0200 CAMEL-20199: Remove synchronized block from components A to C --- .../as2/api/AS2AsyncMDNServerConnection.java | 14 ++- .../component/as2/api/AS2ServerConnection.java | 14 ++- .../as2/internal/AS2ConnectionHelper.java | 100 +++++++++++--------- .../atmosphere/websocket/WebsocketComponent.java | 12 +-- .../component/aws2/kinesis/Kinesis2Consumer.java | 16 ++-- .../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 24 ++++- .../component/bean/AbstractBeanProcessor.java | 10 +- .../camel/component/bean/MethodInfoCache.java | 18 +--- .../component/braintree/BraintreeComponent.java | 20 ++-- .../camel/component/cometd/CometdComponent.java | 36 ++++--- .../consul/cluster/ConsulClusterView.java | 20 ++-- .../component/couchbase/CouchbaseConsumer.java | 105 +++++++++++---------- ...MarshalHeaderWithCustomMarshallFactoryTest.java | 8 +- .../camel/component/cxf/jaxrs/CxfRsProducer.java | 28 +++--- 14 files changed, 234 insertions(+), 191 deletions(-) diff --git a/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2AsyncMDNServerConnection.java b/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2AsyncMDNServerConnection.java index 0f9660c3d95..29bc7ab8547 100644 --- a/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2AsyncMDNServerConnection.java +++ b/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2AsyncMDNServerConnection.java @@ -21,6 +21,8 @@ import java.io.InterruptedIOException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLServerSocketFactory; @@ -54,7 +56,7 @@ public class AS2AsyncMDNServerConnection { private static final String REQUEST_HANDLER_THREAD_NAME_PREFIX = "AS2AsyncMdnHdlr-"; private static final int DEFAULT_BUFFER_SIZE = 8 * 1024; private RequestListenerThread listenerThread; - private final Object lock = new Object(); + private final Lock lock = new ReentrantLock(); public AS2AsyncMDNServerConnection(Integer portNumber, SSLContext sslContext) throws IOException { @@ -66,7 +68,8 @@ public class AS2AsyncMDNServerConnection { public void close() { if (listenerThread != null) { - synchronized (lock) { + lock.lock(); + try { try { listenerThread.serverSocket.close(); } catch (IOException e) { @@ -74,14 +77,19 @@ public class AS2AsyncMDNServerConnection { } finally { listenerThread = null; } + } finally { + lock.unlock(); } } } public void receive(String requestUriPattern, HttpRequestHandler handler) { if (listenerThread != null) { - synchronized (lock) { + lock.lock(); + try { listenerThread.registerHandler(requestUriPattern, handler); + } finally { + lock.unlock(); } } } diff --git a/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2ServerConnection.java b/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2ServerConnection.java index 930d0dfc05b..08976e9b122 100644 --- a/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2ServerConnection.java +++ b/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2ServerConnection.java @@ -23,6 +23,8 @@ import java.net.Socket; import java.net.SocketException; import java.security.PrivateKey; import java.security.cert.Certificate; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLServerSocketFactory; @@ -213,7 +215,7 @@ public class AS2ServerConnection { } private RequestListenerThread listenerThread; - private final Object lock = new Object(); + private final Lock lock = new ReentrantLock(); private final String as2Version; private final String originServer; private final String serverFqdn; @@ -267,7 +269,8 @@ public class AS2ServerConnection { public void close() { if (listenerThread != null) { - synchronized (lock) { + lock.lock(); + try { try { listenerThread.serversocket.close(); } catch (IOException e) { @@ -275,14 +278,19 @@ public class AS2ServerConnection { } finally { listenerThread = null; } + } finally { + lock.unlock(); } } } public void listen(String requestUri, HttpRequestHandler handler) { if (listenerThread != null) { - synchronized (lock) { + lock.lock(); + try { listenerThread.registerHandler(requestUri, handler); + } finally { + lock.unlock(); } } } diff --git a/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/internal/AS2ConnectionHelper.java b/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/internal/AS2ConnectionHelper.java index f19f81e6b53..c43297fa924 100644 --- a/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/internal/AS2ConnectionHelper.java +++ b/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/internal/AS2ConnectionHelper.java @@ -17,8 +17,9 @@ package org.apache.camel.component.as2.internal; import java.io.IOException; -import java.util.HashMap; +import java.io.UncheckedIOException; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.camel.component.as2.AS2Configuration; import org.apache.camel.component.as2.api.AS2AsyncMDNServerConnection; @@ -34,9 +35,9 @@ public final class AS2ConnectionHelper { private static final Logger LOG = LoggerFactory.getLogger(AS2ConnectionHelper.class); - private static Map<Integer, AS2ServerConnection> serverConnections = new HashMap<>(); + private static final Map<Integer, AS2ServerConnection> serverConnections = new ConcurrentHashMap<>(); - private static Map<Integer, AS2AsyncMDNServerConnection> asyncMdnServerConnections = new HashMap<>(); + private static final Map<Integer, AS2AsyncMDNServerConnection> asyncMdnServerConnections = new ConcurrentHashMap<>(); /** * Prevent instantiation @@ -69,17 +70,20 @@ public final class AS2ConnectionHelper { */ public static AS2AsyncMDNServerConnection createAS2AsyncMDNServerConnection(AS2Configuration configuration) throws IOException { - AS2AsyncMDNServerConnection asyncMdnServerConnection - = asyncMdnServerConnections.get(configuration.getAsyncMdnPortNumber()); - synchronized (asyncMdnServerConnections) { - if (asyncMdnServerConnection == null) { - asyncMdnServerConnection - = new AS2AsyncMDNServerConnection( - configuration.getAsyncMdnPortNumber(), configuration.getSslContext()); - asyncMdnServerConnections.put(configuration.getAsyncMdnPortNumber(), asyncMdnServerConnection); - } + try { + return asyncMdnServerConnections.computeIfAbsent( + configuration.getAsyncMdnPortNumber(), + key -> { + try { + return new AS2AsyncMDNServerConnection( + configuration.getAsyncMdnPortNumber(), configuration.getSslContext()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } catch (UncheckedIOException e) { + throw e.getCause(); } - return asyncMdnServerConnection; } /** @@ -90,18 +94,24 @@ public final class AS2ConnectionHelper { * @throws IOException */ public static AS2ServerConnection createAS2ServerConnection(AS2Configuration configuration) throws IOException { - synchronized (serverConnections) { - AS2ServerConnection serverConnection = serverConnections.get(configuration.getServerPortNumber()); - if (serverConnection == null) { - serverConnection = new AS2ServerConnection( - configuration.getAs2Version(), configuration.getServer(), - configuration.getServerFqdn(), configuration.getServerPortNumber(), configuration.getSigningAlgorithm(), - configuration.getSigningCertificateChain(), configuration.getSigningPrivateKey(), - configuration.getDecryptingPrivateKey(), configuration.getMdnMessageTemplate(), - configuration.getValidateSigningCertificateChain(), configuration.getSslContext()); - serverConnections.put(configuration.getServerPortNumber(), serverConnection); - } - return serverConnection; + try { + return serverConnections.computeIfAbsent( + configuration.getServerPortNumber(), + key -> { + try { + return new AS2ServerConnection( + configuration.getAs2Version(), configuration.getServer(), + configuration.getServerFqdn(), configuration.getServerPortNumber(), + configuration.getSigningAlgorithm(), + configuration.getSigningCertificateChain(), configuration.getSigningPrivateKey(), + configuration.getDecryptingPrivateKey(), configuration.getMdnMessageTemplate(), + configuration.getValidateSigningCertificateChain(), configuration.getSslContext()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } catch (UncheckedIOException e) { + throw e.getCause(); } } @@ -111,34 +121,30 @@ public final class AS2ConnectionHelper { } public static void closeAllServerConnections() { - synchronized (serverConnections) { - for (Map.Entry<Integer, AS2ServerConnection> entry : serverConnections.entrySet()) { - try { - int port = entry.getKey(); - LOG.debug("Stopping and closing AS2ServerConnection on port: {}", port); - AS2ServerConnection conn = entry.getValue(); - conn.close(); - } catch (Exception e) { - // ignore - LOG.debug("Error stopping and closing AS2ServerConnection due to {}. This exception is ignored", - e.getMessage(), e); - } + for (Map.Entry<Integer, AS2ServerConnection> entry : serverConnections.entrySet()) { + try { + int port = entry.getKey(); + LOG.debug("Stopping and closing AS2ServerConnection on port: {}", port); + AS2ServerConnection conn = entry.getValue(); + conn.close(); + } catch (Exception e) { + // ignore + LOG.debug("Error stopping and closing AS2ServerConnection due to {}. This exception is ignored", + e.getMessage(), e); } } serverConnections.clear(); } public static void closeAllAsyncMdnServerConnections() { - synchronized (asyncMdnServerConnections) { - for (Map.Entry<Integer, AS2AsyncMDNServerConnection> entry : asyncMdnServerConnections.entrySet()) { - try { - int port = entry.getKey(); - LOG.debug("Stopping and closing AsyncMdnServerConnection on port: {}", port); - entry.getValue().close(); - } catch (Exception e) { - LOG.debug("Error stopping and closing AsyncMdnServerConnection due to {}. This exception is ignored", - e.getMessage(), e); - } + for (Map.Entry<Integer, AS2AsyncMDNServerConnection> entry : asyncMdnServerConnections.entrySet()) { + try { + int port = entry.getKey(); + LOG.debug("Stopping and closing AsyncMdnServerConnection on port: {}", port); + entry.getValue().close(); + } catch (Exception e) { + LOG.debug("Error stopping and closing AsyncMdnServerConnection due to {}. This exception is ignored", + e.getMessage(), e); } } asyncMdnServerConnections.clear(); diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketComponent.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketComponent.java index 7336776a45c..5d48a1067f4 100644 --- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketComponent.java +++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketComponent.java @@ -17,8 +17,8 @@ package org.apache.camel.component.atmosphere.websocket; import java.net.URI; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.camel.component.servlet.ServletComponent; import org.apache.camel.component.servlet.ServletEndpoint; @@ -29,14 +29,14 @@ import org.apache.camel.spi.annotations.Component; */ @Component("atmosphere-websocket") public class WebsocketComponent extends ServletComponent { - private Map<String, WebSocketStore> stores; + private final Map<String, WebSocketStore> stores; public WebsocketComponent() { // override the default servlet name of ServletComponent super(WebsocketEndpoint.class); setServletName("CamelWsServlet"); - this.stores = new HashMap<>(); + this.stores = new ConcurrentHashMap<>(); } @Override @@ -46,10 +46,6 @@ public class WebsocketComponent extends ServletComponent { } WebSocketStore getWebSocketStore(String name) { - WebSocketStore store; - synchronized (stores) { - store = stores.computeIfAbsent(name, k -> new MemoryWebSocketStore()); - } - return store; + return stores.computeIfAbsent(name, k -> new MemoryWebSocketStore()); } } diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java index 2b94b9da034..c33cedab3ba 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java @@ -16,11 +16,7 @@ */ package org.apache.camel.component.aws2.kinesis; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Queue; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -56,9 +52,9 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R private KinesisConnection connection; private ResumeStrategy resumeStrategy; - private Map<String, String> currentShardIterators = new java.util.HashMap<>(); + private final Map<String, String> currentShardIterators = new java.util.HashMap<>(); - private List<Shard> currentShardList = new ArrayList<>(); + private volatile List<Shard> currentShardList = List.of(); private static final String SHARD_MONITOR_EXECUTOR_NAME = "Kinesis_shard_monitor"; private ScheduledExecutorService shardMonitorExecutor; @@ -384,12 +380,12 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R return getEndpoint().getConfiguration(); } - protected synchronized List<Shard> getCurrentShardList() { + protected List<Shard> getCurrentShardList() { return this.currentShardList; } - private synchronized void setCurrentShardList(List<Shard> latestShardList) { - this.currentShardList = latestShardList; + private void setCurrentShardList(List<Shard> latestShardList) { + this.currentShardList = List.copyOf(latestShardList); } private class ShardMonitor implements Runnable { diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java index 8d23db6f1a5..57c8459fb9c 100644 --- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java +++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java @@ -25,6 +25,8 @@ import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -63,7 +65,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { private static final String TIMEOUT_CHECKER_EXECUTOR_NAME = "S3_Streaming_Upload_Timeout_Checker"; private AtomicInteger part = new AtomicInteger(); private UploadState uploadAggregate = null; - private final Object lock = new Object(); + private final Lock lock = new ReentrantLock(); private transient String s3ProducerToString; private ScheduledExecutorService timeoutCheckerExecutorService; @@ -89,11 +91,14 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { @Override protected void doStop() throws Exception { - synchronized (lock) { + lock.lock(); + try { if (ObjectHelper.isNotEmpty(uploadAggregate)) { uploadPart(uploadAggregate); completeUpload(uploadAggregate); } + } finally { + lock.unlock(); } if (timeoutCheckerExecutorService != null) { getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(timeoutCheckerExecutorService); @@ -110,12 +115,15 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { @Override public void run() { - synchronized (lock) { + lock.lock(); + try { if (ObjectHelper.isNotEmpty(uploadAggregate)) { uploadPart(uploadAggregate); completeUpload(uploadAggregate); uploadAggregate = null; } + } finally { + lock.unlock(); } } } @@ -139,7 +147,8 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { totalSize += b.length; if (getConfiguration().isMultiPartUpload()) maxRead -= b.length; - synchronized (lock) { + lock.lock(); + try { // aggregate with previously received exchanges if (ObjectHelper.isNotEmpty(uploadAggregate)) { uploadAggregate.buffer.write(b); @@ -165,6 +174,8 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { } continue; } + } finally { + lock.unlock(); } if (state == null) { state = new UploadState(); @@ -242,13 +253,16 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { if (ObjectHelper.isNotEmpty(state)) { // exchange wasn't large enough to send, batch it with subsequent exchanges. - synchronized (lock) { + lock.lock(); + try { if (ObjectHelper.isEmpty(this.uploadAggregate)) { this.uploadAggregate = state; } else { // handle potential race condition. this.uploadAggregate.buffer.write(state.buffer.toByteArray()); } + } finally { + lock.unlock(); } } } diff --git a/components/camel-bean/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java b/components/camel-bean/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java index ff287aeba53..6f184389ea5 100644 --- a/components/camel-bean/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java +++ b/components/camel-bean/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java @@ -16,6 +16,9 @@ */ package org.apache.camel.component.bean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.camel.AsyncCallback; import org.apache.camel.BeanScope; import org.apache.camel.Exchange; @@ -38,7 +41,7 @@ public abstract class AbstractBeanProcessor extends AsyncProcessorSupport { private transient Processor processor; private transient Object bean; private transient boolean lookupProcessorDone; - private final Object lock = new Object(); + private final Lock lock = new ReentrantLock(); private BeanScope scope; private String method; private boolean shorthandMethod; @@ -130,11 +133,14 @@ public abstract class AbstractBeanProcessor extends AsyncProcessorSupport { boolean allowCache = scope == null || scope == BeanScope.Singleton; if (allowCache) { if (!lookupProcessorDone) { - synchronized (lock) { + lock.lock(); + try { lookupProcessorDone = true; // so if there is a custom type converter for the bean to processor target = exchange.getContext().getTypeConverter().tryConvertTo(Processor.class, exchange, beanTmp); processor = target; + } finally { + lock.unlock(); } } } else { diff --git a/components/camel-bean/src/main/java/org/apache/camel/component/bean/MethodInfoCache.java b/components/camel-bean/src/main/java/org/apache/camel/component/bean/MethodInfoCache.java index 896204b1121..3d52bebae7d 100644 --- a/components/camel-bean/src/main/java/org/apache/camel/component/bean/MethodInfoCache.java +++ b/components/camel-bean/src/main/java/org/apache/camel/component/bean/MethodInfoCache.java @@ -45,13 +45,8 @@ public class MethodInfoCache { this.methodCache = methodCache; } - public synchronized MethodInfo getMethodInfo(Method method) { - MethodInfo answer = methodCache.get(method); - if (answer == null) { - answer = createMethodInfo(method); - methodCache.put(method, answer); - } - return answer; + public MethodInfo getMethodInfo(Method method) { + return methodCache.computeIfAbsent(method, this::createMethodInfo); } protected MethodInfo createMethodInfo(Method method) { @@ -60,13 +55,8 @@ public class MethodInfoCache { return info.getMethodInfo(method); } - protected synchronized BeanInfo getBeanInfo(Class<?> declaringClass) { - BeanInfo beanInfo = classCache.get(declaringClass); - if (beanInfo == null) { - beanInfo = createBeanInfo(declaringClass); - classCache.put(declaringClass, beanInfo); - } - return beanInfo; + protected BeanInfo getBeanInfo(Class<?> declaringClass) { + return classCache.computeIfAbsent(declaringClass, this::createBeanInfo); } protected BeanInfo createBeanInfo(Class<?> declaringClass) { diff --git a/components/camel-braintree/src/main/java/org/apache/camel/component/braintree/BraintreeComponent.java b/components/camel-braintree/src/main/java/org/apache/camel/component/braintree/BraintreeComponent.java index 509897b7700..09d77fb1df4 100644 --- a/components/camel-braintree/src/main/java/org/apache/camel/component/braintree/BraintreeComponent.java +++ b/components/camel-braintree/src/main/java/org/apache/camel/component/braintree/BraintreeComponent.java @@ -16,8 +16,8 @@ */ package org.apache.camel.component.braintree; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import com.braintreegateway.BraintreeGateway; import org.apache.camel.CamelContext; @@ -41,12 +41,12 @@ public class BraintreeComponent extends AbstractApiComponent<BraintreeApiName, B public BraintreeComponent() { super(BraintreeApiName.class, BraintreeApiCollection.getCollection()); - this.gateways = new HashMap<>(); + this.gateways = new ConcurrentHashMap<>(); } public BraintreeComponent(CamelContext context) { super(context, BraintreeApiName.class, BraintreeApiCollection.getCollection()); - this.gateways = new HashMap<>(); + this.gateways = new ConcurrentHashMap<>(); } @Override @@ -63,20 +63,12 @@ public class BraintreeComponent extends AbstractApiComponent<BraintreeApiName, B return new BraintreeEndpoint(uri, this, apiName, methodName, endpointConfiguration); } - public synchronized BraintreeGateway getGateway(BraintreeConfiguration configuration) { + public BraintreeGateway getGateway(BraintreeConfiguration configuration) { BraintreeGateway gateway; if (configuration.getAccessToken() != null) { - gateway = gateways.get(configuration.getAccessToken()); - if (gateway == null) { - gateway = configuration.newBraintreeGateway(); - gateways.put(configuration.getAccessToken(), gateway); - } + gateway = gateways.computeIfAbsent(configuration.getAccessToken(), k -> configuration.newBraintreeGateway()); } else { - gateway = gateways.get(configuration.getMerchantId()); - if (gateway == null) { - gateway = configuration.newBraintreeGateway(); - gateways.put(configuration.getMerchantId(), gateway); - } + gateway = gateways.computeIfAbsent(configuration.getMerchantId(), k -> configuration.newBraintreeGateway()); } return gateway; } diff --git a/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java b/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java index 2f912d78bc5..6aa663fd688 100644 --- a/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java +++ b/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java @@ -22,6 +22,8 @@ import java.util.EnumSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import jakarta.servlet.DispatcherType; @@ -56,6 +58,7 @@ import org.slf4j.LoggerFactory; public class CometdComponent extends DefaultComponent implements SSLContextParametersAware { private static final Logger LOG = LoggerFactory.getLogger(CometdComponent.class); + private final Lock connectorsLock = new ReentrantLock(); private final Map<String, ConnectorRef> connectors = new LinkedHashMap<>(); private List<BayeuxServer.BayeuxServerListener> serverListeners; @@ -116,7 +119,8 @@ public class CometdComponent extends DefaultComponent implements SSLContextParam CometdEndpoint endpoint = prodcon.getEndpoint(); String connectorKey = endpoint.getProtocol() + ":" + endpoint.getUri().getHost() + ":" + endpoint.getPort(); - synchronized (connectors) { + connectorsLock.lock(); + try { ConnectorRef connectorRef = connectors.get(connectorKey); if (connectorRef == null) { ServerConnector connector; @@ -160,6 +164,8 @@ public class CometdComponent extends DefaultComponent implements SSLContextParam } } prodcon.setBayeux(bayeux); + } finally { + connectorsLock.unlock(); } } @@ -171,16 +177,17 @@ public class CometdComponent extends DefaultComponent implements SSLContextParam String connectorKey = endpoint.getProtocol() + ":" + endpoint.getUri().getHost() + ":" + endpoint.getPort(); - synchronized (connectors) { + connectorsLock.lock(); + try { ConnectorRef connectorRef = connectors.get(connectorKey); - if (connectorRef != null) { - if (connectorRef.decrement() == 0) { - connectorRef.server.removeConnector(connectorRef.connector); - connectorRef.connector.stop(); - connectorRef.server.stop(); - connectors.remove(connectorKey); - } + if (connectorRef != null && connectorRef.decrement() == 0) { + connectorRef.server.removeConnector(connectorRef.connector); + connectorRef.connector.stop(); + connectorRef.server.stop(); + connectors.remove(connectorKey); } + } finally { + connectorsLock.unlock(); } } @@ -352,10 +359,15 @@ public class CometdComponent extends DefaultComponent implements SSLContextParam @Override protected void doStop() throws Exception { - for (ConnectorRef connectorRef : connectors.values()) { - connectorRef.connector.stop(); + connectorsLock.lock(); + try { + for (ConnectorRef connectorRef : connectors.values()) { + connectorRef.connector.stop(); + } + connectors.clear(); + } finally { + connectorsLock.unlock(); } - connectors.clear(); super.doStop(); } diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/cluster/ConsulClusterView.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/cluster/ConsulClusterView.java index eea1972290f..cfa1fae71f4 100644 --- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/cluster/ConsulClusterView.java +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/cluster/ConsulClusterView.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.camel.cluster.CamelClusterMember; @@ -44,6 +46,7 @@ final class ConsulClusterView extends AbstractCamelClusterView { private final ConsulClusterConfiguration configuration; private final ConsulLocalMember localMember; + private final Lock sessionIdLock = new ReentrantLock(); private final AtomicReference<String> sessionId; private final Watcher watcher; @@ -113,21 +116,25 @@ final class ConsulClusterView extends AbstractCamelClusterView { if (keyValueClient.releaseLock(this.path, sessionId.get())) { LOGGER.debug("Successfully released lock on path '{}' with id '{}'", path, sessionId.get()); } - - synchronized (sessionId) { + sessionIdLock.lock(); + try { sessionClient.destroySession(sessionId.getAndSet(null)); localMember.setMaster(false); + } finally { + sessionIdLock.unlock(); } } } private boolean acquireLock() { - synchronized (sessionId) { + sessionIdLock.lock(); + try { String sid = sessionId.get(); return (sid != null) - ? sessionClient.getSessionInfo(sid).map(si -> keyValueClient.acquireLock(path, sid)).orElse(Boolean.FALSE) - : false; + && sessionClient.getSessionInfo(sid).map(si -> keyValueClient.acquireLock(path, sid)).orElse(Boolean.FALSE); + } finally { + sessionIdLock.unlock(); } } @@ -136,7 +143,7 @@ final class ConsulClusterView extends AbstractCamelClusterView { // *********************************************** private final class ConsulLocalMember implements CamelClusterMember { - private AtomicBoolean master = new AtomicBoolean(); + private final AtomicBoolean master = new AtomicBoolean(); void setMaster(boolean master) { if (master && this.master.compareAndSet(false, true)) { @@ -147,7 +154,6 @@ final class ConsulClusterView extends AbstractCamelClusterView { if (!master && this.master.compareAndSet(true, false)) { LOGGER.debug("Leadership lost for session id {}", sessionId.get()); fireLeadershipChangedEvent(getLeader().orElse(null)); - return; } } diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java index 8b252244f4c..ed171b5fffc 100644 --- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java @@ -16,6 +16,9 @@ */ package org.apache.camel.component.couchbase; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.JsonNode; import com.couchbase.client.java.Bucket; import com.couchbase.client.java.Collection; @@ -43,6 +46,7 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements R private static final Logger LOG = LoggerFactory.getLogger(CouchbaseConsumer.class); + private final Lock lock = new ReentrantLock(); private final CouchbaseEndpoint endpoint; private final Bucket bucket; private final Collection collection; @@ -109,64 +113,69 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements R } @Override - protected synchronized int poll() throws Exception { - ViewResult result = bucket.viewQuery(endpoint.getDesignDocumentName(), endpoint.getViewName(), this.viewOptions); - - // okay we have some response from CouchBase so lets mark the consumer as ready - forceConsumerAsReady(); + protected int poll() throws Exception { + lock.lock(); + try { + ViewResult result = bucket.viewQuery(endpoint.getDesignDocumentName(), endpoint.getViewName(), this.viewOptions); - if (LOG.isTraceEnabled()) { - LOG.trace("ViewResponse = {}", result); - } + // okay we have some response from CouchBase so lets mark the consumer as ready + forceConsumerAsReady(); - String consumerProcessedStrategy = endpoint.getConsumerProcessedStrategy(); - for (ViewRow row : result.rows()) { - Object doc; - String id = row.id().get(); - if (endpoint.isFullDocument()) { - doc = CouchbaseCollectionOperation.getDocument(collection, id, endpoint.getQueryTimeout()); - } else { - doc = row.valueAs(Object.class); + if (LOG.isTraceEnabled()) { + LOG.trace("ViewResponse = {}", result); } - String key = row.keyAs(JsonNode.class).get().asText(); - String designDocumentName = endpoint.getDesignDocumentName(); - String viewName = endpoint.getViewName(); - - Exchange exchange = createExchange(false); - try { - exchange.getIn().setBody(doc); - exchange.getIn().setHeader(HEADER_ID, id); - exchange.getIn().setHeader(HEADER_KEY, key); - exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME, designDocumentName); - exchange.getIn().setHeader(HEADER_VIEWNAME, viewName); - - if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) { - if (LOG.isTraceEnabled()) { - LOG.trace("Deleting doc with ID {}", id); - } - CouchbaseCollectionOperation.removeDocument(collection, id, endpoint.getWriteQueryTimeout(), - endpoint.getProducerRetryPause()); - } else if ("filter".equalsIgnoreCase(consumerProcessedStrategy)) { - if (LOG.isTraceEnabled()) { - LOG.trace("Filtering out ID {}", id); - } - // add filter for already processed docs + String consumerProcessedStrategy = endpoint.getConsumerProcessedStrategy(); + for (ViewRow row : result.rows()) { + Object doc; + String id = row.id().get(); + if (endpoint.isFullDocument()) { + doc = CouchbaseCollectionOperation.getDocument(collection, id, endpoint.getQueryTimeout()); } else { - LOG.trace("No strategy set for already processed docs, beware of duplicates!"); + doc = row.valueAs(Object.class); } - logDetails(id, doc, key, designDocumentName, viewName, exchange); + String key = row.keyAs(JsonNode.class).get().asText(); + String designDocumentName = endpoint.getDesignDocumentName(); + String viewName = endpoint.getViewName(); + + Exchange exchange = createExchange(false); + try { + exchange.getIn().setBody(doc); + exchange.getIn().setHeader(HEADER_ID, id); + exchange.getIn().setHeader(HEADER_KEY, key); + exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME, designDocumentName); + exchange.getIn().setHeader(HEADER_VIEWNAME, viewName); + + if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Deleting doc with ID {}", id); + } + CouchbaseCollectionOperation.removeDocument(collection, id, endpoint.getWriteQueryTimeout(), + endpoint.getProducerRetryPause()); + } else if ("filter".equalsIgnoreCase(consumerProcessedStrategy)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Filtering out ID {}", id); + } + // add filter for already processed docs + } else { + LOG.trace("No strategy set for already processed docs, beware of duplicates!"); + } + + logDetails(id, doc, key, designDocumentName, viewName, exchange); - getProcessor().process(exchange); - } catch (Exception e) { - this.getExceptionHandler().handleException("Error processing exchange.", exchange, e); - } finally { - releaseExchange(exchange, false); + getProcessor().process(exchange); + } catch (Exception e) { + this.getExceptionHandler().handleException("Error processing exchange.", exchange, e); + } finally { + releaseExchange(exchange, false); + } } - } - return result.rows().size(); + return result.rows().size(); + } finally { + lock.unlock(); + } } private void logDetails(String id, Object doc, String key, String designDocumentName, String viewName, Exchange exchange) { diff --git a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvMarshalHeaderWithCustomMarshallFactoryTest.java b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvMarshalHeaderWithCustomMarshallFactoryTest.java index eb16e14892d..6033d150f1d 100644 --- a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvMarshalHeaderWithCustomMarshallFactoryTest.java +++ b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvMarshalHeaderWithCustomMarshallFactoryTest.java @@ -27,6 +27,8 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; import org.apache.camel.Exchange; @@ -115,6 +117,7 @@ public class CsvMarshalHeaderWithCustomMarshallFactoryTest extends CamelTestSupp private static final class SinglePrinterCsvMarshaller extends CsvMarshaller { + private final Lock lock = new ReentrantLock(); private final CSVPrinter printer; private SinglePrinterCsvMarshaller(CSVFormat format) { @@ -134,7 +137,8 @@ public class CsvMarshalHeaderWithCustomMarshallFactoryTest extends CamelTestSupp @Override @SuppressWarnings("unchecked") public void marshal(Exchange exchange, Object object, OutputStream outputStream) throws IOException { - synchronized (printer) { + lock.lock(); + try { if (object instanceof Map) { Map map = (Map) object; printer.printRecord(getMapRecordValues(map)); @@ -149,6 +153,8 @@ public class CsvMarshalHeaderWithCustomMarshallFactoryTest extends CamelTestSupp outputStream.write(stringBuilder.toString().getBytes()); // Reset the 'Appendable' for the next exchange. stringBuilder.setLength(0); + } finally { + lock.unlock(); } } diff --git a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java index 809bd7c8801..d03dfcee08f 100644 --- a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java +++ b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java @@ -879,7 +879,7 @@ public class CxfRsProducer extends DefaultAsyncProducer { * Cache contains {@link org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean} */ class ClientFactoryBeanCache { - private Map<String, JAXRSClientFactoryBean> cache; + private final Map<String, JAXRSClientFactoryBean> cache; ClientFactoryBeanCache(final int maxCacheSize) { this.cache = LRUCacheFactory.newLRUSoftCache(maxCacheSize); @@ -896,22 +896,16 @@ public class CxfRsProducer extends DefaultAsyncProducer { } public JAXRSClientFactoryBean get(String address) { - JAXRSClientFactoryBean retVal = null; - synchronized (cache) { - retVal = cache.get(address); - - if (retVal == null) { - retVal = ((CxfRsEndpoint) getEndpoint()).createJAXRSClientFactoryBean(address); - - cache.put(address, retVal); - - LOG.trace("Created client factory bean and add to cache for address '{}'", address); - - } else { - LOG.trace("Retrieved client factory bean from cache for address '{}'", address); - } - } - return retVal; + return cache.compute(address, + (key, value) -> { + if (value == null) { + value = ((CxfRsEndpoint) getEndpoint()).createJAXRSClientFactoryBean(address); + LOG.trace("Created client factory bean and add to cache for address '{}'", address); + } else { + LOG.trace("Retrieved client factory bean from cache for address '{}'", address); + } + return value; + }); } } }