ZooKeeper should close connection when stopping producer/consumer. Fixes #72.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bc582896
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bc582896
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bc582896

Branch: refs/heads/master
Commit: bc58289663cbdf2286fea926fd33579b8811a055
Parents: 249972f
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Dec 30 17:04:18 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Dec 30 17:04:18 2015 +0100

----------------------------------------------------------------------
 .../component/zookeeper/ConnectionHolder.java   |  1 +
 .../zookeeper/ZooKeeperConnectionManager.java   |  3 ++-
 .../component/zookeeper/ZooKeeperConsumer.java  | 19 ++++++++-----------
 .../component/zookeeper/ZookeeperProducer.java  | 12 +-----------
 4 files changed, 12 insertions(+), 23 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/bc582896/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
index 2e69e31..bc92b99 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
@@ -87,6 +87,7 @@ public class ConnectionHolder implements Watcher {
         try {
             if (zookeeper != null) {
                 zookeeper.close();
+                zookeeper = null;
             }
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Shutting down connection to Zookeeper cluster {}", configuration.getConnectString());
http://git-wip-us.apache.org/repos/asf/camel/blob/bc582896/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
index cabafdf..3b1ab7f 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
@@ -50,7 +50,7 @@ public class ZooKeeperConnectionManager {
         public DefaultZookeeperConnectionStrategy(ZooKeeperEndpoint endpoint) {
             this.configuration = endpoint.getConfiguration();
-            LOG.debug("Creating connection with static configuration of {}", configuration);
+            LOG.debug("Creating connection to ZooKeeper: {}", configuration);
             holder = new ConnectionHolder(configuration);
         }
@@ -59,6 +59,7 @@ public class ZooKeeperConnectionManager {
         }
         public void shutdown() {
+            LOG.debug("Shutting down connection to ZooKeeper: {}", configuration);
             holder.closeConnection();
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/bc582896/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
index 7fe40d5..bb9507f 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
@@ -42,36 +42,32 @@ import org.apache.zookeeper.ZooKeeper;
 @SuppressWarnings("rawtypes")
 public class ZooKeeperConsumer extends DefaultConsumer {
-    private ZooKeeperConnectionManager connectionManager;
-
+    private final ZooKeeperConnectionManager zkm;
     private ZooKeeper connection;
-
     private ZooKeeperConfiguration configuration;
-
     private LinkedBlockingQueue<ZooKeeperOperation> operations = new LinkedBlockingQueue<ZooKeeperOperation>();
-
-    private boolean shuttingDown;
-
     private ExecutorService executor;
+    private volatile boolean shuttingDown;
     public ZooKeeperConsumer(ZooKeeperEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
-        this.connectionManager = endpoint.getConnectionManager();
+        this.zkm = endpoint.getConnectionManager();
         this.configuration = endpoint.getConfiguration();
     }
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        connection = connectionManager.getConnection();
+        connection = zkm.getConnection();
         if (log.isDebugEnabled()) {
             log.debug(String.format("Connected to Zookeeper cluster %s", configuration.getConnectString()));
         }
         initializeConsumer();
         executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(configuration.getPath(), "Camel-Zookeeper Ops executor", 1);
+
         OperationsExecutor opsService = new OperationsExecutor();
-        executor.execute(opsService);
+        executor.submit(opsService);
     }
     @Override
@@ -81,7 +77,8 @@ public class ZooKeeperConsumer extends DefaultConsumer {
         if (log.isTraceEnabled()) {
             log.trace(String.format("Shutting down zookeeper consumer of '%s'", configuration.getPath()));
         }
-        executor.shutdown();
+        getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(executor);
+        zkm.shutdown();
     }
     private void initializeConsumer() {

http://git-wip-us.apache.org/repos/asf/camel/blob/bc582896/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
index aaf368b..2280049 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
@@ -52,10 +52,8 @@ public class ZookeeperProducer extends DefaultProducer {
     public static final String ZK_OPERATION_WRITE = "WRITE";
     public static final String ZK_OPERATION_DELETE = "DELETE";
-    private ZooKeeperConfiguration configuration;
-
+    private final ZooKeeperConfiguration configuration;
     private ZooKeeperConnectionManager zkm;
-
     private ZooKeeper connection;
     public ZookeeperProducer(ZooKeeperEndpoint endpoint) {
@@ -117,14 +115,6 @@ public class ZookeeperProducer extends DefaultProducer {
         if (log.isTraceEnabled()) {
             log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
         }
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        super.doStop();
-        if (log.isTraceEnabled()) {
-            log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
-        }
         zkm.shutdown();
     }