This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-4.10.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.10.x by this push: new e7164cb395a CAMEL-21901: camel-salesforce: Fix using fallbackReplyId when salesfo… (#17696) e7164cb395a is described below commit e7164cb395acc7b3d86ece56e40872ccce63d590 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Apr 10 13:45:19 2025 +0200 CAMEL-21901: camel-salesforce: Fix using fallbackReplyId when salesfo… (#17696) * CAMEL-21901: camel-salesforce: Fix using fallbackReplyId when salesforce cannot subscribe due to invalid initial replyId from a preconfigured initialReplyIdMap. --- .../internal/streaming/ReplayExtension.java | 8 +++-- .../internal/streaming/SubscriptionHelper.java | 4 +-- .../internal/streaming/SubscriptionHelperTest.java | 37 ++++++++++++++++++++++ 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java index 0d2d32f012e..a43d8a8e38f 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java @@ -36,9 +36,6 @@ import org.cometd.bayeux.client.ClientSession.Extension; /** * The Bayeux extension for replay - * - * @author hal.hildebrand - * @since API v37.0 */ public class ReplayExtension implements Extension { private static final String EXTENSION_NAME = "replay"; @@ -52,6 +49,11 @@ public class ReplayExtension implements Extension { dataMap.putIfAbsent(channelName, replayId); } + public void setReplayId(final String channelName, final long replayId) { + // force setting with a specific value + dataMap.put(channelName, replayId); + } + @Override public boolean rcv(ClientSession session, Message.Mutable message) { Long replayId = getReplayId(message); diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index 894fcb16820..737d85327ad 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -228,10 +228,10 @@ public class SubscriptionHelper extends ServiceSupport { } else if (error.matches(INVALID_REPLAY_ID_PATTERN)) { abort = false; long fallBackReplayId - = ((SalesforceEndpoint) firstConsumer.getEndpoint()).getConfiguration().getFallBackReplayId(); + = firstConsumer.getEndpoint().getConfiguration().getFallBackReplayId(); LOG.warn(error); LOG.warn("Falling back to replayId {} for channel {}", fallBackReplayId, channelName); - REPLAY_EXTENSION.setReplayIdIfAbsent(channelName, fallBackReplayId); + REPLAY_EXTENSION.setReplayId(channelName, fallBackReplayId); for (var consumer : consumers) { subscribe(consumer); } diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java index 42d68baeee0..fd72d7345c8 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.salesforce.internal.streaming; +import java.lang.reflect.Field; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -28,10 +29,12 @@ import org.apache.camel.component.salesforce.SalesforceHttpClient; import org.apache.camel.component.salesforce.SalesforceLoginConfig; import org.apache.camel.component.salesforce.api.SalesforceException; import org.apache.camel.component.salesforce.internal.SalesforceSession; +import org.apache.camel.util.ReflectionHelper; import org.cometd.client.BayeuxClient; import org.hamcrest.MatcherAssert; import org.junit.jupiter.api.Test; +import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.REPLAY_EXTENSION; import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.determineReplayIdFor; import static org.assertj.core.api.Assertions.assertThat; import static org.cometd.client.transport.ClientTransport.MAX_NETWORK_DELAY_OPTION; @@ -187,4 +190,38 @@ public class SubscriptionHelperTest { MatcherAssert.assertThat(longPollingTimeout, instanceOf(Integer.class)); MatcherAssert.assertThat((Integer) longPollingTimeout, greaterThan(110000)); } + + @Test + public void fallbackReplyId() throws Exception { + final SalesforceEndpointConfig componentConfig = new SalesforceEndpointConfig(); + componentConfig.setFallBackReplayId(-2L); + + final SalesforceEndpointConfig endpointConfig = new SalesforceEndpointConfig(); + endpointConfig.setDefaultReplayId(-1L); + endpointConfig.setInitialReplayIdMap(Collections.singletonMap("my-topic-1", 2L)); + + final SalesforceComponent component = mock(SalesforceComponent.class); + when(component.getConfig()).thenReturn(componentConfig); + + final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class); + when(endpoint.getReplayId()).thenReturn(null); + when(endpoint.getComponent()).thenReturn(component); + when(endpoint.getConfiguration()).thenReturn(endpointConfig); + + assertEquals(Optional.of(2L), determineReplayIdFor(endpoint, "my-topic-1"), + "Expecting replayId for `my-topic-1` to be 2, from initial reply id map"); + + REPLAY_EXTENSION.setReplayIdIfAbsent("my-topic-1", 3L); + REPLAY_EXTENSION.setReplayIdIfAbsent("my-topic-1", 4L); + + // should still be 3L + Field f = REPLAY_EXTENSION.getClass().getDeclaredField("dataMap"); + Map m = (Map) ReflectionHelper.getField(f, REPLAY_EXTENSION); + assertEquals(3L, m.get("my-topic-1")); + + // there is some subscription error due to INVALID_REPLAY_ID_PATTERN so we force setting another reply id + REPLAY_EXTENSION.setReplayId("my-topic-1", -2L); + assertEquals(-2L, m.get("my-topic-1")); + } + }