This is an automated email from the ASF dual-hosted git repository. zregvart pushed a commit to branch issue/CAMEL-12871 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 99e1f82bd41d2285f5c25628f9980c431998946e Author: Zoran Regvart <zregv...@apache.org> AuthorDate: Mon Dec 14 13:39:09 2020 +0100 CAMEL-12871: stub server and test Adds a stub server implemented in Jetty, as it is already pulled in as a dependency, and an integration test for testing streaming resiliency. Two cases are added in the integration test: the server closing the TCP connection (e.g. in an abrupt server shutdown), and restarting the `SubscriptionHelper` service (e.g. when the route is restarted). --- .../salesforce/internal/streaming/StubServer.java | 268 ++++++++++++++++ .../SubscriptionHelperIntegrationTest.java | 351 +++++++++++++++++++++ 2 files changed, 619 insertions(+) diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java new file mode 100644 index 0000000..e10674a --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce.internal.streaming; + +import java.io.IOException; +import java.io.Reader; +import java.io.Writer; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.camel.util.IOHelper; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.server.AbstractNetworkConnector; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.AbstractHandler; + +class StubServer { + + private final Server server; + + private final List<StubResponse> stubs = new ArrayList<>(); + + class StubHandler extends AbstractHandler { + + @Override + public void handle(final String target, final Request baseRequest, final HttpServletRequest request, final HttpServletResponse response) + throws IOException, ServletException { + final String body; + try (Reader bodyReader = request.getReader()) { + body = IOHelper.toString(bodyReader); + } + + final StubResponse stub = stubFor(request, body); + + if (stub == null) { + System.err.println("Stub not found for " + request.getMethod() + " " + request.getRequestURI()); + response.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED); + return; + } + + response.setStatus(stub.responseStatus); + response.setContentType("application/json;charset=UTF-8"); + + final String id = messageIdFrom(body); + + try (Writer out = response.getWriter()) { + stub.writeTo(id, out); + } + } + + private StubResponse stubFor(final HttpServletRequest request, final String body) throws IOException { + for (final StubResponse stub : stubs) { + if 
(stub.matches(request, body)) { + return stub; + } + } + + return null; + } + + } + + final class StubResponse { + + private Predicate<String> requestCondition; + + private final String requestMethod; + + private final String requestPath; + + private BlockingQueue<String> responseMessages; + + private final int responseStatus; + + private String responseString; + + public StubResponse(final String requestMethod, final String requestPath, final int responseStatus, final Predicate<String> requestCondition, + final BlockingQueue<String> responseMessages) { + this(requestMethod, requestPath, responseStatus, requestCondition); + + this.responseMessages = responseMessages; + } + + private StubResponse(final String requestMethod, final String requestPath, final int responseStatus) { + this.responseStatus = responseStatus; + this.requestMethod = Objects.requireNonNull(requestMethod, "requestMethod"); + this.requestPath = Objects.requireNonNull(requestPath, "requestPath"); + } + + private StubResponse(final String requestMethod, final String requestPath, final int responseStatus, final BlockingQueue<String> responseMessages) { + this(requestMethod, requestPath, responseStatus); + + this.responseMessages = responseMessages; + } + + private StubResponse(final String requestMethod, final String requestPath, final int responseStatus, final Predicate<String> requestCondition) { + this(requestMethod, requestPath, responseStatus); + + this.requestCondition = requestCondition; + } + + private StubResponse(final String requestMethod, final String requestPath, final int responseStatus, final Predicate<String> requestCondition, + final String responseString) { + this(requestMethod, requestPath, responseStatus, requestCondition); + + this.responseString = responseString; + } + + private StubResponse(final String requestMethod, final String requestPath, final int responseStatus, + final String responseString) { + this(requestMethod, requestPath, responseStatus); + this.responseString = 
responseString; + } + + @Override + public String toString() { + return requestMethod + " " + requestPath; + } + + private boolean matches(final HttpServletRequest request, final String body) throws IOException { + final boolean matches = Objects.equals(requestMethod, request.getMethod()) && Objects.equals(requestPath, request.getRequestURI()); + + if (!matches) { + return false; + } + + if (requestCondition == null) { + return true; + } + + return requestCondition.test(body); + } + + private void writeTo(final String messageId, final Writer out) throws IOException { + if (responseString != null) { + out.write(responseString.replace("$id", messageId)); + out.flush(); + return; + } + + if (responseMessages != null) { + while (true) { + try { + final String message = responseMessages.poll(25, TimeUnit.MILLISECONDS); + if (message != null) { + out.write(message.replace("$id", messageId)); + out.flush(); + return; + } + + if (!server.isRunning()) { + return; + } + } catch (final InterruptedException ignored) { + return; + } + } + } + } + } + + public StubServer() { + server = new Server(0); + server.setHandler(new StubHandler()); + + try { + server.start(); + } catch (final Exception e) { + throw new ExceptionInInitializerError(e); + } + } + + @SuppressWarnings("resource") + public int port() { + return connector().getLocalPort(); + } + + public void replyTo(final String method, final String path, final BlockingQueue<String> messages) { + stubs.add(new StubResponse(method, path, 200, messages)); + } + + public void replyTo(final String method, final String path, final int status) { + stubs.add(new StubResponse(method, path, status)); + } + + public void replyTo(final String method, final String path, final Predicate<String> requestCondition, final BlockingQueue<String> messages) { + stubs.add(new StubResponse(method, path, 200, requestCondition, messages)); + } + + public void replyTo(final String method, final String path, final Predicate<String> requestCondition, 
final String response) { + stubs.add(new StubResponse(method, path, 200, requestCondition, response)); + } + + public void replyTo(final String method, final String path, final String response) { + stubs.add(new StubResponse(method, path, 200, response)); + } + + @SuppressWarnings("resource") + public void abruptlyRestart() { + final int port = port(); + + stop(); + + connector().setPort(port); + + try { + server.start(); + } catch (final Exception e) { + throw new IllegalStateException(e); + } + } + + public void stop() { + + try { + for (final EndPoint endPoint : connector().getConnectedEndPoints()) { + endPoint.close(); + } + + server.stop(); + } catch (final Exception e) { + throw new IllegalStateException(e); + } + } + + private AbstractNetworkConnector connector() { + final AbstractNetworkConnector connector = (AbstractNetworkConnector) server.getConnectors()[0]; + return connector; + } + + private static String messageIdFrom(final String body) { + int idx = body.indexOf("\"id\":\""); + String id = ""; + + if (idx > 0) { + idx += 6; + char ch; + while (Character.isDigit(ch = body.charAt(idx++))) { + id += ch; + } + } + return id; + } + +} diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java new file mode 100644 index 0000000..93559e9 --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java @@ -0,0 +1,351 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce.internal.streaming; + +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + +import org.apache.camel.CamelContext; +import org.apache.camel.component.salesforce.AuthenticationType; +import org.apache.camel.component.salesforce.SalesforceComponent; +import org.apache.camel.component.salesforce.SalesforceConsumer; +import org.apache.camel.component.salesforce.SalesforceEndpoint; +import org.apache.camel.component.salesforce.SalesforceEndpointConfig; +import org.apache.camel.component.salesforce.api.SalesforceException; +import org.apache.camel.impl.DefaultCamelContext; +import org.cometd.bayeux.Message; +import org.cometd.bayeux.client.ClientSessionChannel; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; +import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; + +import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelperIntegrationTest.MessageArgumentMatcher.messageForAccountCreationWithName; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static 
org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@TestInstance(Lifecycle.PER_CLASS) +public class SubscriptionHelperIntegrationTest { + + final CamelContext camel; + + final SalesforceEndpointConfig config = new SalesforceEndpointConfig(); + + final BlockingQueue<String> messages = new LinkedBlockingDeque<>(); + + final SalesforceComponent salesforce; + + final StubServer server; + + final SubscriptionHelper subscription; + + static class MessageArgumentMatcher implements ArgumentMatcher<Message> { + + private final String name; + + public MessageArgumentMatcher(final String name) { + this.name = name; + } + + @Override + public boolean matches(final Message message) { + final Map<String, Object> data = message.getDataAsMap(); + + @SuppressWarnings("unchecked") + final Map<String, Object> event = (Map<String, Object>) data.get("event"); + + @SuppressWarnings("unchecked") + final Map<String, Object> sobject = (Map<String, Object>) data.get("sobject"); + + return "created".equals(event.get("type")) && name.equals(sobject.get("Name")); + } + + static Message messageForAccountCreationWithName(final String name) { + return argThat(new MessageArgumentMatcher(name)); + } + + } + + public SubscriptionHelperIntegrationTest() throws SalesforceException { + server = new StubServer(); + + System.out.println("Port for wireshark to filter: " + server.port()); + + final String instanceUrl = "http://localhost:" + server.port(); + + server.replyTo("POST", "/services/oauth2/token", "{\"instance_url\":\"" + instanceUrl + "\",\"access_token\":\"token\"}"); + + server.replyTo("GET", "/services/oauth2/revoke?token=token", 200); + + server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/handshake", "[\n" + + " {\n" + + " \"ext\": {\n" + + " \"replay\": true,\n" + + " \"payload.format\": true\n" + + " },\n" + + " \"minimumVersion\": \"1.0\",\n" + + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" + + " 
\"supportedConnectionTypes\": [\n" + + " \"long-polling\"\n" + + " ],\n" + + " \"channel\": \"/meta/handshake\",\n" + + " \"id\": \"1\",\n" + + " \"version\": \"1.0\",\n" + + " \"successful\": true\n" + + " }\n" + + "]"); + + server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/connect", (req) -> req.contains("\"timeout\":0"), "[\n" + + " {\n" + + " \"clientId\": \"1f0agp5a95yiaeb1kifib37r5z4g\",\n" + + " \"advice\": {\n" + + " \"interval\": 0,\n" + + " \"timeout\": 110000,\n" + + " \"reconnect\": \"retry\"\n" + + " },\n" + + " \"channel\": \"/meta/connect\",\n" + + " \"id\": \"2\",\n" + + " \"successful\": true\n" + + " }\n" + + "]"); + + server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/connect", messages); + + server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/subscribe", "[\n" + + " {\n" + + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" + + " \"channel\": \"/meta/subscribe\",\n" + + " \"id\": \"$id\",\n" + + " \"subscription\": \"/topic/Account\",\n" + + " \"successful\": true\n" + + " }\n" + + "]"); + + server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/unsubscribe", "[\n" + + " {\n" + + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" + + " \"channel\": \"/meta/unsubscribe\",\n" + + " \"id\": \"$id\",\n" + + " \"subscription\": \"/topic/Account\",\n" + + " \"successful\": true\n" + + " }\n" + + "]"); + + server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/disconnect", "[\n" + + " {\n" + + " \"channel\": \"/meta/disconnect\",\n" + + " \"clientId\": \"client-id\"\n" + + " }\n" + + "]"); + + server.replyTo("GET", "/services/oauth2/revoke", 200); + + camel = new DefaultCamelContext(); + camel.start(); + salesforce = new SalesforceComponent(camel); + salesforce.setLoginUrl(instanceUrl); + salesforce.setClientId("clientId"); + salesforce.setClientSecret("clientSecret"); + 
salesforce.setRefreshToken("refreshToken"); + salesforce.setAuthenticationType(AuthenticationType.REFRESH_TOKEN); + salesforce.setConfig(config); + + salesforce.setHttpProxyHost("localhost"); + salesforce.setHttpProxyPort(8866); + salesforce.setHttpProxySecure(false); + + salesforce.start(); + subscription = new SubscriptionHelper(salesforce); + } + + @AfterAll + public void stop() { + subscription.stop(); + salesforce.stop(); + camel.stop(); + server.stop(); + } + + @Test + void shouldResubscribeOnConnectionFailures() throws InterruptedException { + // handshake and connect + subscription.start(); + + // subscribe + final SalesforceConsumer consumer = mock(SalesforceConsumer.class); + + final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class); + when(consumer.getEndpoint()).thenReturn(endpoint); + + when(endpoint.getTopicName()).thenReturn("Account"); + + when(endpoint.getConfiguration()).thenReturn(config); + when(endpoint.getComponent()).thenReturn(salesforce); + + subscription.subscribe("Account", consumer); + + verify(consumer).getEndpoint(); + + // push one message so we know connection is established and consumer + // receives notifications + messages.add("[\n" + + " {\n" + + " \"data\": {\n" + + " \"event\": {\n" + + " \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n" + + " \"replayId\": 1,\n" + + " \"type\": \"created\"\n" + + " },\n" + + " \"sobject\": {\n" + + " \"Id\": \"0011n00002XWMgVAAX\",\n" + + " \"Name\": \"Test Account\"\n" + + " }\n" + + " },\n" + + " \"channel\": \"/topic/Account\"\n" + + " },\n" + + " {\n" + + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" + + " \"channel\": \"/meta/connect\",\n" + + " \"id\": \"$id\",\n" + + " \"successful\": true\n" + + " }\n" + + "]"); + + verify(consumer, Mockito.timeout(100)).processMessage(any(ClientSessionChannel.class), messageForAccountCreationWithName("Test Account")); + + // terminate server abruptly by closing the connection (sends FIN, ACK) + server.abruptlyRestart(); + + // queue 
next message for when the client recovers + messages.add("[\n" + + " {\n" + + " \"data\": {\n" + + " \"event\": {\n" + + " \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n" + + " \"replayId\": 2,\n" + + " \"type\": \"created\"\n" + + " },\n" + + " \"sobject\": {\n" + + " \"Id\": \"0011n00002XWMgVAAX\",\n" + + " \"Name\": \"New Test Account\"\n" + + " }\n" + + " },\n" + + " \"channel\": \"/topic/Account\"\n" + + " },\n" + + " {\n" + + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" + + " \"channel\": \"/meta/connect\",\n" + + " \"id\": \"$id\",\n" + + " \"successful\": true\n" + + " }\n" + + "]"); + + // assert last message was received, recovery can take a bit + verify(consumer, Mockito.timeout(2000)).processMessage(any(ClientSessionChannel.class), messageForAccountCreationWithName("New Test Account")); + + verifyNoMoreInteractions(consumer); + } + + @Test + void shouldResubscribeOnHelperRestart() throws InterruptedException { + // handshake and connect + subscription.start(); + + // subscribe + final SalesforceConsumer consumer = mock(SalesforceConsumer.class); + + final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class); + when(consumer.getEndpoint()).thenReturn(endpoint); + when(consumer.getTopicName()).thenReturn("Account"); + + when(endpoint.getTopicName()).thenReturn("Account"); + + when(endpoint.getConfiguration()).thenReturn(config); + when(endpoint.getComponent()).thenReturn(salesforce); + + subscription.subscribe("Account", consumer); + + verify(consumer).getEndpoint(); + + // push one message so we know connection is established and consumer + // receives notifications + messages.add("[\n" + + " {\n" + + " \"data\": {\n" + + " \"event\": {\n" + + " \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n" + + " \"replayId\": 1,\n" + + " \"type\": \"created\"\n" + + " },\n" + + " \"sobject\": {\n" + + " \"Id\": \"0011n00002XWMgVAAX\",\n" + + " \"Name\": \"Test Account\"\n" + + " }\n" + + " },\n" + + " \"channel\": \"/topic/Account\"\n" + + " },\n" 
+ + " {\n" + + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" + + " \"channel\": \"/meta/connect\",\n" + + " \"id\": \"$id\",\n" + + " \"successful\": true\n" + + " }\n" + + "]"); + + verify(consumer, Mockito.timeout(100)).processMessage(any(ClientSessionChannel.class), messageForAccountCreationWithName("Test Account")); + + // stop and start the subscription helper + subscription.stop(); + subscription.start(); + + // queue next message for when the client recovers + messages.add("[\n" + + " {\n" + + " \"data\": {\n" + + " \"event\": {\n" + + " \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n" + + " \"replayId\": 2,\n" + + " \"type\": \"created\"\n" + + " },\n" + + " \"sobject\": {\n" + + " \"Id\": \"0011n00002XWMgVAAX\",\n" + + " \"Name\": \"New Test Account\"\n" + + " }\n" + + " },\n" + + " \"channel\": \"/topic/Account\"\n" + + " },\n" + + " {\n" + + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" + + " \"channel\": \"/meta/connect\",\n" + + " \"id\": \"$id\",\n" + + " \"successful\": true\n" + + " }\n" + + "]"); + + // assert last message was received, recovery can take a bit + verify(consumer, Mockito.timeout(2000)).processMessage(any(ClientSessionChannel.class), messageForAccountCreationWithName("New Test Account")); + + verifyNoMoreInteractions(consumer); + } +}