Updated Branches: refs/heads/camel-2.12.x 827a1a53e -> 1135e7d4e refs/heads/master 908d7531d -> b33baa0ef
Polished camel-salesforce stop logic. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b33baa0e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b33baa0e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b33baa0e Branch: refs/heads/master Commit: b33baa0efa5dd7d75538940285cbe3b63bc8d39c Parents: 908d753 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Jan 16 12:00:36 2014 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Jan 16 12:00:36 2014 +0100 ---------------------------------------------------------------------- .../salesforce/SalesforceComponent.java | 43 ++++++++++---------- .../internal/streaming/SubscriptionHelper.java | 31 +++++--------- 2 files changed, 32 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b33baa0e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java index b6302bb..3301561 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java @@ -84,7 +84,7 @@ public class SalesforceComponent extends UriEndpointComponent implements Endpoin OperationName operationName = null; String topicName = null; try { - LOG.debug("Creating endpoint for ", remaining); + LOG.debug("Creating endpoint for: {}", remaining); operationName = OperationName.fromValue(remaining); } catch (IllegalArgumentException ex) { // if its not an operation name, treat is as topic name for consumer endpoints @@ -113,10 +113,22 @@ public class SalesforceComponent extends UriEndpointComponent implements Endpoin return endpoint; } + private Map<String, Class<?>> parsePackages() { + Map<String, Class<?>> result = new HashMap<String, Class<?>>(); + Set<Class<?>> classes = getCamelContext().getPackageScanClassResolver(). + findImplementations(AbstractSObjectBase.class, packages); + for (Class<?> aClass : classes) { + // findImplementations also returns AbstractSObjectBase for some reason!!! + if (AbstractSObjectBase.class != aClass) { + result.put(aClass.getSimpleName(), aClass); + } + } + + return Collections.unmodifiableMap(result); + } + @Override protected void doStart() throws Exception { - super.doStart(); - // validate properties ObjectHelper.notNull(loginConfig, "loginConfig"); @@ -171,29 +183,18 @@ public class SalesforceComponent extends UriEndpointComponent implements Endpoin } } - private Map<String, Class<?>> parsePackages() { - Map<String, Class<?>> result = new HashMap<String, Class<?>>(); - Set<Class<?>> classes = getCamelContext().getPackageScanClassResolver(). - findImplementations(AbstractSObjectBase.class, packages); - for (Class<?> aClass : classes) { - // findImplementations also returns AbstractSObjectBase for some reason!!! - if (AbstractSObjectBase.class != aClass) { - result.put(aClass.getSimpleName(), aClass); - } - } - - return Collections.unmodifiableMap(result); - } - @Override protected void doStop() throws Exception { - super.doStop(); + if (classMap != null) { + classMap.clear(); + } try { if (subscriptionHelper != null) { // shutdown all streaming connections // note that this is done in the component, and not in consumer ServiceHelper.stopService(subscriptionHelper); + subscriptionHelper = null; } if (session != null && session.getAccessToken() != null) { try { @@ -206,12 +207,10 @@ public class SalesforceComponent extends UriEndpointComponent implements Endpoin if (httpClient != null) { // shutdown http client connections httpClient.stop(); + httpClient.destroy(); + httpClient = null; } } - - if (classMap != null) { - classMap.clear(); - } } public SubscriptionHelper getSubscriptionHelper() throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/b33baa0e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index 7c1aa71..7aaa086 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -26,11 +26,11 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import org.apache.camel.CamelException; -import org.apache.camel.Service; import org.apache.camel.component.salesforce.SalesforceComponent; import org.apache.camel.component.salesforce.SalesforceConsumer; import org.apache.camel.component.salesforce.internal.SalesforceSession; import org.apache.camel.component.salesforce.internal.client.SalesforceSecurityListener; +import org.apache.camel.support.ServiceSupport; import org.cometd.bayeux.Message; import org.cometd.bayeux.client.ClientSessionChannel; import org.cometd.client.BayeuxClient; @@ -50,7 +50,7 @@ import static org.cometd.bayeux.Channel.META_UNSUBSCRIBE; import static org.cometd.bayeux.Message.ERROR_FIELD; import static org.cometd.bayeux.Message.SUBSCRIPTION_FIELD; -public class SubscriptionHelper implements Service { +public class SubscriptionHelper extends ServiceSupport { private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class); @@ -62,10 +62,10 @@ public class SubscriptionHelper implements Service { private final SalesforceComponent component; private final SalesforceSession session; private final BayeuxClient client; + private final long timeout = 60 * 1000L; private final Map<SalesforceConsumer, ClientSessionChannel.MessageListener> listenerMap; - private boolean started; private ClientSessionChannel.MessageListener handshakeListener; private ClientSessionChannel.MessageListener connectListener; @@ -85,12 +85,7 @@ public class SubscriptionHelper implements Service { } @Override - public void start() throws Exception { - if (started) { - // no need to start again - return; - } - + protected void doStart() throws Exception { // listener for handshake error or exception if (handshakeListener == null) { // first start @@ -174,18 +169,16 @@ public class SubscriptionHelper implements Service { String.format("Handshake request timeout after %s seconds", CONNECT_TIMEOUT)); } } - - started = true; } @Override - public void stop() { - if (started) { - started = false; - // TODO find and log any disconnect errors - client.disconnect(); - client.getChannel(META_CONNECT).removeListener(connectListener); - client.getChannel(META_HANDSHAKE).removeListener(handshakeListener); + protected void doStop() throws Exception { + client.getChannel(META_CONNECT).removeListener(connectListener); + client.getChannel(META_HANDSHAKE).removeListener(handshakeListener); + + boolean disconnected = client.disconnect(timeout); + if (!disconnected) { + LOG.warn("Could not disconnect client connected to: {} after: {} msec.", getEndpointUrl(), timeout); } } @@ -226,9 +219,7 @@ public class SubscriptionHelper implements Service { }; BayeuxClient client = new BayeuxClient(getEndpointUrl(), transport); - client.setDebugEnabled(false); - return client; }