Polished camel-salesforce stop logic.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1135e7d4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1135e7d4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1135e7d4

Branch: refs/heads/camel-2.12.x
Commit: 1135e7d4ee95da53c3cea0cd3633ded960e304e8
Parents: 827a1a5
Author: Claus Ibsen <davscl...@apache.org>
Authored: Thu Jan 16 12:00:36 2014 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Jan 16 12:01:04 2014 +0100

----------------------------------------------------------------------
 .../salesforce/SalesforceComponent.java         | 43 ++++++++++----------
 .../internal/streaming/SubscriptionHelper.java  | 31 +++++---------
 2 files changed, 32 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1135e7d4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
index b6302bb..3301561 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
@@ -84,7 +84,7 @@ public class SalesforceComponent extends UriEndpointComponent 
implements Endpoin
         OperationName operationName = null;
         String topicName = null;
         try {
-            LOG.debug("Creating endpoint for ", remaining);
+            LOG.debug("Creating endpoint for: {}", remaining);
             operationName = OperationName.fromValue(remaining);
         } catch (IllegalArgumentException ex) {
             // if its not an operation name, treat is as topic name for 
consumer endpoints
@@ -113,10 +113,22 @@ public class SalesforceComponent extends 
UriEndpointComponent implements Endpoin
         return endpoint;
     }
 
+    private Map<String, Class<?>> parsePackages() {
+        Map<String, Class<?>> result = new HashMap<String, Class<?>>();
+        Set<Class<?>> classes = 
getCamelContext().getPackageScanClassResolver().
+                findImplementations(AbstractSObjectBase.class, packages);
+        for (Class<?> aClass : classes) {
+            // findImplementations also returns AbstractSObjectBase for some 
reason!!!
+            if (AbstractSObjectBase.class != aClass) {
+                result.put(aClass.getSimpleName(), aClass);
+            }
+        }
+
+        return Collections.unmodifiableMap(result);
+    }
+
     @Override
     protected void doStart() throws Exception {
-        super.doStart();
-
         // validate properties
         ObjectHelper.notNull(loginConfig, "loginConfig");
 
@@ -171,29 +183,18 @@ public class SalesforceComponent extends 
UriEndpointComponent implements Endpoin
         }
     }
 
-    private Map<String, Class<?>> parsePackages() {
-        Map<String, Class<?>> result = new HashMap<String, Class<?>>();
-        Set<Class<?>> classes = 
getCamelContext().getPackageScanClassResolver().
-            findImplementations(AbstractSObjectBase.class, packages);
-        for (Class<?> aClass : classes) {
-            // findImplementations also returns AbstractSObjectBase for some 
reason!!!
-            if (AbstractSObjectBase.class != aClass) {
-                result.put(aClass.getSimpleName(), aClass);
-            }
-        }
-
-        return Collections.unmodifiableMap(result);
-    }
-
     @Override
     protected void doStop() throws Exception {
-        super.doStop();
+        if (classMap != null) {
+            classMap.clear();
+        }
 
         try {
             if (subscriptionHelper != null) {
                 // shutdown all streaming connections
                 // note that this is done in the component, and not in consumer
                 ServiceHelper.stopService(subscriptionHelper);
+                subscriptionHelper = null;
             }
             if (session != null && session.getAccessToken() != null) {
                 try {
@@ -206,12 +207,10 @@ public class SalesforceComponent extends 
UriEndpointComponent implements Endpoin
             if (httpClient != null) {
                 // shutdown http client connections
                 httpClient.stop();
+                httpClient.destroy();
+                httpClient = null;
             }
         }
-
-        if (classMap != null) {
-            classMap.clear();
-        }
     }
 
     public SubscriptionHelper getSubscriptionHelper() throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/1135e7d4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
----------------------------------------------------------------------
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 7c1aa71..7aaa086 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -26,11 +26,11 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 import org.apache.camel.CamelException;
-import org.apache.camel.Service;
 import org.apache.camel.component.salesforce.SalesforceComponent;
 import org.apache.camel.component.salesforce.SalesforceConsumer;
 import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import 
org.apache.camel.component.salesforce.internal.client.SalesforceSecurityListener;
+import org.apache.camel.support.ServiceSupport;
 import org.cometd.bayeux.Message;
 import org.cometd.bayeux.client.ClientSessionChannel;
 import org.cometd.client.BayeuxClient;
@@ -50,7 +50,7 @@ import static org.cometd.bayeux.Channel.META_UNSUBSCRIBE;
 import static org.cometd.bayeux.Message.ERROR_FIELD;
 import static org.cometd.bayeux.Message.SUBSCRIPTION_FIELD;
 
-public class SubscriptionHelper implements Service {
+public class SubscriptionHelper extends ServiceSupport {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SubscriptionHelper.class);
 
@@ -62,10 +62,10 @@ public class SubscriptionHelper implements Service {
     private final SalesforceComponent component;
     private final SalesforceSession session;
     private final BayeuxClient client;
+    private final long timeout = 60 * 1000L;
 
     private final Map<SalesforceConsumer, 
ClientSessionChannel.MessageListener> listenerMap;
 
-    private boolean started;
     private ClientSessionChannel.MessageListener handshakeListener;
     private ClientSessionChannel.MessageListener connectListener;
 
@@ -85,12 +85,7 @@ public class SubscriptionHelper implements Service {
     }
 
     @Override
-    public void start() throws Exception {
-        if (started) {
-            // no need to start again
-            return;
-        }
-
+    protected void doStart() throws Exception {
         // listener for handshake error or exception
         if (handshakeListener == null) {
             // first start
@@ -174,18 +169,16 @@ public class SubscriptionHelper implements Service {
                         String.format("Handshake request timeout after %s 
seconds", CONNECT_TIMEOUT));
             }
         }
-
-        started = true;
     }
 
     @Override
-    public void stop() {
-        if (started) {
-            started = false;
-            // TODO find and log any disconnect errors
-            client.disconnect();
-            client.getChannel(META_CONNECT).removeListener(connectListener);
-            
client.getChannel(META_HANDSHAKE).removeListener(handshakeListener);
+    protected void doStop() throws Exception {
+        client.getChannel(META_CONNECT).removeListener(connectListener);
+        client.getChannel(META_HANDSHAKE).removeListener(handshakeListener);
+
+        boolean disconnected = client.disconnect(timeout);
+        if (!disconnected) {
+            LOG.warn("Could not disconnect client connected to: {} after: {} 
msec.", getEndpointUrl(), timeout);
         }
     }
 
@@ -226,9 +219,7 @@ public class SubscriptionHelper implements Service {
         };
 
         BayeuxClient client = new BayeuxClient(getEndpointUrl(), transport);
-
         client.setDebugEnabled(false);
-
         return client;
     }
 

Reply via email to