This is an automated email from the ASF dual-hosted git repository.

jeremyross pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d49586b851d8e096934b3ec8137a135d4643bcf3
Author: Jeremy Ross <jeremy.g.r...@gmail.com>
AuthorDate: Wed Oct 23 13:07:52 2024 -0500

    CAMEL-21379: camel-salesforce: introduce delay in pub/sub reconnect attempts
---
 .../salesforce/internal/client/PubSubApiClient.java         | 13 ++++++++++++-
 .../apache/camel/component/salesforce/PubSubApiTest.java    |  6 ++++--
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
index d6caba9b317..8ba356d4814 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
@@ -80,6 +80,7 @@ public class PubSubApiClient extends ServiceSupport {
 
     private final long backoffIncrement;
     private final long maxBackoff;
+    private long reconnectDelay;
     private final String pubSubHost;
     private final int pubSubPort;
 
@@ -106,6 +107,7 @@ public class PubSubApiClient extends ServiceSupport {
         this.pubSubPort = pubSubPort;
         this.maxBackoff = maxBackoff;
         this.backoffIncrement = backoffIncrement;
+        this.reconnectDelay = backoffIncrement;
     }
 
     public List<org.apache.camel.component.salesforce.api.dto.pubsub.PublishResult> publishMessage(
@@ -289,6 +291,9 @@ public class PubSubApiClient extends ServiceSupport {
 
         @Override
         public void onNext(FetchResponse fetchResponse) {
+            // reset reconnect delay in case we previously had errors
+            reconnectDelay = backoffIncrement;
+
             String topic = consumer.getTopic();
 
             LOG.debug("Received {} events on topic: {}", fetchResponse.getEventsList().size(), topic);
@@ -339,11 +344,17 @@ public class PubSubApiClient extends ServiceSupport {
             } else {
                 LOG.error("An unexpected error occurred.", throwable);
             }
-            LOG.debug("Attempting subscribe after error");
             resubscribeOnError();
         }
 
         private void resubscribeOnError() {
+            try {
+                LOG.debug("Will attempt resubscribe in {} ms", reconnectDelay);
+                Thread.sleep(reconnectDelay);
+                reconnectDelay = reconnectDelay + backoffIncrement;
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
             if (replayId != null) {
                 subscribe(consumer, ReplayPreset.CUSTOM, replayId);
             } else {
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
index 38b9533411f..04cb67b3cd5 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
@@ -98,7 +98,7 @@ public class PubSubApiTest {
         client.subscribe(consumer, ReplayPreset.CUSTOM, "initial");
 
         verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), anyLong());
-        verify(client, times(2)).subscribe(consumer, ReplayPreset.CUSTOM, "initial");
+        verify(client, timeout(5000).times(2)).subscribe(consumer, ReplayPreset.CUSTOM, "initial");
     }
 
     @Test
@@ -126,8 +126,10 @@ public class PubSubApiTest {
         client.start();
         client.subscribe(consumer, ReplayPreset.LATEST, null);
 
+        Thread.sleep(1000);
+
         verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), anyLong());
-        verify(client, times(2)).subscribe(consumer, ReplayPreset.LATEST, null);
+        verify(client, timeout(5000).times(2)).subscribe(consumer, ReplayPreset.LATEST, null);
     }
 
     @Test

Reply via email to