This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new c9e3877 Make ThriftTransportPool a part of ClientContext (#2303)
c9e3877 is described below
commit c9e3877373303997d42b465b2cdb62fd3fffd5d8
Author: Mike Miller <[email protected]>
AuthorDate: Fri Oct 22 10:51:43 2021 -0400
Make ThriftTransportPool a part of ClientContext (#2303)
* Create getTransportPool() in ClientContext that will create a
ThriftTransportPool once and call startCheckerThread()
* Make the close method in ClientContext shut down the ThriftTransportPool
* Pass ClientContext to the static methods that need it to obtain the
ThriftTransportPool
* Drop unused transport object from TabletServerBatchReaderIterator
* Internal ThriftTransportPool refactoring included: dropping singleton code,
making private objects final, changing access modifiers of methods, and
replacing getConnectionPool() with direct references to the connectionPool
object
---
.../accumulo/core/clientImpl/ClientContext.java | 13 +++
.../core/clientImpl/ConditionalWriterImpl.java | 6 +-
.../core/clientImpl/InstanceOperationsImpl.java | 4 +-
.../accumulo/core/clientImpl/ManagerClient.java | 8 +-
.../core/clientImpl/ReplicationClient.java | 8 +-
.../accumulo/core/clientImpl/ServerClient.java | 10 +--
.../core/clientImpl/TableOperationsImpl.java | 16 ++--
.../TabletServerBatchReaderIterator.java | 6 +-
.../core/clientImpl/TabletServerBatchWriter.java | 2 +-
.../accumulo/core/clientImpl/ThriftScanner.java | 6 +-
.../core/clientImpl/ThriftTransportPool.java | 99 ++++------------------
.../apache/accumulo/core/clientImpl/Writer.java | 2 +-
.../org/apache/accumulo/core/rpc/ThriftUtil.java | 10 +--
.../org/apache/accumulo/core/summary/Gatherer.java | 2 +-
.../util/compaction/ExternalCompactionUtil.java | 6 +-
.../miniclusterImpl/MiniAccumuloClusterImpl.java | 34 ++++----
.../accumulo/server/client/BulkImporter.java | 4 +-
.../accumulo/server/manager/LiveTServerSet.java | 18 ++--
.../manager/balancer/BalancerEnvironmentImpl.java | 2 +-
.../server/master/balancer/TabletBalancer.java | 2 +-
.../server/util/VerifyTabletAssignments.java | 2 +-
.../coordinator/CompactionCoordinator.java | 9 +-
.../accumulo/coordinator/CompactionFinalizer.java | 2 +-
.../org/apache/accumulo/compactor/Compactor.java | 8 +-
.../java/org/apache/accumulo/manager/Manager.java | 4 +-
.../manager/tableOps/bulkVer1/LoadFiles.java | 2 +-
.../manager/tableOps/bulkVer2/LoadFiles.java | 2 +-
.../java/org/apache/accumulo/monitor/Monitor.java | 8 +-
.../rest/tservers/TabletServerResource.java | 2 +-
.../org/apache/accumulo/tserver/TabletServer.java | 2 +-
.../accumulo/shell/commands/ListBulkCommand.java | 2 +-
.../accumulo/test/DetectDeadTabletServersIT.java | 2 +-
.../org/apache/accumulo/test/GetManagerStats.java | 2 +-
.../apache/accumulo/test/TransportCachingIT.java | 2 +-
.../functional/BalanceAfterCommsFailureIT.java | 2 +-
.../BalanceInPresenceOfOfflineTableIT.java | 2 +-
.../accumulo/test/functional/BulkFailureIT.java | 4 +-
.../test/functional/MetadataMaxFilesIT.java | 2 +-
.../test/functional/SimpleBalancerFairnessIT.java | 2 +-
39 files changed, 130 insertions(+), 189 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 70425f9..9e3fdce 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -113,6 +113,7 @@ public class ClientContext implements AccumuloClient {
private final Supplier<SaslConnectionParams> saslSupplier;
private final Supplier<SslConnectionParams> sslSupplier;
private TCredentials rpcCreds;
+ private ThriftTransportPool thriftTransportPool;
private volatile boolean closed = false;
@@ -711,6 +712,9 @@ public class ClientContext implements AccumuloClient {
@Override
public void close() {
closed = true;
+ if (thriftTransportPool != null) {
+ thriftTransportPool.shutdown();
+ }
singletonReservation.close();
}
@@ -915,4 +919,13 @@ public class ClientContext implements AccumuloClient {
setProperty(property, Integer.toString(value));
}
}
+
+ public synchronized ThriftTransportPool getTransportPool() {
+ ensureOpen();
+ if (thriftTransportPool == null) {
+ thriftTransportPool = new ThriftTransportPool();
+ thriftTransportPool.startCheckerThread();
+ }
+ return thriftTransportPool;
+ }
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
index 83c6dff..e8b21b0 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
@@ -239,7 +239,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
client = getClient(sid.location);
client.closeConditionalUpdate(tinfo, sid.sessionID);
} catch (Exception e) {} finally {
- ThriftUtil.returnClient((TServiceClient) client);
+ ThriftUtil.returnClient((TServiceClient) client, context);
}
}
@@ -608,7 +608,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
} finally {
if (sessionId != null)
unreserveSessionID(location);
- ThriftUtil.returnClient((TServiceClient) client);
+ ThriftUtil.returnClient((TServiceClient) client, context);
}
}
@@ -697,7 +697,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
client = getClient(location);
client.invalidateConditionalUpdate(tinfo, sessionId);
} finally {
- ThriftUtil.returnClient((TServiceClient) client);
+ ThriftUtil.returnClient((TServiceClient) client, context);
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
index ae8924c..1619139 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
@@ -177,7 +177,7 @@ public class InstanceOperationsImpl implements
InstanceOperations {
throw new AccumuloException(e);
} finally {
if (client != null)
- returnClient(client);
+ returnClient(client, context);
}
}
@@ -207,7 +207,7 @@ public class InstanceOperationsImpl implements
InstanceOperations {
throw new AccumuloException(e);
} finally {
if (client != null)
- returnClient(client);
+ returnClient(client, context);
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ManagerClient.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ManagerClient.java
index 2e6fd59..20a2048 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ManagerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ManagerClient.java
@@ -82,11 +82,11 @@ public class ManagerClient {
}
}
- public static void close(ManagerClientService.Iface iface) {
+ public static void close(ManagerClientService.Iface iface, ClientContext
context) {
TServiceClient client = (TServiceClient) iface;
if (client != null && client.getInputProtocol() != null
&& client.getInputProtocol().getTransport() != null) {
-
ThriftTransportPool.getInstance().returnTransport(client.getInputProtocol().getTransport());
+
context.getTransportPool().returnTransport(client.getInputProtocol().getTransport());
} else {
log.debug("Attempt to close null connection to the manager", new
Exception());
}
@@ -124,7 +124,7 @@ public class ManagerClient {
throw new AccumuloException(e);
} finally {
if (client != null)
- close(client);
+ close(client, context);
}
}
}
@@ -162,7 +162,7 @@ public class ManagerClient {
throw new AccumuloException(e);
} finally {
if (client != null)
- close(client);
+ close(client, context);
}
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ReplicationClient.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ReplicationClient.java
index 8f50a6c..f05f361 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ReplicationClient.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ReplicationClient.java
@@ -140,10 +140,10 @@ public class ReplicationClient {
}
}
- private static void close(TServiceClient client) {
+ private static void close(TServiceClient client, ClientContext context) {
if (client != null && client.getInputProtocol() != null
&& client.getInputProtocol().getTransport() != null) {
-
ThriftTransportPool.getInstance().returnTransport(client.getInputProtocol().getTransport());
+
context.getTransportPool().returnTransport(client.getInputProtocol().getTransport());
} else {
log.debug("Attempt to close null connection to the remote system", new
Exception());
}
@@ -172,7 +172,7 @@ public class ReplicationClient {
throw new AccumuloException(e);
} finally {
if (client != null)
- close(client);
+ close(client, context);
}
}
@@ -195,7 +195,7 @@ public class ReplicationClient {
throw new AccumuloException(e);
} finally {
if (client != null)
- close(client);
+ close(client, context);
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ServerClient.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ServerClient.java
index 0532179..afc99e5 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ServerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ServerClient.java
@@ -102,7 +102,7 @@ public class ServerClient {
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
} finally {
if (client != null)
- ServerClient.close(client);
+ ServerClient.close(client, context);
}
}
}
@@ -126,7 +126,7 @@ public class ServerClient {
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
} finally {
if (client != null)
- ServerClient.close(client);
+ ServerClient.close(client, context);
}
}
}
@@ -158,7 +158,7 @@ public class ServerClient {
boolean opened = false;
try {
Pair<String,TTransport> pair =
- ThriftTransportPool.getInstance().getAnyTransport(servers,
preferCachedConnections);
+ context.getTransportPool().getAnyTransport(servers,
preferCachedConnections);
CT client = ThriftUtil.createClient(factory, pair.getSecond());
opened = true;
warnedAboutTServersBeingDown = false;
@@ -177,10 +177,10 @@ public class ServerClient {
}
}
- public static void close(TServiceClient client) {
+ public static void close(TServiceClient client, ClientContext context) {
if (client != null && client.getInputProtocol() != null
&& client.getInputProtocol().getTransport() != null) {
-
ThriftTransportPool.getInstance().returnTransport(client.getInputProtocol().getTransport());
+
context.getTransportPool().returnTransport(client.getInputProtocol().getTransport());
} else {
log.debug("Attempt to close null connection to a server", new
Exception());
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index a6f5370..b179eb1 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@ -266,7 +266,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
log.debug("Contacted a Manager which is no longer active, retrying");
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
} finally {
- ManagerClient.close(client);
+ ManagerClient.close(client, context);
}
}
}
@@ -291,7 +291,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
log.debug("Contacted a Manager which is no longer active, retrying");
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
} finally {
- ManagerClient.close(client);
+ ManagerClient.close(client, context);
}
}
}
@@ -311,7 +311,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
log.debug("Contacted a Manager which is no longer active, retrying");
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
} finally {
- ManagerClient.close(client);
+ ManagerClient.close(client, context);
}
}
}
@@ -331,7 +331,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
log.debug("Contacted a Manager which is no longer active, retrying");
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
} finally {
- ManagerClient.close(client);
+ ManagerClient.close(client, context);
}
}
}
@@ -578,7 +578,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
}
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
} catch (TApplicationException tae) {
@@ -930,7 +930,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
log.debug("Contacted a Manager which is no longer active, retrying");
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
} finally {
- ManagerClient.close(client);
+ ManagerClient.close(client, context);
}
}
@@ -950,7 +950,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
log.debug("Contacted a Manager which is no longer active, retrying");
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
} finally {
- ManagerClient.close(client);
+ ManagerClient.close(client, context);
}
}
} catch (ThriftSecurityException e) {
@@ -1463,7 +1463,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
} finally {
// must always return thrift connection
if (pair != null)
- ServerClient.close(pair.getSecond());
+ ServerClient.close(pair.getSecond(), context);
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
index 84629f6..a1965b6 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
@@ -70,7 +70,6 @@ import org.apache.accumulo.core.util.OpTimer;
import org.apache.htrace.wrappers.TraceRunnable;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -650,7 +649,6 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
}
timeoutTracker.startingScan();
- TTransport transport = null;
try {
final HostAndPort parsedServer = HostAndPort.fromString(server);
final TabletClientService.Client client;
@@ -757,7 +755,7 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
client.closeMultiScan(TraceUtil.traceInfo(), imsr.scanID);
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
} catch (TTransportException e) {
log.debug("Server : {} msg : {}", server, e.getMessage());
@@ -785,8 +783,6 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
log.debug("Server : {} msg : {}", server, e.getMessage(), e);
timeoutTracker.errorOccured();
throw new IOException(e);
- } finally {
- ThriftTransportPool.getInstance().returnTransport(transport);
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
index 30369fe..52c208d 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
@@ -969,7 +969,7 @@ public class TabletServerBatchWriter implements
AutoCloseable {
}
return allFailures;
} finally {
- ThriftUtil.returnClient((TServiceClient) client);
+ ThriftUtil.returnClient((TServiceClient) client, context);
}
} catch (TTransportException e) {
timeoutTracker.errorOccured();
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index 382ffe5..c16d2a7 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@ -129,7 +129,7 @@ public class ThriftScanner {
return isr.result.more;
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
} catch (TApplicationException tae) {
throw new AccumuloServerException(server, tae);
@@ -544,7 +544,7 @@ public class ThriftScanner {
} catch (ThriftSecurityException e) {
throw new AccumuloSecurityException(e.user, e.code, e);
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
Thread.currentThread().setName(old);
}
}
@@ -564,7 +564,7 @@ public class ThriftScanner {
log.debug("Failed to close active scan " + scanState.prevLoc + " " +
scanState.scanID, e);
} finally {
if (client != null)
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, scanState.context);
}
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
index 4be3e67..32ffb99 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
@@ -37,8 +37,6 @@ import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.accumulo.core.rpc.ThriftUtil;
-import org.apache.accumulo.core.singletons.SingletonManager;
-import org.apache.accumulo.core.singletons.SingletonService;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.threads.Threads;
@@ -360,11 +358,11 @@ public class ThriftTransportPool {
private final ConnectionPool connectionPool = new ConnectionPool();
- private Map<ThriftTransportKey,Long> errorCount = new HashMap<>();
- private Map<ThriftTransportKey,Long> errorTime = new HashMap<>();
- private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<>();
+ private final Map<ThriftTransportKey,Long> errorCount = new HashMap<>();
+ private final Map<ThriftTransportKey,Long> errorTime = new HashMap<>();
+ private final Set<ThriftTransportKey> serversWarnedAbout = new HashSet<>();
- private Supplier<Thread> checkThreadFactory = Suppliers.memoize(() -> {
+ private final Supplier<Thread> checkThreadFactory = Suppliers.memoize(() -> {
var thread = Threads.createThread("Thrift Connection Pool Checker", new
Closer());
thread.start();
return thread;
@@ -407,7 +405,7 @@ public class ThriftTransportPool {
private class Closer implements Runnable {
private void closeConnections() throws InterruptedException {
- while (!getConnectionPool().shutdown) {
+ while (!connectionPool.shutdown) {
closeExpiredConnections();
Thread.sleep(500);
}
@@ -423,7 +421,7 @@ public class ThriftTransportPool {
}
}
- static class CachedTTransport extends TTransport {
+ private static class CachedTTransport extends TTransport {
private final ThriftTransportKey cacheKey;
private final TTransport wrappedTransport;
@@ -686,16 +684,13 @@ public class ThriftTransportPool {
}
- private ThriftTransportPool() {}
-
public TTransport getTransport(HostAndPort location, long milliseconds,
ClientContext context)
throws TTransportException {
ThriftTransportKey cacheKey = new ThriftTransportKey(location,
milliseconds, context);
// compute hash code outside of lock, this lowers the time the lock is held
cacheKey.precomputeHashCode();
- ConnectionPool pool = getConnectionPool();
- CachedConnection connection = pool.reserveAny(cacheKey);
+ CachedConnection connection = connectionPool.reserveAny(cacheKey);
if (connection != null) {
log.trace("Using existing connection to {}", cacheKey.getServer());
@@ -714,17 +709,15 @@ public class ThriftTransportPool {
if (preferCachedConnection) {
HashSet<ThriftTransportKey> serversSet = new HashSet<>(servers);
- ConnectionPool pool = getConnectionPool();
-
// randomly pick a server from the connection cache
- serversSet.retainAll(pool.getThriftTransportKeys());
+ serversSet.retainAll(connectionPool.getThriftTransportKeys());
if (!serversSet.isEmpty()) {
ArrayList<ThriftTransportKey> cachedServers = new
ArrayList<>(serversSet);
Collections.shuffle(cachedServers, random);
for (ThriftTransportKey ttk : cachedServers) {
- CachedConnection connection = pool.reserveAny(ttk);
+ CachedConnection connection = connectionPool.reserveAny(ttk);
if (connection != null) {
final String serverAddr = ttk.getServer().toString();
log.trace("Using existing connection to {}", serverAddr);
@@ -735,7 +728,6 @@ public class ThriftTransportPool {
}
}
- ConnectionPool pool = getConnectionPool();
int retryCount = 0;
while (!servers.isEmpty() && retryCount < 10) {
@@ -743,7 +735,7 @@ public class ThriftTransportPool {
ThriftTransportKey ttk = servers.get(index);
if (preferCachedConnection) {
- CachedConnection connection = pool.reserveAnyIfPresent(ttk);
+ CachedConnection connection = connectionPool.reserveAnyIfPresent(ttk);
if (connection != null) {
return new Pair<>(ttk.getServer().toString(), connection.transport);
}
@@ -773,8 +765,7 @@ public class ThriftTransportPool {
connection.reserve();
try {
- ConnectionPool pool = getConnectionPool();
- pool.putReserved(cacheKey, connection);
+ connectionPool.putReserved(cacheKey, connection);
} catch (TransportPoolShutdownException e) {
connection.transport.close();
throw e;
@@ -790,8 +781,7 @@ public class ThriftTransportPool {
CachedTTransport cachedTransport = (CachedTTransport) transport;
ArrayList<CachedConnection> closeList = new ArrayList<>();
- ConnectionPool pool = getConnectionPool();
- boolean existInCache = pool.returnTransport(cachedTransport, closeList);
+ boolean existInCache = connectionPool.returnTransport(cachedTransport,
closeList);
// close outside of sync block
closeList.forEach((connection) -> {
@@ -843,67 +833,14 @@ public class ThriftTransportPool {
log.debug("Set thrift transport pool idle time to {}", time);
}
- private static ThriftTransportPool instance = null;
-
- static {
- SingletonManager.register(new SingletonService() {
-
- @Override
- public boolean isEnabled() {
- return ThriftTransportPool.isEnabled();
- }
-
- @Override
- public void enable() {
- ThriftTransportPool.enable();
- }
-
- @Override
- public void disable() {
- ThriftTransportPool.disable();
- }
- });
- }
-
- public static synchronized ThriftTransportPool getInstance() {
- Preconditions.checkState(instance != null,
- "The Accumulo singleton for connection pooling is disabled. This is
likely caused by all "
- + "AccumuloClients being closed or garbage collected.");
- instance.startCheckerThread();
- return instance;
- }
-
- private static synchronized boolean isEnabled() {
- return instance != null;
- }
-
- private static synchronized void enable() {
- if (instance == null) {
- // this code intentionally does not start the thread that closes idle
connections. That thread
- // is created the first time something attempts to use this service.
- instance = new ThriftTransportPool();
- }
- }
-
- private static synchronized void disable() {
- if (instance != null) {
- try {
- instance.shutdown();
- } finally {
- instance = null;
- }
- }
- }
-
- public void startCheckerThread() {
+ void startCheckerThread() {
checkThreadFactory.get();
}
- void closeExpiredConnections() {
+ private void closeExpiredConnections() {
List<CachedConnection> expiredConnections;
- ConnectionPool pool = getConnectionPool();
- expiredConnections = pool.removeExpiredConnections(killTime);
+ expiredConnections = connectionPool.removeExpiredConnections(killTime);
synchronized (errorCount) {
Iterator<Entry<ThriftTransportKey,Long>> iter =
errorTime.entrySet().iterator();
@@ -921,7 +858,7 @@ public class ThriftTransportPool {
expiredConnections.forEach((c) -> c.transport.close());
}
- private void shutdown() {
+ void shutdown() {
connectionPool.shutdown();
try {
checkThreadFactory.get().join();
@@ -929,8 +866,4 @@ public class ThriftTransportPool {
throw new RuntimeException(e);
}
}
-
- private ConnectionPool getConnectionPool() {
- return connectionPool;
- }
}
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/Writer.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/Writer.java
index 29fb978..da67a5b 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/Writer.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/Writer.java
@@ -76,7 +76,7 @@ public class Writer {
} catch (ThriftSecurityException e) {
throw new AccumuloSecurityException(e.user, e.code);
} finally {
- ThriftUtil.returnClient((TServiceClient) client);
+ ThriftUtil.returnClient((TServiceClient) client, context);
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
index 6836863..92151bc 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@ -34,7 +34,6 @@ import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.clientImpl.ThriftTransportPool;
import org.apache.accumulo.core.rpc.SaslConnectionParams.SaslMechanism;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.util.HostAndPort;
@@ -128,7 +127,7 @@ public class ThriftUtil {
*/
public static <T extends TServiceClient> T
getClient(TServiceClientFactory<T> factory,
HostAndPort address, ClientContext context) throws TTransportException {
- TTransport transport =
ThriftTransportPool.getInstance().getTransport(address,
+ TTransport transport = context.getTransportPool().getTransport(address,
context.getClientTimeoutInMillis(), context);
return createClient(factory, transport);
}
@@ -148,8 +147,7 @@ public class ThriftUtil {
*/
public static <T extends TServiceClient> T
getClient(TServiceClientFactory<T> factory,
HostAndPort address, ClientContext context, long timeout) throws
TTransportException {
- TTransport transport =
- ThriftTransportPool.getInstance().getTransport(address, timeout,
context);
+ TTransport transport = context.getTransportPool().getTransport(address,
timeout, context);
return createClient(factory, transport);
}
@@ -159,9 +157,9 @@ public class ThriftUtil {
* @param iface
* The Client being returned or null.
*/
- public static void returnClient(TServiceClient iface) { // Eew... the typing
here is horrible
+ public static void returnClient(TServiceClient iface, ClientContext context)
{
if (iface != null) {
-
ThriftTransportPool.getInstance().returnTransport(iface.getInputProtocol().getTransport());
+
context.getTransportPool().returnTransport(iface.getInputProtocol().getTransport());
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
index 57e6e78..2d19a84 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
@@ -338,7 +338,7 @@ public class Gatherer {
} catch (TTransportException e1) {
pfiles.failedFiles.addAll(allFiles.keySet());
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, ctx);
}
if (cancelFlag.get()) {
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index 2e42021..b9605f7 100644
---
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@ -135,7 +135,7 @@ public class ExternalCompactionUtil {
} catch (TException e) {
LOG.debug("Failed to contact compactor {}", compactor, e);
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
return List.of();
}
@@ -164,7 +164,7 @@ public class ExternalCompactionUtil {
} catch (TException e) {
LOG.debug("Failed to contact compactor {}", compactorAddr, e);
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
return null;
}
@@ -181,7 +181,7 @@ public class ExternalCompactionUtil {
} catch (TException e) {
LOG.debug("Failed to contact compactor {}", compactorAddr, e);
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
return null;
}
diff --git
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index 62824a4..370645f 100644
---
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@ -772,22 +772,24 @@ public class MiniAccumuloClusterImpl implements
AccumuloCluster {
*/
public ManagerMonitorInfo getManagerMonitorInfo()
throws AccumuloException, AccumuloSecurityException {
- ManagerClientService.Iface client = null;
- while (true) {
- try (AccumuloClient c =
Accumulo.newClient().from(getClientProperties()).build()) {
- client = ManagerClient.getConnectionWithRetry((ClientContext) c);
- return client.getManagerStats(TraceUtil.traceInfo(), ((ClientContext)
c).rpcCreds());
- } catch (ThriftSecurityException exception) {
- throw new AccumuloSecurityException(exception);
- } catch (ThriftNotActiveServiceException e) {
- // Let it loop, fetching a new location
- log.debug("Contacted a Manager which is no longer active, retrying");
- sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
- } catch (TException exception) {
- throw new AccumuloException(exception);
- } finally {
- if (client != null) {
- ManagerClient.close(client);
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProperties()).build()) {
+ while (true) {
+ ManagerClientService.Iface client = null;
+ try {
+ client = ManagerClient.getConnectionWithRetry((ClientContext) c);
+ return client.getManagerStats(TraceUtil.traceInfo(),
((ClientContext) c).rpcCreds());
+ } catch (ThriftSecurityException exception) {
+ throw new AccumuloSecurityException(exception);
+ } catch (ThriftNotActiveServiceException e) {
+ // Let it loop, fetching a new location
+ log.debug("Contacted a Manager which is no longer active, retrying");
+ sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ } catch (TException exception) {
+ throw new AccumuloException(exception);
+ } finally {
+ if (client != null) {
+ ManagerClient.close(client, (ClientContext) c);
+ }
}
}
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index df6d5a3..27f5690 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@ -255,7 +255,7 @@ public class BulkImporter {
return assignmentStats;
} finally {
if (client != null) {
- ServerClient.close(client);
+ ServerClient.close(client, context);
}
}
}
@@ -589,7 +589,7 @@ public class BulkImporter {
return
failures.stream().map(KeyExtent::fromThrift).collect(Collectors.toList());
} finally {
- ThriftUtil.returnClient((TServiceClient) client);
+ ThriftUtil.returnClient((TServiceClient) client, context);
}
} catch (ThriftSecurityException e) {
throw new AccumuloSecurityException(e.user, e.code, e);
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index f2a51ee..440ab77 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@ -105,7 +105,7 @@ public class LiveTServerSet implements Watcher {
try {
loadTablet(client, lock, extent);
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
}
}
@@ -118,7 +118,7 @@ public class LiveTServerSet implements Watcher {
client.unloadTablet(TraceUtil.traceInfo(), context.rpcCreds(),
lockString(lock),
extent.toThrift(), goal, requestTime);
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
}
@@ -148,7 +148,7 @@ public class LiveTServerSet implements Watcher {
try {
client.halt(TraceUtil.traceInfo(), context.rpcCreds(),
lockString(lock));
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
}
@@ -158,7 +158,7 @@ public class LiveTServerSet implements Watcher {
try {
client.fastHalt(TraceUtil.traceInfo(), context.rpcCreds(),
lockString(lock));
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
}
@@ -171,7 +171,7 @@ public class LiveTServerSet implements Watcher {
tableId.canonical(), startRow == null ? null :
ByteBuffer.wrap(startRow),
endRow == null ? null : ByteBuffer.wrap(endRow));
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
}
@@ -181,7 +181,7 @@ public class LiveTServerSet implements Watcher {
try {
client.chop(TraceUtil.traceInfo(), context.rpcCreds(),
lockString(lock), extent.toThrift());
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
}
@@ -193,7 +193,7 @@ public class LiveTServerSet implements Watcher {
client.splitTablet(TraceUtil.traceInfo(), context.rpcCreds(),
extent.toThrift(),
ByteBuffer.wrap(splitPoint.getBytes(), 0, splitPoint.getLength()));
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
}
@@ -206,7 +206,7 @@ public class LiveTServerSet implements Watcher {
startRow == null ? null : ByteBuffer.wrap(startRow),
endRow == null ? null : ByteBuffer.wrap(endRow));
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
}
@@ -216,7 +216,7 @@ public class LiveTServerSet implements Watcher {
try {
return client.isActive(TraceUtil.traceInfo(), tid);
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/balancer/BalancerEnvironmentImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/balancer/BalancerEnvironmentImpl.java
index 0414ff0..8eb7c46 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/balancer/BalancerEnvironmentImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/balancer/BalancerEnvironmentImpl.java
@@ -95,7 +95,7 @@ public class BalancerEnvironmentImpl extends
ServiceEnvironmentImpl implements B
} catch (TTransportException e) {
log.error("Unable to connect to {}: ", tabletServerId, e);
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, getContext());
}
} catch (ThriftSecurityException e) {
throw new AccumuloSecurityException(e);
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
index 9ee4508..488660b 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
@@ -267,7 +267,7 @@ public abstract class TabletBalancer
} catch (TTransportException e) {
log.error("Unable to connect to {}: ", tserver, e);
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
return null;
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
index d1f5227..720aef7 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
@@ -203,6 +203,6 @@ public class VerifyTabletAssignments {
client.closeMultiScan(tinfo, is.scanID);
- ThriftUtil.returnClient((TServiceClient) client);
+ ThriftUtil.returnClient((TServiceClient) client, context);
}
}
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index b6c15c7..14abd6d 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.accumulo.coordinator.QueueSummaries.PrioTserver;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.clientImpl.ThriftTransportPool;
import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
@@ -63,6 +62,7 @@ import org.apache.accumulo.fate.zookeeper.ServiceLock;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.GarbageCollectionLogger;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServerOpts;
import org.apache.accumulo.server.manager.LiveTServerSet;
import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
@@ -335,7 +335,7 @@ public class CompactionCoordinator extends AbstractServer
queuesSeen.add(summary.getQueue());
});
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, getContext());
}
} catch (TException e) {
LOG.warn("Error getting external compaction summaries from tablet server: {}",
@@ -437,7 +437,7 @@ public class CompactionCoordinator extends AbstractServer
QUEUE_SUMMARIES.removeSummary(tserver, queueName, prioTserver.prio);
prioTserver = QUEUE_SUMMARIES.getNextTserver(queueName);
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, getContext());
}
}
@@ -463,8 +463,9 @@ public class CompactionCoordinator extends AbstractServer
protected TabletClientService.Client getTabletServerConnection(TServerInstance tserver)
throws TTransportException {
TServerConnection connection = tserverSet.getConnection(tserver);
+ ServerContext serverContext = getContext();
TTransport transport =
- ThriftTransportPool.getInstance().getTransport(connection.getAddress(), 0, getContext());
+ serverContext.getTransportPool().getTransport(connection.getAddress(), 0, serverContext);
return ThriftUtil.createClient(new TabletClientService.Client.Factory(), transport);
}
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
index c9e8bf0..ece40d8 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
@@ -137,7 +137,7 @@ public class CompactionFinalizer {
} catch (TException e) {
LOG.warn("Failed to notify tserver {}", loc.getHostAndPort(), e);
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
}
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 6fdf0e8..ec25495 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -367,7 +367,7 @@ public class Compactor extends AbstractServer implements
CompactorService.Iface
System.currentTimeMillis());
return "";
} finally {
- ThriftUtil.returnClient(coordinatorClient);
+ ThriftUtil.returnClient(coordinatorClient, getContext());
}
}
});
@@ -394,7 +394,7 @@ public class Compactor extends AbstractServer implements
CompactorService.Iface
job.getExternalCompactionId(), job.extent);
return "";
} finally {
- ThriftUtil.returnClient(coordinatorClient);
+ ThriftUtil.returnClient(coordinatorClient, getContext());
}
}
});
@@ -423,7 +423,7 @@ public class Compactor extends AbstractServer implements
CompactorService.Iface
job.getExternalCompactionId(), job.extent, stats);
return "";
} finally {
- ThriftUtil.returnClient(coordinatorClient);
+ ThriftUtil.returnClient(coordinatorClient, getContext());
}
}
});
@@ -458,7 +458,7 @@ public class Compactor extends AbstractServer implements
CompactorService.Iface
currentCompactionId.set(null);
throw e;
} finally {
- ThriftUtil.returnClient(coordinatorClient);
+ ThriftUtil.returnClient(coordinatorClient, getContext());
}
}
});
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 4ce6f30..e9fa3c0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -52,7 +52,6 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.clientImpl.Tables;
-import org.apache.accumulo.core.clientImpl.ThriftTransportPool;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
@@ -381,8 +380,7 @@ public class Manager extends AbstractServer
log.info("Version {}", Constants.VERSION);
log.info("Instance {}", getInstanceID());
timeKeeper = new ManagerTime(this, aconf);
- ThriftTransportPool.getInstance()
- .setIdleTime(aconf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
+ context.getTransportPool().setIdleTime(aconf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
tserverSet = new LiveTServerSet(context, this);
initializeBalancer();
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/LoadFiles.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/LoadFiles.java
index b9e0cc8..f626b69 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/LoadFiles.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/LoadFiles.java
@@ -179,7 +179,7 @@ class LoadFiles extends ManagerRepo {
log.error(
"rpc failed server:" + server + ", tid:" +
FateTxId.formatTid(tid) + " " + ex);
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, manager.getContext());
}
return null;
}));
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index 1630ede..384b6c0 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -175,7 +175,7 @@ class LoadFiles extends ManagerRepo {
} catch (TException ex) {
log.debug("rpc failed server: " + server + ", " + fmtTid + " " +
ex.getMessage(), ex);
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, manager.getContext());
}
});
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 9e48e9d..3fca610 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -268,7 +268,7 @@ public class Monitor extends AbstractServer implements
HighlyAvailableService {
log.info("Error fetching stats: ", e);
} finally {
if (client != null) {
- ManagerClient.close(client);
+ ManagerClient.close(client, context);
}
}
if (mmi == null) {
@@ -402,7 +402,7 @@ public class Monitor extends AbstractServer implements
HighlyAvailableService {
try {
result = client.getStatus(TraceUtil.traceInfo(), context.rpcCreds());
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
}
} catch (Exception ex) {
@@ -604,7 +604,7 @@ public class Monitor extends AbstractServer implements
HighlyAvailableService {
} catch (Exception ex) {
log.error("Failed to get active scans from {}", server, ex);
} finally {
- ThriftUtil.returnClient(tserver);
+ ThriftUtil.returnClient(tserver, context);
}
}
// Age off old scan information
@@ -632,7 +632,7 @@ public class Monitor extends AbstractServer implements
HighlyAvailableService {
} catch (Exception ex) {
log.debug("Failed to get active compactions from {}", server, ex);
} finally {
- ThriftUtil.returnClient(tserver);
+ ThriftUtil.returnClient(tserver, context);
}
}
// Age off old compaction information
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tservers/TabletServerResource.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tservers/TabletServerResource.java
index b1f3a01..8d6b365 100644
---
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tservers/TabletServerResource.java
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tservers/TabletServerResource.java
@@ -192,7 +192,7 @@ public class TabletServerResource {
}
historical = client.getHistoricalStats(TraceUtil.traceInfo(),
context.rpcCreds());
} finally {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
} catch (Exception e) {
return null;
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 2dc9aac..a9f7a43 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -584,7 +584,7 @@ public class TabletServer extends AbstractServer {
}
private void returnManagerConnection(ManagerClientService.Client client) {
- ThriftUtil.returnClient(client);
+ ThriftUtil.returnClient(client, context);
}
private HostAndPort startTabletClientService() throws UnknownHostException {
diff --git
a/shell/src/main/java/org/apache/accumulo/shell/commands/ListBulkCommand.java
b/shell/src/main/java/org/apache/accumulo/shell/commands/ListBulkCommand.java
index 9f6c6a5..cff938d 100644
---
a/shell/src/main/java/org/apache/accumulo/shell/commands/ListBulkCommand.java
+++
b/shell/src/main/java/org/apache/accumulo/shell/commands/ListBulkCommand.java
@@ -65,7 +65,7 @@ public class ListBulkCommand extends Command {
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
} finally {
if (client != null)
- ManagerClient.close(client);
+ ManagerClient.close(client, context);
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
b/test/src/main/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
index 526d4e4..8801529 100644
--- a/test/src/main/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
@@ -101,7 +101,7 @@ public class DetectDeadTabletServersIT extends
ConfigurableMacBase {
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
} finally {
if (client != null) {
- ManagerClient.close(client);
+ ManagerClient.close(client, context);
}
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/GetManagerStats.java
b/test/src/main/java/org/apache/accumulo/test/GetManagerStats.java
index 8c396f8..c54c3df 100644
--- a/test/src/main/java/org/apache/accumulo/test/GetManagerStats.java
+++ b/test/src/main/java/org/apache/accumulo/test/GetManagerStats.java
@@ -54,7 +54,7 @@ public class GetManagerStats {
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
} finally {
if (client != null) {
- ManagerClient.close(client);
+ ManagerClient.close(client, context);
}
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
b/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
index 15ded44..d8efbe0 100644
--- a/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
@@ -68,7 +68,7 @@ public class TransportCachingIT extends
AccumuloClusterHarness {
// only want to use one server for all subsequent test
servers = servers.subList(0, 1);
- ThriftTransportPool pool = ThriftTransportPool.getInstance();
+ ThriftTransportPool pool = context.getTransportPool();
TTransport first = getAnyTransport(servers, pool, true);
assertNotNull(first);
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
index c5a10a9..350d0c0 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
@@ -122,7 +122,7 @@ public class BalanceAfterCommsFailureIT extends
ConfigurableMacBase {
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
} finally {
if (client != null)
- ManagerClient.close(client);
+ ManagerClient.close(client, context);
}
}
unassignedTablets = stats.getUnassignedTablets();
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
index a9c0c66..12da9a4 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
@@ -173,7 +173,7 @@ public class BalanceInPresenceOfOfflineTableIT extends
AccumuloClusterHarness {
throw new AccumuloException(exception);
} finally {
if (client != null) {
- ManagerClient.close(client);
+ ManagerClient.close(client, (ClientContext) accumuloClient);
}
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java
index 61aac0f..c055906 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java
@@ -268,7 +268,7 @@ public class BulkFailureIT extends AccumuloClusterHarness {
throw tae;
}
} finally {
- ThriftUtil.returnClient((TServiceClient) client);
+ ThriftUtil.returnClient((TServiceClient) client, context);
}
}
@@ -293,7 +293,7 @@ public class BulkFailureIT extends AccumuloClusterHarness {
}
} finally {
- ThriftUtil.returnClient((TServiceClient) client);
+ ThriftUtil.returnClient((TServiceClient) client, context);
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/MetadataMaxFilesIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/MetadataMaxFilesIT.java
index 68bf9bc..325c976 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/MetadataMaxFilesIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/MetadataMaxFilesIT.java
@@ -95,7 +95,7 @@ public class MetadataMaxFilesIT extends ConfigurableMacBase {
continue;
} finally {
if (client != null)
- ManagerClient.close(client);
+ ManagerClient.close(client, (ClientContext) c);
}
int tablets = 0;
for (TabletServerStatus tserver : stats.tServerInfo) {
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
index ef5954e..a32ea09 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
@@ -97,7 +97,7 @@ public class SimpleBalancerFairnessIT extends
ConfigurableMacBase {
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
} finally {
if (client != null)
- ManagerClient.close(client);
+ ManagerClient.close(client, (ClientContext) c);
}
}
unassignedTablets = stats.getUnassignedTablets();