ACCUMULO-2128 added utility to cleanup accumulo static resources Signed-off-by: Keith Turner <ktur...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/715825b3 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/715825b3 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/715825b3 Branch: refs/heads/1.6.0-SNAPSHOT Commit: 715825b3299fd742d8570971a7f271178b932812 Parents: 71f150a Author: Keith Turner <ktur...@apache.org> Authored: Thu Jan 2 21:03:55 2014 -0500 Committer: Keith Turner <ktur...@apache.org> Committed: Mon Jan 6 20:17:58 2014 -0500 ---------------------------------------------------------------------- .../core/client/impl/ThriftTransportPool.java | 102 +++++++++++++++---- .../org/apache/accumulo/core/util/CleanUp.java | 35 +++++++ .../accumulo/core/zookeeper/ZooSession.java | 23 ++++- 3 files changed, 140 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/715825b3/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java index ef3724b..7468051 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java +++ b/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -53,6 +54,8 @@ public class ThriftTransportPool { private Map<ThriftTransportKey,Long> errorCount = new HashMap<ThriftTransportKey,Long>(); private Map<ThriftTransportKey,Long> errorTime = new HashMap<ThriftTransportKey,Long>(); private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<ThriftTransportKey>(); + + private CountDownLatch closerExitLatch; private static final Logger log = Logger.getLogger(ThriftTransportPool.class); @@ -78,20 +81,26 @@ public class ThriftTransportPool { long lastReturnTime; } + public static class TransportPoolShutdownException extends RuntimeException { + private static final long serialVersionUID = 1L; + } + private static class Closer implements Runnable { ThriftTransportPool pool; - - public Closer(ThriftTransportPool pool) { + private CountDownLatch closerExitLatch; + + public Closer(ThriftTransportPool pool, CountDownLatch closerExitLatch) { this.pool = pool; + this.closerExitLatch = closerExitLatch; } - public void run() { + private void closeConnections() { while (true) { ArrayList<CachedConnection> connectionsToClose = new ArrayList<CachedConnection>(); synchronized (pool) { - for (List<CachedConnection> ccl : pool.cache.values()) { + for (List<CachedConnection> ccl : pool.getCache().values()) { Iterator<CachedConnection> iter = ccl.iterator(); while (iter.hasNext()) { CachedConnection cachedConnection = iter.next(); @@ -103,7 +112,7 @@ public class ThriftTransportPool { } } - for (List<CachedConnection> ccl : pool.cache.values()) { + for (List<CachedConnection> ccl : pool.getCache().values()) { for (CachedConnection cachedConnection : ccl) { cachedConnection.transport.checkForStuckIO(STUCK_THRESHOLD); } @@ -132,6 +141,15 @@ public class ThriftTransportPool { } } } + + public void run() { + try { + closeConnections(); + } catch (TransportPoolShutdownException e) { + } finally { + closerExitLatch.countDown(); + } + } } static class CachedTTransport extends TTransport { @@ -384,14 +402,14 @@ public class ThriftTransportPool { synchronized (this) { // randomly pick a server from the connection cache - serversSet.retainAll(cache.keySet()); + serversSet.retainAll(getCache().keySet()); if (serversSet.size() > 0) { ArrayList<ThriftTransportKey> cachedServers = new ArrayList<ThriftTransportKey>(serversSet); Collections.shuffle(cachedServers, random); for (ThriftTransportKey ttk : cachedServers) { - for (CachedConnection cachedConnection : cache.get(ttk)) { + for (CachedConnection cachedConnection : getCache().get(ttk)) { if (!cachedConnection.isReserved()) { cachedConnection.setReserved(true); if (log.isTraceEnabled()) @@ -411,7 +429,7 @@ public class ThriftTransportPool { if (!preferCachedConnection) { synchronized (this) { - List<CachedConnection> cachedConnList = cache.get(ttk); + List<CachedConnection> cachedConnList = getCache().get(ttk); if (cachedConnList != null) { for (CachedConnection cachedConnection : cachedConnList) { if (!cachedConnection.isReserved()) { @@ -444,11 +462,11 @@ public class ThriftTransportPool { private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException { synchronized (this) { // atomically reserve location if it exist in cache - List<CachedConnection> ccl = cache.get(cacheKey); + List<CachedConnection> ccl = getCache().get(cacheKey); if (ccl == null) { ccl = new LinkedList<CachedConnection>(); - cache.put(cacheKey, ccl); + getCache().put(cacheKey, ccl); } for (CachedConnection cachedConnection : ccl) { @@ -486,15 +504,20 @@ public class ThriftTransportPool { CachedConnection cc = new CachedConnection(tsc); cc.setReserved(true); - synchronized (this) { - List<CachedConnection> ccl = cache.get(cacheKey); + try { + synchronized (this) { + List<CachedConnection> ccl = getCache().get(cacheKey); + + if (ccl == null) { + ccl = new LinkedList<CachedConnection>(); + getCache().put(cacheKey, ccl); + } - if (ccl == null) { - ccl = new LinkedList<CachedConnection>(); - cache.put(cacheKey, ccl); + ccl.add(cc); } - - ccl.add(cc); + } catch (TransportPoolShutdownException e) { + cc.transport.close(); + throw e; } return cc.transport; } @@ -510,7 +533,7 @@ public class ThriftTransportPool { ArrayList<CachedConnection> closeList = new ArrayList<ThriftTransportPool.CachedConnection>(); synchronized (this) { - List<CachedConnection> ccl = cache.get(ctsc.getCacheKey()); + List<CachedConnection> ccl = getCache().get(ctsc.getCacheKey()); for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) { CachedConnection cachedConnection = iterator.next(); if (cachedConnection.transport == tsc) { @@ -600,8 +623,49 @@ public class ThriftTransportPool { } if (daemonStarted.compareAndSet(false, true)) { - new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start(); + CountDownLatch closerExitLatch = new CountDownLatch(1); + new Daemon(new Closer(instance, closerExitLatch), "Thrift Connection Pool Checker").start(); + instance.setCloserExitLatch(closerExitLatch); } return instance; } + + private synchronized void setCloserExitLatch(CountDownLatch closerExitLatch) { + this.closerExitLatch = closerExitLatch; + } + + public void shutdown() { + synchronized (this) { + if (cache == null) + return; + + // close any connections in the pool... even ones that are in use + for (List<CachedConnection> ccl : getCache().values()) { + Iterator<CachedConnection> iter = ccl.iterator(); + while (iter.hasNext()) { + CachedConnection cc = iter.next(); + try { + cc.transport.close(); + } catch (Exception e) { + log.debug("Error closing transport during shutdown", e); + } + } + } + + // this will render the pool unusable and cause the background thread to exit + this.cache = null; + } + + try { + closerExitLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private Map<ThriftTransportKey,List<CachedConnection>> getCache() { + if (cache == null) + throw new TransportPoolShutdownException(); + return cache; + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/715825b3/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java b/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java new file mode 100644 index 0000000..ba02f0b --- /dev/null +++ b/src/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import org.apache.accumulo.core.client.impl.ThriftTransportPool; +import org.apache.accumulo.core.zookeeper.ZooSession; + +/** + * + */ +public class CleanUp { + /** + * kills all threads created by internal Accumulo singleton resources. After this method is called, no accumulo client will work in the current classloader. + */ + public static void shutdownNow() { + ThriftTransportPool.getInstance().shutdown(); + ZooSession.shutdown(); + // need to get code from jared w + // waitForZooKeeperClientThreads(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/715825b3/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java index b3db26f..e64f0c5 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java +++ b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java @@ -29,8 +29,14 @@ import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; -class ZooSession { +public class ZooSession { + public static class ZooSessionShutdownException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + } + private static final Logger log = Logger.getLogger(ZooSession.class); private static class ZooSessionInfo { @@ -114,6 +120,9 @@ class ZooSession { public static synchronized ZooKeeper getSession(String zooKeepers, int timeout, String auth) { + if (sessions == null) + throw new ZooSessionShutdownException(); + String sessionKey = sessionKey(zooKeepers, timeout, auth); // a read-only session can use a session with authorizations, so cache a copy for it w/out auths @@ -137,4 +146,16 @@ class ZooSession { } return zsi.zooKeeper; } + + public static synchronized void shutdown() { + for (ZooSessionInfo zsi : sessions.values()) { + try { + zsi.zooKeeper.close(); + } catch (Exception e) { + log.debug("Error closing zookeeper during shutdown", e); + } + } + + sessions = null; + } }