This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new bab03236bf avoids reassignment of hosted tablet during shutdown (#5751) bab03236bf is described below commit bab03236bf982f966be8c95dd3559d32aad64c89 Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Jul 23 12:32:48 2025 -0400 avoids reassignment of hosted tablet during shutdown (#5751) When a tablet server is gracefully shut down, it notifies the manager that it is shutting down. The manager would then remove the server from the set of live tservers, which caused the manager to treat all tablets on that tserver as assigned to dead tservers. This was observed while debugging a failure in GracefulShutdownIT: the manager initiated log recovery and reassignment for the root tablet before the root tablet had been unloaded from the tserver that was shutting down. This caused errors in the metadata tablet consistency check during unload of the root tablet, because the manager had already modified the tablet metadata before the unload. This fix changes tablet server shutdown to use the existing fate operation for shutting down a tserver: when notified, the manager seeds a ShutdownTServer fate transaction, and the tserver waits for the manager to unload its tablets instead of unloading them itself. 
--- .../accumulo/server/manager/LiveTServerSet.java | 19 ---- .../java/org/apache/accumulo/manager/Manager.java | 9 +- .../manager/ManagerClientServiceHandler.java | 11 ++- .../manager/tableOps/ShutdownTServerTest.java | 6 +- .../org/apache/accumulo/tserver/TabletServer.java | 105 +++++++-------------- 5 files changed, 54 insertions(+), 96 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java index 1757e29580..e6a722dc9c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java @@ -28,7 +28,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; @@ -243,9 +242,6 @@ public class LiveTServerSet implements Watcher { // as above, indexed by TServerInstance private final Map<TServerInstance,TServerInfo> currentInstances = new HashMap<>(); - private final ConcurrentHashMap<String,TServerInfo> serversShuttingDown = - new ConcurrentHashMap<>(); - // The set of entries in zookeeper without locks, and the first time each was noticed private final Map<String,Long> locklessServers = new HashMap<>(); @@ -268,19 +264,6 @@ public class LiveTServerSet implements Watcher { .scheduleWithFixedDelay(this::scanServers, 0, 5000, TimeUnit.MILLISECONDS)); } - public void tabletServerShuttingDown(String server) { - - TServerInfo info = null; - synchronized (this) { - info = current.get(server); - } - if (info != null) { - serversShuttingDown.put(server, info); - } else { - log.info("Tablet Server reported it's shutting down, but not in list of current servers"); - } - } - public synchronized void scanServers() { try { final Set<TServerInstance> updates = new 
HashSet<>(); @@ -329,7 +312,6 @@ public class LiveTServerSet implements Watcher { doomed.add(info.instance); current.remove(zPath); currentInstances.remove(info.instance); - serversShuttingDown.remove(zPath); } Long firstSeen = locklessServers.get(zPath); @@ -408,7 +390,6 @@ public class LiveTServerSet implements Watcher { public synchronized Set<TServerInstance> getCurrentServers() { Set<TServerInstance> current = currentInstances.keySet(); - serversShuttingDown.values().forEach(tsi -> current.remove(tsi.instance)); return new HashSet<>(current); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 90346bdea0..1bb1516991 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1818,9 +1818,12 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener, } // recovers state from the persistent transaction to shutdown a server - public void shutdownTServer(TServerInstance server) { - nextEvent.event("Tablet Server shutdown requested for %s", server); - serversToShutdown.add(server); + public boolean shutdownTServer(TServerInstance server) { + if (serversToShutdown.add(server)) { + nextEvent.event("Tablet Server shutdown requested for %s", server); + return true; + } + return false; } public EventCoordinator getEventCoordinator() { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java index 505eefab7f..b0f173974b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java @@ -349,7 +349,16 @@ public class ManagerClientServiceHandler implements 
ManagerClientService.Iface { SecurityErrorCode.PERMISSION_DENIED); } log.info("Tablet Server {} has reported it's shutting down", tabletServer); - manager.tserverSet.tabletServerShuttingDown(tabletServer); + var tserver = new TServerInstance(tabletServer); + if (manager.shutdownTServer(tserver)) { + // If there is an exception seeding the fate tx this should cause the RPC to fail which should + // cause the tserver to halt. Because of that not making an attempt to handle failure here. + Fate<Manager> fate = manager.fate(); + long tid = fate.startTransaction(); + String msg = "Shutdown tserver " + tabletServer; + fate.seedTransaction("ShutdownTServer", tid, + new TraceRepo<>(new ShutdownTServer(tserver, false)), true, msg); + } } @Override diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java index 8384a4a1b0..92b164c174 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java @@ -55,8 +55,7 @@ public class ShutdownTServerTest { // Put in a table info record, don't care what status.tableMap.put("a_table", new TableInfo()); - manager.shutdownTServer(tserver); - EasyMock.expectLastCall().once(); + EasyMock.expect(manager.shutdownTServer(tserver)).andReturn(true).once(); EasyMock.expect(manager.onlineTabletServers()).andReturn(Collections.singleton(tserver)); EasyMock.expect(manager.getConnection(tserver)).andReturn(tserverCnxn); EasyMock.expect(tserverCnxn.getTableMap(false)).andReturn(status); @@ -74,8 +73,7 @@ public class ShutdownTServerTest { // reset the table map to the empty set to simulate all tablets unloaded status.tableMap = new HashMap<>(); - manager.shutdownTServer(tserver); - EasyMock.expectLastCall().once(); + 
EasyMock.expect(manager.shutdownTServer(tserver)).andReturn(false).once(); EasyMock.expect(manager.onlineTabletServers()).andReturn(Collections.singleton(tserver)); EasyMock.expect(manager.getConnection(tserver)).andReturn(tserverCnxn); EasyMock.expect(tserverCnxn.getTableMap(false)).andReturn(status); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index e66f6ae2d4..2573d6b451 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -54,7 +54,6 @@ import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.BlockingDeque; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadPoolExecutor; @@ -89,7 +88,6 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.process.thrift.ServerProcessService; @@ -97,7 +95,6 @@ import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment; import org.apache.accumulo.core.tabletserver.log.LogEntry; -import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.ComparablePair; import org.apache.accumulo.core.util.Halt; @@ -109,7 +106,6 @@ 
import org.apache.accumulo.core.util.Retry.RetryFactory; import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.core.util.threads.ThreadPoolNames; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.AbstractServer; @@ -954,13 +950,29 @@ public class TabletServer extends AbstractServer } } - // Tell the Manager we are shutting down so that it doesn't try - // to assign tablets. ManagerClientService.Client iface = managerConnection(getManagerAddress()); try { - iface.tabletServerStopping(TraceUtil.traceInfo(), getContext().rpcCreds(), - getClientAddressString()); - } catch (TException e) { + // Ask the manager to unload our tablets and stop loading new tablets + if (iface == null) { + Halt.halt(-1, "Error informing Manager that we are shutting down, exiting!"); + } else { + iface.tabletServerStopping(TraceUtil.traceInfo(), getContext().rpcCreds(), + getTabletSession().getHostPortSession()); + } + + boolean managerDown = false; + while (!getOnlineTablets().isEmpty()) { + log.info("Shutdown requested, waiting for manager to unload {} tablets", + getOnlineTablets().size()); + + managerDown = sendManagerMessages(managerDown, iface); + + UtilWaitThread.sleep(1000); + } + + sendManagerMessages(managerDown, iface); + + } catch (TException | RuntimeException e) { Halt.halt(-1, "Error informing Manager that we are shutting down, exiting!", e); } finally { returnManagerConnection(iface); @@ -971,66 +983,6 @@ public class TabletServer extends AbstractServer this.replServer.stop(); } - // Best-effort attempt at unloading tablets. 
- log.debug("Unloading tablets"); - final List<Future<?>> futures = new ArrayList<>(); - final ThreadPoolExecutor tpe = getContext().threadPools() - .getPoolBuilder(ThreadPoolNames.TSERVER_SHUTDOWN_UNLOAD_TABLET_POOL).numCoreThreads(8) - .numMaxThreads(16).build(); - - iface = managerConnection(getManagerAddress()); - boolean managerDown = false; - - try { - for (DataLevel level : new DataLevel[] {DataLevel.USER, DataLevel.METADATA, DataLevel.ROOT}) { - getOnlineTablets().keySet().forEach(ke -> { - if (DataLevel.of(ke.tableId()) == level) { - futures.add( - tpe.submit(new UnloadTabletHandler(this, ke, TUnloadTabletGoal.UNASSIGNED, 5000))); - } - }); - while (!futures.isEmpty()) { - Iterator<Future<?>> unloads = futures.iterator(); - while (unloads.hasNext()) { - Future<?> f = unloads.next(); - if (f.isDone()) { - if (!managerDown) { - ManagerMessage mm = managerMessages.poll(); - try { - mm.send(getContext().rpcCreds(), getClientAddressString(), iface); - } catch (TException e) { - managerDown = true; - LOG.debug("Error sending message to Manager during tablet unloading, msg: {}", - e.getMessage()); - } - } - unloads.remove(); - } - } - log.debug("Waiting on {} {} tablets to close.", futures.size(), level); - UtilWaitThread.sleep(1000); - } - log.debug("All {} tablets unloaded", level); - } - } finally { - if (!managerDown) { - try { - ManagerMessage mm = managerMessages.poll(); - do { - if (mm != null) { - mm.send(getContext().rpcCreds(), getClientAddressString(), iface); - } - mm = managerMessages.poll(); - } while (mm != null); - } catch (TException e) { - LOG.debug("Error sending message to Manager during tablet unloading, msg: {}", - e.getMessage()); - } - } - returnManagerConnection(iface); - tpe.shutdown(); - } - log.debug("Stopping Thrift Servers"); if (server != null) { server.stop(); @@ -1054,6 +1006,21 @@ public class TabletServer extends AbstractServer } } + private boolean sendManagerMessages(boolean managerDown, ManagerClientService.Client iface) { + 
ManagerMessage mm = managerMessages.poll(); + while (mm != null && !managerDown) { + try { + mm.send(getContext().rpcCreds(), getClientAddressString(), iface); + mm = managerMessages.poll(); + } catch (TException e) { + managerDown = true; + LOG.debug("Error sending message to Manager during tablet unloading, msg: {}", + e.getMessage()); + } + } + return managerDown; + } + @SuppressWarnings("deprecation") private void setupReplication(AccumuloConfiguration aconf) { // Start the thrift service listening for incoming replication requests