Merge remote-tracking branch 'origin/1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Conflicts:
    core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9c092cad Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9c092cad Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9c092cad Branch: refs/heads/1.6.0-SNAPSHOT Commit: 9c092cadde75250fa84e049eeae33687636dbc99 Parents: ee3ccb8 400b991 Author: Keith Turner <ktur...@apache.org> Authored: Mon Jan 6 21:11:02 2014 -0500 Committer: Keith Turner <ktur...@apache.org> Committed: Mon Jan 6 21:11:02 2014 -0500 ---------------------------------------------------------------------- .../core/client/impl/ThriftTransportPool.java | 100 +++++++++++--- .../org/apache/accumulo/core/util/CleanUp.java | 63 +++++++++ .../accumulo/fate/zookeeper/ZooSession.java | 23 +++- .../accumulo/test/functional/CleanUpIT.java | 137 +++++++++++++++++++ 4 files changed, 304 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c092cad/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java index a553cc1,f123289..a5fecd5 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java @@@ -357,35 -376,20 +375,35 @@@ public class ThriftTransportPool private ThriftTransportPool() {} - public TTransport getTransport(String location, int port) throws TTransportException { - return getTransport(location, port, 0); - } - - public TTransport getTransportWithDefaultTimeout(InetSocketAddress addr, AccumuloConfiguration conf) throws TTransportException { - return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)); + 
public TTransport getTransportWithDefaultTimeout(HostAndPort addr, AccumuloConfiguration conf) throws TTransportException { + return getTransport(String.format("%s:%d", addr.getHostText(), addr.getPort()), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), SslConnectionParams.forClient(conf)); } - public TTransport getTransport(InetSocketAddress addr, long timeout) throws TTransportException { - return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), timeout); + public TTransport getTransport(String location, long milliseconds, SslConnectionParams sslParams) throws TTransportException { + return getTransport(new ThriftTransportKey(location, milliseconds, sslParams)); } - public TTransport getTransportWithDefaultTimeout(String location, int port, AccumuloConfiguration conf) throws TTransportException { - return getTransport(location, port, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)); + private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException { + synchronized (this) { + // atomically reserve location if it exist in cache - List<CachedConnection> ccl = cache.get(cacheKey); ++ List<CachedConnection> ccl = getCache().get(cacheKey); + + if (ccl == null) { + ccl = new LinkedList<CachedConnection>(); - cache.put(cacheKey, ccl); ++ getCache().put(cacheKey, ccl); + } + + for (CachedConnection cachedConnection : ccl) { + if (!cachedConnection.isReserved()) { + cachedConnection.setReserved(true); + if (log.isTraceEnabled()) + log.trace("Using existing connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort()); + return cachedConnection.transport; + } + } + } + + return createNewTransport(cacheKey); } Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, boolean preferCachedConnection) throws TTransportException { @@@ -484,9 -531,9 +507,9 @@@ CachedTTransport ctsc = (CachedTTransport) tsc; ArrayList<CachedConnection> closeList = new ArrayList<ThriftTransportPool.CachedConnection>(); - + 
synchronized (this) { - List<CachedConnection> ccl = cache.get(ctsc.getCacheKey()); + List<CachedConnection> ccl = getCache().get(ctsc.getCacheKey()); for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) { CachedConnection cachedConnection = iterator.next(); if (cachedConnection.transport == tsc) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c092cad/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c092cad/test/src/test/java/org/apache/accumulo/test/functional/CleanUpIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/CleanUpIT.java index 0000000,0000000..ad5b2fd new file mode 100644 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/functional/CleanUpIT.java @@@ -1,0 -1,0 +1,137 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++package org.apache.accumulo.test.functional; ++ ++import java.util.Iterator; ++import java.util.Map.Entry; ++import java.util.Set; ++ ++import org.apache.accumulo.core.client.BatchWriter; ++import org.apache.accumulo.core.client.BatchWriterConfig; ++import org.apache.accumulo.core.client.Scanner; ++import org.apache.accumulo.core.data.Key; ++import org.apache.accumulo.core.data.Mutation; ++import org.apache.accumulo.core.data.Value; ++import org.apache.accumulo.core.security.Authorizations; ++import org.apache.accumulo.core.util.CleanUp; ++import org.junit.Test; ++ ++/** ++ * ++ */ ++public class CleanUpIT extends SimpleMacIT { ++ @Test(timeout = 30 * 1000) ++ public void run() throws Exception { ++ ++ String tableName = getTableNames(1)[0]; ++ getConnector().tableOperations().create(tableName); ++ ++ BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig()); ++ ++ Mutation m1 = new Mutation("r1"); ++ m1.put("cf1", "cq1", 1, "5"); ++ ++ bw.addMutation(m1); ++ ++ bw.flush(); ++ ++ Scanner scanner = getConnector().createScanner(tableName, new Authorizations()); ++ ++ int count = 0; ++ for (Entry<Key,Value> entry : scanner) { ++ count++; ++ if (!entry.getValue().toString().equals("5")) { ++ throw new Exception("Unexpected value " + entry.getValue()); ++ } ++ } ++ ++ if (count != 1) { ++ throw new Exception("Unexpected count " + count); ++ } ++ ++ if (countThreads() < 2) { ++ printThreadNames(); ++ throw new Exception("Not seeing expected threads"); ++ } ++ ++ CleanUp.shutdownNow(); ++ ++ Mutation m2 = new Mutation("r2"); ++ m2.put("cf1", "cq1", 1, "6"); ++ ++ try { ++ bw.addMutation(m1); ++ bw.flush(); ++ throw new Exception("batch writer did not fail"); ++ } catch (Exception e) { ++ ++ } ++ ++ try { ++ // expect this to fail also, want to clean up batch writer threads ++ bw.close(); ++ throw new Exception("batch writer close not fail"); ++ } catch (Exception e) { ++ ++ } ++ ++ try { ++ count = 0; ++ Iterator<Entry<Key,Value>> iter 
= scanner.iterator(); ++ while (iter.hasNext()) { ++ iter.next(); ++ count++; ++ } ++ throw new Exception("scanner did not fail"); ++ } catch (Exception e) { ++ ++ } ++ ++ if (countThreads() > 0) { ++ printThreadNames(); ++ throw new Exception("Threads did not go away"); ++ } ++ } ++ ++ private void printThreadNames() { ++ Set<Thread> threads = Thread.getAllStackTraces().keySet(); ++ for (Thread thread : threads) { ++ System.out.println("thread name:" + thread.getName()); ++ thread.getStackTrace(); ++ ++ } ++ } ++ ++ /** ++ * count threads that should be cleaned up ++ * ++ */ ++ private int countThreads() { ++ int count = 0; ++ Set<Thread> threads = Thread.getAllStackTraces().keySet(); ++ for (Thread thread : threads) { ++ ++ if (thread.getName().toLowerCase().contains("sendthread") || thread.getName().toLowerCase().contains("eventthread")) ++ count++; ++ ++ if (thread.getName().toLowerCase().contains("thrift") && thread.getName().toLowerCase().contains("pool")) ++ count++; ++ } ++ ++ return count; ++ } ++}