This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new c32244de7bb Camel-20918: fix exception handling for Salesforce Streaming API (#14778) c32244de7bb is described below commit c32244de7bbc0d4ef86496d4f842dde99d02fa12 Author: Bartosz Popiela <bartosz...@gmail.com> AuthorDate: Mon Jul 22 12:24:59 2024 +0200 Camel-20918: fix exception handling for Salesforce Streaming API (#14778) * CAMEL-20918: set MAX_NETWORK_DELAY_OPTION to be 120 sec as per https://github.com/cometd/cometd/issues/1142#issuecomment-1048256297 and https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/using_streaming_api_timeouts.htm * CAMEL-20918: fix exception handling for Salesforce Streaming API: - execute subscriptionListener in a separate thread similarly to handshakeListener and connectListener, - fix SubscriptionHelper#listenerMap remaining empty when an exception occurs while subscribing to a channel, - remove SubscriptionHelper#handshake as there is always a thread hanging in this method as per https://issues.apache.org/jira/browse/CAMEL-20388, - remove the call to closeChannel for topic listeners as they will be removed on SubscriptionHelper#unsubscribe * CAMEL-20918: fix a failing build * CAMEL-20918: set MAX_NETWORK_DELAY_OPTION as first to allow overwriting * CAMEL-20918: update replayId per channel when explicitly subscribed and not after rehandshake * CAMEL-20918: fix replay extension not to overwrite ReplayId per channel if already set * CAMEL-20918: fix formatting * CAMEL-20918: remove unnecessary log --- .../component/salesforce/StreamingApiConsumer.java | 9 +- .../internal/streaming/ReplayExtension.java | 4 +- .../internal/streaming/SubscriptionHelper.java | 521 ++++++++------------- .../internal/streaming/ReplayExtensionTest.java | 4 +- .../salesforce/internal/streaming/StubServer.java | 4 +- .../streaming/SubscriptionHelperManualIT.java | 512 +++++++++++--------- .../internal/streaming/SubscriptionHelperTest.java | 21 + 7 files changed, 526 insertions(+), 549 deletions(-) diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java index 02f63c61a25..4dcfd100a6e 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java @@ -292,10 +292,15 @@ public class StreamingApiConsumer extends DefaultConsumer { // subscribe to topic ServiceHelper.startService(subscriptionHelper); - subscriptionHelper.subscribe(topicName, this); + subscriptionHelper.subscribe(this); subscribed = true; } + @Override + public SalesforceEndpoint getEndpoint() { + return this.endpoint; + } + /** * Stops this consumer. * @@ -324,7 +329,7 @@ public class StreamingApiConsumer extends DefaultConsumer { if (subscribed) { subscribed = false; // unsubscribe from topic - subscriptionHelper.unsubscribe(topicName, this); + subscriptionHelper.unsubscribe(this); } } diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java index 287d4400da5..0d2d32f012e 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java @@ -48,8 +48,8 @@ public class ReplayExtension implements Extension { private final ConcurrentMap<String, Long> dataMap = new ConcurrentHashMap<>(); private final AtomicBoolean supported = new AtomicBoolean(); - public void addChannelReplayId(final String channelName, final long replayId) { - dataMap.put(channelName, replayId); + public void setReplayIdIfAbsent(final String channelName, final long replayId) { + dataMap.putIfAbsent(channelName, replayId); } @Override diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index 4cf48067b69..0ffda9096fe 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -21,14 +21,22 @@ import java.net.CookieManager; import java.net.CookiePolicy; import java.net.CookieStore; import java.net.URI; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; import org.apache.camel.CamelException; -import org.apache.camel.component.salesforce.*; +import org.apache.camel.component.salesforce.SalesforceComponent; +import org.apache.camel.component.salesforce.SalesforceEndpoint; +import org.apache.camel.component.salesforce.SalesforceEndpointConfig; +import org.apache.camel.component.salesforce.SalesforceHttpClient; +import org.apache.camel.component.salesforce.StreamingApiConsumer; import org.apache.camel.component.salesforce.api.SalesforceException; import org.apache.camel.component.salesforce.internal.SalesforceSession; import org.apache.camel.support.service.ServiceSupport; @@ -46,9 +54,12 @@ import org.eclipse.jetty.http.HttpHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.util.Collections.emptySet; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.cometd.bayeux.Channel.*; +import static org.cometd.bayeux.Channel.META_CONNECT; +import static org.cometd.bayeux.Channel.META_HANDSHAKE; +import static org.cometd.bayeux.Channel.META_SUBSCRIBE; import static org.cometd.bayeux.Message.ERROR_FIELD; import static org.cometd.bayeux.Message.SUBSCRIPTION_FIELD; @@ -58,13 +69,13 @@ public class SubscriptionHelper extends ServiceSupport { private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class); - private static final int CONNECT_TIMEOUT = 110; + private static final int HANDSHAKE_TIMEOUT_SEC = 120; private static final String FAILURE_FIELD = "failure"; private static final String EXCEPTION_FIELD = "exception"; private static final String SFDC_FIELD = "sfdc"; private static final String FAILURE_REASON_FIELD = "failureReason"; - private static final int DISCONNECT_INTERVAL = 5000; + private static final String SERVER_TOO_BUSY_ERROR = "503::"; private static final String AUTHENTICATION_INVALID = "401::Authentication invalid"; private static final String INVALID_REPLAY_ID_PATTERN = "400::The replayId \\{.*} you provided was invalid.*"; @@ -73,32 +84,166 @@ public class SubscriptionHelper extends ServiceSupport { private final SalesforceComponent component; private SalesforceSession session; - private final long timeout = 60 * 1000L; - private final Map<StreamingApiConsumer, ClientSessionChannel.MessageListener> listenerMap; private final long maxBackoff; private final long backoffIncrement; - - private ClientSessionChannel.MessageListener handshakeListener; - private ClientSessionChannel.MessageListener connectListener; - private volatile String handshakeError; private volatile Exception handshakeException; private volatile String connectError; private volatile Exception connectException; - private volatile boolean reconnecting; private final AtomicLong handshakeBackoff; - private final AtomicBoolean handshaking = new AtomicBoolean(); + + private final Map<String, Set<StreamingApiConsumer>> channelToConsumers = new ConcurrentHashMap<>(); + + private final Map<StreamingApiConsumer, ClientSessionChannel.MessageListener> consumerToListener + = new ConcurrentHashMap<>(); + + private final Set<String> channelsToSubscribe = ConcurrentHashMap.newKeySet(); + + private final ClientSessionChannel.MessageListener handshakeListener = createHandshakeListener(); + + private final ClientSessionChannel.MessageListener subscriptionListener = createSubscriptionListener(); + + private final ClientSessionChannel.MessageListener connectListener = createConnectionListener(); public SubscriptionHelper(final SalesforceComponent component) { this.component = component; - listenerMap = new ConcurrentHashMap<>(); handshakeBackoff = new AtomicLong(); backoffIncrement = component.getConfig().getBackoffIncrement(); maxBackoff = component.getConfig().getMaxBackoff(); } + private MessageListener createHandshakeListener() { + return (channel, message) -> component.getHttpClient().getWorkerPool().execute(() -> { + LOG.debug("[CHANNEL:META_HANDSHAKE]: {}", message); + + if (!message.isSuccessful()) { + LOG.warn("Handshake failure: {}", message); + handshakeError = (String) message.get(ERROR_FIELD); + handshakeException = getFailure(message); + if (handshakeError != null) { + if (handshakeError.startsWith("403::")) { + String failureReason = getFailureReason(message); + if (failureReason.equals(AUTHENTICATION_INVALID)) { + LOG.debug( + "attempting login due to handshake error: 403 -> 401::Authentication invalid"); + session.attemptLoginUntilSuccessful(backoffIncrement, maxBackoff); + } + } + } + // failed, so keep trying + LOG.debug("Handshake failed, so try again."); + client.handshake(); + } else if (!channelToConsumers.isEmpty()) { + channelsToSubscribe.clear(); + channelsToSubscribe.addAll(channelToConsumers.keySet()); + LOG.info("Handshake successful. Channels to subscribe: {}", channelsToSubscribe); + } + }); + } + + private MessageListener createConnectionListener() { + return (channel, message) -> component.getHttpClient().getWorkerPool().execute(() -> { + LOG.debug("[CHANNEL:META_CONNECT]: {}", message); + + if (!message.isSuccessful()) { + LOG.warn("Connect failure: {}", message); + connectError = (String) message.get(ERROR_FIELD); + connectException = getFailure(message); + + if (connectError != null && connectError.equals(AUTHENTICATION_INVALID)) { + LOG.debug("connectError: {}", connectError); + LOG.debug("Attempting login..."); + session.attemptLoginUntilSuccessful(backoffIncrement, maxBackoff); + } + if (message.getAdvice() == null || "none".equals(message.getAdvice().get("reconnect"))) { + LOG.debug("Advice == none, so handshaking"); + client.handshake(); + } + } else if (!channelsToSubscribe.isEmpty()) { + LOG.info("Subscribing to channels: {}", channelsToSubscribe); + for (var channelName : channelsToSubscribe) { + for (var consumer : channelToConsumers.get(channelName)) { + subscribe(consumer); + } + } + } + }); + } + + private MessageListener createSubscriptionListener() { + return (channel, message) -> component.getHttpClient().getWorkerPool().execute(() -> { + LOG.debug("[CHANNEL:META_SUBSCRIBE]: {}", message); + var channelName = message.getOrDefault(SUBSCRIPTION_FIELD, "").toString(); + if (!message.isSuccessful()) { + LOG.warn("Subscription failure: {}", message); + var consumers = channelToConsumers.getOrDefault(channelName, emptySet()); + consumers.stream().findFirst().ifPresent(salesforceConsumer -> subscriptionFailed(salesforceConsumer, message)); + } else { + // remember subscription + LOG.info("Subscribed to channel {}", channelName); + channelsToSubscribe.remove(channelName); + + // reset backoff interval + handshakeBackoff.set(0); + } + }); + } + + private void subscriptionFailed(StreamingApiConsumer firstConsumer, Message message) { + var channelName = message.getOrDefault(SUBSCRIPTION_FIELD, "").toString(); + var consumers = channelToConsumers.getOrDefault(channelName, emptySet()); + + String error = (String) message.get(ERROR_FIELD); + if (error == null) { + error = "Missing error message"; + } + + Exception failure = getFailure(message); + String msg = String.format("Error subscribing to %s: %s", firstConsumer.getTopicName(), + failure != null ? failure.getMessage() : error); + boolean abort = true; + + LOG.warn(msg); + if (isTemporaryError(message)) { + + // retry after delay + final long backoff = handshakeBackoff.getAndAdd(backoffIncrement); + if (backoff > maxBackoff) { + LOG.error("Subscribe aborted after exceeding {} msecs backoff", maxBackoff); + } else { + abort = false; + + try { + LOG.debug("Pausing for {} msecs before subscribe attempt", backoff); + Thread.sleep(backoff); + for (var consumer : consumers) { + subscribe(consumer); + } + } catch (InterruptedException e) { + LOG.warn("Aborting subscribe on interrupt!", e); + } + } + } else if (error.matches(INVALID_REPLAY_ID_PATTERN)) { + abort = false; + long fallBackReplayId + = ((SalesforceEndpoint) firstConsumer.getEndpoint()).getConfiguration().getFallBackReplayId(); + LOG.warn(error); + LOG.warn("Falling back to replayId {} for channel {}", fallBackReplayId, channelName); + REPLAY_EXTENSION.setReplayIdIfAbsent(channelName, fallBackReplayId); + for (var consumer : consumers) { + subscribe(consumer); + } + } + + if (abort && client != null) { + for (var consumer : consumers) { + consumer.handleException(msg, new SalesforceException(msg, failure)); + } + } + } + @Override protected void doStart() throws Exception { session = component.getSession(); @@ -111,97 +256,20 @@ public class SubscriptionHelper extends ServiceSupport { client = createClient(component, session); initMessageListeners(); - connect(); + handshake(); } private void initMessageListeners() { - // listener for handshake error or exception - if (handshakeListener == null) { - // first start - handshakeListener = new ClientSessionChannel.MessageListener() { - public void onMessage(ClientSessionChannel channel, Message message) { - component.getHttpClient().getWorkerPool().execute(() -> { - LOG.debug("[CHANNEL:META_HANDSHAKE]: {}", message); - - if (!message.isSuccessful()) { - LOG.warn("Handshake failure: {}", message); - handshakeError = (String) message.get(ERROR_FIELD); - handshakeException = getFailure(message); - if (handshakeError != null) { - if (handshakeError.startsWith("403::")) { - String failureReason = getFailureReason(message); - if (failureReason.equals(AUTHENTICATION_INVALID)) { - LOG.debug( - "attempting login due to handshake error: 403 -> 401::Authentication invalid"); - session.attemptLoginUntilSuccessful(backoffIncrement, maxBackoff); - } - } - } - - // failed, so keep trying - LOG.debug("Handshake failed, so try again."); - handshake(); - - } else if (!listenerMap.isEmpty()) { - reconnecting = true; - } - }); - } - }; - } client.getChannel(META_HANDSHAKE).addListener(handshakeListener); - - // listener for connect error - if (connectListener == null) { - connectListener = new ClientSessionChannel.MessageListener() { - public void onMessage(ClientSessionChannel channel, Message message) { - component.getHttpClient().getWorkerPool().execute(() -> { - LOG.debug("[CHANNEL:META_CONNECT]: {}", message); - - if (!message.isSuccessful()) { - - LOG.warn("Connect failure: {}", message); - connectError = (String) message.get(ERROR_FIELD); - connectException = getFailure(message); - client.disconnect(); - - if (connectError != null && connectError.equals(AUTHENTICATION_INVALID)) { - LOG.debug("connectError: {}", connectError); - LOG.debug("Attempting login..."); - session.attemptLoginUntilSuccessful(backoffIncrement, maxBackoff); - } - // Server says don't retry to connect, so we'll handshake instead - // Otherwise, Bayeux client automatically re-attempts connection - if (message.getAdvice() != null && - !message.getAdvice().get("reconnect").equals("retry")) { - LOG.debug("Advice != retry, so handshaking"); - handshake(); - } - } else if (reconnecting) { - LOG.debug("Refreshing subscriptions to {} channels on reconnect", listenerMap.size()); - // reconnected to Salesforce, subscribe to existing - // channels - final Map<StreamingApiConsumer, MessageListener> map = new HashMap<>(listenerMap); - listenerMap.clear(); - for (Map.Entry<StreamingApiConsumer, ClientSessionChannel.MessageListener> entry : map.entrySet()) { - final StreamingApiConsumer consumer = entry.getKey(); - final String topicName = consumer.getTopicName(); - subscribe(topicName, consumer); - } - reconnecting = false; - } - }); - } - }; - } + client.getChannel(META_SUBSCRIBE).addListener(subscriptionListener); client.getChannel(META_CONNECT).addListener(connectListener); } - private void connect() throws CamelException { + private void handshake() throws CamelException { // connect to Salesforce cometd endpoint client.handshake(); - final long waitMs = MILLISECONDS.convert(CONNECT_TIMEOUT, SECONDS); + final long waitMs = MILLISECONDS.convert(HANDSHAKE_TIMEOUT_SEC, SECONDS); if (!client.waitFor(waitMs, BayeuxClient.State.CONNECTED)) { if (handshakeException != null) { throw new CamelException( @@ -214,105 +282,9 @@ public class SubscriptionHelper extends ServiceSupport { } else if (connectError != null) { throw new CamelException(String.format("Error during CONNECT: %s", connectError)); } else { - throw new CamelException(String.format("Handshake request timeout after %s seconds", CONNECT_TIMEOUT)); - } - } - } - - private void handshake() { - LOG.debug("Begin handshake if not already in progress."); - if (!handshaking.compareAndSet(false, true)) { - return; - } - - LOG.debug("Continuing with handshake."); - try { - doHandshake(); - } finally { - handshaking.set(false); - } - } - - private void doHandshake() { - if (isStoppingOrStopped()) { - return; - } - - LOG.info("Handshaking after unexpected disconnect from Salesforce..."); - boolean abort = false; - - // wait for disconnect - LOG.debug("Waiting to disconnect..."); - while (!abort && !client.isDisconnected()) { - try { - Thread.sleep(DISCONNECT_INTERVAL); - } catch (InterruptedException e) { - LOG.error("Aborting handshake on interrupt!"); - abort = true; - Thread.currentThread().interrupt(); - } - - abort = abort || isStoppingOrStopped(); - } - - if (!abort) { - - // update handshake attempt backoff - final long backoff = handshakeBackoff.getAndAdd(backoffIncrement); - if (backoff > maxBackoff) { - LOG.error("Handshake aborted after exceeding {} msecs backoff", maxBackoff); - abort = true; - } else { - - // pause before handshake attempt - LOG.debug("Pausing for {} msecs before handshake attempt", backoff); - try { - Thread.sleep(backoff); - } catch (InterruptedException e) { - LOG.error("Aborting handshake on interrupt!"); - abort = true; - Thread.currentThread().interrupt(); - } - } - - if (!abort) { - Exception lastError = new SalesforceException("Unknown error", null); - try { - // reset client. If we fail to stop and logout, catch the exception - // so we can still continue to doStart() - if (client != null) { - client.disconnect(); - boolean disconnected = client.waitFor(timeout, State.DISCONNECTED); - if (!disconnected) { - LOG.warn("Could not disconnect client connected to: {} after: {} msec.", getEndpointUrl(component), - timeout); - client.abort(); - } - - client.handshake(); - final long waitMs = MILLISECONDS.convert(CONNECT_TIMEOUT, SECONDS); - client.waitFor(waitMs, BayeuxClient.State.CONNECTED); - } - } catch (Exception e) { - LOG.error("Error handshaking: {}", e.getMessage(), e); - lastError = e; - } - - if (client != null && client.isHandshook()) { - LOG.debug("Successful handshake!"); - // reset backoff interval - handshakeBackoff.set(backoffIncrement); - } else { - LOG.error("Failed to handshake after pausing for {} msecs", backoff); - if ((backoff + backoffIncrement) > maxBackoff) { - // notify all consumers - String abortMsg = "Aborting handshake attempt due to: " + lastError.getMessage(); - SalesforceException ex = new SalesforceException(abortMsg, lastError); - for (StreamingApiConsumer consumer : listenerMap.keySet()) { - consumer.handleException(abortMsg, ex); - } - } - } + throw new CamelException( + String.format("Handshake request timeout after %s seconds", + HANDSHAKE_TIMEOUT_SEC)); } } } @@ -333,37 +305,32 @@ public class SubscriptionHelper extends ServiceSupport { return exception; } - private void closeChannel(final String name, MessageListener listener) { + private void closeChannel(final String name) { if (client == null) { return; } final ClientSessionChannel channel = client.getChannel(name); - channel.removeListener(listener); + for (var listener : channel.getListeners()) { + channel.removeListener(listener); + } channel.release(); } @Override protected void doStop() throws Exception { - closeChannel(META_CONNECT, connectListener); - closeChannel(META_HANDSHAKE, handshakeListener); - - for (Map.Entry<StreamingApiConsumer, MessageListener> entry : listenerMap.entrySet()) { - final StreamingApiConsumer consumer = entry.getKey(); - final String topic = consumer.getTopicName(); - - final MessageListener listener = entry.getValue(); - closeChannel(getChannelName(topic), listener); - } + closeChannel(META_CONNECT); + closeChannel(META_SUBSCRIBE); + closeChannel(META_HANDSHAKE); if (client == null) { return; } client.disconnect(); - boolean disconnected = client.waitFor(timeout, State.DISCONNECTED); + boolean disconnected = client.waitFor(60_000, State.DISCONNECTED); if (!disconnected) { - LOG.warn("Could not disconnect client connected to: {} after: {} msec.", getEndpointUrl(component), timeout); + LOG.warn("Could not disconnect client connected to: {}", getEndpointUrl(component)); client.abort(); } @@ -381,9 +348,13 @@ public class SubscriptionHelper extends ServiceSupport { final SalesforceHttpClient httpClient = component.getConfig().getHttpClient(); Map<String, Object> options = new HashMap<>(); - options.put(ClientTransport.MAX_NETWORK_DELAY_OPTION, httpClient.getTimeout()); + /* + The timeout should be greater than 110 sec as per https://github.com/cometd/cometd/issues/1142#issuecomment-1048256297 + and https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/using_streaming_api_timeouts.htm + */ + options.put(ClientTransport.MAX_NETWORK_DELAY_OPTION, 120000); if (component.getLongPollingTransportProperties() != null) { - options = component.getLongPollingTransportProperties(); + options.putAll(component.getLongPollingTransportProperties()); } // check login access token @@ -442,109 +413,25 @@ public class SubscriptionHelper extends ServiceSupport { return client; } - public void subscribe(final String topicName, final StreamingApiConsumer consumer) { - subscribe(topicName, consumer, false); - } - - public void subscribe( - final String topicName, final StreamingApiConsumer consumer, - final boolean skipReplayId) { + public synchronized void subscribe(StreamingApiConsumer consumer) { // create subscription for consumer - final String channelName = getChannelName(topicName); + final String channelName = getChannelName(consumer.getTopicName()); + channelToConsumers.computeIfAbsent(channelName, key -> ConcurrentHashMap.newKeySet()).add(consumer); + channelsToSubscribe.add(channelName); - if (!reconnecting && !skipReplayId) { - setupReplay((SalesforceEndpoint) consumer.getEndpoint()); - } + setReplayIdIfAbsent(consumer.getEndpoint()); // channel message listener LOG.info("Subscribing to channel {}...", channelName); - final ClientSessionChannel.MessageListener listener = new ClientSessionChannel.MessageListener() { - - @Override - public void onMessage(ClientSessionChannel channel, Message message) { - LOG.debug("Received Message: {}", message); - // convert CometD message to Camel Message - consumer.processMessage(channel, message); - } - }; - - // listener for subscription - final ClientSessionChannel.MessageListener subscriptionListener = new ClientSessionChannel.MessageListener() { - public void onMessage(ClientSessionChannel channel, Message message) { - LOG.debug("[CHANNEL:META_SUBSCRIBE]: {}", message); - final String subscribedChannelName = message.getOrDefault(SUBSCRIPTION_FIELD, "").toString(); - if ("".equals(subscribedChannelName)) { - LOG.warn("[CHANNEL:META_SUBSCRIBE]: No field {} found. Skipping message: {}", SUBSCRIPTION_FIELD, message); - return; - } - if (channelName.equals(subscribedChannelName)) { - if (!message.isSuccessful()) { - String error = (String) message.get(ERROR_FIELD); - if (error == null) { - error = "Missing error message"; - } - - Exception failure = getFailure(message); - String msg = String.format("Error subscribing to %s: %s", topicName, - failure != null ? failure.getMessage() : error); - boolean abort = true; - - if (isTemporaryError(message)) { - LOG.warn(msg); - - // retry after delay - final long backoff = handshakeBackoff.getAndAdd(backoffIncrement); - if (backoff > maxBackoff) { - LOG.error("Subscribe aborted after exceeding {} msecs backoff", maxBackoff); - } else { - abort = false; - - try { - LOG.debug("Pausing for {} msecs before subscribe attempt", backoff); - Thread.sleep(backoff); - - component.getHttpClient().getWorkerPool().execute(() -> subscribe(topicName, consumer)); - } catch (InterruptedException e) { - LOG.warn("Aborting subscribe on interrupt!", e); - Thread.currentThread().interrupt(); - } - } - } else if (error.matches(INVALID_REPLAY_ID_PATTERN)) { - abort = false; - final Long fallBackReplayId - = ((SalesforceEndpoint) consumer.getEndpoint()).getConfiguration().getFallBackReplayId(); - LOG.warn(error); - LOG.warn("Falling back to replayId {} for channel {}", fallBackReplayId, channelName); - REPLAY_EXTENSION.addChannelReplayId(channelName, fallBackReplayId); - subscribe(topicName, consumer, true); - } - - if (abort && client != null) { - consumer.handleException(msg, new SalesforceException(msg, failure)); - } - } else { - // remember subscription - LOG.info("Subscribed to channel {}", subscribedChannelName); - listenerMap.put(consumer, listener); - - // reset backoff interval - handshakeBackoff.set(0); - } - - // remove this subscription listener - if (client != null) { - client.getChannel(META_SUBSCRIBE).removeListener(this); - } else { - LOG.warn("Trying to handle a subscription message but the client is already destroyed"); - } - } - } - }; - client.getChannel(META_SUBSCRIBE).addListener(subscriptionListener); + var messageListener = consumerToListener.computeIfAbsent(consumer, key -> (channel, message) -> { + LOG.debug("Received Message: {}", message); + // convert CometD message to Camel Message + consumer.processMessage(channel, message); + }); // subscribe asynchronously final ClientSessionChannel clientChannel = client.getChannel(channelName); - clientChannel.subscribe(listener); + clientChannel.subscribe(messageListener); } private static boolean isTemporaryError(Message message) { @@ -564,7 +451,7 @@ public class SubscriptionHelper extends ServiceSupport { return failureReason; } - void setupReplay(final SalesforceEndpoint endpoint) { + private void setReplayIdIfAbsent(final SalesforceEndpoint endpoint) { final String topicName = endpoint.getTopicName(); final Optional<Long> replayId = determineReplayIdFor(endpoint, topicName); @@ -573,9 +460,7 @@ public class SubscriptionHelper extends ServiceSupport { final Long replayIdValue = replayId.get(); - LOG.info("Set Replay extension to replay from `{}` for channel `{}`", replayIdValue, channelName); - - REPLAY_EXTENSION.addChannelReplayId(channelName, replayIdValue); + REPLAY_EXTENSION.setReplayIdIfAbsent(channelName, replayIdValue); } } @@ -621,20 +506,26 @@ public class SubscriptionHelper extends ServiceSupport { return channelName.toString(); } - public void unsubscribe(String topicName, StreamingApiConsumer consumer) { - + public synchronized void unsubscribe(StreamingApiConsumer consumer) { // channel name - final String channelName = getChannelName(topicName); + final String channelName = getChannelName(consumer.getTopicName()); // unsubscribe from channel - final ClientSessionChannel.MessageListener listener = listenerMap.remove(consumer); + var consumers = channelToConsumers.get(channelName); + if (consumers != null) { + consumers.remove(consumer); + if (consumers.isEmpty()) { + channelToConsumers.remove(channelName); + } + } + final ClientSessionChannel.MessageListener listener = consumerToListener.remove(consumer); if (listener != null) { - LOG.debug("Unsubscribing from channel {}...", channelName); final ClientSessionChannel clientChannel = client.getChannel(channelName); // if there are other listeners on this channel, an unsubscribe message will not be sent, // so we're not going to listen for and expect an unsub response. Just unsub and move on. clientChannel.unsubscribe(listener); + clientChannel.release(); } } diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java index 4acf1adb510..e6008a4d8dd 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java @@ -84,7 +84,7 @@ public class ReplayExtensionTest { final ReplayExtension replayExtension = new ReplayExtension(); replayExtension.rcvMeta(null, createHandshakeMessage(true)); - replayExtension.addChannelReplayId(pushTopicMessage.getChannel(), 123L); + replayExtension.setReplayIdIfAbsent(pushTopicMessage.getChannel(), 123L); replayExtension.rcv(null, pushTopicMessage); @@ -100,7 +100,7 @@ public class ReplayExtensionTest { final ReplayExtension replayExtension = new ReplayExtension(); replayExtension.rcvMeta(null, createHandshakeMessage(true)); - replayExtension.addChannelReplayId(pushTopicMessage.getChannel(), 123L); + replayExtension.setReplayIdIfAbsent(pushTopicMessage.getChannel(), 123L); replayExtension.rcv(null, pushTopicMessage); diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java index 4429b657a90..1962cdf7b43 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java @@ -52,8 +52,8 @@ class StubServer { class StubHandler extends Handler.Abstract { private StubResponse stubFor(final Request request, final String body) throws IOException { - final List<StubResponse> allResponses = new ArrayList<>(defaultStubs); - allResponses.addAll(stubs); + final List<StubResponse> allResponses = new ArrayList<>(stubs); + allResponses.addAll(defaultStubs); for (final StubResponse stub : allResponses) { if (stub.matches(request, body)) { diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperManualIT.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperManualIT.java index 8967103566e..abbdcbacfab 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperManualIT.java +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperManualIT.java @@ -19,6 +19,7 @@ package org.apache.camel.component.salesforce.internal.streaming; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.CamelContext; import org.apache.camel.CamelException; @@ -31,6 +32,7 @@ import org.apache.camel.component.salesforce.api.SalesforceException; import org.apache.camel.impl.DefaultCamelContext; import org.cometd.bayeux.Message; import org.cometd.bayeux.client.ClientSessionChannel; +import org.cometd.bayeux.client.ClientSessionChannel.MessageListener; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -40,7 +42,11 @@ import org.mockito.ArgumentMatcher; import org.mockito.Mockito; import org.slf4j.LoggerFactory; -import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelperManualIT.MessageArgumentMatcher.messageForAccountCreationWithName; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelperManualIT.MessageArgumentMatcher.messageWithName; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.atLeastOnce; @@ -58,9 +64,7 @@ public class SubscriptionHelperManualIT { final BlockingQueue<String> messages = new LinkedBlockingDeque<>(); final SalesforceComponent salesforce; final StubServer server; - final SubscriptionHelper subscription; - - StreamingApiConsumer toUnsubscribe; + private SubscriptionHelper subscription; static class MessageArgumentMatcher implements ArgumentMatcher<Message> { @@ -74,21 +78,24 @@ public class SubscriptionHelperManualIT { public boolean matches(final Message message) { final Map<String, Object> data = message.getDataAsMap(); @SuppressWarnings("unchecked") - final Map<String, Object> event = (Map<String, Object>) data.get("event"); + final Map<String, Object> event = (Map<String, Object>) data.get( + "event"); @SuppressWarnings("unchecked") - final Map<String, Object> sobject = (Map<String, Object>) data.get("sobject"); + final Map<String, Object> sobject = (Map<String, Object>) data.get( + "sobject"); return "created".equals(event.get("type")) && name.equals(sobject.get("Name")); } - static Message messageForAccountCreationWithName(final String name) { + static Message messageWithName(final String name) { return argThat(new MessageArgumentMatcher(name)); } } public SubscriptionHelperManualIT() throws SalesforceException { server = new StubServer(); - LoggerFactory.getLogger(SubscriptionHelperManualIT.class).info("Port for wireshark to filter: {}", - server.port()); + LoggerFactory.getLogger(SubscriptionHelperManualIT.class) + .info("Port for wireshark to filter: {}", + server.port()); final String instanceUrl = "http://localhost:" + server.port(); server.replyTo( "POST", "/services/oauth2/token", @@ -103,67 +110,54 @@ public class SubscriptionHelperManualIT { server.replyTo( "POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/handshake", - "[\n" - + " {\n" - + " \"ext\": {\n" - + " \"replay\": true,\n" - + " \"payload.format\": true\n" - + " },\n" - + " \"minimumVersion\": \"1.0\",\n" - + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" - + " \"supportedConnectionTypes\": [\n" - + " \"long-polling\"\n" - + " ],\n" - + " \"channel\": \"/meta/handshake\",\n" - + " \"id\": \"$id\",\n" - + " \"version\": \"1.0\",\n" - + " \"successful\": true\n" - + " }\n" - + "]"); + """ + [ + { + "ext": { + "replay": true, + "payload.format": true + }, + "minimumVersion": "1.0", + "clientId": "5ra4927ikfky6cb12juthkpofeu8", + "supportedConnectionTypes": [ + "long-polling" + ], + "channel": "/meta/handshake", + "id": "$id", + "version": "1.0", + "successful": true + } + ]"""); + + server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/connect", + req -> { + return req.contains("\"timeout\":0"); + }, """ + [ + { + "clientId": "1f0agp5a95yiaeb1kifib37r5z4g", + "advice": { + "interval": 0, + "timeout": 110000, + "reconnect": "retry" + }, + "channel": "/meta/connect", + "id": "$id", + "successful": true + } + ]"""); server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/connect", - req -> req.contains("\"timeout\":0"), "[\n" - + " {\n" - + " \"clientId\": \"1f0agp5a95yiaeb1kifib37r5z4g\",\n" - + " \"advice\": {\n" - + " \"interval\": 0,\n" - + " \"timeout\": 110000,\n" - + " \"reconnect\": \"retry\"\n" - + " },\n" - + " \"channel\": \"/meta/connect\",\n" - + " \"id\": \"$id\",\n" - + " \"successful\": true\n" - + " }\n" - + "]"); - - server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/connect", messages); - - server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/subscribe", "[\n" - + " {\n" - + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" - + " \"channel\": \"/meta/subscribe\",\n" - + " \"id\": \"$id\",\n" - + " \"subscription\": \"/topic/Account\",\n" - + " \"successful\": true\n" - + " }\n" - + "]"); - - server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/unsubscribe", "[\n" - + " {\n" - + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" - + " \"channel\": \"/meta/unsubscribe\",\n" - + " \"id\": \"$id\",\n" - + " \"subscription\": \"/topic/Account\",\n" - + " \"successful\": true\n" - + " }\n" - + "]"); - - server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/disconnect", "[\n" - + " {\n" - + " \"channel\": \"/meta/disconnect\",\n" - + " \"clientId\": \"client-id\"\n" - + " }\n" - + "]"); + messages); + + server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/disconnect", + """ + [ + { + "channel": "/meta/disconnect", + "clientId": "client-id" + } + ]"""); server.replyTo("GET", "/services/oauth2/revoke", 200); @@ -178,16 +172,13 @@ public class SubscriptionHelperManualIT { salesforce.setRefreshToken("refreshToken"); salesforce.setAuthenticationType(AuthenticationType.REFRESH_TOKEN); salesforce.setConfig(config); - salesforce.start(); subscription = new SubscriptionHelper(salesforce); + subscription.start(); } @BeforeEach - public void cleanSlate() throws CamelException { - if (toUnsubscribe != null) { - subscription.unsubscribe("Account", toUnsubscribe); - } + public void resetServer() throws CamelException { server.reset(); } @@ -200,93 +191,80 @@ public class SubscriptionHelperManualIT { @Test void shouldResubscribeOnConnectionFailures() throws InterruptedException { - // handshake and connect - subscription.start(); - - final StreamingApiConsumer consumer - = toUnsubscribe = mock(StreamingApiConsumer.class, "shouldResubscribeOnConnectionFailures:consumer"); - - final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, "shouldResubscribeOnConnectionFailures:endpoint"); - - // subscribe - when(consumer.getTopicName()).thenReturn("Account"); - - when(consumer.getEndpoint()).thenReturn(endpoint); - when(endpoint.getConfiguration()).thenReturn(config); - when(endpoint.getComponent()).thenReturn(salesforce); - when(endpoint.getTopicName()).thenReturn("Account"); - - subscription.subscribe("Account", consumer); - - // push one message so we know connection is established and consumer - // receives notifications - messages.add("[\n" - + " {\n" - + " \"data\": {\n" - + " \"event\": {\n" - + " \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n" - + " \"replayId\": 1,\n" - + " \"type\": \"created\"\n" - + " },\n" - + " \"sobject\": {\n" - + " \"Id\": \"0011n00002XWMgVAAX\",\n" - + " \"Name\": \"shouldResubscribeOnConnectionFailures 1\"\n" - + " }\n" - + " },\n" - + " \"channel\": \"/topic/Account\"\n" - + " },\n" - + " {\n" - + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" - + " \"channel\": \"/meta/connect\",\n" - + " \"id\": \"$id\",\n" - + " \"successful\": true\n" - + " }\n" - + "]"); - - verify(consumer, Mockito.timeout(100)).processMessage(any(ClientSessionChannel.class), - messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 1")); - - // send failed connection message w/o reconnect advice so we handshake again - - messages.add("[\n" + - " {\n" + - " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" + - " \"channel\": \"/meta/connect\",\n" + - " \"id\": \"$id\",\n" + - " \"successful\": false,\n" + - " \"advice\": {\n" + - " \"reconnect\": \"none\"\n" + - " }\n" + - " }\n" + - "]"); - - // queue next message for when the client recovers - messages.add("[\n" - + " {\n" - + " \"data\": {\n" - + " \"event\": {\n" - + " \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n" - + " \"replayId\": 2,\n" - + " \"type\": \"created\"\n" - + " },\n" - + " \"sobject\": {\n" - + " \"Id\": \"0011n00002XWMgVAAX\",\n" - + " \"Name\": \"shouldResubscribeOnConnectionFailures 2\"\n" - + " }\n" - + " },\n" - + " \"channel\": \"/topic/Account\"\n" - + " },\n" - + " {\n" - + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" - + " \"channel\": \"/meta/connect\",\n" - + " \"id\": \"$id\",\n" - + " \"successful\": true\n" - + " }\n" - + "]"); - - // assert last message was received, recovery can take a bit - verify(consumer, timeout(10000)).processMessage(any(ClientSessionChannel.class), - messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 2")); + var consumer = createConsumer("Account"); + subscription.subscribe(consumer); + + messages.add(""" + [ + { + "data": { + "event": { + "createdDate": "2020-12-11T13:44:56.891Z", + "replayId": 1, + "type": "created" + }, + "sobject": { + "Id": "0011n00002XWMgVAAX", + "Name": "shouldResubscribeOnConnectionFailures 1" + } + }, + "channel": "/topic/Account" + }, + { + "clientId": "5ra4927ikfky6cb12juthkpofeu8", + "channel": "/meta/connect", + "id": "$id", + "successful": true + } + ]"""); + verify(consumer, Mockito.timeout(10000)).processMessage(any(ClientSessionChannel.class), + messageWithName("shouldResubscribeOnConnectionFailures 1")); + + subscription.client.getChannel("/meta/subscribe").addListener( + (MessageListener) (clientSessionChannel, message) -> { + var subscription = (String) message.get("subscription"); + if (subscription != null && subscription.contains("Account")) { + messages.add(""" + [ + { + "data": { + "event": { + "createdDate": "2020-12-11T13:44:57.891Z", + "replayId": 2, + "type": "created" + }, + "sobject": { + "Id": "0011n00002XWMgVAAX", + "Name": "shouldResubscribeOnConnectionFailures 2" + } + }, + "channel": "/topic/Account" + }, + { + "clientId": "5ra4927ikfky6cb12juthkpofeu8", + "channel": "/meta/connect", + "id": "$id", + "successful": true + } + ]"""); + } + }); + subscription.client.disconnect(10000); + messages.add(""" + [ + { + "clientId": "5ra4927ikfky6cb12juthkpofeu8", + "channel": "/meta/connect", + "id": "$id", + "successful": false, + "advice": { + "reconnect": "none" + } + } + ]"""); + + verify(consumer, timeout(20000)).processMessage(any(ClientSessionChannel.class), + messageWithName("shouldResubscribeOnConnectionFailures 2")); verify(consumer, atLeastOnce()).getEndpoint(); verify(consumer, atLeastOnce()).getTopicName(); @@ -294,86 +272,168 @@ public class SubscriptionHelperManualIT { } @Test - void shouldResubscribeOnHelperRestart() { - // handshake and connect - subscription.start(); - - final StreamingApiConsumer consumer - = toUnsubscribe = mock(StreamingApiConsumer.class, "shouldResubscribeOnHelperRestart:consumer"); + void shouldResubscribeOnSubscriptionFailure() { + var consumer = createConsumer("Contact"); + var subscribeAttempts = new AtomicInteger(0); + subscription.client.getChannel("/meta/subscribe").addListener( + (MessageListener) (clientSessionChannel, message) -> { + var subscription = (String) message.get("subscription"); + if (subscription != null && subscription.contains("Contact")) { + subscribeAttempts.incrementAndGet(); + } + }); - final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, "shouldResubscribeOnHelperRestart:endpoint"); + server.reset(); + server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/subscribe", + s -> s.contains("/topic/Contact"), + """ + [ + { + "clientId": "5ra4927ikfky6cb12juthkpofeu8aaa", + "channel": "/meta/subscribe", + "id": "$id", + "subscription": "/topic/Contact", + "successful": false + } + ]"""); + subscription.subscribe(consumer); + await().atMost(10, SECONDS).until(() -> subscribeAttempts.get() == 1); + assertThat(subscription.client.getChannel("/topic/Contact").getSubscribers(), hasSize(0)); - // subscribe - when(consumer.getTopicName()).thenReturn("Account"); + server.reset(); + server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/subscribe", + s -> s.contains("/topic/Contact"), + """ + [ + { + "clientId": "5ra4927ikfky6cb12juthkpofeu8bbb", + "channel": "/meta/subscribe", + "id": "$id", + "subscription": "/topic/Contact", + "successful": true + } + ]"""); + messages.add(""" + [ + { + "clientId": "5ra4927ikfky6cb12juthkpofeu8", + "channel": "/meta/connect", + "id": "$id", + "successful": true + } + ]"""); + await().atMost(10, SECONDS).until(() -> subscribeAttempts.get() == 2); + assertThat(subscription.client.getChannel("/topic/Contact").getSubscribers(), hasSize(1)); + } - when(consumer.getEndpoint()).thenReturn(endpoint); - when(endpoint.getConfiguration()).thenReturn(config); - when(endpoint.getComponent()).thenReturn(salesforce); - when(endpoint.getTopicName()).thenReturn("Account"); - - subscription.subscribe("Account", consumer); - - // push one message so we know connection is established and consumer - // receives notifications - messages.add("[\n" - + " {\n" - + " \"data\": {\n" - + " \"event\": {\n" - + " \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n" - + " \"replayId\": 1,\n" - + " \"type\": \"created\"\n" - + " },\n" - + " \"sobject\": {\n" - + " \"Id\": \"0011n00002XWMgVAAX\",\n" - + " \"Name\": \"shouldResubscribeOnHelperRestart 1\"\n" - + " }\n" - + " },\n" - + " \"channel\": \"/topic/Account\"\n" - + " },\n" - + " {\n" - + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" - + " \"channel\": \"/meta/connect\",\n" - + " \"id\": \"$id\",\n" - + " \"successful\": true\n" - + " }\n" - + "]"); - verify(consumer, timeout(100)).processMessage(any(ClientSessionChannel.class), - messageForAccountCreationWithName("shouldResubscribeOnHelperRestart 1")); + @Test + void shouldResubscribeOnHelperRestart() { + var consumer = createConsumer("Person"); + subscription.subscribe(consumer); + + messages.add(""" + [ + { + "data": { + "event": { + "createdDate": "2020-12-11T13:44:56.891Z", + "replayId": 1, + "type": "created" + }, + "sobject": { + "Id": "0011n00002XWMgVAAX", + "Name": "shouldResubscribeOnHelperRestart 1" + } + }, + "channel": "/topic/Person" + }, + { + "clientId": "5ra4927ikfky6cb12juthkpofeu8", + "channel": "/meta/connect", + "id": "$id", + "successful": true + } + ]"""); + verify(consumer, timeout(10000)).processMessage(any(ClientSessionChannel.class), + messageWithName("shouldResubscribeOnHelperRestart 1")); // stop and start the subscription helper subscription.stop(); subscription.start(); // queue next message for when the client recovers - messages.add("[\n" - + " {\n" - + " \"data\": {\n" - + " \"event\": {\n" - + " \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n" - + " \"replayId\": 2,\n" - + " \"type\": \"created\"\n" - + " },\n" - + " \"sobject\": {\n" - + " \"Id\": \"0011n00002XWMgVAAX\",\n" - + " \"Name\": \"shouldResubscribeOnHelperRestart 2\"\n" - + " }\n" - + " },\n" - + " \"channel\": \"/topic/Account\"\n" - + " },\n" - + " {\n" - + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" - + " \"channel\": \"/meta/connect\",\n" - + " \"id\": \"$id\",\n" - + " \"successful\": true\n" - + " }\n" - + "]"); + messages.add(""" + [ + { + "data": { + "event": { + "createdDate": "2020-12-11T13:44:56.891Z", + "replayId": 2, + "type": "created" + }, + "sobject": { + "Id": "0011n00002XWMgVAAX", + "Name": "shouldResubscribeOnHelperRestart 2" + } + }, + "channel": "/topic/Person" + }, + { + "clientId": "5ra4927ikfky6cb12juthkpofeu8", + "channel": "/meta/connect", + "id": "$id", + "successful": true + } + ]"""); // assert last message was received, recovery can take a bit - verify(consumer, timeout(2000)).processMessage(any(ClientSessionChannel.class), - messageForAccountCreationWithName("shouldResubscribeOnHelperRestart 2")); + verify(consumer, timeout(10000)).processMessage(any(ClientSessionChannel.class), + messageWithName("shouldResubscribeOnHelperRestart 2")); verify(consumer, atLeastOnce()).getEndpoint(); verify(consumer, atLeastOnce()).getTopicName(); verifyNoMoreInteractions(consumer); } + + private StreamingApiConsumer createConsumer(String topic) { + var endpoint = createAccountEndpoint(topic); + var consumer = mock(StreamingApiConsumer.class, topic + ":consumer"); + when(consumer.getTopicName()).thenReturn(topic); + when(consumer.getEndpoint()).thenReturn(endpoint); + server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/subscribe", + s -> s.contains("\"subscription\":\"/topic/" + topic + "\""), + """ + [ + { + "clientId": "5ra4927ikfky6cb12juthkpofeu8qqq", + "channel": "/meta/subscribe", + "id": "$id", + "subscription": "/topic/""" + topic + "\"," + """ + "successful": true + } + ]"""); + + server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/unsubscribe", + s -> s.contains("\"subscription\":\"/topic/" + topic + "\""), + """ + [ + { + "clientId": "5ra4927ikfky6cb12juthkpofeu8", + "channel": "/meta/unsubscribe", + "id": "$id", + "subscription": "/topic/""" + topic + "\"," + """ + "successful": true + } + ]"""); + return consumer; + } + + private SalesforceEndpoint createAccountEndpoint(String topic) { + SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, topic + ":endpoint"); + when(endpoint.getConfiguration()).thenReturn(config); + when(endpoint.getComponent()).thenReturn(salesforce); + when(endpoint.getTopicName()).thenReturn(topic); + return endpoint; + } + } diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java index 5670219c21e..42d68baeee0 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java @@ -29,10 +29,14 @@ import org.apache.camel.component.salesforce.SalesforceLoginConfig; import org.apache.camel.component.salesforce.api.SalesforceException; import org.apache.camel.component.salesforce.internal.SalesforceSession; import org.cometd.client.BayeuxClient; +import org.hamcrest.MatcherAssert; import org.junit.jupiter.api.Test; import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.determineReplayIdFor; import static org.assertj.core.api.Assertions.assertThat; +import static org.cometd.client.transport.ClientTransport.MAX_NETWORK_DELAY_OPTION; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.Mockito.mock; @@ -166,4 +170,21 @@ public class SubscriptionHelperTest { assertNotNull(bayeuxClient); verify(session).login(null); } + + @Test + public void defaultLongPollingTimeoutShouldBeGreaterThanSalesforceTimeout() throws SalesforceException { + var endpointConfig = new SalesforceEndpointConfig(); + endpointConfig.setHttpClient(mock(SalesforceHttpClient.class)); + var session = mock(SalesforceSession.class); + var component = mock(SalesforceComponent.class); + when(component.getLoginConfig()).thenReturn(new SalesforceLoginConfig()); + when(component.getConfig()).thenReturn(endpointConfig); + when(component.getSession()).thenReturn(session); + var bayeuxClient = SubscriptionHelper.createClient(component, session); + + var longPollingTimeout = bayeuxClient.getTransport("long-polling").getOption(MAX_NETWORK_DELAY_OPTION); + + MatcherAssert.assertThat(longPollingTimeout, instanceOf(Integer.class)); + MatcherAssert.assertThat((Integer) longPollingTimeout, greaterThan(110000)); + } }