Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x 1c07ce242 -> 3e5ef769e


CAMEL-10238: Refactored to handle handshake failure, added increment and max 
backoff properties

Conflicts:
        
components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
        
components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
        
components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
        
components/camel-salesforce/camel-salesforce-component/src/test/resources/log4j2.properties


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3e5ef769
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3e5ef769
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3e5ef769

Branch: refs/heads/camel-2.17.x
Commit: 3e5ef769e2d8d13338729566fdb128955b4f94bf
Parents: 1c07ce2
Author: Dhiraj Bokde <dhira...@yahoo.com>
Authored: Mon Aug 29 22:32:56 2016 -0700
Committer: Dhiraj Bokde <dhira...@yahoo.com>
Committed: Mon Aug 29 22:45:22 2016 -0700

----------------------------------------------------------------------
 .../salesforce/SalesforceEndpointConfig.java    |  36 +++-
 .../internal/streaming/SubscriptionHelper.java  | 165 +++++++++++--------
 2 files changed, 129 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3e5ef769/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
----------------------------------------------------------------------
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
index 2bb6306..92ca010 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
@@ -73,6 +73,10 @@ public class SalesforceEndpointConfig implements Cloneable {
     // default maximum authentication retries on failed authentication or 
expired session
     public static final int DEFAULT_MAX_AUTHENTICATION_RETRIES = 4;
 
+    // default increment and limit for Streaming connection restart attempts
+    public static final long DEFAULT_BACKOFF_INCREMENT = 1000L;
+    public static final long DEFAULT_MAX_BACKOFF = 30000L;
+
     // general properties
     @UriParam
     private String apiVersion = DEFAULT_VERSION;
@@ -145,6 +149,14 @@ public class SalesforceEndpointConfig implements Cloneable 
{
     @UriParam
     private SalesforceHttpClient httpClient;
 
+    // Streaming connection restart attempt backoff interval increment
+    @UriParam
+    private long backoffIncrement = DEFAULT_BACKOFF_INCREMENT;
+
+    // Streaming connection restart attempt maximum backoff interval
+    @UriParam
+    private long maxBackoff = DEFAULT_MAX_BACKOFF;
+
     public SalesforceEndpointConfig copy() {
         try {
             final SalesforceEndpointConfig copy = (SalesforceEndpointConfig) 
super.clone();
@@ -485,6 +497,28 @@ public class SalesforceEndpointConfig implements Cloneable 
{
         return httpClient;
     }
 
+    public long getBackoffIncrement() {
+        return backoffIncrement;
+    }
+
+    /**
+     * Backoff interval increment for Streaming connection restart attempts 
for failures beyond CometD auto-reconnect.
+     */
+    public void setBackoffIncrement(long backoffIncrement) {
+        this.backoffIncrement = backoffIncrement;
+    }
+
+    public long getMaxBackoff() {
+        return maxBackoff;
+    }
+
+    /**
+     * Maximum backoff interval for Streaming connection restart attempts for 
failures beyond CometD auto-reconnect.
+     */
+    public void setMaxBackoff(long maxBackoff) {
+        this.maxBackoff = maxBackoff;
+    }
+
     public Map<String, Object> toValueMap() {
 
         final Map<String, Object> valueMap = new HashMap<String, Object>();
@@ -511,7 +545,7 @@ public class SalesforceEndpointConfig implements Cloneable {
         valueMap.put(JOB_ID, jobId);
         valueMap.put(BATCH_ID, batchId);
         valueMap.put(RESULT_ID, resultId);
-        
+
         // add analytics API properties
         valueMap.put(REPORT_ID, reportId);
         valueMap.put(INCLUDE_DETAILS, includeDetails);

http://git-wip-us.apache.org/repos/asf/camel/blob/3e5ef769/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
----------------------------------------------------------------------
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 1e3e8a4..cbe84ab 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -20,6 +20,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -66,6 +67,8 @@ public class SubscriptionHelper extends ServiceSupport {
     private final long timeout = 60 * 1000L;
 
     private final Map<SalesforceConsumer, 
ClientSessionChannel.MessageListener> listenerMap;
+    private final long maxBackoff;
+    private final long backoffIncrement;
 
     private ClientSessionChannel.MessageListener handshakeListener;
     private ClientSessionChannel.MessageListener connectListener;
@@ -77,6 +80,7 @@ public class SubscriptionHelper extends ServiceSupport {
     private volatile Exception connectException;
 
     private volatile boolean reconnecting;
+    private final AtomicLong restartBackoff;
 
     public SubscriptionHelper(SalesforceComponent component) throws Exception {
         this.component = component;
@@ -86,6 +90,10 @@ public class SubscriptionHelper extends ServiceSupport {
 
         // create CometD client
         this.client = createClient();
+
+        restartBackoff = new AtomicLong(0);
+        backoffIncrement = component.getConfig().getBackoffIncrement();
+        maxBackoff = component.getConfig().getMaxBackoff();
     }
 
     @Override
@@ -108,6 +116,10 @@ public class SubscriptionHelper extends ServiceSupport {
                         LOG.warn("Handshake failure: {}", message);
                         handshakeError = (String) message.get(ERROR_FIELD);
                         handshakeException = getFailure(message);
+
+                        // restart if handshake fails for any reason
+                        restartClient();
+
                     } else if (!listenerMap.isEmpty()) {
                         reconnecting = true;
                     }
@@ -129,12 +141,12 @@ public class SubscriptionHelper extends ServiceSupport {
                         connectException = getFailure(message);
 
                         if (connectError != null) {
-                            // refresh oauth token, if it's a 401 error
-                            if (connectError.startsWith("401::")) {
+                            // refresh oauth token, if it's a 403 error
+                            if (connectError.startsWith("403::")) {
                                 try {
                                     session.login(null);
                                 } catch (SalesforceException e) {
-                                    LOG.error("Error renewing OAuth token on 
Connect 401: {} ", e.getMessage(), e);
+                                    LOG.error("Error renewing OAuth token on 
Connect 403: " + e.getMessage(), e);
                                 }
                             }
                         }
@@ -166,74 +178,7 @@ public class SubscriptionHelper extends ServiceSupport {
             disconnectListener = new ClientSessionChannel.MessageListener() {
                 @Override
                 public void onMessage(ClientSessionChannel 
clientSessionChannel, Message message) {
-
-                    // launch an async task to reconnect
-                    final SalesforceHttpClient httpClient = 
component.getConfig().getHttpClient();
-
-                    httpClient.getExecutor().execute(new Runnable() {
-                        @Override
-                        public void run() {
-
-                            boolean abort = false;
-                            // wait for disconnect
-                            while (!client.isDisconnected()) {
-                                try {
-                                    Thread.sleep(DISCONNECT_INTERVAL);
-                                } catch (InterruptedException e) {
-                                    LOG.error("Aborting reconnect on 
interrupt!");
-                                    abort = true;
-                                }
-                            }
-
-                            if (!abort) {
-
-                                LOG.info("Reconnecting on unexpected 
disconnect from Salesforce...");
-                                final long backoffIncrement = 
client.getBackoffIncrement();
-                                final long maxBackoff = client.getMaxBackoff();
-
-                                long backoff = backoffIncrement;
-                                String msg = String.format("Failed to 
reconnect, exceeded maximum backoff %s msecs", maxBackoff);
-                                Exception lastError = new 
SalesforceException(msg, null);
-
-                                // retry until interrupted, or handshook or 
connect backoff exceeded
-                                while (!abort && !client.isHandshook() && 
backoff < maxBackoff) {
-
-                                    try {
-                                        // reset client
-                                        doStop();
-
-                                        // register listeners and restart
-                                        doStart();
-
-                                    } catch (Exception e) {
-                                        LOG.error("Error reconnecting to 
Salesforce: {}", e.getMessage(), e);
-                                        lastError = e;
-                                    }
-
-                                    if (!client.isHandshook()) {
-                                        LOG.debug("Pausing for {} msecs after 
reconnect failure", backoff);
-                                        try {
-                                            Thread.sleep(backoff);
-                                        } catch (InterruptedException e) {
-                                            LOG.error("Aborting reconnect on 
interrupt!");
-                                            abort = true;
-                                        }
-                                        backoff += backoffIncrement;
-                                    }
-                                }
-
-                                if (client.isHandshook()) {
-                                    LOG.info("Successfully reconnected to 
Salesforce!");
-                                } else if (!abort) {
-                                    // notify all consumers
-                                    String abortMsg = "Aborting Salesforce 
reconnect due to: " + lastError.getMessage();
-                                    for (SalesforceConsumer consumer : 
listenerMap.keySet()) {
-                                        consumer.handleException(abortMsg, new 
SalesforceException(abortMsg, lastError));
-                                    }
-                                }
-                            }
-                        }
-                    });
+                    restartClient();
                 }
             };
         }
@@ -263,6 +208,84 @@ public class SubscriptionHelper extends ServiceSupport {
         }
     }
 
+    // launch an async task to restart
+    private void restartClient() {
+
+        // launch a new restart command
+        final SalesforceHttpClient httpClient = 
component.getConfig().getHttpClient();
+        httpClient.getExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+
+                LOG.info("Restarting on unexpected disconnect from 
Salesforce...");
+                boolean abort = false;
+
+                // wait for disconnect
+                LOG.debug("Waiting to disconnect...");
+                while (!client.isDisconnected()) {
+                    try {
+                        Thread.sleep(DISCONNECT_INTERVAL);
+                    } catch (InterruptedException e) {
+                        LOG.error("Aborting restart on interrupt!");
+                        abort = true;
+                    }
+                }
+
+                if (!abort) {
+
+                    // update restart attempt backoff
+                    final long backoff = 
restartBackoff.getAndAdd(backoffIncrement);
+                    if (backoff > maxBackoff) {
+                        LOG.error("Restart aborted after exceeding {} msecs 
backoff", maxBackoff);
+                        abort = true;
+                    } else {
+
+                        // pause before restart attempt
+                        LOG.debug("Pausing for {} msecs before restart 
attempt", backoff);
+                        try {
+                            Thread.sleep(backoff);
+                        } catch (InterruptedException e) {
+                            LOG.error("Aborting restart on interrupt!");
+                            abort = true;
+                        }
+                    }
+
+                    if (!abort) {
+                        Exception lastError = new SalesforceException("Unknown 
error", null);
+                        try {
+                            // reset client
+                            doStop();
+
+                            // register listeners and restart
+                            doStart();
+
+                        } catch (Exception e) {
+                            LOG.error("Error restarting: " + e.getMessage(), 
e);
+                            lastError = e;
+                        }
+
+                        if (client.isHandshook()) {
+                            LOG.info("Successfully restarted!");
+                            // reset backoff interval
+                            restartBackoff.set(client.getBackoffIncrement());
+                        } else {
+                            LOG.error("Failed to restart after pausing for {} 
msecs", backoff);
+                            if ((backoff + backoffIncrement) > maxBackoff) {
+                                // notify all consumers
+                                String abortMsg = "Aborting restart attempt 
due to: " + lastError.getMessage();
+                                SalesforceException ex = new 
SalesforceException(abortMsg, lastError);
+                                for (SalesforceConsumer consumer : 
listenerMap.keySet()) {
+                                    consumer.handleException(abortMsg, ex);
+                                }
+                            }
+                        }
+                    }
+                }
+
+            }
+        });
+    }
+
     @SuppressWarnings("unchecked")
     private Exception getFailure(Message message) {
         Exception exception = null;

Reply via email to