Updated Branches: refs/heads/1.4.5-SNAPSHOT a40a6d423 -> 79d686faa
ACCUMULO-1858 Backport ZooKeeper clean up to 1.4 and 1.5. Fix cherry picks two commits: ACCUMULO-1379 - Adding close() to Instance to assist in freeing up resources (cherry picked from commit 7da1164d87227960d3e0cfc841f753067e2c0304) Reason: bugfix Author: John Vines <jvi...@gmail.com> Differs from original by path changes and leaving out ConditionalWriterTest, which is only in 1.6.0+ ---- ACCUMULO-1379 Fix edge cases if error in closing ZooKeeperInstance (cherry picked from commit 3f6c66ede52cb1fb5a122d7bad06d7978ff0a671) Reason: bugfix Author: Christopher Tubbs <ctubb...@apache.org> Signed-off-by: Bill Slacum <ujustgotbi...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/79d686fa Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/79d686fa Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/79d686fa Branch: refs/heads/1.4.5-SNAPSHOT Commit: 79d686faa1e477b9cbd80c6f833ece402050b490 Parents: a40a6d4 Author: Sean Busbey <bus...@clouderagovt.com> Authored: Wed Nov 13 09:19:36 2013 -0600 Committer: Bill Slacum <ujustgotbi...@apache.org> Committed: Mon Nov 18 13:16:18 2013 -0500 ---------------------------------------------------------------------- .../apache/accumulo/core/client/Instance.java | 7 ++ .../accumulo/core/client/ZooKeeperInstance.java | 109 +++++++++++++------ .../core/client/impl/ThriftTransportPool.java | 16 ++- .../accumulo/core/client/mock/MockInstance.java | 5 + .../apache/accumulo/core/util/ThriftUtil.java | 4 + .../accumulo/core/zookeeper/ZooCache.java | 7 ++ .../accumulo/core/zookeeper/ZooReader.java | 4 + .../core/client/impl/TabletLocatorImplTest.java | 5 + .../accumulo/server/client/HdfsZooInstance.java | 9 ++ 9 files changed, 128 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java b/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java index b3d09ba..1820e7a 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java +++ b/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java @@ -126,6 +126,13 @@ public interface Instance { * when a user's credentials are invalid */ public abstract Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException; + + /** + * Closes up the instance to free up all associated resources. You should try to reuse an Instance as much as you can because there is some location caching + * stored which will enhance performance. + * @throws AccumuloException + */ + public abstract void close() throws AccumuloException; /** * Returns the AccumuloConfiguration to use when interacting with this instance. http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java b/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java index f657c07..1dae711 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java +++ b/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.impl.ConnectorImpl; @@ -33,6 +34,7 @@ import org.apache.accumulo.core.util.ByteBufferUtil; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.OpTimer; import org.apache.accumulo.core.util.TextUtil; +import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.zookeeper.ZooCache; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.hadoop.fs.FileStatus; @@ -57,18 +59,20 @@ import org.apache.log4j.Logger; */ public class ZooKeeperInstance implements Instance { - + private static final Logger log = Logger.getLogger(ZooKeeperInstance.class); - + private String instanceId = null; private String instanceName = null; - + private ZooCache zooCache; - + private String zooKeepers; - + private int zooKeepersSessionTimeOut; - + + private volatile boolean closed = false; + /** * * @param instanceName @@ -76,11 +80,11 @@ public class ZooKeeperInstance implements Instance { * @param zooKeepers * A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port. */ - + public ZooKeeperInstance(String instanceName, String zooKeepers) { this(instanceName, zooKeepers, (int) AccumuloConfiguration.getDefaultConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)); } - + /** * * @param instanceName @@ -90,7 +94,7 @@ public class ZooKeeperInstance implements Instance { * @param sessionTimeout * zoo keeper session time out in milliseconds. */ - + public ZooKeeperInstance(String instanceName, String zooKeepers, int sessionTimeout) { ArgumentChecker.notNull(instanceName, zooKeepers); this.instanceName = instanceName; @@ -98,8 +102,9 @@ public class ZooKeeperInstance implements Instance { this.zooKeepersSessionTimeOut = sessionTimeout; zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout); getInstanceID(); + clientInstances.incrementAndGet(); } - + /** * * @param instanceId @@ -107,11 +112,11 @@ public class ZooKeeperInstance implements Instance { * @param zooKeepers * A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port. */ - + public ZooKeeperInstance(UUID instanceId, String zooKeepers) { this(instanceId, zooKeepers, (int) AccumuloConfiguration.getDefaultConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)); } - + /** * * @param instanceId @@ -121,17 +126,20 @@ public class ZooKeeperInstance implements Instance { * @param sessionTimeout * zoo keeper session time out in milliseconds. */ - + public ZooKeeperInstance(UUID instanceId, String zooKeepers, int sessionTimeout) { ArgumentChecker.notNull(instanceId, zooKeepers); this.instanceId = instanceId.toString(); this.zooKeepers = zooKeepers; this.zooKeepersSessionTimeOut = sessionTimeout; zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout); + clientInstances.incrementAndGet(); } - + @Override public String getInstanceID() { + if (closed) + throw new RuntimeException("ZooKeeperInstance has been closed."); if (instanceId == null) { // want the instance id to be stable for the life of this instance object, // so only get it once @@ -143,95 +151,103 @@ public class ZooKeeperInstance implements Instance { } instanceId = new String(iidb); } - + if (zooCache.get(Constants.ZROOT + "/" + instanceId) == null) { if (instanceName == null) throw new RuntimeException("Instance id " + instanceId + " does not exist in zookeeper"); throw new RuntimeException("Instance id " + instanceId + " pointed to by the name " + instanceName + " does not exist in zookeeper"); } - + return instanceId; } - + @Override public List<String> getMasterLocations() { + if (closed) + throw new RuntimeException("ZooKeeperInstance has been closed."); String masterLocPath = ZooUtil.getRoot(this) + Constants.ZMASTER_LOCK; - + OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up master location in zoocache."); byte[] loc = ZooUtil.getLockData(zooCache, masterLocPath); opTimer.stop("Found master at " + (loc == null ? null : new String(loc)) + " in %DURATION%"); - + if (loc == null) { return Collections.emptyList(); } - + return Collections.singletonList(new String(loc)); } - + @Override public String getRootTabletLocation() { + if (closed) + throw new RuntimeException("ZooKeeperInstance has been closed."); String zRootLocPath = ZooUtil.getRoot(this) + Constants.ZROOT_TABLET_LOCATION; - + OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up root tablet location in zookeeper."); byte[] loc = zooCache.get(zRootLocPath); opTimer.stop("Found root tablet at " + (loc == null ? null : new String(loc)) + " in %DURATION%"); - + if (loc == null) { return null; } - + return new String(loc).split("\\|")[0]; } - + @Override public String getInstanceName() { + if (closed) + throw new RuntimeException("ZooKeeperInstance has been closed."); if (instanceName == null) instanceName = lookupInstanceName(zooCache, UUID.fromString(getInstanceID())); - + return instanceName; } - + @Override public String getZooKeepers() { return zooKeepers; } - + @Override public int getZooKeepersSessionTimeOut() { return zooKeepersSessionTimeOut; } - + @Override public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException { return getConnector(user, TextUtil.getBytes(new Text(pass.toString()))); } - + @Override public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException { return getConnector(user, ByteBufferUtil.toBytes(pass)); } - + // Suppress deprecation, ConnectorImpl is deprecated to warn clients against using. @SuppressWarnings("deprecation") @Override public Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException { + if (closed) + throw new RuntimeException("ZooKeeperInstance has been closed."); return new ConnectorImpl(this, user, pass); } - + private AccumuloConfiguration conf = null; - + @Override public AccumuloConfiguration getConfiguration() { if (conf == null) conf = AccumuloConfiguration.getDefaultConfiguration(); return conf; } - + @Override public void setConfiguration(AccumuloConfiguration conf) { this.conf = conf; } - + /** * Given a zooCache and instanceId, look up the instance name. * @@ -277,4 +293,27 @@ public class ZooKeeperInstance implements Instance { public Connector getConnector(AuthInfo auth) throws AccumuloException, AccumuloSecurityException { return getConnector(auth.user, auth.password); } + + static private final AtomicInteger clientInstances = new AtomicInteger(0); + + @Override + public synchronized void close() throws AccumuloException { + if (!closed && clientInstances.decrementAndGet() == 0) { + try { + zooCache.close(); + ThriftUtil.close(); + } catch (InterruptedException e) { + clientInstances.incrementAndGet(); + throw new AccumuloException("Issues closing ZooKeeper."); + } + closed = true; + } + } + + @Override + public void finalize() { + // This method intentionally left blank. Users need to explicitly close Instances if they want things cleaned up nicely. + if (!closed) + log.warn("ZooKeeperInstance being cleaned up without being closed. Please remember to call close() before dereferencing to clean up threads."); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java index ef3724b..f969f28 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java +++ b/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java @@ -80,13 +80,15 @@ public class ThriftTransportPool { private static class Closer implements Runnable { ThriftTransportPool pool; + final AtomicBoolean stop; - public Closer(ThriftTransportPool pool) { + public Closer(ThriftTransportPool pool, AtomicBoolean stop) { this.pool = pool; + this.stop = stop; } public void run() { - while (true) { + while (!stop.get()) { ArrayList<CachedConnection> connectionsToClose = new ArrayList<CachedConnection>(); @@ -592,6 +594,7 @@ public class ThriftTransportPool { private static ThriftTransportPool instance = new ThriftTransportPool(); private static final AtomicBoolean daemonStarted = new AtomicBoolean(false); + private static AtomicBoolean stopDaemon; public static ThriftTransportPool getInstance() { SecurityManager sm = System.getSecurityManager(); @@ -600,8 +603,15 @@ public class ThriftTransportPool { } if (daemonStarted.compareAndSet(false, true)) { - new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start(); + stopDaemon = new AtomicBoolean(false); + new Daemon(new Closer(instance, stopDaemon), "Thrift Connection Pool Checker").start(); } return instance; } + + public static void close() { + if (daemonStarted.compareAndSet(true, false)) { + stopDaemon.set(true); + } + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java b/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java index 2ff7b82..d8a15e0 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java +++ b/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java @@ -140,4 +140,9 @@ public class MockInstance implements Instance { public Connector getConnector(AuthInfo auth) throws AccumuloException, AccumuloSecurityException { return getConnector(auth.user, auth.password); } + + @Override + public void close() throws AccumuloException { + // NOOP + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java b/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java index 1b1cdd7..3684ecd 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java +++ b/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java @@ -165,4 +165,8 @@ public class ThriftUtil { public static TProtocolFactory protocolFactory() { return protocolFactory; } + + public static void close() { + ThriftTransportPool.close(); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java index f5bdd6b..0a36923 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java +++ b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java @@ -307,4 +307,11 @@ public class ZooCache { return zc; } + + public void close() throws InterruptedException { + cache.clear(); + statCache.clear(); + childrenCache.clear(); + zReader.close(); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java index 47663ac..1bcd22b 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java +++ b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java @@ -107,4 +107,8 @@ public class ZooReader implements IZooReader { public ZooReader(Instance instance) { this(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); } + + public void close() throws InterruptedException { + getZooKeeper().close(); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java ---------------------------------------------------------------------- diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java index 538cb6c..624a824 100644 --- a/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java +++ b/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java @@ -448,6 +448,11 @@ public class TabletLocatorImplTest extends TestCase { public Connector getConnector(AuthInfo auth) throws AccumuloException, AccumuloSecurityException { return getConnector(auth.user, auth.password); } + + @Override + public void close() throws AccumuloException { + // NOOP + } } static class TServers { http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java b/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java index e6cdb63..d68449d 100644 --- a/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java +++ b/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java @@ -177,6 +177,15 @@ public class HdfsZooInstance implements Instance { System.out.println("ZooKeepers: " + instance.getZooKeepers()); System.out.println("Masters: " + StringUtil.join(instance.getMasterLocations(), ", ")); } + + @Override + public void close() throws AccumuloException { + try { + zooCache.close(); + } catch (InterruptedException e) { + throw new AccumuloException("Issues closing ZooKeeper, try again"); + } + } @Override public Connector getConnector(AuthInfo auth) throws AccumuloException, AccumuloSecurityException {