This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch camel-4.0.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.0.x by this push: new 73a8578e16b CAMEL-19783: Added reconnection, even if a message has not yet been received (#11308) 73a8578e16b is described below commit 73a8578e16b464b13cce142ba58a825c9c0acf44 Author: Jesse Sightler <jesse.sight...@gmail.com> AuthorDate: Wed Sep 6 00:40:30 2023 -0400 CAMEL-19783: Added reconnection, even if a message has not yet been received (#11308) --- .../component/salesforce/PubSubApiConsumer.java | 21 ++++- .../internal/client/PubSubApiClient.java | 22 +++-- .../camel/component/salesforce/PubSubApiTest.java | 91 +++++++++++++++++++++ .../pubsub/SendOneMessagePubSubServer.java | 94 ++++++++++++++++++++++ 4 files changed, 222 insertions(+), 6 deletions(-) diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java index 043638a0143..aa70631679c 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java @@ -38,7 +38,7 @@ public class PubSubApiConsumer extends DefaultConsumer { private static final Logger LOG = LoggerFactory.getLogger(PubSubApiConsumer.class); private final String topic; private final ReplayPreset initialReplayPreset; - private final String initialReplayId; + private String initialReplayId; private final SalesforceEndpoint endpoint; private final int batchSize; @@ -47,6 +47,8 @@ public class PubSubApiConsumer extends DefaultConsumer { private PubSubApiClient pubSubClient; private Map<String, Class<?>> eventClassMap; + private boolean usePlainTextConnection = false; + public PubSubApiConsumer(SalesforceEndpoint endpoint, Processor processor) throws ClassNotFoundException { super(endpoint, processor); this.endpoint = endpoint; @@ -86,6 +88,7 @@ public class PubSubApiConsumer extends DefaultConsumer { endpoint.getComponent().getSession(), endpoint.getComponent().getLoginConfig(), endpoint.getComponent().getPubSubHost(), endpoint.getComponent().getPubSubPort(), endpoint.getConfiguration().getBackoffIncrement(), endpoint.getConfiguration().getMaxBackoff()); + this.pubSubClient.setUsePlainTextConnection(this.usePlainTextConnection); ServiceHelper.startService(pubSubClient); pubSubClient.subscribe(this, initialReplayPreset, initialReplayId); @@ -116,4 +119,20 @@ public class PubSubApiConsumer extends DefaultConsumer { public Class<?> getPojoClass() { return pojoClass; } + + // ability to use Plain Text (http) for test contexts + public void setUsePlainTextConnection(boolean usePlainTextConnection) { + this.usePlainTextConnection = usePlainTextConnection; + } + + + /** + * This updates the initial replay id. This will only take effect after the route is restarted, and should + * generally only be done while the route is stopped. + * + * @param initialReplayId + */ + public void updateInitialReplayId(String initialReplayId) { + this.initialReplayId = initialReplayId; + } } diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java index 1567d60216a..8d84aa3f089 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java @@ -95,6 +95,9 @@ public class PubSubApiClient extends ServiceSupport { private ManagedChannel channel; private boolean usePlainTextConnection = false; + private ReplayPreset initialReplayPreset; + private String initialReplayId; + public PubSubApiClient(SalesforceSession session, SalesforceLoginConfig loginConfig, String pubSubHost, int pubSubPort, long backoffIncrement, long maxBackoff) { this.session = session; @@ -138,6 +141,8 @@ public class PubSubApiClient extends ServiceSupport { public void subscribe(PubSubApiConsumer consumer, ReplayPreset replayPreset, String initialReplayId) { LOG.error("Starting subscribe {}", consumer.getTopic()); + this.initialReplayPreset = replayPreset; + this.initialReplayId = initialReplayId; if (replayPreset == ReplayPreset.CUSTOM && initialReplayId == null) { throw new RuntimeException("initialReplayId is required for ReplayPreset.CUSTOM"); } @@ -335,12 +340,19 @@ public class PubSubApiClient extends ServiceSupport { LOG.error("An unexpected error occurred.", throwable); } LOG.debug("Attempting subscribe after error"); - if (replayId == null) { - LOG.warn("Not re-subscribing after error because replayId is null. Topic: {}", - consumer.getTopic()); - return; + resubscribeOnError(); + } + + private void resubscribeOnError() { + if (replayId != null) { + subscribe(consumer, ReplayPreset.CUSTOM, replayId); + } else { + if (initialReplayPreset == ReplayPreset.CUSTOM) { + subscribe(consumer, initialReplayPreset, initialReplayId); + } else { + subscribe(consumer, initialReplayPreset, null); + } } - subscribe(consumer, ReplayPreset.CUSTOM, replayId); } @Override diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java index 2ab1ba5d145..4d82e0492ec 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java @@ -25,13 +25,16 @@ import io.grpc.ServerBuilder; import org.apache.camel.component.salesforce.internal.SalesforceSession; import org.apache.camel.component.salesforce.internal.client.PubSubApiClient; import org.apache.camel.component.salesforce.internal.pubsub.AuthErrorPubSubServer; +import org.apache.camel.component.salesforce.internal.pubsub.SendOneMessagePubSubServer; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -39,6 +42,94 @@ public class PubSubApiTest { private static final Logger LOG = LoggerFactory.getLogger(PubSubApiTest.class); + @Test + public void testReconnectOnErrorAfterReplayIdNonNull() throws Exception { + final SalesforceSession session = mock(SalesforceSession.class); + when(session.getAccessToken()).thenReturn("faketoken"); + when(session.getInstanceUrl()).thenReturn("https://myinstance"); + when(session.getOrgId()).thenReturn("00D123123123"); + + final PubSubApiConsumer consumer = mock(PubSubApiConsumer.class); + when(consumer.getTopic()).thenReturn("/event/FakeTopic"); + when(consumer.getBatchSize()).thenReturn(100); + + int port = getPort(); + LOG.debug("Starting server on port {}", port); + final Server grpcServer = ServerBuilder.forPort(port) + .addService(new SendOneMessagePubSubServer()) + .build(); + grpcServer.start(); + + PubSubApiClient client = Mockito.spy(new PubSubApiClient( + session, new SalesforceLoginConfig(), "localhost", + port, 1000, 10000)); + client.setUsePlainTextConnection(true); + client.start(); + client.subscribe(consumer, ReplayPreset.LATEST, null); + + verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), anyLong()); + verify(client, times(1)).subscribe(consumer, ReplayPreset.LATEST, null); + verify(client, times(1)).subscribe(consumer, ReplayPreset.CUSTOM, "MTIz"); + } + + @Test + public void testReconnectOnErrorCustomWithInitialReplayId() throws Exception { + final SalesforceSession session = mock(SalesforceSession.class); + when(session.getAccessToken()).thenReturn("faketoken"); + when(session.getInstanceUrl()).thenReturn("https://myinstance"); + when(session.getOrgId()).thenReturn("00D123123123"); + + final PubSubApiConsumer consumer = mock(PubSubApiConsumer.class); + when(consumer.getTopic()).thenReturn("/event/FakeTopic"); + when(consumer.getBatchSize()).thenReturn(100); + + int port = getPort(); + LOG.debug("Starting server on port {}", port); + final Server grpcServer = ServerBuilder.forPort(port) + .addService(new AuthErrorPubSubServer()) + .build(); + grpcServer.start(); + + PubSubApiClient client = Mockito.spy(new PubSubApiClient( + session, new SalesforceLoginConfig(), "localhost", + port, 1000, 10000)); + client.setUsePlainTextConnection(true); + client.start(); + client.subscribe(consumer, ReplayPreset.CUSTOM, "initial"); + + verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), anyLong()); + verify(client, times(2)).subscribe(consumer, ReplayPreset.CUSTOM, "initial"); + } + + @Test + public void testReconnectOnErrorWithNullInitialReplayId() throws Exception { + final SalesforceSession session = mock(SalesforceSession.class); + when(session.getAccessToken()).thenReturn("faketoken"); + when(session.getInstanceUrl()).thenReturn("https://myinstance"); + when(session.getOrgId()).thenReturn("00D123123123"); + + final PubSubApiConsumer consumer = mock(PubSubApiConsumer.class); + when(consumer.getTopic()).thenReturn("/event/FakeTopic"); + when(consumer.getBatchSize()).thenReturn(100); + + int port = getPort(); + LOG.debug("Starting server on port {}", port); + final Server grpcServer = ServerBuilder.forPort(port) + .addService(new AuthErrorPubSubServer()) + .build(); + grpcServer.start(); + + PubSubApiClient client = Mockito.spy(new PubSubApiClient( + session, new SalesforceLoginConfig(), "localhost", + port, 1000, 10000)); + client.setUsePlainTextConnection(true); + client.start(); + client.subscribe(consumer, ReplayPreset.LATEST, null); + + verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), anyLong()); + verify(client, times(2)).subscribe(consumer, ReplayPreset.LATEST, null); + } + @Test public void shouldAuthenticateAndSubscribeAfterAuthError() throws IOException { final SalesforceSession session = mock(SalesforceSession.class); diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/pubsub/SendOneMessagePubSubServer.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/pubsub/SendOneMessagePubSubServer.java new file mode 100644 index 00000000000..17db33608d6 --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/pubsub/SendOneMessagePubSubServer.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce.internal.pubsub; + +import com.google.protobuf.ByteString; +import com.salesforce.eventbus.protobuf.ConsumerEvent; +import com.salesforce.eventbus.protobuf.EventHeader; +import com.salesforce.eventbus.protobuf.FetchRequest; +import com.salesforce.eventbus.protobuf.FetchResponse; +import com.salesforce.eventbus.protobuf.ProducerEvent; +import com.salesforce.eventbus.protobuf.PubSubGrpc; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Timer; +import java.util.TimerTask; + +import static org.apache.camel.component.salesforce.internal.client.PubSubApiClient.PUBSUB_ERROR_AUTH_ERROR; + +public class SendOneMessagePubSubServer extends PubSubGrpc.PubSubImplBase { + + public int onNextCalls = 0; + + @Override + public StreamObserver<FetchRequest> subscribe(StreamObserver<FetchResponse> client) { + + return new StreamObserver<>() { + @Override + public void onNext(FetchRequest request) { + onNextCalls = onNextCalls + 1; + if (onNextCalls > 1) { + TimerTask task = new TimerTask() { + public void run() { + StatusRuntimeException e = new StatusRuntimeException(Status.UNAUTHENTICATED, new Metadata()); + e.getTrailers().put(Metadata.Key.of("error-code", Metadata.ASCII_STRING_MARSHALLER), + PUBSUB_ERROR_AUTH_ERROR); + client.onError(e); + } + }; + Timer timer = new Timer("Timer"); + long delay = 1000L; + timer.schedule(task, delay); + return; + } + TimerTask task = new TimerTask() { + public void run() { + FetchResponse response = FetchResponse.newBuilder() + .setLatestReplayId(ByteString.copyFromUtf8("123")) + .build(); + client.onNext(response); + } + }; + Timer timer = new Timer("Timer"); + long delay = 1000L; + timer.schedule(task, delay); + } + + @Override + public void onError(Throwable t) { + + } + + @Override + public void onCompleted() { + + } + }; + } +}