This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f9636b268 Modified Compactor and ScanServer to advertise Client 
service (#3928) (#3951)
8f9636b268 is described below

commit 8f9636b268acf40a36bf462f3f1a3578426fb840
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Tue Nov 14 10:39:55 2023 -0500

    Modified Compactor and ScanServer to advertise Client service (#3928) 
(#3951)
    
    Modified Compactor and ScanServer to advertise Client service and
    modified client code to use TabletServer, ScanServer, and Compactor
    addresses when trying to execute an operation on the Client service
    API.
    
    Fixes #3624
    
    (cherry picked from commit 65f0204884f04e339699d25426e499c046b127d9)
---
 .../core/clientImpl/TableOperationsImpl.java       |  2 +-
 .../rpc/clients/ClientServiceThriftClient.java     |  7 +++--
 .../accumulo/core/rpc/clients/TServerClient.java   | 35 ++++++++++++++--------
 .../TabletManagementClientServiceThriftClient.java |  1 -
 .../core/rpc/clients/TabletServerThriftClient.java |  7 +++--
 .../accumulo/server/rpc/ThriftProcessorTypes.java  | 12 +++++---
 .../org/apache/accumulo/compactor/Compactor.java   | 18 +++++++++--
 .../org/apache/accumulo/tserver/ScanServer.java    | 19 ++++++++++--
 8 files changed, 71 insertions(+), 30 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index 6fa2470401..e9e784b23e 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@ -1503,7 +1503,7 @@ public class TableOperationsImpl extends 
TableOperationsHelper {
         // this operation may us a lot of memory... its likely that 
connections to tabletservers
         // hosting metadata tablets will be cached, so do not use cached
         // connections
-        pair = ThriftClientTypes.CLIENT.getTabletServerConnection(context, 
false);
+        pair = ThriftClientTypes.CLIENT.getThriftServerConnection(context, 
false);
         diskUsages = pair.getSecond().getDiskUsage(tableNames, 
context.rpcCreds());
       } catch (ThriftTableOperationException e) {
         switch (e.getType()) {
diff --git 
a/core/src/main/java/org/apache/accumulo/core/rpc/clients/ClientServiceThriftClient.java
 
b/core/src/main/java/org/apache/accumulo/core/rpc/clients/ClientServiceThriftClient.java
index 5a6a912647..c1dc9954da 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/rpc/clients/ClientServiceThriftClient.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/rpc/clients/ClientServiceThriftClient.java
@@ -24,6 +24,7 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.clientImpl.thrift.ClientService.Client;
+import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
@@ -40,10 +41,10 @@ public class ClientServiceThriftClient extends 
ThriftClientTypes<Client>
   }
 
   @Override
-  public Pair<String,Client> getTabletServerConnection(ClientContext context,
+  public Pair<String,Client> getThriftServerConnection(ClientContext context,
       boolean preferCachedConnections) throws TTransportException {
-    return getTabletServerConnection(LOG, this, context, 
preferCachedConnections,
-        warnedAboutTServersBeingDown);
+    return getThriftServerConnection(LOG, this, context, 
preferCachedConnections,
+        warnedAboutTServersBeingDown, ThriftService.CLIENT);
   }
 
   @Override
diff --git 
a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java 
b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
index c60c91c5d6..121bde735b 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
@@ -21,8 +21,11 @@ package org.apache.accumulo.core.rpc.clients;
 import static com.google.common.base.Preconditions.checkArgument;
 import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.Constants;
@@ -48,25 +51,33 @@ import org.slf4j.Logger;
 
 public interface TServerClient<C extends TServiceClient> {
 
-  Pair<String,C> getTabletServerConnection(ClientContext context, boolean 
preferCachedConnections)
+  Pair<String,C> getThriftServerConnection(ClientContext context, boolean 
preferCachedConnections)
       throws TTransportException;
 
-  default Pair<String,C> getTabletServerConnection(Logger LOG, 
ThriftClientTypes<C> type,
-      ClientContext context, boolean preferCachedConnections, AtomicBoolean 
warned)
-      throws TTransportException {
+  default Pair<String,C> getThriftServerConnection(Logger LOG, 
ThriftClientTypes<C> type,
+      ClientContext context, boolean preferCachedConnections, AtomicBoolean 
warned,
+      ThriftService service) throws TTransportException {
     checkArgument(context != null, "context is null");
     long rpcTimeout = context.getClientTimeoutInMillis();
     // create list of servers
     ArrayList<ThriftTransportKey> servers = new ArrayList<>();
 
     // add tservers
+    List<String> serverPaths = new ArrayList<>();
+    serverPaths.add(context.getZooKeeperRoot() + Constants.ZTSERVERS);
+    if (type == ThriftClientTypes.CLIENT) {
+      serverPaths.add(context.getZooKeeperRoot() + Constants.ZCOMPACTORS);
+      serverPaths.add(context.getZooKeeperRoot() + Constants.ZSSERVERS);
+      Collections.shuffle(serverPaths, RANDOM.get());
+    }
     ZooCache zc = context.getZooCache();
-    for (String tserver : zc.getChildren(context.getZooKeeperRoot() + 
Constants.ZTSERVERS)) {
-      var zLocPath =
-          ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTSERVERS + 
"/" + tserver);
-      zc.getLockData(zLocPath).map(sld -> sld.getAddress(ThriftService.TSERV))
-          .map(address -> new ThriftTransportKey(address, rpcTimeout, context))
-          .ifPresent(servers::add);
+    for (String serverPath : serverPaths) {
+      for (String server : zc.getChildren(serverPath)) {
+        var zLocPath = ServiceLock.path(serverPath + "/" + server);
+        zc.getLockData(zLocPath).map(sld -> sld.getAddress(service))
+            .map(address -> new ThriftTransportKey(address, rpcTimeout, 
context))
+            .ifPresent(servers::add);
+      }
     }
 
     boolean opened = false;
@@ -96,7 +107,7 @@ public interface TServerClient<C extends TServiceClient> {
       String server = null;
       C client = null;
       try {
-        Pair<String,C> pair = getTabletServerConnection(context, true);
+        Pair<String,C> pair = getThriftServerConnection(context, true);
         server = pair.getFirst();
         client = pair.getSecond();
         return exec.execute(client);
@@ -123,7 +134,7 @@ public interface TServerClient<C extends TServiceClient> {
       String server = null;
       C client = null;
       try {
-        Pair<String,C> pair = getTabletServerConnection(context, true);
+        Pair<String,C> pair = getThriftServerConnection(context, true);
         server = pair.getFirst();
         client = pair.getSecond();
         exec.execute(client);
diff --git 
a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TabletManagementClientServiceThriftClient.java
 
b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TabletManagementClientServiceThriftClient.java
index a5464ea14a..d19a2a45e5 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TabletManagementClientServiceThriftClient.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TabletManagementClientServiceThriftClient.java
@@ -29,5 +29,4 @@ public class TabletManagementClientServiceThriftClient 
extends ThriftClientTypes
   public TabletManagementClientServiceThriftClient(String serviceName) {
     super(serviceName, new Client.Factory());
   }
-
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TabletServerThriftClient.java
 
b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TabletServerThriftClient.java
index f672e14610..3f2f71bc7a 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TabletServerThriftClient.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TabletServerThriftClient.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
 import 
org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService.Client;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.thrift.transport.TTransportException;
@@ -44,10 +45,10 @@ public class TabletServerThriftClient extends 
ThriftClientTypes<Client>
   }
 
   @Override
-  public Pair<String,Client> getTabletServerConnection(ClientContext context,
+  public Pair<String,Client> getThriftServerConnection(ClientContext context,
       boolean preferCachedConnections) throws TTransportException {
-    return getTabletServerConnection(LOG, this, context, 
preferCachedConnections,
-        warnedAboutTServersBeingDown);
+    return getThriftServerConnection(LOG, this, context, 
preferCachedConnections,
+        warnedAboutTServersBeingDown, ThriftService.TSERV);
   }
 
   @Override
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java
 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java
index 3913fce3f2..7e497c3145 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java
@@ -95,9 +95,11 @@ public class ThriftProcessorTypes<C extends TServiceClient> 
extends ThriftClient
   public static final 
ThriftProcessorTypes<TabletManagementClientService.Client> TABLET_MGMT =
       new ThriftProcessorTypes<>(ThriftClientTypes.TABLET_MGMT);
 
-  public static TMultiplexedProcessor 
getCompactorTProcessor(CompactorService.Iface serviceHandler,
-      ServerContext context) {
+  public static TMultiplexedProcessor 
getCompactorTProcessor(ClientServiceHandler clientHandler,
+      CompactorService.Iface serviceHandler, ServerContext context) {
     TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor();
+    muxProcessor.registerProcessor(CLIENT.getServiceName(), 
CLIENT.getTProcessor(
+        ClientService.Processor.class, ClientService.Iface.class, 
clientHandler, context));
     muxProcessor.registerProcessor(COMPACTOR.getServiceName(), 
COMPACTOR.getTProcessor(
         CompactorService.Processor.class, CompactorService.Iface.class, 
serviceHandler, context));
     return muxProcessor;
@@ -131,9 +133,11 @@ public class ThriftProcessorTypes<C extends 
TServiceClient> extends ThriftClient
     return muxProcessor;
   }
 
-  public static TMultiplexedProcessor
-      getScanServerTProcessor(TabletScanClientService.Iface tserverHandler, 
ServerContext context) {
+  public static TMultiplexedProcessor 
getScanServerTProcessor(ClientServiceHandler clientHandler,
+      TabletScanClientService.Iface tserverHandler, ServerContext context) {
     TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor();
+    muxProcessor.registerProcessor(CLIENT.getServiceName(), 
CLIENT.getTProcessor(
+        ClientService.Processor.class, ClientService.Iface.class, 
clientHandler, context));
     muxProcessor.registerProcessor(TABLET_SCAN.getServiceName(),
         TABLET_SCAN.getTProcessor(TabletScanClientService.Processor.class,
             TabletScanClientService.Iface.class, tserverHandler, context));
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index df295cb912..fc7d6a0234 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -69,6 +69,8 @@ import org.apache.accumulo.core.lock.ServiceLock;
 import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
 import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
 import org.apache.accumulo.core.lock.ServiceLockData;
+import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor;
+import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
 import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
 import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -92,6 +94,7 @@ import 
org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.server.AbstractServer;
+import org.apache.accumulo.server.client.ClientServiceHandler;
 import org.apache.accumulo.server.compaction.CompactionInfo;
 import org.apache.accumulo.server.compaction.CompactionWatcher;
 import org.apache.accumulo.server.compaction.FileCompactor;
@@ -104,6 +107,7 @@ import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
 import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher;
 import org.apache.hadoop.fs.Path;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
@@ -278,8 +282,13 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
       for (int i = 0; i < 25; i++) {
         zoo.putPersistentData(zPath, new byte[0], NodeExistsPolicy.SKIP);
 
-        if (compactorLock.tryLock(lw,
-            new ServiceLockData(compactorId, hostPort, 
ThriftService.COMPACTOR, this.queueName))) {
+        ServiceDescriptors descriptors = new ServiceDescriptors();
+        for (ThriftService svc : new ThriftService[] {ThriftService.CLIENT,
+            ThriftService.COMPACTOR}) {
+          descriptors.addService(new ServiceDescriptor(compactorId, svc, 
hostPort, this.queueName));
+        }
+
+        if (compactorLock.tryLock(lw, new ServiceLockData(descriptors))) {
           LOG.debug("Obtained Compactor lock {}", compactorLock.getLockPath());
           return;
         }
@@ -302,7 +311,10 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
    * @throws UnknownHostException host unknown
    */
   protected ServerAddress startCompactorClientService() throws 
UnknownHostException {
-    var processor = ThriftProcessorTypes.getCompactorTProcessor(this, 
getContext());
+
+    ClientServiceHandler clientHandler =
+        new ClientServiceHandler(getContext(), new 
TransactionWatcher(getContext()));
+    var processor = ThriftProcessorTypes.getCompactorTProcessor(clientHandler, 
this, getContext());
     Property maxMessageSizeProperty =
         (getConfiguration().get(Property.COMPACTOR_MAX_MESSAGE_SIZE) != null
             ? Property.COMPACTOR_MAX_MESSAGE_SIZE : 
Property.GENERAL_MAX_MESSAGE_SIZE);
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 102267aaf7..8868065aa4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -70,6 +70,8 @@ import org.apache.accumulo.core.lock.ServiceLock;
 import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
 import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
 import org.apache.accumulo.core.lock.ServiceLockData;
+import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor;
+import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
 import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
 import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -89,6 +91,7 @@ import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.client.ClientServiceHandler;
 import org.apache.accumulo.server.compaction.PausedCompactionMetrics;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -96,6 +99,7 @@ import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
 import org.apache.accumulo.server.security.SecurityUtil;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher;
 import 
org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
 import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
 import org.apache.accumulo.tserver.session.MultiScanSession;
@@ -260,7 +264,10 @@ public class ScanServer extends AbstractServer
 
     // This class implements TabletClientService.Iface and then delegates 
calls. Be sure
     // to set up the ThriftProcessor using this class, not the delegate.
-    TProcessor processor = ThriftProcessorTypes.getScanServerTProcessor(this, 
getContext());
+    ClientServiceHandler clientHandler =
+        new ClientServiceHandler(context, new TransactionWatcher(context));
+    TProcessor processor =
+        ThriftProcessorTypes.getScanServerTProcessor(clientHandler, this, 
getContext());
 
     Property maxMessageSizeProperty =
         (getConfiguration().get(Property.SSERV_MAX_MESSAGE_SIZE) != null
@@ -326,8 +333,14 @@ public class ScanServer extends AbstractServer
       for (int i = 0; i < 120 / 5; i++) {
         zoo.putPersistentData(zLockPath.toString(), new byte[0], 
NodeExistsPolicy.SKIP);
 
-        if (scanServerLock.tryLock(lw, new ServiceLockData(serverLockUUID, 
getClientAddressString(),
-            ThriftService.TABLET_SCAN, this.groupName))) {
+        ServiceDescriptors descriptors = new ServiceDescriptors();
+        for (ThriftService svc : new ThriftService[] {ThriftService.CLIENT,
+            ThriftService.TABLET_SCAN}) {
+          descriptors.addService(
+              new ServiceDescriptor(serverLockUUID, svc, 
getClientAddressString(), this.groupName));
+        }
+
+        if (scanServerLock.tryLock(lw, new ServiceLockData(descriptors))) {
           LOG.debug("Obtained scan server lock {}", 
scanServerLock.getLockPath());
           return scanServerLock;
         }

Reply via email to