This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch replyid
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a5085ef1db6760df7617c1823021a35ed05a4ef9
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Apr 8 16:12:38 2025 +0200

    CAMEL-21901: camel-salesforce: Fix using fallBackReplayId when Salesforce 
cannot subscribe due to an invalid initial replayId from a preconfigured 
initialReplayIdMap.
---
 .../component/salesforce/internal/streaming/ReplayExtension.java  | 8 +++++---
 .../salesforce/internal/streaming/SubscriptionHelper.java         | 4 ++--
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
index 0d2d32f012e..a43d8a8e38f 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
@@ -36,9 +36,6 @@ import org.cometd.bayeux.client.ClientSession.Extension;
 
 /**
  * The Bayeux extension for replay
- *
- * @author hal.hildebrand
- * @since  API v37.0
  */
 public class ReplayExtension implements Extension {
     private static final String EXTENSION_NAME = "replay";
@@ -52,6 +49,11 @@ public class ReplayExtension implements Extension {
         dataMap.putIfAbsent(channelName, replayId);
     }
 
+    public void setReplayId(final String channelName, final long replayId) {
+        // force setting with a specific value
+        dataMap.put(channelName, replayId);
+    }
+
     @Override
     public boolean rcv(ClientSession session, Message.Mutable message) {
         Long replayId = getReplayId(message);
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 894fcb16820..737d85327ad 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -228,10 +228,10 @@ public class SubscriptionHelper extends ServiceSupport {
         } else if (error.matches(INVALID_REPLAY_ID_PATTERN)) {
             abort = false;
             long fallBackReplayId
-                    = ((SalesforceEndpoint) 
firstConsumer.getEndpoint()).getConfiguration().getFallBackReplayId();
+                    = 
firstConsumer.getEndpoint().getConfiguration().getFallBackReplayId();
             LOG.warn(error);
             LOG.warn("Falling back to replayId {} for channel {}", 
fallBackReplayId, channelName);
-            REPLAY_EXTENSION.setReplayIdIfAbsent(channelName, 
fallBackReplayId);
+            REPLAY_EXTENSION.setReplayId(channelName, fallBackReplayId);
             for (var consumer : consumers) {
                 subscribe(consumer);
             }

Reply via email to