This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch replyid in repository https://gitbox.apache.org/repos/asf/camel.git
commit a5085ef1db6760df7617c1823021a35ed05a4ef9 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Apr 8 16:12:38 2025 +0200 CAMEL-21901: camel-salesforce: Fix using fallBackReplayId when Salesforce cannot subscribe due to an invalid initial replayId from a preconfigured initialReplayIdMap. --- .../component/salesforce/internal/streaming/ReplayExtension.java | 8 +++++--- .../salesforce/internal/streaming/SubscriptionHelper.java | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java index 0d2d32f012e..a43d8a8e38f 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java @@ -36,9 +36,6 @@ import org.cometd.bayeux.client.ClientSession.Extension; /** * The Bayeux extension for replay - * - * @author hal.hildebrand - * @since API v37.0 */ public class ReplayExtension implements Extension { private static final String EXTENSION_NAME = "replay"; @@ -52,6 +49,11 @@ public class ReplayExtension implements Extension { dataMap.putIfAbsent(channelName, replayId); } + public void setReplayId(final String channelName, final long replayId) { + // force setting with a specific value + dataMap.put(channelName, replayId); + } + @Override public boolean rcv(ClientSession session, Message.Mutable message) { Long replayId = getReplayId(message); diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index 894fcb16820..737d85327ad 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -228,10 +228,10 @@ public class SubscriptionHelper extends ServiceSupport { } else if (error.matches(INVALID_REPLAY_ID_PATTERN)) { abort = false; long fallBackReplayId - = ((SalesforceEndpoint) firstConsumer.getEndpoint()).getConfiguration().getFallBackReplayId(); + = firstConsumer.getEndpoint().getConfiguration().getFallBackReplayId(); LOG.warn(error); LOG.warn("Falling back to replayId {} for channel {}", fallBackReplayId, channelName); - REPLAY_EXTENSION.setReplayIdIfAbsent(channelName, fallBackReplayId); + REPLAY_EXTENSION.setReplayId(channelName, fallBackReplayId); for (var consumer : consumers) { subscribe(consumer); }