This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new bdec3a7c3a fixes clearing suspension for offline tables (#4295) bdec3a7c3a is described below commit bdec3a7c3a5d373cf95fd7123363dd35679a0ab8 Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Feb 22 18:35:10 2024 -0500 fixes clearing suspension for offline tables (#4295) There was code in the manager that seemed to have the intent of clearing suspension markers in tablets for an offline table. This code was not working or tested. This commit fixes this code and adds a test that validates that suspension markers are removed when a table is taken offline. fixes #3314 --- .../server/manager/state/MetaDataStateStore.java | 3 +- .../manager/state/TabletStateChangeIterator.java | 3 + .../accumulo/test/manager/SuspendedTabletsIT.java | 251 +++++++++++++-------- 3 files changed, 159 insertions(+), 98 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java index 8c7bc888eb..9af443895a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java @@ -150,9 +150,8 @@ class MetaDataStateStore implements TabletStateStore { try (var tabletsMutator = ample.mutateTablets()) { for (TabletLocationState tls : tablets) { if (tls.suspend != null) { - continue; + tabletsMutator.mutateTablet(tls.extent).deleteSuspension().mutate(); } - tabletsMutator.mutateTablet(tls.extent).deleteSuspension().mutate(); } } catch (RuntimeException ex) { throw new DistributedStoreException(ex); diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateChangeIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateChangeIterator.java index c4c9e611de..42fd9cb05c 100644 
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateChangeIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateChangeIterator.java @@ -209,6 +209,9 @@ public class TabletStateChangeIterator extends SkippingIterator { case ASSIGNED_TO_DEAD_SERVER: return; case SUSPENDED: + // Always return data about suspended tablets. Need to clear the suspension stats when the + // tablet is offline. May need to assign the tablet when the tablet is online. + return; case UNASSIGNED: if (shouldBeOnline) { return; diff --git a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java index 46e81fb055..764bc1dfc9 100644 --- a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java @@ -83,7 +83,6 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { private static ExecutorService THREAD_POOL; public static final int TSERVERS = 3; - public static final long SUSPEND_DURATION = 80; public static final int TABLETS = 30; private ProcessReference metadataTserverProcess; @@ -95,7 +94,6 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { @Override public void configure(MiniAccumuloConfigImpl cfg, Configuration fsConf) { - cfg.setProperty(Property.TABLE_SUSPEND_DURATION, SUSPEND_DURATION + "s"); cfg.setClientProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT, "5s"); cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); // Start with 1 tserver, we'll increase that later @@ -149,91 +147,45 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { getCluster().start(); } + enum AfterSuspendAction { + RESUME("80s"), + // Set a long suspend time for testing offline table, want the suspension to be cleared because + // the tablet went offline and not the because the suspension timed out. 
+ OFFLINE("800s"); + + public final String suspendTime; + + AfterSuspendAction(String suspendTime) { + this.suspendTime = suspendTime; + } + } + @Test public void crashAndResumeTserver() throws Exception { // Run the test body. When we get to the point where we need a tserver to go away, get rid of it // via crashing - suspensionTestBody((ctx, locs, count) -> { - // Exclude the tablet server hosting the metadata table from the list and only - // kill tablet servers that are not hosting the metadata table. - List<ProcessReference> procs = getCluster().getProcesses().get(ServerType.TABLET_SERVER) - .stream().filter(p -> !metadataTserverProcess.equals(p)).collect(Collectors.toList()); - Collections.shuffle(procs, random); - assertEquals(TSERVERS - 1, procs.size(), "Not enough tservers exist"); - assertTrue(procs.size() >= count, "Attempting to kill more tservers (" + count - + ") than exist in the cluster (" + procs.size() + ")"); + suspensionTestBody(new CrashTserverKiller(), AfterSuspendAction.RESUME); + } - for (int i = 0; i < count; ++i) { - ProcessReference pr = procs.get(i); - log.info("Crashing {}", pr.getProcess()); - getCluster().killProcess(ServerType.TABLET_SERVER, pr); - } - }); + @Test + public void crashAndOffline() throws Exception { + // Test to ensure that taking a table offline causes the suspension markers to be cleared. + // Suspension markers can prevent balancing and possibly cause other problems, so its good to + // clear them for offline tables. + suspensionTestBody(new CrashTserverKiller(), AfterSuspendAction.OFFLINE); } @Test public void shutdownAndResumeTserver() throws Exception { // Run the test body. When we get to the point where we need tservers to go away, stop them via // a clean shutdown. 
- suspensionTestBody((ctx, locs, count) -> { - Set<TServerInstance> tserverSet = new HashSet<>(); - Set<TServerInstance> metadataServerSet = new HashSet<>(); - - TabletLocator tl = TabletLocator.getLocator(ctx, MetadataTable.ID); - for (TabletLocationState tls : locs.locationStates.values()) { - if (tls.current != null) { - // add to set of all servers - tserverSet.add(tls.current.getServerInstance()); - - // get server that the current tablets metadata is on - TabletLocator.TabletLocation tab = - tl.locateTablet(ctx, tls.extent.toMetaRow(), false, false); - // add it to the set of servers with metadata - metadataServerSet - .add(new TServerInstance(tab.tablet_location, Long.valueOf(tab.tablet_session, 16))); - } - } - - // remove servers with metadata on them from the list of servers to be shutdown - assertEquals(1, metadataServerSet.size(), "Expecting a single tServer in metadataServerSet"); - tserverSet.removeAll(metadataServerSet); - - assertEquals(TSERVERS - 1, tserverSet.size(), - "Expecting " + (TSERVERS - 1) + " tServers in shutdown-list"); - - List<TServerInstance> tserversList = new ArrayList<>(tserverSet); - Collections.shuffle(tserversList, random); - - for (int i1 = 0; i1 < count; ++i1) { - final String tserverName = tserversList.get(i1).getHostPortSession(); - ThriftClientTypes.MANAGER.executeVoid(ctx, client -> { - log.info("Sending shutdown command to {} via ManagerClientService", tserverName); - client.shutdownTabletServer(null, ctx.rpcCreds(), tserverName, false); - }); - } + suspensionTestBody(new ShutdownTserverKiller(), AfterSuspendAction.RESUME); + } - log.info("Waiting for tserver process{} to die", count == 1 ? 
"" : "es"); - for (int i2 = 0; i2 < 10; ++i2) { - List<ProcessReference> deadProcs = new ArrayList<>(); - for (ProcessReference pr1 : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) { - Process p = pr1.getProcess(); - if (!p.isAlive()) { - deadProcs.add(pr1); - } - } - for (ProcessReference pr2 : deadProcs) { - log.info("Process {} is dead, informing cluster control about this", pr2.getProcess()); - getCluster().getClusterControl().killProcess(ServerType.TABLET_SERVER, pr2); - --count; - } - if (count == 0) { - return; - } else { - Thread.sleep(SECONDS.toMillis(2)); - } - } - throw new IllegalStateException("Tablet servers didn't die!"); - }); + @Test + public void shutdownAndOffline() throws Exception { + // Test to ensure that taking a table offline causes the suspension markers to be cleared. + suspensionTestBody(new ShutdownTserverKiller(), AfterSuspendAction.OFFLINE); } /** @@ -241,7 +193,8 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { * * @param serverStopper callback which shuts down some tablet servers. */ - private void suspensionTestBody(TServerKiller serverStopper) throws Exception { + private void suspensionTestBody(TServerKiller serverStopper, AfterSuspendAction action) + throws Exception { try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { ClientContext ctx = (ClientContext) client; @@ -253,6 +206,7 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { } log.info("Creating table " + tableName); NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splitPoints); + ntc.setProperties(Map.of(Property.TABLE_SUSPEND_DURATION.getKey(), action.suspendTime)); ctx.tableOperations().create(tableName, ntc); // Wait for all of the tablets to hosted ... 
@@ -300,27 +254,43 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { assertEquals(beforeDeathState.hosted.get(server), deadTabletsByServer.get(server)); } assertEquals(TABLETS, ds.hostedCount + ds.suspendedCount); - // Restart the first tablet server, making sure it ends up on the same port - HostAndPort restartedServer = deadTabletsByServer.keySet().iterator().next(); - log.info("Restarting " + restartedServer); - getCluster().getClusterControl().start(ServerType.TABLET_SERVER, - Map.of(Property.TSERV_CLIENTPORT.getKey(), "" + restartedServer.getPort(), - Property.TSERV_PORTSEARCH.getKey(), "false"), - 1); - - // Eventually, the suspended tablets should be reassigned to the newly alive tserver. - log.info("Awaiting tablet unsuspension for tablets belonging to " + restartedServer); - while (ds.suspended.containsKey(restartedServer) || ds.assignedCount != 0) { - Thread.sleep(1000); - ds = TabletLocations.retrieve(ctx, tableName); - } - assertEquals(deadTabletsByServer.get(restartedServer), ds.hosted.get(restartedServer)); - // Finally, after much longer, remaining suspended tablets should be reassigned. 
- log.info("Awaiting tablet reassignment for remaining tablets"); - while (ds.hostedCount != TABLETS) { - Thread.sleep(1000); - ds = TabletLocations.retrieve(ctx, tableName); + assertTrue(ds.suspendedCount > 0); + + if (action == AfterSuspendAction.OFFLINE) { + client.tableOperations().offline(tableName, true); + + while (ds.suspendedCount > 0) { + Thread.sleep(1000); + ds = TabletLocations.retrieve(ctx, tableName); + log.info("Waiting for suspended {}", ds.suspended); + } + } else if (action == AfterSuspendAction.RESUME) { + // Restart the first tablet server, making sure it ends up on the same port + HostAndPort restartedServer = deadTabletsByServer.keySet().iterator().next(); + log.info("Restarting " + restartedServer); + getCluster().getClusterControl() + .start( + ServerType.TABLET_SERVER, Map.of(Property.TSERV_CLIENTPORT.getKey(), + "" + restartedServer.getPort(), Property.TSERV_PORTSEARCH.getKey(), "false"), + 1); + + // Eventually, the suspended tablets should be reassigned to the newly alive tserver. + log.info("Awaiting tablet unsuspension for tablets belonging to " + restartedServer); + while (ds.suspended.containsKey(restartedServer) || ds.assignedCount != 0) { + Thread.sleep(1000); + ds = TabletLocations.retrieve(ctx, tableName); + } + assertEquals(deadTabletsByServer.get(restartedServer), ds.hosted.get(restartedServer)); + + // Finally, after much longer, remaining suspended tablets should be reassigned. 
+ log.info("Awaiting tablet reassignment for remaining tablets"); + while (ds.hostedCount != TABLETS) { + Thread.sleep(1000); + ds = TabletLocations.retrieve(ctx, tableName); + } + } else { + throw new IllegalStateException("Unknown action " + action); } } } @@ -330,6 +300,95 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { throws Exception; } + private class ShutdownTserverKiller implements TServerKiller { + + @Override + public void eliminateTabletServers(ClientContext ctx, TabletLocations locs, int count) + throws Exception { + + Set<TServerInstance> tserverSet = new HashSet<>(); + Set<TServerInstance> metadataServerSet = new HashSet<>(); + + TabletLocator tl = TabletLocator.getLocator(ctx, MetadataTable.ID); + for (TabletLocationState tls : locs.locationStates.values()) { + if (tls.current != null) { + // add to set of all servers + tserverSet.add(tls.current.getServerInstance()); + + // get server that the current tablets metadata is on + TabletLocator.TabletLocation tab = + tl.locateTablet(ctx, tls.extent.toMetaRow(), false, false); + // add it to the set of servers with metadata + metadataServerSet + .add(new TServerInstance(tab.tablet_location, Long.valueOf(tab.tablet_session, 16))); + } + } + + // remove servers with metadata on them from the list of servers to be shutdown + assertEquals(1, metadataServerSet.size(), "Expecting a single tServer in metadataServerSet"); + tserverSet.removeAll(metadataServerSet); + + assertEquals(TSERVERS - 1, tserverSet.size(), + "Expecting " + (TSERVERS - 1) + " tServers in shutdown-list"); + + List<TServerInstance> tserversList = new ArrayList<>(tserverSet); + Collections.shuffle(tserversList, random); + + for (int i1 = 0; i1 < count; ++i1) { + final String tserverName = tserversList.get(i1).getHostPortSession(); + ThriftClientTypes.MANAGER.executeVoid(ctx, client -> { + log.info("Sending shutdown command to {} via ManagerClientService", tserverName); + client.shutdownTabletServer(null, ctx.rpcCreds(), 
tserverName, false); + }); + } + + log.info("Waiting for tserver process{} to die", count == 1 ? "" : "es"); + for (int i2 = 0; i2 < 10; ++i2) { + List<ProcessReference> deadProcs = new ArrayList<>(); + for (ProcessReference pr1 : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) { + Process p = pr1.getProcess(); + if (!p.isAlive()) { + deadProcs.add(pr1); + } + } + for (ProcessReference pr2 : deadProcs) { + log.info("Process {} is dead, informing cluster control about this", pr2.getProcess()); + getCluster().getClusterControl().killProcess(ServerType.TABLET_SERVER, pr2); + --count; + } + if (count == 0) { + return; + } else { + Thread.sleep(SECONDS.toMillis(2)); + } + } + throw new IllegalStateException("Tablet servers didn't die!"); + + } + } + + private class CrashTserverKiller implements TServerKiller { + + @Override + public void eliminateTabletServers(ClientContext ctx, TabletLocations locs, int count) + throws Exception { + // Exclude the tablet server hosting the metadata table from the list and only + // kill tablet servers that are not hosting the metadata table. + List<ProcessReference> procs = getCluster().getProcesses().get(ServerType.TABLET_SERVER) + .stream().filter(p -> !metadataTserverProcess.equals(p)).collect(Collectors.toList()); + Collections.shuffle(procs, random); + assertEquals(TSERVERS - 1, procs.size(), "Not enough tservers exist"); + assertTrue(procs.size() >= count, "Attempting to kill more tservers (" + count + + ") than exist in the cluster (" + procs.size() + ")"); + + for (int i = 0; i < count; ++i) { + ProcessReference pr = procs.get(i); + log.info("Crashing {}", pr.getProcess()); + getCluster().killProcess(ServerType.TABLET_SERVER, pr); + } + } + } + private static final AtomicInteger threadCounter = new AtomicInteger(0); @BeforeAll