Repository: camel Updated Branches: refs/heads/master c07557607 -> 0b15168cd
CAMEL-10238: Refactored to handle handshake failure, added increment and max backoff properties Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7d97e5b5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7d97e5b5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7d97e5b5 Branch: refs/heads/master Commit: 7d97e5b5800969b74e805e66a94a6c5323bfeafc Parents: c075576 Author: Dhiraj Bokde <dhira...@yahoo.com> Authored: Mon Aug 29 22:32:56 2016 -0700 Committer: Dhiraj Bokde <dhira...@yahoo.com> Committed: Mon Aug 29 22:32:56 2016 -0700 ---------------------------------------------------------------------- .../salesforce/SalesforceComponent.java | 24 +++ .../salesforce/SalesforceEndpointConfig.java | 36 +++- .../internal/streaming/SubscriptionHelper.java | 165 +++++++++++-------- .../src/test/resources/log4j2.properties | 5 + 4 files changed, 158 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/7d97e5b5/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java index 76f7012..7b7b81a 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java @@ -1060,4 +1060,28 @@ public class SalesforceComponent extends UriEndpointComponent implements Endpoin public void setInitialReplayIdMap(Map<String, Integer> initialReplayIdMap) { getConfigOrCreate().setInitialReplayIdMap(initialReplayIdMap); } + + public long getBackoffIncrement() { + return getConfigOrCreate().getBackoffIncrement(); + } + + /** + * Backoff interval increment for Streaming connection restart attempts for failures beyond CometD auto-reconnect. + * @param backoffIncrement + */ + public void setBackoffIncrement(long backoffIncrement) { + getConfigOrCreate().setBackoffIncrement(backoffIncrement); + } + + public long getMaxBackoff() { + return getConfigOrCreate().getMaxBackoff(); + } + + /** + * Maximum backoff interval for Streaming connection restart attempts for failures beyond CometD auto-reconnect. + * @param maxBackoff + */ + public void setMaxBackoff(long maxBackoff) { + getConfigOrCreate().setMaxBackoff(maxBackoff); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/7d97e5b5/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java index 39d0d06..ad18aab 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java @@ -79,6 +79,10 @@ public class SalesforceEndpointConfig implements Cloneable { // default maximum authentication retries on failed authentication or expired session public static final int DEFAULT_MAX_AUTHENTICATION_RETRIES = 4; + // default increment and limit for Streaming connection restart attempts + public static final long DEFAULT_BACKOFF_INCREMENT = 1000L; + public static final long DEFAULT_MAX_BACKOFF = 30000L; + // general properties @UriParam private String apiVersion = DEFAULT_VERSION; @@ -161,6 +165,14 @@ public class SalesforceEndpointConfig implements Cloneable { @UriParam private ObjectMapper objectMapper; + // Streaming connection restart attempt backoff interval increment + @UriParam + private long backoffIncrement = DEFAULT_BACKOFF_INCREMENT; + + // Streaming connection restart attempt maximum backoff interval + @UriParam + private long maxBackoff = DEFAULT_MAX_BACKOFF; + public SalesforceEndpointConfig copy() { try { final SalesforceEndpointConfig copy = (SalesforceEndpointConfig) super.clone(); @@ -505,6 +517,28 @@ public class SalesforceEndpointConfig implements Cloneable { return objectMapper; } + public long getBackoffIncrement() { + return backoffIncrement; + } + + /** + * Backoff interval increment for Streaming connection restart attempts for failures beyond CometD auto-reconnect. + */ + public void setBackoffIncrement(long backoffIncrement) { + this.backoffIncrement = backoffIncrement; + } + + public long getMaxBackoff() { + return maxBackoff; + } + + /** + * Maximum backoff interval for Streaming connection restart attempts for failures beyond CometD auto-reconnect. + */ + public void setMaxBackoff(long maxBackoff) { + this.maxBackoff = maxBackoff; + } + /** * Custom Jackson ObjectMapper to use when serializing/deserializing Salesforce objects. */ @@ -538,7 +572,7 @@ public class SalesforceEndpointConfig implements Cloneable { valueMap.put(JOB_ID, jobId); valueMap.put(BATCH_ID, batchId); valueMap.put(RESULT_ID, resultId); - + // add analytics API properties valueMap.put(REPORT_ID, reportId); valueMap.put(INCLUDE_DETAILS, includeDetails); http://git-wip-us.apache.org/repos/asf/camel/blob/7d97e5b5/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index c9a98ee..ed4c152 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -70,6 +71,8 @@ public class SubscriptionHelper extends ServiceSupport { private final long timeout = 60 * 1000L; private final Map<SalesforceConsumer, ClientSessionChannel.MessageListener> listenerMap; + private final long maxBackoff; + private final long backoffIncrement; private ClientSessionChannel.MessageListener handshakeListener; private ClientSessionChannel.MessageListener connectListener; @@ -81,6 +84,7 @@ public class SubscriptionHelper extends ServiceSupport { private volatile Exception connectException; private volatile boolean reconnecting; + private final AtomicLong restartBackoff; public SubscriptionHelper(SalesforceComponent component, String topicName) throws Exception { this.component = component; @@ -90,6 +94,10 @@ public class SubscriptionHelper extends ServiceSupport { // create CometD client this.client = createClient(topicName); + + restartBackoff = new AtomicLong(0); + backoffIncrement = component.getConfig().getBackoffIncrement(); + maxBackoff = component.getConfig().getMaxBackoff(); } @Override @@ -112,6 +120,10 @@ public class SubscriptionHelper extends ServiceSupport { LOG.warn("Handshake failure: {}", message); handshakeError = (String) message.get(ERROR_FIELD); handshakeException = getFailure(message); + + // restart if handshake fails for any reason + restartClient(); + } else if (!listenerMap.isEmpty()) { reconnecting = true; } @@ -133,12 +145,12 @@ public class SubscriptionHelper extends ServiceSupport { connectException = getFailure(message); if (connectError != null) { - // refresh oauth token, if it's a 401 error - if (connectError.startsWith("401::")) { + // refresh oauth token, if it's a 403 error + if (connectError.startsWith("403::")) { try { session.login(null); } catch (SalesforceException e) { - LOG.error("Error renewing OAuth token on Connect 401: {} ", e.getMessage(), e); + LOG.error("Error renewing OAuth token on Connect 403: " + e.getMessage(), e); } } } @@ -170,74 +182,7 @@ public class SubscriptionHelper extends ServiceSupport { disconnectListener = new ClientSessionChannel.MessageListener() { @Override public void onMessage(ClientSessionChannel clientSessionChannel, Message message) { - - // launch an async task to reconnect - final SalesforceHttpClient httpClient = component.getConfig().getHttpClient(); - - httpClient.getExecutor().execute(new Runnable() { - @Override - public void run() { - - boolean abort = false; - // wait for disconnect - while (!client.isDisconnected()) { - try { - Thread.sleep(DISCONNECT_INTERVAL); - } catch (InterruptedException e) { - LOG.error("Aborting reconnect on interrupt!"); - abort = true; - } - } - - if (!abort) { - - LOG.info("Reconnecting on unexpected disconnect from Salesforce..."); - final long backoffIncrement = client.getBackoffIncrement(); - final long maxBackoff = client.getMaxBackoff(); - - long backoff = backoffIncrement; - String msg = String.format("Failed to reconnect, exceeded maximum backoff %s msecs", maxBackoff); - Exception lastError = new SalesforceException(msg, null); - - // retry until interrupted, or handshook or connect backoff exceeded - while (!abort && !client.isHandshook() && backoff < maxBackoff) { - - try { - // reset client - doStop(); - - // register listeners and restart - doStart(); - - } catch (Exception e) { - LOG.error("Error reconnecting to Salesforce: {}", e.getMessage(), e); - lastError = e; - } - - if (!client.isHandshook()) { - LOG.debug("Pausing for {} msecs after reconnect failure", backoff); - try { - Thread.sleep(backoff); - } catch (InterruptedException e) { - LOG.error("Aborting reconnect on interrupt!"); - abort = true; - } - backoff += backoffIncrement; - } - } - - if (client.isHandshook()) { - LOG.info("Successfully reconnected to Salesforce!"); - } else if (!abort) { - // notify all consumers - String abortMsg = "Aborting Salesforce reconnect due to: " + lastError.getMessage(); - for (SalesforceConsumer consumer : listenerMap.keySet()) { - consumer.handleException(abortMsg, new SalesforceException(abortMsg, lastError)); - } - } - } - } - }); + restartClient(); } }; } @@ -267,6 +212,84 @@ public class SubscriptionHelper extends ServiceSupport { } } + // launch an async task to restart + private void restartClient() { + + // launch a new restart command + final SalesforceHttpClient httpClient = component.getConfig().getHttpClient(); + httpClient.getExecutor().execute(new Runnable() { + @Override + public void run() { + + LOG.info("Restarting on unexpected disconnect from Salesforce..."); + boolean abort = false; + + // wait for disconnect + LOG.debug("Waiting to disconnect..."); + while (!client.isDisconnected()) { + try { + Thread.sleep(DISCONNECT_INTERVAL); + } catch (InterruptedException e) { + LOG.error("Aborting restart on interrupt!"); + abort = true; + } + } + + if (!abort) { + + // update restart attempt backoff + final long backoff = restartBackoff.getAndAdd(backoffIncrement); + if (backoff > maxBackoff) { + LOG.error("Restart aborted after exceeding {} msecs backoff", maxBackoff); + abort = true; + } else { + + // pause before restart attempt + LOG.debug("Pausing for {} msecs before restart attempt", backoff); + try { + Thread.sleep(backoff); + } catch (InterruptedException e) { + LOG.error("Aborting restart on interrupt!"); + abort = true; + } + } + + if (!abort) { + Exception lastError = new SalesforceException("Unknown error", null); + try { + // reset client + doStop(); + + // register listeners and restart + doStart(); + + } catch (Exception e) { + LOG.error("Error restarting: " + e.getMessage(), e); + lastError = e; + } + + if (client.isHandshook()) { + LOG.info("Successfully restarted!"); + // reset backoff interval + restartBackoff.set(client.getBackoffIncrement()); + } else { + LOG.error("Failed to restart after pausing for {} msecs", backoff); + if ((backoff + backoffIncrement) > maxBackoff) { + // notify all consumers + String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage(); + SalesforceException ex = new SalesforceException(abortMsg, lastError); + for (SalesforceConsumer consumer : listenerMap.keySet()) { + consumer.handleException(abortMsg, ex); + } + } + } + } + } + + } + }); + } + @SuppressWarnings("unchecked") private Exception getFailure(Message message) { Exception exception = null; http://git-wip-us.apache.org/repos/asf/camel/blob/7d97e5b5/components/camel-salesforce/camel-salesforce-component/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/resources/log4j2.properties b/components/camel-salesforce/camel-salesforce-component/src/test/resources/log4j2.properties index ba27a06..4b5b208 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/resources/log4j2.properties +++ b/components/camel-salesforce/camel-salesforce-component/src/test/resources/log4j2.properties @@ -26,3 +26,8 @@ appender.out.layout.type = PatternLayout appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n rootLogger.level = INFO rootLogger.appenderRef.file.ref = file + +logger.salesforce.name = org.apache.camel.component.salesforce +logger.salesforce.level = TRACE +#logger.httpclient.name = org.eclipse.jetty +#logger.httpclient.level = DEBUG \ No newline at end of file