This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit f3d5fb01d701a6932d37d2f67f52cc0eefa64d50 Merge: ebf7054d1f d91d016211 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Mar 4 18:14:16 2024 +0000 Merge branch '2.1' .../core/clientImpl/ThriftTransportKey.java | 29 ++++-- .../core/clientImpl/ThriftTransportPool.java | 110 +++++---------------- .../org/apache/accumulo/core/rpc/ThriftUtil.java | 7 +- .../accumulo/core/rpc/clients/TServerClient.java | 105 +++++++++++--------- .../core/rpc/clients/ThriftClientTypes.java | 6 +- .../core/clientImpl/ThriftTransportKeyTest.java | 25 ++--- .../coordinator/CompactionCoordinator.java | 4 +- .../apache/accumulo/test/TransportCachingIT.java | 42 ++++---- 8 files changed, 158 insertions(+), 170 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java index f332a09492,f4c7047d6d..0f84154a15 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java @@@ -24,9 -24,10 +24,10 @@@ import java.util.Objects import org.apache.accumulo.core.rpc.SaslConnectionParams; import org.apache.accumulo.core.rpc.SslConnectionParams; + import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; -import org.apache.accumulo.core.util.HostAndPort; import com.google.common.annotations.VisibleForTesting; +import com.google.common.net.HostAndPort; @VisibleForTesting public class ThriftTransportKey { @@@ -54,12 -58,18 +58,18 @@@ this.saslParams = saslParams; if (saslParams != null && sslParams != null) { // TSasl and TSSL transport factories don't play nicely together - throw new RuntimeException("Cannot use both SSL and SASL thrift transports"); + throw new IllegalArgumentException("Cannot use both SSL and SASL thrift transports"); } - this.hash = Objects.hash(server, timeout, sslParams, saslParams); + this.hash = Objects.hash(type, server, timeout, sslParams, saslParams); } - HostAndPort getServer() { + @VisibleForTesting + public ThriftClientTypes<?> getType() { + return type; + } + + @VisibleForTesting + public HostAndPort getServer() { return server; } diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java index d1bc17e945,a3d38aa10a..b3f205fa2a --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java @@@ -41,6 -41,8 +41,7 @@@ import java.util.function.LongSupplier import java.util.function.Supplier; import org.apache.accumulo.core.rpc.ThriftUtil; + import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; -import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.threads.Threads; import org.apache.thrift.TConfiguration; @@@ -49,9 -51,7 +50,8 @@@ import org.apache.thrift.transport.TTra import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.net.HostAndPort; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@@ -109,71 -110,40 +109,40 @@@ public class ThriftTransportPool return pool; } - public TTransport getTransport(HostAndPort location, long milliseconds, ClientContext context) - throws TTransportException { - ThriftTransportKey cacheKey = new ThriftTransportKey(location, milliseconds, context); + public TTransport getTransport(ThriftClientTypes<?> type, HostAndPort location, long milliseconds, + ClientContext context, boolean preferCached) throws TTransportException { - CachedConnection connection = connectionPool.reserveAny(cacheKey); - - if (connection != null) { - log.trace("Using existing connection to {}", cacheKey.getServer()); - return connection.transport; - } else { - return createNewTransport(cacheKey); + ThriftTransportKey cacheKey = new ThriftTransportKey(type, location, milliseconds, context); + if (preferCached) { + CachedConnection connection = connectionPool.reserveAny(cacheKey); + if (connection != null) { + log.trace("Using existing connection to {}", cacheKey.getServer()); + return connection.transport; + } } + return createNewTransport(cacheKey); } - @VisibleForTesting - public Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, - boolean preferCachedConnection) throws TTransportException { - - servers = new ArrayList<>(servers); - - if (preferCachedConnection) { - HashSet<ThriftTransportKey> serversSet = new HashSet<>(servers); - - // randomly pick a server from the connection cache - serversSet.retainAll(connectionPool.getThriftTransportKeys()); - - if (!serversSet.isEmpty()) { - ArrayList<ThriftTransportKey> cachedServers = new ArrayList<>(serversSet); - Collections.shuffle(cachedServers, RANDOM.get()); - - for (ThriftTransportKey ttk : cachedServers) { - CachedConnection connection = connectionPool.reserveAny(ttk); - if (connection != null) { - final String serverAddr = ttk.getServer().toString(); - log.trace("Using existing connection to {}", serverAddr); - return new Pair<>(serverAddr, connection.transport); - } - - } + public Pair<String,TTransport> getAnyCachedTransport(ThriftClientTypes<?> type) { + final List<ThriftTransportKey> serversSet = new ArrayList<>(); + for (ThriftTransportKey ttk : connectionPool.getThriftTransportKeys()) { + if (ttk.getType().equals(type)) { + serversSet.add(ttk); } } - - int retryCount = 0; - while (!servers.isEmpty() && retryCount < 10) { - - int index = RANDOM.get().nextInt(servers.size()); - ThriftTransportKey ttk = servers.get(index); - - if (preferCachedConnection) { - CachedConnection connection = connectionPool.reserveAnyIfPresent(ttk); - if (connection != null) { - return new Pair<>(ttk.getServer().toString(), connection.transport); - } - } - - try { - return new Pair<>(ttk.getServer().toString(), createNewTransport(ttk)); - } catch (TTransportException tte) { - log.debug("Failed to connect to {}", servers.get(index), tte); - servers.remove(index); - retryCount++; + if (serversSet.isEmpty()) { + return null; + } - Collections.shuffle(serversSet, random); ++ Collections.shuffle(serversSet, RANDOM.get()); + for (ThriftTransportKey ttk : serversSet) { + CachedConnection connection = connectionPool.reserveAny(ttk); + if (connection != null) { + final String serverAddr = ttk.getServer().toString(); + log.trace("Using existing connection to {}", serverAddr); + return new Pair<>(serverAddr, connection.transport); } } - - throw new TTransportException("Failed to connect to a server"); + return null; } private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException { diff --cc core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java index f96d9f62df,5f017668e9..4027f4b0c9 --- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java @@@ -26,6 -26,6 +26,7 @@@ import static org.apache.accumulo.core. import java.util.ArrayList; import java.util.Collections; import java.util.List; ++import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.Constants; @@@ -33,11 -33,9 +34,11 @@@ import org.apache.accumulo.core.client. import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.clientImpl.AccumuloServerException; import org.apache.accumulo.core.clientImpl.ClientContext; - import org.apache.accumulo.core.clientImpl.ThriftTransportKey; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock; import org.apache.accumulo.core.fate.zookeeper.ZooCache; +import org.apache.accumulo.core.lock.ServiceLock; ++import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes.Exec; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes.ExecVoid; @@@ -49,68 -50,68 +50,84 @@@ import org.apache.thrift.transport.TTra import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; ++import com.google.common.net.HostAndPort; ++ public interface TServerClient<C extends TServiceClient> { - Pair<String,C> getTabletServerConnection(ClientContext context, boolean preferCachedConnections) + Pair<String,C> getThriftServerConnection(ClientContext context, boolean preferCachedConnections) throws TTransportException; - default Pair<String,C> getTabletServerConnection(Logger LOG, ThriftClientTypes<C> type, - ClientContext context, boolean preferCachedConnections, AtomicBoolean warned) - throws TTransportException { + default Pair<String,C> getThriftServerConnection(Logger LOG, ThriftClientTypes<C> type, + ClientContext context, boolean preferCachedConnections, AtomicBoolean warned, + ThriftService service) throws TTransportException { checkArgument(context != null, "context is null"); - long rpcTimeout = context.getClientTimeoutInMillis(); - // create list of servers - ArrayList<ThriftTransportKey> servers = new ArrayList<>(); - // add tservers - List<String> serverPaths = new ArrayList<>(); - serverPaths.add(context.getZooKeeperRoot() + Constants.ZTSERVERS); + if (preferCachedConnections) { + Pair<String,TTransport> cachedTransport = + context.getTransportPool().getAnyCachedTransport(type); + if (cachedTransport != null) { + C client = ThriftUtil.createClient(type, cachedTransport.getSecond()); + warned.set(false); + return new Pair<String,C>(cachedTransport.getFirst(), client); + } + } + + final long rpcTimeout = context.getClientTimeoutInMillis(); ++ final String tserverZooPath = context.getZooKeeperRoot() + Constants.ZTSERVERS; ++ final String sserverZooPath = context.getZooKeeperRoot() + Constants.ZSSERVERS; ++ final String compactorZooPath = context.getZooKeeperRoot() + Constants.ZCOMPACTORS; + final ZooCache zc = context.getZooCache(); - final List<String> tservers = new ArrayList<>(); + - tservers.addAll(zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS)); ++ final List<String> serverPaths = new ArrayList<>(); ++ zc.getChildren(tserverZooPath).forEach(tserverAddress -> { ++ serverPaths.add(tserverZooPath + "/" + tserverAddress); ++ }); + if (type == ThriftClientTypes.CLIENT) { - serverPaths.add(context.getZooKeeperRoot() + Constants.ZCOMPACTORS); - serverPaths.add(context.getZooKeeperRoot() + Constants.ZSSERVERS); - Collections.shuffle(serverPaths, RANDOM.get()); ++ zc.getChildren(sserverZooPath).forEach(sserverAddress -> { ++ serverPaths.add(sserverZooPath + "/" + sserverAddress); ++ }); ++ zc.getChildren(compactorZooPath).forEach(compactorGroup -> { ++ zc.getChildren(compactorZooPath + "/" + compactorGroup).forEach(compactorAddress -> { ++ serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" + compactorAddress); ++ }); ++ }); + } - ZooCache zc = context.getZooCache(); - for (String serverPath : serverPaths) { - if (serverPath.endsWith(Constants.ZCOMPACTORS)) { - // Compactor path has another subdirectory, the group / queue name - for (String groupName : zc.getChildren(serverPath)) { - for (String server : zc.getChildren(serverPath + "/" + groupName)) { - var zLocPath = ServiceLock.path(serverPath + "/" + groupName + "/" + server); - zc.getLockData(zLocPath).map(sld -> sld.getAddress(service)) - .map(address -> new ThriftTransportKey(address, rpcTimeout, context)) - .ifPresent(servers::add); - } - } - } else { - for (String server : zc.getChildren(serverPath)) { - var zLocPath = ServiceLock.path(serverPath + "/" + server); - zc.getLockData(zLocPath).map(sld -> sld.getAddress(service)) - .map(address -> new ThriftTransportKey(address, rpcTimeout, context)) - .ifPresent(servers::add); - } + - if (tservers.isEmpty()) { ++ if (serverPaths.isEmpty()) { + if (warned.compareAndSet(false, true)) { - LOG.warn("There are no tablet servers: check that zookeeper and accumulo are running."); ++ LOG.warn( ++ "There are no servers serving the {} api: check that zookeeper and accumulo are running.", ++ type); } + throw new TTransportException("There are no servers for type: " + type); } ++ Collections.shuffle(serverPaths, RANDOM.get()); - boolean opened = false; - try { - Pair<String,TTransport> pair = - context.getTransportPool().getAnyTransport(servers, preferCachedConnections); - C client = ThriftUtil.createClient(type, pair.getSecond()); - opened = true; - warned.set(false); - return new Pair<>(pair.getFirst(), client); - } finally { - if (!opened) { - if (warned.compareAndSet(false, true)) { - if (servers.isEmpty()) { - LOG.warn("There are no tablet servers: check that zookeeper and accumulo are running."); - } else { - LOG.warn("Failed to find an available server in the list of servers: {}", servers); - // Try to connect to an online tserver - Collections.shuffle(tservers); - for (String tserver : tservers) { - var zLocPath = - ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTSERVERS + "/" + tserver); - byte[] data = zc.getLockData(zLocPath); - if (data != null) { - String strData = new String(data, UTF_8); - if (!strData.equals("manager")) { - final HostAndPort tserverClientAddress = - new ServerServices(strData).getAddress(Service.TSERV_CLIENT); ++ for (String serverPath : serverPaths) { ++ var zLocPath = ServiceLock.path(serverPath); ++ Optional<ServiceLockData> data = zc.getLockData(zLocPath); ++ if (data != null && data.isPresent()) { ++ HostAndPort tserverClientAddress = data.orElseThrow().getAddress(service); ++ if (tserverClientAddress != null) { + try { + TTransport transport = context.getTransportPool().getTransport(type, + tserverClientAddress, rpcTimeout, context, preferCachedConnections); + C client = ThriftUtil.createClient(type, transport); + warned.set(false); + return new Pair<String,C>(tserverClientAddress.toString(), client); + } catch (TTransportException e) { + LOG.trace("Error creating transport to {}", tserverClientAddress); + continue; } } } } ++ + if (warned.compareAndSet(false, true)) { + LOG.warn("Failed to find an available server in the list of servers: {} for API type: {}", - tservers, type); ++ serverPaths, type); + } + throw new TTransportException("Failed to connect to any server for API type " + type); } default <R> R execute(Logger LOG, ClientContext context, Exec<R,C> exec) diff --cc core/src/test/java/org/apache/accumulo/core/clientImpl/ThriftTransportKeyTest.java index ee2d9b12c7,bc9e7cb0de..44fd571489 --- a/core/src/test/java/org/apache/accumulo/core/clientImpl/ThriftTransportKeyTest.java +++ b/core/src/test/java/org/apache/accumulo/core/clientImpl/ThriftTransportKeyTest.java @@@ -35,6 -35,8 +35,7 @@@ import org.apache.accumulo.core.client. import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.rpc.SaslConnectionParams; import org.apache.accumulo.core.rpc.SslConnectionParams; + import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; -import org.apache.accumulo.core.util.HostAndPort; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.security.UserGroupInformation; diff --cc test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java index 2a17353fd9,9c3474ffcc..5396db64da --- a/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java @@@ -32,6 -32,9 +32,8 @@@ import org.apache.accumulo.core.clientI import org.apache.accumulo.core.clientImpl.ThriftTransportPool; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; + import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; -import org.apache.accumulo.core.util.HostAndPort; + import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException;