This is an automated email from the ASF dual-hosted git repository. zregvart pushed a commit to branch camel-3.7.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4a3ce898aa444163b95f67bc6fd91c4311dc7845 Author: Zoran Regvart <zregv...@apache.org> AuthorDate: Wed Dec 16 14:43:13 2020 +0100 CAMEL-12871: disconnect on handshake failure If we can't connect and perform the handshake, disconnecting will trigger client restart with back-off. Also, because the signal to restart can occur on multiple threads, we need to guard against restarts happening in parallel. (cherry picked from commit e1760b1de6adbb5a0f10c02c677e69a04feaad26) --- .../internal/streaming/SubscriptionHelper.java | 132 ++++++++++++--------- .../SubscriptionHelperIntegrationTest.java | 24 ++-- 2 files changed, 89 insertions(+), 67 deletions(-) diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index 25e363b..c97fb02 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -22,6 +22,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; @@ -93,6 +94,7 @@ public class SubscriptionHelper extends ServiceSupport { private volatile boolean reconnecting; private final AtomicLong restartBackoff; + private final AtomicBoolean restarting = new AtomicBoolean(); public SubscriptionHelper(final SalesforceComponent component) throws SalesforceException { this.component = component; @@ -173,6 +175,7 @@ public class 
SubscriptionHelper extends ServiceSupport { connectError = (String) message.get(ERROR_FIELD); connectException = getFailure(message); + client.disconnect(); } else if (reconnecting) { reconnecting = false; @@ -206,6 +209,10 @@ public class SubscriptionHelper extends ServiceSupport { } client.getChannel(META_DISCONNECT).addListener(disconnectListener); + connect(); + } + + private void connect() throws CamelException { // connect to Salesforce cometd endpoint client.handshake(); @@ -229,80 +236,95 @@ public class SubscriptionHelper extends ServiceSupport { // launch an async task to restart private void restartClient() { + if (!restarting.compareAndSet(false, true)) { + return; + } // launch a new restart command final SalesforceHttpClient httpClient = component.getConfig().getHttpClient(); httpClient.getExecutor().execute(new Runnable() { @Override public void run() { + try { + performClientRestart(); + } finally { + restarting.set(false); + } + } + }); + } - LOG.info("Restarting on unexpected disconnect from Salesforce..."); - boolean abort = false; + private void performClientRestart() { + if (isStoppingOrStopped()) { + return; + } - // wait for disconnect - LOG.debug("Waiting to disconnect..."); - while (!client.isDisconnected()) { - try { - Thread.sleep(DISCONNECT_INTERVAL); - } catch (InterruptedException e) { - LOG.error("Aborting restart on interrupt!"); - abort = true; - } - } + LOG.info("Restarting on unexpected disconnect from Salesforce..."); + boolean abort = false; + + // wait for disconnect + LOG.debug("Waiting to disconnect..."); + while (!abort && !client.isDisconnected()) { + try { + Thread.sleep(DISCONNECT_INTERVAL); + } catch (InterruptedException e) { + LOG.error("Aborting restart on interrupt!"); + abort = true; + } - if (!abort) { + abort = isStoppingOrStopped(); + } - // update restart attempt backoff - final long backoff = restartBackoff.getAndAdd(backoffIncrement); - if (backoff > maxBackoff) { - LOG.error("Restart aborted after exceeding 
{} msecs backoff", maxBackoff); - abort = true; - } else { + if (!abort) { - // pause before restart attempt - LOG.debug("Pausing for {} msecs before restart attempt", backoff); - try { - Thread.sleep(backoff); - } catch (InterruptedException e) { - LOG.error("Aborting restart on interrupt!"); - abort = true; - } - } + // update restart attempt backoff + final long backoff = restartBackoff.getAndAdd(backoffIncrement); + if (backoff > maxBackoff) { + LOG.error("Restart aborted after exceeding {} msecs backoff", maxBackoff); + abort = true; + } else { - if (!abort) { - Exception lastError = new SalesforceException("Unknown error", null); - try { - // reset client - doStop(); + // pause before restart attempt + LOG.debug("Pausing for {} msecs before restart attempt", backoff); + try { + Thread.sleep(backoff); + } catch (InterruptedException e) { + LOG.error("Aborting restart on interrupt!"); + abort = true; + } + } - // register listeners and restart - doStart(); + if (!abort) { + Exception lastError = new SalesforceException("Unknown error", null); + try { + // reset client + doStop(); - } catch (Exception e) { - LOG.error("Error restarting: " + e.getMessage(), e); - lastError = e; - } + // register listeners and restart + doStart(); - if (client != null && client.isHandshook()) { - LOG.info("Successfully restarted!"); - // reset backoff interval - restartBackoff.set(client.getBackoffIncrement()); - } else { - LOG.error("Failed to restart after pausing for {} msecs", backoff); - if ((backoff + backoffIncrement) > maxBackoff) { - // notify all consumers - String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage(); - SalesforceException ex = new SalesforceException(abortMsg, lastError); - for (SalesforceConsumer consumer : listenerMap.keySet()) { - consumer.handleException(abortMsg, ex); - } - } + } catch (Exception e) { + LOG.error("Error restarting: " + e.getMessage(), e); + lastError = e; + } + + if (client != null && client.isHandshook()) { + 
LOG.info("Successfully restarted!"); + // reset backoff interval + restartBackoff.set(client.getBackoffIncrement()); + } else { + LOG.error("Failed to restart after pausing for {} msecs", backoff); + if ((backoff + backoffIncrement) > maxBackoff) { + // notify all consumers + String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage(); + SalesforceException ex = new SalesforceException(abortMsg, lastError); + for (SalesforceConsumer consumer : listenerMap.keySet()) { + consumer.handleException(abortMsg, ex); } } } - } - }); + } } @SuppressWarnings("unchecked") diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java index 0491ca7..8c01631 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java @@ -127,18 +127,18 @@ public class SubscriptionHelperIntegrationTest { server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/connect", req -> req.contains("\"timeout\":0"), "[\n" - + " {\n" - + " \"clientId\": \"1f0agp5a95yiaeb1kifib37r5z4g\",\n" - + " \"advice\": {\n" - + " \"interval\": 0,\n" - + " \"timeout\": 110000,\n" - + " \"reconnect\": \"retry\"\n" - + " },\n" - + " \"channel\": \"/meta/connect\",\n" - + " \"id\": \"$id\",\n" - + " \"successful\": true\n" - + " }\n" - + "]"); + + " {\n" + + " \"clientId\": \"1f0agp5a95yiaeb1kifib37r5z4g\",\n" + + " \"advice\": {\n" + + " \"interval\": 0,\n" + + " \"timeout\": 110000,\n" + + " \"reconnect\": 
\"retry\"\n" + + " },\n" + + " \"channel\": \"/meta/connect\",\n" + + " \"id\": \"$id\",\n" + + " \"successful\": true\n" + + " }\n" + + "]"); server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/connect", messages);