This is an automated email from the ASF dual-hosted git repository. jeremyross pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit d49586b851d8e096934b3ec8137a135d4643bcf3 Author: Jeremy Ross <jeremy.g.r...@gmail.com> AuthorDate: Wed Oct 23 13:07:52 2024 -0500 CAMEL-21379: camel-salesforce: introduce delay in pub/sub reconnect attempts --- .../salesforce/internal/client/PubSubApiClient.java | 13 ++++++++++++- .../apache/camel/component/salesforce/PubSubApiTest.java | 6 ++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java index d6caba9b317..8ba356d4814 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java @@ -80,6 +80,7 @@ public class PubSubApiClient extends ServiceSupport { private final long backoffIncrement; private final long maxBackoff; + private long reconnectDelay; private final String pubSubHost; private final int pubSubPort; @@ -106,6 +107,7 @@ public class PubSubApiClient extends ServiceSupport { this.pubSubPort = pubSubPort; this.maxBackoff = maxBackoff; this.backoffIncrement = backoffIncrement; + this.reconnectDelay = backoffIncrement; } public List<org.apache.camel.component.salesforce.api.dto.pubsub.PublishResult> publishMessage( @@ -289,6 +291,9 @@ public class PubSubApiClient extends ServiceSupport { @Override public void onNext(FetchResponse fetchResponse) { + // reset reconnect delay in case we previously had errors + reconnectDelay = backoffIncrement; + String topic = consumer.getTopic(); LOG.debug("Received {} events on topic: {}", fetchResponse.getEventsList().size(), topic); @@ -339,11 +344,17 @@ public class PubSubApiClient extends ServiceSupport { } else { LOG.error("An unexpected error occurred.", throwable); } - LOG.debug("Attempting subscribe after error"); resubscribeOnError(); } private void resubscribeOnError() { + try { + LOG.debug("Will attempt resubscribe in {} ms", reconnectDelay); + Thread.sleep(reconnectDelay); + reconnectDelay = reconnectDelay + backoffIncrement; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } if (replayId != null) { subscribe(consumer, ReplayPreset.CUSTOM, replayId); } else { diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java index 38b9533411f..04cb67b3cd5 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java @@ -98,7 +98,7 @@ public class PubSubApiTest { client.subscribe(consumer, ReplayPreset.CUSTOM, "initial"); verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), anyLong()); - verify(client, times(2)).subscribe(consumer, ReplayPreset.CUSTOM, "initial"); + verify(client, timeout(5000).times(2)).subscribe(consumer, ReplayPreset.CUSTOM, "initial"); } @Test @@ -126,8 +126,10 @@ public class PubSubApiTest { client.start(); client.subscribe(consumer, ReplayPreset.LATEST, null); + Thread.sleep(1000); + verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), anyLong()); - verify(client, times(2)).subscribe(consumer, ReplayPreset.LATEST, null); + verify(client, timeout(5000).times(2)).subscribe(consumer, ReplayPreset.LATEST, null); } @Test