This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new cea2a3312f Don't advertise Manager address until upgrade done and fully started (#5419) cea2a3312f is described below commit cea2a3312f868864c75261e399b318445143f8d1 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Fri Mar 21 08:06:52 2025 -0400 Don't advertise Manager address until upgrade done and fully started (#5419) Removed HighlyAvailableService and associated code, which was only really used by the Manager, and instead modified the Manager to only advertise the Manager address with the Thrift services after the upgrade process has completed and the Manager is fully started. This change removes the HighlyAvailableServiceInvocationHandler, which was throwing an exception when the Manager was not ready to accept Thrift RPC calls and causing stack traces in the Manager log during the upgrade process. These stack traces could be construed by the user as a problem, when in reality they were benign. Also modified TServerUtils to create the Thrift server but not start it. The ThriftServer is set on the ServerAddress object that is returned by the TServerUtils methods. Added a method to ServerAddress to start the Thrift server, which for most server processes is called immediately after creating it. However, for the Manager, the call to start the Thrift server is delayed until the Manager is up. 
Closes #5411 --- .../accumulo/server/HighlyAvailableService.java | 51 ---------------- .../HighlyAvailableServiceInvocationHandler.java | 71 ---------------------- .../server/rpc/HighlyAvailableServiceWrapper.java | 54 ---------------- .../apache/accumulo/server/rpc/ServerAddress.java | 22 +++++++ .../apache/accumulo/server/rpc/TServerUtils.java | 62 +++++++------------ .../accumulo/server/rpc/TServerUtilsTest.java | 9 +-- .../org/apache/accumulo/compactor/Compactor.java | 5 +- .../apache/accumulo/gc/SimpleGarbageCollector.java | 13 ++-- .../java/org/apache/accumulo/manager/Manager.java | 55 +++++++---------- .../java/org/apache/accumulo/monitor/Monitor.java | 11 +--- .../org/apache/accumulo/tserver/ScanServer.java | 8 +-- .../org/apache/accumulo/tserver/TabletServer.java | 9 +-- .../accumulo/test/functional/ZombieTServer.java | 6 +- .../accumulo/test/performance/NullTserver.java | 11 ++-- 14 files changed, 101 insertions(+), 286 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/HighlyAvailableService.java b/server/base/src/main/java/org/apache/accumulo/server/HighlyAvailableService.java deleted file mode 100644 index 4d529369a0..0000000000 --- a/server/base/src/main/java/org/apache/accumulo/server/HighlyAvailableService.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.server; - -/** - * This interface allows service implementations which support running multiple instances - * concurrently with only one active instance to report whether or not they are the active service. - */ -public interface HighlyAvailableService { - - /** - * Is this service instance currently the active instance for the Accumulo cluster. - * - * @return True if the service is the active service, false otherwise. - */ - boolean isActiveService(); - - /** - * Is this service instance currently in the process of upgrading. - * - * @return True if the service is upgrading, false otherwise. - */ - default boolean isUpgrading() { - return false; - } - - /** - * Get the name of the service - * - * @return service name - */ - default String getServiceName() { - return this.getClass().getSimpleName(); - } -} diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java deleted file mode 100644 index a0e8a9453a..0000000000 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.server.rpc; - -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Objects; - -import org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException; -import org.apache.accumulo.server.HighlyAvailableService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An {@link InvocationHandler} which checks to see if a {@link HighlyAvailableService} is the - * current active instance of that service, throwing {@link ThriftNotActiveServiceException} when it - * is not the current active instance. 
- */ -public class HighlyAvailableServiceInvocationHandler<I> implements InvocationHandler { - private static final Logger LOG = - LoggerFactory.getLogger(HighlyAvailableServiceInvocationHandler.class); - - private final I instance; - private final HighlyAvailableService service; - - public HighlyAvailableServiceInvocationHandler(I instance, HighlyAvailableService service) { - this.instance = Objects.requireNonNull(instance); - this.service = Objects.requireNonNull(service); - } - - @Override - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - - // If the service is upgrading, throw an exception - if (service.isUpgrading()) { - LOG.trace("Service can not be accessed while it is upgrading."); - throw new ThriftNotActiveServiceException(service.getServiceName(), - "Service can not be accessed while it is upgrading"); - } - - // If the service is not active, throw an exception - if (!service.isActiveService()) { - LOG.trace("Denying access to RPC service as this instance is not the active instance."); - throw new ThriftNotActiveServiceException(service.getServiceName(), - "Denying access to RPC service as this instance is not the active instance"); - } - try { - // Otherwise, call the real method - return method.invoke(instance, args); - } catch (InvocationTargetException ex) { - throw ex.getCause(); - } - } -} diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceWrapper.java deleted file mode 100644 index 307b2dfa36..0000000000 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceWrapper.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.server.rpc; - -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Proxy; - -import org.apache.accumulo.core.util.ClassUtil; -import org.apache.accumulo.server.HighlyAvailableService; - -/** - * A class to wrap invocations to the Thrift handler to prevent these invocations from succeeding - * when the Accumulo service that this Thrift service is for has not yet obtained its ZooKeeper - * lock. - * - * @since 2.0 - */ -public class HighlyAvailableServiceWrapper { - - private static final HighlyAvailableServiceWrapper INSTANCE = new HighlyAvailableServiceWrapper(); - - // Not for public use. 
- private HighlyAvailableServiceWrapper() {} - - public static <I> I service(final I instance, HighlyAvailableService service) { - InvocationHandler handler = INSTANCE.getInvocationHandler(instance, service); - - @SuppressWarnings("unchecked") - I proxiedInstance = (I) Proxy.newProxyInstance(instance.getClass().getClassLoader(), - ClassUtil.getInterfaces(instance.getClass()).toArray(new Class<?>[0]), handler); - return proxiedInstance; - } - - protected <T> HighlyAvailableServiceInvocationHandler<T> getInvocationHandler(final T instance, - final HighlyAvailableService service) { - return new HighlyAvailableServiceInvocationHandler<>(instance, service); - } -} diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java index 0e033b9056..3924bea115 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java @@ -18,8 +18,12 @@ */ package org.apache.accumulo.server.rpc; +import org.apache.accumulo.core.util.Halt; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.util.threads.Threads; import org.apache.thrift.server.TServer; +import com.google.common.base.Preconditions; import com.google.common.net.HostAndPort; /** @@ -41,4 +45,22 @@ public class ServerAddress { public HostAndPort getAddress() { return address; } + + public void startThriftServer(String threadName) { + Threads.createThread(threadName, () -> { + try { + server.serve(); + } catch (Error e) { + Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.", 1); + } + }).start(); + + while (!server.isServing()) { + // Wait for the thread to start and for the TServer to start + // serving events + UtilWaitThread.sleep(10); + Preconditions.checkState(!server.getShouldStop()); + } + + } } diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index a1318c4beb..fd37e171a0 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -47,18 +47,14 @@ import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.rpc.SslConnectionParams; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.threads.ThreadPools; -import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.ServerContext; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.thrift.TProcessor; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TProtocolFactory; -import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.server.TThreadedSelectorServer; import org.apache.thrift.transport.TNonblockingServerSocket; @@ -73,7 +69,6 @@ import org.apache.thrift.transport.TTransportFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; import com.google.common.net.HostAndPort; import com.google.common.primitives.Ints; @@ -114,13 +109,14 @@ public class TServerUtils { } /** - * Start a server, at the given port, or higher, if that port is not available. + * Create a ServerAddress, at the given port, or higher, if that port is not available. 
Callers + * must start the ThriftServer after calling this method using + * {@code ServerAddress#startThriftServer(String)} * * @param context RPC configuration * @param portHintProperty the port to attempt to open, can be zero, meaning "any available port" * @param processor the service to be started * @param serverName the name of the class that is providing the service - * @param threadName name this service's thread for better debugging * @param portSearchProperty A boolean Property to control if port-search should be used, or null * to disable * @param minThreadProperty A Property to control the minimum number of threads in the pool @@ -129,8 +125,8 @@ public class TServerUtils { * @return the server object created, and the port actually used * @throws UnknownHostException when we don't know our own address */ - public static ServerAddress startServer(ServerContext context, String hostname, - Property portHintProperty, TProcessor processor, String serverName, String threadName, + public static ServerAddress createThriftServer(ServerContext context, String hostname, + Property portHintProperty, TProcessor processor, String serverName, Property portSearchProperty, Property minThreadProperty, Property threadTimeOutProperty, Property timeBetweenThreadChecksProperty) throws UnknownHostException { final AccumuloConfiguration config = context.getConfiguration(); @@ -174,8 +170,8 @@ public class TServerUtils { HostAndPort[] addresses = getHostAndPorts(hostname, portHint); try { - return TServerUtils.startTServer(serverType, timedProcessor, serverName, threadName, - minThreads, threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize, + return TServerUtils.createThriftServer(serverType, timedProcessor, serverName, minThreads, + threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize, context.getServerSslParams(), context.getSaslParams(), context.getClientTimeoutInMillis(), backlog, portSearch, addresses); } catch (TTransportException e) { @@ -199,12 
+195,12 @@ public class TServerUtils { } try { HostAndPort addr = HostAndPort.fromParts(hostname, port); - return TServerUtils.startTServer(serverType, timedProcessor, serverName, threadName, + return TServerUtils.createThriftServer(serverType, timedProcessor, serverName, minThreads, threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize, context.getServerSslParams(), context.getSaslParams(), context.getClientTimeoutInMillis(), backlog, portSearch, addr); } catch (TTransportException tte) { - log.info("Unable to use port {}, retrying. (Thread Name = {})", port, threadName); + log.info("Unable to use port {}, retrying.", port); } } log.error("Unable to start TServer", e); @@ -562,9 +558,14 @@ public class TServerUtils { return new ServerAddress(server, address); } - public static ServerAddress startTServer(final AccumuloConfiguration conf, - ThriftServerType serverType, TProcessor processor, String serverName, String threadName, - int numThreads, long threadTimeOut, long timeBetweenThreadChecks, long maxMessageSize, + /** + * Create a ServerAddress, at the given port, or higher, if that port is not available. Callers + * must start the ThriftServer after calling this method using + * {@code ServerAddress#startThriftServer(String)} + */ + public static ServerAddress createThriftServer(final AccumuloConfiguration conf, + ThriftServerType serverType, TProcessor processor, String serverName, int numThreads, + long threadTimeOut, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, SaslServerConnectionParams saslParams, long serverSocketTimeout, int backlog, MetricsInfo metricsInfo, boolean portSearch, HostAndPort... 
addresses) { @@ -574,9 +575,9 @@ public class TServerUtils { } try { - return startTServer(serverType, new TimedProcessor(processor, metricsInfo), serverName, - threadName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, - sslParams, saslParams, serverSocketTimeout, backlog, portSearch, addresses); + return createThriftServer(serverType, new TimedProcessor(processor, metricsInfo), serverName, + numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, sslParams, + saslParams, serverSocketTimeout, backlog, portSearch, addresses); } catch (TTransportException e) { throw new IllegalStateException(e); } @@ -589,8 +590,8 @@ public class TServerUtils { * @return A ServerAddress encapsulating the Thrift server created and the host/port which it is * bound to. */ - private static ServerAddress startTServer(ThriftServerType serverType, TimedProcessor processor, - String serverName, String threadName, int numThreads, long threadTimeOut, + private static ServerAddress createThriftServer(ThriftServerType serverType, + TimedProcessor processor, String serverName, int numThreads, long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, SaslServerConnectionParams saslParams, long serverSocketTimeout, int backlog, boolean portSearch, HostAndPort... 
addresses) @@ -653,28 +654,11 @@ public class TServerUtils { "Unable to create server on addresses: " + Arrays.toString(addresses)); } - final TServer finalServer = serverAddress.server; - - Threads.createThread(threadName, () -> { - try { - finalServer.serve(); - } catch (Error e) { - Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.", 1); - } - }).start(); - - while (!finalServer.isServing()) { - // Wait for the thread to start and for the TServer to start - // serving events - UtilWaitThread.sleep(10); - Preconditions.checkState(!finalServer.getShouldStop()); - } - // check for the special "bind to everything address" if (serverAddress.address.getHost().equals("0.0.0.0")) { // can't get the address from the bind, so we'll do our best to invent our hostname try { - serverAddress = new ServerAddress(finalServer, HostAndPort + serverAddress = new ServerAddress(serverAddress.server, HostAndPort .fromParts(InetAddress.getLocalHost().getHostName(), serverAddress.address.getPort())); } catch (UnknownHostException e) { throw new TTransportException(e); diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java index 6bf9218e60..c4eed579b3 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java @@ -301,9 +301,10 @@ public class TServerUtilsTest { // misconfiguration) String hostname = "localhost"; - return TServerUtils.startServer(context, hostname, Property.TSERV_CLIENTPORT, processor, - "TServerUtilsTest", "TServerUtilsTestThread", Property.TSERV_PORTSEARCH, - Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT, Property.TSERV_THREADCHECK); - + ServerAddress sa = TServerUtils.createThriftServer(context, hostname, Property.TSERV_CLIENTPORT, + processor, "TServerUtilsTest", Property.TSERV_PORTSEARCH, 
Property.TSERV_MINTHREADS, + Property.TSERV_MINTHREADS_TIMEOUT, Property.TSERV_THREADCHECK); + sa.startThriftServer("TServerUtilsTestThread"); + return sa; } } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 72145ab32d..2bfc6b7276 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -322,10 +322,11 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac ClientServiceHandler clientHandler = new ClientServiceHandler(getContext()); var processor = ThriftProcessorTypes.getCompactorTProcessor(this, clientHandler, getCompactorThriftHandlerInterface(), getContext()); - ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(), + ServerAddress sp = TServerUtils.createThriftServer(getContext(), getHostname(), Property.COMPACTOR_CLIENTPORT, processor, this.getClass().getSimpleName(), - "Thrift Client Server", Property.COMPACTOR_PORTSEARCH, Property.COMPACTOR_MINTHREADS, + Property.COMPACTOR_PORTSEARCH, Property.COMPACTOR_MINTHREADS, Property.COMPACTOR_MINTHREADS_TIMEOUT, Property.COMPACTOR_THREADCHECK); + sp.startThriftServer("Thrift Client Server"); setHostname(sp.address); LOG.info("address = {}", sp.address); return sp; diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 2d7afe261d..46e4fe8c8d 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -429,12 +429,13 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { IntStream port = getConfiguration().getPortStream(Property.GC_PORT); HostAndPort[] addresses = 
TServerUtils.getHostAndPorts(getHostname(), port); long maxMessageSize = getConfiguration().getAsBytes(Property.RPC_MAX_MESSAGE_SIZE); - ServerAddress server = TServerUtils.startTServer(getConfiguration(), - getContext().getThriftServerType(), processor, this.getClass().getSimpleName(), - "GC Monitor Service", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, maxMessageSize, - getContext().getServerSslParams(), getContext().getSaslParams(), 0, - getConfiguration().getCount(Property.RPC_BACKLOG), getContext().getMetricsInfo(), false, - addresses); + ServerAddress server = + TServerUtils.createThriftServer(getConfiguration(), getContext().getThriftServerType(), + processor, this.getClass().getSimpleName(), 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, + 1000, maxMessageSize, getContext().getServerSslParams(), getContext().getSaslParams(), + 0, getConfiguration().getCount(Property.RPC_BACKLOG), getContext().getMetricsInfo(), + false, addresses); + server.startThriftServer("GC Monitor Service"); setHostname(server.address); log.debug("Starting garbage collector listening on " + server.address); return server.address; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index edeef321a6..5da833ee84 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -52,7 +52,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -97,7 +96,6 @@ import org.apache.accumulo.core.manager.balancer.TServerStatusImpl; import 
org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.manager.thrift.BulkImportState; -import org.apache.accumulo.core.manager.thrift.ManagerClientService; import org.apache.accumulo.core.manager.thrift.ManagerGoalState; import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; import org.apache.accumulo.core.manager.thrift.ManagerState; @@ -133,7 +131,6 @@ import org.apache.accumulo.manager.tableOps.TraceRepo; import org.apache.accumulo.manager.upgrade.UpgradeCoordinator; import org.apache.accumulo.manager.upgrade.UpgradeCoordinator.UpgradeStatus; import org.apache.accumulo.server.AbstractServer; -import org.apache.accumulo.server.HighlyAvailableService; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.compaction.CompactionConfigStorage; import org.apache.accumulo.server.fs.VolumeManager; @@ -145,7 +142,6 @@ import org.apache.accumulo.server.manager.state.DeadServerList; import org.apache.accumulo.server.manager.state.TabletServerState; import org.apache.accumulo.server.manager.state.TabletStateStore; import org.apache.accumulo.server.manager.state.UnassignedTablet; -import org.apache.accumulo.server.rpc.HighlyAvailableServiceWrapper; import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; @@ -183,8 +179,7 @@ import io.opentelemetry.context.Scope; * <p> * The manager will also coordinate log recoveries and reports general status. 
*/ -public class Manager extends AbstractServer - implements LiveTServerSet.Listener, TableObserver, HighlyAvailableService { +public class Manager extends AbstractServer implements LiveTServerSet.Listener, TableObserver { static final Logger log = LoggerFactory.getLogger(Manager.class); @@ -236,8 +231,6 @@ public class Manager extends AbstractServer final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus(); - private final AtomicBoolean managerInitialized = new AtomicBoolean(false); - private final long timeToCacheRecoveryWalExistence; private ExecutorService tableInformationStatusPool = null; private ThreadPoolExecutor tabletRefreshThreadPool; @@ -1146,26 +1139,24 @@ public class Manager extends AbstractServer managerClientHandler = new ManagerClientServiceHandler(this); compactionCoordinator = new CompactionCoordinator(context, security, fateRefs, this); - // Start the Manager's Client service - // Ensure that calls before the manager gets the lock fail - ManagerClientService.Iface haProxy = - HighlyAvailableServiceWrapper.service(managerClientHandler, this); - ServerAddress sa; var processor = ThriftProcessorTypes.getManagerTProcessor(this, fateServiceHandler, - compactionCoordinator.getThriftService(), haProxy, getContext()); + compactionCoordinator.getThriftService(), managerClientHandler, getContext()); try { - sa = TServerUtils.startServer(context, getHostname(), Property.MANAGER_CLIENTPORT, processor, - "Manager", "Manager Client Service Handler", null, Property.MANAGER_MINTHREADS, + sa = TServerUtils.createThriftServer(context, getHostname(), Property.MANAGER_CLIENTPORT, + processor, "Manager", null, Property.MANAGER_MINTHREADS, Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK); } catch (UnknownHostException e) { throw new IllegalStateException("Unable to start server on host " + getHostname(), e); } - clientService = sa.server; - log.info("Started Manager client service at {}", sa.address); - // block until we can 
obtain the ZK lock for the manager + // block until we can obtain the ZK lock for the manager. Create the + // initial lock using ThriftService.NONE. This will allow the lock + // allocation to occur, but prevent any services from getting the + // Manager address for the COORDINATOR, FATE, and MANAGER services. + // The lock data is replaced below and the manager address is exposed + // for each of these services. ServiceLockData sld; try { sld = getManagerLock(context.getServerPaths().createManagerPath()); @@ -1402,7 +1393,16 @@ public class Manager extends AbstractServer log.info("AuthenticationTokenSecretManager is initialized"); } - UUID uuid = sld.getServerUUID(ThriftService.MANAGER); + // Now that the Manager is up, start the ThriftServer + sa.startThriftServer("Manager Client Service Handler"); + clientService = sa.server; + log.info("Started Manager client service at {}", sa.address); + + // Replace the ServiceLockData information in the Manager lock node in ZooKeeper. + // This advertises the address that clients can use to connect to the Manager + // for the Coordinator, Fate, and Manager services. Do **not** do this until + // after the upgrade process is finished and the dependent services are started. + UUID uuid = sld.getServerUUID(ThriftService.NONE); ServiceDescriptors descriptors = new ServiceDescriptors(); for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, ThriftService.COORDINATOR, ThriftService.FATE}) { @@ -1418,13 +1418,6 @@ public class Manager extends AbstractServer throw new IllegalStateException("Exception updating manager lock", e); } - while (!clientService.isServing()) { - sleepUninterruptibly(100, MILLISECONDS); - } - - // The manager is fully initialized. Clients are allowed to connect now. 
- managerInitialized.set(true); - while (!isShutdownRequested() && clientService.isServing()) { if (Thread.currentThread().isInterrupted()) { log.info("Server process thread has been interrupted, shutting down"); @@ -1607,7 +1600,7 @@ public class Manager extends AbstractServer UUID zooLockUUID = UUID.randomUUID(); ServiceDescriptors descriptors = new ServiceDescriptors(); - descriptors.addService(new ServiceDescriptor(zooLockUUID, ThriftService.MANAGER, + descriptors.addService(new ServiceDescriptor(zooLockUUID, ThriftService.NONE, managerClientAddress, this.getResourceGroup())); ServiceLockData sld = new ServiceLockData(descriptors); managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID); @@ -1871,12 +1864,6 @@ public class Manager extends AbstractServer return timeKeeper.getTime(); } - @Override - public boolean isActiveService() { - return managerInitialized.get(); - } - - @Override public boolean isUpgrading() { return upgradeCoordinator.getStatus() != UpgradeCoordinator.UpgradeStatus.COMPLETE; } diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index d0ec4b13dc..51ea5edc4c 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -82,7 +82,6 @@ import org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionI import org.apache.accumulo.monitor.rest.compactions.external.RunningCompactions; import org.apache.accumulo.monitor.rest.compactions.external.RunningCompactorDetails; import org.apache.accumulo.server.AbstractServer; -import org.apache.accumulo.server.HighlyAvailableService; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.util.TableInfoUtil; import org.apache.thrift.transport.TTransportException; @@ -109,7 +108,7 @@ import com.google.common.net.HostAndPort; /** * Serve manager statistics 
with an embedded web server. */ -public class Monitor extends AbstractServer implements HighlyAvailableService, Connection.Listener { +public class Monitor extends AbstractServer implements Connection.Listener { private static final Logger log = LoggerFactory.getLogger(Monitor.class); private static final int REFRESH_TIME = 5; @@ -140,7 +139,6 @@ public class Monitor extends AbstractServer implements HighlyAvailableService, C private long totalHoldTime = 0; private long totalLookups = 0; private int totalTables = 0; - private final AtomicBoolean monitorInitialized = new AtomicBoolean(false); private EventCounter lookupRateTracker = new EventCounter(); private EventCounter indexCacheHitTracker = new EventCounter(); @@ -430,8 +428,6 @@ public class Monitor extends AbstractServer implements HighlyAvailableService, C }).start(); Threads.createThread("Metric Fetcher Thread", fetcher).start(); - monitorInitialized.set(true); - while (!isShutdownRequested()) { if (Thread.currentThread().isInterrupted()) { log.info("Server process thread has been interrupted, shutting down"); @@ -825,11 +821,6 @@ public class Monitor extends AbstractServer implements HighlyAvailableService, C return lookupRateTracker.calculateRate(); } - @Override - public boolean isActiveService() { - return monitorInitialized.get(); - } - public Optional<HostAndPort> getCoordinatorHost() { return coordinatorHost; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 535d100dfe..595df77ba4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -304,11 +304,11 @@ public class ScanServer extends AbstractServer TProcessor processor = ThriftProcessorTypes.getScanServerTProcessor(this, clientHandler, this, getContext()); - ServerAddress sp = TServerUtils.startServer(getContext(), 
getHostname(), + ServerAddress sp = TServerUtils.createThriftServer(getContext(), getHostname(), Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(), - "Thrift Client Server", Property.SSERV_PORTSEARCH, Property.SSERV_MINTHREADS, - Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK); - + Property.SSERV_PORTSEARCH, Property.SSERV_MINTHREADS, Property.SSERV_MINTHREADS_TIMEOUT, + Property.SSERV_THREADCHECK); + sp.startThriftServer("Thrift Client Server"); setHostname(sp.address); LOG.info("address = {}", sp.address); return sp; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 5a71c6cad2..93a53b83d3 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -413,10 +413,11 @@ public class TabletServer extends AbstractServer implements TabletHostingServer private HostAndPort startServer(String address, TProcessor processor) throws UnknownHostException { - ServerAddress sp = TServerUtils.startServer(getContext(), address, Property.TSERV_CLIENTPORT, - processor, this.getClass().getSimpleName(), "Thrift Client Server", - Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT, - Property.TSERV_THREADCHECK); + ServerAddress sp = + TServerUtils.createThriftServer(getContext(), address, Property.TSERV_CLIENTPORT, processor, + this.getClass().getSimpleName(), Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, + Property.TSERV_MINTHREADS_TIMEOUT, Property.TSERV_THREADCHECK); + sp.startThriftServer("Thrift Client Server"); this.server = sp.server; return sp.address; } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java index 2155fc93e0..71076c8764 100644 --- 
a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java @@ -123,12 +123,12 @@ public class ZombieTServer { ThriftProcessorTypes.TABLET_SCAN.getTProcessor(TabletScanClientService.Processor.class, TabletScanClientService.Iface.class, tch, context)); - ServerAddress serverPort = TServerUtils.startTServer(context.getConfiguration(), - ThriftServerType.CUSTOM_HS_HA, muxProcessor, "ZombieTServer", "walking dead", 2, + ServerAddress serverPort = TServerUtils.createThriftServer(context.getConfiguration(), + ThriftServerType.CUSTOM_HS_HA, muxProcessor, "ZombieTServer", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null, null, -1, context.getConfiguration().getCount(Property.RPC_BACKLOG), context.getMetricsInfo(), false, HostAndPort.fromParts("0.0.0.0", port)); - + serverPort.startThriftServer("walking dead"); String addressString = serverPort.address.toString(); var zLockPath = context.getServerPaths() diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java index 6335a8dd62..7a09fc5e85 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java @@ -79,6 +79,7 @@ import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.client.ClientServiceHandler; import org.apache.accumulo.server.manager.state.Assignment; import org.apache.accumulo.server.manager.state.TabletStateStore; +import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; import org.apache.accumulo.server.rpc.ThriftServerType; @@ -313,10 +314,12 @@ public class NullTserver { TabletManagementClientService.Processor.class, TabletManagementClientService.Iface.class, 
tch, context)); - TServerUtils.startTServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA, - muxProcessor, "NullTServer", "null tserver", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, - 10 * 1024 * 1024, null, null, -1, context.getConfiguration().getCount(Property.RPC_BACKLOG), - context.getMetricsInfo(), false, HostAndPort.fromParts("0.0.0.0", opts.port)); + ServerAddress sa = TServerUtils.createThriftServer(context.getConfiguration(), + ThriftServerType.CUSTOM_HS_HA, muxProcessor, "NullTServer", 2, + ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null, null, -1, + context.getConfiguration().getCount(Property.RPC_BACKLOG), context.getMetricsInfo(), false, + HostAndPort.fromParts("0.0.0.0", opts.port)); + sa.startThriftServer("null tserver"); AccumuloLockWatcher miniLockWatcher = new AccumuloLockWatcher() {