This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-4.0.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.0.x by this push:
     new 73a8578e16b CAMEL-19783: Added reconnection, even if a message has not yet been received (#11308)
73a8578e16b is described below

commit 73a8578e16b464b13cce142ba58a825c9c0acf44
Author: Jesse Sightler <jesse.sight...@gmail.com>
AuthorDate: Wed Sep 6 00:40:30 2023 -0400

    CAMEL-19783: Added reconnection, even if a message has not yet been received (#11308)
---
 .../component/salesforce/PubSubApiConsumer.java    | 21 ++++-
 .../internal/client/PubSubApiClient.java           | 22 +++--
 .../camel/component/salesforce/PubSubApiTest.java  | 91 +++++++++++++++++++++
 .../pubsub/SendOneMessagePubSubServer.java         | 94 ++++++++++++++++++++++
 4 files changed, 222 insertions(+), 6 deletions(-)

diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
index 043638a0143..aa70631679c 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
@@ -38,7 +38,7 @@ public class PubSubApiConsumer extends DefaultConsumer {
     private static final Logger LOG = 
LoggerFactory.getLogger(PubSubApiConsumer.class);
     private final String topic;
     private final ReplayPreset initialReplayPreset;
-    private final String initialReplayId;
+    private String initialReplayId;
     private final SalesforceEndpoint endpoint;
 
     private final int batchSize;
@@ -47,6 +47,8 @@ public class PubSubApiConsumer extends DefaultConsumer {
     private PubSubApiClient pubSubClient;
     private Map<String, Class<?>> eventClassMap;
 
+    private boolean usePlainTextConnection = false;
+
     public PubSubApiConsumer(SalesforceEndpoint endpoint, Processor processor) 
throws ClassNotFoundException {
         super(endpoint, processor);
         this.endpoint = endpoint;
@@ -86,6 +88,7 @@ public class PubSubApiConsumer extends DefaultConsumer {
                 endpoint.getComponent().getSession(), 
endpoint.getComponent().getLoginConfig(),
                 endpoint.getComponent().getPubSubHost(), 
endpoint.getComponent().getPubSubPort(),
                 endpoint.getConfiguration().getBackoffIncrement(), 
endpoint.getConfiguration().getMaxBackoff());
+        this.pubSubClient.setUsePlainTextConnection(this.usePlainTextConnection);
 
         ServiceHelper.startService(pubSubClient);
         pubSubClient.subscribe(this, initialReplayPreset, initialReplayId);
@@ -116,4 +119,20 @@ public class PubSubApiConsumer extends DefaultConsumer {
     public Class<?> getPojoClass() {
         return pojoClass;
     }
+
+    // ability to use Plain Text (http) for test contexts
+    public void setUsePlainTextConnection(boolean usePlainTextConnection) {
+        this.usePlainTextConnection = usePlainTextConnection;
+    }
+
+
+    /**
+     * This updates the initial replay id. This will only take effect after the route is restarted, and should
+     * generally only be done while the route is stopped.
+     *
+     * @param initialReplayId
+     */
+    public void updateInitialReplayId(String initialReplayId) {
+        this.initialReplayId = initialReplayId;
+    }
 }
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
index 1567d60216a..8d84aa3f089 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
@@ -95,6 +95,9 @@ public class PubSubApiClient extends ServiceSupport {
     private ManagedChannel channel;
     private boolean usePlainTextConnection = false;
 
+    private ReplayPreset initialReplayPreset;
+    private String initialReplayId;
+
     public PubSubApiClient(SalesforceSession session, SalesforceLoginConfig 
loginConfig, String pubSubHost,
                            int pubSubPort, long backoffIncrement, long 
maxBackoff) {
         this.session = session;
@@ -138,6 +141,8 @@ public class PubSubApiClient extends ServiceSupport {
 
     public void subscribe(PubSubApiConsumer consumer, ReplayPreset 
replayPreset, String initialReplayId) {
         LOG.error("Starting subscribe {}", consumer.getTopic());
+        this.initialReplayPreset = replayPreset;
+        this.initialReplayId = initialReplayId;
         if (replayPreset == ReplayPreset.CUSTOM && initialReplayId == null) {
             throw new RuntimeException("initialReplayId is required for 
ReplayPreset.CUSTOM");
         }
@@ -335,12 +340,19 @@ public class PubSubApiClient extends ServiceSupport {
                 LOG.error("An unexpected error occurred.", throwable);
             }
             LOG.debug("Attempting subscribe after error");
-            if (replayId == null) {
-                LOG.warn("Not re-subscribing after error because replayId is 
null. Topic: {}",
-                        consumer.getTopic());
-                return;
+            resubscribeOnError();
+        }
+
+        private void resubscribeOnError() {
+            if (replayId != null) {
+                subscribe(consumer, ReplayPreset.CUSTOM, replayId);
+            } else {
+                if (initialReplayPreset == ReplayPreset.CUSTOM) {
+                    subscribe(consumer, initialReplayPreset, initialReplayId);
+                } else {
+                    subscribe(consumer, initialReplayPreset, null);
+                }
             }
-            subscribe(consumer, ReplayPreset.CUSTOM, replayId);
         }
 
         @Override
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
index 2ab1ba5d145..4d82e0492ec 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
@@ -25,13 +25,16 @@ import io.grpc.ServerBuilder;
 import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import org.apache.camel.component.salesforce.internal.client.PubSubApiClient;
 import 
org.apache.camel.component.salesforce.internal.pubsub.AuthErrorPubSubServer;
+import 
org.apache.camel.component.salesforce.internal.pubsub.SendOneMessagePubSubServer;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -39,6 +42,94 @@ public class PubSubApiTest {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PubSubApiTest.class);
 
+    @Test
+    public void testReconnectOnErrorAfterReplayIdNonNull() throws Exception {
+        final SalesforceSession session = mock(SalesforceSession.class);
+        when(session.getAccessToken()).thenReturn("faketoken");
+        when(session.getInstanceUrl()).thenReturn("https://myinstance");
+        when(session.getOrgId()).thenReturn("00D123123123");
+
+        final PubSubApiConsumer consumer = mock(PubSubApiConsumer.class);
+        when(consumer.getTopic()).thenReturn("/event/FakeTopic");
+        when(consumer.getBatchSize()).thenReturn(100);
+
+        int port = getPort();
+        LOG.debug("Starting server on port {}", port);
+        final Server grpcServer = ServerBuilder.forPort(port)
+            .addService(new SendOneMessagePubSubServer())
+            .build();
+        grpcServer.start();
+
+        PubSubApiClient client = Mockito.spy(new PubSubApiClient(
+            session, new SalesforceLoginConfig(), "localhost",
+            port, 1000, 10000));
+        client.setUsePlainTextConnection(true);
+        client.start();
+        client.subscribe(consumer, ReplayPreset.LATEST, null);
+
+        verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), 
anyLong());
+        verify(client, times(1)).subscribe(consumer, ReplayPreset.LATEST, 
null);
+        verify(client, times(1)).subscribe(consumer, ReplayPreset.CUSTOM, 
"MTIz");
+    }
+
+    @Test
+    public void testReconnectOnErrorCustomWithInitialReplayId() throws 
Exception {
+        final SalesforceSession session = mock(SalesforceSession.class);
+        when(session.getAccessToken()).thenReturn("faketoken");
+        when(session.getInstanceUrl()).thenReturn("https://myinstance");
+        when(session.getOrgId()).thenReturn("00D123123123");
+
+        final PubSubApiConsumer consumer = mock(PubSubApiConsumer.class);
+        when(consumer.getTopic()).thenReturn("/event/FakeTopic");
+        when(consumer.getBatchSize()).thenReturn(100);
+
+        int port = getPort();
+        LOG.debug("Starting server on port {}", port);
+        final Server grpcServer = ServerBuilder.forPort(port)
+            .addService(new AuthErrorPubSubServer())
+            .build();
+        grpcServer.start();
+
+        PubSubApiClient client = Mockito.spy(new PubSubApiClient(
+            session, new SalesforceLoginConfig(), "localhost",
+            port, 1000, 10000));
+        client.setUsePlainTextConnection(true);
+        client.start();
+        client.subscribe(consumer, ReplayPreset.CUSTOM, "initial");
+
+        verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), 
anyLong());
+        verify(client, times(2)).subscribe(consumer, ReplayPreset.CUSTOM, 
"initial");
+    }
+
+    @Test
+    public void testReconnectOnErrorWithNullInitialReplayId() throws Exception 
{
+        final SalesforceSession session = mock(SalesforceSession.class);
+        when(session.getAccessToken()).thenReturn("faketoken");
+        when(session.getInstanceUrl()).thenReturn("https://myinstance");
+        when(session.getOrgId()).thenReturn("00D123123123");
+
+        final PubSubApiConsumer consumer = mock(PubSubApiConsumer.class);
+        when(consumer.getTopic()).thenReturn("/event/FakeTopic");
+        when(consumer.getBatchSize()).thenReturn(100);
+
+        int port = getPort();
+        LOG.debug("Starting server on port {}", port);
+        final Server grpcServer = ServerBuilder.forPort(port)
+            .addService(new AuthErrorPubSubServer())
+            .build();
+        grpcServer.start();
+
+        PubSubApiClient client = Mockito.spy(new PubSubApiClient(
+            session, new SalesforceLoginConfig(), "localhost",
+            port, 1000, 10000));
+        client.setUsePlainTextConnection(true);
+        client.start();
+        client.subscribe(consumer, ReplayPreset.LATEST, null);
+
+        verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), 
anyLong());
+        verify(client, times(2)).subscribe(consumer, ReplayPreset.LATEST, 
null);
+    }
+
     @Test
     public void shouldAuthenticateAndSubscribeAfterAuthError() throws 
IOException {
         final SalesforceSession session = mock(SalesforceSession.class);
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/pubsub/SendOneMessagePubSubServer.java
 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/pubsub/SendOneMessagePubSubServer.java
new file mode 100644
index 00000000000..17db33608d6
--- /dev/null
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/pubsub/SendOneMessagePubSubServer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.pubsub;
+
+import com.google.protobuf.ByteString;
+import com.salesforce.eventbus.protobuf.ConsumerEvent;
+import com.salesforce.eventbus.protobuf.EventHeader;
+import com.salesforce.eventbus.protobuf.FetchRequest;
+import com.salesforce.eventbus.protobuf.FetchResponse;
+import com.salesforce.eventbus.protobuf.ProducerEvent;
+import com.salesforce.eventbus.protobuf.PubSubGrpc;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import static org.apache.camel.component.salesforce.internal.client.PubSubApiClient.PUBSUB_ERROR_AUTH_ERROR;
+
+public class SendOneMessagePubSubServer extends PubSubGrpc.PubSubImplBase {
+
+  public int onNextCalls = 0;
+
+  @Override
+  public StreamObserver<FetchRequest> subscribe(StreamObserver<FetchResponse> 
client) {
+
+    return new StreamObserver<>() {
+      @Override
+      public void onNext(FetchRequest request) {
+        onNextCalls = onNextCalls + 1;
+        if (onNextCalls > 1) {
+          TimerTask task = new TimerTask() {
+            public void run() {
+              StatusRuntimeException e = new 
StatusRuntimeException(Status.UNAUTHENTICATED, new Metadata());
+              e.getTrailers().put(Metadata.Key.of("error-code", 
Metadata.ASCII_STRING_MARSHALLER),
+                  PUBSUB_ERROR_AUTH_ERROR);
+              client.onError(e);
+            }
+          };
+          Timer timer = new Timer("Timer");
+          long delay = 1000L;
+          timer.schedule(task, delay);
+          return;
+        }
+        TimerTask task = new TimerTask() {
+          public void run() {
+            FetchResponse response = FetchResponse.newBuilder()
+                .setLatestReplayId(ByteString.copyFromUtf8("123"))
+                .build();
+            client.onNext(response);
+          }
+        };
+        Timer timer = new Timer("Timer");
+        long delay = 1000L;
+        timer.schedule(task, delay);
+      }
+
+      @Override
+      public void onError(Throwable t) {
+
+      }
+
+      @Override
+      public void onCompleted() {
+
+      }
+    };
+  }
+}

Reply via email to