This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 68fe1d415c08 CAMEL-22830: avoid storing replayId in static map (#20777)
68fe1d415c08 is described below
commit 68fe1d415c087304c732206c9bc3b3264ff98b14
Author: François de Parscau <[email protected]>
AuthorDate: Mon Jan 12 16:11:15 2026 +0100
CAMEL-22830: avoid storing replayId in static map (#20777)
---
.../component/salesforce/SalesforceComponent.java | 2 +-
.../internal/streaming/SubscriptionHelper.java | 12 ++++++++----
.../internal/streaming/SubscriptionHelperTest.java | 20 ++++++++++++++------
3 files changed, 23 insertions(+), 11 deletions(-)
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
index 1fdd3a0f2b81..8224c736f693 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
@@ -521,7 +521,7 @@ public class SalesforceComponent extends DefaultComponent implements SSLContextP
}
}
- public SubscriptionHelper getSubscriptionHelper() throws Exception {
+ public SubscriptionHelper getSubscriptionHelper() {
if (subscriptionHelper == null) {
// lazily create subscription helper
subscriptionHelper = new SubscriptionHelper(this);
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index f0c22e6454f8..26de3104ce4e 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -65,7 +65,7 @@ import static org.cometd.bayeux.Message.SUBSCRIPTION_FIELD;
public class SubscriptionHelper extends ServiceSupport {
- static final ReplayExtension REPLAY_EXTENSION = new ReplayExtension();
+ private final ReplayExtension replayExtension = new ReplayExtension();
private static final Logger LOG =
LoggerFactory.getLogger(SubscriptionHelper.class);
@@ -231,7 +231,7 @@ public class SubscriptionHelper extends ServiceSupport {
=
firstConsumer.getEndpoint().getConfiguration().getFallBackReplayId();
LOG.warn(error);
LOG.warn("Falling back to replayId {} for channel {}",
fallBackReplayId, channelName);
- REPLAY_EXTENSION.setReplayId(channelName, fallBackReplayId);
+ replayExtension.setReplayId(channelName, fallBackReplayId);
for (var consumer : consumers) {
subscribe(consumer);
}
@@ -408,7 +408,7 @@ public class SubscriptionHelper extends ServiceSupport {
BayeuxClient client = new BayeuxClient(getEndpointUrl(component),
transport);
// added eagerly to check for support during handshake
- client.addExtension(REPLAY_EXTENSION);
+
client.addExtension(component.getSubscriptionHelper().getReplayExtension());
return client;
}
@@ -439,6 +439,10 @@ public class SubscriptionHelper extends ServiceSupport {
}
}
+ ReplayExtension getReplayExtension() {
+ return replayExtension;
+ }
+
private static boolean isTemporaryError(Message message) {
String failureReason = getFailureReason(message);
return failureReason != null &&
failureReason.startsWith(SERVER_TOO_BUSY_ERROR);
@@ -465,7 +469,7 @@ public class SubscriptionHelper extends ServiceSupport {
final Long replayIdValue = replayId.get();
- REPLAY_EXTENSION.setReplayIdIfAbsent(channelName, replayIdValue);
+ replayExtension.setReplayIdIfAbsent(channelName, replayIdValue);
}
}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
index fd72d7345c86..963e9d8fbe6b 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
@@ -34,7 +34,6 @@ import org.cometd.client.BayeuxClient;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Test;
-import static
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.REPLAY_EXTENSION;
import static
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.determineReplayIdFor;
import static org.assertj.core.api.Assertions.assertThat;
import static
org.cometd.client.transport.ClientTransport.MAX_NETWORK_DELAY_OPTION;
@@ -144,6 +143,8 @@ public class SubscriptionHelperTest {
when(component.getLoginConfig()).thenReturn(loginConfig);
when(component.getConfig()).thenReturn(endpointConfig);
when(component.getSession()).thenReturn(session);
+ final SubscriptionHelper subscriptionHelper = new
SubscriptionHelper(component);
+ when(component.getSubscriptionHelper()).thenReturn(subscriptionHelper);
BayeuxClient bayeuxClient = SubscriptionHelper.createClient(component,
session);
@@ -167,6 +168,8 @@ public class SubscriptionHelperTest {
when(component.getLoginConfig()).thenReturn(loginConfig);
when(component.getConfig()).thenReturn(endpointConfig);
when(component.getSession()).thenReturn(session);
+ final SubscriptionHelper subscriptionHelper = new
SubscriptionHelper(component);
+ when(component.getSubscriptionHelper()).thenReturn(subscriptionHelper);
BayeuxClient bayeuxClient = SubscriptionHelper.createClient(component,
session);
@@ -183,6 +186,8 @@ public class SubscriptionHelperTest {
when(component.getLoginConfig()).thenReturn(new
SalesforceLoginConfig());
when(component.getConfig()).thenReturn(endpointConfig);
when(component.getSession()).thenReturn(session);
+ final SubscriptionHelper subscriptionHelper = new
SubscriptionHelper(component);
+ when(component.getSubscriptionHelper()).thenReturn(subscriptionHelper);
var bayeuxClient = SubscriptionHelper.createClient(component, session);
var longPollingTimeout =
bayeuxClient.getTransport("long-polling").getOption(MAX_NETWORK_DELAY_OPTION);
@@ -207,20 +212,23 @@ public class SubscriptionHelperTest {
when(endpoint.getReplayId()).thenReturn(null);
when(endpoint.getComponent()).thenReturn(component);
when(endpoint.getConfiguration()).thenReturn(endpointConfig);
+ final SubscriptionHelper subscriptionHelper = new
SubscriptionHelper(component);
+ when(component.getSubscriptionHelper()).thenReturn(subscriptionHelper);
assertEquals(Optional.of(2L), determineReplayIdFor(endpoint,
"my-topic-1"),
"Expecting replayId for `my-topic-1` to be 2, from initial
reply id map");
- REPLAY_EXTENSION.setReplayIdIfAbsent("my-topic-1", 3L);
- REPLAY_EXTENSION.setReplayIdIfAbsent("my-topic-1", 4L);
+ ReplayExtension replayExtension =
component.getSubscriptionHelper().getReplayExtension();
+ replayExtension.setReplayIdIfAbsent("my-topic-1", 3L);
+ replayExtension.setReplayIdIfAbsent("my-topic-1", 4L);
// should still be 3L
- Field f = REPLAY_EXTENSION.getClass().getDeclaredField("dataMap");
- Map m = (Map) ReflectionHelper.getField(f, REPLAY_EXTENSION);
+ Field f = replayExtension.getClass().getDeclaredField("dataMap");
+ Map m = (Map) ReflectionHelper.getField(f, replayExtension);
assertEquals(3L, m.get("my-topic-1"));
// there is some subscription error due to INVALID_REPLAY_ID_PATTERN
so we force setting another reply id
- REPLAY_EXTENSION.setReplayId("my-topic-1", -2L);
+ replayExtension.setReplayId("my-topic-1", -2L);
assertEquals(-2L, m.get("my-topic-1"));
}