Repository: camel Updated Branches: refs/heads/master c5b6f9bed -> 3ac630272
CAMEL-10238: Updated subscription helper to listen for hard disconnects, and reconnect from scratch, also updated error handling throwing SalesforceException from consumer endpoints Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3ac63027 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3ac63027 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3ac63027 Branch: refs/heads/master Commit: 3ac630272a622cc6c7df95af3dbb2611031c9b93 Parents: 54c78e5 Author: Dhiraj Bokde <dhira...@yahoo.com> Authored: Thu Aug 25 01:20:07 2016 -0700 Committer: Dhiraj Bokde <dhira...@yahoo.com> Committed: Thu Aug 25 01:20:42 2016 -0700 ---------------------------------------------------------------------- .../salesforce/SalesforceConsumer.java | 10 +- .../internal/streaming/SubscriptionHelper.java | 237 +++++++++++++------ 2 files changed, 172 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3ac63027/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java index 83772e6..df29dd8 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java @@ -26,7 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.salesforce.api.SalesforceException; import org.apache.camel.component.salesforce.api.utils.JsonUtils; import org.apache.camel.component.salesforce.internal.client.DefaultRestClient; import org.apache.camel.component.salesforce.internal.client.RestClient; @@ -185,7 +185,7 @@ public class SalesforceConsumer extends DefaultConsumer { } catch (IOException e) { final String msg = String.format("Error parsing message [%s] from Topic %s: %s", message, topicName, e.getMessage()); - handleException(msg, new RuntimeCamelException(msg, e)); + handleException(msg, new SalesforceException(msg, e)); } try { @@ -199,11 +199,13 @@ public class SalesforceConsumer extends DefaultConsumer { } }); } catch (Exception e) { - handleException(String.format("Error processing %s: %s", exchange, e.getMessage()), e); + String msg = String.format("Error processing %s: %s", exchange, e); + handleException(msg, new SalesforceException(msg, e)); } finally { Exception ex = exchange.getException(); if (ex != null) { - handleException(String.format("Unhandled exception: %s", ex.getMessage()), ex); + String msg = String.format("Unhandled exception: %s", ex.getMessage()); + handleException(msg, new SalesforceException(msg, ex)); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/3ac63027/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index a501187..c9a98ee 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -30,6 +30,7 @@ import org.apache.camel.CamelException; import org.apache.camel.component.salesforce.SalesforceComponent; import org.apache.camel.component.salesforce.SalesforceConsumer; import org.apache.camel.component.salesforce.SalesforceHttpClient; +import org.apache.camel.component.salesforce.api.SalesforceException; import org.apache.camel.component.salesforce.internal.SalesforceSession; import org.apache.camel.support.ServiceSupport; import org.cometd.bayeux.Message; @@ -45,6 +46,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.cometd.bayeux.Channel.META_CONNECT; +import static org.cometd.bayeux.Channel.META_DISCONNECT; import static org.cometd.bayeux.Channel.META_HANDSHAKE; import static org.cometd.bayeux.Channel.META_SUBSCRIBE; import static org.cometd.bayeux.Channel.META_UNSUBSCRIBE; @@ -58,7 +60,9 @@ public class SubscriptionHelper extends ServiceSupport { private static final int CONNECT_TIMEOUT = 110; private static final int CHANNEL_TIMEOUT = 40; + private static final String FAILURE_FIELD = "failure"; private static final String EXCEPTION_FIELD = "exception"; + private static final int DISCONNECT_INTERVAL = 5000; private final SalesforceComponent component; private final SalesforceSession session; @@ -69,11 +73,14 @@ public class SubscriptionHelper extends ServiceSupport { private ClientSessionChannel.MessageListener handshakeListener; private ClientSessionChannel.MessageListener connectListener; + private ClientSessionChannel.MessageListener disconnectListener; - private String handshakeError; - private Exception handshakeException; - private String connectError; - private boolean reconnecting; + private volatile String handshakeError; + private volatile Exception handshakeException; + private volatile String connectError; + private volatile Exception connectException; + + private volatile boolean reconnecting; public SubscriptionHelper(SalesforceComponent component, String topicName) throws Exception { this.component = component; @@ -87,6 +94,13 @@ public class SubscriptionHelper extends ServiceSupport { @Override protected void doStart() throws Exception { + + // reset all error conditions + handshakeError = null; + handshakeException = null; + connectError = null; + connectException = null; + // listener for handshake error or exception if (handshakeListener == null) { // first start @@ -95,14 +109,9 @@ public class SubscriptionHelper extends ServiceSupport { LOG.debug("[CHANNEL:META_HANDSHAKE]: {}", message); if (!message.isSuccessful()) { - String error = (String) message.get(ERROR_FIELD); - if (error != null) { - handshakeError = error; - } - Exception exception = (Exception) message.get(EXCEPTION_FIELD); - if (exception != null) { - handshakeException = exception; - } + LOG.warn("Handshake failure: {}", message); + handshakeError = (String) message.get(ERROR_FIELD); + handshakeException = getFailure(message); } else if (!listenerMap.isEmpty()) { reconnecting = true; } @@ -118,10 +127,22 @@ public class SubscriptionHelper extends ServiceSupport { LOG.debug("[CHANNEL:META_CONNECT]: {}", message); if (!message.isSuccessful()) { - String error = (String) message.get(ERROR_FIELD); - if (error != null) { - connectError = error; + + LOG.warn("Connect failure: {}", message); + connectError = (String) message.get(ERROR_FIELD); + connectException = getFailure(message); + + if (connectError != null) { + // refresh oauth token, if it's a 401 error + if (connectError.startsWith("401::")) { + try { + session.login(null); + } catch (SalesforceException e) { + LOG.error("Error renewing OAuth token on Connect 401: {} ", e.getMessage(), e); + } + } } + } else if (reconnecting) { reconnecting = false; @@ -135,15 +156,7 @@ public class SubscriptionHelper extends ServiceSupport { for (Map.Entry<SalesforceConsumer, ClientSessionChannel.MessageListener> entry : map.entrySet()) { final SalesforceConsumer consumer = entry.getKey(); final String topicName = consumer.getTopicName(); - try { - subscribe(topicName, consumer); - } catch (CamelException e) { - // let the consumer handle the exception - consumer.handleException( - String.format("Error refreshing subscription to topic [%s]: %s", - topicName, e.getMessage()), - e); - } + subscribe(topicName, consumer); } } @@ -152,6 +165,84 @@ public class SubscriptionHelper extends ServiceSupport { } client.getChannel(META_CONNECT).addListener(connectListener); + // handle fatal disconnects by reconnecting asynchronously + if (disconnectListener == null) { + disconnectListener = new ClientSessionChannel.MessageListener() { + @Override + public void onMessage(ClientSessionChannel clientSessionChannel, Message message) { + + // launch an async task to reconnect + final SalesforceHttpClient httpClient = component.getConfig().getHttpClient(); + + httpClient.getExecutor().execute(new Runnable() { + @Override + public void run() { + + boolean abort = false; + // wait for disconnect + while (!client.isDisconnected()) { + try { + Thread.sleep(DISCONNECT_INTERVAL); + } catch (InterruptedException e) { + LOG.error("Aborting reconnect on interrupt!"); + abort = true; + } + } + + if (!abort) { + + LOG.info("Reconnecting on unexpected disconnect from Salesforce..."); + final long backoffIncrement = client.getBackoffIncrement(); + final long maxBackoff = client.getMaxBackoff(); + + long backoff = backoffIncrement; + String msg = String.format("Failed to reconnect, exceeded maximum backoff %s msecs", maxBackoff); + Exception lastError = new SalesforceException(msg, null); + + // retry until interrupted, or handshook or connect backoff exceeded + while (!abort && !client.isHandshook() && backoff < maxBackoff) { + + try { + // reset client + doStop(); + + // register listeners and restart + doStart(); + + } catch (Exception e) { + LOG.error("Error reconnecting to Salesforce: {}", e.getMessage(), e); + lastError = e; + } + + if (!client.isHandshook()) { + LOG.debug("Pausing for {} msecs after reconnect failure", backoff); + try { + Thread.sleep(backoff); + } catch (InterruptedException e) { + LOG.error("Aborting reconnect on interrupt!"); + abort = true; + } + backoff += backoffIncrement; + } + } + + if (client.isHandshook()) { + LOG.info("Successfully reconnected to Salesforce!"); + } else if (!abort) { + // notify all consumers + String abortMsg = "Aborting Salesforce reconnect due to: " + lastError.getMessage(); + for (SalesforceConsumer consumer : listenerMap.keySet()) { + consumer.handleException(abortMsg, new SalesforceException(abortMsg, lastError)); + } + } + } + } + }); + } + }; + } + client.getChannel(META_DISCONNECT).addListener(disconnectListener); + // connect to Salesforce cometd endpoint client.handshake(); @@ -163,6 +254,10 @@ public class SubscriptionHelper extends ServiceSupport { handshakeException); } else if (handshakeError != null) { throw new CamelException(String.format("Error during HANDSHAKE: %s", handshakeError)); + } else if (connectException != null) { + throw new CamelException( + String.format("Exception during CONNECT: %s", connectException.getMessage()), + connectException); } else if (connectError != null) { throw new CamelException(String.format("Error during CONNECT: %s", connectError)); } else { @@ -172,8 +267,20 @@ public class SubscriptionHelper extends ServiceSupport { } } + @SuppressWarnings("unchecked") + private Exception getFailure(Message message) { + Exception exception = null; + if (message.get(EXCEPTION_FIELD) != null) { + exception = (Exception) message.get(EXCEPTION_FIELD); + } else if (message.get(FAILURE_FIELD) != null) { + exception = (Exception) ((Map<String, Object>)message.get("failure")).get("exception"); + } + return exception; + } + @Override protected void doStop() throws Exception { + client.getChannel(META_DISCONNECT).removeListener(disconnectListener); client.getChannel(META_CONNECT).removeListener(connectListener); client.getChannel(META_HANDSHAKE).removeListener(handshakeListener); @@ -202,7 +309,8 @@ public class SubscriptionHelper extends ServiceSupport { super.customize(request); // add current security token obtained from session - request.header(HttpHeader.AUTHORIZATION, "OAuth " + session.getAccessToken()); + // replace old token + request.getHeaders().put(HttpHeader.AUTHORIZATION, "OAuth " + session.getAccessToken()); } }; @@ -233,7 +341,7 @@ public class SubscriptionHelper extends ServiceSupport { return client; } - public void subscribe(final String topicName, final SalesforceConsumer consumer) throws CamelException { + public void subscribe(final String topicName, final SalesforceConsumer consumer) { // create subscription for consumer final String channelName = getChannelName(topicName); @@ -252,9 +360,7 @@ public class SubscriptionHelper extends ServiceSupport { final ClientSessionChannel clientChannel = client.getChannel(channelName); - // listener for subscribe error - final CountDownLatch latch = new CountDownLatch(1); - final String[] subscribeError = {null}; + // listener for subscription final ClientSessionChannel.MessageListener subscriptionListener = new ClientSessionChannel.MessageListener() { public void onMessage(ClientSessionChannel channel, Message message) { LOG.debug("[CHANNEL:META_SUBSCRIBE]: {}", message); @@ -263,45 +369,28 @@ public class SubscriptionHelper extends ServiceSupport { if (!message.isSuccessful()) { String error = (String) message.get(ERROR_FIELD); - if (error != null) { - subscribeError[0] = error; + if (error == null) { + error = "Missing error message"; } + Exception failure = getFailure(message); + String msg = String.format("Error subscribing to %s: %s", topicName, + failure != null ? failure.getMessage() : error); + consumer.handleException(msg, new SalesforceException(msg, failure)); } else { // remember subscription LOG.info("Subscribed to channel {}", subscribedChannelName); + listenerMap.put(consumer, listener); } - latch.countDown(); + + // remove this subscription listener + client.getChannel(META_SUBSCRIBE).removeListener(this); } } }; client.getChannel(META_SUBSCRIBE).addListener(subscriptionListener); - try { - clientChannel.subscribe(listener); - - // confirm that a subscription was created - try { - if (!latch.await(CHANNEL_TIMEOUT, SECONDS)) { - String message; - if (subscribeError[0] != null) { - message = String.format("Error subscribing to topic %s: %s", - topicName, subscribeError[0]); - } else { - message = String.format("Timeout error subscribing to topic %s after %s seconds", - topicName, CHANNEL_TIMEOUT); - } - throw new CamelException(message); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - // probably shutting down, so forget subscription - } - - listenerMap.put(consumer, listener); - - } finally { - client.getChannel(META_SUBSCRIBE).removeListener(subscriptionListener); - } + // subscribe asynchronously + clientChannel.subscribe(listener); } private String getChannelName(String topicName) { @@ -316,22 +405,25 @@ public class SubscriptionHelper extends ServiceSupport { // listen for unsubscribe error final CountDownLatch latch = new CountDownLatch(1); final String[] unsubscribeError = {null}; + final Exception[] unsubscribeFailure = {null}; + final ClientSessionChannel.MessageListener unsubscribeListener = new ClientSessionChannel.MessageListener() { public void onMessage(ClientSessionChannel channel, Message message) { LOG.debug("[CHANNEL:META_UNSUBSCRIBE]: {}", message); - String unsubscribedChannelName = message.get(SUBSCRIPTION_FIELD).toString(); - if (channelName.equals(unsubscribedChannelName)) { - - if (!message.isSuccessful()) { - String error = (String) message.get(ERROR_FIELD); - if (error != null) { - unsubscribeError[0] = error; + Object subscription = message.get(SUBSCRIPTION_FIELD); + if (subscription != null) { + String unsubscribedChannelName = subscription.toString(); + if (channelName.equals(unsubscribedChannelName)) { + + if (!message.isSuccessful()) { + unsubscribeError[0] = (String) message.get(ERROR_FIELD); + unsubscribeFailure[0] = getFailure(message); + } else { + // forget subscription + LOG.info("Unsubscribed from channel {}", unsubscribedChannelName); } - } else { - // forget subscription - LOG.info("Unsubscribed from channel {}", unsubscribedChannelName); + latch.countDown(); } - latch.countDown(); } } }; @@ -350,14 +442,17 @@ public class SubscriptionHelper extends ServiceSupport { try { if (!latch.await(CHANNEL_TIMEOUT, SECONDS)) { String message; - if (unsubscribeError[0] != null) { + if (unsubscribeFailure[0] != null) { + message = String.format("Error unsubscribing from topic %s: %s", + topicName, unsubscribeFailure[0].getMessage()); + } else if (unsubscribeError[0] != null) { message = String.format("Error unsubscribing from topic %s: %s", topicName, unsubscribeError[0]); } else { message = String.format("Timeout error unsubscribing from topic %s after %s seconds", topicName, CHANNEL_TIMEOUT); } - throw new CamelException(message); + throw new CamelException(message, unsubscribeFailure[0]); } } catch (InterruptedException e) { Thread.currentThread().interrupt();