This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new d91d016211 Optimized logic for getting a random TabletServer 
connection (#4309)
d91d016211 is described below

commit d91d0162115ae66112a104278bcd14e8085936d3
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Mar 4 09:16:58 2024 -0500

    Optimized logic for getting a random TabletServer connection (#4309)
    
    The previous logic in this class would gather all of the Tserver
    ZNodes in ZooKeeper, then get the data for each ZNode and validate
    their ServiceLock. Then, after all of that it would randomly pick
    one of the TabletServers to connect to. It did this through the
    ZooCache object, which on an initial connection would be empty,
    causing a lot of back and forth to ZooKeeper. The side effect of
    this is that the ZooCache would be populated with TabletServer
    information.
    
    This change modifies TServerClient such that it no longer populates
    ZooCache information for each TabletServer and modifies the
    default logic for getting a connection to a TabletServer. The new
    logic will make 3 calls to ZooKeeper in the best case scenario: one to get
    the list of TServer ZNodes in ZooKeeper, one to get the ServiceLock for
    a random TServer, and another to get the ZNode data for it. This
    is all done through ZooCache, so it is lazily populated over time instead
    of incurring the penalty when getting the first TabletServer connection.
    
    Fixes #4303
---
 .../core/clientImpl/ThriftTransportKey.java        |  29 ++++--
 .../core/clientImpl/ThriftTransportPool.java       | 110 +++++----------------
 .../org/apache/accumulo/core/rpc/ThriftUtil.java   |   7 +-
 .../accumulo/core/rpc/clients/TServerClient.java   |  73 ++++++++------
 .../core/rpc/clients/ThriftClientTypes.java        |   6 +-
 .../core/clientImpl/ThriftTransportKeyTest.java    |  25 ++---
 .../coordinator/CompactionCoordinator.java         |   4 +-
 .../apache/accumulo/test/TransportCachingIT.java   |  42 ++++----
 8 files changed, 142 insertions(+), 154 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java
index 8be320dcc5..f4c7047d6d 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java
@@ -24,12 +24,14 @@ import java.util.Objects;
 
 import org.apache.accumulo.core.rpc.SaslConnectionParams;
 import org.apache.accumulo.core.rpc.SslConnectionParams;
+import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 import org.apache.accumulo.core.util.HostAndPort;
 
 import com.google.common.annotations.VisibleForTesting;
 
 @VisibleForTesting
 public class ThriftTransportKey {
+  private final ThriftClientTypes<?> type;
   private final HostAndPort server;
   private final long timeout;
   private final SslConnectionParams sslParams;
@@ -38,16 +40,18 @@ public class ThriftTransportKey {
   private final int hash;
 
   @VisibleForTesting
-  public ThriftTransportKey(HostAndPort server, long timeout, ClientContext 
context) {
-    this(server, timeout, context.getClientSslParams(), 
context.getSaslParams());
+  public ThriftTransportKey(ThriftClientTypes<?> type, HostAndPort server, 
long timeout,
+      ClientContext context) {
+    this(type, server, timeout, context.getClientSslParams(), 
context.getSaslParams());
   }
 
   /**
    * Visible only for testing
    */
-  ThriftTransportKey(HostAndPort server, long timeout, SslConnectionParams 
sslParams,
-      SaslConnectionParams saslParams) {
+  ThriftTransportKey(ThriftClientTypes<?> type, HostAndPort server, long 
timeout,
+      SslConnectionParams sslParams, SaslConnectionParams saslParams) {
     requireNonNull(server, "location is null");
+    this.type = type;
     this.server = server;
     this.timeout = timeout;
     this.sslParams = sslParams;
@@ -56,14 +60,21 @@ public class ThriftTransportKey {
       // TSasl and TSSL transport factories don't play nicely together
       throw new RuntimeException("Cannot use both SSL and SASL thrift 
transports");
     }
-    this.hash = Objects.hash(server, timeout, sslParams, saslParams);
+    this.hash = Objects.hash(type, server, timeout, sslParams, saslParams);
   }
 
-  HostAndPort getServer() {
+  @VisibleForTesting
+  public ThriftClientTypes<?> getType() {
+    return type;
+  }
+
+  @VisibleForTesting
+  public HostAndPort getServer() {
     return server;
   }
 
-  long getTimeout() {
+  @VisibleForTesting
+  public long getTimeout() {
     return timeout;
   }
 
@@ -81,7 +92,7 @@ public class ThriftTransportKey {
       return false;
     }
     ThriftTransportKey ttk = (ThriftTransportKey) o;
-    return server.equals(ttk.server) && timeout == ttk.timeout
+    return type.equals(ttk.type) && server.equals(ttk.server) && timeout == 
ttk.timeout
         && (!isSsl() || (ttk.isSsl() && sslParams.equals(ttk.sslParams)))
         && (!isSasl() || (ttk.isSasl() && saslParams.equals(ttk.saslParams)));
   }
@@ -99,7 +110,7 @@ public class ThriftTransportKey {
     } else if (isSasl()) {
       prefix = saslParams + ":";
     }
-    return prefix + server + " (" + timeout + ")";
+    return prefix + type + ":" + server + " (" + timeout + ")";
   }
 
   public SslConnectionParams getSslParams() {
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
index bd23c5dcbd..a3d38aa10a 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
@@ -41,6 +41,7 @@ import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
 import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.threads.Threads;
@@ -50,7 +51,6 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -110,71 +110,40 @@ public class ThriftTransportPool {
     return pool;
   }
 
-  public TTransport getTransport(HostAndPort location, long milliseconds, 
ClientContext context)
-      throws TTransportException {
-    ThriftTransportKey cacheKey = new ThriftTransportKey(location, 
milliseconds, context);
+  public TTransport getTransport(ThriftClientTypes<?> type, HostAndPort 
location, long milliseconds,
+      ClientContext context, boolean preferCached) throws TTransportException {
 
-    CachedConnection connection = connectionPool.reserveAny(cacheKey);
-
-    if (connection != null) {
-      log.trace("Using existing connection to {}", cacheKey.getServer());
-      return connection.transport;
-    } else {
-      return createNewTransport(cacheKey);
+    ThriftTransportKey cacheKey = new ThriftTransportKey(type, location, 
milliseconds, context);
+    if (preferCached) {
+      CachedConnection connection = connectionPool.reserveAny(cacheKey);
+      if (connection != null) {
+        log.trace("Using existing connection to {}", cacheKey.getServer());
+        return connection.transport;
+      }
     }
+    return createNewTransport(cacheKey);
   }
 
-  @VisibleForTesting
-  public Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> 
servers,
-      boolean preferCachedConnection) throws TTransportException {
-
-    servers = new ArrayList<>(servers);
-
-    if (preferCachedConnection) {
-      HashSet<ThriftTransportKey> serversSet = new HashSet<>(servers);
-
-      // randomly pick a server from the connection cache
-      serversSet.retainAll(connectionPool.getThriftTransportKeys());
-
-      if (!serversSet.isEmpty()) {
-        ArrayList<ThriftTransportKey> cachedServers = new 
ArrayList<>(serversSet);
-        Collections.shuffle(cachedServers, random);
-
-        for (ThriftTransportKey ttk : cachedServers) {
-          CachedConnection connection = connectionPool.reserveAny(ttk);
-          if (connection != null) {
-            final String serverAddr = ttk.getServer().toString();
-            log.trace("Using existing connection to {}", serverAddr);
-            return new Pair<>(serverAddr, connection.transport);
-          }
-
-        }
+  public Pair<String,TTransport> getAnyCachedTransport(ThriftClientTypes<?> 
type) {
+    final List<ThriftTransportKey> serversSet = new ArrayList<>();
+    for (ThriftTransportKey ttk : connectionPool.getThriftTransportKeys()) {
+      if (ttk.getType().equals(type)) {
+        serversSet.add(ttk);
       }
     }
-
-    int retryCount = 0;
-    while (!servers.isEmpty() && retryCount < 10) {
-
-      int index = random.nextInt(servers.size());
-      ThriftTransportKey ttk = servers.get(index);
-
-      if (preferCachedConnection) {
-        CachedConnection connection = connectionPool.reserveAnyIfPresent(ttk);
-        if (connection != null) {
-          return new Pair<>(ttk.getServer().toString(), connection.transport);
-        }
-      }
-
-      try {
-        return new Pair<>(ttk.getServer().toString(), createNewTransport(ttk));
-      } catch (TTransportException tte) {
-        log.debug("Failed to connect to {}", servers.get(index), tte);
-        servers.remove(index);
-        retryCount++;
+    if (serversSet.isEmpty()) {
+      return null;
+    }
+    Collections.shuffle(serversSet, random);
+    for (ThriftTransportKey ttk : serversSet) {
+      CachedConnection connection = connectionPool.reserveAny(ttk);
+      if (connection != null) {
+        final String serverAddr = ttk.getServer().toString();
+        log.trace("Using existing connection to {}", serverAddr);
+        return new Pair<>(serverAddr, connection.transport);
       }
     }
-
-    throw new TTransportException("Failed to connect to a server");
+    return null;
   }
 
   private TTransport createNewTransport(ThriftTransportKey cacheKey) throws 
TTransportException {
@@ -390,27 +359,6 @@ public class ThriftTransportPool {
       return executeWithinLock(key, connections::reserveAny);
     }
 
-    /**
-     * Reserve and return a new {@link CachedConnection} from the {@link 
CachedConnections} mapped
-     * to the specified transport key. If a {@link CachedConnections} is not 
found, null will be
-     * returned.
-     *
-     * <p>
-     *
-     * This operation locks access to the mapping for the key in {@link 
ConnectionPool#connections}
-     * until the operation completes.
-     *
-     * @param key the transport key
-     * @return the reserved {@link CachedConnection}, or null if none were 
available.
-     */
-    CachedConnection reserveAnyIfPresent(final ThriftTransportKey key) {
-      // It's possible that multiple locks from executeWithinLock will overlap 
with a single lock
-      // inside the ConcurrentHashMap which can unnecessarily block threads. 
Access the
-      // ConcurrentHashMap outside of executeWithinLock to prevent this.
-      var connections = getCachedConnections(key);
-      return connections == null ? null : executeWithinLock(key, 
connections::reserveAny);
-    }
-
     /**
      * Puts the specified connection into the reserved map of the {@link 
CachedConnections} for the
      * specified transport key. If a {@link CachedConnections} is not found, 
one will be created.
@@ -516,10 +464,6 @@ public class ThriftTransportPool {
       return lock;
     }
 
-    CachedConnections getCachedConnections(final ThriftTransportKey key) {
-      return connections.get(key);
-    }
-
     CachedConnections getOrCreateCachedConnections(final ThriftTransportKey 
key) {
       return connections.computeIfAbsent(key, k -> new CachedConnections());
     }
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java 
b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
index 6924b4c862..41c4650dd3 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@ -110,8 +110,8 @@ public class ThriftUtil {
    */
   public static <T extends TServiceClient> T getClient(ThriftClientTypes<T> 
type,
       HostAndPort address, ClientContext context) throws TTransportException {
-    TTransport transport = context.getTransportPool().getTransport(address,
-        context.getClientTimeoutInMillis(), context);
+    TTransport transport = context.getTransportPool().getTransport(type, 
address,
+        context.getClientTimeoutInMillis(), context, true);
     return createClient(type, transport);
   }
 
@@ -126,7 +126,8 @@ public class ThriftUtil {
    */
   public static <T extends TServiceClient> T getClient(ThriftClientTypes<T> 
type,
       HostAndPort address, ClientContext context, long timeout) throws 
TTransportException {
-    TTransport transport = context.getTransportPool().getTransport(address, 
timeout, context);
+    TTransport transport =
+        context.getTransportPool().getTransport(type, address, timeout, 
context, true);
     return createClient(type, transport);
   }
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java 
b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
index 53bb90db19..5f017668e9 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
@@ -24,6 +24,8 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.Constants;
@@ -31,13 +33,13 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.clientImpl.AccumuloServerException;
 import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.clientImpl.ThriftTransportKey;
 import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 import org.apache.accumulo.core.fate.zookeeper.ZooCache;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.rpc.clients.ThriftClientTypes.Exec;
 import org.apache.accumulo.core.rpc.clients.ThriftClientTypes.ExecVoid;
+import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
@@ -57,44 +59,59 @@ public interface TServerClient<C extends TServiceClient> {
       ClientContext context, boolean preferCachedConnections, AtomicBoolean 
warned)
       throws TTransportException {
     checkArgument(context != null, "context is null");
-    long rpcTimeout = context.getClientTimeoutInMillis();
-    // create list of servers
-    ArrayList<ThriftTransportKey> servers = new ArrayList<>();
 
-    // add tservers
-    ZooCache zc = context.getZooCache();
-    for (String tserver : zc.getChildren(context.getZooKeeperRoot() + 
Constants.ZTSERVERS)) {
+    if (preferCachedConnections) {
+      Pair<String,TTransport> cachedTransport =
+          context.getTransportPool().getAnyCachedTransport(type);
+      if (cachedTransport != null) {
+        C client = ThriftUtil.createClient(type, cachedTransport.getSecond());
+        warned.set(false);
+        return new Pair<String,C>(cachedTransport.getFirst(), client);
+      }
+    }
+
+    final long rpcTimeout = context.getClientTimeoutInMillis();
+    final ZooCache zc = context.getZooCache();
+    final List<String> tservers = new ArrayList<>();
+
+    tservers.addAll(zc.getChildren(context.getZooKeeperRoot() + 
Constants.ZTSERVERS));
+
+    if (tservers.isEmpty()) {
+      if (warned.compareAndSet(false, true)) {
+        LOG.warn("There are no tablet servers: check that zookeeper and 
accumulo are running.");
+      }
+      throw new TTransportException("There are no servers for type: " + type);
+    }
+
+    // Try to connect to an online tserver
+    Collections.shuffle(tservers);
+    for (String tserver : tservers) {
       var zLocPath =
           ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTSERVERS + 
"/" + tserver);
       byte[] data = zc.getLockData(zLocPath);
       if (data != null) {
         String strData = new String(data, UTF_8);
         if (!strData.equals("manager")) {
-          servers.add(new ThriftTransportKey(
-              new ServerServices(strData).getAddress(Service.TSERV_CLIENT), 
rpcTimeout, context));
-        }
-      }
-    }
-
-    boolean opened = false;
-    try {
-      Pair<String,TTransport> pair =
-          context.getTransportPool().getAnyTransport(servers, 
preferCachedConnections);
-      C client = ThriftUtil.createClient(type, pair.getSecond());
-      opened = true;
-      warned.set(false);
-      return new Pair<>(pair.getFirst(), client);
-    } finally {
-      if (!opened) {
-        if (warned.compareAndSet(false, true)) {
-          if (servers.isEmpty()) {
-            LOG.warn("There are no tablet servers: check that zookeeper and 
accumulo are running.");
-          } else {
-            LOG.warn("Failed to find an available server in the list of 
servers: {}", servers);
+          final HostAndPort tserverClientAddress =
+              new ServerServices(strData).getAddress(Service.TSERV_CLIENT);
+          try {
+            TTransport transport = 
context.getTransportPool().getTransport(type,
+                tserverClientAddress, rpcTimeout, context, 
preferCachedConnections);
+            C client = ThriftUtil.createClient(type, transport);
+            warned.set(false);
+            return new Pair<String,C>(tserverClientAddress.toString(), client);
+          } catch (TTransportException e) {
+            LOG.trace("Error creating transport to {}", tserverClientAddress);
+            continue;
           }
         }
       }
     }
+    if (warned.compareAndSet(false, true)) {
+      LOG.warn("Failed to find an available server in the list of servers: {} 
for API type: {}",
+          tservers, type);
+    }
+    throw new TTransportException("Failed to connect to any server for API 
type " + type);
   }
 
   default <R> R execute(Logger LOG, ClientContext context, Exec<R,C> exec)
diff --git 
a/core/src/main/java/org/apache/accumulo/core/rpc/clients/ThriftClientTypes.java
 
b/core/src/main/java/org/apache/accumulo/core/rpc/clients/ThriftClientTypes.java
index bbc30e3531..95595506fb 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/rpc/clients/ThriftClientTypes.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/rpc/clients/ThriftClientTypes.java
@@ -80,7 +80,7 @@ public abstract class ThriftClientTypes<C extends 
TServiceClient> {
   private final String serviceName;
   private final TServiceClientFactory<C> clientFactory;
 
-  public ThriftClientTypes(String serviceName, TServiceClientFactory<C> 
factory) {
+  protected ThriftClientTypes(String serviceName, TServiceClientFactory<C> 
factory) {
     this.serviceName = serviceName;
     this.clientFactory = factory;
   }
@@ -122,4 +122,8 @@ public abstract class ThriftClientTypes<C extends 
TServiceClient> {
     throw new UnsupportedOperationException("This method has not been 
implemented");
   }
 
+  @Override
+  public String toString() {
+    return serviceName;
+  }
 }
diff --git 
a/core/src/test/java/org/apache/accumulo/core/clientImpl/ThriftTransportKeyTest.java
 
b/core/src/test/java/org/apache/accumulo/core/clientImpl/ThriftTransportKeyTest.java
index 7539ca3b66..bc9e7cb0de 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/clientImpl/ThriftTransportKeyTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/clientImpl/ThriftTransportKeyTest.java
@@ -35,6 +35,7 @@ import 
org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.rpc.SaslConnectionParams;
 import org.apache.accumulo.core.rpc.SslConnectionParams;
+import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -76,9 +77,8 @@ public class ThriftTransportKeyTest {
     replay(clientCtx);
 
     try {
-      assertThrows(RuntimeException.class,
-          () -> new ThriftTransportKey(HostAndPort.fromParts("localhost", 
9999), 120_000,
-              clientCtx));
+      assertThrows(RuntimeException.class, () -> new 
ThriftTransportKey(ThriftClientTypes.CLIENT,
+          HostAndPort.fromParts("localhost", 9999), 120_000, clientCtx));
     } finally {
       verify(clientCtx);
     }
@@ -98,9 +98,10 @@ public class ThriftTransportKeyTest {
         user1.doAs((PrivilegedExceptionAction<SaslConnectionParams>) () -> 
createSaslParams(token));
 
     ThriftTransportKey ttk1 =
-        new ThriftTransportKey(HostAndPort.fromParts("localhost", 9997), 1L, 
null, saslParams1),
-        ttk2 =
-            new ThriftTransportKey(HostAndPort.fromParts("localhost", 9997), 
1L, null, saslParams2);
+        new ThriftTransportKey(ThriftClientTypes.CLIENT, 
HostAndPort.fromParts("localhost", 9997),
+            1L, null, saslParams1),
+        ttk2 = new ThriftTransportKey(ThriftClientTypes.CLIENT,
+            HostAndPort.fromParts("localhost", 9997), 1L, null, saslParams2);
 
     // Should equals() and hashCode() to make sure we don't throw away thrift 
cnxns
     assertEquals(ttk1, ttk2);
@@ -119,9 +120,10 @@ public class ThriftTransportKeyTest {
         user2.doAs((PrivilegedExceptionAction<SaslConnectionParams>) () -> 
createSaslParams(token));
 
     ThriftTransportKey ttk1 =
-        new ThriftTransportKey(HostAndPort.fromParts("localhost", 9997), 1L, 
null, saslParams1),
-        ttk2 =
-            new ThriftTransportKey(HostAndPort.fromParts("localhost", 9997), 
1L, null, saslParams2);
+        new ThriftTransportKey(ThriftClientTypes.CLIENT, 
HostAndPort.fromParts("localhost", 9997),
+            1L, null, saslParams1),
+        ttk2 = new ThriftTransportKey(ThriftClientTypes.CLIENT,
+            HostAndPort.fromParts("localhost", 9997), 1L, null, saslParams2);
 
     assertNotEquals(ttk1, ttk2);
     assertNotEquals(ttk1.hashCode(), ttk2.hashCode());
@@ -136,9 +138,10 @@ public class ThriftTransportKeyTest {
 
     replay(clientCtx);
 
-    ThriftTransportKey ttk =
-        new ThriftTransportKey(HostAndPort.fromParts("localhost", 9999), 
120_000, clientCtx);
+    ThriftTransportKey ttk = new ThriftTransportKey(ThriftClientTypes.CLIENT,
+        HostAndPort.fromParts("localhost", 9999), 120_000, clientCtx);
 
     assertEquals(ttk, ttk, "Normal ThriftTransportKey doesn't equal itself");
+    assertEquals(ttk.hashCode(), ttk.hashCode());
   }
 }
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 337f5bc685..f4819ebefb 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -491,8 +491,8 @@ public class CompactionCoordinator extends AbstractServer
       throws TTransportException {
     TServerConnection connection = tserverSet.getConnection(tserver);
     ServerContext serverContext = getContext();
-    TTransport transport =
-        serverContext.getTransportPool().getTransport(connection.getAddress(), 
0, serverContext);
+    TTransport transport = serverContext.getTransportPool().getTransport(
+        ThriftClientTypes.TABLET_SERVER, connection.getAddress(), 0, 
serverContext, true);
     return ThriftUtil.createClient(ThriftClientTypes.TABLET_SERVER, transport);
   }
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java 
b/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
index 8306b6a764..9c3474ffcc 100644
--- a/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
@@ -32,7 +32,9 @@ import org.apache.accumulo.core.clientImpl.ThriftTransportKey;
 import org.apache.accumulo.core.clientImpl.ThriftTransportPool;
 import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
@@ -62,43 +64,44 @@ public class TransportCachingIT extends 
AccumuloClusterHarness {
           
ConfigurationTypeHelper.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT.getDefaultValue());
 
       List<ThriftTransportKey> servers = tservers.stream().map(serverStr -> {
-        return new ThriftTransportKey(HostAndPort.fromString(serverStr), 
rpcTimeout, context);
+        return new ThriftTransportKey(ThriftClientTypes.CLIENT, 
HostAndPort.fromString(serverStr),
+            rpcTimeout, context);
       }).collect(Collectors.toList());
 
       // only want to use one server for all subsequent test
-      servers = servers.subList(0, 1);
+      ThriftTransportKey ttk = servers.get(0);
 
       ThriftTransportPool pool = context.getTransportPool();
-      TTransport first = getAnyTransport(servers, pool, true);
+      TTransport first = getAnyTransport(ttk, pool, true);
 
       assertNotNull(first);
       // Return it to unreserve it
       pool.returnTransport(first);
 
-      TTransport second = getAnyTransport(servers, pool, true);
+      TTransport second = getAnyTransport(ttk, pool, true);
 
       // We should get the same transport
       assertSame(first, second, "Expected the first and second to be the same 
instance");
       pool.returnTransport(second);
 
       // Ensure does not get cached connection just returned
-      TTransport third = getAnyTransport(servers, pool, false);
+      TTransport third = getAnyTransport(ttk, pool, false);
       assertNotSame(second, third, "Expected second and third transport to be 
different instances");
 
-      TTransport fourth = getAnyTransport(servers, pool, false);
+      TTransport fourth = getAnyTransport(ttk, pool, false);
       assertNotSame(third, fourth, "Expected third and fourth transport to be 
different instances");
 
       pool.returnTransport(third);
       pool.returnTransport(fourth);
 
       // The following three asserts ensure the per server queue is LIFO
-      TTransport fifth = getAnyTransport(servers, pool, true);
+      TTransport fifth = getAnyTransport(ttk, pool, true);
       assertSame(fourth, fifth, "Expected fourth and fifth transport to be the 
same instance");
 
-      TTransport sixth = getAnyTransport(servers, pool, true);
+      TTransport sixth = getAnyTransport(ttk, pool, true);
       assertSame(third, sixth, "Expected third and sixth transport to be the 
same instance");
 
-      TTransport seventh = getAnyTransport(servers, pool, true);
+      TTransport seventh = getAnyTransport(ttk, pool, true);
       assertSame(second, seventh, "Expected second and seventh transport to be 
the same instance");
 
       pool.returnTransport(fifth);
@@ -107,16 +110,21 @@ public class TransportCachingIT extends 
AccumuloClusterHarness {
     }
   }
 
-  private TTransport getAnyTransport(List<ThriftTransportKey> servers, 
ThriftTransportPool pool,
+  private TTransport getAnyTransport(ThriftTransportKey ttk, 
ThriftTransportPool pool,
       boolean preferCached) {
-    TTransport first = null;
-    while (first == null) {
-      try {
-        first = pool.getAnyTransport(servers, preferCached).getSecond();
-      } catch (TTransportException e) {
-        log.warn("Failed to obtain transport to {}", servers);
+    if (preferCached) {
+      Pair<String,TTransport> cached = 
pool.getAnyCachedTransport(ttk.getType());
+      if (cached != null) {
+        return cached.getSecond();
       }
     }
-    return first;
+    try {
+      return pool.getTransport(ttk.getType(), ttk.getServer(), 
ttk.getTimeout(), getServerContext(),
+          preferCached);
+    } catch (TTransportException e) {
+      log.warn("Failed to obtain transport to {}", ttk.getServer());
+    }
+    return null;
   }
+
 }

Reply via email to