This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new d91d016211 Optimized logic for getting a random TabletServer connection (#4309) d91d016211 is described below commit d91d0162115ae66112a104278bcd14e8085936d3 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Mar 4 09:16:58 2024 -0500 Optimized logic for getting a random TabletServer connection (#4309) The previous logic in this class would gather all of the TServer ZNodes in ZooKeeper, then get the data for each ZNode and validate its ServiceLock. Then, after all of that, it would randomly pick one of the TabletServers to connect to. It did this through the ZooCache object, which on an initial connection would be empty, causing a lot of back and forth with ZooKeeper. The side effect of this is that the ZooCache would be populated with TabletServer information. This change modifies TServerClient such that it no longer populates ZooCache information for each TabletServer and modifies the default logic for getting a connection to a TabletServer. The new logic will make 3 calls to ZooKeeper in the best-case scenario: one to get the list of TServer ZNodes in ZooKeeper, one to get the ServiceLock for a random TServer, and another to get the ZNode data for that TServer. This is all done through ZooCache, so it is lazily populated over time instead of incurring the penalty when getting the first TabletServer connection. 
Fixes #4303 --- .../core/clientImpl/ThriftTransportKey.java | 29 ++++-- .../core/clientImpl/ThriftTransportPool.java | 110 +++++---------------- .../org/apache/accumulo/core/rpc/ThriftUtil.java | 7 +- .../accumulo/core/rpc/clients/TServerClient.java | 73 ++++++++------ .../core/rpc/clients/ThriftClientTypes.java | 6 +- .../core/clientImpl/ThriftTransportKeyTest.java | 25 ++--- .../coordinator/CompactionCoordinator.java | 4 +- .../apache/accumulo/test/TransportCachingIT.java | 42 ++++---- 8 files changed, 142 insertions(+), 154 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java index 8be320dcc5..f4c7047d6d 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java @@ -24,12 +24,14 @@ import java.util.Objects; import org.apache.accumulo.core.rpc.SaslConnectionParams; import org.apache.accumulo.core.rpc.SslConnectionParams; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.util.HostAndPort; import com.google.common.annotations.VisibleForTesting; @VisibleForTesting public class ThriftTransportKey { + private final ThriftClientTypes<?> type; private final HostAndPort server; private final long timeout; private final SslConnectionParams sslParams; @@ -38,16 +40,18 @@ public class ThriftTransportKey { private final int hash; @VisibleForTesting - public ThriftTransportKey(HostAndPort server, long timeout, ClientContext context) { - this(server, timeout, context.getClientSslParams(), context.getSaslParams()); + public ThriftTransportKey(ThriftClientTypes<?> type, HostAndPort server, long timeout, + ClientContext context) { + this(type, server, timeout, context.getClientSslParams(), context.getSaslParams()); } /** * Visible only for testing */ - ThriftTransportKey(HostAndPort 
server, long timeout, SslConnectionParams sslParams, - SaslConnectionParams saslParams) { + ThriftTransportKey(ThriftClientTypes<?> type, HostAndPort server, long timeout, + SslConnectionParams sslParams, SaslConnectionParams saslParams) { requireNonNull(server, "location is null"); + this.type = type; this.server = server; this.timeout = timeout; this.sslParams = sslParams; @@ -56,14 +60,21 @@ public class ThriftTransportKey { // TSasl and TSSL transport factories don't play nicely together throw new RuntimeException("Cannot use both SSL and SASL thrift transports"); } - this.hash = Objects.hash(server, timeout, sslParams, saslParams); + this.hash = Objects.hash(type, server, timeout, sslParams, saslParams); } - HostAndPort getServer() { + @VisibleForTesting + public ThriftClientTypes<?> getType() { + return type; + } + + @VisibleForTesting + public HostAndPort getServer() { return server; } - long getTimeout() { + @VisibleForTesting + public long getTimeout() { return timeout; } @@ -81,7 +92,7 @@ public class ThriftTransportKey { return false; } ThriftTransportKey ttk = (ThriftTransportKey) o; - return server.equals(ttk.server) && timeout == ttk.timeout + return type.equals(ttk.type) && server.equals(ttk.server) && timeout == ttk.timeout && (!isSsl() || (ttk.isSsl() && sslParams.equals(ttk.sslParams))) && (!isSasl() || (ttk.isSasl() && saslParams.equals(ttk.saslParams))); } @@ -99,7 +110,7 @@ public class ThriftTransportKey { } else if (isSasl()) { prefix = saslParams + ":"; } - return prefix + server + " (" + timeout + ")"; + return prefix + type + ":" + server + " (" + timeout + ")"; } public SslConnectionParams getSslParams() { diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java index bd23c5dcbd..a3d38aa10a 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java +++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java @@ -41,6 +41,7 @@ import java.util.function.LongSupplier; import java.util.function.Supplier; import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.threads.Threads; @@ -50,7 +51,6 @@ import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -110,71 +110,40 @@ public class ThriftTransportPool { return pool; } - public TTransport getTransport(HostAndPort location, long milliseconds, ClientContext context) - throws TTransportException { - ThriftTransportKey cacheKey = new ThriftTransportKey(location, milliseconds, context); + public TTransport getTransport(ThriftClientTypes<?> type, HostAndPort location, long milliseconds, + ClientContext context, boolean preferCached) throws TTransportException { - CachedConnection connection = connectionPool.reserveAny(cacheKey); - - if (connection != null) { - log.trace("Using existing connection to {}", cacheKey.getServer()); - return connection.transport; - } else { - return createNewTransport(cacheKey); + ThriftTransportKey cacheKey = new ThriftTransportKey(type, location, milliseconds, context); + if (preferCached) { + CachedConnection connection = connectionPool.reserveAny(cacheKey); + if (connection != null) { + log.trace("Using existing connection to {}", cacheKey.getServer()); + return connection.transport; + } } + return createNewTransport(cacheKey); } - @VisibleForTesting - public Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, - boolean preferCachedConnection) throws TTransportException { - - servers = new 
ArrayList<>(servers); - - if (preferCachedConnection) { - HashSet<ThriftTransportKey> serversSet = new HashSet<>(servers); - - // randomly pick a server from the connection cache - serversSet.retainAll(connectionPool.getThriftTransportKeys()); - - if (!serversSet.isEmpty()) { - ArrayList<ThriftTransportKey> cachedServers = new ArrayList<>(serversSet); - Collections.shuffle(cachedServers, random); - - for (ThriftTransportKey ttk : cachedServers) { - CachedConnection connection = connectionPool.reserveAny(ttk); - if (connection != null) { - final String serverAddr = ttk.getServer().toString(); - log.trace("Using existing connection to {}", serverAddr); - return new Pair<>(serverAddr, connection.transport); - } - - } + public Pair<String,TTransport> getAnyCachedTransport(ThriftClientTypes<?> type) { + final List<ThriftTransportKey> serversSet = new ArrayList<>(); + for (ThriftTransportKey ttk : connectionPool.getThriftTransportKeys()) { + if (ttk.getType().equals(type)) { + serversSet.add(ttk); } } - - int retryCount = 0; - while (!servers.isEmpty() && retryCount < 10) { - - int index = random.nextInt(servers.size()); - ThriftTransportKey ttk = servers.get(index); - - if (preferCachedConnection) { - CachedConnection connection = connectionPool.reserveAnyIfPresent(ttk); - if (connection != null) { - return new Pair<>(ttk.getServer().toString(), connection.transport); - } - } - - try { - return new Pair<>(ttk.getServer().toString(), createNewTransport(ttk)); - } catch (TTransportException tte) { - log.debug("Failed to connect to {}", servers.get(index), tte); - servers.remove(index); - retryCount++; + if (serversSet.isEmpty()) { + return null; + } + Collections.shuffle(serversSet, random); + for (ThriftTransportKey ttk : serversSet) { + CachedConnection connection = connectionPool.reserveAny(ttk); + if (connection != null) { + final String serverAddr = ttk.getServer().toString(); + log.trace("Using existing connection to {}", serverAddr); + return new Pair<>(serverAddr, 
connection.transport); } } - - throw new TTransportException("Failed to connect to a server"); + return null; } private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException { @@ -390,27 +359,6 @@ public class ThriftTransportPool { return executeWithinLock(key, connections::reserveAny); } - /** - * Reserve and return a new {@link CachedConnection} from the {@link CachedConnections} mapped - * to the specified transport key. If a {@link CachedConnections} is not found, null will be - * returned. - * - * <p> - * - * This operation locks access to the mapping for the key in {@link ConnectionPool#connections} - * until the operation completes. - * - * @param key the transport key - * @return the reserved {@link CachedConnection}, or null if none were available. - */ - CachedConnection reserveAnyIfPresent(final ThriftTransportKey key) { - // It's possible that multiple locks from executeWithinLock will overlap with a single lock - // inside the ConcurrentHashMap which can unnecessarily block threads. Access the - // ConcurrentHashMap outside of executeWithinLock to prevent this. - var connections = getCachedConnections(key); - return connections == null ? null : executeWithinLock(key, connections::reserveAny); - } - /** * Puts the specified connection into the reserved map of the {@link CachedConnections} for the * specified transport key. If a {@link CachedConnections} is not found, one will be created. 
@@ -516,10 +464,6 @@ public class ThriftTransportPool { return lock; } - CachedConnections getCachedConnections(final ThriftTransportKey key) { - return connections.get(key); - } - CachedConnections getOrCreateCachedConnections(final ThriftTransportKey key) { return connections.computeIfAbsent(key, k -> new CachedConnections()); } diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java index 6924b4c862..41c4650dd3 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java @@ -110,8 +110,8 @@ public class ThriftUtil { */ public static <T extends TServiceClient> T getClient(ThriftClientTypes<T> type, HostAndPort address, ClientContext context) throws TTransportException { - TTransport transport = context.getTransportPool().getTransport(address, - context.getClientTimeoutInMillis(), context); + TTransport transport = context.getTransportPool().getTransport(type, address, + context.getClientTimeoutInMillis(), context, true); return createClient(type, transport); } @@ -126,7 +126,8 @@ public class ThriftUtil { */ public static <T extends TServiceClient> T getClient(ThriftClientTypes<T> type, HostAndPort address, ClientContext context, long timeout) throws TTransportException { - TTransport transport = context.getTransportPool().getTransport(address, timeout, context); + TTransport transport = + context.getTransportPool().getTransport(type, address, timeout, context, true); return createClient(type, transport); } diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java index 53bb90db19..5f017668e9 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java @@ -24,6 +24,8 @@ import 
static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.Constants; @@ -31,13 +33,13 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.clientImpl.AccumuloServerException; import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.clientImpl.ThriftTransportKey; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes.Exec; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes.ExecVoid; +import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; @@ -57,44 +59,59 @@ public interface TServerClient<C extends TServiceClient> { ClientContext context, boolean preferCachedConnections, AtomicBoolean warned) throws TTransportException { checkArgument(context != null, "context is null"); - long rpcTimeout = context.getClientTimeoutInMillis(); - // create list of servers - ArrayList<ThriftTransportKey> servers = new ArrayList<>(); - // add tservers - ZooCache zc = context.getZooCache(); - for (String tserver : zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS)) { + if (preferCachedConnections) { + Pair<String,TTransport> cachedTransport = + context.getTransportPool().getAnyCachedTransport(type); + if (cachedTransport != null) { + C client = ThriftUtil.createClient(type, 
cachedTransport.getSecond()); + warned.set(false); + return new Pair<String,C>(cachedTransport.getFirst(), client); + } + } + + final long rpcTimeout = context.getClientTimeoutInMillis(); + final ZooCache zc = context.getZooCache(); + final List<String> tservers = new ArrayList<>(); + + tservers.addAll(zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS)); + + if (tservers.isEmpty()) { + if (warned.compareAndSet(false, true)) { + LOG.warn("There are no tablet servers: check that zookeeper and accumulo are running."); + } + throw new TTransportException("There are no servers for type: " + type); + } + + // Try to connect to an online tserver + Collections.shuffle(tservers); + for (String tserver : tservers) { var zLocPath = ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTSERVERS + "/" + tserver); byte[] data = zc.getLockData(zLocPath); if (data != null) { String strData = new String(data, UTF_8); if (!strData.equals("manager")) { - servers.add(new ThriftTransportKey( - new ServerServices(strData).getAddress(Service.TSERV_CLIENT), rpcTimeout, context)); - } - } - } - - boolean opened = false; - try { - Pair<String,TTransport> pair = - context.getTransportPool().getAnyTransport(servers, preferCachedConnections); - C client = ThriftUtil.createClient(type, pair.getSecond()); - opened = true; - warned.set(false); - return new Pair<>(pair.getFirst(), client); - } finally { - if (!opened) { - if (warned.compareAndSet(false, true)) { - if (servers.isEmpty()) { - LOG.warn("There are no tablet servers: check that zookeeper and accumulo are running."); - } else { - LOG.warn("Failed to find an available server in the list of servers: {}", servers); + final HostAndPort tserverClientAddress = + new ServerServices(strData).getAddress(Service.TSERV_CLIENT); + try { + TTransport transport = context.getTransportPool().getTransport(type, + tserverClientAddress, rpcTimeout, context, preferCachedConnections); + C client = ThriftUtil.createClient(type, transport); + 
warned.set(false); + return new Pair<String,C>(tserverClientAddress.toString(), client); + } catch (TTransportException e) { + LOG.trace("Error creating transport to {}", tserverClientAddress); + continue; } } } } + if (warned.compareAndSet(false, true)) { + LOG.warn("Failed to find an available server in the list of servers: {} for API type: {}", + tservers, type); + } + throw new TTransportException("Failed to connect to any server for API type " + type); } default <R> R execute(Logger LOG, ClientContext context, Exec<R,C> exec) diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/clients/ThriftClientTypes.java b/core/src/main/java/org/apache/accumulo/core/rpc/clients/ThriftClientTypes.java index bbc30e3531..95595506fb 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/ThriftClientTypes.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/ThriftClientTypes.java @@ -80,7 +80,7 @@ public abstract class ThriftClientTypes<C extends TServiceClient> { private final String serviceName; private final TServiceClientFactory<C> clientFactory; - public ThriftClientTypes(String serviceName, TServiceClientFactory<C> factory) { + protected ThriftClientTypes(String serviceName, TServiceClientFactory<C> factory) { this.serviceName = serviceName; this.clientFactory = factory; } @@ -122,4 +122,8 @@ public abstract class ThriftClientTypes<C extends TServiceClient> { throw new UnsupportedOperationException("This method has not been implemented"); } + @Override + public String toString() { + return serviceName; + } } diff --git a/core/src/test/java/org/apache/accumulo/core/clientImpl/ThriftTransportKeyTest.java b/core/src/test/java/org/apache/accumulo/core/clientImpl/ThriftTransportKeyTest.java index 7539ca3b66..bc9e7cb0de 100644 --- a/core/src/test/java/org/apache/accumulo/core/clientImpl/ThriftTransportKeyTest.java +++ b/core/src/test/java/org/apache/accumulo/core/clientImpl/ThriftTransportKeyTest.java @@ -35,6 +35,7 @@ import 
org.apache.accumulo.core.client.security.tokens.KerberosToken; import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.rpc.SaslConnectionParams; import org.apache.accumulo.core.rpc.SslConnectionParams; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.util.HostAndPort; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -76,9 +77,8 @@ public class ThriftTransportKeyTest { replay(clientCtx); try { - assertThrows(RuntimeException.class, - () -> new ThriftTransportKey(HostAndPort.fromParts("localhost", 9999), 120_000, - clientCtx)); + assertThrows(RuntimeException.class, () -> new ThriftTransportKey(ThriftClientTypes.CLIENT, + HostAndPort.fromParts("localhost", 9999), 120_000, clientCtx)); } finally { verify(clientCtx); } @@ -98,9 +98,10 @@ public class ThriftTransportKeyTest { user1.doAs((PrivilegedExceptionAction<SaslConnectionParams>) () -> createSaslParams(token)); ThriftTransportKey ttk1 = - new ThriftTransportKey(HostAndPort.fromParts("localhost", 9997), 1L, null, saslParams1), - ttk2 = - new ThriftTransportKey(HostAndPort.fromParts("localhost", 9997), 1L, null, saslParams2); + new ThriftTransportKey(ThriftClientTypes.CLIENT, HostAndPort.fromParts("localhost", 9997), + 1L, null, saslParams1), + ttk2 = new ThriftTransportKey(ThriftClientTypes.CLIENT, + HostAndPort.fromParts("localhost", 9997), 1L, null, saslParams2); // Should equals() and hashCode() to make sure we don't throw away thrift cnxns assertEquals(ttk1, ttk2); @@ -119,9 +120,10 @@ public class ThriftTransportKeyTest { user2.doAs((PrivilegedExceptionAction<SaslConnectionParams>) () -> createSaslParams(token)); ThriftTransportKey ttk1 = - new ThriftTransportKey(HostAndPort.fromParts("localhost", 9997), 1L, null, saslParams1), - ttk2 = - new ThriftTransportKey(HostAndPort.fromParts("localhost", 9997), 1L, null, saslParams2); + new ThriftTransportKey(ThriftClientTypes.CLIENT, 
HostAndPort.fromParts("localhost", 9997), + 1L, null, saslParams1), + ttk2 = new ThriftTransportKey(ThriftClientTypes.CLIENT, + HostAndPort.fromParts("localhost", 9997), 1L, null, saslParams2); assertNotEquals(ttk1, ttk2); assertNotEquals(ttk1.hashCode(), ttk2.hashCode()); @@ -136,9 +138,10 @@ public class ThriftTransportKeyTest { replay(clientCtx); - ThriftTransportKey ttk = - new ThriftTransportKey(HostAndPort.fromParts("localhost", 9999), 120_000, clientCtx); + ThriftTransportKey ttk = new ThriftTransportKey(ThriftClientTypes.CLIENT, + HostAndPort.fromParts("localhost", 9999), 120_000, clientCtx); assertEquals(ttk, ttk, "Normal ThriftTransportKey doesn't equal itself"); + assertEquals(ttk.hashCode(), ttk.hashCode()); } } diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index 337f5bc685..f4819ebefb 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -491,8 +491,8 @@ public class CompactionCoordinator extends AbstractServer throws TTransportException { TServerConnection connection = tserverSet.getConnection(tserver); ServerContext serverContext = getContext(); - TTransport transport = - serverContext.getTransportPool().getTransport(connection.getAddress(), 0, serverContext); + TTransport transport = serverContext.getTransportPool().getTransport( + ThriftClientTypes.TABLET_SERVER, connection.getAddress(), 0, serverContext, true); return ThriftUtil.createClient(ThriftClientTypes.TABLET_SERVER, transport); } diff --git a/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java b/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java index 8306b6a764..9c3474ffcc 100644 --- 
a/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java @@ -32,7 +32,9 @@ import org.apache.accumulo.core.clientImpl.ThriftTransportKey; import org.apache.accumulo.core.clientImpl.ThriftTransportPool; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; @@ -62,43 +64,44 @@ public class TransportCachingIT extends AccumuloClusterHarness { ConfigurationTypeHelper.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT.getDefaultValue()); List<ThriftTransportKey> servers = tservers.stream().map(serverStr -> { - return new ThriftTransportKey(HostAndPort.fromString(serverStr), rpcTimeout, context); + return new ThriftTransportKey(ThriftClientTypes.CLIENT, HostAndPort.fromString(serverStr), + rpcTimeout, context); }).collect(Collectors.toList()); // only want to use one server for all subsequent test - servers = servers.subList(0, 1); + ThriftTransportKey ttk = servers.get(0); ThriftTransportPool pool = context.getTransportPool(); - TTransport first = getAnyTransport(servers, pool, true); + TTransport first = getAnyTransport(ttk, pool, true); assertNotNull(first); // Return it to unreserve it pool.returnTransport(first); - TTransport second = getAnyTransport(servers, pool, true); + TTransport second = getAnyTransport(ttk, pool, true); // We should get the same transport assertSame(first, second, "Expected the first and second to be the same instance"); pool.returnTransport(second); // Ensure does not get cached connection just returned - TTransport third = getAnyTransport(servers, pool, false); + TTransport third = 
getAnyTransport(ttk, pool, false); assertNotSame(second, third, "Expected second and third transport to be different instances"); - TTransport fourth = getAnyTransport(servers, pool, false); + TTransport fourth = getAnyTransport(ttk, pool, false); assertNotSame(third, fourth, "Expected third and fourth transport to be different instances"); pool.returnTransport(third); pool.returnTransport(fourth); // The following three asserts ensure the per server queue is LIFO - TTransport fifth = getAnyTransport(servers, pool, true); + TTransport fifth = getAnyTransport(ttk, pool, true); assertSame(fourth, fifth, "Expected fourth and fifth transport to be the same instance"); - TTransport sixth = getAnyTransport(servers, pool, true); + TTransport sixth = getAnyTransport(ttk, pool, true); assertSame(third, sixth, "Expected third and sixth transport to be the same instance"); - TTransport seventh = getAnyTransport(servers, pool, true); + TTransport seventh = getAnyTransport(ttk, pool, true); assertSame(second, seventh, "Expected second and seventh transport to be the same instance"); pool.returnTransport(fifth); @@ -107,16 +110,21 @@ public class TransportCachingIT extends AccumuloClusterHarness { } } - private TTransport getAnyTransport(List<ThriftTransportKey> servers, ThriftTransportPool pool, + private TTransport getAnyTransport(ThriftTransportKey ttk, ThriftTransportPool pool, boolean preferCached) { - TTransport first = null; - while (first == null) { - try { - first = pool.getAnyTransport(servers, preferCached).getSecond(); - } catch (TTransportException e) { - log.warn("Failed to obtain transport to {}", servers); + if (preferCached) { + Pair<String,TTransport> cached = pool.getAnyCachedTransport(ttk.getType()); + if (cached != null) { + return cached.getSecond(); } } - return first; + try { + return pool.getTransport(ttk.getType(), ttk.getServer(), ttk.getTimeout(), getServerContext(), + preferCached); + } catch (TTransportException e) { + log.warn("Failed to obtain 
transport to {}", ttk.getServer()); + } + return null; } + }