ZooKeeper should close connection when stopping producer/consumer. Fixes #72.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bc582896
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bc582896
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bc582896

Branch: refs/heads/master
Commit: bc58289663cbdf2286fea926fd33579b8811a055
Parents: 249972f
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Dec 30 17:04:18 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Dec 30 17:04:18 2015 +0100

----------------------------------------------------------------------
 .../component/zookeeper/ConnectionHolder.java    |  1 +
 .../zookeeper/ZooKeeperConnectionManager.java    |  3 ++-
 .../component/zookeeper/ZooKeeperConsumer.java   | 19 ++++++++-----------
 .../component/zookeeper/ZookeeperProducer.java   | 12 +-----------
 4 files changed, 12 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bc582896/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
index 2e69e31..bc92b99 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
@@ -87,6 +87,7 @@ public class ConnectionHolder implements Watcher {
         try {
             if (zookeeper != null) {
                 zookeeper.close();
+                zookeeper = null;
             }
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Shutting down connection to Zookeeper cluster {}", configuration.getConnectString());

http://git-wip-us.apache.org/repos/asf/camel/blob/bc582896/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
index cabafdf..3b1ab7f 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
@@ -50,7 +50,7 @@ public class ZooKeeperConnectionManager {
 
         public DefaultZookeeperConnectionStrategy(ZooKeeperEndpoint endpoint) {
             this.configuration = endpoint.getConfiguration();
-            LOG.debug("Creating connection with static configuration of {}", configuration);
+            LOG.debug("Creating connection to ZooKeeper: {}", configuration);
             holder = new ConnectionHolder(configuration);
         }
 
@@ -59,6 +59,7 @@ public class ZooKeeperConnectionManager {
         }
 
         public void shutdown() {
+            LOG.debug("Shutting down connection to ZooKeeper: {}", configuration);
             holder.closeConnection();
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/bc582896/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
index 7fe40d5..bb9507f 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
@@ -42,36 +42,32 @@ import org.apache.zookeeper.ZooKeeper;
 @SuppressWarnings("rawtypes")
 public class ZooKeeperConsumer extends DefaultConsumer {
 
-    private ZooKeeperConnectionManager connectionManager;
-
+    private final ZooKeeperConnectionManager zkm;
     private ZooKeeper connection;
-
     private ZooKeeperConfiguration configuration;
-
     private LinkedBlockingQueue<ZooKeeperOperation> operations = new LinkedBlockingQueue<ZooKeeperOperation>();
-
-    private boolean shuttingDown;
-
     private ExecutorService executor;
+    private volatile boolean shuttingDown;
 
     public ZooKeeperConsumer(ZooKeeperEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
-        this.connectionManager = endpoint.getConnectionManager();
+        this.zkm = endpoint.getConnectionManager();
         this.configuration = endpoint.getConfiguration();
     }
 
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        connection = connectionManager.getConnection();
+        connection = zkm.getConnection();
         if (log.isDebugEnabled()) {
             log.debug(String.format("Connected to Zookeeper cluster %s", configuration.getConnectString()));
         }
 
         initializeConsumer();
         executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(configuration.getPath(), "Camel-Zookeeper Ops executor", 1);
+
         OperationsExecutor opsService = new OperationsExecutor();
-        executor.execute(opsService);
+        executor.submit(opsService);
     }
 
     @Override
@@ -81,7 +77,8 @@ public class ZooKeeperConsumer extends DefaultConsumer {
         if (log.isTraceEnabled()) {
             log.trace(String.format("Shutting down zookeeper consumer of '%s'", configuration.getPath()));
         }
-        executor.shutdown();
+        getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(executor);
+        zkm.shutdown();
     }
 
     private void initializeConsumer() {

http://git-wip-us.apache.org/repos/asf/camel/blob/bc582896/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
index aaf368b..2280049 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
@@ -52,10 +52,8 @@ public class ZookeeperProducer extends DefaultProducer {
     public static final String ZK_OPERATION_WRITE  = "WRITE";
     public static final String ZK_OPERATION_DELETE = "DELETE";
 
-    private ZooKeeperConfiguration configuration;
-
+    private final ZooKeeperConfiguration configuration;
     private ZooKeeperConnectionManager zkm;
-    
     private ZooKeeper connection;
 
     public ZookeeperProducer(ZooKeeperEndpoint endpoint) {
@@ -117,14 +115,6 @@ public class ZookeeperProducer extends DefaultProducer {
         if (log.isTraceEnabled()) {
             log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
         }
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        super.doStop();
-        if (log.isTraceEnabled()) {
-            log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
-        }
         zkm.shutdown();
     }
 

Reply via email to