This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e7bd122c8a CAMEL-20199: Remove synchronized blocks from components R 
to S (#16288)
1e7bd122c8a is described below

commit 1e7bd122c8ac5e962b7a7878ce2a95cb8b16bfec
Author: Nicolas Filotto <essob...@users.noreply.github.com>
AuthorDate: Fri Nov 15 09:33:54 2024 +0100

    CAMEL-20199: Remove synchronized blocks from components R to S (#16288)
    
    ## Motivation
    
    To better support virtual threads, we need to avoid lengthy and frequent pinning of carrier threads by replacing synchronized blocks with ReentrantLocks.
    
    ## Modifications
    
    * Replace mutex with locks
    * Use locks instead of synchronized blocks
    * Leverage ConcurrentMap methods to get rid of synchronized blocks
---
 .../streams/ReactiveStreamsCamelSubscriber.java    |  57 ++++++--
 .../reactive/streams/ReactiveStreamsComponent.java |  45 ++++---
 .../streams/engine/DelayedMonoPublisher.java       |  32 ++++-
 .../reactor/engine/ReactorCamelProcessor.java      |  69 ++++++----
 .../camel/component/rocketmq/RocketMQProducer.java |   5 +-
 .../rxjava/engine/RxJavaCamelProcessor.java        |  69 ++++++----
 .../salesforce/internal/SalesforceSession.java     | 136 ++++++++++---------
 .../internal/streaming/SubscriptionHelper.java     |  86 ++++++------
 .../component/scheduler/SchedulerComponent.java    |  66 ++++-----
 .../camel/component/seda/QueueReference.java       |  55 +++++---
 .../apache/camel/component/seda/SedaComponent.java | 107 ++++++++-------
 .../apache/camel/component/seda/SedaEndpoint.java  | 123 +++++++++--------
 .../component/servicenow/auth/OAuthToken.java      | 102 +++++++-------
 .../apache/camel/component/sjms/SjmsComponent.java |  19 ++-
 .../apache/camel/component/sjms/SjmsProducer.java  |   5 +-
 .../sjms/consumer/EndpointMessageListener.java     |  17 ++-
 .../consumer/SimpleMessageListenerContainer.java   |  41 ++++--
 .../sjms/reply/MessageSelectorCreator.java         |  52 +++++---
 .../component/sjms/reply/QueueReplyManager.java    |   5 +-
 .../component/splunk/SplunkConnectionFactory.java  | 148 +++++++++++----------
 .../camel/component/splunk/SplunkEndpoint.java     |  21 +--
 .../component/splunk/support/SplunkDataWriter.java |  30 +++--
 .../component/splunk/support/SubmitDataWriter.java |  19 ++-
 .../springrabbit/EndpointMessageListener.java      |  16 ++-
 .../SpringSecurityAuthorizationPolicy.java         |  24 ++--
 .../spring/ws/SpringWebserviceProducer.java        |  12 +-
 .../camel/component/event/EventEndpoint.java       |  22 ++-
 .../spring/spi/TransactionErrorHandlerReifier.java |  43 +++---
 .../component/stax/StAXJAXBIteratorExpression.java |  19 +--
 .../camel/component/stream/StreamConsumer.java     |  62 +++++----
 .../camel/component/stream/StreamProducer.java     |   6 +-
 31 files changed, 900 insertions(+), 613 deletions(-)

diff --git 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java
 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java
index ebf827f47ab..444fb42ef67 100644
--- 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java
+++ 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.reactive.streams;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Exchange;
 import org.reactivestreams.Subscriber;
@@ -37,6 +39,7 @@ public class ReactiveStreamsCamelSubscriber implements 
Subscriber<Exchange>, Clo
      */
     private static final long UNBOUNDED_REQUESTS = Long.MAX_VALUE;
 
+    private final Lock lock = new ReentrantLock();
     private final String name;
 
     private ReactiveStreamsConsumer consumer;
@@ -52,22 +55,33 @@ public class ReactiveStreamsCamelSubscriber implements 
Subscriber<Exchange>, Clo
     }
 
     public void attachConsumer(ReactiveStreamsConsumer consumer) {
-        synchronized (this) {
+        lock.lock();
+        try {
             if (this.consumer != null) {
                 throw new IllegalStateException("A consumer is already 
attached to the stream '" + name + "'");
             }
             this.consumer = consumer;
+        } finally {
+            lock.unlock();
         }
         refill();
     }
 
-    public synchronized ReactiveStreamsConsumer getConsumer() {
-        return consumer;
+    public ReactiveStreamsConsumer getConsumer() {
+        lock.lock();
+        try {
+            return consumer;
+        } finally {
+            lock.unlock();
+        }
     }
 
     public void detachConsumer() {
-        synchronized (this) {
+        lock.lock();
+        try {
             this.consumer = null;
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -78,12 +92,15 @@ public class ReactiveStreamsCamelSubscriber implements 
Subscriber<Exchange>, Clo
         }
 
         boolean allowed = true;
-        synchronized (this) {
+        lock.lock();
+        try {
             if (this.subscription != null) {
                 allowed = false;
             } else {
                 this.subscription = subscription;
             }
+        } finally {
+            lock.unlock();
         }
 
         if (!allowed) {
@@ -101,7 +118,8 @@ public class ReactiveStreamsCamelSubscriber implements 
Subscriber<Exchange>, Clo
         }
 
         ReactiveStreamsConsumer target;
-        synchronized (this) {
+        lock.lock();
+        try {
             if (requested < UNBOUNDED_REQUESTS) {
                 // When there are UNBOUNDED_REQUESTS, they remain constant
                 requested--;
@@ -110,12 +128,17 @@ public class ReactiveStreamsCamelSubscriber implements 
Subscriber<Exchange>, Clo
             if (target != null) {
                 inflightCount++;
             }
+        } finally {
+            lock.unlock();
         }
 
         if (target != null) {
             target.process(exchange, doneSync -> {
-                synchronized (this) {
+                lock.lock();
+                try {
                     inflightCount--;
+                } finally {
+                    lock.unlock();
                 }
 
                 refill();
@@ -129,7 +152,8 @@ public class ReactiveStreamsCamelSubscriber implements 
Subscriber<Exchange>, Clo
     protected void refill() {
         Long toBeRequested = null;
         Subscription subs = null;
-        synchronized (this) {
+        lock.lock();
+        try {
             if (consumer != null && this.subscription != null) {
                 Integer consMax = 
consumer.getEndpoint().getMaxInflightExchanges();
                 long max = (consMax != null && consMax > 0) ? 
consMax.longValue() : UNBOUNDED_REQUESTS;
@@ -144,6 +168,8 @@ public class ReactiveStreamsCamelSubscriber implements 
Subscriber<Exchange>, Clo
                     }
                 }
             }
+        } finally {
+            lock.unlock();
         }
 
         if (toBeRequested != null) {
@@ -160,9 +186,12 @@ public class ReactiveStreamsCamelSubscriber implements 
Subscriber<Exchange>, Clo
         LOG.error("Error in reactive stream '{}'", name, throwable);
 
         ReactiveStreamsConsumer consumer;
-        synchronized (this) {
+        lock.lock();
+        try {
             consumer = this.consumer;
             this.subscription = null;
+        } finally {
+            lock.unlock();
         }
 
         if (consumer != null) {
@@ -176,9 +205,12 @@ public class ReactiveStreamsCamelSubscriber implements 
Subscriber<Exchange>, Clo
         LOG.info("Reactive stream '{}' completed", name);
 
         ReactiveStreamsConsumer consumer;
-        synchronized (this) {
+        lock.lock();
+        try {
             consumer = this.consumer;
             this.subscription = null;
+        } finally {
+            lock.unlock();
         }
 
         if (consumer != null) {
@@ -189,8 +221,11 @@ public class ReactiveStreamsCamelSubscriber implements 
Subscriber<Exchange>, Clo
     @Override
     public void close() throws IOException {
         Subscription subscription;
-        synchronized (this) {
+        lock.lock();
+        try {
             subscription = this.subscription;
+        } finally {
+            lock.unlock();
         }
 
         if (subscription != null) {
diff --git 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
index c4524e6028b..640b9d5f16d 100644
--- 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
+++ 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
@@ -161,29 +161,34 @@ public class ReactiveStreamsComponent extends 
DefaultComponent {
      *
      * @return the reactive streams service
      */
-    public synchronized CamelReactiveStreamsService 
getReactiveStreamsService() {
-        if (reactiveStreamsEngineConfiguration == null) {
-            reactiveStreamsEngineConfiguration = new 
ReactiveStreamsEngineConfiguration();
-            
reactiveStreamsEngineConfiguration.setThreadPoolMaxSize(threadPoolMaxSize);
-            
reactiveStreamsEngineConfiguration.setThreadPoolMinSize(threadPoolMinSize);
-            
reactiveStreamsEngineConfiguration.setThreadPoolName(threadPoolName);
-        }
+    public CamelReactiveStreamsService getReactiveStreamsService() {
+        lock.lock();
+        try {
+            if (reactiveStreamsEngineConfiguration == null) {
+                reactiveStreamsEngineConfiguration = new 
ReactiveStreamsEngineConfiguration();
+                
reactiveStreamsEngineConfiguration.setThreadPoolMaxSize(threadPoolMaxSize);
+                
reactiveStreamsEngineConfiguration.setThreadPoolMinSize(threadPoolMinSize);
+                
reactiveStreamsEngineConfiguration.setThreadPoolName(threadPoolName);
+            }
 
-        if (service == null) {
-            this.service = ReactiveStreamsHelper.resolveReactiveStreamsService(
-                    getCamelContext(),
-                    this.serviceType,
-                    this.reactiveStreamsEngineConfiguration);
-
-            try {
-                // Start the service and add it to the Camel context to expose 
managed attributes
-                getCamelContext().addService(service, true, true);
-            } catch (Exception e) {
-                throw new RuntimeCamelException(e);
+            if (service == null) {
+                this.service = 
ReactiveStreamsHelper.resolveReactiveStreamsService(
+                        getCamelContext(),
+                        this.serviceType,
+                        this.reactiveStreamsEngineConfiguration);
+
+                try {
+                    // Start the service and add it to the Camel context to 
expose managed attributes
+                    getCamelContext().addService(service, true, true);
+                } catch (Exception e) {
+                    throw new RuntimeCamelException(e);
+                }
             }
-        }
 
-        return service;
+            return service;
+        } finally {
+            lock.unlock();
+        }
     }
 
     // ****************************************
diff --git 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
index dd8c4929c7d..e9615048c20 100644
--- 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
+++ 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
@@ -22,6 +22,8 @@ import java.util.Objects;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
@@ -126,6 +128,7 @@ public class DelayedMonoPublisher<T> implements 
Publisher<T> {
         private volatile boolean requested;
 
         private final Subscriber<? super T> subscriber;
+        private final Lock lock = new ReentrantLock();
 
         private MonoSubscription(Subscriber<? super T> subscriber) {
             this.subscriber = subscriber;
@@ -133,21 +136,30 @@ public class DelayedMonoPublisher<T> implements 
Publisher<T> {
 
         @Override
         public void request(long l) {
-            synchronized (this) {
+            lock.lock();
+            try {
                 if (terminated) {
                     // just ignore the request
                     return;
                 }
+            } finally {
+                lock.unlock();
             }
 
             if (l <= 0) {
                 subscriber.onError(new IllegalArgumentException("3.9"));
-                synchronized (this) {
+                lock.lock();
+                try {
                     terminated = true;
+                } finally {
+                    lock.unlock();
                 }
             } else {
-                synchronized (this) {
+                lock.lock();
+                try {
                     requested = true;
+                } finally {
+                    lock.unlock();
                 }
             }
 
@@ -155,12 +167,15 @@ public class DelayedMonoPublisher<T> implements 
Publisher<T> {
         }
 
         public void flush() {
-            synchronized (this) {
+            lock.lock();
+            try {
                 if (!isReady()) {
                     return;
                 }
 
                 terminated = true;
+            } finally {
+                lock.unlock();
             }
 
             if (data != null) {
@@ -180,8 +195,13 @@ public class DelayedMonoPublisher<T> implements 
Publisher<T> {
         }
 
         @Override
-        public synchronized void cancel() {
-            terminated = true;
+        public void cancel() {
+            lock.lock();
+            try {
+                terminated = true;
+            } finally {
+                lock.unlock();
+            }
         }
     }
 }
diff --git 
a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorCamelProcessor.java
 
b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorCamelProcessor.java
index c33100a813d..47733ed920d 100644
--- 
a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorCamelProcessor.java
+++ 
b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorCamelProcessor.java
@@ -20,6 +20,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Exchange;
 import 
org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy;
@@ -40,6 +42,7 @@ final class ReactorCamelProcessor implements Closeable {
     private final AtomicReference<FluxSink<Exchange>> camelSink;
 
     private final ReactorStreamsService service;
+    private final Lock lock = new ReentrantLock();
     private ReactiveStreamsProducer camelProducer;
 
     ReactorCamelProcessor(ReactorStreamsService service, String name) {
@@ -64,41 +67,51 @@ final class ReactorCamelProcessor implements Closeable {
         return publisher;
     }
 
-    synchronized void attach(ReactiveStreamsProducer producer) {
-        Objects.requireNonNull(producer, "producer cannot be null, use the 
detach method");
+    void attach(ReactiveStreamsProducer producer) {
+        lock.lock();
+        try {
+            Objects.requireNonNull(producer, "producer cannot be null, use the 
detach method");
 
-        if (this.camelProducer != null) {
-            throw new IllegalStateException("A producer is already attached to 
the stream '" + name + "'");
-        }
-
-        if (this.camelProducer != producer) { // this condition is always true
-            detach();
-
-            ReactiveStreamsBackpressureStrategy strategy = 
producer.getEndpoint().getBackpressureStrategy();
-            Flux<Exchange> flux = Flux.create(camelSink::set, 
FluxSink.OverflowStrategy.IGNORE);
-
-            if (ObjectHelper.equal(strategy, 
ReactiveStreamsBackpressureStrategy.OLDEST)) {
-                // signal item emitted for non-dropped items only
-                flux = 
flux.onBackpressureDrop(this::onBackPressure).handle(this::onItemEmitted);
-            } else if (ObjectHelper.equal(strategy, 
ReactiveStreamsBackpressureStrategy.LATEST)) {
-                // Since there is no callback for dropped elements on 
backpressure "latest", item emission is signaled before dropping
-                // No exception is reported back to the exchanges
-                flux = flux.handle(this::onItemEmitted).onBackpressureLatest();
-            } else {
-                // Default strategy is BUFFER
-                flux = flux.onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, 
this::onBackPressure).handle(this::onItemEmitted);
+            if (this.camelProducer != null) {
+                throw new IllegalStateException("A producer is already 
attached to the stream '" + name + "'");
             }
 
-            flux.subscribe(this.publisher);
+            if (this.camelProducer != producer) { // this condition is always 
true
+                detach();
+
+                ReactiveStreamsBackpressureStrategy strategy = 
producer.getEndpoint().getBackpressureStrategy();
+                Flux<Exchange> flux = Flux.create(camelSink::set, 
FluxSink.OverflowStrategy.IGNORE);
+
+                if (ObjectHelper.equal(strategy, 
ReactiveStreamsBackpressureStrategy.OLDEST)) {
+                    // signal item emitted for non-dropped items only
+                    flux = 
flux.onBackpressureDrop(this::onBackPressure).handle(this::onItemEmitted);
+                } else if (ObjectHelper.equal(strategy, 
ReactiveStreamsBackpressureStrategy.LATEST)) {
+                    // Since there is no callback for dropped elements on 
backpressure "latest", item emission is signaled before dropping
+                    // No exception is reported back to the exchanges
+                    flux = 
flux.handle(this::onItemEmitted).onBackpressureLatest();
+                } else {
+                    // Default strategy is BUFFER
+                    flux = flux.onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, 
this::onBackPressure)
+                            .handle(this::onItemEmitted);
+                }
+
+                flux.subscribe(this.publisher);
 
-            camelProducer = producer;
+                camelProducer = producer;
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
-    synchronized void detach() {
-
-        this.camelProducer = null;
-        this.camelSink.set(null);
+    void detach() {
+        lock.lock();
+        try {
+            this.camelProducer = null;
+            this.camelSink.set(null);
+        } finally {
+            lock.unlock();
+        }
     }
 
     void send(Exchange exchange) {
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
index a07006f7695..defd5ea0ebd 100644
--- 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
@@ -136,7 +136,8 @@ public class RocketMQProducer extends DefaultAsyncProducer {
 
     protected void initReplyManager() {
         if (!started.get()) {
-            synchronized (this) {
+            lock.lock();
+            try {
                 if (started.get()) {
                     return;
                 }
@@ -160,6 +161,8 @@ public class RocketMQProducer extends DefaultAsyncProducer {
                     }
                 }
                 started.set(true);
+            } finally {
+                lock.unlock();
             }
         }
     }
diff --git 
a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java
 
b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java
index 42a29c022c4..ef6135968b1 100644
--- 
a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java
+++ 
b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java
@@ -20,6 +20,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import io.reactivex.BackpressureStrategy;
 import io.reactivex.Flowable;
@@ -39,6 +41,7 @@ final class RxJavaCamelProcessor implements Closeable {
     private final RxJavaStreamsService service;
     private final AtomicReference<FlowableEmitter<Exchange>> camelEmitter;
     private final FlowableProcessor<Exchange> publisher;
+    private final Lock lock = new ReentrantLock();
     private ReactiveStreamsProducer camelProducer;
 
     RxJavaCamelProcessor(RxJavaStreamsService service, String name) {
@@ -58,40 +61,50 @@ final class RxJavaCamelProcessor implements Closeable {
         return publisher;
     }
 
-    synchronized void attach(ReactiveStreamsProducer producer) {
-        Objects.requireNonNull(producer, "producer cannot be null, use the 
detach method");
+    void attach(ReactiveStreamsProducer producer) {
+        lock.lock();
+        try {
+            Objects.requireNonNull(producer, "producer cannot be null, use the 
detach method");
 
-        if (this.camelProducer != null) {
-            throw new IllegalStateException("A producer is already attached to 
the stream '" + name + "'");
-        }
-
-        if (this.camelProducer != producer) {
-            detach();
-
-            ReactiveStreamsBackpressureStrategy strategy = 
producer.getEndpoint().getBackpressureStrategy();
-            Flowable<Exchange> flow = Flowable.create(camelEmitter::set, 
BackpressureStrategy.MISSING);
-
-            if (ObjectHelper.equal(strategy, 
ReactiveStreamsBackpressureStrategy.OLDEST)) {
-                flow.onBackpressureDrop(this::onBackPressure)
-                        .doAfterNext(this::onItemEmitted)
-                        .subscribe(this.publisher);
-            } else if (ObjectHelper.equal(strategy, 
ReactiveStreamsBackpressureStrategy.LATEST)) {
-                flow.doAfterNext(this::onItemEmitted)
-                        .onBackpressureLatest()
-                        .subscribe(this.publisher);
-            } else {
-                flow.doAfterNext(this::onItemEmitted)
-                        .onBackpressureBuffer()
-                        .subscribe(this.publisher);
+            if (this.camelProducer != null) {
+                throw new IllegalStateException("A producer is already 
attached to the stream '" + name + "'");
             }
 
-            camelProducer = producer;
+            if (this.camelProducer != producer) {
+                detach();
+
+                ReactiveStreamsBackpressureStrategy strategy = 
producer.getEndpoint().getBackpressureStrategy();
+                Flowable<Exchange> flow = Flowable.create(camelEmitter::set, 
BackpressureStrategy.MISSING);
+
+                if (ObjectHelper.equal(strategy, 
ReactiveStreamsBackpressureStrategy.OLDEST)) {
+                    flow.onBackpressureDrop(this::onBackPressure)
+                            .doAfterNext(this::onItemEmitted)
+                            .subscribe(this.publisher);
+                } else if (ObjectHelper.equal(strategy, 
ReactiveStreamsBackpressureStrategy.LATEST)) {
+                    flow.doAfterNext(this::onItemEmitted)
+                            .onBackpressureLatest()
+                            .subscribe(this.publisher);
+                } else {
+                    flow.doAfterNext(this::onItemEmitted)
+                            .onBackpressureBuffer()
+                            .subscribe(this.publisher);
+                }
+
+                camelProducer = producer;
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
-    synchronized void detach() {
-        this.camelProducer = null;
-        this.camelEmitter.set(null);
+    void detach() {
+        lock.lock();
+        try {
+            this.camelProducer = null;
+            this.camelEmitter.set(null);
+        } finally {
+            lock.unlock();
+        }
     }
 
     void send(Exchange exchange) {
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
index 60262a5b9c5..12c7c9dddaf 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
@@ -157,40 +157,44 @@ public class SalesforceSession extends ServiceSupport {
         }
     }
 
-    public synchronized String login(String oldToken) throws 
SalesforceException {
-
-        // check if we need a new session
-        // this way there's always a single valid session
-        if (accessToken == null || accessToken.equals(oldToken)) {
+    public String login(String oldToken) throws SalesforceException {
+        lock.lock();
+        try {
+            // check if we need a new session
+            // this way there's always a single valid session
+            if (accessToken == null || accessToken.equals(oldToken)) {
 
-            // try revoking the old access token before creating a new one
-            accessToken = oldToken;
-            if (accessToken != null) {
-                try {
-                    logout();
-                } catch (SalesforceException e) {
-                    LOG.warn("Error revoking old access token: {}", 
e.getMessage(), e);
+                // try revoking the old access token before creating a new one
+                accessToken = oldToken;
+                if (accessToken != null) {
+                    try {
+                        logout();
+                    } catch (SalesforceException e) {
+                        LOG.warn("Error revoking old access token: {}", 
e.getMessage(), e);
+                    }
+                    accessToken = null;
                 }
-                accessToken = null;
-            }
 
-            // login to Salesforce and get session id
-            final Request loginPost = getLoginRequest(null);
-            try {
+                // login to Salesforce and get session id
+                final Request loginPost = getLoginRequest(null);
+                try {
 
-                final ContentResponse loginResponse = loginPost.send();
-                parseLoginResponse(loginResponse, 
loginResponse.getContentAsString());
+                    final ContentResponse loginResponse = loginPost.send();
+                    parseLoginResponse(loginResponse, 
loginResponse.getContentAsString());
 
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new SalesforceException("Login error: interrupted", e);
-            } catch (TimeoutException e) {
-                throw new SalesforceException("Login request timeout: " + 
e.getMessage(), e);
-            } catch (ExecutionException e) {
-                throw new SalesforceException("Unexpected login error: " + 
e.getCause().getMessage(), e.getCause());
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new SalesforceException("Login error: interrupted", 
e);
+                } catch (TimeoutException e) {
+                    throw new SalesforceException("Login request timeout: " + 
e.getMessage(), e);
+                } catch (ExecutionException e) {
+                    throw new SalesforceException("Unexpected login error: " + 
e.getCause().getMessage(), e.getCause());
+                }
             }
+            return accessToken;
+        } finally {
+            lock.unlock();
         }
-        return accessToken;
     }
 
     /**
@@ -301,11 +305,11 @@ public class SalesforceSession extends ServiceSupport {
      * Parses login response, allows SalesforceSecurityHandler to parse a 
login request for a failed authentication
      * conversation.
      */
-    public synchronized void parseLoginResponse(ContentResponse loginResponse, 
String responseContent)
+    public void parseLoginResponse(ContentResponse loginResponse, String 
responseContent)
             throws SalesforceException {
-        final int responseStatus = loginResponse.getStatus();
-
+        lock.lock();
         try {
+            final int responseStatus = loginResponse.getStatus();
             switch (responseStatus) {
                 case HttpStatus.OK_200:
                     // parse the response to get token
@@ -352,47 +356,55 @@ public class SalesforceSession extends ServiceSupport {
         } catch (IOException e) {
             String msg = "Login error: response parse exception " + 
e.getMessage();
             throw new SalesforceException(msg, e);
+        } finally {
+            lock.unlock();
         }
     }
 
-    public synchronized void logout() throws SalesforceException {
-        if (accessToken == null) {
-            return;
-        }
-
+    public void logout() throws SalesforceException {
+        lock.lock();
         try {
-            String logoutUrl = (instanceUrl == null ? config.getLoginUrl() : 
instanceUrl) + OAUTH2_REVOKE_PATH + accessToken;
-            final Request logoutGet = 
httpClient.newRequest(logoutUrl).timeout(timeout, TimeUnit.MILLISECONDS);
-            final ContentResponse logoutResponse = logoutGet.send();
+            if (accessToken == null) {
+                return;
+            }
 
-            final int statusCode = logoutResponse.getStatus();
+            try {
+                String logoutUrl
+                        = (instanceUrl == null ? config.getLoginUrl() : 
instanceUrl) + OAUTH2_REVOKE_PATH + accessToken;
+                final Request logoutGet = 
httpClient.newRequest(logoutUrl).timeout(timeout, TimeUnit.MILLISECONDS);
+                final ContentResponse logoutResponse = logoutGet.send();
 
-            if (statusCode == HttpStatus.OK_200) {
-                LOG.debug("Logout successful");
-            } else {
-                LOG.debug("Failed to revoke OAuth token. This is expected if the token is invalid or already expired");
-            }
+                final int statusCode = logoutResponse.getStatus();
 
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new SalesforceException("Interrupted while logging out", e);
-        } catch (ExecutionException e) {
-            final Throwable ex = e.getCause();
-            throw new SalesforceException("Unexpected logout exception: " + 
ex.getMessage(), ex);
-        } catch (TimeoutException e) {
-            throw new SalesforceException("Logout request TIMEOUT!", e);
-        } finally {
-            // reset session
-            accessToken = null;
-            instanceUrl = null;
-            // notify all session listeners about logout
-            for (SalesforceSessionListener listener : listeners) {
-                try {
-                    listener.onLogout();
-                } catch (Exception t) {
-                    LOG.warn("Unexpected error from listener {}: {}", 
listener, t.getMessage());
+                if (statusCode == HttpStatus.OK_200) {
+                    LOG.debug("Logout successful");
+                } else {
+                    LOG.debug("Failed to revoke OAuth token. This is expected if the token is invalid or already expired");
+                }
+
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new SalesforceException("Interrupted while logging out", 
e);
+            } catch (ExecutionException e) {
+                final Throwable ex = e.getCause();
+                throw new SalesforceException("Unexpected logout exception: " 
+ ex.getMessage(), ex);
+            } catch (TimeoutException e) {
+                throw new SalesforceException("Logout request TIMEOUT!", e);
+            } finally {
+                // reset session
+                accessToken = null;
+                instanceUrl = null;
+                // notify all session listeners about logout
+                for (SalesforceSessionListener listener : listeners) {
+                    try {
+                        listener.onLogout();
+                    } catch (Exception t) {
+                        LOG.warn("Unexpected error from listener {}: {}", 
listener, t.getMessage());
+                    }
                 }
             }
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 0ffda9096fe..894fcb16820 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -413,25 +413,30 @@ public class SubscriptionHelper extends ServiceSupport {
         return client;
     }
 
-    public synchronized void subscribe(StreamingApiConsumer consumer) {
-        // create subscription for consumer
-        final String channelName = getChannelName(consumer.getTopicName());
-        channelToConsumers.computeIfAbsent(channelName, key -> 
ConcurrentHashMap.newKeySet()).add(consumer);
-        channelsToSubscribe.add(channelName);
-
-        setReplayIdIfAbsent(consumer.getEndpoint());
-
-        // channel message listener
-        LOG.info("Subscribing to channel {}...", channelName);
-        var messageListener = consumerToListener.computeIfAbsent(consumer, key 
-> (channel, message) -> {
-            LOG.debug("Received Message: {}", message);
-            // convert CometD message to Camel Message
-            consumer.processMessage(channel, message);
-        });
-
-        // subscribe asynchronously
-        final ClientSessionChannel clientChannel = 
client.getChannel(channelName);
-        clientChannel.subscribe(messageListener);
+    public void subscribe(StreamingApiConsumer consumer) {
+        lock.lock();
+        try {
+            // create subscription for consumer
+            final String channelName = getChannelName(consumer.getTopicName());
+            channelToConsumers.computeIfAbsent(channelName, key -> 
ConcurrentHashMap.newKeySet()).add(consumer);
+            channelsToSubscribe.add(channelName);
+
+            setReplayIdIfAbsent(consumer.getEndpoint());
+
+            // channel message listener
+            LOG.info("Subscribing to channel {}...", channelName);
+            var messageListener = consumerToListener.computeIfAbsent(consumer, 
key -> (channel, message) -> {
+                LOG.debug("Received Message: {}", message);
+                // convert CometD message to Camel Message
+                consumer.processMessage(channel, message);
+            });
+
+            // subscribe asynchronously
+            final ClientSessionChannel clientChannel = 
client.getChannel(channelName);
+            clientChannel.subscribe(messageListener);
+        } finally {
+            lock.unlock();
+        }
     }
 
     private static boolean isTemporaryError(Message message) {
@@ -506,26 +511,31 @@ public class SubscriptionHelper extends ServiceSupport {
         return channelName.toString();
     }
 
-    public synchronized void unsubscribe(StreamingApiConsumer consumer) {
-        // channel name
-        final String channelName = getChannelName(consumer.getTopicName());
-
-        // unsubscribe from channel
-        var consumers = channelToConsumers.get(channelName);
-        if (consumers != null) {
-            consumers.remove(consumer);
-            if (consumers.isEmpty()) {
-                channelToConsumers.remove(channelName);
+    public void unsubscribe(StreamingApiConsumer consumer) {
+        lock.lock();
+        try {
+            // channel name
+            final String channelName = getChannelName(consumer.getTopicName());
+
+            // unsubscribe from channel
+            var consumers = channelToConsumers.get(channelName);
+            if (consumers != null) {
+                consumers.remove(consumer);
+                if (consumers.isEmpty()) {
+                    channelToConsumers.remove(channelName);
+                }
             }
-        }
-        final ClientSessionChannel.MessageListener listener = 
consumerToListener.remove(consumer);
-        if (listener != null) {
-            LOG.debug("Unsubscribing from channel {}...", channelName);
-            final ClientSessionChannel clientChannel = 
client.getChannel(channelName);
-            // if there are other listeners on this channel, an unsubscribe message will not be sent,
-            // so we're not going to listen for and expect an unsub response. Just unsub and move on.
-            clientChannel.unsubscribe(listener);
-            clientChannel.release();
+            final ClientSessionChannel.MessageListener listener = 
consumerToListener.remove(consumer);
+            if (listener != null) {
+                LOG.debug("Unsubscribing from channel {}...", channelName);
+                final ClientSessionChannel clientChannel = 
client.getChannel(channelName);
+                // if there are other listeners on this channel, an unsubscribe message will not be sent,
+                // so we're not going to listen for and expect an unsub response. Just unsub and move on.
+                clientChannel.unsubscribe(listener);
+                clientChannel.release();
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git a/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerComponent.java b/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerComponent.java
index 4c23a7e6bb4..52cdbe76311 100644
--- a/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerComponent.java
+++ b/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerComponent.java
@@ -17,8 +17,8 @@
 package org.apache.camel.component.scheduler;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -29,8 +29,7 @@ import org.apache.camel.support.HealthCheckComponent;
 @org.apache.camel.spi.annotations.Component("scheduler")
 public class SchedulerComponent extends HealthCheckComponent {
 
-    private final Map<String, ScheduledExecutorService> executors = new 
HashMap<>();
-    private final Map<String, AtomicInteger> refCounts = new HashMap<>();
+    private final Map<String, ScheduledExecutorServiceHolder> executors = new 
ConcurrentHashMap<>();
 
     @Metadata
     private boolean includeMetadata;
@@ -75,53 +74,46 @@ public class SchedulerComponent extends HealthCheckComponent {
 
     protected ScheduledExecutorService addConsumer(SchedulerConsumer consumer) 
{
         String name = consumer.getEndpoint().getName();
-        int poolSize = consumer.getEndpoint().getPoolSize();
-
-        ScheduledExecutorService answer;
-        synchronized (executors) {
-            answer = executors.get(name);
-            if (answer == null) {
-                answer = 
getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, 
"scheduler://" + name,
-                        poolSize);
-                executors.put(name, answer);
-                // store new reference counter
-                refCounts.put(name, new AtomicInteger(1));
-            } else {
-                // increase reference counter
-                AtomicInteger counter = refCounts.get(name);
-                if (counter != null) {
-                    counter.incrementAndGet();
-                }
+        return executors.compute(name, (k, v) -> {
+            if (v == null) {
+                int poolSize = consumer.getEndpoint().getPoolSize();
+                return new ScheduledExecutorServiceHolder(
+                        
getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, 
"scheduler://" + name,
+                                poolSize));
             }
-        }
-        return answer;
+            v.refCount.incrementAndGet();
+            return v;
+        }).executorService;
     }
 
     protected void removeConsumer(SchedulerConsumer consumer) {
         String name = consumer.getEndpoint().getName();
 
-        synchronized (executors) {
-            // decrease reference counter
-            AtomicInteger counter = refCounts.get(name);
-            if (counter != null && counter.decrementAndGet() <= 0) {
-                refCounts.remove(name);
-                // remove scheduler as its no longer in use
-                ScheduledExecutorService scheduler = executors.remove(name);
-                if (scheduler != null) {
-                    
getCamelContext().getExecutorServiceManager().shutdown(scheduler);
-                }
+        executors.computeIfPresent(name, (k, v) -> {
+            if (v.refCount.decrementAndGet() == 0) {
+                
getCamelContext().getExecutorServiceManager().shutdown(v.executorService);
+                return null;
             }
-        }
+            return v;
+        });
     }
 
     @Override
     protected void doStop() throws Exception {
-        Collection<ScheduledExecutorService> collection = executors.values();
-        for (ScheduledExecutorService scheduler : collection) {
-            getCamelContext().getExecutorServiceManager().shutdown(scheduler);
+        Collection<ScheduledExecutorServiceHolder> collection = 
executors.values();
+        for (ScheduledExecutorServiceHolder holder : collection) {
+            
getCamelContext().getExecutorServiceManager().shutdown(holder.executorService);
         }
         executors.clear();
-        refCounts.clear();
     }
 
+    private static class ScheduledExecutorServiceHolder {
+        private final ScheduledExecutorService executorService;
+        private final AtomicInteger refCount;
+
+        ScheduledExecutorServiceHolder(ScheduledExecutorService 
executorService) {
+            this.executorService = executorService;
+            this.refCount = new AtomicInteger(1);
+        }
+    }
 }
diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/QueueReference.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/QueueReference.java
index 61dd5922259..4efa264ba34 100644
--- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/QueueReference.java
+++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/QueueReference.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.seda;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Exchange;
 
@@ -34,7 +36,8 @@ public final class QueueReference {
     private Integer size;
     private Boolean multipleConsumers;
 
-    private List<SedaEndpoint> endpoints = new LinkedList<>();
+    private final Lock lock = new ReentrantLock();
+    private final List<SedaEndpoint> endpoints = new LinkedList<>();
 
     QueueReference(BlockingQueue<Exchange> queue, Integer size, Boolean 
multipleConsumers) {
         this.queue = queue;
@@ -42,27 +45,40 @@ public final class QueueReference {
         this.multipleConsumers = multipleConsumers;
     }
 
-    synchronized void addReference(SedaEndpoint endpoint) {
-        if (!endpoints.contains(endpoint)) {
-            endpoints.add(endpoint);
-            // update the multipleConsumers setting if need
-            if (endpoint.isMultipleConsumers()) {
-                multipleConsumers = true;
+    void addReference(SedaEndpoint endpoint) {
+        lock.lock();
+        try {
+            if (!endpoints.contains(endpoint)) {
+                endpoints.add(endpoint);
+                // update the multipleConsumers setting if need
+                if (endpoint.isMultipleConsumers()) {
+                    multipleConsumers = true;
+                }
             }
+        } finally {
+            lock.unlock();
         }
     }
 
-    synchronized void removeReference(SedaEndpoint endpoint) {
-        if (endpoints.contains(endpoint)) {
+    void removeReference(SedaEndpoint endpoint) {
+        lock.lock();
+        try {
             endpoints.remove(endpoint);
+        } finally {
+            lock.unlock();
         }
     }
 
     /**
      * Gets the reference counter
      */
-    public synchronized int getCount() {
-        return endpoints.size();
+    public int getCount() {
+        lock.lock();
+        try {
+            return endpoints.size();
+        } finally {
+            lock.unlock();
+        }
     }
 
     /**
@@ -85,13 +101,18 @@ public final class QueueReference {
         return queue;
     }
 
-    public synchronized boolean hasConsumers() {
-        for (SedaEndpoint endpoint : endpoints) {
-            if (!endpoint.getConsumers().isEmpty()) {
-                return true;
+    public boolean hasConsumers() {
+        lock.lock();
+        try {
+            for (SedaEndpoint endpoint : endpoints) {
+                if (!endpoint.getConsumers().isEmpty()) {
+                    return true;
+                }
             }
-        }
 
-        return false;
+            return false;
+        } finally {
+            lock.unlock();
+        }
     }
 }
diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaComponent.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaComponent.java
index cdf46a989ef..05dfb0bb2f0 100644
--- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaComponent.java
+++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaComponent.java
@@ -144,69 +144,78 @@ public class SedaComponent extends DefaultComponent {
         this.defaultPollTimeout = defaultPollTimeout;
     }
 
-    public synchronized QueueReference getOrCreateQueue(
+    public QueueReference getOrCreateQueue(
             SedaEndpoint endpoint, Integer size, Boolean multipleConsumers, 
BlockingQueueFactory<Exchange> customQueueFactory) {
+        lock.lock();
+        try {
+            String key = getQueueKey(endpoint.getEndpointUri());
 
-        String key = getQueueKey(endpoint.getEndpointUri());
-
-        if (size == null) {
-            // there may be a custom size during startup
-            size = customSize.get(key);
-        }
-
-        QueueReference ref = getQueues().get(key);
-        if (ref != null) {
-            // if the given size is not provided, we just use the existing 
queue as is
-            if (size != null && !size.equals(ref.getSize())) {
-                // there is already a queue, so make sure the size matches
-                throw new IllegalArgumentException(
-                        "Cannot use existing queue " + key + " as the existing 
queue size "
-                                                   + (ref.getSize() != null ? 
ref.getSize() : SedaConstants.QUEUE_SIZE)
-                                                   + " does not match given 
queue size " + size);
+            if (size == null) {
+                // there may be a custom size during startup
+                size = customSize.get(key);
             }
-            // add the reference before returning queue
-            ref.addReference(endpoint);
 
-            if (log.isDebugEnabled()) {
-                log.debug("Reusing existing queue {} with size {} and 
reference count {}", key, size, ref.getCount());
+            QueueReference ref = getQueues().get(key);
+            if (ref != null) {
+                // if the given size is not provided, we just use the existing 
queue as is
+                if (size != null && !size.equals(ref.getSize())) {
+                    // there is already a queue, so make sure the size matches
+                    throw new IllegalArgumentException(
+                            "Cannot use existing queue " + key + " as the 
existing queue size "
+                                                       + (ref.getSize() != 
null ? ref.getSize() : SedaConstants.QUEUE_SIZE)
+                                                       + " does not match 
given queue size " + size);
+                }
+                // add the reference before returning queue
+                ref.addReference(endpoint);
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Reusing existing queue {} with size {} and 
reference count {}", key, size, ref.getCount());
+                }
+                return ref;
             }
-            return ref;
-        }
 
-        // create queue
-        BlockingQueue<Exchange> queue;
-        BlockingQueueFactory<Exchange> queueFactory = customQueueFactory == 
null ? defaultQueueFactory : customQueueFactory;
-        if (size != null && size > 0) {
-            queue = queueFactory.create(size);
-        } else {
-            if (getQueueSize() > 0) {
-                size = getQueueSize();
-                queue = queueFactory.create(getQueueSize());
+            // create queue
+            BlockingQueue<Exchange> queue;
+            BlockingQueueFactory<Exchange> queueFactory = customQueueFactory 
== null ? defaultQueueFactory : customQueueFactory;
+            if (size != null && size > 0) {
+                queue = queueFactory.create(size);
             } else {
-                queue = queueFactory.create();
+                if (getQueueSize() > 0) {
+                    size = getQueueSize();
+                    queue = queueFactory.create(getQueueSize());
+                } else {
+                    queue = queueFactory.create();
+                }
             }
-        }
-        log.debug("Created queue {} with size {}", key, size);
+            log.debug("Created queue {} with size {}", key, size);
 
-        // create and add a new reference queue
-        ref = new QueueReference(queue, size, multipleConsumers);
-        ref.addReference(endpoint);
-        getQueues().put(key, ref);
+            // create and add a new reference queue
+            ref = new QueueReference(queue, size, multipleConsumers);
+            ref.addReference(endpoint);
+            getQueues().put(key, ref);
 
-        return ref;
+            return ref;
+        } finally {
+            lock.unlock();
+        }
     }
 
-    public synchronized QueueReference registerQueue(SedaEndpoint endpoint, 
BlockingQueue<Exchange> queue) {
-        String key = getQueueKey(endpoint.getEndpointUri());
+    public QueueReference registerQueue(SedaEndpoint endpoint, 
BlockingQueue<Exchange> queue) {
+        lock.lock();
+        try {
+            String key = getQueueKey(endpoint.getEndpointUri());
 
-        QueueReference ref = getQueues().get(key);
-        if (ref == null) {
-            ref = new QueueReference(queue, endpoint.getSize(), 
endpoint.isMultipleConsumers());
-            ref.addReference(endpoint);
-            getQueues().put(key, ref);
-        }
+            QueueReference ref = getQueues().get(key);
+            if (ref == null) {
+                ref = new QueueReference(queue, endpoint.getSize(), 
endpoint.isMultipleConsumers());
+                ref.addReference(endpoint);
+                getQueues().put(key, ref);
+            }
 
-        return ref;
+            return ref;
+        } finally {
+            lock.unlock();
+        }
     }
 
     public Map<String, QueueReference> getQueues() {
diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
index d03d320e6be..8c4710edc04 100644
--- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
+++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
@@ -183,30 +183,35 @@ public class SedaEndpoint extends DefaultEndpoint implements AsyncEndpoint, Brow
         return answer;
     }
 
-    public synchronized BlockingQueue<Exchange> getQueue() {
-        if (queue == null) {
-            // prefer to lookup queue from component, so if this endpoint is 
re-created or re-started
-            // then the existing queue from the component can be used, so new 
producers and consumers
-            // can use the already existing queue referenced from the component
-            if (getComponent() != null) {
-                // use null to indicate default size (= use what the existing 
queue has been configured with)
-                Integer size = (getSize() == Integer.MAX_VALUE || getSize() == 
SedaConstants.QUEUE_SIZE) ? null : getSize();
-                QueueReference ref = getComponent().getOrCreateQueue(this, 
size, isMultipleConsumers(), queueFactory);
-                queue = ref.getQueue();
-                String key = getComponent().getQueueKey(getEndpointUri());
-                LOG.debug("Endpoint {} is using shared queue: {} with size: 
{}", this, key,
-                        ref.getSize() != null ? ref.getSize() : 
Integer.MAX_VALUE);
-                // and set the size we are using
-                if (ref.getSize() != null) {
-                    setSize(ref.getSize());
+    public BlockingQueue<Exchange> getQueue() {
+        lock.lock();
+        try {
+            if (queue == null) {
+                // prefer to lookup queue from component, so if this endpoint 
is re-created or re-started
+                // then the existing queue from the component can be used, so 
new producers and consumers
+                // can use the already existing queue referenced from the 
component
+                if (getComponent() != null) {
+                    // use null to indicate default size (= use what the 
existing queue has been configured with)
+                    Integer size = (getSize() == Integer.MAX_VALUE || 
getSize() == SedaConstants.QUEUE_SIZE) ? null : getSize();
+                    QueueReference ref = getComponent().getOrCreateQueue(this, 
size, isMultipleConsumers(), queueFactory);
+                    queue = ref.getQueue();
+                    String key = getComponent().getQueueKey(getEndpointUri());
+                    LOG.debug("Endpoint {} is using shared queue: {} with 
size: {}", this, key,
+                            ref.getSize() != null ? ref.getSize() : 
Integer.MAX_VALUE);
+                    // and set the size we are using
+                    if (ref.getSize() != null) {
+                        setSize(ref.getSize());
+                    }
+                } else {
+                    // fallback and create queue (as this endpoint has no 
component)
+                    queue = createQueue();
+                    LOG.debug("Endpoint {} is using queue: {} with size: {}", 
this, getEndpointUri(), getSize());
                 }
-            } else {
-                // fallback and create queue (as this endpoint has no 
component)
-                queue = createQueue();
-                LOG.debug("Endpoint {} is using queue: {} with size: {}", 
this, getEndpointUri(), getSize());
             }
+            return queue;
+        } finally {
+            lock.unlock();
         }
-        return queue;
     }
 
     protected BlockingQueue<Exchange> createQueue() {
@@ -240,45 +245,55 @@ public class SedaEndpoint extends DefaultEndpoint implements AsyncEndpoint, Brow
         return null;
     }
 
-    protected synchronized AsyncProcessor getConsumerMulticastProcessor() {
-        if (!multicastStarted && consumerMulticastProcessor != null) {
-            // only start it on-demand to avoid starting it during stopping
-            ServiceHelper.startService(consumerMulticastProcessor);
-            multicastStarted = true;
+    protected AsyncProcessor getConsumerMulticastProcessor() {
+        lock.lock();
+        try {
+            if (!multicastStarted && consumerMulticastProcessor != null) {
+                // only start it on-demand to avoid starting it during stopping
+                ServiceHelper.startService(consumerMulticastProcessor);
+                multicastStarted = true;
+            }
+            return consumerMulticastProcessor;
+        } finally {
+            lock.unlock();
         }
-        return consumerMulticastProcessor;
     }
 
-    protected synchronized void updateMulticastProcessor() throws Exception {
-        // only needed if we support multiple consumers
-        if (!isMultipleConsumersSupported()) {
-            return;
-        }
-
-        // stop old before we create a new
-        if (consumerMulticastProcessor != null) {
-            ServiceHelper.stopService(consumerMulticastProcessor);
-            consumerMulticastProcessor = null;
-        }
-
-        int size = getConsumers().size();
-        if (size >= 1) {
-            if (multicastExecutor == null) {
-                // create multicast executor as we need it when we have more 
than 1 processor
-                multicastExecutor = 
getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this,
-                        URISupport.sanitizeUri(getEndpointUri()) + 
"(multicast)");
+    protected void updateMulticastProcessor() throws Exception {
+        lock.lock();
+        try {
+            // only needed if we support multiple consumers
+            if (!isMultipleConsumersSupported()) {
+                return;
             }
-            // create list of consumers to multicast to
-            List<Processor> processors = new ArrayList<>(size);
-            for (SedaConsumer consumer : getConsumers()) {
-                processors.add(consumer.getProcessor());
+
+            // stop old before we create a new
+            if (consumerMulticastProcessor != null) {
+                ServiceHelper.stopService(consumerMulticastProcessor);
+                consumerMulticastProcessor = null;
             }
-            // create multicast processor
-            multicastStarted = false;
 
-            consumerMulticastProcessor = (AsyncProcessor) 
PluginHelper.getProcessorFactory(getCamelContext())
-                    .createProcessor(getCamelContext(), "MulticastProcessor",
-                            new Object[] { processors, multicastExecutor, 
false });
+            int size = getConsumers().size();
+            if (size >= 1) {
+                if (multicastExecutor == null) {
+                    // create multicast executor as we need it when we have 
more than 1 processor
+                    multicastExecutor = 
getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this,
+                            URISupport.sanitizeUri(getEndpointUri()) + 
"(multicast)");
+                }
+                // create list of consumers to multicast to
+                List<Processor> processors = new ArrayList<>(size);
+                for (SedaConsumer consumer : getConsumers()) {
+                    processors.add(consumer.getProcessor());
+                }
+                // create multicast processor
+                multicastStarted = false;
+
+                consumerMulticastProcessor = (AsyncProcessor) 
PluginHelper.getProcessorFactory(getCamelContext())
+                        .createProcessor(getCamelContext(), 
"MulticastProcessor",
+                                new Object[] { processors, multicastExecutor, 
false });
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git a/components/camel-servicenow/camel-servicenow-component/src/main/java/org/apache/camel/component/servicenow/auth/OAuthToken.java b/components/camel-servicenow/camel-servicenow-component/src/main/java/org/apache/camel/component/servicenow/auth/OAuthToken.java
index f317cd47015..74959b9815d 100644
--- a/components/camel-servicenow/camel-servicenow-component/src/main/java/org/apache/camel/component/servicenow/auth/OAuthToken.java
+++ b/components/camel-servicenow/camel-servicenow-component/src/main/java/org/apache/camel/component/servicenow/auth/OAuthToken.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.servicenow.auth;
 
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.component.servicenow.ServiceNowConfiguration;
 import org.apache.cxf.jaxrs.client.WebClient;
@@ -30,6 +32,7 @@ import org.slf4j.LoggerFactory;
 public class OAuthToken {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(OAuthToken.class);
 
+    private final Lock lock = new ReentrantLock();
     private final ServiceNowConfiguration configuration;
     private ClientAccessToken token;
     private String authString;
@@ -42,54 +45,59 @@ public class OAuthToken {
         this.expireAt = 0;
     }
 
-    private synchronized void getOrRefreshAccessToken() {
-        if (token == null) {
-            LOGGER.debug("Generate OAuth token");
-
-            token = OAuthClientUtils.getAccessToken(
-                    WebClient.create(configuration.getOauthTokenUrl()),
-                    new Consumer(
-                            configuration.getOauthClientId(),
-                            configuration.getOauthClientSecret()),
-                    new ResourceOwnerGrant(
-                            configuration.getUserName(),
-                            configuration.getPassword()),
-                    true);
-
-            LOGGER.debug("OAuth token expires in {}s", token.getExpiresIn());
-
-            // Set expiration time related info in milliseconds
-            token.setIssuedAt(System.currentTimeMillis());
-            
token.setExpiresIn(TimeUnit.MILLISECONDS.convert(token.getExpiresIn(), 
TimeUnit.SECONDS));
-
-            authString = token.toString();
-
-            if (token.getExpiresIn() > 0) {
-                expireAt = token.getIssuedAt() + token.getExpiresIn();
-            }
-        } else if (expireAt > 0 && System.currentTimeMillis() >= expireAt) {
-            LOGGER.debug("OAuth token is expired, refresh it");
-
-            token = OAuthClientUtils.refreshAccessToken(
-                    WebClient.create(configuration.getOauthTokenUrl()),
-                    new Consumer(
-                            configuration.getOauthClientId(),
-                            configuration.getOauthClientSecret()),
-                    token,
-                    null,
-                    false);
-
-            LOGGER.debug("Refreshed OAuth token expires in {}s", 
token.getExpiresIn());
-
-            // Set expiration time related info in milliseconds
-            token.setIssuedAt(System.currentTimeMillis());
-            
token.setExpiresIn(TimeUnit.MILLISECONDS.convert(token.getExpiresIn(), 
TimeUnit.SECONDS));
-
-            authString = token.toString();
-
-            if (token.getExpiresIn() > 0) {
-                expireAt = token.getIssuedAt() + token.getExpiresIn();
+    private void getOrRefreshAccessToken() {
+        lock.lock();
+        try {
+            if (token == null) {
+                LOGGER.debug("Generate OAuth token");
+
+                token = OAuthClientUtils.getAccessToken(
+                        WebClient.create(configuration.getOauthTokenUrl()),
+                        new Consumer(
+                                configuration.getOauthClientId(),
+                                configuration.getOauthClientSecret()),
+                        new ResourceOwnerGrant(
+                                configuration.getUserName(),
+                                configuration.getPassword()),
+                        true);
+
+                LOGGER.debug("OAuth token expires in {}s", 
token.getExpiresIn());
+
+                // Set expiration time related info in milliseconds
+                token.setIssuedAt(System.currentTimeMillis());
+                
token.setExpiresIn(TimeUnit.MILLISECONDS.convert(token.getExpiresIn(), 
TimeUnit.SECONDS));
+
+                authString = token.toString();
+
+                if (token.getExpiresIn() > 0) {
+                    expireAt = token.getIssuedAt() + token.getExpiresIn();
+                }
+            } else if (expireAt > 0 && System.currentTimeMillis() >= expireAt) 
{
+                LOGGER.debug("OAuth token is expired, refresh it");
+
+                token = OAuthClientUtils.refreshAccessToken(
+                        WebClient.create(configuration.getOauthTokenUrl()),
+                        new Consumer(
+                                configuration.getOauthClientId(),
+                                configuration.getOauthClientSecret()),
+                        token,
+                        null,
+                        false);
+
+                LOGGER.debug("Refreshed OAuth token expires in {}s", 
token.getExpiresIn());
+
+                // Set expiration time related info in milliseconds
+                token.setIssuedAt(System.currentTimeMillis());
+                
token.setExpiresIn(TimeUnit.MILLISECONDS.convert(token.getExpiresIn(), 
TimeUnit.SECONDS));
+
+                authString = token.toString();
+
+                if (token.getExpiresIn() > 0) {
+                    expireAt = token.getIssuedAt() + token.getExpiresIn();
+                }
             }
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
index 585482ce544..ea285912b88 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
@@ -134,14 +134,19 @@ public class SjmsComponent extends HeaderFilterStrategyComponent {
         super.doShutdown();
     }
 
-    protected synchronized ExecutorService getAsyncStartStopExecutorService() {
-        if (asyncStartStopExecutorService == null) {
-            // use a cached thread pool for async start tasks as they can run 
for a while, and we need a dedicated thread
-            // for each task, and the thread pool will shrink when no more 
tasks running
-            asyncStartStopExecutorService
-                    = 
getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, 
"AsyncStartStopListener");
+    protected ExecutorService getAsyncStartStopExecutorService() {
+        lock.lock();
+        try {
+            if (asyncStartStopExecutorService == null) {
+                // use a cached thread pool for async start tasks as they can 
run for a while, and we need a dedicated thread
+                // for each task, and the thread pool will shrink when no more 
tasks running
+                asyncStartStopExecutorService
+                        = 
getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, 
"AsyncStartStopListener");
+            }
+            return asyncStartStopExecutorService;
+        } finally {
+            lock.unlock();
         }
-        return asyncStartStopExecutorService;
     }
 
     public void setConnectionFactory(ConnectionFactory connectionFactory) {
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index 9f71a770abd..6ff8154ea8a 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -95,7 +95,8 @@ public class SjmsProducer extends DefaultAsyncProducer {
 
     protected void initReplyManager() {
         if (!started.get()) {
-            synchronized (this) {
+            lock.lock();
+            try {
                 if (started.get()) {
                     return;
                 }
@@ -136,6 +137,8 @@ public class SjmsProducer extends DefaultAsyncProducer {
                     
Thread.currentThread().setContextClassLoader(oldClassLoader);
                 }
                 started.set(true);
+            } finally {
+                lock.unlock();
             }
         }
     }
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
index 3aa5eb35349..63fe3dd1c33 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.sjms.consumer;
 
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import jakarta.jms.Connection;
 import jakarta.jms.Destination;
 import jakarta.jms.JMSException;
@@ -66,6 +69,7 @@ public class EndpointMessageListener implements SessionMessageListener {
     private boolean eagerLoadingOfProperties;
     private String eagerPoisonBody;
     private volatile SjmsTemplate template;
+    private final Lock lock = new ReentrantLock();
 
     public EndpointMessageListener(SjmsConsumer consumer, SjmsEndpoint 
endpoint, Processor processor) {
         this.consumer = consumer;
@@ -73,11 +77,16 @@ public class EndpointMessageListener implements SessionMessageListener {
         this.processor = AsyncProcessorConverterHelper.convert(processor);
     }
 
-    public synchronized SjmsTemplate getTemplate() {
-        if (template == null) {
-            template = endpoint.createInOnlyTemplate();
+    public SjmsTemplate getTemplate() {
+        lock.lock();
+        try {
+            if (template == null) {
+                template = endpoint.createInOnlyTemplate();
+            }
+            return template;
+        } finally {
+            lock.unlock();
         }
-        return template;
     }
 
     public void setTemplate(SjmsTemplate template) {
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java
index 0a931703378..081e98e65a0 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.sjms.consumer;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import jakarta.jms.Connection;
 import jakarta.jms.ConnectionFactory;
@@ -54,10 +56,10 @@ public class SimpleMessageListenerContainer extends ServiceSupport
     private String destinationName;
     private DestinationCreationStrategy destinationCreationStrategy;
 
-    private final Object connectionLock = new Object();
+    private final Lock connectionLock = new ReentrantLock();
     private Connection connection;
     private volatile boolean connectionStarted;
-    private final Object consumerLock = new Object();
+    private final Lock consumerLock = new ReentrantLock();
     private Set<MessageConsumer> consumers;
     private Set<Session> sessions;
     private BackOffTimer.Task recoverTask;
@@ -181,9 +183,12 @@ public class SimpleMessageListenerContainer extends ServiceSupport
             }
         }
 
-        synchronized (this.connectionLock) {
+        connectionLock.lock();
+        try {
             this.sessions = null;
             this.consumers = null;
+        } finally {
+            connectionLock.unlock();
         }
         scheduleConnectionRecovery();
     }
@@ -240,7 +245,8 @@ public class SimpleMessageListenerContainer extends ServiceSupport
     }
 
     protected void initConsumers() throws Exception {
-        synchronized (this.consumerLock) {
+        consumerLock.lock();
+        try {
             if (consumers == null) {
                 LOG.debug("Initializing {} concurrent consumers as JMS 
listener on destination: {}", concurrentConsumers,
                         destinationName);
@@ -254,6 +260,8 @@ public class SimpleMessageListenerContainer extends ServiceSupport
                     consumers.add(consumer);
                 }
             }
+        } finally {
+            consumerLock.unlock();
         }
     }
 
@@ -266,7 +274,8 @@ public class SimpleMessageListenerContainer extends ServiceSupport
     }
 
     protected void stopConsumers() {
-        synchronized (this.consumerLock) {
+        consumerLock.lock();
+        try {
             if (consumers != null) {
                 LOG.debug("Stopping JMS MessageConsumers");
                 for (MessageConsumer consumer : this.consumers) {
@@ -279,11 +288,14 @@ public class SimpleMessageListenerContainer extends ServiceSupport
                     }
                 }
             }
+        } finally {
+            consumerLock.unlock();
         }
     }
 
     protected void createConnection() throws Exception {
-        synchronized (this.connectionLock) {
+        connectionLock.lock();
+        try {
             if (this.connection == null) {
                 Connection con = null;
                 try {
@@ -300,22 +312,28 @@ public class SimpleMessageListenerContainer extends ServiceSupport
                 this.connection = con;
                 LOG.debug("Created JMS Connection");
             }
+        } finally {
+            connectionLock.unlock();
         }
     }
 
     protected final void refreshConnection() throws Exception {
-        synchronized (this.connectionLock) {
+        connectionLock.lock();
+        try {
             closeConnection(connection);
             this.connection = null;
             createConnection();
             if (this.connectionStarted) {
                 startConnection();
             }
+        } finally {
+            connectionLock.unlock();
         }
     }
 
     protected void startConnection() throws Exception {
-        synchronized (this.connectionLock) {
+        connectionLock.lock();
+        try {
             this.connectionStarted = true;
             if (this.connection != null) {
                 try {
@@ -324,11 +342,14 @@ public class SimpleMessageListenerContainer extends ServiceSupport
                     // ignore as it may already be started
                 }
             }
+        } finally {
+            connectionLock.unlock();
         }
     }
 
     protected void stopConnection() {
-        synchronized (this.connectionLock) {
+        connectionLock.lock();
+        try {
             this.connectionStarted = false;
             if (this.connection != null) {
                 try {
@@ -337,6 +358,8 @@ public class SimpleMessageListenerContainer extends ServiceSupport
                     LOG.debug("Error stopping connection. This exception is 
ignored.", e);
                 }
             }
+        } finally {
+            connectionLock.unlock();
         }
     }
 
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/MessageSelectorCreator.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/MessageSelectorCreator.java
index f7aa5899f38..59f8443fd93 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/MessageSelectorCreator.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/MessageSelectorCreator.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.sjms.reply;
 
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.TimeoutMap;
 import org.slf4j.Logger;
@@ -32,6 +34,7 @@ public class MessageSelectorCreator {
     protected static final Logger LOG = 
LoggerFactory.getLogger(MessageSelectorCreator.class);
     protected final TimeoutMap<String, ?> timeoutMap;
     protected final ConcurrentSkipListSet<String> correlationIds;
+    private final Lock lock = new ReentrantLock();
     protected volatile boolean dirty = true;
     protected StringBuilder expression;
 
@@ -44,34 +47,39 @@ public class MessageSelectorCreator {
         this.correlationIds = new ConcurrentSkipListSet<>();
     }
 
-    public synchronized String get() {
-        if (!dirty) {
-            return expression.toString();
-        }
+    public String get() {
+        lock.lock();
+        try {
+            if (!dirty) {
+                return expression.toString();
+            }
 
-        expression = new StringBuilder(256);
+            expression = new StringBuilder(256);
 
-        expression.append("JMSCorrelationID='");
-        if (correlationIds.isEmpty()) {
-            // no id's so use a dummy to select nothing
-            expression.append("CamelDummyJmsMessageSelector'");
-        } else {
-            boolean first = true;
-            for (String value : correlationIds) {
-                if (!first) {
-                    expression.append(" OR JMSCorrelationID='");
-                }
-                expression.append(value).append("'");
-                if (first) {
-                    first = false;
+            expression.append("JMSCorrelationID='");
+            if (correlationIds.isEmpty()) {
+                // no id's so use a dummy to select nothing
+                expression.append("CamelDummyJmsMessageSelector'");
+            } else {
+                boolean first = true;
+                for (String value : correlationIds) {
+                    if (!first) {
+                        expression.append(" OR JMSCorrelationID='");
+                    }
+                    expression.append(value).append("'");
+                    if (first) {
+                        first = false;
+                    }
                 }
             }
-        }
 
-        String answer = expression.toString();
+            String answer = expression.toString();
 
-        dirty = false;
-        return answer;
+            dirty = false;
+            return answer;
+        } finally {
+            lock.unlock();
+        }
     }
 
     // Changes to live correlation-ids invalidate existing message selector
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java
index 662f40fa6cc..c92ba21e516 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java
@@ -73,12 +73,15 @@ public class QueueReplyManager extends ReplyManagerSupport {
 
         @Override
         public Destination createDestination(Session session, String 
destinationName, boolean topic) throws JMSException {
-            synchronized (QueueReplyManager.this) {
+            QueueReplyManager.this.lock.lock();
+            try {
                 // resolve the reply to destination
                 if (destination == null) {
                     destination = delegate.createDestination(session, 
destinationName, topic);
                     setReplyTo(destination);
                 }
+            } finally {
+                QueueReplyManager.this.lock.unlock();
             }
             return destination;
         }
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConnectionFactory.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConnectionFactory.java
index 4f13a2d5ff3..c9119d3b4b0 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConnectionFactory.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConnectionFactory.java
@@ -21,6 +21,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import com.splunk.HttpService;
 import com.splunk.SSLSecurityProtocol;
@@ -45,6 +47,7 @@ public class SplunkConnectionFactory {
     private boolean useSunHttpsHandler;
     private SSLSecurityProtocol sslProtocol;
     private boolean validateCertificates;
+    private final Lock lock = new ReentrantLock();
 
     public SplunkConnectionFactory(final String host, final int port, final 
String username, final String password) {
         this.host = host;
@@ -113,81 +116,86 @@ public class SplunkConnectionFactory {
         this.token = token;
     }
 
-    public synchronized Service createService(CamelContext camelContext) {
-        final ServiceArgs args = new ServiceArgs();
-        if (host != null) {
-            args.setHost(host);
-        }
-        if (port > 0) {
-            args.setPort(port);
-        }
-        if (scheme != null) {
-            args.setScheme(scheme);
-        }
-        if (app != null) {
-            args.setApp(app);
-        }
-        if (owner != null) {
-            args.setOwner(owner);
-        }
-        if (username != null) {
-            args.setUsername(username);
-        }
-        if (password != null && token == null) {
-            args.setPassword(password);
-        }
-        if (token != null) {
-            args.setToken(String.format("Bearer %s", token));
-            args.remove("username");
-            args.remove("password");
-        }
-        // useful in cases where you want to bypass app. servers https handling
-        // (wls i'm looking at you)
-        if (isUseSunHttpsHandler()) {
-            String sunHandlerClassName = "sun.net.www.protocol.https.Handler";
-            Class<URLStreamHandler> clazz
-                    = 
camelContext.getClassResolver().resolveClass(sunHandlerClassName, 
URLStreamHandler.class);
-            if (clazz != null) {
-                URLStreamHandler handler = 
camelContext.getInjector().newInstance(clazz);
-                args.setHTTPSHandler(handler);
-                LOG.debug("using the URLStreamHandler {} for {}", handler, 
args);
-            } else {
-                LOG.warn("could not resolve and use the URLStreamHandler class 
'{}'", sunHandlerClassName);
+    public Service createService(CamelContext camelContext) {
+        lock.lock();
+        try {
+            final ServiceArgs args = new ServiceArgs();
+            if (host != null) {
+                args.setHost(host);
             }
-        }
-
-        ExecutorService executor
-                = 
camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, 
"DefaultSplunkConnectionFactory");
-
-        Future<Service> future = executor.submit(new Callable<Service>() {
-            public Service call() throws Exception {
-                if (Service.DEFAULT_SCHEME.equals(getScheme())) {
-                    LOG.debug("Https in use. Setting SSL protocol to {} and 
sertificate validation to %s", getSslProtocol(),
-                            isValidateCertificates());
-                    
HttpService.setValidateCertificates(isValidateCertificates());
-                    HttpService.setSslSecurityProtocol(getSslProtocol());
+            if (port > 0) {
+                args.setPort(port);
+            }
+            if (scheme != null) {
+                args.setScheme(scheme);
+            }
+            if (app != null) {
+                args.setApp(app);
+            }
+            if (owner != null) {
+                args.setOwner(owner);
+            }
+            if (username != null) {
+                args.setUsername(username);
+            }
+            if (password != null && token == null) {
+                args.setPassword(password);
+            }
+            if (token != null) {
+                args.setToken(String.format("Bearer %s", token));
+                args.remove("username");
+                args.remove("password");
+            }
+            // useful in cases where you want to bypass app. servers https 
handling
+            // (wls i'm looking at you)
+            if (isUseSunHttpsHandler()) {
+                String sunHandlerClassName = 
"sun.net.www.protocol.https.Handler";
+                Class<URLStreamHandler> clazz
+                        = 
camelContext.getClassResolver().resolveClass(sunHandlerClassName, 
URLStreamHandler.class);
+                if (clazz != null) {
+                    URLStreamHandler handler = 
camelContext.getInjector().newInstance(clazz);
+                    args.setHTTPSHandler(handler);
+                    LOG.debug("using the URLStreamHandler {} for {}", handler, 
args);
+                } else {
+                    LOG.warn("could not resolve and use the URLStreamHandler 
class '{}'", sunHandlerClassName);
                 }
-                return Service.connect(args);
             }
-        });
-        try {
-            Service service = null;
-            if (connectionTimeout > 0) {
-                service = future.get(connectionTimeout, TimeUnit.MILLISECONDS);
-            } else {
-                service = future.get();
+
+            ExecutorService executor
+                    = 
camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, 
"DefaultSplunkConnectionFactory");
+
+            Future<Service> future = executor.submit(new Callable<Service>() {
+                public Service call() throws Exception {
+                    if (Service.DEFAULT_SCHEME.equals(getScheme())) {
+                        LOG.debug("Https in use. Setting SSL protocol to {} 
and sertificate validation to %s", getSslProtocol(),
+                                isValidateCertificates());
+                        
HttpService.setValidateCertificates(isValidateCertificates());
+                        HttpService.setSslSecurityProtocol(getSslProtocol());
+                    }
+                    return Service.connect(args);
+                }
+            });
+            try {
+                Service service = null;
+                if (connectionTimeout > 0) {
+                    service = future.get(connectionTimeout, 
TimeUnit.MILLISECONDS);
+                } else {
+                    service = future.get();
+                }
+                LOG.info("Successfully connected to Splunk");
+                return service;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(
+                        String.format("could not connect to Splunk Server @ 
%s:%d - %s", host, port, e.getMessage()), e);
+            } catch (Exception e) {
+                throw new RuntimeException(
+                        String.format("could not connect to Splunk Server @ 
%s:%d - %s", host, port, e.getMessage()), e);
+            } finally {
+                camelContext.getExecutorServiceManager().shutdownNow(executor);
             }
-            LOG.info("Successfully connected to Splunk");
-            return service;
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(
-                    String.format("could not connect to Splunk Server @ %s:%d 
- %s", host, port, e.getMessage()), e);
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    String.format("could not connect to Splunk Server @ %s:%d 
- %s", host, port, e.getMessage()), e);
         } finally {
-            camelContext.getExecutorServiceManager().shutdownNow(executor);
+            lock.unlock();
         }
     }
 }
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java
index ec90916e8b1..1fe381a9138 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java
@@ -130,14 +130,19 @@ public class SplunkEndpoint extends ScheduledPollEndpoint implements EndpointSer
         return configuration;
     }
 
-    public synchronized boolean reset(Exception e) {
-        boolean answer = false;
-        if (e instanceof RuntimeException && e.getCause() instanceof 
ConnectException
-                || e instanceof SocketException || e instanceof SSLException) {
-            LOG.warn("Got exception from Splunk. Service will be reset.");
-            this.service = null;
-            answer = true;
+    public boolean reset(Exception e) {
+        lock.lock();
+        try {
+            boolean answer = false;
+            if (e instanceof RuntimeException && e.getCause() instanceof 
ConnectException
+                    || e instanceof SocketException || e instanceof 
SSLException) {
+                LOG.warn("Got exception from Splunk. Service will be reset.");
+                this.service = null;
+                answer = true;
+            }
+            return answer;
+        } finally {
+            lock.unlock();
         }
-        return answer;
     }
 }
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
index f131fa14d6d..36b8d28aaf2 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
@@ -22,6 +22,8 @@ import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.net.Socket;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import com.splunk.Args;
 import com.splunk.Service;
@@ -37,6 +39,7 @@ public abstract class SplunkDataWriter implements DataWriter {
     protected Args args;
     private boolean connected;
     private Socket socket;
+    protected final Lock lock = new ReentrantLock();
 
     public SplunkDataWriter(SplunkEndpoint endpoint, Args args) {
         this.endpoint = endpoint;
@@ -55,27 +58,36 @@ public abstract class SplunkDataWriter implements DataWriter {
         doWrite(event + SplunkEvent.LINEBREAK);
     }
 
-    protected synchronized void doWrite(String event) throws IOException {
-        LOG.debug("writing event to splunk:{}", event);
-        OutputStream ostream = socket.getOutputStream();
-        Writer writer = new OutputStreamWriter(ostream, 
StandardCharsets.UTF_8);
-        writer.write(event);
-        writer.flush();
+    protected void doWrite(String event) throws IOException {
+        lock.lock();
+        try {
+            LOG.debug("writing event to splunk:{}", event);
+            OutputStream ostream = socket.getOutputStream();
+            Writer writer = new OutputStreamWriter(ostream, 
StandardCharsets.UTF_8);
+            writer.write(event);
+            writer.flush();
+        } finally {
+            lock.unlock();
+        }
     }
 
     @Override
-    public synchronized void start() {
+    public void start() {
+        lock.lock();
         try {
             socket = createSocket(endpoint.getService());
             connected = true;
         } catch (Exception e) {
             connected = false;
             throw new RuntimeException(e);
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
-    public synchronized void stop() {
+    public void stop() {
+        lock.lock();
         try {
             if (socket != null) {
                 socket.close();
@@ -83,6 +95,8 @@ public abstract class SplunkDataWriter implements DataWriter {
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
index f9587b080c1..1d773ed8400 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
@@ -33,13 +33,18 @@ public class SubmitDataWriter extends SplunkDataWriter {
     }
 
     @Override
-    protected synchronized void doWrite(String event) throws IOException {
-        Index index = getIndex();
-        if (index != null) {
-            index.submit(args, event);
-        } else {
-            Receiver receiver = endpoint.getService().getReceiver();
-            receiver.submit(args, event);
+    protected void doWrite(String event) throws IOException {
+        lock.lock();
+        try {
+            Index index = getIndex();
+            if (index != null) {
+                index.submit(args, event);
+            } else {
+                Receiver receiver = endpoint.getService().getReceiver();
+                receiver.submit(args, event);
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
index 2dda0bd9ec2..b16969f2b8e 100644
--- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
+++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.springrabbit;
 
 import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import com.rabbitmq.client.Channel;
 import org.apache.camel.AsyncCallback;
@@ -47,6 +49,7 @@ public class EndpointMessageListener implements ChannelAwareMessageListener {
     private RabbitTemplate template;
     private boolean disableReplyTo;
     private boolean async;
+    private final Lock lock = new ReentrantLock();
 
     public EndpointMessageListener(SpringRabbitMQConsumer consumer, SpringRabbitMQEndpoint endpoint, Processor processor) {
         this.consumer = consumer;
@@ -76,11 +79,16 @@ public class EndpointMessageListener implements ChannelAwareMessageListener {
         this.disableReplyTo = disableReplyTo;
     }
 
-    public synchronized RabbitTemplate getTemplate() {
-        if (template == null) {
-            template = endpoint.createInOnlyTemplate();
+    public RabbitTemplate getTemplate() {
+        lock.lock();
+        try {
+            if (template == null) {
+                template = endpoint.createInOnlyTemplate();
+            }
+            return template;
+        } finally {
+            lock.unlock();
         }
-        return template;
     }
 
     public void setTemplate(RabbitTemplate template) {
diff --git a/components/camel-spring-security/src/main/java/org/apache/camel/component/spring/security/SpringSecurityAuthorizationPolicy.java b/components/camel-spring-security/src/main/java/org/apache/camel/component/spring/security/SpringSecurityAuthorizationPolicy.java
index 52a53292e00..59548292cec 100644
--- a/components/camel-spring-security/src/main/java/org/apache/camel/component/spring/security/SpringSecurityAuthorizationPolicy.java
+++ b/components/camel-spring-security/src/main/java/org/apache/camel/component/spring/security/SpringSecurityAuthorizationPolicy.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.spring.security;
 
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import javax.security.auth.Subject;
 
 import org.apache.camel.CamelAuthorizationException;
@@ -48,10 +51,11 @@ public class SpringSecurityAuthorizationPolicy extends IdentifiedType
     private static final Logger LOG = LoggerFactory.getLogger(SpringSecurityAuthorizationPolicy.class);
     private AuthorizationManager<Exchange> authorizationManager;
     private AuthenticationManager authenticationManager;
-    private AuthenticationAdapter authenticationAdapter;
+    private volatile AuthenticationAdapter authenticationAdapter;
     private ApplicationEventPublisher eventPublisher;
     private boolean alwaysReauthenticate;
     private boolean useThreadSecurityContext = true;
+    private final Lock lock = new ReentrantLock();
 
     @Override
     public void beforeWrap(Route route, NamedNode definition) {
@@ -145,16 +149,20 @@ public class SpringSecurityAuthorizationPolicy extends IdentifiedType
     }
 
     public AuthenticationAdapter getAuthenticationAdapter() {
-        if (authenticationAdapter == null) {
-            synchronized (this) {
-                if (authenticationAdapter != null) {
-                    return authenticationAdapter;
-                } else {
-                    authenticationAdapter = new DefaultAuthenticationAdapter();
+        AuthenticationAdapter adapter = authenticationAdapter;
+        if (adapter == null) {
+            lock.lock();
+            try {
+                adapter = authenticationAdapter;
+                if (adapter == null) {
+                    adapter = new DefaultAuthenticationAdapter();
+                    authenticationAdapter = adapter;
                 }
+            } finally {
+                lock.unlock();
             }
         }
-        return authenticationAdapter;
+        return adapter;
     }
 
     public void setAuthenticationAdapter(AuthenticationAdapter adapter) {
diff --git a/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceProducer.java b/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceProducer.java
index 7ad8f5b78ec..0c210e330b4 100644
--- a/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceProducer.java
+++ b/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceProducer.java
@@ -21,6 +21,8 @@ import java.net.HttpURLConnection;
 import java.net.URI;
 import java.security.GeneralSecurityException;
 import java.util.Iterator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLContext;
@@ -213,6 +215,7 @@ public class SpringWebserviceProducer extends DefaultProducer {
         private final AbstractHttpWebServiceMessageSender delegate;
         private final SpringWebserviceConfiguration configuration;
         private final CamelContext camelContext;
+        private final Lock lock = new ReentrantLock();
 
         private SSLContext sslContext;
 
@@ -235,14 +238,15 @@ public class SpringWebserviceProducer extends DefaultProducer {
                 }
 
                 if (configuration.getSslContextParameters() != null && connection instanceof HttpsURLConnection) {
+                    lock.lock();
                     try {
-                        synchronized (this) {
-                            if (sslContext == null) {
-                                sslContext = configuration.getSslContextParameters().createSSLContext(camelContext);
-                            }
+                        if (sslContext == null) {
+                            sslContext = configuration.getSslContextParameters().createSSLContext(camelContext);
                         }
                     } catch (GeneralSecurityException e) {
                         throw new RuntimeCamelException("Error creating SSLContext based on SSLContextParameters.", e);
+                    } finally {
+                        lock.unlock();
                     }
 
                     ((HttpsURLConnection) connection).setSSLSocketFactory(sslContext.getSocketFactory());
diff --git a/components/camel-spring/src/main/java/org/apache/camel/component/event/EventEndpoint.java b/components/camel-spring/src/main/java/org/apache/camel/component/event/EventEndpoint.java
index 1e8ef97b1d6..667035bf135 100644
--- a/components/camel-spring/src/main/java/org/apache/camel/component/event/EventEndpoint.java
+++ b/components/camel-spring/src/main/java/org/apache/camel/component/event/EventEndpoint.java
@@ -121,14 +121,24 @@ public class EventEndpoint extends DefaultEndpoint implements ApplicationContext
 
     // Implementation methods
     // -------------------------------------------------------------------------
-    public synchronized void consumerStarted(EventConsumer consumer) {
-        getComponent().consumerStarted(this);
-        getLoadBalancer().addProcessor(consumer.getAsyncProcessor());
+    public void consumerStarted(EventConsumer consumer) {
+        lock.lock();
+        try {
+            getComponent().consumerStarted(this);
+            getLoadBalancer().addProcessor(consumer.getAsyncProcessor());
+        } finally {
+            lock.unlock();
+        }
     }
 
-    public synchronized void consumerStopped(EventConsumer consumer) {
-        getComponent().consumerStopped(this);
-        getLoadBalancer().removeProcessor(consumer.getAsyncProcessor());
+    public void consumerStopped(EventConsumer consumer) {
+        lock.lock();
+        try {
+            getComponent().consumerStopped(this);
+            getLoadBalancer().removeProcessor(consumer.getAsyncProcessor());
+        } finally {
+            lock.unlock();
+        }
     }
 
     protected LoadBalancer createLoadBalancer() {
diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerReifier.java b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerReifier.java
index 6d5199bcb17..a45914ddc73 100644
--- a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerReifier.java
+++ b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerReifier.java
@@ -191,29 +191,34 @@ public class TransactionErrorHandlerReifier extends ErrorHandlerReifier<SpringTr
         return answer;
     }
 
-    protected synchronized ScheduledExecutorService getExecutorService(
+    protected ScheduledExecutorService getExecutorService(
             ScheduledExecutorService executorService, String executorServiceRef) {
-        if (executorService == null || executorService.isShutdown()) {
-            // camel context will shutdown the executor when it shutdown so no
-            // need to shut it down when stopping
-            if (executorServiceRef != null) {
-                executorService = lookupByNameAndType(executorServiceRef, ScheduledExecutorService.class);
-                if (executorService == null) {
-                    ExecutorServiceManager manager = camelContext.getExecutorServiceManager();
-                    ThreadPoolProfile profile = manager.getThreadPoolProfile(executorServiceRef);
-                    executorService = manager.newScheduledThreadPool(this, executorServiceRef, profile);
+        lock.lock();
+        try {
+            if (executorService == null || executorService.isShutdown()) {
+                // camel context will shutdown the executor when it shutdown so no
+                // need to shut it down when stopping
+                if (executorServiceRef != null) {
+                    executorService = lookupByNameAndType(executorServiceRef, ScheduledExecutorService.class);
+                    if (executorService == null) {
+                        ExecutorServiceManager manager = camelContext.getExecutorServiceManager();
+                        ThreadPoolProfile profile = manager.getThreadPoolProfile(executorServiceRef);
+                        executorService = manager.newScheduledThreadPool(this, executorServiceRef, profile);
+                    }
+                    if (executorService == null) {
+                        throw new IllegalArgumentException("ExecutorService " + executorServiceRef + " not found in registry.");
+                    }
+                } else {
+                    // no explicit configured thread pool, so leave it up to the
+                    // error handler to decide if it need a default thread pool from
+                    // CamelContext#getErrorHandlerExecutorService
+                    executorService = null;
                 }
-                if (executorService == null) {
-                    throw new IllegalArgumentException("ExecutorService " + executorServiceRef + " not found in registry.");
-                }
-            } else {
-                // no explicit configured thread pool, so leave it up to the
-                // error handler to decide if it need a default thread pool from
-                // CamelContext#getErrorHandlerExecutorService
-                executorService = null;
             }
+            return executorService;
+        } finally {
+            lock.unlock();
         }
-        return executorService;
     }
 
 }
diff --git a/components/camel-stax/src/main/java/org/apache/camel/component/stax/StAXJAXBIteratorExpression.java b/components/camel-stax/src/main/java/org/apache/camel/component/stax/StAXJAXBIteratorExpression.java
index 40f265349b4..f9b785d8c78 100644
--- a/components/camel-stax/src/main/java/org/apache/camel/component/stax/StAXJAXBIteratorExpression.java
+++ b/components/camel-stax/src/main/java/org/apache/camel/component/stax/StAXJAXBIteratorExpression.java
@@ -101,16 +101,17 @@ public class StAXJAXBIteratorExpression<T> extends ExpressionAdapter {
     }
 
     private static JAXBContext jaxbContext(Class<?> handled) throws JAXBException {
-        if (JAX_CONTEXTS.containsKey(handled)) {
-            return JAX_CONTEXTS.get(handled);
-        }
-
-        JAXBContext context;
-        synchronized (JAX_CONTEXTS) {
-            context = JAXBContext.newInstance(handled);
-            JAX_CONTEXTS.put(handled, context);
+        try {
+            return JAX_CONTEXTS.computeIfAbsent(handled, k -> {
+                try {
+                    return JAXBContext.newInstance(handled);
+                } catch (JAXBException e) {
+                    throw new RuntimeCamelException(e);
+                }
+            });
+        } catch (RuntimeCamelException e) {
+            throw (JAXBException) e.getCause();
         }
-        return context;
     }
 
     @Override
diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
index 3ab6db20563..8739b8ec5a2 100644
--- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
+++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
@@ -335,43 +335,53 @@ public class StreamConsumer extends DefaultConsumer implements Runnable {
     /**
      * Strategy method for processing the line
      */
-    protected synchronized long processLine(String line, boolean last, long index) throws Exception {
-        if (endpoint.getGroupLines() > 0) {
-            // remember line
-            if (line != null) {
-                lines.add(line);
-            }
+    protected long processLine(String line, boolean last, long index) throws Exception {
+        lock.lock();
+        try {
+            if (endpoint.getGroupLines() > 0) {
+                // remember line
+                if (line != null) {
+                    lines.add(line);
+                }
 
-            // should we flush lines?
-            if (!lines.isEmpty() && (lines.size() >= endpoint.getGroupLines() || last)) {
-                // spit out lines as we hit the size, or it was the last
-                List<String> copy = new ArrayList<>(lines);
-                Object body = endpoint.getGroupStrategy().groupLines(copy);
-                // remember to inc index when we create an exchange
-                Exchange exchange = createExchange(body, index++, last);
+                // should we flush lines?
+                if (!lines.isEmpty() && (lines.size() >= endpoint.getGroupLines() || last)) {
+                    // spit out lines as we hit the size, or it was the last
+                    List<String> copy = new ArrayList<>(lines);
+                    Object body = endpoint.getGroupStrategy().groupLines(copy);
+                    // remember to inc index when we create an exchange
+                    Exchange exchange = createExchange(body, index++, last);
 
-                // clear lines
-                lines.clear();
+                    // clear lines
+                    lines.clear();
 
+                    getProcessor().process(exchange);
+                }
+            } else if (line != null) {
+                // single line
+                // remember to inc index when we create an exchange
+                Exchange exchange = createExchange(line, index++, last);
                 getProcessor().process(exchange);
             }
-        } else if (line != null) {
-            // single line
-            // remember to inc index when we create an exchange
-            Exchange exchange = createExchange(line, index++, last);
-            getProcessor().process(exchange);
-        }
 
-        return index;
+            return index;
+        } finally {
+            lock.unlock();
+        }
     }
 
     /**
      * Strategy method for processing the data
      */
-    protected synchronized long processRaw(byte[] body, long index) throws Exception {
-        Exchange exchange = createExchange(body, index++, true);
-        getProcessor().process(exchange);
-        return index;
+    protected long processRaw(byte[] body, long index) throws Exception {
+        lock.lock();
+        try {
+            Exchange exchange = createExchange(body, index++, true);
+            getProcessor().process(exchange);
+            return index;
+        } finally {
+            lock.unlock();
+        }
     }
 
     /**
diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
index 77b0e7d5298..9cc2328511b 100644
--- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
+++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
@@ -69,14 +69,16 @@ public class StreamProducer extends DefaultAsyncProducer {
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
             delay(endpoint.getDelay());
-
-            synchronized (this) {
+            lock.lock();
+            try {
                 try {
                     openStream(exchange);
                     writeToStream(outputStream, exchange);
                 } finally {
                     closeStream(exchange, false);
                 }
+            } finally {
+                lock.unlock();
             }
         } catch (InterruptedException e) {
             exchange.setException(e);

Reply via email to