This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new c32244de7bb Camel-20918: fix exception handling for Salesforce 
Streaming API (#14778)
c32244de7bb is described below

commit c32244de7bbc0d4ef86496d4f842dde99d02fa12
Author: Bartosz Popiela <bartosz...@gmail.com>
AuthorDate: Mon Jul 22 12:24:59 2024 +0200

    Camel-20918: fix exception handling for Salesforce Streaming API (#14778)
    
    * CAMEL-20918: set MAX_NETWORK_DELAY_OPTION to be 120 sec as per 
https://github.com/cometd/cometd/issues/1142#issuecomment-1048256297 and 
https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/using_streaming_api_timeouts.htm
    
    * CAMEL-20918: fix exception handling for Salesforce Streaming API:
    - execute subscriptionListener in a separate thread similarly to 
handshakeListener and connectListener,
    - fix SubscriptionHelper#listenerMap remaining empty when an exception 
occurs while subscribing to a channel,
    - remove SubscriptionHelper#handshake as there is always a thread hanging 
in this method as per https://issues.apache.org/jira/browse/CAMEL-20388,
    - remove the call to closeChannel for topic listeners as they will be 
removed on SubscriptionHelper#unsubscribe
    
    * CAMEL-20918: fix a failing build
    
    * CAMEL-20918: set MAX_NETWORK_DELAY_OPTION as first to allow overwriting
    
    * CAMEL-20918: update replayId per channel when explicitly subscribed and 
not after rehandshake
    
    * CAMEL-20918: fix replay extension not to overwrite ReplayId per channel 
if already set
    
    * CAMEL-20918: fix formatting
    
    * CAMEL-20918: remove unnecessary log
---
 .../component/salesforce/StreamingApiConsumer.java |   9 +-
 .../internal/streaming/ReplayExtension.java        |   4 +-
 .../internal/streaming/SubscriptionHelper.java     | 521 ++++++++-------------
 .../internal/streaming/ReplayExtensionTest.java    |   4 +-
 .../salesforce/internal/streaming/StubServer.java  |   4 +-
 .../streaming/SubscriptionHelperManualIT.java      | 512 +++++++++++---------
 .../internal/streaming/SubscriptionHelperTest.java |  21 +
 7 files changed, 526 insertions(+), 549 deletions(-)

diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java
index 02f63c61a25..4dcfd100a6e 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java
@@ -292,10 +292,15 @@ public class StreamingApiConsumer extends DefaultConsumer 
{
 
         // subscribe to topic
         ServiceHelper.startService(subscriptionHelper);
-        subscriptionHelper.subscribe(topicName, this);
+        subscriptionHelper.subscribe(this);
         subscribed = true;
     }
 
+    @Override
+    public SalesforceEndpoint getEndpoint() {
+        return this.endpoint;
+    }
+
     /**
      * Stops this consumer.
      *
@@ -324,7 +329,7 @@ public class StreamingApiConsumer extends DefaultConsumer {
         if (subscribed) {
             subscribed = false;
             // unsubscribe from topic
-            subscriptionHelper.unsubscribe(topicName, this);
+            subscriptionHelper.unsubscribe(this);
         }
     }
 
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
index 287d4400da5..0d2d32f012e 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
@@ -48,8 +48,8 @@ public class ReplayExtension implements Extension {
     private final ConcurrentMap<String, Long> dataMap = new 
ConcurrentHashMap<>();
     private final AtomicBoolean supported = new AtomicBoolean();
 
-    public void addChannelReplayId(final String channelName, final long 
replayId) {
-        dataMap.put(channelName, replayId);
+    public void setReplayIdIfAbsent(final String channelName, final long 
replayId) {
+        dataMap.putIfAbsent(channelName, replayId);
     }
 
     @Override
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 4cf48067b69..0ffda9096fe 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -21,14 +21,22 @@ import java.net.CookieManager;
 import java.net.CookiePolicy;
 import java.net.CookieStore;
 import java.net.URI;
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
 import org.apache.camel.CamelException;
-import org.apache.camel.component.salesforce.*;
+import org.apache.camel.component.salesforce.SalesforceComponent;
+import org.apache.camel.component.salesforce.SalesforceEndpoint;
+import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
+import org.apache.camel.component.salesforce.SalesforceHttpClient;
+import org.apache.camel.component.salesforce.StreamingApiConsumer;
 import org.apache.camel.component.salesforce.api.SalesforceException;
 import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import org.apache.camel.support.service.ServiceSupport;
@@ -46,9 +54,12 @@ import org.eclipse.jetty.http.HttpHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.Collections.emptySet;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.cometd.bayeux.Channel.*;
+import static org.cometd.bayeux.Channel.META_CONNECT;
+import static org.cometd.bayeux.Channel.META_HANDSHAKE;
+import static org.cometd.bayeux.Channel.META_SUBSCRIBE;
 import static org.cometd.bayeux.Message.ERROR_FIELD;
 import static org.cometd.bayeux.Message.SUBSCRIPTION_FIELD;
 
@@ -58,13 +69,13 @@ public class SubscriptionHelper extends ServiceSupport {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SubscriptionHelper.class);
 
-    private static final int CONNECT_TIMEOUT = 110;
+    private static final int HANDSHAKE_TIMEOUT_SEC = 120;
 
     private static final String FAILURE_FIELD = "failure";
     private static final String EXCEPTION_FIELD = "exception";
     private static final String SFDC_FIELD = "sfdc";
     private static final String FAILURE_REASON_FIELD = "failureReason";
-    private static final int DISCONNECT_INTERVAL = 5000;
+
     private static final String SERVER_TOO_BUSY_ERROR = "503::";
     private static final String AUTHENTICATION_INVALID = "401::Authentication 
invalid";
     private static final String INVALID_REPLAY_ID_PATTERN = "400::The replayId 
\\{.*} you provided was invalid.*";
@@ -73,32 +84,166 @@ public class SubscriptionHelper extends ServiceSupport {
 
     private final SalesforceComponent component;
     private SalesforceSession session;
-    private final long timeout = 60 * 1000L;
 
-    private final Map<StreamingApiConsumer, 
ClientSessionChannel.MessageListener> listenerMap;
     private final long maxBackoff;
     private final long backoffIncrement;
-
-    private ClientSessionChannel.MessageListener handshakeListener;
-    private ClientSessionChannel.MessageListener connectListener;
-
     private volatile String handshakeError;
     private volatile Exception handshakeException;
     private volatile String connectError;
     private volatile Exception connectException;
 
-    private volatile boolean reconnecting;
     private final AtomicLong handshakeBackoff;
-    private final AtomicBoolean handshaking = new AtomicBoolean();
+
+    private final Map<String, Set<StreamingApiConsumer>> channelToConsumers = 
new ConcurrentHashMap<>();
+
+    private final Map<StreamingApiConsumer, 
ClientSessionChannel.MessageListener> consumerToListener
+            = new ConcurrentHashMap<>();
+
+    private final Set<String> channelsToSubscribe = 
ConcurrentHashMap.newKeySet();
+
+    private final ClientSessionChannel.MessageListener handshakeListener = 
createHandshakeListener();
+
+    private final ClientSessionChannel.MessageListener subscriptionListener = 
createSubscriptionListener();
+
+    private final ClientSessionChannel.MessageListener connectListener = 
createConnectionListener();
 
     public SubscriptionHelper(final SalesforceComponent component) {
         this.component = component;
-        listenerMap = new ConcurrentHashMap<>();
         handshakeBackoff = new AtomicLong();
         backoffIncrement = component.getConfig().getBackoffIncrement();
         maxBackoff = component.getConfig().getMaxBackoff();
     }
 
+    private MessageListener createHandshakeListener() {
+        return (channel, message) -> 
component.getHttpClient().getWorkerPool().execute(() -> {
+            LOG.debug("[CHANNEL:META_HANDSHAKE]: {}", message);
+
+            if (!message.isSuccessful()) {
+                LOG.warn("Handshake failure: {}", message);
+                handshakeError = (String) message.get(ERROR_FIELD);
+                handshakeException = getFailure(message);
+                if (handshakeError != null) {
+                    if (handshakeError.startsWith("403::")) {
+                        String failureReason = getFailureReason(message);
+                        if (failureReason.equals(AUTHENTICATION_INVALID)) {
+                            LOG.debug(
+                                    "attempting login due to handshake error: 
403 -> 401::Authentication invalid");
+                            
session.attemptLoginUntilSuccessful(backoffIncrement, maxBackoff);
+                        }
+                    }
+                }
+                // failed, so keep trying
+                LOG.debug("Handshake failed, so try again.");
+                client.handshake();
+            } else if (!channelToConsumers.isEmpty()) {
+                channelsToSubscribe.clear();
+                channelsToSubscribe.addAll(channelToConsumers.keySet());
+                LOG.info("Handshake successful. Channels to subscribe: {}", 
channelsToSubscribe);
+            }
+        });
+    }
+
+    private MessageListener createConnectionListener() {
+        return (channel, message) -> 
component.getHttpClient().getWorkerPool().execute(() -> {
+            LOG.debug("[CHANNEL:META_CONNECT]: {}", message);
+
+            if (!message.isSuccessful()) {
+                LOG.warn("Connect failure: {}", message);
+                connectError = (String) message.get(ERROR_FIELD);
+                connectException = getFailure(message);
+
+                if (connectError != null && 
connectError.equals(AUTHENTICATION_INVALID)) {
+                    LOG.debug("connectError: {}", connectError);
+                    LOG.debug("Attempting login...");
+                    session.attemptLoginUntilSuccessful(backoffIncrement, 
maxBackoff);
+                }
+                if (message.getAdvice() == null || 
"none".equals(message.getAdvice().get("reconnect"))) {
+                    LOG.debug("Advice == none, so handshaking");
+                    client.handshake();
+                }
+            } else if (!channelsToSubscribe.isEmpty()) {
+                LOG.info("Subscribing to channels: {}", channelsToSubscribe);
+                for (var channelName : channelsToSubscribe) {
+                    for (var consumer : channelToConsumers.get(channelName)) {
+                        subscribe(consumer);
+                    }
+                }
+            }
+        });
+    }
+
+    private MessageListener createSubscriptionListener() {
+        return (channel, message) -> 
component.getHttpClient().getWorkerPool().execute(() -> {
+            LOG.debug("[CHANNEL:META_SUBSCRIBE]: {}", message);
+            var channelName = message.getOrDefault(SUBSCRIPTION_FIELD, 
"").toString();
+            if (!message.isSuccessful()) {
+                LOG.warn("Subscription failure: {}", message);
+                var consumers = channelToConsumers.getOrDefault(channelName, 
emptySet());
+                consumers.stream().findFirst().ifPresent(salesforceConsumer -> 
subscriptionFailed(salesforceConsumer, message));
+            } else {
+                // remember subscription
+                LOG.info("Subscribed to channel {}", channelName);
+                channelsToSubscribe.remove(channelName);
+
+                // reset backoff interval
+                handshakeBackoff.set(0);
+            }
+        });
+    }
+
+    private void subscriptionFailed(StreamingApiConsumer firstConsumer, 
Message message) {
+        var channelName = message.getOrDefault(SUBSCRIPTION_FIELD, 
"").toString();
+        var consumers = channelToConsumers.getOrDefault(channelName, 
emptySet());
+
+        String error = (String) message.get(ERROR_FIELD);
+        if (error == null) {
+            error = "Missing error message";
+        }
+
+        Exception failure = getFailure(message);
+        String msg = String.format("Error subscribing to %s: %s", 
firstConsumer.getTopicName(),
+                failure != null ? failure.getMessage() : error);
+        boolean abort = true;
+
+        LOG.warn(msg);
+        if (isTemporaryError(message)) {
+
+            // retry after delay
+            final long backoff = handshakeBackoff.getAndAdd(backoffIncrement);
+            if (backoff > maxBackoff) {
+                LOG.error("Subscribe aborted after exceeding {} msecs 
backoff", maxBackoff);
+            } else {
+                abort = false;
+
+                try {
+                    LOG.debug("Pausing for {} msecs before subscribe attempt", 
backoff);
+                    Thread.sleep(backoff);
+                    for (var consumer : consumers) {
+                        subscribe(consumer);
+                    }
+                } catch (InterruptedException e) {
+                    LOG.warn("Aborting subscribe on interrupt!", e);
+                }
+            }
+        } else if (error.matches(INVALID_REPLAY_ID_PATTERN)) {
+            abort = false;
+            long fallBackReplayId
+                    = ((SalesforceEndpoint) 
firstConsumer.getEndpoint()).getConfiguration().getFallBackReplayId();
+            LOG.warn(error);
+            LOG.warn("Falling back to replayId {} for channel {}", 
fallBackReplayId, channelName);
+            REPLAY_EXTENSION.setReplayIdIfAbsent(channelName, 
fallBackReplayId);
+            for (var consumer : consumers) {
+                subscribe(consumer);
+            }
+        }
+
+        if (abort && client != null) {
+            for (var consumer : consumers) {
+                consumer.handleException(msg, new SalesforceException(msg, 
failure));
+            }
+        }
+    }
+
     @Override
     protected void doStart() throws Exception {
         session = component.getSession();
@@ -111,97 +256,20 @@ public class SubscriptionHelper extends ServiceSupport {
         client = createClient(component, session);
 
         initMessageListeners();
-        connect();
+        handshake();
     }
 
     private void initMessageListeners() {
-        // listener for handshake error or exception
-        if (handshakeListener == null) {
-            // first start
-            handshakeListener = new ClientSessionChannel.MessageListener() {
-                public void onMessage(ClientSessionChannel channel, Message 
message) {
-                    component.getHttpClient().getWorkerPool().execute(() -> {
-                        LOG.debug("[CHANNEL:META_HANDSHAKE]: {}", message);
-
-                        if (!message.isSuccessful()) {
-                            LOG.warn("Handshake failure: {}", message);
-                            handshakeError = (String) message.get(ERROR_FIELD);
-                            handshakeException = getFailure(message);
-                            if (handshakeError != null) {
-                                if (handshakeError.startsWith("403::")) {
-                                    String failureReason = 
getFailureReason(message);
-                                    if 
(failureReason.equals(AUTHENTICATION_INVALID)) {
-                                        LOG.debug(
-                                                "attempting login due to 
handshake error: 403 -> 401::Authentication invalid");
-                                        
session.attemptLoginUntilSuccessful(backoffIncrement, maxBackoff);
-                                    }
-                                }
-                            }
-
-                            // failed, so keep trying
-                            LOG.debug("Handshake failed, so try again.");
-                            handshake();
-
-                        } else if (!listenerMap.isEmpty()) {
-                            reconnecting = true;
-                        }
-                    });
-                }
-            };
-        }
         client.getChannel(META_HANDSHAKE).addListener(handshakeListener);
-
-        // listener for connect error
-        if (connectListener == null) {
-            connectListener = new ClientSessionChannel.MessageListener() {
-                public void onMessage(ClientSessionChannel channel, Message 
message) {
-                    component.getHttpClient().getWorkerPool().execute(() -> {
-                        LOG.debug("[CHANNEL:META_CONNECT]: {}", message);
-
-                        if (!message.isSuccessful()) {
-
-                            LOG.warn("Connect failure: {}", message);
-                            connectError = (String) message.get(ERROR_FIELD);
-                            connectException = getFailure(message);
-                            client.disconnect();
-
-                            if (connectError != null && 
connectError.equals(AUTHENTICATION_INVALID)) {
-                                LOG.debug("connectError: {}", connectError);
-                                LOG.debug("Attempting login...");
-                                
session.attemptLoginUntilSuccessful(backoffIncrement, maxBackoff);
-                            }
-                            // Server says don't retry to connect, so we'll 
handshake instead
-                            // Otherwise, Bayeux client automatically 
re-attempts connection
-                            if (message.getAdvice() != null &&
-                                    
!message.getAdvice().get("reconnect").equals("retry")) {
-                                LOG.debug("Advice != retry, so handshaking");
-                                handshake();
-                            }
-                        } else if (reconnecting) {
-                            LOG.debug("Refreshing subscriptions to {} channels 
on reconnect", listenerMap.size());
-                            // reconnected to Salesforce, subscribe to existing
-                            // channels
-                            final Map<StreamingApiConsumer, MessageListener> 
map = new HashMap<>(listenerMap);
-                            listenerMap.clear();
-                            for (Map.Entry<StreamingApiConsumer, 
ClientSessionChannel.MessageListener> entry : map.entrySet()) {
-                                final StreamingApiConsumer consumer = 
entry.getKey();
-                                final String topicName = 
consumer.getTopicName();
-                                subscribe(topicName, consumer);
-                            }
-                            reconnecting = false;
-                        }
-                    });
-                }
-            };
-        }
+        client.getChannel(META_SUBSCRIBE).addListener(subscriptionListener);
         client.getChannel(META_CONNECT).addListener(connectListener);
     }
 
-    private void connect() throws CamelException {
+    private void handshake() throws CamelException {
         // connect to Salesforce cometd endpoint
         client.handshake();
 
-        final long waitMs = MILLISECONDS.convert(CONNECT_TIMEOUT, SECONDS);
+        final long waitMs = MILLISECONDS.convert(HANDSHAKE_TIMEOUT_SEC, 
SECONDS);
         if (!client.waitFor(waitMs, BayeuxClient.State.CONNECTED)) {
             if (handshakeException != null) {
                 throw new CamelException(
@@ -214,105 +282,9 @@ public class SubscriptionHelper extends ServiceSupport {
             } else if (connectError != null) {
                 throw new CamelException(String.format("Error during CONNECT: 
%s", connectError));
             } else {
-                throw new CamelException(String.format("Handshake request 
timeout after %s seconds", CONNECT_TIMEOUT));
-            }
-        }
-    }
-
-    private void handshake() {
-        LOG.debug("Begin handshake if not already in progress.");
-        if (!handshaking.compareAndSet(false, true)) {
-            return;
-        }
-
-        LOG.debug("Continuing with handshake.");
-        try {
-            doHandshake();
-        } finally {
-            handshaking.set(false);
-        }
-    }
-
-    private void doHandshake() {
-        if (isStoppingOrStopped()) {
-            return;
-        }
-
-        LOG.info("Handshaking after unexpected disconnect from Salesforce...");
-        boolean abort = false;
-
-        // wait for disconnect
-        LOG.debug("Waiting to disconnect...");
-        while (!abort && !client.isDisconnected()) {
-            try {
-                Thread.sleep(DISCONNECT_INTERVAL);
-            } catch (InterruptedException e) {
-                LOG.error("Aborting handshake on interrupt!");
-                abort = true;
-                Thread.currentThread().interrupt();
-            }
-
-            abort = abort || isStoppingOrStopped();
-        }
-
-        if (!abort) {
-
-            // update handshake attempt backoff
-            final long backoff = handshakeBackoff.getAndAdd(backoffIncrement);
-            if (backoff > maxBackoff) {
-                LOG.error("Handshake aborted after exceeding {} msecs 
backoff", maxBackoff);
-                abort = true;
-            } else {
-
-                // pause before handshake attempt
-                LOG.debug("Pausing for {} msecs before handshake attempt", 
backoff);
-                try {
-                    Thread.sleep(backoff);
-                } catch (InterruptedException e) {
-                    LOG.error("Aborting handshake on interrupt!");
-                    abort = true;
-                    Thread.currentThread().interrupt();
-                }
-            }
-
-            if (!abort) {
-                Exception lastError = new SalesforceException("Unknown error", 
null);
-                try {
-                    // reset client. If we fail to stop and logout, catch the 
exception
-                    // so we can still continue to doStart()
-                    if (client != null) {
-                        client.disconnect();
-                        boolean disconnected = client.waitFor(timeout, 
State.DISCONNECTED);
-                        if (!disconnected) {
-                            LOG.warn("Could not disconnect client connected 
to: {} after: {} msec.", getEndpointUrl(component),
-                                    timeout);
-                            client.abort();
-                        }
-
-                        client.handshake();
-                        final long waitMs = 
MILLISECONDS.convert(CONNECT_TIMEOUT, SECONDS);
-                        client.waitFor(waitMs, BayeuxClient.State.CONNECTED);
-                    }
-                } catch (Exception e) {
-                    LOG.error("Error handshaking: {}", e.getMessage(), e);
-                    lastError = e;
-                }
-
-                if (client != null && client.isHandshook()) {
-                    LOG.debug("Successful handshake!");
-                    // reset backoff interval
-                    handshakeBackoff.set(backoffIncrement);
-                } else {
-                    LOG.error("Failed to handshake after pausing for {} 
msecs", backoff);
-                    if ((backoff + backoffIncrement) > maxBackoff) {
-                        // notify all consumers
-                        String abortMsg = "Aborting handshake attempt due to: 
" + lastError.getMessage();
-                        SalesforceException ex = new 
SalesforceException(abortMsg, lastError);
-                        for (StreamingApiConsumer consumer : 
listenerMap.keySet()) {
-                            consumer.handleException(abortMsg, ex);
-                        }
-                    }
-                }
+                throw new CamelException(
+                        String.format("Handshake request timeout after %s 
seconds",
+                                HANDSHAKE_TIMEOUT_SEC));
             }
         }
     }
@@ -333,37 +305,32 @@ public class SubscriptionHelper extends ServiceSupport {
         return exception;
     }
 
-    private void closeChannel(final String name, MessageListener listener) {
+    private void closeChannel(final String name) {
         if (client == null) {
             return;
         }
 
         final ClientSessionChannel channel = client.getChannel(name);
-        channel.removeListener(listener);
+        for (var listener : channel.getListeners()) {
+            channel.removeListener(listener);
+        }
         channel.release();
     }
 
     @Override
     protected void doStop() throws Exception {
-        closeChannel(META_CONNECT, connectListener);
-        closeChannel(META_HANDSHAKE, handshakeListener);
-
-        for (Map.Entry<StreamingApiConsumer, MessageListener> entry : 
listenerMap.entrySet()) {
-            final StreamingApiConsumer consumer = entry.getKey();
-            final String topic = consumer.getTopicName();
-
-            final MessageListener listener = entry.getValue();
-            closeChannel(getChannelName(topic), listener);
-        }
+        closeChannel(META_CONNECT);
+        closeChannel(META_SUBSCRIBE);
+        closeChannel(META_HANDSHAKE);
 
         if (client == null) {
             return;
         }
 
         client.disconnect();
-        boolean disconnected = client.waitFor(timeout, State.DISCONNECTED);
+        boolean disconnected = client.waitFor(60_000, State.DISCONNECTED);
         if (!disconnected) {
-            LOG.warn("Could not disconnect client connected to: {} after: {} 
msec.", getEndpointUrl(component), timeout);
+            LOG.warn("Could not disconnect client connected to: {}", 
getEndpointUrl(component));
             client.abort();
         }
 
@@ -381,9 +348,13 @@ public class SubscriptionHelper extends ServiceSupport {
         final SalesforceHttpClient httpClient = 
component.getConfig().getHttpClient();
 
         Map<String, Object> options = new HashMap<>();
-        options.put(ClientTransport.MAX_NETWORK_DELAY_OPTION, 
httpClient.getTimeout());
+        /*
+        The timeout should be greater than 110 sec as per 
https://github.com/cometd/cometd/issues/1142#issuecomment-1048256297
+        and 
https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/using_streaming_api_timeouts.htm
+        */
+        options.put(ClientTransport.MAX_NETWORK_DELAY_OPTION, 120000);
         if (component.getLongPollingTransportProperties() != null) {
-            options = component.getLongPollingTransportProperties();
+            options.putAll(component.getLongPollingTransportProperties());
         }
 
         // check login access token
@@ -442,109 +413,25 @@ public class SubscriptionHelper extends ServiceSupport {
         return client;
     }
 
-    public void subscribe(final String topicName, final StreamingApiConsumer 
consumer) {
-        subscribe(topicName, consumer, false);
-    }
-
-    public void subscribe(
-            final String topicName, final StreamingApiConsumer consumer,
-            final boolean skipReplayId) {
+    public synchronized void subscribe(StreamingApiConsumer consumer) {
         // create subscription for consumer
-        final String channelName = getChannelName(topicName);
+        final String channelName = getChannelName(consumer.getTopicName());
+        channelToConsumers.computeIfAbsent(channelName, key -> 
ConcurrentHashMap.newKeySet()).add(consumer);
+        channelsToSubscribe.add(channelName);
 
-        if (!reconnecting && !skipReplayId) {
-            setupReplay((SalesforceEndpoint) consumer.getEndpoint());
-        }
+        setReplayIdIfAbsent(consumer.getEndpoint());
 
         // channel message listener
         LOG.info("Subscribing to channel {}...", channelName);
-        final ClientSessionChannel.MessageListener listener = new 
ClientSessionChannel.MessageListener() {
-
-            @Override
-            public void onMessage(ClientSessionChannel channel, Message 
message) {
-                LOG.debug("Received Message: {}", message);
-                // convert CometD message to Camel Message
-                consumer.processMessage(channel, message);
-            }
-        };
-
-        // listener for subscription
-        final ClientSessionChannel.MessageListener subscriptionListener = new 
ClientSessionChannel.MessageListener() {
-            public void onMessage(ClientSessionChannel channel, Message 
message) {
-                LOG.debug("[CHANNEL:META_SUBSCRIBE]: {}", message);
-                final String subscribedChannelName = 
message.getOrDefault(SUBSCRIPTION_FIELD, "").toString();
-                if ("".equals(subscribedChannelName)) {
-                    LOG.warn("[CHANNEL:META_SUBSCRIBE]: No field {} found. 
Skipping message: {}", SUBSCRIPTION_FIELD, message);
-                    return;
-                }
-                if (channelName.equals(subscribedChannelName)) {
-                    if (!message.isSuccessful()) {
-                        String error = (String) message.get(ERROR_FIELD);
-                        if (error == null) {
-                            error = "Missing error message";
-                        }
-
-                        Exception failure = getFailure(message);
-                        String msg = String.format("Error subscribing to %s: 
%s", topicName,
-                                failure != null ? failure.getMessage() : 
error);
-                        boolean abort = true;
-
-                        if (isTemporaryError(message)) {
-                            LOG.warn(msg);
-
-                            // retry after delay
-                            final long backoff = 
handshakeBackoff.getAndAdd(backoffIncrement);
-                            if (backoff > maxBackoff) {
-                                LOG.error("Subscribe aborted after exceeding 
{} msecs backoff", maxBackoff);
-                            } else {
-                                abort = false;
-
-                                try {
-                                    LOG.debug("Pausing for {} msecs before 
subscribe attempt", backoff);
-                                    Thread.sleep(backoff);
-
-                                    
component.getHttpClient().getWorkerPool().execute(() -> subscribe(topicName, 
consumer));
-                                } catch (InterruptedException e) {
-                                    LOG.warn("Aborting subscribe on 
interrupt!", e);
-                                    Thread.currentThread().interrupt();
-                                }
-                            }
-                        } else if (error.matches(INVALID_REPLAY_ID_PATTERN)) {
-                            abort = false;
-                            final Long fallBackReplayId
-                                    = ((SalesforceEndpoint) 
consumer.getEndpoint()).getConfiguration().getFallBackReplayId();
-                            LOG.warn(error);
-                            LOG.warn("Falling back to replayId {} for channel 
{}", fallBackReplayId, channelName);
-                            REPLAY_EXTENSION.addChannelReplayId(channelName, 
fallBackReplayId);
-                            subscribe(topicName, consumer, true);
-                        }
-
-                        if (abort && client != null) {
-                            consumer.handleException(msg, new 
SalesforceException(msg, failure));
-                        }
-                    } else {
-                        // remember subscription
-                        LOG.info("Subscribed to channel {}", 
subscribedChannelName);
-                        listenerMap.put(consumer, listener);
-
-                        // reset backoff interval
-                        handshakeBackoff.set(0);
-                    }
-
-                    // remove this subscription listener
-                    if (client != null) {
-                        client.getChannel(META_SUBSCRIBE).removeListener(this);
-                    } else {
-                        LOG.warn("Trying to handle a subscription message but 
the client is already destroyed");
-                    }
-                }
-            }
-        };
-        client.getChannel(META_SUBSCRIBE).addListener(subscriptionListener);
+        var messageListener = consumerToListener.computeIfAbsent(consumer, key 
-> (channel, message) -> {
+            LOG.debug("Received Message: {}", message);
+            // convert CometD message to Camel Message
+            consumer.processMessage(channel, message);
+        });
 
         // subscribe asynchronously
         final ClientSessionChannel clientChannel = 
client.getChannel(channelName);
-        clientChannel.subscribe(listener);
+        clientChannel.subscribe(messageListener);
     }
 
     private static boolean isTemporaryError(Message message) {
@@ -564,7 +451,7 @@ public class SubscriptionHelper extends ServiceSupport {
         return failureReason;
     }
 
-    void setupReplay(final SalesforceEndpoint endpoint) {
+    private void setReplayIdIfAbsent(final SalesforceEndpoint endpoint) {
         final String topicName = endpoint.getTopicName();
 
         final Optional<Long> replayId = determineReplayIdFor(endpoint, 
topicName);
@@ -573,9 +460,7 @@ public class SubscriptionHelper extends ServiceSupport {
 
             final Long replayIdValue = replayId.get();
 
-            LOG.info("Set Replay extension to replay from `{}` for channel 
`{}`", replayIdValue, channelName);
-
-            REPLAY_EXTENSION.addChannelReplayId(channelName, replayIdValue);
+            REPLAY_EXTENSION.setReplayIdIfAbsent(channelName, replayIdValue);
         }
     }
 
@@ -621,20 +506,26 @@ public class SubscriptionHelper extends ServiceSupport {
         return channelName.toString();
     }
 
-    public void unsubscribe(String topicName, StreamingApiConsumer consumer) {
-
+    public synchronized void unsubscribe(StreamingApiConsumer consumer) {
         // channel name
-        final String channelName = getChannelName(topicName);
+        final String channelName = getChannelName(consumer.getTopicName());
 
         // unsubscribe from channel
-        final ClientSessionChannel.MessageListener listener = 
listenerMap.remove(consumer);
+        var consumers = channelToConsumers.get(channelName);
+        if (consumers != null) {
+            consumers.remove(consumer);
+            if (consumers.isEmpty()) {
+                channelToConsumers.remove(channelName);
+            }
+        }
+        final ClientSessionChannel.MessageListener listener = 
consumerToListener.remove(consumer);
         if (listener != null) {
-
             LOG.debug("Unsubscribing from channel {}...", channelName);
             final ClientSessionChannel clientChannel = 
client.getChannel(channelName);
             // if there are other listeners on this channel, an unsubscribe 
message will not be sent,
             // so we're not going to listen for and expect an unsub response. 
Just unsub and move on.
             clientChannel.unsubscribe(listener);
+            clientChannel.release();
         }
     }
 
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java
 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java
index 4acf1adb510..e6008a4d8dd 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java
@@ -84,7 +84,7 @@ public class ReplayExtensionTest {
         final ReplayExtension replayExtension = new ReplayExtension();
         replayExtension.rcvMeta(null, createHandshakeMessage(true));
 
-        replayExtension.addChannelReplayId(pushTopicMessage.getChannel(), 
123L);
+        replayExtension.setReplayIdIfAbsent(pushTopicMessage.getChannel(), 
123L);
 
         replayExtension.rcv(null, pushTopicMessage);
 
@@ -100,7 +100,7 @@ public class ReplayExtensionTest {
         final ReplayExtension replayExtension = new ReplayExtension();
         replayExtension.rcvMeta(null, createHandshakeMessage(true));
 
-        replayExtension.addChannelReplayId(pushTopicMessage.getChannel(), 
123L);
+        replayExtension.setReplayIdIfAbsent(pushTopicMessage.getChannel(), 
123L);
 
         replayExtension.rcv(null, pushTopicMessage);
 
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java
 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java
index 4429b657a90..1962cdf7b43 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java
@@ -52,8 +52,8 @@ class StubServer {
     class StubHandler extends Handler.Abstract {
 
         private StubResponse stubFor(final Request request, final String body) 
throws IOException {
-            final List<StubResponse> allResponses = new 
ArrayList<>(defaultStubs);
-            allResponses.addAll(stubs);
+            final List<StubResponse> allResponses = new ArrayList<>(stubs);
+            allResponses.addAll(defaultStubs);
 
             for (final StubResponse stub : allResponses) {
                 if (stub.matches(request, body)) {
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperManualIT.java
 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperManualIT.java
index 8967103566e..abbdcbacfab 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperManualIT.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperManualIT.java
@@ -19,6 +19,7 @@ package 
org.apache.camel.component.salesforce.internal.streaming;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelException;
@@ -31,6 +32,7 @@ import 
org.apache.camel.component.salesforce.api.SalesforceException;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.cometd.bayeux.Message;
 import org.cometd.bayeux.client.ClientSessionChannel;
+import org.cometd.bayeux.client.ClientSessionChannel.MessageListener;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -40,7 +42,11 @@ import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import org.slf4j.LoggerFactory;
 
-import static 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelperManualIT.MessageArgumentMatcher.messageForAccountCreationWithName;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelperManualIT.MessageArgumentMatcher.messageWithName;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.atLeastOnce;
@@ -58,9 +64,7 @@ public class SubscriptionHelperManualIT {
     final BlockingQueue<String> messages = new LinkedBlockingDeque<>();
     final SalesforceComponent salesforce;
     final StubServer server;
-    final SubscriptionHelper subscription;
-
-    StreamingApiConsumer toUnsubscribe;
+    private SubscriptionHelper subscription;
 
     static class MessageArgumentMatcher implements ArgumentMatcher<Message> {
 
@@ -74,21 +78,24 @@ public class SubscriptionHelperManualIT {
         public boolean matches(final Message message) {
             final Map<String, Object> data = message.getDataAsMap();
             @SuppressWarnings("unchecked")
-            final Map<String, Object> event = (Map<String, Object>) 
data.get("event");
+            final Map<String, Object> event = (Map<String, Object>) data.get(
+                    "event");
             @SuppressWarnings("unchecked")
-            final Map<String, Object> sobject = (Map<String, Object>) 
data.get("sobject");
+            final Map<String, Object> sobject = (Map<String, Object>) data.get(
+                    "sobject");
             return "created".equals(event.get("type")) && 
name.equals(sobject.get("Name"));
         }
 
-        static Message messageForAccountCreationWithName(final String name) {
+        static Message messageWithName(final String name) {
             return argThat(new MessageArgumentMatcher(name));
         }
     }
 
     public SubscriptionHelperManualIT() throws SalesforceException {
         server = new StubServer();
-        LoggerFactory.getLogger(SubscriptionHelperManualIT.class).info("Port 
for wireshark to filter: {}",
-                server.port());
+        LoggerFactory.getLogger(SubscriptionHelperManualIT.class)
+                .info("Port for wireshark to filter: {}",
+                        server.port());
         final String instanceUrl = "http://localhost:"; + server.port();
         server.replyTo(
                 "POST", "/services/oauth2/token",
@@ -103,67 +110,54 @@ public class SubscriptionHelperManualIT {
 
         server.replyTo(
                 "POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION 
+ "/handshake",
-                "[\n"
-                                                                               
               + "  {\n"
-                                                                               
               + "    \"ext\": {\n"
-                                                                               
               + "      \"replay\": true,\n"
-                                                                               
               + "      \"payload.format\": true\n"
-                                                                               
               + "    },\n"
-                                                                               
               + "    \"minimumVersion\": \"1.0\",\n"
-                                                                               
               + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
-                                                                               
               + "    \"supportedConnectionTypes\": [\n"
-                                                                               
               + "      \"long-polling\"\n"
-                                                                               
               + "    ],\n"
-                                                                               
               + "    \"channel\": \"/meta/handshake\",\n"
-                                                                               
               + "    \"id\": \"$id\",\n"
-                                                                               
               + "    \"version\": \"1.0\",\n"
-                                                                               
               + "    \"successful\": true\n"
-                                                                               
               + "  }\n"
-                                                                               
               + "]");
+                """
+                        [
+                          {
+                            "ext": {
+                              "replay": true,
+                              "payload.format": true
+                            },
+                            "minimumVersion": "1.0",
+                            "clientId": "5ra4927ikfky6cb12juthkpofeu8",
+                            "supportedConnectionTypes": [
+                              "long-polling"
+                            ],
+                            "channel": "/meta/handshake",
+                            "id": "$id",
+                            "version": "1.0",
+                            "successful": true
+                          }
+                        ]""");
+
+        server.replyTo("POST", "/cometd/" + 
SalesforceEndpointConfig.DEFAULT_VERSION + "/connect",
+                req -> {
+                    return req.contains("\"timeout\":0");
+                }, """
+                        [
+                          {
+                            "clientId": "1f0agp5a95yiaeb1kifib37r5z4g",
+                            "advice": {
+                              "interval": 0,
+                              "timeout": 110000,
+                              "reconnect": "retry"
+                            },
+                            "channel": "/meta/connect",
+                            "id": "$id",
+                            "successful": true
+                          }
+                        ]""");
 
         server.replyTo("POST", "/cometd/" + 
SalesforceEndpointConfig.DEFAULT_VERSION + "/connect",
-                req -> req.contains("\"timeout\":0"), "[\n"
-                                                      + "  {\n"
-                                                      + "    \"clientId\": 
\"1f0agp5a95yiaeb1kifib37r5z4g\",\n"
-                                                      + "    \"advice\": {\n"
-                                                      + "      \"interval\": 
0,\n"
-                                                      + "      \"timeout\": 
110000,\n"
-                                                      + "      \"reconnect\": 
\"retry\"\n"
-                                                      + "    },\n"
-                                                      + "    \"channel\": 
\"/meta/connect\",\n"
-                                                      + "    \"id\": 
\"$id\",\n"
-                                                      + "    \"successful\": 
true\n"
-                                                      + "  }\n"
-                                                      + "]");
-
-        server.replyTo("POST", "/cometd/" + 
SalesforceEndpointConfig.DEFAULT_VERSION + "/connect", messages);
-
-        server.replyTo("POST", "/cometd/" + 
SalesforceEndpointConfig.DEFAULT_VERSION + "/subscribe", "[\n"
-                                                                               
                      + "  {\n"
-                                                                               
                      + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
-                                                                               
                      + "    \"channel\": \"/meta/subscribe\",\n"
-                                                                               
                      + "    \"id\": \"$id\",\n"
-                                                                               
                      + "    \"subscription\": \"/topic/Account\",\n"
-                                                                               
                      + "    \"successful\": true\n"
-                                                                               
                      + "  }\n"
-                                                                               
                      + "]");
-
-        server.replyTo("POST", "/cometd/" + 
SalesforceEndpointConfig.DEFAULT_VERSION + "/unsubscribe", "[\n"
-                                                                               
                        + "  {\n"
-                                                                               
                        + "    \"clientId\": 
\"5ra4927ikfky6cb12juthkpofeu8\",\n"
-                                                                               
                        + "    \"channel\": \"/meta/unsubscribe\",\n"
-                                                                               
                        + "    \"id\": \"$id\",\n"
-                                                                               
                        + "    \"subscription\": \"/topic/Account\",\n"
-                                                                               
                        + "    \"successful\": true\n"
-                                                                               
                        + "  }\n"
-                                                                               
                        + "]");
-
-        server.replyTo("POST", "/cometd/" + 
SalesforceEndpointConfig.DEFAULT_VERSION + "/disconnect", "[\n"
-                                                                               
                       + "  {\n"
-                                                                               
                       + "     \"channel\": \"/meta/disconnect\",\n"
-                                                                               
                       + "     \"clientId\": \"client-id\"\n"
-                                                                               
                       + "   }\n"
-                                                                               
                       + "]");
+                messages);
+
+        server.replyTo("POST", "/cometd/" + 
SalesforceEndpointConfig.DEFAULT_VERSION + "/disconnect",
+                """
+                        [
+                          {
+                             "channel": "/meta/disconnect",
+                             "clientId": "client-id"
+                           }
+                        ]""");
 
         server.replyTo("GET", "/services/oauth2/revoke", 200);
 
@@ -178,16 +172,13 @@ public class SubscriptionHelperManualIT {
         salesforce.setRefreshToken("refreshToken");
         salesforce.setAuthenticationType(AuthenticationType.REFRESH_TOKEN);
         salesforce.setConfig(config);
-
         salesforce.start();
         subscription = new SubscriptionHelper(salesforce);
+        subscription.start();
     }
 
     @BeforeEach
-    public void cleanSlate() throws CamelException {
-        if (toUnsubscribe != null) {
-            subscription.unsubscribe("Account", toUnsubscribe);
-        }
+    public void resetServer() throws CamelException {
         server.reset();
     }
 
@@ -200,93 +191,80 @@ public class SubscriptionHelperManualIT {
 
     @Test
     void shouldResubscribeOnConnectionFailures() throws InterruptedException {
-        // handshake and connect
-        subscription.start();
-
-        final StreamingApiConsumer consumer
-                = toUnsubscribe = mock(StreamingApiConsumer.class, 
"shouldResubscribeOnConnectionFailures:consumer");
-
-        final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, 
"shouldResubscribeOnConnectionFailures:endpoint");
-
-        // subscribe
-        when(consumer.getTopicName()).thenReturn("Account");
-
-        when(consumer.getEndpoint()).thenReturn(endpoint);
-        when(endpoint.getConfiguration()).thenReturn(config);
-        when(endpoint.getComponent()).thenReturn(salesforce);
-        when(endpoint.getTopicName()).thenReturn("Account");
-
-        subscription.subscribe("Account", consumer);
-
-        // push one message so we know connection is established and consumer
-        // receives notifications
-        messages.add("[\n"
-                     + "  {\n"
-                     + "    \"data\": {\n"
-                     + "      \"event\": {\n"
-                     + "        \"createdDate\": 
\"2020-12-11T13:44:56.891Z\",\n"
-                     + "        \"replayId\": 1,\n"
-                     + "        \"type\": \"created\"\n"
-                     + "      },\n"
-                     + "      \"sobject\": {\n"
-                     + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
-                     + "        \"Name\": 
\"shouldResubscribeOnConnectionFailures 1\"\n"
-                     + "      }\n"
-                     + "    },\n"
-                     + "    \"channel\": \"/topic/Account\"\n"
-                     + "  },\n"
-                     + "  {\n"
-                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
-                     + "    \"channel\": \"/meta/connect\",\n"
-                     + "    \"id\": \"$id\",\n"
-                     + "    \"successful\": true\n"
-                     + "  }\n"
-                     + "]");
-
-        verify(consumer, 
Mockito.timeout(100)).processMessage(any(ClientSessionChannel.class),
-                
messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 1"));
-
-        // send failed connection message w/o reconnect advice so we handshake 
again
-
-        messages.add("[\n" +
-                     "  {\n" +
-                     "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" +
-                     "    \"channel\": \"/meta/connect\",\n" +
-                     "    \"id\": \"$id\",\n" +
-                     "    \"successful\": false,\n" +
-                     "    \"advice\": {\n" +
-                     "       \"reconnect\": \"none\"\n" +
-                     "    }\n" +
-                     "  }\n" +
-                     "]");
-
-        // queue next message for when the client recovers
-        messages.add("[\n"
-                     + "  {\n"
-                     + "    \"data\": {\n"
-                     + "      \"event\": {\n"
-                     + "        \"createdDate\": 
\"2020-12-11T13:44:56.891Z\",\n"
-                     + "        \"replayId\": 2,\n"
-                     + "        \"type\": \"created\"\n"
-                     + "      },\n"
-                     + "      \"sobject\": {\n"
-                     + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
-                     + "        \"Name\": 
\"shouldResubscribeOnConnectionFailures 2\"\n"
-                     + "      }\n"
-                     + "    },\n"
-                     + "    \"channel\": \"/topic/Account\"\n"
-                     + "  },\n"
-                     + "  {\n"
-                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
-                     + "    \"channel\": \"/meta/connect\",\n"
-                     + "    \"id\": \"$id\",\n"
-                     + "    \"successful\": true\n"
-                     + "  }\n"
-                     + "]");
-
-        // assert last message was received, recovery can take a bit
-        verify(consumer, 
timeout(10000)).processMessage(any(ClientSessionChannel.class),
-                
messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 2"));
+        var consumer = createConsumer("Account");
+        subscription.subscribe(consumer);
+
+        messages.add("""
+                [
+                  {
+                    "data": {
+                      "event": {
+                        "createdDate": "2020-12-11T13:44:56.891Z",
+                        "replayId": 1,
+                        "type": "created"
+                      },
+                      "sobject": {
+                        "Id": "0011n00002XWMgVAAX",
+                        "Name": "shouldResubscribeOnConnectionFailures 1"
+                      }
+                    },
+                    "channel": "/topic/Account"
+                  },
+                  {
+                    "clientId": "5ra4927ikfky6cb12juthkpofeu8",
+                    "channel": "/meta/connect",
+                    "id": "$id",
+                    "successful": true
+                  }
+                ]""");
+        verify(consumer, 
Mockito.timeout(10000)).processMessage(any(ClientSessionChannel.class),
+                messageWithName("shouldResubscribeOnConnectionFailures 1"));
+
+        subscription.client.getChannel("/meta/subscribe").addListener(
+                (MessageListener) (clientSessionChannel, message) -> {
+                    var subscription = (String) message.get("subscription");
+                    if (subscription != null && 
subscription.contains("Account")) {
+                        messages.add("""
+                                [
+                                  {
+                                    "data": {
+                                      "event": {
+                                        "createdDate": 
"2020-12-11T13:44:57.891Z",
+                                        "replayId": 2,
+                                        "type": "created"
+                                      },
+                                      "sobject": {
+                                        "Id": "0011n00002XWMgVAAX",
+                                        "Name": 
"shouldResubscribeOnConnectionFailures 2"
+                                      }
+                                    },
+                                    "channel": "/topic/Account"
+                                  },
+                                  {
+                                    "clientId": "5ra4927ikfky6cb12juthkpofeu8",
+                                    "channel": "/meta/connect",
+                                    "id": "$id",
+                                    "successful": true
+                                  }
+                                ]""");
+                    }
+                });
+        subscription.client.disconnect(10000);
+        messages.add("""
+                [
+                  {
+                    "clientId": "5ra4927ikfky6cb12juthkpofeu8",
+                    "channel": "/meta/connect",
+                    "id": "$id",
+                    "successful": false,
+                    "advice": {
+                       "reconnect": "none"
+                    }
+                  }
+                ]""");
+
+        verify(consumer, 
timeout(20000)).processMessage(any(ClientSessionChannel.class),
+                messageWithName("shouldResubscribeOnConnectionFailures 2"));
 
         verify(consumer, atLeastOnce()).getEndpoint();
         verify(consumer, atLeastOnce()).getTopicName();
@@ -294,86 +272,168 @@ public class SubscriptionHelperManualIT {
     }
 
     @Test
-    void shouldResubscribeOnHelperRestart() {
-        // handshake and connect
-        subscription.start();
-
-        final StreamingApiConsumer consumer
-                = toUnsubscribe = mock(StreamingApiConsumer.class, 
"shouldResubscribeOnHelperRestart:consumer");
+    void shouldResubscribeOnSubscriptionFailure() {
+        var consumer = createConsumer("Contact");
+        var subscribeAttempts = new AtomicInteger(0);
+        subscription.client.getChannel("/meta/subscribe").addListener(
+                (MessageListener) (clientSessionChannel, message) -> {
+                    var subscription = (String) message.get("subscription");
+                    if (subscription != null && 
subscription.contains("Contact")) {
+                        subscribeAttempts.incrementAndGet();
+                    }
+                });
 
-        final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, 
"shouldResubscribeOnHelperRestart:endpoint");
+        server.reset();
+        server.replyTo("POST", "/cometd/" + 
SalesforceEndpointConfig.DEFAULT_VERSION + "/subscribe",
+                s -> s.contains("/topic/Contact"),
+                """
+                        [
+                          {
+                            "clientId": "5ra4927ikfky6cb12juthkpofeu8aaa",
+                            "channel": "/meta/subscribe",
+                            "id": "$id",
+                            "subscription": "/topic/Contact",
+                            "successful": false
+                          }
+                        ]""");
+        subscription.subscribe(consumer);
+        await().atMost(10, SECONDS).until(() -> subscribeAttempts.get() == 1);
+        
assertThat(subscription.client.getChannel("/topic/Contact").getSubscribers(), 
hasSize(0));
 
-        // subscribe
-        when(consumer.getTopicName()).thenReturn("Account");
+        server.reset();
+        server.replyTo("POST", "/cometd/" + 
SalesforceEndpointConfig.DEFAULT_VERSION + "/subscribe",
+                s -> s.contains("/topic/Contact"),
+                """
+                        [
+                          {
+                            "clientId": "5ra4927ikfky6cb12juthkpofeu8bbb",
+                            "channel": "/meta/subscribe",
+                            "id": "$id",
+                            "subscription": "/topic/Contact",
+                            "successful": true
+                          }
+                        ]""");
+        messages.add("""
+                [
+                  {
+                    "clientId": "5ra4927ikfky6cb12juthkpofeu8",
+                    "channel": "/meta/connect",
+                    "id": "$id",
+                    "successful": true
+                  }
+                ]""");
+        await().atMost(10, SECONDS).until(() -> subscribeAttempts.get() == 2);
+        
assertThat(subscription.client.getChannel("/topic/Contact").getSubscribers(), 
hasSize(1));
+    }
 
-        when(consumer.getEndpoint()).thenReturn(endpoint);
-        when(endpoint.getConfiguration()).thenReturn(config);
-        when(endpoint.getComponent()).thenReturn(salesforce);
-        when(endpoint.getTopicName()).thenReturn("Account");
-
-        subscription.subscribe("Account", consumer);
-
-        // push one message so we know connection is established and consumer
-        // receives notifications
-        messages.add("[\n"
-                     + "  {\n"
-                     + "    \"data\": {\n"
-                     + "      \"event\": {\n"
-                     + "        \"createdDate\": 
\"2020-12-11T13:44:56.891Z\",\n"
-                     + "        \"replayId\": 1,\n"
-                     + "        \"type\": \"created\"\n"
-                     + "      },\n"
-                     + "      \"sobject\": {\n"
-                     + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
-                     + "        \"Name\": \"shouldResubscribeOnHelperRestart 
1\"\n"
-                     + "      }\n"
-                     + "    },\n"
-                     + "    \"channel\": \"/topic/Account\"\n"
-                     + "  },\n"
-                     + "  {\n"
-                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
-                     + "    \"channel\": \"/meta/connect\",\n"
-                     + "    \"id\": \"$id\",\n"
-                     + "    \"successful\": true\n"
-                     + "  }\n"
-                     + "]");
-        verify(consumer, 
timeout(100)).processMessage(any(ClientSessionChannel.class),
-                
messageForAccountCreationWithName("shouldResubscribeOnHelperRestart 1"));
+    @Test
+    void shouldResubscribeOnHelperRestart() {
+        var consumer = createConsumer("Person");
+        subscription.subscribe(consumer);
+
+        messages.add("""
+                [
+                  {
+                    "data": {
+                      "event": {
+                        "createdDate": "2020-12-11T13:44:56.891Z",
+                        "replayId": 1,
+                        "type": "created"
+                      },
+                      "sobject": {
+                        "Id": "0011n00002XWMgVAAX",
+                        "Name": "shouldResubscribeOnHelperRestart 1"
+                      }
+                    },
+                    "channel": "/topic/Person"
+                  },
+                  {
+                    "clientId": "5ra4927ikfky6cb12juthkpofeu8",
+                    "channel": "/meta/connect",
+                    "id": "$id",
+                    "successful": true
+                  }
+                ]""");
+        verify(consumer, 
timeout(10000)).processMessage(any(ClientSessionChannel.class),
+                messageWithName("shouldResubscribeOnHelperRestart 1"));
 
         // stop and start the subscription helper
         subscription.stop();
         subscription.start();
 
         // queue next message for when the client recovers
-        messages.add("[\n"
-                     + "  {\n"
-                     + "    \"data\": {\n"
-                     + "      \"event\": {\n"
-                     + "        \"createdDate\": 
\"2020-12-11T13:44:56.891Z\",\n"
-                     + "        \"replayId\": 2,\n"
-                     + "        \"type\": \"created\"\n"
-                     + "      },\n"
-                     + "      \"sobject\": {\n"
-                     + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
-                     + "        \"Name\": \"shouldResubscribeOnHelperRestart 
2\"\n"
-                     + "      }\n"
-                     + "    },\n"
-                     + "    \"channel\": \"/topic/Account\"\n"
-                     + "  },\n"
-                     + "  {\n"
-                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
-                     + "    \"channel\": \"/meta/connect\",\n"
-                     + "    \"id\": \"$id\",\n"
-                     + "    \"successful\": true\n"
-                     + "  }\n"
-                     + "]");
+        messages.add("""
+                [
+                  {
+                    "data": {
+                      "event": {
+                        "createdDate": "2020-12-11T13:44:56.891Z",
+                        "replayId": 2,
+                        "type": "created"
+                      },
+                      "sobject": {
+                        "Id": "0011n00002XWMgVAAX",
+                        "Name": "shouldResubscribeOnHelperRestart 2"
+                      }
+                    },
+                    "channel": "/topic/Person"
+                  },
+                  {
+                    "clientId": "5ra4927ikfky6cb12juthkpofeu8",
+                    "channel": "/meta/connect",
+                    "id": "$id",
+                    "successful": true
+                  }
+                ]""");
 
         // assert last message was received, recovery can take a bit
-        verify(consumer, 
timeout(2000)).processMessage(any(ClientSessionChannel.class),
-                
messageForAccountCreationWithName("shouldResubscribeOnHelperRestart 2"));
+        verify(consumer, 
timeout(10000)).processMessage(any(ClientSessionChannel.class),
+                messageWithName("shouldResubscribeOnHelperRestart 2"));
 
         verify(consumer, atLeastOnce()).getEndpoint();
         verify(consumer, atLeastOnce()).getTopicName();
         verifyNoMoreInteractions(consumer);
     }
+
+    private StreamingApiConsumer createConsumer(String topic) {
+        var endpoint = createAccountEndpoint(topic);
+        var consumer = mock(StreamingApiConsumer.class, topic + ":consumer");
+        when(consumer.getTopicName()).thenReturn(topic);
+        when(consumer.getEndpoint()).thenReturn(endpoint);
+        server.replyTo("POST", "/cometd/" + 
SalesforceEndpointConfig.DEFAULT_VERSION + "/subscribe",
+                s -> s.contains("\"subscription\":\"/topic/" + topic + "\""),
+                """
+                        [
+                          {
+                            "clientId": "5ra4927ikfky6cb12juthkpofeu8qqq",
+                            "channel": "/meta/subscribe",
+                            "id": "$id",
+                            "subscription": "/topic/""" + topic + "\"," + """
+                            "successful": true
+                          }
+                        ]""");
+
+        server.replyTo("POST", "/cometd/" + 
SalesforceEndpointConfig.DEFAULT_VERSION + "/unsubscribe",
+                s -> s.contains("\"subscription\":\"/topic/" + topic + "\""),
+                """
+                        [
+                          {
+                            "clientId": "5ra4927ikfky6cb12juthkpofeu8",
+                            "channel": "/meta/unsubscribe",
+                            "id": "$id",
+                            "subscription": "/topic/""" + topic + "\"," + """
+                            "successful": true
+                          }
+                        ]""");
+        return consumer;
+    }
+
+    private SalesforceEndpoint createAccountEndpoint(String topic) {
+        SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, topic + 
":endpoint");
+        when(endpoint.getConfiguration()).thenReturn(config);
+        when(endpoint.getComponent()).thenReturn(salesforce);
+        when(endpoint.getTopicName()).thenReturn(topic);
+        return endpoint;
+    }
+
 }
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
index 5670219c21e..42d68baeee0 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
@@ -29,10 +29,14 @@ import 
org.apache.camel.component.salesforce.SalesforceLoginConfig;
 import org.apache.camel.component.salesforce.api.SalesforceException;
 import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import org.cometd.client.BayeuxClient;
+import org.hamcrest.MatcherAssert;
 import org.junit.jupiter.api.Test;
 
 import static 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.determineReplayIdFor;
 import static org.assertj.core.api.Assertions.assertThat;
+import static 
org.cometd.client.transport.ClientTransport.MAX_NETWORK_DELAY_OPTION;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.Mockito.mock;
@@ -166,4 +170,21 @@ public class SubscriptionHelperTest {
         assertNotNull(bayeuxClient);
         verify(session).login(null);
     }
+
+    @Test
+    public void 
defaultLongPollingTimeoutShouldBeGreaterThanSalesforceTimeout() throws 
SalesforceException {
+        var endpointConfig = new SalesforceEndpointConfig();
+        endpointConfig.setHttpClient(mock(SalesforceHttpClient.class));
+        var session = mock(SalesforceSession.class);
+        var component = mock(SalesforceComponent.class);
+        when(component.getLoginConfig()).thenReturn(new 
SalesforceLoginConfig());
+        when(component.getConfig()).thenReturn(endpointConfig);
+        when(component.getSession()).thenReturn(session);
+        var bayeuxClient = SubscriptionHelper.createClient(component, session);
+
+        var longPollingTimeout = 
bayeuxClient.getTransport("long-polling").getOption(MAX_NETWORK_DELAY_OPTION);
+
+        MatcherAssert.assertThat(longPollingTimeout, 
instanceOf(Integer.class));
+        MatcherAssert.assertThat((Integer) longPollingTimeout, 
greaterThan(110000));
+    }
 }

Reply via email to