Repository: camel Updated Branches: refs/heads/camel-2.17.x 1c07ce242 -> 3e5ef769e
CAMEL-10238: Refactored to handle handshake failure, added increment and max backoff properties Conflicts: components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java components/camel-salesforce/camel-salesforce-component/src/test/resources/log4j2.properties Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3e5ef769 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3e5ef769 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3e5ef769 Branch: refs/heads/camel-2.17.x Commit: 3e5ef769e2d8d13338729566fdb128955b4f94bf Parents: 1c07ce2 Author: Dhiraj Bokde <dhira...@yahoo.com> Authored: Mon Aug 29 22:32:56 2016 -0700 Committer: Dhiraj Bokde <dhira...@yahoo.com> Committed: Mon Aug 29 22:45:22 2016 -0700 ---------------------------------------------------------------------- .../salesforce/SalesforceEndpointConfig.java | 36 +++- .../internal/streaming/SubscriptionHelper.java | 165 +++++++++++-------- 2 files changed, 129 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3e5ef769/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java index 2bb6306..92ca010 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java @@ -73,6 +73,10 @@ public class SalesforceEndpointConfig implements Cloneable { // default maximum authentication retries on failed authentication or expired session public static final int DEFAULT_MAX_AUTHENTICATION_RETRIES = 4; + // default increment and limit for Streaming connection restart attempts + public static final long DEFAULT_BACKOFF_INCREMENT = 1000L; + public static final long DEFAULT_MAX_BACKOFF = 30000L; + // general properties @UriParam private String apiVersion = DEFAULT_VERSION; @@ -145,6 +149,14 @@ public class SalesforceEndpointConfig implements Cloneable { @UriParam private SalesforceHttpClient httpClient; + // Streaming connection restart attempt backoff interval increment + @UriParam + private long backoffIncrement = DEFAULT_BACKOFF_INCREMENT; + + // Streaming connection restart attempt maximum backoff interval + @UriParam + private long maxBackoff = DEFAULT_MAX_BACKOFF; + public SalesforceEndpointConfig copy() { try { final SalesforceEndpointConfig copy = (SalesforceEndpointConfig) super.clone(); @@ -485,6 +497,28 @@ public class SalesforceEndpointConfig implements Cloneable { return httpClient; } + public long getBackoffIncrement() { + return backoffIncrement; + } + + /** + * Backoff interval increment for Streaming connection restart attempts for failures beyond CometD auto-reconnect. + */ + public void setBackoffIncrement(long backoffIncrement) { + this.backoffIncrement = backoffIncrement; + } + + public long getMaxBackoff() { + return maxBackoff; + } + + /** + * Maximum backoff interval for Streaming connection restart attempts for failures beyond CometD auto-reconnect. + */ + public void setMaxBackoff(long maxBackoff) { + this.maxBackoff = maxBackoff; + } + public Map<String, Object> toValueMap() { final Map<String, Object> valueMap = new HashMap<String, Object>(); @@ -511,7 +545,7 @@ public class SalesforceEndpointConfig implements Cloneable { valueMap.put(JOB_ID, jobId); valueMap.put(BATCH_ID, batchId); valueMap.put(RESULT_ID, resultId); - + // add analytics API properties valueMap.put(REPORT_ID, reportId); valueMap.put(INCLUDE_DETAILS, includeDetails); http://git-wip-us.apache.org/repos/asf/camel/blob/3e5ef769/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index 1e3e8a4..cbe84ab 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -66,6 +67,8 @@ public class SubscriptionHelper extends ServiceSupport { private final long timeout = 60 * 1000L; private final Map<SalesforceConsumer, ClientSessionChannel.MessageListener> listenerMap; + private final long maxBackoff; + private final long backoffIncrement; private ClientSessionChannel.MessageListener handshakeListener; private ClientSessionChannel.MessageListener connectListener; @@ -77,6 +80,7 @@ public class SubscriptionHelper extends ServiceSupport { private volatile Exception connectException; private volatile boolean reconnecting; + private final AtomicLong restartBackoff; public SubscriptionHelper(SalesforceComponent component) throws Exception { this.component = component; @@ -86,6 +90,10 @@ public class SubscriptionHelper extends ServiceSupport { // create CometD client this.client = createClient(); + + restartBackoff = new AtomicLong(0); + backoffIncrement = component.getConfig().getBackoffIncrement(); + maxBackoff = component.getConfig().getMaxBackoff(); } @Override @@ -108,6 +116,10 @@ public class SubscriptionHelper extends ServiceSupport { LOG.warn("Handshake failure: {}", message); handshakeError = (String) message.get(ERROR_FIELD); handshakeException = getFailure(message); + + // restart if handshake fails for any reason + restartClient(); + } else if (!listenerMap.isEmpty()) { reconnecting = true; } @@ -129,12 +141,12 @@ public class SubscriptionHelper extends ServiceSupport { connectException = getFailure(message); if (connectError != null) { - // refresh oauth token, if it's a 401 error - if (connectError.startsWith("401::")) { + // refresh oauth token, if it's a 403 error + if (connectError.startsWith("403::")) { try { session.login(null); } catch (SalesforceException e) { - LOG.error("Error renewing OAuth token on Connect 401: {} ", e.getMessage(), e); + LOG.error("Error renewing OAuth token on Connect 403: " + e.getMessage(), e); } } } @@ -166,74 +178,7 @@ public class SubscriptionHelper extends ServiceSupport { disconnectListener = new ClientSessionChannel.MessageListener() { @Override public void onMessage(ClientSessionChannel clientSessionChannel, Message message) { - - // launch an async task to reconnect - final SalesforceHttpClient httpClient = component.getConfig().getHttpClient(); - - httpClient.getExecutor().execute(new Runnable() { - @Override - public void run() { - - boolean abort = false; - // wait for disconnect - while (!client.isDisconnected()) { - try { - Thread.sleep(DISCONNECT_INTERVAL); - } catch (InterruptedException e) { - LOG.error("Aborting reconnect on interrupt!"); - abort = true; - } - } - - if (!abort) { - - LOG.info("Reconnecting on unexpected disconnect from Salesforce..."); - final long backoffIncrement = client.getBackoffIncrement(); - final long maxBackoff = client.getMaxBackoff(); - - long backoff = backoffIncrement; - String msg = String.format("Failed to reconnect, exceeded maximum backoff %s msecs", maxBackoff); - Exception lastError = new SalesforceException(msg, null); - - // retry until interrupted, or handshook or connect backoff exceeded - while (!abort && !client.isHandshook() && backoff < maxBackoff) { - - try { - // reset client - doStop(); - - // register listeners and restart - doStart(); - - } catch (Exception e) { - LOG.error("Error reconnecting to Salesforce: {}", e.getMessage(), e); - lastError = e; - } - - if (!client.isHandshook()) { - LOG.debug("Pausing for {} msecs after reconnect failure", backoff); - try { - Thread.sleep(backoff); - } catch (InterruptedException e) { - LOG.error("Aborting reconnect on interrupt!"); - abort = true; - } - backoff += backoffIncrement; - } - } - - if (client.isHandshook()) { - LOG.info("Successfully reconnected to Salesforce!"); - } else if (!abort) { - // notify all consumers - String abortMsg = "Aborting Salesforce reconnect due to: " + lastError.getMessage(); - for (SalesforceConsumer consumer : listenerMap.keySet()) { - consumer.handleException(abortMsg, new SalesforceException(abortMsg, lastError)); - } - } - } - } - }); + restartClient(); } }; } @@ -263,6 +208,84 @@ public class SubscriptionHelper extends ServiceSupport { } } + // launch an async task to restart + private void restartClient() { + + // launch a new restart command + final SalesforceHttpClient httpClient = component.getConfig().getHttpClient(); + httpClient.getExecutor().execute(new Runnable() { + @Override + public void run() { + + LOG.info("Restarting on unexpected disconnect from Salesforce..."); + boolean abort = false; + + // wait for disconnect + LOG.debug("Waiting to disconnect..."); + while (!client.isDisconnected()) { + try { + Thread.sleep(DISCONNECT_INTERVAL); + } catch (InterruptedException e) { + LOG.error("Aborting restart on interrupt!"); + abort = true; + } + } + + if (!abort) { + + // update restart attempt backoff + final long backoff = restartBackoff.getAndAdd(backoffIncrement); + if (backoff > maxBackoff) { + LOG.error("Restart aborted after exceeding {} msecs backoff", maxBackoff); + abort = true; + } else { + + // pause before restart attempt + LOG.debug("Pausing for {} msecs before restart attempt", backoff); + try { + Thread.sleep(backoff); + } catch (InterruptedException e) { + LOG.error("Aborting restart on interrupt!"); + abort = true; + } + } + + if (!abort) { + Exception lastError = new SalesforceException("Unknown error", null); + try { + // reset client + doStop(); + + // register listeners and restart + doStart(); + + } catch (Exception e) { + LOG.error("Error restarting: " + e.getMessage(), e); + lastError = e; + } + + if (client.isHandshook()) { + LOG.info("Successfully restarted!"); + // reset backoff interval + restartBackoff.set(client.getBackoffIncrement()); + } else { + LOG.error("Failed to restart after pausing for {} msecs", backoff); + if ((backoff + backoffIncrement) > maxBackoff) { + // notify all consumers + String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage(); + SalesforceException ex = new SalesforceException(abortMsg, lastError); + for (SalesforceConsumer consumer : listenerMap.keySet()) { + consumer.handleException(abortMsg, ex); + } + } + } + } + } + + } + }); + } + @SuppressWarnings("unchecked") private Exception getFailure(Message message) { Exception exception = null;