This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit f3d5fb01d701a6932d37d2f67f52cc0eefa64d50
Merge: ebf7054d1f d91d016211
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Mar 4 18:14:16 2024 +0000

    Merge branch '2.1'

 .../core/clientImpl/ThriftTransportKey.java        |  29 ++++--
 .../core/clientImpl/ThriftTransportPool.java       | 110 +++++----------------
 .../org/apache/accumulo/core/rpc/ThriftUtil.java   |   7 +-
 .../accumulo/core/rpc/clients/TServerClient.java   | 105 +++++++++++---------
 .../core/rpc/clients/ThriftClientTypes.java        |   6 +-
 .../core/clientImpl/ThriftTransportKeyTest.java    |  25 ++---
 .../coordinator/CompactionCoordinator.java         |   4 +-
 .../apache/accumulo/test/TransportCachingIT.java   |  42 ++++----
 8 files changed, 158 insertions(+), 170 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java
index f332a09492,f4c7047d6d..0f84154a15
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java
@@@ -24,9 -24,10 +24,10 @@@ import java.util.Objects
  
  import org.apache.accumulo.core.rpc.SaslConnectionParams;
  import org.apache.accumulo.core.rpc.SslConnectionParams;
+ import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 -import org.apache.accumulo.core.util.HostAndPort;
  
  import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.net.HostAndPort;
  
  @VisibleForTesting
  public class ThriftTransportKey {
@@@ -54,12 -58,18 +58,18 @@@
      this.saslParams = saslParams;
      if (saslParams != null && sslParams != null) {
        // TSasl and TSSL transport factories don't play nicely together
 -      throw new RuntimeException("Cannot use both SSL and SASL thrift transports");
 +      throw new IllegalArgumentException("Cannot use both SSL and SASL thrift transports");
      }
-     this.hash = Objects.hash(server, timeout, sslParams, saslParams);
+     this.hash = Objects.hash(type, server, timeout, sslParams, saslParams);
    }
  
-   HostAndPort getServer() {
+   @VisibleForTesting
+   public ThriftClientTypes<?> getType() {
+     return type;
+   }
+ 
+   @VisibleForTesting
+   public HostAndPort getServer() {
      return server;
    }
  
diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
index d1bc17e945,a3d38aa10a..b3f205fa2a
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
@@@ -41,6 -41,8 +41,7 @@@ import java.util.function.LongSupplier
  import java.util.function.Supplier;
  
  import org.apache.accumulo.core.rpc.ThriftUtil;
+ import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 -import org.apache.accumulo.core.util.HostAndPort;
  import org.apache.accumulo.core.util.Pair;
  import org.apache.accumulo.core.util.threads.Threads;
  import org.apache.thrift.TConfiguration;
@@@ -49,9 -51,7 +50,8 @@@ import org.apache.thrift.transport.TTra
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
- import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Preconditions;
 +import com.google.common.net.HostAndPort;
  
  import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
  
@@@ -109,71 -110,40 +109,40 @@@ public class ThriftTransportPool 
      return pool;
    }
  
-   public TTransport getTransport(HostAndPort location, long milliseconds, 
ClientContext context)
-       throws TTransportException {
-     ThriftTransportKey cacheKey = new ThriftTransportKey(location, 
milliseconds, context);
+   public TTransport getTransport(ThriftClientTypes<?> type, HostAndPort location, long milliseconds,
+       ClientContext context, boolean preferCached) throws TTransportException {
  
-     CachedConnection connection = connectionPool.reserveAny(cacheKey);
- 
-     if (connection != null) {
-       log.trace("Using existing connection to {}", cacheKey.getServer());
-       return connection.transport;
-     } else {
-       return createNewTransport(cacheKey);
+     ThriftTransportKey cacheKey = new ThriftTransportKey(type, location, 
milliseconds, context);
+     if (preferCached) {
+       CachedConnection connection = connectionPool.reserveAny(cacheKey);
+       if (connection != null) {
+         log.trace("Using existing connection to {}", cacheKey.getServer());
+         return connection.transport;
+       }
      }
+     return createNewTransport(cacheKey);
    }
  
-   @VisibleForTesting
-   public Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> 
servers,
-       boolean preferCachedConnection) throws TTransportException {
- 
-     servers = new ArrayList<>(servers);
- 
-     if (preferCachedConnection) {
-       HashSet<ThriftTransportKey> serversSet = new HashSet<>(servers);
- 
-       // randomly pick a server from the connection cache
-       serversSet.retainAll(connectionPool.getThriftTransportKeys());
- 
-       if (!serversSet.isEmpty()) {
-         ArrayList<ThriftTransportKey> cachedServers = new 
ArrayList<>(serversSet);
-         Collections.shuffle(cachedServers, RANDOM.get());
- 
-         for (ThriftTransportKey ttk : cachedServers) {
-           CachedConnection connection = connectionPool.reserveAny(ttk);
-           if (connection != null) {
-             final String serverAddr = ttk.getServer().toString();
-             log.trace("Using existing connection to {}", serverAddr);
-             return new Pair<>(serverAddr, connection.transport);
-           }
- 
-         }
+   public Pair<String,TTransport> getAnyCachedTransport(ThriftClientTypes<?> 
type) {
+     final List<ThriftTransportKey> serversSet = new ArrayList<>();
+     for (ThriftTransportKey ttk : connectionPool.getThriftTransportKeys()) {
+       if (ttk.getType().equals(type)) {
+         serversSet.add(ttk);
        }
      }
- 
-     int retryCount = 0;
-     while (!servers.isEmpty() && retryCount < 10) {
- 
-       int index = RANDOM.get().nextInt(servers.size());
-       ThriftTransportKey ttk = servers.get(index);
- 
-       if (preferCachedConnection) {
-         CachedConnection connection = connectionPool.reserveAnyIfPresent(ttk);
-         if (connection != null) {
-           return new Pair<>(ttk.getServer().toString(), connection.transport);
-         }
-       }
- 
-       try {
-         return new Pair<>(ttk.getServer().toString(), 
createNewTransport(ttk));
-       } catch (TTransportException tte) {
-         log.debug("Failed to connect to {}", servers.get(index), tte);
-         servers.remove(index);
-         retryCount++;
+     if (serversSet.isEmpty()) {
+       return null;
+     }
 -    Collections.shuffle(serversSet, random);
++    Collections.shuffle(serversSet, RANDOM.get());
+     for (ThriftTransportKey ttk : serversSet) {
+       CachedConnection connection = connectionPool.reserveAny(ttk);
+       if (connection != null) {
+         final String serverAddr = ttk.getServer().toString();
+         log.trace("Using existing connection to {}", serverAddr);
+         return new Pair<>(serverAddr, connection.transport);
        }
      }
- 
-     throw new TTransportException("Failed to connect to a server");
+     return null;
    }
  
    private TTransport createNewTransport(ThriftTransportKey cacheKey) throws 
TTransportException {
diff --cc 
core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
index f96d9f62df,5f017668e9..4027f4b0c9
--- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
@@@ -26,6 -26,6 +26,7 @@@ import static org.apache.accumulo.core.
  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.List;
++import java.util.Optional;
  import java.util.concurrent.atomic.AtomicBoolean;
  
  import org.apache.accumulo.core.Constants;
@@@ -33,11 -33,9 +34,11 @@@ import org.apache.accumulo.core.client.
  import org.apache.accumulo.core.client.AccumuloSecurityException;
  import org.apache.accumulo.core.clientImpl.AccumuloServerException;
  import org.apache.accumulo.core.clientImpl.ClientContext;
- import org.apache.accumulo.core.clientImpl.ThriftTransportKey;
  import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 -import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
  import org.apache.accumulo.core.fate.zookeeper.ZooCache;
 +import org.apache.accumulo.core.lock.ServiceLock;
++import org.apache.accumulo.core.lock.ServiceLockData;
 +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
  import org.apache.accumulo.core.rpc.ThriftUtil;
  import org.apache.accumulo.core.rpc.clients.ThriftClientTypes.Exec;
  import org.apache.accumulo.core.rpc.clients.ThriftClientTypes.ExecVoid;
@@@ -49,68 -50,68 +50,84 @@@ import org.apache.thrift.transport.TTra
  import org.apache.thrift.transport.TTransportException;
  import org.slf4j.Logger;
  
++import com.google.common.net.HostAndPort;
++
  public interface TServerClient<C extends TServiceClient> {
  
 -  Pair<String,C> getTabletServerConnection(ClientContext context, boolean 
preferCachedConnections)
 +  Pair<String,C> getThriftServerConnection(ClientContext context, boolean 
preferCachedConnections)
        throws TTransportException;
  
 -  default Pair<String,C> getTabletServerConnection(Logger LOG, 
ThriftClientTypes<C> type,
 -      ClientContext context, boolean preferCachedConnections, AtomicBoolean 
warned)
 -      throws TTransportException {
 +  default Pair<String,C> getThriftServerConnection(Logger LOG, 
ThriftClientTypes<C> type,
 +      ClientContext context, boolean preferCachedConnections, AtomicBoolean 
warned,
 +      ThriftService service) throws TTransportException {
      checkArgument(context != null, "context is null");
-     long rpcTimeout = context.getClientTimeoutInMillis();
-     // create list of servers
-     ArrayList<ThriftTransportKey> servers = new ArrayList<>();
  
-     // add tservers
-     List<String> serverPaths = new ArrayList<>();
-     serverPaths.add(context.getZooKeeperRoot() + Constants.ZTSERVERS);
+     if (preferCachedConnections) {
+       Pair<String,TTransport> cachedTransport =
+           context.getTransportPool().getAnyCachedTransport(type);
+       if (cachedTransport != null) {
+         C client = ThriftUtil.createClient(type, cachedTransport.getSecond());
+         warned.set(false);
+         return new Pair<String,C>(cachedTransport.getFirst(), client);
+       }
+     }
+ 
+     final long rpcTimeout = context.getClientTimeoutInMillis();
++    final String tserverZooPath = context.getZooKeeperRoot() + 
Constants.ZTSERVERS;
++    final String sserverZooPath = context.getZooKeeperRoot() + 
Constants.ZSSERVERS;
++    final String compactorZooPath = context.getZooKeeperRoot() + 
Constants.ZCOMPACTORS;
+     final ZooCache zc = context.getZooCache();
 -    final List<String> tservers = new ArrayList<>();
+ 
 -    tservers.addAll(zc.getChildren(context.getZooKeeperRoot() + 
Constants.ZTSERVERS));
++    final List<String> serverPaths = new ArrayList<>();
++    zc.getChildren(tserverZooPath).forEach(tserverAddress -> {
++      serverPaths.add(tserverZooPath + "/" + tserverAddress);
++    });
 +    if (type == ThriftClientTypes.CLIENT) {
-       serverPaths.add(context.getZooKeeperRoot() + Constants.ZCOMPACTORS);
-       serverPaths.add(context.getZooKeeperRoot() + Constants.ZSSERVERS);
-       Collections.shuffle(serverPaths, RANDOM.get());
++      zc.getChildren(sserverZooPath).forEach(sserverAddress -> {
++        serverPaths.add(sserverZooPath + "/" + sserverAddress);
++      });
++      zc.getChildren(compactorZooPath).forEach(compactorGroup -> {
++        zc.getChildren(compactorZooPath + "/" + 
compactorGroup).forEach(compactorAddress -> {
++          serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" + 
compactorAddress);
++        });
++      });
 +    }
-     ZooCache zc = context.getZooCache();
-     for (String serverPath : serverPaths) {
-       if (serverPath.endsWith(Constants.ZCOMPACTORS)) {
-         // Compactor path has another subdirectory, the group / queue name
-         for (String groupName : zc.getChildren(serverPath)) {
-           for (String server : zc.getChildren(serverPath + "/" + groupName)) {
-             var zLocPath = ServiceLock.path(serverPath + "/" + groupName + 
"/" + server);
-             zc.getLockData(zLocPath).map(sld -> sld.getAddress(service))
-                 .map(address -> new ThriftTransportKey(address, rpcTimeout, 
context))
-                 .ifPresent(servers::add);
-           }
-         }
-       } else {
-         for (String server : zc.getChildren(serverPath)) {
-           var zLocPath = ServiceLock.path(serverPath + "/" + server);
-           zc.getLockData(zLocPath).map(sld -> sld.getAddress(service))
-               .map(address -> new ThriftTransportKey(address, rpcTimeout, 
context))
-               .ifPresent(servers::add);
-         }
+ 
 -    if (tservers.isEmpty()) {
++    if (serverPaths.isEmpty()) {
+       if (warned.compareAndSet(false, true)) {
 -        LOG.warn("There are no tablet servers: check that zookeeper and accumulo are running.");
++        LOG.warn(
++            "There are no servers serving the {} api: check that zookeeper and accumulo are running.",
++            type);
        }
+       throw new TTransportException("There are no servers for type: " + type);
      }
++    Collections.shuffle(serverPaths, RANDOM.get());
  
-     boolean opened = false;
-     try {
-       Pair<String,TTransport> pair =
-           context.getTransportPool().getAnyTransport(servers, 
preferCachedConnections);
-       C client = ThriftUtil.createClient(type, pair.getSecond());
-       opened = true;
-       warned.set(false);
-       return new Pair<>(pair.getFirst(), client);
-     } finally {
-       if (!opened) {
-         if (warned.compareAndSet(false, true)) {
-           if (servers.isEmpty()) {
-             LOG.warn("There are no tablet servers: check that zookeeper and 
accumulo are running.");
-           } else {
-             LOG.warn("Failed to find an available server in the list of 
servers: {}", servers);
 -    // Try to connect to an online tserver
 -    Collections.shuffle(tservers);
 -    for (String tserver : tservers) {
 -      var zLocPath =
 -          ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTSERVERS + 
"/" + tserver);
 -      byte[] data = zc.getLockData(zLocPath);
 -      if (data != null) {
 -        String strData = new String(data, UTF_8);
 -        if (!strData.equals("manager")) {
 -          final HostAndPort tserverClientAddress =
 -              new ServerServices(strData).getAddress(Service.TSERV_CLIENT);
++    for (String serverPath : serverPaths) {
++      var zLocPath = ServiceLock.path(serverPath);
++      Optional<ServiceLockData> data = zc.getLockData(zLocPath);
++      if (data != null && data.isPresent()) {
++        HostAndPort tserverClientAddress = 
data.orElseThrow().getAddress(service);
++        if (tserverClientAddress != null) {
+           try {
+             TTransport transport = 
context.getTransportPool().getTransport(type,
+                 tserverClientAddress, rpcTimeout, context, 
preferCachedConnections);
+             C client = ThriftUtil.createClient(type, transport);
+             warned.set(false);
+             return new Pair<String,C>(tserverClientAddress.toString(), 
client);
+           } catch (TTransportException e) {
+             LOG.trace("Error creating transport to {}", tserverClientAddress);
+             continue;
            }
          }
        }
      }
++
+     if (warned.compareAndSet(false, true)) {
+       LOG.warn("Failed to find an available server in the list of servers: {} for API type: {}",
 -          tservers, type);
++          serverPaths, type);
+     }
+     throw new TTransportException("Failed to connect to any server for API type " + type);
    }
  
    default <R> R execute(Logger LOG, ClientContext context, Exec<R,C> exec)
diff --cc core/src/test/java/org/apache/accumulo/core/clientImpl/ThriftTransportKeyTest.java
index ee2d9b12c7,bc9e7cb0de..44fd571489
--- a/core/src/test/java/org/apache/accumulo/core/clientImpl/ThriftTransportKeyTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/clientImpl/ThriftTransportKeyTest.java
@@@ -35,6 -35,8 +35,7 @@@ import org.apache.accumulo.core.client.
  import org.apache.accumulo.core.conf.ClientProperty;
  import org.apache.accumulo.core.rpc.SaslConnectionParams;
  import org.apache.accumulo.core.rpc.SslConnectionParams;
+ import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 -import org.apache.accumulo.core.util.HostAndPort;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
  import org.apache.hadoop.security.UserGroupInformation;
diff --cc test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
index 2a17353fd9,9c3474ffcc..5396db64da
--- a/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
@@@ -32,6 -32,9 +32,8 @@@ import org.apache.accumulo.core.clientI
  import org.apache.accumulo.core.clientImpl.ThriftTransportPool;
  import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
  import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 -import org.apache.accumulo.core.util.HostAndPort;
+ import org.apache.accumulo.core.util.Pair;
  import org.apache.accumulo.harness.AccumuloClusterHarness;
  import org.apache.thrift.transport.TTransport;
  import org.apache.thrift.transport.TTransportException;

Reply via email to