This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.10.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.10.x by this push:
     new e7164cb395a CAMEL-21901: camel-salesforce: Fix using fallbackReplyId when salesfo… (#17696)
e7164cb395a is described below

commit e7164cb395acc7b3d86ece56e40872ccce63d590
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Apr 10 13:45:19 2025 +0200

    CAMEL-21901: camel-salesforce: Fix using fallbackReplyId when salesfo… (#17696)
    
    * CAMEL-21901: camel-salesforce: Fix using fallBackReplayId when Salesforce cannot subscribe due to an invalid initial replayId from a preconfigured initialReplayIdMap.
---
 .../internal/streaming/ReplayExtension.java        |  8 +++--
 .../internal/streaming/SubscriptionHelper.java     |  4 +--
 .../internal/streaming/SubscriptionHelperTest.java | 37 ++++++++++++++++++++++
 3 files changed, 44 insertions(+), 5 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
index 0d2d32f012e..a43d8a8e38f 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
@@ -36,9 +36,6 @@ import org.cometd.bayeux.client.ClientSession.Extension;
 
 /**
  * The Bayeux extension for replay
- *
- * @author hal.hildebrand
- * @since  API v37.0
  */
 public class ReplayExtension implements Extension {
     private static final String EXTENSION_NAME = "replay";
@@ -52,6 +49,11 @@ public class ReplayExtension implements Extension {
         dataMap.putIfAbsent(channelName, replayId);
     }
 
+    public void setReplayId(final String channelName, final long replayId) {
+        // force setting with a specific value
+        dataMap.put(channelName, replayId);
+    }
+
     @Override
     public boolean rcv(ClientSession session, Message.Mutable message) {
         Long replayId = getReplayId(message);
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 894fcb16820..737d85327ad 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -228,10 +228,10 @@ public class SubscriptionHelper extends ServiceSupport {
         } else if (error.matches(INVALID_REPLAY_ID_PATTERN)) {
             abort = false;
             long fallBackReplayId
-                    = ((SalesforceEndpoint) firstConsumer.getEndpoint()).getConfiguration().getFallBackReplayId();
+                    = firstConsumer.getEndpoint().getConfiguration().getFallBackReplayId();
             LOG.warn(error);
             LOG.warn("Falling back to replayId {} for channel {}", fallBackReplayId, channelName);
-            REPLAY_EXTENSION.setReplayIdIfAbsent(channelName, fallBackReplayId);
+            REPLAY_EXTENSION.setReplayId(channelName, fallBackReplayId);
             for (var consumer : consumers) {
                 subscribe(consumer);
             }
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
index 42d68baeee0..fd72d7345c8 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.salesforce.internal.streaming;
 
+import java.lang.reflect.Field;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -28,10 +29,12 @@ import org.apache.camel.component.salesforce.SalesforceHttpClient;
 import org.apache.camel.component.salesforce.SalesforceLoginConfig;
 import org.apache.camel.component.salesforce.api.SalesforceException;
 import org.apache.camel.component.salesforce.internal.SalesforceSession;
+import org.apache.camel.util.ReflectionHelper;
 import org.cometd.client.BayeuxClient;
 import org.hamcrest.MatcherAssert;
 import org.junit.jupiter.api.Test;
 
+import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.REPLAY_EXTENSION;
 import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.determineReplayIdFor;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.cometd.client.transport.ClientTransport.MAX_NETWORK_DELAY_OPTION;
@@ -187,4 +190,38 @@ public class SubscriptionHelperTest {
         MatcherAssert.assertThat(longPollingTimeout, instanceOf(Integer.class));
         MatcherAssert.assertThat((Integer) longPollingTimeout, greaterThan(110000));
     }
+
+    @Test
+    public void fallbackReplyId() throws Exception {
+        final SalesforceEndpointConfig componentConfig = new SalesforceEndpointConfig();
+        componentConfig.setFallBackReplayId(-2L);
+
+        final SalesforceEndpointConfig endpointConfig = new SalesforceEndpointConfig();
+        endpointConfig.setDefaultReplayId(-1L);
+        endpointConfig.setInitialReplayIdMap(Collections.singletonMap("my-topic-1", 2L));
+
+        final SalesforceComponent component = mock(SalesforceComponent.class);
+        when(component.getConfig()).thenReturn(componentConfig);
+
+        final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class);
+        when(endpoint.getReplayId()).thenReturn(null);
+        when(endpoint.getComponent()).thenReturn(component);
+        when(endpoint.getConfiguration()).thenReturn(endpointConfig);
+
+        assertEquals(Optional.of(2L), determineReplayIdFor(endpoint, "my-topic-1"),
+                "Expecting replayId for `my-topic-1` to be 2, from initial reply id map");
+
+        REPLAY_EXTENSION.setReplayIdIfAbsent("my-topic-1", 3L);
+        REPLAY_EXTENSION.setReplayIdIfAbsent("my-topic-1", 4L);
+
+        // should still be 3L
+        Field f = REPLAY_EXTENSION.getClass().getDeclaredField("dataMap");
+        Map m = (Map) ReflectionHelper.getField(f, REPLAY_EXTENSION);
+        assertEquals(3L, m.get("my-topic-1"));
+
+        // there is some subscription error due to INVALID_REPLAY_ID_PATTERN so we force setting another reply id
+        REPLAY_EXTENSION.setReplayId("my-topic-1", -2L);
+        assertEquals(-2L, m.get("my-topic-1"));
+    }
+
 }

Reply via email to