This is an automated email from the ASF dual-hosted git repository. jeremyross pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2fbf4b94c5f9db6bd66040ed49abd9262ea3788c Author: Jeremy Ross <jeremy.g.r...@gmail.com> AuthorDate: Sun Nov 14 20:15:45 2021 -0600 camel-salesforce: restore & fix integration test. --- .../SubscriptionHelperIntegrationTest.java | 109 ++++++++++++++++++--- 1 file changed, 96 insertions(+), 13 deletions(-) diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java index 5105595..8df70f1 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java @@ -37,6 +37,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; import org.slf4j.LoggerFactory; import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelperIntegrationTest.MessageArgumentMatcher.messageForAccountCreationWithName; @@ -53,15 +54,10 @@ import static org.mockito.Mockito.when; public class SubscriptionHelperIntegrationTest { final CamelContext camel; - final SalesforceEndpointConfig config = new SalesforceEndpointConfig(); - final BlockingQueue<String> messages = new LinkedBlockingDeque<>(); - final SalesforceComponent salesforce; - final StubServer server; - final SubscriptionHelper subscription; SalesforceConsumer toUnsubscribe; @@ -77,33 +73,25 @@ public class SubscriptionHelperIntegrationTest { @Override public boolean matches(final Message message) { final Map<String, Object> data = message.getDataAsMap(); - @SuppressWarnings("unchecked") final Map<String, Object> event = (Map<String, Object>) data.get("event"); - @SuppressWarnings("unchecked") final Map<String, Object> sobject = (Map<String, Object>) data.get("sobject"); - return "created".equals(event.get("type")) && name.equals(sobject.get("Name")); } static Message messageForAccountCreationWithName(final String name) { return argThat(new MessageArgumentMatcher(name)); } - } public SubscriptionHelperIntegrationTest() throws SalesforceException { server = new StubServer(); - LoggerFactory.getLogger(SubscriptionHelperIntegrationTest.class).info("Port for wireshark to filter: {}", server.port()); - final String instanceUrl = "http://localhost:" + server.port(); - server.replyTo("POST", "/services/oauth2/token", "{\"instance_url\":\"" + instanceUrl + "\",\"access_token\":\"token\"}"); - server.replyTo("GET", "/services/oauth2/revoke?token=token", 200); server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/handshake", "[\n" @@ -202,6 +190,101 @@ public class SubscriptionHelperIntegrationTest { } @Test + void shouldResubscribeOnConnectionFailures() throws InterruptedException { + // handshake and connect + subscription.start(); + + final SalesforceConsumer consumer + = toUnsubscribe = mock(SalesforceConsumer.class, "shouldResubscribeOnConnectionFailures:consumer"); + + final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, "shouldResubscribeOnConnectionFailures:endpoint"); + + // subscribe + when(consumer.getTopicName()).thenReturn("Account"); + + when(consumer.getEndpoint()).thenReturn(endpoint); + when(endpoint.getConfiguration()).thenReturn(config); + when(endpoint.getComponent()).thenReturn(salesforce); + when(endpoint.getTopicName()).thenReturn("Account"); + + subscription.subscribe("Account", consumer); + + // push one message so we know connection is established and consumer + // receives notifications + messages.add("[\n" + + " {\n" + + " \"data\": {\n" + + " \"event\": {\n" + + " \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n" + + " \"replayId\": 1,\n" + + " \"type\": \"created\"\n" + + " },\n" + + " \"sobject\": {\n" + + " \"Id\": \"0011n00002XWMgVAAX\",\n" + + " \"Name\": \"shouldResubscribeOnConnectionFailures 1\"\n" + + " }\n" + + " },\n" + + " \"channel\": \"/topic/Account\"\n" + + " },\n" + + " {\n" + + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" + + " \"channel\": \"/meta/connect\",\n" + + " \"id\": \"$id\",\n" + + " \"successful\": true\n" + + " }\n" + + "]"); + + verify(consumer, Mockito.timeout(100)).processMessage(any(ClientSessionChannel.class), + messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 1")); + + // send failed connection message w/o reconnect advice so we handshake again + + messages.add("[\n" + + " {\n" + + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" + + " \"channel\": \"/meta/connect\",\n" + + " \"id\": \"$id\",\n" + + " \"successful\": false,\n" + + " \"advice\": {\n" + + " \"reconnect\": \"none\"\n" + + " }\n" + + " }\n" + + "]"); + + // queue next message for when the client recovers + messages.add("[\n" + + " {\n" + + " \"data\": {\n" + + " \"event\": {\n" + + " \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n" + + " \"replayId\": 2,\n" + + " \"type\": \"created\"\n" + + " },\n" + + " \"sobject\": {\n" + + " \"Id\": \"0011n00002XWMgVAAX\",\n" + + " \"Name\": \"shouldResubscribeOnConnectionFailures 2\"\n" + + " }\n" + + " },\n" + + " \"channel\": \"/topic/Account\"\n" + + " },\n" + + " {\n" + + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" + + " \"channel\": \"/meta/connect\",\n" + + " \"id\": \"$id\",\n" + + " \"successful\": true\n" + + " }\n" + + "]"); + + // assert last message was received, recovery can take a bit + verify(consumer, timeout(10000)).processMessage(any(ClientSessionChannel.class), + messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 2")); + + verify(consumer, atLeastOnce()).getEndpoint(); + verify(consumer, atLeastOnce()).getTopicName(); + verifyNoMoreInteractions(consumer); + } + + @Test void shouldResubscribeOnHelperRestart() { // handshake and connect subscription.start();