This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 40d4e5d2fe9 [CAMEL-21740] Check initial replay id validity on consumer startup (#17265) 40d4e5d2fe9 is described below commit 40d4e5d2fe9b166f0225c7290c07af346d42a271 Author: jubar <j_b_...@proton.me> AuthorDate: Wed Feb 26 08:49:53 2025 +0100 [CAMEL-21740] Check initial replay id validity on consumer startup (#17265) * Revert "CAMEL-21740: added error handling for corrupt replay ids" This reverts commit 2ac45377c99a6a4b568af3055f6f719547848850. * CAMEL-21740: check for corrupt initialReplayId on route startup * CAMEL-21740: only throw exception in checkInitialReplayIdValidity() for corrupt replay id * CAMEL-21740: fix PubSubApiTest --- .../component/salesforce/PubSubApiConsumer.java | 2 +- .../internal/client/PubSubApiClient.java | 81 ++++++++++++++++++---- .../camel/component/salesforce/PubSubApiTest.java | 17 +++-- 3 files changed, 77 insertions(+), 23 deletions(-) diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java index 6db8ce247e3..ab7310e3eec 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java @@ -88,7 +88,7 @@ public class PubSubApiConsumer extends DefaultConsumer { this.pubSubClient.setUsePlainTextConnection(this.usePlainTextConnection); ServiceHelper.startService(pubSubClient); - pubSubClient.subscribe(this, initialReplayPreset, initialReplayId); + pubSubClient.subscribe(this, initialReplayPreset, initialReplayId, true); } @Override diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java index 72ae4d5963a..ef16d7ecf4f 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java @@ -24,8 +24,11 @@ import java.util.Base64; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; import com.salesforce.eventbus.protobuf.ConsumerEvent; import com.salesforce.eventbus.protobuf.FetchRequest; @@ -145,7 +148,8 @@ public class PubSubApiClient extends ServiceSupport { return publishResults; } - public void subscribe(PubSubApiConsumer consumer, ReplayPreset replayPreset, String initialReplayId) { + public void subscribe( + PubSubApiConsumer consumer, ReplayPreset replayPreset, String initialReplayId, boolean initialSubscribe) { LOG.debug("Starting subscribe {}", consumer.getTopic()); this.initialReplayPreset = replayPreset; this.initialReplayId = initialReplayId; @@ -153,11 +157,14 @@ public class PubSubApiClient extends ServiceSupport { throw new RuntimeException("initialReplayId is required for ReplayPreset.CUSTOM"); } + String topic = consumer.getTopic(); ByteString replayId = null; if (initialReplayId != null) { replayId = base64DecodeToByteString(initialReplayId); + if (initialSubscribe) { + checkInitialReplayIdValidity(topic, replayId); + } } - String topic = consumer.getTopic(); LOG.info("Subscribing to topic: {}.", topic); final FetchResponseObserver responseObserver = new FetchResponseObserver(consumer); StreamObserver<FetchRequest> serverStream = asyncStub.subscribe(responseObserver); @@ -174,6 +181,58 @@ public class PubSubApiClient extends ServiceSupport { serverStream.onNext(fetchRequestBuilder.build()); } + public void checkInitialReplayIdValidity(String topic, ByteString replayId) { + if (LOG.isDebugEnabled()) { + LOG.debug("Checking initialReplayId {} for topic {}", base64EncodeByteString(replayId), topic); + } + final AtomicReference<Throwable> error = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + final StreamObserver<FetchResponse> responseObserver = new StreamObserver<>() { + + @Override + public void onNext(FetchResponse value) { + latch.countDown(); + } + + @Override + public void onError(Throwable t) { + if (t instanceof StatusRuntimeException e) { + Metadata trailers = e.getTrailers(); + if (trailers != null && PUBSUB_ERROR_CORRUPTED_REPLAY_ID + .equals(trailers.get(Metadata.Key.of("error-code", Metadata.ASCII_STRING_MARSHALLER)))) { + error.set(t); + } + } + latch.countDown(); + } + + @Override + public void onCompleted() { + } + }; + StreamObserver<FetchRequest> serverStream = asyncStub.subscribe(responseObserver); + FetchRequest.Builder fetchRequestBuilder = FetchRequest.newBuilder() + .setReplayPreset(ReplayPreset.CUSTOM) + .setTopicName(topic) + .setNumRequested(1) + .setReplayId(replayId); + serverStream.onNext(fetchRequestBuilder.build()); + + try { + if (!Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS)) { + throw new RuntimeException("timeout while checking initialReplayId."); + } + } finally { + serverStream.onCompleted(); + } + + if (error.get() != null) { + throw new RuntimeException( + "initialReplayId " + base64EncodeByteString(replayId) + " is not valid", + error.get()); + } + } + public TopicInfo getTopicInfo(String name) { return topicInfoCache.computeIfAbsent(name, topic -> blockingStub.getTopic(TopicRequest.newBuilder().setTopicName(topic).build())); @@ -346,14 +405,10 @@ public class PubSubApiClient extends ServiceSupport { LOG.debug("logged in {}", consumer.getTopic()); } case PUBSUB_ERROR_CORRUPTED_REPLAY_ID -> { - if (initialReplayPreset == ReplayPreset.CUSTOM) { - LOG.error("replay id: " + replayId + " is corrupt."); - } else { - LOG.error("replay id: " + replayId - + " is corrupt. Trying to recover by resubscribing with LATEST replay preset"); - replayId = null; - initialReplayPreset = ReplayPreset.LATEST; - } + LOG.error("replay id: " + replayId + + " is corrupt. Trying to recover by resubscribing with LATEST replay preset"); + replayId = null; + initialReplayPreset = ReplayPreset.LATEST; } default -> LOG.error("unexpected errorCode: {}", errorCode); } @@ -373,12 +428,12 @@ public class PubSubApiClient extends ServiceSupport { throw new RuntimeException(e); } if (replayId != null) { - subscribe(consumer, ReplayPreset.CUSTOM, replayId); + subscribe(consumer, ReplayPreset.CUSTOM, replayId, false); } else { if (initialReplayPreset == ReplayPreset.CUSTOM) { - subscribe(consumer, initialReplayPreset, initialReplayId); + subscribe(consumer, initialReplayPreset, initialReplayId, false); } else { - subscribe(consumer, initialReplayPreset, null); + subscribe(consumer, initialReplayPreset, null, false); } } } diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java index 52447d76abd..8dbd1adb09f 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java @@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -65,11 +64,11 @@ public class PubSubApiTest { port, 1000, 10000, true)); client.setUsePlainTextConnection(true); client.start(); - client.subscribe(consumer, ReplayPreset.LATEST, null); + client.subscribe(consumer, ReplayPreset.LATEST, null, true); verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), anyLong()); - verify(client, timeout(5000).times(1)).subscribe(consumer, ReplayPreset.LATEST, null); - verify(client, timeout(5000).times(1)).subscribe(consumer, ReplayPreset.CUSTOM, "MTIz"); + verify(client, timeout(5000).times(1)).subscribe(consumer, ReplayPreset.LATEST, null, true); + verify(client, timeout(5000).times(1)).subscribe(consumer, ReplayPreset.CUSTOM, "MTIz", false); } @Test @@ -95,10 +94,10 @@ public class PubSubApiTest { port, 1000, 10000, true)); client.setUsePlainTextConnection(true); client.start(); - client.subscribe(consumer, ReplayPreset.CUSTOM, "initial"); + client.subscribe(consumer, ReplayPreset.CUSTOM, "initial", false); verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), anyLong()); - verify(client, timeout(5000).times(2)).subscribe(consumer, ReplayPreset.CUSTOM, "initial"); + verify(client, timeout(5000).times(2)).subscribe(consumer, ReplayPreset.CUSTOM, "initial", false); } @Test @@ -124,12 +123,12 @@ public class PubSubApiTest { port, 1000, 10000, true)); client.setUsePlainTextConnection(true); client.start(); - client.subscribe(consumer, ReplayPreset.LATEST, null); + client.subscribe(consumer, ReplayPreset.LATEST, null, false); Thread.sleep(1000); verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), anyLong()); - verify(client, timeout(5000).times(2)).subscribe(consumer, ReplayPreset.LATEST, null); + verify(client, timeout(5000).times(2)).subscribe(consumer, ReplayPreset.LATEST, null, false); } @Test @@ -155,7 +154,7 @@ public class PubSubApiTest { port, 1000, 10000, true); client.setUsePlainTextConnection(true); client.start(); - client.subscribe(consumer, ReplayPreset.LATEST, null); + client.subscribe(consumer, ReplayPreset.LATEST, null, true); verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), anyLong()); }