This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new cea2a3312f Don't advertise Manager address until upgrade done and 
fully started (#5419)
cea2a3312f is described below

commit cea2a3312f868864c75261e399b318445143f8d1
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Fri Mar 21 08:06:52 2025 -0400

    Don't advertise Manager address until upgrade done and fully started (#5419)
    
    Removed HighlyAvailableService and associated code, which was only
    really used by the Manager, and instead modified the Manager to
    only advertise the Manager address with the Thrift services after
    the upgrade process has completed and the Manager is fully started.
    
    This change removes the HighlyAvailableServiceInvocationHandler
    which was throwing an exception when the Manager was not ready
    to accept Thrift RPC calls and causing stack traces in the Manager
    log during the upgrade process. These stack traces could be
    construed as a problem by the user, when in reality they were
    benign.
    
    Also modified TServerUtils to only create the Thrift server, but not
    start it. The ThriftServer is set on the ServerAddress object that is
    returned by the TServerUtils methods. Added a method to ServerAddress
    to start the Thrift server, which for most server processes is called
    immediately after creating it. However, for the Manager, the call
    to start the Thrift server is delayed until the Manager is up.
    
    Closes #5411
---
 .../accumulo/server/HighlyAvailableService.java    | 51 ----------------
 .../HighlyAvailableServiceInvocationHandler.java   | 71 ----------------------
 .../server/rpc/HighlyAvailableServiceWrapper.java  | 54 ----------------
 .../apache/accumulo/server/rpc/ServerAddress.java  | 22 +++++++
 .../apache/accumulo/server/rpc/TServerUtils.java   | 62 +++++++------------
 .../accumulo/server/rpc/TServerUtilsTest.java      |  9 +--
 .../org/apache/accumulo/compactor/Compactor.java   |  5 +-
 .../apache/accumulo/gc/SimpleGarbageCollector.java | 13 ++--
 .../java/org/apache/accumulo/manager/Manager.java  | 55 +++++++----------
 .../java/org/apache/accumulo/monitor/Monitor.java  | 11 +---
 .../org/apache/accumulo/tserver/ScanServer.java    |  8 +--
 .../org/apache/accumulo/tserver/TabletServer.java  |  9 +--
 .../accumulo/test/functional/ZombieTServer.java    |  6 +-
 .../accumulo/test/performance/NullTserver.java     | 11 ++--
 14 files changed, 101 insertions(+), 286 deletions(-)

diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/HighlyAvailableService.java
 
b/server/base/src/main/java/org/apache/accumulo/server/HighlyAvailableService.java
deleted file mode 100644
index 4d529369a0..0000000000
--- 
a/server/base/src/main/java/org/apache/accumulo/server/HighlyAvailableService.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.server;
-
-/**
- * This interface allows service implementations which support running 
multiple instances
- * concurrently with only one active instance to report whether or not they 
are the active service.
- */
-public interface HighlyAvailableService {
-
-  /**
-   * Is this service instance currently the active instance for the Accumulo 
cluster.
-   *
-   * @return True if the service is the active service, false otherwise.
-   */
-  boolean isActiveService();
-
-  /**
-   * Is this service instance currently in the process of upgrading.
-   *
-   * @return True if the service is upgrading, false otherwise.
-   */
-  default boolean isUpgrading() {
-    return false;
-  }
-
-  /**
-   * Get the name of the service
-   *
-   * @return service name
-   */
-  default String getServiceName() {
-    return this.getClass().getSimpleName();
-  }
-}
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java
 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java
deleted file mode 100644
index a0e8a9453a..0000000000
--- 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.server.rpc;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.Objects;
-
-import 
org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException;
-import org.apache.accumulo.server.HighlyAvailableService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An {@link InvocationHandler} which checks to see if a {@link 
HighlyAvailableService} is the
- * current active instance of that service, throwing {@link 
ThriftNotActiveServiceException} when it
- * is not the current active instance.
- */
-public class HighlyAvailableServiceInvocationHandler<I> implements 
InvocationHandler {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(HighlyAvailableServiceInvocationHandler.class);
-
-  private final I instance;
-  private final HighlyAvailableService service;
-
-  public HighlyAvailableServiceInvocationHandler(I instance, 
HighlyAvailableService service) {
-    this.instance = Objects.requireNonNull(instance);
-    this.service = Objects.requireNonNull(service);
-  }
-
-  @Override
-  public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
-
-    // If the service is upgrading, throw an exception
-    if (service.isUpgrading()) {
-      LOG.trace("Service can not be accessed while it is upgrading.");
-      throw new ThriftNotActiveServiceException(service.getServiceName(),
-          "Service can not be accessed while it is upgrading");
-    }
-
-    // If the service is not active, throw an exception
-    if (!service.isActiveService()) {
-      LOG.trace("Denying access to RPC service as this instance is not the 
active instance.");
-      throw new ThriftNotActiveServiceException(service.getServiceName(),
-          "Denying access to RPC service as this instance is not the active 
instance");
-    }
-    try {
-      // Otherwise, call the real method
-      return method.invoke(instance, args);
-    } catch (InvocationTargetException ex) {
-      throw ex.getCause();
-    }
-  }
-}
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceWrapper.java
 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceWrapper.java
deleted file mode 100644
index 307b2dfa36..0000000000
--- 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceWrapper.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.server.rpc;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Proxy;
-
-import org.apache.accumulo.core.util.ClassUtil;
-import org.apache.accumulo.server.HighlyAvailableService;
-
-/**
- * A class to wrap invocations to the Thrift handler to prevent these 
invocations from succeeding
- * when the Accumulo service that this Thrift service is for has not yet 
obtained its ZooKeeper
- * lock.
- *
- * @since 2.0
- */
-public class HighlyAvailableServiceWrapper {
-
-  private static final HighlyAvailableServiceWrapper INSTANCE = new 
HighlyAvailableServiceWrapper();
-
-  // Not for public use.
-  private HighlyAvailableServiceWrapper() {}
-
-  public static <I> I service(final I instance, HighlyAvailableService 
service) {
-    InvocationHandler handler = INSTANCE.getInvocationHandler(instance, 
service);
-
-    @SuppressWarnings("unchecked")
-    I proxiedInstance = (I) 
Proxy.newProxyInstance(instance.getClass().getClassLoader(),
-        ClassUtil.getInterfaces(instance.getClass()).toArray(new Class<?>[0]), 
handler);
-    return proxiedInstance;
-  }
-
-  protected <T> HighlyAvailableServiceInvocationHandler<T> 
getInvocationHandler(final T instance,
-      final HighlyAvailableService service) {
-    return new HighlyAvailableServiceInvocationHandler<>(instance, service);
-  }
-}
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java
index 0e033b9056..3924bea115 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java
@@ -18,8 +18,12 @@
  */
 package org.apache.accumulo.server.rpc;
 
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.thrift.server.TServer;
 
+import com.google.common.base.Preconditions;
 import com.google.common.net.HostAndPort;
 
 /**
@@ -41,4 +45,22 @@ public class ServerAddress {
   public HostAndPort getAddress() {
     return address;
   }
+
+  public void startThriftServer(String threadName) {
+    Threads.createThread(threadName, () -> {
+      try {
+        server.serve();
+      } catch (Error e) {
+        Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.", 
1);
+      }
+    }).start();
+
+    while (!server.isServing()) {
+      // Wait for the thread to start and for the TServer to start
+      // serving events
+      UtilWaitThread.sleep(10);
+      Preconditions.checkState(!server.getShouldStop());
+    }
+
+  }
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index a1318c4beb..fd37e171a0 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -47,18 +47,14 @@ import org.apache.accumulo.core.metrics.MetricsInfo;
 import org.apache.accumulo.core.rpc.SslConnectionParams;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory;
-import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.threads.ThreadPools;
-import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.server.TThreadedSelectorServer;
 import org.apache.thrift.transport.TNonblockingServerSocket;
@@ -73,7 +69,6 @@ import org.apache.thrift.transport.TTransportFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
 import com.google.common.net.HostAndPort;
 import com.google.common.primitives.Ints;
 
@@ -114,13 +109,14 @@ public class TServerUtils {
   }
 
   /**
-   * Start a server, at the given port, or higher, if that port is not 
available.
+   * Create a ServerAddress, at the given port, or higher, if that port is not 
available. Callers
+   * must start the ThriftServer after calling this method using
+   * {@code ServerAddress#startThriftServer(String)}
    *
    * @param context RPC configuration
    * @param portHintProperty the port to attempt to open, can be zero, meaning 
"any available port"
    * @param processor the service to be started
    * @param serverName the name of the class that is providing the service
-   * @param threadName name this service's thread for better debugging
    * @param portSearchProperty A boolean Property to control if port-search 
should be used, or null
    *        to disable
    * @param minThreadProperty A Property to control the minimum number of 
threads in the pool
@@ -129,8 +125,8 @@ public class TServerUtils {
    * @return the server object created, and the port actually used
    * @throws UnknownHostException when we don't know our own address
    */
-  public static ServerAddress startServer(ServerContext context, String 
hostname,
-      Property portHintProperty, TProcessor processor, String serverName, 
String threadName,
+  public static ServerAddress createThriftServer(ServerContext context, String 
hostname,
+      Property portHintProperty, TProcessor processor, String serverName,
       Property portSearchProperty, Property minThreadProperty, Property 
threadTimeOutProperty,
       Property timeBetweenThreadChecksProperty) throws UnknownHostException {
     final AccumuloConfiguration config = context.getConfiguration();
@@ -174,8 +170,8 @@ public class TServerUtils {
 
     HostAndPort[] addresses = getHostAndPorts(hostname, portHint);
     try {
-      return TServerUtils.startTServer(serverType, timedProcessor, serverName, 
threadName,
-          minThreads, threadTimeOut, config, timeBetweenThreadChecks, 
maxMessageSize,
+      return TServerUtils.createThriftServer(serverType, timedProcessor, 
serverName, minThreads,
+          threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize,
           context.getServerSslParams(), context.getSaslParams(), 
context.getClientTimeoutInMillis(),
           backlog, portSearch, addresses);
     } catch (TTransportException e) {
@@ -199,12 +195,12 @@ public class TServerUtils {
           }
           try {
             HostAndPort addr = HostAndPort.fromParts(hostname, port);
-            return TServerUtils.startTServer(serverType, timedProcessor, 
serverName, threadName,
+            return TServerUtils.createThriftServer(serverType, timedProcessor, 
serverName,
                 minThreads, threadTimeOut, config, timeBetweenThreadChecks, 
maxMessageSize,
                 context.getServerSslParams(), context.getSaslParams(),
                 context.getClientTimeoutInMillis(), backlog, portSearch, addr);
           } catch (TTransportException tte) {
-            log.info("Unable to use port {}, retrying. (Thread Name = {})", 
port, threadName);
+            log.info("Unable to use port {}, retrying.", port);
           }
         }
         log.error("Unable to start TServer", e);
@@ -562,9 +558,14 @@ public class TServerUtils {
     return new ServerAddress(server, address);
   }
 
-  public static ServerAddress startTServer(final AccumuloConfiguration conf,
-      ThriftServerType serverType, TProcessor processor, String serverName, 
String threadName,
-      int numThreads, long threadTimeOut, long timeBetweenThreadChecks, long 
maxMessageSize,
+  /**
+   * Create a ServerAddress, at the given port, or higher, if that port is not 
available. Callers
+   * must start the ThriftServer after calling this method using
+   * {@code ServerAddress#startThriftServer(String)}
+   */
+  public static ServerAddress createThriftServer(final AccumuloConfiguration 
conf,
+      ThriftServerType serverType, TProcessor processor, String serverName, 
int numThreads,
+      long threadTimeOut, long timeBetweenThreadChecks, long maxMessageSize,
       SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
       long serverSocketTimeout, int backlog, MetricsInfo metricsInfo, boolean 
portSearch,
       HostAndPort... addresses) {
@@ -574,9 +575,9 @@ public class TServerUtils {
     }
 
     try {
-      return startTServer(serverType, new TimedProcessor(processor, 
metricsInfo), serverName,
-          threadName, numThreads, threadTimeOut, conf, 
timeBetweenThreadChecks, maxMessageSize,
-          sslParams, saslParams, serverSocketTimeout, backlog, portSearch, 
addresses);
+      return createThriftServer(serverType, new TimedProcessor(processor, 
metricsInfo), serverName,
+          numThreads, threadTimeOut, conf, timeBetweenThreadChecks, 
maxMessageSize, sslParams,
+          saslParams, serverSocketTimeout, backlog, portSearch, addresses);
     } catch (TTransportException e) {
       throw new IllegalStateException(e);
     }
@@ -589,8 +590,8 @@ public class TServerUtils {
    * @return A ServerAddress encapsulating the Thrift server created and the 
host/port which it is
    *         bound to.
    */
-  private static ServerAddress startTServer(ThriftServerType serverType, 
TimedProcessor processor,
-      String serverName, String threadName, int numThreads, long threadTimeOut,
+  private static ServerAddress createThriftServer(ThriftServerType serverType,
+      TimedProcessor processor, String serverName, int numThreads, long 
threadTimeOut,
       final AccumuloConfiguration conf, long timeBetweenThreadChecks, long 
maxMessageSize,
       SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
       long serverSocketTimeout, int backlog, boolean portSearch, 
HostAndPort... addresses)
@@ -653,28 +654,11 @@ public class TServerUtils {
           "Unable to create server on addresses: " + 
Arrays.toString(addresses));
     }
 
-    final TServer finalServer = serverAddress.server;
-
-    Threads.createThread(threadName, () -> {
-      try {
-        finalServer.serve();
-      } catch (Error e) {
-        Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.", 
1);
-      }
-    }).start();
-
-    while (!finalServer.isServing()) {
-      // Wait for the thread to start and for the TServer to start
-      // serving events
-      UtilWaitThread.sleep(10);
-      Preconditions.checkState(!finalServer.getShouldStop());
-    }
-
     // check for the special "bind to everything address"
     if (serverAddress.address.getHost().equals("0.0.0.0")) {
       // can't get the address from the bind, so we'll do our best to invent 
our hostname
       try {
-        serverAddress = new ServerAddress(finalServer, HostAndPort
+        serverAddress = new ServerAddress(serverAddress.server, HostAndPort
             .fromParts(InetAddress.getLocalHost().getHostName(), 
serverAddress.address.getPort()));
       } catch (UnknownHostException e) {
         throw new TTransportException(e);
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
 
b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
index 6bf9218e60..c4eed579b3 100644
--- 
a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
@@ -301,9 +301,10 @@ public class TServerUtilsTest {
     // misconfiguration)
     String hostname = "localhost";
 
-    return TServerUtils.startServer(context, hostname, 
Property.TSERV_CLIENTPORT, processor,
-        "TServerUtilsTest", "TServerUtilsTestThread", 
Property.TSERV_PORTSEARCH,
-        Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT, 
Property.TSERV_THREADCHECK);
-
+    ServerAddress sa = TServerUtils.createThriftServer(context, hostname, 
Property.TSERV_CLIENTPORT,
+        processor, "TServerUtilsTest", Property.TSERV_PORTSEARCH, 
Property.TSERV_MINTHREADS,
+        Property.TSERV_MINTHREADS_TIMEOUT, Property.TSERV_THREADCHECK);
+    sa.startThriftServer("TServerUtilsTestThread");
+    return sa;
   }
 }
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 72145ab32d..2bfc6b7276 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -322,10 +322,11 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
     ClientServiceHandler clientHandler = new 
ClientServiceHandler(getContext());
     var processor = ThriftProcessorTypes.getCompactorTProcessor(this, 
clientHandler,
         getCompactorThriftHandlerInterface(), getContext());
-    ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
+    ServerAddress sp = TServerUtils.createThriftServer(getContext(), 
getHostname(),
         Property.COMPACTOR_CLIENTPORT, processor, 
this.getClass().getSimpleName(),
-        "Thrift Client Server", Property.COMPACTOR_PORTSEARCH, 
Property.COMPACTOR_MINTHREADS,
+        Property.COMPACTOR_PORTSEARCH, Property.COMPACTOR_MINTHREADS,
         Property.COMPACTOR_MINTHREADS_TIMEOUT, Property.COMPACTOR_THREADCHECK);
+    sp.startThriftServer("Thrift Client Server");
     setHostname(sp.address);
     LOG.info("address = {}", sp.address);
     return sp;
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java 
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 2d7afe261d..46e4fe8c8d 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -429,12 +429,13 @@ public class SimpleGarbageCollector extends 
AbstractServer implements Iface {
     IntStream port = getConfiguration().getPortStream(Property.GC_PORT);
     HostAndPort[] addresses = TServerUtils.getHostAndPorts(getHostname(), 
port);
     long maxMessageSize = 
getConfiguration().getAsBytes(Property.RPC_MAX_MESSAGE_SIZE);
-    ServerAddress server = TServerUtils.startTServer(getConfiguration(),
-        getContext().getThriftServerType(), processor, 
this.getClass().getSimpleName(),
-        "GC Monitor Service", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 
maxMessageSize,
-        getContext().getServerSslParams(), getContext().getSaslParams(), 0,
-        getConfiguration().getCount(Property.RPC_BACKLOG), 
getContext().getMetricsInfo(), false,
-        addresses);
+    ServerAddress server =
+        TServerUtils.createThriftServer(getConfiguration(), 
getContext().getThriftServerType(),
+            processor, this.getClass().getSimpleName(), 2, 
ThreadPools.DEFAULT_TIMEOUT_MILLISECS,
+            1000, maxMessageSize, getContext().getServerSslParams(), 
getContext().getSaslParams(),
+            0, getConfiguration().getCount(Property.RPC_BACKLOG), 
getContext().getMetricsInfo(),
+            false, addresses);
+    server.startThriftServer("GC Monitor Service");
     setHostname(server.address);
     log.debug("Starting garbage collector listening on " + server.address);
     return server.address;
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index edeef321a6..5da833ee84 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -52,7 +52,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -97,7 +96,6 @@ import 
org.apache.accumulo.core.manager.balancer.TServerStatusImpl;
 import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
 import org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.manager.thrift.BulkImportState;
-import org.apache.accumulo.core.manager.thrift.ManagerClientService;
 import org.apache.accumulo.core.manager.thrift.ManagerGoalState;
 import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo;
 import org.apache.accumulo.core.manager.thrift.ManagerState;
@@ -133,7 +131,6 @@ import org.apache.accumulo.manager.tableOps.TraceRepo;
 import org.apache.accumulo.manager.upgrade.UpgradeCoordinator;
 import org.apache.accumulo.manager.upgrade.UpgradeCoordinator.UpgradeStatus;
 import org.apache.accumulo.server.AbstractServer;
-import org.apache.accumulo.server.HighlyAvailableService;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.compaction.CompactionConfigStorage;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -145,7 +142,6 @@ import 
org.apache.accumulo.server.manager.state.DeadServerList;
 import org.apache.accumulo.server.manager.state.TabletServerState;
 import org.apache.accumulo.server.manager.state.TabletStateStore;
 import org.apache.accumulo.server.manager.state.UnassignedTablet;
-import org.apache.accumulo.server.rpc.HighlyAvailableServiceWrapper;
 import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
@@ -183,8 +179,7 @@ import io.opentelemetry.context.Scope;
  * <p>
  * The manager will also coordinate log recoveries and reports general status.
  */
-public class Manager extends AbstractServer
-    implements LiveTServerSet.Listener, TableObserver, HighlyAvailableService {
+public class Manager extends AbstractServer implements 
LiveTServerSet.Listener, TableObserver {
 
   static final Logger log = LoggerFactory.getLogger(Manager.class);
 
@@ -236,8 +231,6 @@ public class Manager extends AbstractServer
 
   final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus();
 
-  private final AtomicBoolean managerInitialized = new AtomicBoolean(false);
-
   private final long timeToCacheRecoveryWalExistence;
   private ExecutorService tableInformationStatusPool = null;
   private ThreadPoolExecutor tabletRefreshThreadPool;
@@ -1146,26 +1139,24 @@ public class Manager extends AbstractServer
     managerClientHandler = new ManagerClientServiceHandler(this);
     compactionCoordinator = new CompactionCoordinator(context, security, 
fateRefs, this);
 
-    // Start the Manager's Client service
-    // Ensure that calls before the manager gets the lock fail
-    ManagerClientService.Iface haProxy =
-        HighlyAvailableServiceWrapper.service(managerClientHandler, this);
-
     ServerAddress sa;
     var processor = ThriftProcessorTypes.getManagerTProcessor(this, 
fateServiceHandler,
-        compactionCoordinator.getThriftService(), haProxy, getContext());
+        compactionCoordinator.getThriftService(), managerClientHandler, 
getContext());
 
     try {
-      sa = TServerUtils.startServer(context, getHostname(), 
Property.MANAGER_CLIENTPORT, processor,
-          "Manager", "Manager Client Service Handler", null, 
Property.MANAGER_MINTHREADS,
+      sa = TServerUtils.createThriftServer(context, getHostname(), 
Property.MANAGER_CLIENTPORT,
+          processor, "Manager", null, Property.MANAGER_MINTHREADS,
           Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK);
     } catch (UnknownHostException e) {
       throw new IllegalStateException("Unable to start server on host " + 
getHostname(), e);
     }
-    clientService = sa.server;
-    log.info("Started Manager client service at {}", sa.address);
 
-    // block until we can obtain the ZK lock for the manager
+    // block until we can obtain the ZK lock for the manager. Create the
+    // initial lock using ThriftService.NONE. This will allow the lock
+    // allocation to occur, but prevent any services from getting the
+    // Manager address for the COORDINATOR, FATE, and MANAGER services.
+    // The lock data is replaced below and the manager address is exposed
+    // for each of these services.
     ServiceLockData sld;
     try {
       sld = getManagerLock(context.getServerPaths().createManagerPath());
@@ -1402,7 +1393,16 @@ public class Manager extends AbstractServer
       log.info("AuthenticationTokenSecretManager is initialized");
     }
 
-    UUID uuid = sld.getServerUUID(ThriftService.MANAGER);
+    // Now that the Manager is up, start the ThriftServer
+    sa.startThriftServer("Manager Client Service Handler");
+    clientService = sa.server;
+    log.info("Started Manager client service at {}", sa.address);
+
+    // Replace the ServiceLockData information in the Manager lock node in 
ZooKeeper.
+    // This advertises the address that clients can use to connect to the 
Manager
+    // for the Coordinator, Fate, and Manager services. Do **not** do this 
until
+    // after the upgrade process is finished and the dependent services are 
started.
+    UUID uuid = sld.getServerUUID(ThriftService.NONE);
     ServiceDescriptors descriptors = new ServiceDescriptors();
     for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, 
ThriftService.COORDINATOR,
         ThriftService.FATE}) {
@@ -1418,13 +1418,6 @@ public class Manager extends AbstractServer
       throw new IllegalStateException("Exception updating manager lock", e);
     }
 
-    while (!clientService.isServing()) {
-      sleepUninterruptibly(100, MILLISECONDS);
-    }
-
-    // The manager is fully initialized. Clients are allowed to connect now.
-    managerInitialized.set(true);
-
     while (!isShutdownRequested() && clientService.isServing()) {
       if (Thread.currentThread().isInterrupted()) {
         log.info("Server process thread has been interrupted, shutting down");
@@ -1607,7 +1600,7 @@ public class Manager extends AbstractServer
     UUID zooLockUUID = UUID.randomUUID();
 
     ServiceDescriptors descriptors = new ServiceDescriptors();
-    descriptors.addService(new ServiceDescriptor(zooLockUUID, 
ThriftService.MANAGER,
+    descriptors.addService(new ServiceDescriptor(zooLockUUID, 
ThriftService.NONE,
         managerClientAddress, this.getResourceGroup()));
     ServiceLockData sld = new ServiceLockData(descriptors);
     managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID);
@@ -1871,12 +1864,6 @@ public class Manager extends AbstractServer
     return timeKeeper.getTime();
   }
 
-  @Override
-  public boolean isActiveService() {
-    return managerInitialized.get();
-  }
-
-  @Override
   public boolean isUpgrading() {
     return upgradeCoordinator.getStatus() != 
UpgradeCoordinator.UpgradeStatus.COMPLETE;
   }
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index d0ec4b13dc..51ea5edc4c 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -82,7 +82,6 @@ import 
org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionI
 import 
org.apache.accumulo.monitor.rest.compactions.external.RunningCompactions;
 import 
org.apache.accumulo.monitor.rest.compactions.external.RunningCompactorDetails;
 import org.apache.accumulo.server.AbstractServer;
-import org.apache.accumulo.server.HighlyAvailableService;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.util.TableInfoUtil;
 import org.apache.thrift.transport.TTransportException;
@@ -109,7 +108,7 @@ import com.google.common.net.HostAndPort;
 /**
  * Serve manager statistics with an embedded web server.
  */
-public class Monitor extends AbstractServer implements HighlyAvailableService, 
Connection.Listener {
+public class Monitor extends AbstractServer implements Connection.Listener {
 
   private static final Logger log = LoggerFactory.getLogger(Monitor.class);
   private static final int REFRESH_TIME = 5;
@@ -140,7 +139,6 @@ public class Monitor extends AbstractServer implements 
HighlyAvailableService, C
   private long totalHoldTime = 0;
   private long totalLookups = 0;
   private int totalTables = 0;
-  private final AtomicBoolean monitorInitialized = new AtomicBoolean(false);
 
   private EventCounter lookupRateTracker = new EventCounter();
   private EventCounter indexCacheHitTracker = new EventCounter();
@@ -430,8 +428,6 @@ public class Monitor extends AbstractServer implements 
HighlyAvailableService, C
     }).start();
     Threads.createThread("Metric Fetcher Thread", fetcher).start();
 
-    monitorInitialized.set(true);
-
     while (!isShutdownRequested()) {
       if (Thread.currentThread().isInterrupted()) {
         log.info("Server process thread has been interrupted, shutting down");
@@ -825,11 +821,6 @@ public class Monitor extends AbstractServer implements 
HighlyAvailableService, C
     return lookupRateTracker.calculateRate();
   }
 
-  @Override
-  public boolean isActiveService() {
-    return monitorInitialized.get();
-  }
-
   public Optional<HostAndPort> getCoordinatorHost() {
     return coordinatorHost;
   }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 535d100dfe..595df77ba4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -304,11 +304,11 @@ public class ScanServer extends AbstractServer
     TProcessor processor =
         ThriftProcessorTypes.getScanServerTProcessor(this, clientHandler, 
this, getContext());
 
-    ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
+    ServerAddress sp = TServerUtils.createThriftServer(getContext(), 
getHostname(),
         Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(),
-        "Thrift Client Server", Property.SSERV_PORTSEARCH, 
Property.SSERV_MINTHREADS,
-        Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK);
-
+        Property.SSERV_PORTSEARCH, Property.SSERV_MINTHREADS, 
Property.SSERV_MINTHREADS_TIMEOUT,
+        Property.SSERV_THREADCHECK);
+    sp.startThriftServer("Thrift Client Server");
     setHostname(sp.address);
     LOG.info("address = {}", sp.address);
     return sp;
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 5a71c6cad2..93a53b83d3 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -413,10 +413,11 @@ public class TabletServer extends AbstractServer 
implements TabletHostingServer
 
   private HostAndPort startServer(String address, TProcessor processor)
       throws UnknownHostException {
-    ServerAddress sp = TServerUtils.startServer(getContext(), address, 
Property.TSERV_CLIENTPORT,
-        processor, this.getClass().getSimpleName(), "Thrift Client Server",
-        Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, 
Property.TSERV_MINTHREADS_TIMEOUT,
-        Property.TSERV_THREADCHECK);
+    ServerAddress sp =
+        TServerUtils.createThriftServer(getContext(), address, 
Property.TSERV_CLIENTPORT, processor,
+            this.getClass().getSimpleName(), Property.TSERV_PORTSEARCH, 
Property.TSERV_MINTHREADS,
+            Property.TSERV_MINTHREADS_TIMEOUT, Property.TSERV_THREADCHECK);
+    sp.startThriftServer("Thrift Client Server");
     this.server = sp.server;
     return sp.address;
   }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java 
b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index 2155fc93e0..71076c8764 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@ -123,12 +123,12 @@ public class ZombieTServer {
         
ThriftProcessorTypes.TABLET_SCAN.getTProcessor(TabletScanClientService.Processor.class,
             TabletScanClientService.Iface.class, tch, context));
 
-    ServerAddress serverPort = 
TServerUtils.startTServer(context.getConfiguration(),
-        ThriftServerType.CUSTOM_HS_HA, muxProcessor, "ZombieTServer", "walking 
dead", 2,
+    ServerAddress serverPort = 
TServerUtils.createThriftServer(context.getConfiguration(),
+        ThriftServerType.CUSTOM_HS_HA, muxProcessor, "ZombieTServer", 2,
         ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null, 
null, -1,
         context.getConfiguration().getCount(Property.RPC_BACKLOG), 
context.getMetricsInfo(), false,
         HostAndPort.fromParts("0.0.0.0", port));
-
+    serverPort.startThriftServer("walking dead");
     String addressString = serverPort.address.toString();
 
     var zLockPath = context.getServerPaths()
diff --git 
a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java 
b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
index 6335a8dd62..7a09fc5e85 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
@@ -79,6 +79,7 @@ import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.client.ClientServiceHandler;
 import org.apache.accumulo.server.manager.state.Assignment;
 import org.apache.accumulo.server.manager.state.TabletStateStore;
+import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
 import org.apache.accumulo.server.rpc.ThriftServerType;
@@ -313,10 +314,12 @@ public class NullTserver {
             TabletManagementClientService.Processor.class,
             TabletManagementClientService.Iface.class, tch, context));
 
-    TServerUtils.startTServer(context.getConfiguration(), 
ThriftServerType.CUSTOM_HS_HA,
-        muxProcessor, "NullTServer", "null tserver", 2, 
ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000,
-        10 * 1024 * 1024, null, null, -1, 
context.getConfiguration().getCount(Property.RPC_BACKLOG),
-        context.getMetricsInfo(), false, HostAndPort.fromParts("0.0.0.0", 
opts.port));
+    ServerAddress sa = 
TServerUtils.createThriftServer(context.getConfiguration(),
+        ThriftServerType.CUSTOM_HS_HA, muxProcessor, "NullTServer", 2,
+        ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null, 
null, -1,
+        context.getConfiguration().getCount(Property.RPC_BACKLOG), 
context.getMetricsInfo(), false,
+        HostAndPort.fromParts("0.0.0.0", opts.port));
+    sa.startThriftServer("null tserver");
 
     AccumuloLockWatcher miniLockWatcher = new AccumuloLockWatcher() {
 


Reply via email to