This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 40d4e5d2fe9 [CAMEL-21740] Check initial replay id validity on consumer startup (#17265)
40d4e5d2fe9 is described below

commit 40d4e5d2fe9b166f0225c7290c07af346d42a271
Author: jubar <j_b_...@proton.me>
AuthorDate: Wed Feb 26 08:49:53 2025 +0100

    [CAMEL-21740] Check initial replay id validity on consumer startup (#17265)
    
    * Revert "CAMEL-21740: added error handling for corrupt replay ids"
    
    This reverts commit 2ac45377c99a6a4b568af3055f6f719547848850.
    
    * CAMEL-21740: check for corrupt initialReplayId on route startup
    
    * CAMEL-21740: only throw exception in checkInitialReplayIdValidity() for corrupt replay id
    
    * CAMEL-21740: fix PubSubApiTest
---
 .../component/salesforce/PubSubApiConsumer.java    |  2 +-
 .../internal/client/PubSubApiClient.java           | 81 ++++++++++++++++++----
 .../camel/component/salesforce/PubSubApiTest.java  | 17 +++--
 3 files changed, 77 insertions(+), 23 deletions(-)

diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
index 6db8ce247e3..ab7310e3eec 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
@@ -88,7 +88,7 @@ public class PubSubApiConsumer extends DefaultConsumer {
         
this.pubSubClient.setUsePlainTextConnection(this.usePlainTextConnection);
 
         ServiceHelper.startService(pubSubClient);
-        pubSubClient.subscribe(this, initialReplayPreset, initialReplayId);
+        pubSubClient.subscribe(this, initialReplayPreset, initialReplayId, 
true);
     }
 
     @Override
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
index 72ae4d5963a..ef16d7ecf4f 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
@@ -24,8 +24,11 @@ import java.util.Base64;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
 import com.salesforce.eventbus.protobuf.ConsumerEvent;
 import com.salesforce.eventbus.protobuf.FetchRequest;
@@ -145,7 +148,8 @@ public class PubSubApiClient extends ServiceSupport {
         return publishResults;
     }
 
-    public void subscribe(PubSubApiConsumer consumer, ReplayPreset 
replayPreset, String initialReplayId) {
+    public void subscribe(
+            PubSubApiConsumer consumer, ReplayPreset replayPreset, String 
initialReplayId, boolean initialSubscribe) {
         LOG.debug("Starting subscribe {}", consumer.getTopic());
         this.initialReplayPreset = replayPreset;
         this.initialReplayId = initialReplayId;
@@ -153,11 +157,14 @@ public class PubSubApiClient extends ServiceSupport {
             throw new RuntimeException("initialReplayId is required for 
ReplayPreset.CUSTOM");
         }
 
+        String topic = consumer.getTopic();
         ByteString replayId = null;
         if (initialReplayId != null) {
             replayId = base64DecodeToByteString(initialReplayId);
+            if (initialSubscribe) {
+                checkInitialReplayIdValidity(topic, replayId);
+            }
         }
-        String topic = consumer.getTopic();
         LOG.info("Subscribing to topic: {}.", topic);
         final FetchResponseObserver responseObserver = new 
FetchResponseObserver(consumer);
         StreamObserver<FetchRequest> serverStream = 
asyncStub.subscribe(responseObserver);
@@ -174,6 +181,58 @@ public class PubSubApiClient extends ServiceSupport {
         serverStream.onNext(fetchRequestBuilder.build());
     }
 
+    public void checkInitialReplayIdValidity(String topic, ByteString 
replayId) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Checking initialReplayId {} for topic {}", 
base64EncodeByteString(replayId), topic);
+        }
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        final StreamObserver<FetchResponse> responseObserver = new 
StreamObserver<>() {
+
+            @Override
+            public void onNext(FetchResponse value) {
+                latch.countDown();
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                if (t instanceof StatusRuntimeException e) {
+                    Metadata trailers = e.getTrailers();
+                    if (trailers != null && PUBSUB_ERROR_CORRUPTED_REPLAY_ID
+                            .equals(trailers.get(Metadata.Key.of("error-code", 
Metadata.ASCII_STRING_MARSHALLER)))) {
+                        error.set(t);
+                    }
+                }
+                latch.countDown();
+            }
+
+            @Override
+            public void onCompleted() {
+            }
+        };
+        StreamObserver<FetchRequest> serverStream = 
asyncStub.subscribe(responseObserver);
+        FetchRequest.Builder fetchRequestBuilder = FetchRequest.newBuilder()
+                .setReplayPreset(ReplayPreset.CUSTOM)
+                .setTopicName(topic)
+                .setNumRequested(1)
+                .setReplayId(replayId);
+        serverStream.onNext(fetchRequestBuilder.build());
+
+        try {
+            if (!Uninterruptibles.awaitUninterruptibly(latch, 10, 
TimeUnit.SECONDS)) {
+                throw new RuntimeException("timeout while checking 
initialReplayId.");
+            }
+        } finally {
+            serverStream.onCompleted();
+        }
+
+        if (error.get() != null) {
+            throw new RuntimeException(
+                    "initialReplayId " + base64EncodeByteString(replayId) + " 
is not valid",
+                    error.get());
+        }
+    }
+
     public TopicInfo getTopicInfo(String name) {
         return topicInfoCache.computeIfAbsent(name,
                 topic -> 
blockingStub.getTopic(TopicRequest.newBuilder().setTopicName(topic).build()));
@@ -346,14 +405,10 @@ public class PubSubApiClient extends ServiceSupport {
                             LOG.debug("logged in {}", consumer.getTopic());
                         }
                         case PUBSUB_ERROR_CORRUPTED_REPLAY_ID -> {
-                            if (initialReplayPreset == ReplayPreset.CUSTOM) {
-                                LOG.error("replay id: " + replayId + " is 
corrupt.");
-                            } else {
-                                LOG.error("replay id: " + replayId
-                                          + " is corrupt. Trying to recover by 
resubscribing with LATEST replay preset");
-                                replayId = null;
-                                initialReplayPreset = ReplayPreset.LATEST;
-                            }
+                            LOG.error("replay id: " + replayId
+                                      + " is corrupt. Trying to recover by 
resubscribing with LATEST replay preset");
+                            replayId = null;
+                            initialReplayPreset = ReplayPreset.LATEST;
                         }
                         default -> LOG.error("unexpected errorCode: {}", 
errorCode);
                     }
@@ -373,12 +428,12 @@ public class PubSubApiClient extends ServiceSupport {
                 throw new RuntimeException(e);
             }
             if (replayId != null) {
-                subscribe(consumer, ReplayPreset.CUSTOM, replayId);
+                subscribe(consumer, ReplayPreset.CUSTOM, replayId, false);
             } else {
                 if (initialReplayPreset == ReplayPreset.CUSTOM) {
-                    subscribe(consumer, initialReplayPreset, initialReplayId);
+                    subscribe(consumer, initialReplayPreset, initialReplayId, 
false);
                 } else {
-                    subscribe(consumer, initialReplayPreset, null);
+                    subscribe(consumer, initialReplayPreset, null, false);
                 }
             }
         }
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
index 52447d76abd..8dbd1adb09f 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
@@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -65,11 +64,11 @@ public class PubSubApiTest {
                 port, 1000, 10000, true));
         client.setUsePlainTextConnection(true);
         client.start();
-        client.subscribe(consumer, ReplayPreset.LATEST, null);
+        client.subscribe(consumer, ReplayPreset.LATEST, null, true);
 
         verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), 
anyLong());
-        verify(client, timeout(5000).times(1)).subscribe(consumer, 
ReplayPreset.LATEST, null);
-        verify(client, timeout(5000).times(1)).subscribe(consumer, 
ReplayPreset.CUSTOM, "MTIz");
+        verify(client, timeout(5000).times(1)).subscribe(consumer, 
ReplayPreset.LATEST, null, true);
+        verify(client, timeout(5000).times(1)).subscribe(consumer, 
ReplayPreset.CUSTOM, "MTIz", false);
     }
 
     @Test
@@ -95,10 +94,10 @@ public class PubSubApiTest {
                 port, 1000, 10000, true));
         client.setUsePlainTextConnection(true);
         client.start();
-        client.subscribe(consumer, ReplayPreset.CUSTOM, "initial");
+        client.subscribe(consumer, ReplayPreset.CUSTOM, "initial", false);
 
         verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), 
anyLong());
-        verify(client, timeout(5000).times(2)).subscribe(consumer, 
ReplayPreset.CUSTOM, "initial");
+        verify(client, timeout(5000).times(2)).subscribe(consumer, 
ReplayPreset.CUSTOM, "initial", false);
     }
 
     @Test
@@ -124,12 +123,12 @@ public class PubSubApiTest {
                 port, 1000, 10000, true));
         client.setUsePlainTextConnection(true);
         client.start();
-        client.subscribe(consumer, ReplayPreset.LATEST, null);
+        client.subscribe(consumer, ReplayPreset.LATEST, null, false);
 
         Thread.sleep(1000);
 
         verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), 
anyLong());
-        verify(client, timeout(5000).times(2)).subscribe(consumer, 
ReplayPreset.LATEST, null);
+        verify(client, timeout(5000).times(2)).subscribe(consumer, 
ReplayPreset.LATEST, null, false);
     }
 
     @Test
@@ -155,7 +154,7 @@ public class PubSubApiTest {
                 port, 1000, 10000, true);
         client.setUsePlainTextConnection(true);
         client.start();
-        client.subscribe(consumer, ReplayPreset.LATEST, null);
+        client.subscribe(consumer, ReplayPreset.LATEST, null, true);
 
         verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), 
anyLong());
     }

Reply via email to