Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT Conflicts: core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/400b991f Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/400b991f Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/400b991f Branch: refs/heads/master Commit: 400b991fb80114f6672e18496f1b2359b2e22c3d Parents: a91ee4d 8f9fe41 Author: Keith Turner <ktur...@apache.org> Authored: Mon Jan 6 20:48:26 2014 -0500 Committer: Keith Turner <ktur...@apache.org> Committed: Mon Jan 6 20:48:26 2014 -0500 ---------------------------------------------------------------------- .../core/client/impl/ThriftTransportPool.java | 100 +++++++++--- .../org/apache/accumulo/core/util/CleanUp.java | 63 ++++++++ .../accumulo/fate/zookeeper/ZooSession.java | 23 ++- .../accumulo/test/functional/CleanUpTest.java | 153 +++++++++++++++++++ test/system/auto/simple/cleanup.py | 30 ++++ 5 files changed, 350 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/400b991f/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java index ceeab21,0000000..f123289 mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java @@@ -1,607 -1,0 +1,671 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.impl; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.SecurityPermission; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; ++import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.util.AddressUtil; +import org.apache.accumulo.core.util.Daemon; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.TTimeoutTransport; +import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.log4j.Logger; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +public class ThriftTransportPool { + private static SecurityPermission TRANSPORT_POOL_PERMISSION = new SecurityPermission("transportPoolPermission"); + + private static final Random random = new Random(); + private long killTime = 1000 * 3; + + private Map<ThriftTransportKey,List<CachedConnection>> cache = new HashMap<ThriftTransportKey,List<CachedConnection>>(); + private Map<ThriftTransportKey,Long> errorCount = new HashMap<ThriftTransportKey,Long>(); + private Map<ThriftTransportKey,Long> errorTime = new 
HashMap<ThriftTransportKey,Long>(); + private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<ThriftTransportKey>(); ++ ++ private CountDownLatch closerExitLatch; + + private static final Logger log = Logger.getLogger(ThriftTransportPool.class); + + private static final Long ERROR_THRESHOLD = 20l; + private static final int STUCK_THRESHOLD = 2 * 60 * 1000; + + private static class CachedConnection { + + public CachedConnection(CachedTTransport t) { + this.transport = t; + } + + void setReserved(boolean reserved) { + this.transport.setReserved(reserved); + } + + boolean isReserved() { + return this.transport.reserved; + } + + CachedTTransport transport; + + long lastReturnTime; + } + ++ public static class TransportPoolShutdownException extends RuntimeException { ++ private static final long serialVersionUID = 1L; ++ } ++ + private static class Closer implements Runnable { + final ThriftTransportPool pool; ++ private CountDownLatch closerExitLatch; + - public Closer(ThriftTransportPool pool) { ++ public Closer(ThriftTransportPool pool, CountDownLatch closerExitLatch) { + this.pool = pool; ++ this.closerExitLatch = closerExitLatch; + } + - public void run() { ++ private void closeConnections() { + while (true) { + + ArrayList<CachedConnection> connectionsToClose = new ArrayList<CachedConnection>(); + + synchronized (pool) { - for (List<CachedConnection> ccl : pool.cache.values()) { ++ for (List<CachedConnection> ccl : pool.getCache().values()) { + Iterator<CachedConnection> iter = ccl.iterator(); + while (iter.hasNext()) { + CachedConnection cachedConnection = iter.next(); + + if (!cachedConnection.isReserved() && System.currentTimeMillis() - cachedConnection.lastReturnTime > pool.killTime) { + connectionsToClose.add(cachedConnection); + iter.remove(); + } + } + } + - for (List<CachedConnection> ccl : pool.cache.values()) { ++ for (List<CachedConnection> ccl : pool.getCache().values()) { + for (CachedConnection cachedConnection : ccl) { + 
cachedConnection.transport.checkForStuckIO(STUCK_THRESHOLD); + } + } + + Iterator<Entry<ThriftTransportKey,Long>> iter = pool.errorTime.entrySet().iterator(); + while (iter.hasNext()) { + Entry<ThriftTransportKey,Long> entry = iter.next(); + long delta = System.currentTimeMillis() - entry.getValue(); + if (delta >= STUCK_THRESHOLD) { + pool.errorCount.remove(entry.getKey()); + iter.remove(); + } + } + } + + // close connections outside of sync block + for (CachedConnection cachedConnection : connectionsToClose) { + cachedConnection.transport.close(); + } + + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } ++ ++ public void run() { ++ try { ++ closeConnections(); ++ } catch (TransportPoolShutdownException e) { ++ } finally { ++ closerExitLatch.countDown(); ++ } ++ } + } + + static class CachedTTransport extends TTransport { + + private ThriftTransportKey cacheKey; + private TTransport wrappedTransport; + private boolean sawError = false; + + private volatile String ioThreadName = null; + private volatile long ioStartTime = 0; + private volatile boolean reserved = false; + + private String stuckThreadName = null; + + int ioCount = 0; + int lastIoCount = -1; + + private void sawError(Exception e) { + sawError = true; + } + + final void setReserved(boolean reserved) { + this.reserved = reserved; + if (reserved) { + ioThreadName = Thread.currentThread().getName(); + ioCount = 0; + lastIoCount = -1; + } else { + if ((ioCount & 1) == 1) { + // connection unreserved, but it seems io may still be + // happening + log.warn("Connection returned to thrift connection pool that may still be in use " + ioThreadName + " " + Thread.currentThread().getName(), + new Exception()); + } + + ioCount = 0; + lastIoCount = -1; + ioThreadName = null; + } + checkForStuckIO(STUCK_THRESHOLD); + } + + final void checkForStuckIO(long threshold) { + /* + * checking for stuck io needs to be light weight. 
+ * + * Tried to call System.currentTimeMillis() and Thread.currentThread() before every io operation.... this dramatically slowed things down. So switched to + * incrementing a counter before and after each io operation. + */ + + if ((ioCount & 1) == 1) { + // when ioCount is odd, it means I/O is currently happening + if (ioCount == lastIoCount) { + // still doing same I/O operation as last time this + // functions was called + long delta = System.currentTimeMillis() - ioStartTime; + if (delta >= threshold && stuckThreadName == null) { + stuckThreadName = ioThreadName; + log.warn("Thread \"" + ioThreadName + "\" stuck on IO to " + cacheKey + " for at least " + delta + " ms"); + } + } else { + // remember this ioCount and the time we saw it, need to see + // if it changes + lastIoCount = ioCount; + ioStartTime = System.currentTimeMillis(); + + if (stuckThreadName != null) { + // doing I/O, but ioCount changed so no longer stuck + log.info("Thread \"" + stuckThreadName + "\" no longer stuck on IO to " + cacheKey + " sawError = " + sawError); + stuckThreadName = null; + } + } + } else { + // I/O is not currently happening + if (stuckThreadName != null) { + // no longer stuck, and was stuck in the past + log.info("Thread \"" + stuckThreadName + "\" no longer stuck on IO to " + cacheKey + " sawError = " + sawError); + stuckThreadName = null; + } + } + } + + public CachedTTransport(TTransport transport, ThriftTransportKey cacheKey2) { + this.wrappedTransport = transport; + this.cacheKey = cacheKey2; + } + + public boolean isOpen() { + return wrappedTransport.isOpen(); + } + + public void open() throws TTransportException { + try { + ioCount++; + wrappedTransport.open(); + } catch (TTransportException tte) { + sawError(tte); + throw tte; + } finally { + ioCount++; + } + } + + public int read(byte[] arg0, int arg1, int arg2) throws TTransportException { + try { + ioCount++; + return wrappedTransport.read(arg0, arg1, arg2); + } catch (TTransportException tte) { + 
sawError(tte); + throw tte; + } finally { + ioCount++; + } + } + + public int readAll(byte[] arg0, int arg1, int arg2) throws TTransportException { + try { + ioCount++; + return wrappedTransport.readAll(arg0, arg1, arg2); + } catch (TTransportException tte) { + sawError(tte); + throw tte; + } finally { + ioCount++; + } + } + + public void write(byte[] arg0, int arg1, int arg2) throws TTransportException { + try { + ioCount++; + wrappedTransport.write(arg0, arg1, arg2); + } catch (TTransportException tte) { + sawError(tte); + throw tte; + } finally { + ioCount++; + } + } + + public void write(byte[] arg0) throws TTransportException { + try { + ioCount++; + wrappedTransport.write(arg0); + } catch (TTransportException tte) { + sawError(tte); + throw tte; + } finally { + ioCount++; + } + } + + public void close() { + try { + ioCount++; + wrappedTransport.close(); + } finally { + ioCount++; + } + + } + + public void flush() throws TTransportException { + try { + ioCount++; + wrappedTransport.flush(); + } catch (TTransportException tte) { + sawError(tte); + throw tte; + } finally { + ioCount++; + } + } + + public boolean peek() { + try { + ioCount++; + return wrappedTransport.peek(); + } finally { + ioCount++; + } + } + + public byte[] getBuffer() { + try { + ioCount++; + return wrappedTransport.getBuffer(); + } finally { + ioCount++; + } + } + + public int getBufferPosition() { + try { + ioCount++; + return wrappedTransport.getBufferPosition(); + } finally { + ioCount++; + } + } + + public int getBytesRemainingInBuffer() { + try { + ioCount++; + return wrappedTransport.getBytesRemainingInBuffer(); + } finally { + ioCount++; + } + } + + public void consumeBuffer(int len) { + try { + ioCount++; + wrappedTransport.consumeBuffer(len); + } finally { + ioCount++; + } + } + + public ThriftTransportKey getCacheKey() { + return cacheKey; + } + + } + + private ThriftTransportPool() {} + + public TTransport getTransport(String location, int port) throws TTransportException { + 
return getTransport(location, port, 0); + } + + public TTransport getTransportWithDefaultTimeout(InetSocketAddress addr, AccumuloConfiguration conf) throws TTransportException { + return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)); + } + + public TTransport getTransport(InetSocketAddress addr, long timeout) throws TTransportException { + return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), timeout); + } + + public TTransport getTransportWithDefaultTimeout(String location, int port, AccumuloConfiguration conf) throws TTransportException { + return getTransport(location, port, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)); + } + + Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, boolean preferCachedConnection) throws TTransportException { + + servers = new ArrayList<ThriftTransportKey>(servers); + + if (preferCachedConnection) { + HashSet<ThriftTransportKey> serversSet = new HashSet<ThriftTransportKey>(servers); + + synchronized (this) { + + // randomly pick a server from the connection cache - serversSet.retainAll(cache.keySet()); ++ serversSet.retainAll(getCache().keySet()); + + if (serversSet.size() > 0) { + ArrayList<ThriftTransportKey> cachedServers = new ArrayList<ThriftTransportKey>(serversSet); + Collections.shuffle(cachedServers, random); + + for (ThriftTransportKey ttk : cachedServers) { - for (CachedConnection cachedConnection : cache.get(ttk)) { ++ for (CachedConnection cachedConnection : getCache().get(ttk)) { + if (!cachedConnection.isReserved()) { + cachedConnection.setReserved(true); + if (log.isTraceEnabled()) + log.trace("Using existing connection to " + ttk.getLocation() + ":" + ttk.getPort()); + return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), cachedConnection.transport); + } + } + } + } + } + } + + int retryCount = 0; + while (servers.size() > 0 && retryCount < 10) { + int index = 
random.nextInt(servers.size()); + ThriftTransportKey ttk = servers.get(index); + + if (!preferCachedConnection) { + synchronized (this) { - List<CachedConnection> cachedConnList = cache.get(ttk); ++ List<CachedConnection> cachedConnList = getCache().get(ttk); + if (cachedConnList != null) { + for (CachedConnection cachedConnection : cachedConnList) { + if (!cachedConnection.isReserved()) { + cachedConnection.setReserved(true); + if (log.isTraceEnabled()) + log.trace("Using existing connection to " + ttk.getLocation() + ":" + ttk.getPort() + " timeout " + ttk.getTimeout()); + return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), cachedConnection.transport); + } + } + } + } + } + + try { + return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), createNewTransport(ttk)); + } catch (TTransportException tte) { + log.debug("Failed to connect to " + servers.get(index), tte); + servers.remove(index); + retryCount++; + } + } + + throw new TTransportException("Failed to connect to a server"); + } + + public TTransport getTransport(String location, int port, long milliseconds) throws TTransportException { + return getTransport(new ThriftTransportKey(location, port, milliseconds)); + } + + private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException { + synchronized (this) { + // atomically reserve location if it exist in cache - List<CachedConnection> ccl = cache.get(cacheKey); ++ List<CachedConnection> ccl = getCache().get(cacheKey); + + if (ccl == null) { + ccl = new LinkedList<CachedConnection>(); - cache.put(cacheKey, ccl); ++ getCache().put(cacheKey, ccl); + } + + for (CachedConnection cachedConnection : ccl) { + if (!cachedConnection.isReserved()) { + cachedConnection.setReserved(true); + if (log.isTraceEnabled()) + log.trace("Using existing connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort()); + return cachedConnection.transport; + } + } + } + + return createNewTransport(cacheKey); + } 
+ + private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException { + TTransport transport; + if (cacheKey.getTimeout() == 0) { + transport = AddressUtil.createTSocket(cacheKey.getLocation(), cacheKey.getPort()); + } else { + try { + transport = TTimeoutTransport.create(AddressUtil.parseAddress(cacheKey.getLocation(), cacheKey.getPort()), cacheKey.getTimeout()); + } catch (IOException ex) { + throw new TTransportException(ex); + } + } + transport = ThriftUtil.transportFactory().getTransport(transport); + transport.open(); + + if (log.isTraceEnabled()) + log.trace("Creating new connection to connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort()); + + CachedTTransport tsc = new CachedTTransport(transport, cacheKey); + + CachedConnection cc = new CachedConnection(tsc); + cc.setReserved(true); + - synchronized (this) { - List<CachedConnection> ccl = cache.get(cacheKey); ++ try { ++ synchronized (this) { ++ List<CachedConnection> ccl = getCache().get(cacheKey); ++ ++ if (ccl == null) { ++ ccl = new LinkedList<CachedConnection>(); ++ getCache().put(cacheKey, ccl); ++ } + - if (ccl == null) { - ccl = new LinkedList<CachedConnection>(); - cache.put(cacheKey, ccl); ++ ccl.add(cc); + } - - ccl.add(cc); ++ } catch (TransportPoolShutdownException e) { ++ cc.transport.close(); ++ throw e; + } + return cc.transport; + } + + public void returnTransport(TTransport tsc) { + if (tsc == null) { + return; + } + + boolean existInCache = false; + CachedTTransport ctsc = (CachedTTransport) tsc; + + ArrayList<CachedConnection> closeList = new ArrayList<ThriftTransportPool.CachedConnection>(); + + synchronized (this) { - List<CachedConnection> ccl = cache.get(ctsc.getCacheKey()); ++ List<CachedConnection> ccl = getCache().get(ctsc.getCacheKey()); + for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) { + CachedConnection cachedConnection = iterator.next(); + if (cachedConnection.transport == tsc) { + if 
(ctsc.sawError) { + closeList.add(cachedConnection); + iterator.remove(); + + if (log.isTraceEnabled()) + log.trace("Returned connection had error " + ctsc.getCacheKey()); + + Long ecount = errorCount.get(ctsc.getCacheKey()); + if (ecount == null) + ecount = 0l; + ecount++; + errorCount.put(ctsc.getCacheKey(), ecount); + + Long etime = errorTime.get(ctsc.getCacheKey()); + if (etime == null) { + errorTime.put(ctsc.getCacheKey(), System.currentTimeMillis()); + } + + if (ecount >= ERROR_THRESHOLD && !serversWarnedAbout.contains(ctsc.getCacheKey())) { + log.warn("Server " + ctsc.getCacheKey() + " had " + ecount + " failures in a short time period, will not complain anymore "); + serversWarnedAbout.add(ctsc.getCacheKey()); + } + + cachedConnection.setReserved(false); + + } else { + + if (log.isTraceEnabled()) + log.trace("Returned connection " + ctsc.getCacheKey() + " ioCount : " + cachedConnection.transport.ioCount); + + cachedConnection.lastReturnTime = System.currentTimeMillis(); + cachedConnection.setReserved(false); + } + existInCache = true; + break; + } + } + + // remove all unreserved cached connection when a sever has an error, not just the connection that was returned + if (ctsc.sawError) { + for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) { + CachedConnection cachedConnection = iterator.next(); + if (!cachedConnection.isReserved()) { + closeList.add(cachedConnection); + iterator.remove(); + } + } + } + } + + // close outside of sync block + for (CachedConnection cachedConnection : closeList) { + try { + cachedConnection.transport.close(); + } catch (Exception e) { + log.debug("Failed to close connection w/ errors", e); + } + } + + if (!existInCache) { + log.warn("Returned tablet server connection to cache that did not come from cache"); + // close outside of sync block + tsc.close(); + } + } + + /** + * Set the time after which idle connections should be closed + * + * @param time + */ + public synchronized void 
setIdleTime(long time) { + this.killTime = time; + log.debug("Set thrift transport pool idle time to " + time); + } + + private static ThriftTransportPool instance = new ThriftTransportPool(); + private static final AtomicBoolean daemonStarted = new AtomicBoolean(false); + + public static ThriftTransportPool getInstance() { + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + sm.checkPermission(TRANSPORT_POOL_PERMISSION); + } + + if (daemonStarted.compareAndSet(false, true)) { - new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start(); ++ CountDownLatch closerExitLatch = new CountDownLatch(1); ++ new Daemon(new Closer(instance, closerExitLatch), "Thrift Connection Pool Checker").start(); ++ instance.setCloserExitLatch(closerExitLatch); + } + return instance; + } ++ ++ private synchronized void setCloserExitLatch(CountDownLatch closerExitLatch) { ++ this.closerExitLatch = closerExitLatch; ++ } ++ ++ public void shutdown() { ++ synchronized (this) { ++ if (cache == null) ++ return; ++ ++ // close any connections in the pool... 
even ones that are in use ++ for (List<CachedConnection> ccl : getCache().values()) { ++ Iterator<CachedConnection> iter = ccl.iterator(); ++ while (iter.hasNext()) { ++ CachedConnection cc = iter.next(); ++ try { ++ cc.transport.close(); ++ } catch (Exception e) { ++ log.debug("Error closing transport during shutdown", e); ++ } ++ } ++ } ++ ++ // this will render the pool unusable and cause the background thread to exit ++ this.cache = null; ++ } ++ ++ try { ++ closerExitLatch.await(); ++ } catch (InterruptedException e) { ++ throw new RuntimeException(e); ++ } ++ } ++ ++ private Map<ThriftTransportKey,List<CachedConnection>> getCache() { ++ if (cache == null) ++ throw new TransportPoolShutdownException(); ++ return cache; ++ } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/400b991f/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/util/CleanUp.java index 0000000,0000000..be5a41a new file mode 100644 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java @@@ -1,0 -1,0 +1,63 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++package org.apache.accumulo.core.util; ++ ++import java.util.Set; ++ ++import org.apache.accumulo.core.client.impl.ThriftTransportPool; ++import org.apache.accumulo.fate.zookeeper.ZooSession; ++import org.apache.log4j.Logger; ++ ++/** ++ * ++ */ ++public class CleanUp { ++ ++ private static final Logger log = Logger.getLogger(CleanUp.class); ++ ++ /** ++ * kills all threads created by internal Accumulo singleton resources. After this method is called, no accumulo client will work in the current classloader. ++ */ ++ public static void shutdownNow() { ++ ThriftTransportPool.getInstance().shutdown(); ++ ZooSession.shutdown(); ++ waitForZooKeeperClientThreads(); ++ } ++ ++ /** ++ * As documented in https://issues.apache.org/jira/browse/ZOOKEEPER-1816, ZooKeeper.close() ++ * is a non-blocking call. This method will wait on the ZooKeeper internal threads to exit. ++ */ ++ private static void waitForZooKeeperClientThreads() { ++ Set<Thread> threadSet = Thread.getAllStackTraces().keySet(); ++ for (Thread thread : threadSet) { ++ // find ZooKeeper threads that were created in the same ClassLoader as the current thread. 
++ if (thread.getClass().getName().startsWith("org.apache.zookeeper.ClientCnxn") && ++ thread.getContextClassLoader().equals(Thread.currentThread().getContextClassLoader())) { ++ ++ // wait for the thread the die ++ while (thread.isAlive()) { ++ try { ++ Thread.sleep(100); ++ } catch (InterruptedException e) { ++ log.error(e.getMessage(), e); ++ } ++ } ++ } ++ } ++ } ++} http://git-wip-us.apache.org/repos/asf/accumulo/blob/400b991f/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java ---------------------------------------------------------------------- diff --cc fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java index 7258ff0,0000000..040b01d mode 100644,000000..100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java @@@ -1,139 -1,0 +1,160 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.fate.zookeeper; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.fate.util.UtilWaitThread; +import org.apache.log4j.Logger; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooKeeper.States; + - class ZooSession { ++public class ZooSession { + ++ public static class ZooSessionShutdownException extends RuntimeException { ++ ++ private static final long serialVersionUID = 1L; ++ ++ } ++ + private static final Logger log = Logger.getLogger(ZooSession.class); + + private static class ZooSessionInfo { + public ZooSessionInfo(ZooKeeper zooKeeper, ZooWatcher watcher) { + this.zooKeeper = zooKeeper; + } + + ZooKeeper zooKeeper; + } + + private static Map<String,ZooSessionInfo> sessions = new HashMap<String,ZooSessionInfo>(); + + private static String sessionKey(String keepers, int timeout, String scheme, byte[] auth) { + return keepers + ":" + timeout + ":" + (scheme == null ? "" : scheme) + ":" + (auth == null ? 
"" : new String(auth)); + } + + private static class ZooWatcher implements Watcher { + + public void process(WatchedEvent event) { + if (event.getState() == KeeperState.Expired) { + log.debug("Session expired, state of current session : " + event.getState()); + } + } + + } + + public static ZooKeeper connect(String host, int timeout, String scheme, byte[] auth, Watcher watcher) { + final int TIME_BETWEEN_CONNECT_CHECKS_MS = 100; + final int TOTAL_CONNECT_TIME_WAIT_MS = 10 * 1000; + boolean tryAgain = true; + int sleepTime = 100; + ZooKeeper zooKeeper = null; + + long startTime = System.currentTimeMillis(); + + while (tryAgain) { + try { + zooKeeper = new ZooKeeper(host, timeout, watcher); + // it may take some time to get connected to zookeeper if some of the servers are down + for (int i = 0; i < TOTAL_CONNECT_TIME_WAIT_MS / TIME_BETWEEN_CONNECT_CHECKS_MS && tryAgain; i++) { + if (zooKeeper.getState().equals(States.CONNECTED)) { + if (auth != null) + zooKeeper.addAuthInfo(scheme, auth); + tryAgain = false; + } else + UtilWaitThread.sleep(TIME_BETWEEN_CONNECT_CHECKS_MS); + } + + if (System.currentTimeMillis() - startTime > 2 * timeout) + throw new RuntimeException("Failed to connect to zookeeper (" + host + ") within 2x zookeeper timeout period " + timeout); + + } catch (UnknownHostException uhe) { + // do not expect to recover from this + log.warn(uhe.getClass().getName() + " : " + uhe.getMessage()); + throw new RuntimeException(uhe); + } catch (IOException e) { + log.warn("Connection to zooKeeper failed, will try again in " + String.format("%.2f secs", sleepTime / 1000.0), e); + } finally { + if (tryAgain && zooKeeper != null) + try { + zooKeeper.close(); + zooKeeper = null; + } catch (InterruptedException e) { + log.warn("interrupted", e); + } + } + + if (tryAgain) { + UtilWaitThread.sleep(sleepTime); + if (sleepTime < 10000) + sleepTime = (int) (sleepTime + sleepTime * Math.random()); + } + } + + return zooKeeper; + } + + public static synchronized ZooKeeper 
getSession(String zooKeepers, int timeout) { + return getSession(zooKeepers, timeout, null, null); + } + + public static synchronized ZooKeeper getSession(String zooKeepers, int timeout, String scheme, byte[] auth) { + ++ if (sessions == null) ++ throw new ZooSessionShutdownException(); ++ + String sessionKey = sessionKey(zooKeepers, timeout, scheme, auth); + + // a read-only session can use a session with authorizations, so cache a copy for it w/out auths + String readOnlySessionKey = sessionKey(zooKeepers, timeout, null, null); + ZooSessionInfo zsi = sessions.get(sessionKey); + if (zsi != null && zsi.zooKeeper.getState() == States.CLOSED) { + if (auth != null && sessions.get(readOnlySessionKey) == zsi) + sessions.remove(readOnlySessionKey); + zsi = null; + sessions.remove(sessionKey); + } + + if (zsi == null) { + ZooWatcher watcher = new ZooWatcher(); + log.debug("Connecting to " + zooKeepers + " with timeout " + timeout + " with auth"); + zsi = new ZooSessionInfo(connect(zooKeepers, timeout, scheme, auth, watcher), watcher); + sessions.put(sessionKey, zsi); + if (auth != null && !sessions.containsKey(readOnlySessionKey)) + sessions.put(readOnlySessionKey, zsi); + } + return zsi.zooKeeper; + } ++ ++ public static synchronized void shutdown() { ++ for (ZooSessionInfo zsi : sessions.values()) { ++ try { ++ zsi.zooKeeper.close(); ++ } catch (Exception e) { ++ log.debug("Error closing zookeeper during shutdown", e); ++ } ++ } ++ ++ sessions = null; ++ } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/400b991f/test/src/main/java/org/apache/accumulo/test/functional/CleanUpTest.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/CleanUpTest.java index 0000000,0000000..5a0ca26 new file mode 100644 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/CleanUpTest.java @@@ -1,0 -1,0 +1,153 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under 
one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.test.functional;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.CleanUp;

/**
 * Functional test for {@link CleanUp#shutdownNow()}.
 * <p>
 * The test writes and reads one entry, checks that at least two of the background threads recognised by {@link #countThreads()} exist, calls
 * {@code CleanUp.shutdownNow()}, and then requires that the previously created batch writer and scanner throw when used again and that no such thread
 * remains.
 */
public class CleanUpTest extends FunctionalTest {

  /**
   * @return an empty map; this test needs no extra configuration
   */
  @Override
  public Map<String,String> getInitialConfig() {
    return Collections.emptyMap();
  }

  /**
   * @return an empty list; the table used by the test is created inside {@link #run()}
   */
  @Override
  public List<TableSetup> getTablesToCreate() {
    return Collections.emptyList();
  }

  /**
   * Runs the scenario described on the class.
   *
   * @throws Exception
   *           if the data read back is wrong, the expected threads are missing before shutdown, any client object still works after shutdown, or counted
   *           threads survive the shutdown
   */
  @Override
  public void run() throws Exception {

    getConnector().tableOperations().create("test");

    BatchWriter bw = getConnector().createBatchWriter("test", 1000000, 60000, 1);

    Mutation m1 = new Mutation("r1");
    m1.put("cf1", "cq1", 1, "5");

    bw.addMutation(m1);

    bw.flush();

    Scanner scanner = getConnector().createScanner("test", new Authorizations());

    int count = 0;
    for (Entry<Key,Value> entry : scanner) {
      count++;
      if (!entry.getValue().toString().equals("5")) {
        throw new Exception("Unexpected value " + entry.getValue());
      }
    }

    if (count != 1) {
      throw new Exception("Unexpected count " + count);
    }

    if (countThreads() < 2) {
      printThreadNames();
      throw new Exception("Not seeing expected threads");
    }

    CleanUp.shutdownNow();

    Mutation m2 = new Mutation("r2");
    m2.put("cf1", "cq1", 1, "6");

    // The "did not fail" exceptions are thrown outside the try blocks on purpose: thrown inside, they would be
    // caught by the very catch clause that absorbs the expected failure and the check could never trip.
    boolean writeFailed = false;
    try {
      bw.addMutation(m2);
      bw.flush();
    } catch (Exception expected) {
      writeFailed = true;
    }
    if (!writeFailed) {
      throw new Exception("batch writer did not fail");
    }

    boolean closeFailed = false;
    try {
      // expect this to fail also, want to clean up batch writer threads
      bw.close();
    } catch (Exception expected) {
      closeFailed = true;
    }
    if (!closeFailed) {
      throw new Exception("batch writer close not fail");
    }

    boolean scanFailed = false;
    try {
      // drain the scanner; any entry that comes back is irrelevant, only the failure matters
      Iterator<Entry<Key,Value>> iter = scanner.iterator();
      while (iter.hasNext()) {
        iter.next();
      }
    } catch (Exception expected) {
      scanFailed = true;
    }
    if (!scanFailed) {
      throw new Exception("scanner did not fail");
    }

    if (countThreads() > 0) {
      printThreadNames();
      throw new Exception("Threads did not go away");
    }
  }

  /**
   * Prints the name of every live thread to standard output, as a diagnostic aid when a thread-count check fails.
   */
  private void printThreadNames() {
    Set<Thread> threads = Thread.getAllStackTraces().keySet();
    for (Thread thread : threads) {
      System.out.println("thread name:" + thread.getName());
    }
  }

  /**
   * Counts threads that should be cleaned up.
   * <p>
   * A thread adds one to the total if its lower-cased name contains "sendthread" or "eventthread", and one more if it contains both "thrift" and "pool".
   * NOTE(review): these look like the ZooKeeper client threads and the Thrift transport pool thread - confirm against the thread names used there.
   *
   * @return the number of matches among all live threads
   */
  private int countThreads() {
    int count = 0;
    Set<Thread> threads = Thread.getAllStackTraces().keySet();
    for (Thread thread : threads) {
      String name = thread.getName().toLowerCase();

      if (name.contains("sendthread") || name.contains("eventthread"))
        count++;

      if (name.contains("thrift") && name.contains("pool"))
        count++;
    }

    return count;
  }

  /**
   * Nothing to clean up.
   */
  @Override
  public void cleanup() throws Exception {}

}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/400b991f/test/system/auto/simple/cleanup.py ---------------------------------------------------------------------- diff --cc test/system/auto/simple/cleanup.py index 0000000,1ed8aff..03f7721 mode 000000,100755..100755 --- a/test/system/auto/simple/cleanup.py +++ b/test/system/auto/simple/cleanup.py @@@ -1,0 -1,30 +1,30 @@@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + + from JavaTest import JavaTest + + import unittest + + class CleanUpTest(JavaTest): + "Test clean up util" + + order = 21 - testClass="org.apache.accumulo.server.test.functional.CleanUpTest" ++ testClass="org.apache.accumulo.test.functional.CleanUpTest" + + + def suite(): + result = unittest.TestSuite() + result.addTest(CleanUpTest()) + return result