This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit d7348ae5219e40f7e960eb585b14960a1a242bcd
Merge: 58be770cb5 ea5afb9d85
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Feb 22 19:24:44 2024 -0500

    Merge branch 'main' into elasticity

 .../server/manager/state/MetaDataStateStore.java  |   9 +-
 .../accumulo/test/manager/SuspendedTabletsIT.java | 160 ++++++++++++++-------
 2 files changed, 112 insertions(+), 57 deletions(-)

diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
index 66f9b87a0a,ccb5dc2747..689a667e4b
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
@@@ -64,22 -67,29 +64,21 @@@ class MetaDataStateStore extends Abstra
    }
  
    @Override
-   public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException {
-     try (var tabletsMutator = ample.mutateTablets()) {
-       for (Assignment assignment : assignments) {
-         TabletMutator tabletMutator = tabletsMutator.mutateTablet(assignment.tablet);
-         tabletMutator.putLocation(Location.current(assignment.server));
-         ManagerMetadataUtil.updateLastForAssignmentMode(context, tabletMutator, assignment.server,
-             assignment.lastLocation);
-         tabletMutator.deleteLocation(Location.future(assignment.server));
-         tabletMutator.deleteSuspension();
-         tabletMutator.mutate();
+   public void unsuspend(Collection<TabletMetadata> tablets) throws DistributedStoreException {
+     try (var tabletsMutator = ample.conditionallyMutateTablets()) {
+       for (TabletMetadata tm : tablets) {
+         if (tm.getSuspend() != null) {
-           continue;
++          // ELASTICITY_TODO add conditional mutation check that tls.suspend is what currently
++          // exists in the tablet
++          tabletsMutator.mutateTablet(tm.getExtent()).requireAbsentOperation().deleteSuspension()
++              .submit(tabletMetadata -> tabletMetadata.getSuspend() == null);
+         }
-
-         // ELASTICITY_TODO pending #3314, add conditional mutation check that tls.suspend exists
-         tabletsMutator.mutateTablet(tm.getExtent()).requireAbsentOperation().deleteSuspension()
-             .submit(tabletMetadata -> tabletMetadata.getSuspend() == null);
        }
-     } catch (RuntimeException ex) {
-       throw new DistributedStoreException(ex);
-     }
-   }
-
-   @Override
-   public void setFutureLocations(Collection<Assignment> assignments)
-       throws DistributedStoreException {
-     try (var tabletsMutator = ample.mutateTablets()) {
-       for (Assignment assignment : assignments) {
-         tabletsMutator.mutateTablet(assignment.tablet).deleteSuspension()
-             .putLocation(Location.future(assignment.server)).mutate();
+       boolean unacceptedConditions = tabletsMutator.process().values().stream()
+           .anyMatch(conditionalResult -> conditionalResult.getStatus() != Status.ACCEPTED);
+       if (unacceptedConditions) {
+         throw new DistributedStoreException("Some mutations failed to satisfy conditions");
        }
      } catch (RuntimeException ex) {
        throw new DistributedStoreException(ex);
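For readability, the merged unsuspend() that results from the '+' and '++' lines above is roughly the following (assembled from this diff; whitespace and blank lines are approximate):

    @Override
    public void unsuspend(Collection<TabletMetadata> tablets) throws DistributedStoreException {
      try (var tabletsMutator = ample.conditionallyMutateTablets()) {
        for (TabletMetadata tm : tablets) {
          if (tm.getSuspend() != null) {
            // ELASTICITY_TODO add conditional mutation check that tls.suspend is what currently
            // exists in the tablet
            tabletsMutator.mutateTablet(tm.getExtent()).requireAbsentOperation().deleteSuspension()
                .submit(tabletMetadata -> tabletMetadata.getSuspend() == null);
          }
        }

        boolean unacceptedConditions = tabletsMutator.process().values().stream()
            .anyMatch(conditionalResult -> conditionalResult.getStatus() != Status.ACCEPTED);
        if (unacceptedConditions) {
          throw new DistributedStoreException("Some mutations failed to satisfy conditions");
        }
      } catch (RuntimeException ex) {
        throw new DistributedStoreException(ex);
      }
    }

The lambda passed to submit(...) appears to let a rejected mutation still count as successful when the suspension column is already gone, and the process() results are then checked so any other non-ACCEPTED status surfaces as a DistributedStoreException.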
diff --cc test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
index 9c8dfccc0b,8ebb75378b..03f42fa253
--- a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
@@@ -19,11 -19,16 +19,12 @@@ package org.apache.accumulo.test.manager;
  import static java.util.concurrent.TimeUnit.SECONDS;
- import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
  import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertNotNull;
+ import static org.junit.jupiter.api.Assertions.assertTrue;
  import static org.junit.jupiter.api.Assertions.fail;
  
- import java.net.UnknownHostException;
  import java.time.Duration;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.Collections;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.List;
@@@ -77,18 -78,14 +78,17 @@@ import com.google.common.collect.HashMu
  import com.google.common.collect.SetMultimap;
  import com.google.common.net.HostAndPort;
  
- public class SuspendedTabletsIT extends ConfigurableMacBase {
+ public class SuspendedTabletsIT extends AccumuloClusterHarness {
    private static final Logger log = LoggerFactory.getLogger(SuspendedTabletsIT.class);
    private static ExecutorService THREAD_POOL;
  
+   private static final String TEST_GROUP_NAME = "SUSPEND_TEST";
    public static final int TSERVERS = 3;
-   public static final long SUSPEND_DURATION = 80;
    public static final int TABLETS = 30;
  
-   private ProcessReference metadataTserverProcess;
+   private String defaultGroup;
+   private Set<String> testGroup = new HashSet<>();
+   private List<ProcessReference> tabletServerProcesses;
  
    @Override
    protected Duration defaultTimeout() {
@@@ -96,47 -93,76 +96,59 @@@
    }
  
    @Override
-   public void configure(MiniAccumuloConfigImpl cfg, Configuration fsConf) {
+   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration fsConf) {
+     cfg.setProperty(Property.MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT, "2");
+     cfg.setProperty(Property.MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT, "10s");
+     cfg.setProperty(Property.TSERV_MIGRATE_MAXCONCURRENT, "50");
-     cfg.setProperty(Property.TABLE_SUSPEND_DURATION, SUSPEND_DURATION + "s");
      cfg.setClientProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT, "5s");
      cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
-     // Start with 1 tserver, we'll increase that later
-     cfg.setNumTservers(1);
-     // config custom balancer to keep all metadata on one server
-     cfg.setProperty(HostRegexTableLoadBalancer.HOST_BALANCER_OOB_CHECK_KEY, "1ms");
-     cfg.setProperty(Property.MANAGER_TABLET_BALANCER.getKey(),
-         HostAndPortRegexTableLoadBalancer.class.getName());
+     cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1);
    }
  
-   @Override
    @BeforeEach
    public void setUp() throws Exception {
-     super.setUp();
-
-     try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
-       // Wait for all tablet servers to come online and then choose the first server in the list.
-       // Update the balancer configuration to assign all metadata tablets to that server (and
-       // everything else to other servers).
-       InstanceOperations iops = client.instanceOperations();
-       List<String> tservers = iops.getTabletServers();
-       while (tservers == null || tservers.size() < 1) {
-         Thread.sleep(1000L);
-         tservers = client.instanceOperations().getTabletServers();
-       }
-       HostAndPort metadataServer = HostAndPort.fromString(tservers.get(0));
-       log.info("Configuring balancer to assign all metadata tablets to {}", metadataServer);
-       iops.setProperty(
-           HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + AccumuloTable.METADATA.tableName(),
-           metadataServer.toString());
-       // Wait for the balancer to assign all metadata tablets to the chosen server.
-       ClientContext ctx = (ClientContext) client;
-       TabletLocations tl = TabletLocations.retrieve(ctx, AccumuloTable.METADATA.tableName(),
-           AccumuloTable.ROOT.tableName());
-       while (tl.hosted.keySet().size() != 1 || !tl.hosted.containsKey(metadataServer)) {
-         log.info("Metadata tablets are not hosted on the correct server. Waiting for balancer...");
-         Thread.sleep(1000L);
-         tl = TabletLocations.retrieve(ctx, AccumuloTable.METADATA.tableName(),
-             AccumuloTable.ROOT.tableName());
+     MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) getCluster();
+     ProcessReference defaultTabletServer =
+         mac.getProcesses().get(ServerType.TABLET_SERVER).iterator().next();
+     assertNotNull(defaultTabletServer);
+
+     mac.getConfig().getClusterServerConfiguration().addTabletServerResourceGroup(TEST_GROUP_NAME,
+         2);
+     getCluster().start();
+
+     tabletServerProcesses = mac.getProcesses().get(ServerType.TABLET_SERVER).stream()
+         .filter(p -> !p.equals(defaultTabletServer)).collect(Collectors.toList());
+
+     Map<String,String> hostAndGroup = TabletResourceGroupBalanceIT.getTServerGroups(mac);
+     hostAndGroup.forEach((k, v) -> {
+       if (v.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME)) {
+         defaultGroup = k;
+       } else {
+         testGroup.add(k);
        }
-       log.info("Metadata tablets are now hosted on {}", metadataServer);
-     }
+     });
-     // Since we started only a single tablet server, we know it's the one hosting the
-     // metadata table. Save its process reference off so we can exclude it later when
-     // killing tablet servers.
-     Collection<ProcessReference> procs = getCluster().getProcesses().get(ServerType.TABLET_SERVER);
-     assertEquals(1, procs.size(), "Expected a single tserver process");
-     metadataTserverProcess = procs.iterator().next();
+     assertNotNull(defaultGroup);
+     assertEquals(2, testGroup.size());
-     // Update the number of tservers and start the new tservers.
-     getCluster().getConfig().setNumTservers(TSERVERS);
-     getCluster().start();
+     log.info("TabletServers in default group: {}", defaultGroup);
+     log.info("TabletServers in {} group: {}", TEST_GROUP_NAME, testGroup);
    }
+
+   enum AfterSuspendAction {
+     RESUME("80s"),
+     // Set a long suspend time for testing offline table, want the suspension to be cleared because
+     // the tablet went offline and not the because the suspension timed out.
+     OFFLINE("800s");
+
+     public final String suspendTime;
+
+     AfterSuspendAction(String suspendTime) {
+       this.suspendTime = suspendTime;
+     }
+   }
+
    @Test
    public void crashAndResumeTserver() throws Exception {
      // Run the test body. When we get to the point where we need a tserver to go away, get rid of it
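Since the combined diff interleaves removals from both parents with the result, the merged setUp() from the hunk above is easier to read assembled from its '+' lines (a sketch; the comments are added here and whitespace is approximate):

    @BeforeEach
    public void setUp() throws Exception {
      MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) getCluster();
      ProcessReference defaultTabletServer =
          mac.getProcesses().get(ServerType.TABLET_SERVER).iterator().next();
      assertNotNull(defaultTabletServer);

      // add a dedicated two-server resource group for the test, then start the cluster
      mac.getConfig().getClusterServerConfiguration().addTabletServerResourceGroup(TEST_GROUP_NAME,
          2);
      getCluster().start();

      // every tserver except the original default one is a candidate to be stopped later
      tabletServerProcesses = mac.getProcesses().get(ServerType.TABLET_SERVER).stream()
          .filter(p -> !p.equals(defaultTabletServer)).collect(Collectors.toList());

      // record which live tservers landed in the default group and which in SUSPEND_TEST
      Map<String,String> hostAndGroup = TabletResourceGroupBalanceIT.getTServerGroups(mac);
      hostAndGroup.forEach((k, v) -> {
        if (v.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME)) {
          defaultGroup = k;
        } else {
          testGroup.add(k);
        }
      });

      assertNotNull(defaultGroup);
      assertEquals(2, testGroup.size());

      log.info("TabletServers in default group: {}", defaultGroup);
      log.info("TabletServers in {} group: {}", TEST_GROUP_NAME, testGroup);
    }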
@@@ -184,9 -196,9 +182,10 @@@
     *
     * @param serverStopper callback which shuts down some tablet servers.
     */
-   private void suspensionTestBody(TServerKiller serverStopper) throws Exception {
+   private void suspensionTestBody(TServerKiller serverStopper, AfterSuspendAction action)
+       throws Exception {
-     try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
+     try (AccumuloClient client =
+         Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
        ClientContext ctx = (ClientContext) client;
        String tableName = getUniqueNames(1)[0];
@@@ -196,12 -208,8 +195,13 @@@
          splitPoints.add(new Text("" + i));
        }
        log.info("Creating table " + tableName);
-       NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splitPoints);
-       ntc.setProperties(Map.of(Property.TABLE_SUSPEND_DURATION.getKey(), action.suspendTime));
+       Map<String,String> properties = new HashMap<>();
+       properties.put("table.custom.assignment.group", TEST_GROUP_NAME);
++      properties.put(Property.TABLE_SUSPEND_DURATION.getKey(), action.suspendTime);
+
+       NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splitPoints)
+           .withInitialTabletAvailability(TabletAvailability.HOSTED);
+       ntc.setProperties(properties);
        ctx.tableOperations().create(tableName, ntc);
        // Wait for all of the tablets to hosted ...
@@@ -251,27 -257,43 +251,42 @@@
        assertEquals(beforeDeathState.hosted.get(server), deadTabletsByServer.get(server));
      }
      assertEquals(TABLETS, ds.hostedCount + ds.suspendedCount);
-     // Restart the first tablet server, making sure it ends up on the same port
-     HostAndPort restartedServer = deadTabletsByServer.keySet().iterator().next();
-     log.info("Restarting " + restartedServer);
-     ((MiniAccumuloClusterImpl) getCluster())._exec(TabletServer.class, ServerType.TABLET_SERVER,
-         Map.of(Property.TSERV_CLIENTPORT.getKey(), "" + restartedServer.getPort(),
-             Property.TSERV_PORTSEARCH.getKey(), "false"),
-         "-o", Property.TSERV_GROUP_NAME.getKey() + "=" + TEST_GROUP_NAME);
-
-     // Eventually, the suspended tablets should be reassigned to the newly alive tserver.
-     log.info("Awaiting tablet unsuspension for tablets belonging to " + restartedServer);
-     while (ds.suspended.containsKey(restartedServer) || ds.assignedCount != 0) {
-       Thread.sleep(1000);
-       ds = TabletLocations.retrieve(ctx, tableName);
-     }
-     assertEquals(deadTabletsByServer.get(restartedServer), ds.hosted.get(restartedServer));
-     // Finally, after much longer, remaining suspended tablets should be reassigned.
- log.info("Awaiting tablet reassignment for remaining tablets (suspension timeout)"); - while (ds.hostedCount != TABLETS) { - Thread.sleep(1000); - ds = TabletLocations.retrieve(ctx, tableName); + assertTrue(ds.suspendedCount > 0); + + if (action == AfterSuspendAction.OFFLINE) { + client.tableOperations().offline(tableName, true); + + while (ds.suspendedCount > 0) { + Thread.sleep(1000); + ds = TabletLocations.retrieve(ctx, tableName); + log.info("Waiting for suspended {}", ds.suspended); + } + } else if (action == AfterSuspendAction.RESUME) { + // Restart the first tablet server, making sure it ends up on the same port + HostAndPort restartedServer = deadTabletsByServer.keySet().iterator().next(); + log.info("Restarting " + restartedServer); - getCluster().getClusterControl() - .start( - ServerType.TABLET_SERVER, Map.of(Property.TSERV_CLIENTPORT.getKey(), - "" + restartedServer.getPort(), Property.TSERV_PORTSEARCH.getKey(), "false"), - 1); ++ ((MiniAccumuloClusterImpl) getCluster())._exec(TabletServer.class, ServerType.TABLET_SERVER, ++ Map.of(Property.TSERV_CLIENTPORT.getKey(), "" + restartedServer.getPort(), ++ Property.TSERV_PORTSEARCH.getKey(), "false"), ++ "-o", Property.TSERV_GROUP_NAME.getKey() + "=" + TEST_GROUP_NAME); + + // Eventually, the suspended tablets should be reassigned to the newly alive tserver. + log.info("Awaiting tablet unsuspension for tablets belonging to " + restartedServer); + while (ds.suspended.containsKey(restartedServer) || ds.assignedCount != 0) { + Thread.sleep(1000); + ds = TabletLocations.retrieve(ctx, tableName); + } + assertEquals(deadTabletsByServer.get(restartedServer), ds.hosted.get(restartedServer)); + + // Finally, after much longer, remaining suspended tablets should be reassigned. - log.info("Awaiting tablet reassignment for remaining tablets"); ++ log.info("Awaiting tablet reassignment for remaining tablets (suspension timeout)"); + while (ds.hostedCount != TABLETS) { + Thread.sleep(1000); + ds = TabletLocations.retrieve(ctx, tableName); + } + } else { + throw new IllegalStateException("Unknown action " + action); } } } @@@ -281,6 -303,93 +296,47 @@@ throws Exception; } + private class ShutdownTserverKiller implements TServerKiller { + + @Override + public void eliminateTabletServers(ClientContext ctx, TabletLocations locs, int count) + throws Exception { - Set<TServerInstance> tserverSet = new HashSet<>(); - Set<TServerInstance> metadataServerSet = new HashSet<>(); - - TabletLocator tl = TabletLocator.getLocator(ctx, AccumuloTable.METADATA.tableId()); - for (TabletLocationState tls : locs.locationStates.values()) { - if (tls.current != null) { - // add to set of all servers - tserverSet.add(tls.current.getServerInstance()); - - // get server that the current tablets metadata is on - TabletLocator.TabletLocation tab = - tl.locateTablet(ctx, tls.extent.toMetaRow(), false, false); - // add it to the set of servers with metadata - metadataServerSet.add(new TServerInstance(tab.getTserverLocation(), - Long.valueOf(tab.getTserverSession(), 16))); ++ testGroup.forEach(ts -> { ++ try { ++ ThriftClientTypes.MANAGER.executeVoid(ctx, client -> { ++ log.info("Sending shutdown command to {} via ManagerClientService", ts); ++ client.shutdownTabletServer(null, ctx.rpcCreds(), ts, false); ++ }); ++ } catch (AccumuloSecurityException | AccumuloException e) { ++ throw new RuntimeException("Error calling shutdownTabletServer for " + ts, e); + } - } - - // remove servers with metadata on them from the list of servers to be shutdown - assertEquals(1, 
-       assertEquals(1, metadataServerSet.size(),
-           "Expecting a single tServer in metadataServerSet");
-       tserverSet.removeAll(metadataServerSet);
-
-       assertEquals(TSERVERS - 1, tserverSet.size(),
-           "Expecting " + (TSERVERS - 1) + " tServers in shutdown-list");
++      });
+
-       List<TServerInstance> tserversList = new ArrayList<>(tserverSet);
-       Collections.shuffle(tserversList, RANDOM.get());
-
-       for (int i1 = 0; i1 < count; ++i1) {
-         final String tserverName = tserversList.get(i1).getHostPortSession();
-         ThriftClientTypes.MANAGER.executeVoid(ctx, client -> {
-           log.info("Sending shutdown command to {} via ManagerClientService", tserverName);
-           client.shutdownTabletServer(null, ctx.rpcCreds(), tserverName, false);
-         });
++      try (AccumuloClient client =
++          Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
++        Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 1);
+       }
+
-       log.info("Waiting for tserver process{} to die", count == 1 ? "" : "es");
-       for (int i2 = 0; i2 < 10; ++i2) {
-         List<ProcessReference> deadProcs = new ArrayList<>();
-         for (ProcessReference pr1 : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) {
-           Process p = pr1.getProcess();
-           if (!p.isAlive()) {
-             deadProcs.add(pr1);
-           }
-         }
-         for (ProcessReference pr2 : deadProcs) {
-           log.info("Process {} is dead, informing cluster control about this", pr2.getProcess());
-           getCluster().getClusterControl().killProcess(ServerType.TABLET_SERVER, pr2);
-           --count;
-         }
-         if (count == 0) {
-           return;
-         } else {
-           Thread.sleep(SECONDS.toMillis(2));
-         }
-       }
-       throw new IllegalStateException("Tablet servers didn't die!");
+     }
+   }
+
+   private class CrashTserverKiller implements TServerKiller {
+
+     @Override
+     public void eliminateTabletServers(ClientContext ctx, TabletLocations locs, int count)
+         throws Exception {
-       // Exclude the tablet server hosting the metadata table from the list and only
-       // kill tablet servers that are not hosting the metadata table.
-       List<ProcessReference> procs = getCluster().getProcesses().get(ServerType.TABLET_SERVER)
-           .stream().filter(p -> !metadataTserverProcess.equals(p)).collect(Collectors.toList());
-       Collections.shuffle(procs, RANDOM.get());
-       assertEquals(TSERVERS - 1, procs.size(), "Not enough tservers exist");
-       assertTrue(procs.size() >= count, "Attempting to kill more tservers (" + count
-           + ") than exist in the cluster (" + procs.size() + ")");
-
-       for (int i = 0; i < count; ++i) {
-         ProcessReference pr = procs.get(i);
-         log.info("Crashing {}", pr.getProcess());
-         getCluster().killProcess(ServerType.TABLET_SERVER, pr);
-       }
++      tabletServerProcesses.forEach(proc -> {
++        try {
++          log.info("Killing processes: {}", proc);
++          ((MiniAccumuloClusterImpl) getCluster()).getClusterControl()
++              .killProcess(ServerType.TABLET_SERVER, proc);
++        } catch (ProcessNotFoundException | InterruptedException e) {
++          throw new RuntimeException("Error killing process: " + proc, e);
++        }
++      });
+     }
+   }
+
    private static final AtomicInteger threadCounter = new AtomicInteger(0);
  
    @BeforeAll
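Likewise, assembled from the '++' lines in the last hunk, the bodies of the two merged TServerKiller implementations come out roughly as follows (sketch; comments added here, formatting approximate). As far as the added lines show, both act on the whole test group or the saved non-default tserver processes rather than on the count argument:

    // ShutdownTserverKiller: ask the manager to gracefully shut down each tserver in the test group
    testGroup.forEach(ts -> {
      try {
        ThriftClientTypes.MANAGER.executeVoid(ctx, client -> {
          log.info("Sending shutdown command to {} via ManagerClientService", ts);
          client.shutdownTabletServer(null, ctx.rpcCreds(), ts, false);
        });
      } catch (AccumuloSecurityException | AccumuloException e) {
        throw new RuntimeException("Error calling shutdownTabletServer for " + ts, e);
      }
    });

    // then wait until only the default tserver remains registered
    try (AccumuloClient client =
        Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
      Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 1);
    }

    // CrashTserverKiller: kill the non-default tserver processes outright
    tabletServerProcesses.forEach(proc -> {
      try {
        log.info("Killing processes: {}", proc);
        ((MiniAccumuloClusterImpl) getCluster()).getClusterControl()
            .killProcess(ServerType.TABLET_SERVER, proc);
      } catch (ProcessNotFoundException | InterruptedException e) {
        throw new RuntimeException("Error killing process: " + proc, e);
      }
    });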