This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new de237858e2 Fix SuspendedTabletsIT (#4056) de237858e2 is described below commit de237858e279d751ea6d0805af9de34b8e14e7b5 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Dec 12 10:29:03 2023 -0500 Fix SuspendedTabletsIT (#4056) --- .../functional/TabletResourceGroupBalanceIT.java | 12 +- .../accumulo/test/manager/SuspendedTabletsIT.java | 260 +++++++-------------- 2 files changed, 87 insertions(+), 185 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java index 8bbdafa9d5..c27b212557 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java @@ -60,6 +60,7 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; @@ -99,11 +100,12 @@ public class TabletResourceGroupBalanceIT extends SharedMiniClusterBase { SharedMiniClusterBase.stopMiniCluster(); } - private Map<String,String> getTServerGroups() throws Exception { + public static Map<String,String> getTServerGroups(MiniAccumuloClusterImpl cluster) + throws Exception { Map<String,String> tservers = new HashMap<>(); - ZooCache zk = getCluster().getServerContext().getZooCache(); - String zpath = getCluster().getServerContext().getZooKeeperRoot() + Constants.ZTSERVERS; + ZooCache zk = cluster.getServerContext().getZooCache(); + String 
zpath = cluster.getServerContext().getZooKeeperRoot() + Constants.ZTSERVERS; List<String> children = zk.getChildren(zpath); for (String child : children) { @@ -147,7 +149,7 @@ public class TabletResourceGroupBalanceIT extends SharedMiniClusterBase { client.tableOperations().create(names[1], ntc2); client.instanceOperations().waitForBalance(); - Map<String,String> tserverGroups = getTServerGroups(); + Map<String,String> tserverGroups = getTServerGroups(getCluster()); assertEquals(2, tserverGroups.size()); Ample ample = ((ClientContext) client).getAmple(); @@ -290,7 +292,7 @@ public class TabletResourceGroupBalanceIT extends SharedMiniClusterBase { assertEquals(numExpectedSplits, getCountOfHostedTablets(client, tableName)); - Map<String,String> tserverGroups = getTServerGroups(); + Map<String,String> tserverGroups = getTServerGroups(getCluster()); LOG.info("Tablet Server groups: {}", tserverGroups); assertEquals(2, tserverGroups.size()); diff --git a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java index e8c38a5bc4..35397558da 100644 --- a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java @@ -19,16 +19,11 @@ package org.apache.accumulo.test.manager; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.accumulo.core.util.LazySingletons.RANDOM; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.fail; -import java.net.UnknownHostException; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -44,34 +39,36 @@ import 
java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.admin.InstanceOperations; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletHostingGoal; import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.clientImpl.ClientTabletCache; import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; -import org.apache.accumulo.core.spi.balancer.HostRegexTableLoadBalancer; -import org.apache.accumulo.core.spi.balancer.data.TabletServerId; +import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.miniclusterImpl.ProcessNotFoundException; import org.apache.accumulo.miniclusterImpl.ProcessReference; -import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.accumulo.test.functional.TabletResourceGroupBalanceIT; +import org.apache.accumulo.test.util.Wait; 
+import org.apache.accumulo.tserver.TabletServer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,16 +77,18 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.SetMultimap; import com.google.common.net.HostAndPort; -@Disabled // ELASTICITY_TODO -public class SuspendedTabletsIT extends ConfigurableMacBase { +public class SuspendedTabletsIT extends AccumuloClusterHarness { private static final Logger log = LoggerFactory.getLogger(SuspendedTabletsIT.class); private static ExecutorService THREAD_POOL; + private static final String TEST_GROUP_NAME = "SUSPEND_TEST"; public static final int TSERVERS = 3; public static final long SUSPEND_DURATION = 80; public static final int TABLETS = 30; - private ProcessReference metadataTserverProcess; + private String defaultGroup; + private Set<String> testGroup = new HashSet<>(); + private List<ProcessReference> tabletServerProcesses; @Override protected Duration defaultTimeout() { @@ -97,62 +96,45 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { } @Override - public void configure(MiniAccumuloConfigImpl cfg, Configuration fsConf) { + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration fsConf) { + cfg.setProperty(Property.MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT, "2"); + cfg.setProperty(Property.MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT, "10s"); + cfg.setProperty(Property.TSERV_MIGRATE_MAXCONCURRENT, "50"); cfg.setProperty(Property.TABLE_SUSPEND_DURATION, SUSPEND_DURATION + "s"); cfg.setClientProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT, "5s"); cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); - // Start with 1 tserver, we'll increase that later 
cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1); - // config custom balancer to keep all metadata on one server - @SuppressWarnings("deprecation") - String p = HostRegexTableLoadBalancer.HOST_BALANCER_OOB_CHECK_KEY; - cfg.setProperty(p, "1ms"); - cfg.setProperty(Property.MANAGER_TABLET_BALANCER.getKey(), - HostAndPortRegexTableLoadBalancer.class.getName()); } - @Override @BeforeEach public void setUp() throws Exception { - super.setUp(); - - try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { - // Wait for all tablet servers to come online and then choose the first server in the list. - // Update the balancer configuration to assign all metadata tablets to that server (and - // everything else to other servers). - InstanceOperations iops = client.instanceOperations(); - List<String> tservers = iops.getTabletServers(); - while (tservers == null || tservers.size() < 1) { - Thread.sleep(1000L); - tservers = client.instanceOperations().getTabletServers(); - } - HostAndPort metadataServer = HostAndPort.fromString(tservers.get(0)); - log.info("Configuring balancer to assign all metadata tablets to {}", metadataServer); - @SuppressWarnings("deprecation") - String p = HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX; - iops.setProperty(p + MetadataTable.NAME, metadataServer.toString()); - // Wait for the balancer to assign all metadata tablets to the chosen server. - ClientContext ctx = (ClientContext) client; - TabletLocations tl = TabletLocations.retrieve(ctx, MetadataTable.NAME); - while (tl.hosted.keySet().size() != 1 || !tl.hosted.containsKey(metadataServer)) { - log.info("Metadata tablets are not hosted on the correct server. 
Waiting for balancer..."); - Thread.sleep(1000L); - tl = TabletLocations.retrieve(ctx, MetadataTable.NAME); + MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) getCluster(); + ProcessReference defaultTabletServer = + mac.getProcesses().get(ServerType.TABLET_SERVER).iterator().next(); + assertNotNull(defaultTabletServer); + + mac.getConfig().getClusterServerConfiguration().addTabletServerResourceGroup(TEST_GROUP_NAME, + 2); + getCluster().start(); + + tabletServerProcesses = mac.getProcesses().get(ServerType.TABLET_SERVER).stream() + .filter(p -> !p.equals(defaultTabletServer)).collect(Collectors.toList()); + + Map<String,String> hostAndGroup = TabletResourceGroupBalanceIT.getTServerGroups(mac); + hostAndGroup.forEach((k, v) -> { + if (v.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME)) { + defaultGroup = k; + } else { + testGroup.add(k); } - log.info("Metadata tablets are now hosted on {}", metadataServer); - } + }); - // Since we started only a single tablet server, we know it's the one hosting the - // metadata table. Save its process reference off so we can exclude it later when - // killing tablet servers. - Collection<ProcessReference> procs = getCluster().getProcesses().get(ServerType.TABLET_SERVER); - assertEquals(1, procs.size(), "Expected a single tserver process"); - metadataTserverProcess = procs.iterator().next(); + assertNotNull(defaultGroup); + assertEquals(2, testGroup.size()); - // Update the number of tservers and start the new tservers. - getCluster().getConfig().getClusterServerConfiguration().setNumDefaultTabletServers(TSERVERS); - getCluster().start(); + log.info("TabletServers in default group: {}", defaultGroup); + log.info("TabletServers in {} group: {}", TEST_GROUP_NAME, testGroup); } @Test @@ -160,20 +142,15 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { // Run the test body. 
When we get to the point where we need a tserver to go away, get rid of it // via crashing suspensionTestBody((ctx, locs, count) -> { - // Exclude the tablet server hosting the metadata table from the list and only - // kill tablet servers that are not hosting the metadata table. - List<ProcessReference> procs = getCluster().getProcesses().get(ServerType.TABLET_SERVER) - .stream().filter(p -> !metadataTserverProcess.equals(p)).collect(Collectors.toList()); - Collections.shuffle(procs, RANDOM.get()); - assertEquals(TSERVERS - 1, procs.size(), "Not enough tservers exist"); - assertTrue(procs.size() >= count, "Attempting to kill more tservers (" + count - + ") than exist in the cluster (" + procs.size() + ")"); - - for (int i = 0; i < count; ++i) { - ProcessReference pr = procs.get(i); - log.info("Crashing {}", pr.getProcess()); - getCluster().killProcess(ServerType.TABLET_SERVER, pr); - } + tabletServerProcesses.forEach(proc -> { + try { + log.info("Killing processes: {}", proc); + ((MiniAccumuloClusterImpl) getCluster()).getClusterControl() + .killProcess(ServerType.TABLET_SERVER, proc); + } catch (ProcessNotFoundException | InterruptedException e) { + throw new RuntimeException("Error killing process: " + proc, e); + } + }); }); } @@ -182,63 +159,23 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { // Run the test body. When we get to the point where we need tservers to go away, stop them via // a clean shutdown. 
suspensionTestBody((ctx, locs, count) -> { - Set<TServerInstance> tserverSet = new HashSet<>(); - Set<TServerInstance> metadataServerSet = new HashSet<>(); - - ClientTabletCache tl = ClientTabletCache.getInstance(ctx, MetadataTable.ID); - for (TabletMetadata tm : locs.locationStates.values()) { - if (tm.hasCurrent()) { - // add to set of all servers - tserverSet.add(tm.getLocation().getServerInstance()); - - // get server that the current tablets metadata is on - ClientTabletCache.CachedTablet tab = tl.findTablet(ctx, tm.getExtent().toMetaRow(), false, - ClientTabletCache.LocationNeed.REQUIRED); - // add it to the set of servers with metadata - metadataServerSet.add(new TServerInstance(tab.getTserverLocation().orElseThrow(), - Long.valueOf(tab.getTserverSession().orElseThrow(), 16))); - } - } - // remove servers with metadata on them from the list of servers to be shutdown - assertEquals(1, metadataServerSet.size(), "Expecting a single tServer in metadataServerSet"); - tserverSet.removeAll(metadataServerSet); - - assertEquals(TSERVERS - 1, tserverSet.size(), - "Expecting " + (TSERVERS - 1) + " tServers in shutdown-list"); - - List<TServerInstance> tserversList = new ArrayList<>(tserverSet); - Collections.shuffle(tserversList, RANDOM.get()); + testGroup.forEach(ts -> { + try { + ThriftClientTypes.MANAGER.executeVoid(ctx, client -> { + log.info("Sending shutdown command to {} via ManagerClientService", ts); + client.shutdownTabletServer(null, ctx.rpcCreds(), ts, false); + }); + } catch (AccumuloSecurityException | AccumuloException e) { + throw new RuntimeException("Error calling shutdownTabletServer for " + ts, e); + } + }); - for (int i1 = 0; i1 < count; ++i1) { - final String tserverName = tserversList.get(i1).getHostPortSession(); - ThriftClientTypes.MANAGER.executeVoid(ctx, client -> { - log.info("Sending shutdown command to {} via ManagerClientService", tserverName); - client.shutdownTabletServer(null, ctx.rpcCreds(), tserverName, false); - }); + try 
(AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 1); } - log.info("Waiting for tserver process{} to die", count == 1 ? "" : "es"); - for (int i2 = 0; i2 < 10; ++i2) { - List<ProcessReference> deadProcs = new ArrayList<>(); - for (ProcessReference pr1 : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) { - Process p = pr1.getProcess(); - if (!p.isAlive()) { - deadProcs.add(pr1); - } - } - for (ProcessReference pr2 : deadProcs) { - log.info("Process {} is dead, informing cluster control about this", pr2.getProcess()); - getCluster().getClusterControl().killProcess(ServerType.TABLET_SERVER, pr2); - --count; - } - if (count == 0) { - return; - } else { - Thread.sleep(SECONDS.toMillis(2)); - } - } - throw new IllegalStateException("Tablet servers didn't die!"); }); } @@ -248,7 +185,8 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { * @param serverStopper callback which shuts down some tablet servers. 
*/ private void suspensionTestBody(TServerKiller serverStopper) throws Exception { - try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + try (AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { ClientContext ctx = (ClientContext) client; String tableName = getUniqueNames(1)[0]; @@ -258,7 +196,12 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { splitPoints.add(new Text("" + i)); } log.info("Creating table " + tableName); - NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splitPoints); + Map<String,String> properties = new HashMap<>(); + properties.put("table.custom.assignment.group", TEST_GROUP_NAME); + + NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splitPoints) + .withInitialHostingGoal(TabletHostingGoal.ALWAYS); + ntc.setProperties(properties); ctx.tableOperations().create(tableName, ntc); // Wait for all of the tablets to hosted ... @@ -268,9 +211,11 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { ds = TabletLocations.retrieve(ctx, tableName)) { Thread.sleep(1000); } + log.info("Tablets hosted"); // ... and balanced. ctx.instanceOperations().waitForBalance(); + log.info("Tablets balanced."); do { // Keep checking until all tablets are hosted and spread out across the tablet servers Thread.sleep(1000); @@ -281,10 +226,10 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { // and some are hosted on each of the tablet servers other than the one reserved for hosting // the metadata table. assertEquals(TSERVERS - 1, ds.hosted.keySet().size()); + log.info("Tablet balance verified."); // Kill two tablet servers hosting our tablets. This should put tablets into suspended state, // and thus halt balancing. 
- TabletLocations beforeDeathState = ds; log.info("Eliminating tablet servers"); serverStopper.eliminateTabletServers(ctx, beforeDeathState, TSERVERS - 1); @@ -309,10 +254,10 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { // Restart the first tablet server, making sure it ends up on the same port HostAndPort restartedServer = deadTabletsByServer.keySet().iterator().next(); log.info("Restarting " + restartedServer); - getCluster().getClusterControl().start(ServerType.TABLET_SERVER, + ((MiniAccumuloClusterImpl) getCluster())._exec(TabletServer.class, ServerType.TABLET_SERVER, Map.of(Property.TSERV_CLIENTPORT.getKey(), "" + restartedServer.getPort(), Property.TSERV_PORTSEARCH.getKey(), "false"), - 1); + "-o", Property.TSERV_GROUP_NAME.getKey() + "=" + TEST_GROUP_NAME); // Eventually, the suspended tablets should be reassigned to the newly alive tserver. log.info("Awaiting tablet unsuspension for tablets belonging to " + restartedServer); @@ -323,7 +268,7 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { assertEquals(deadTabletsByServer.get(restartedServer), ds.hosted.get(restartedServer)); // Finally, after much longer, remaining suspended tablets should be reassigned. - log.info("Awaiting tablet reassignment for remaining tablets"); + log.info("Awaiting tablet reassignment for remaining tablets (suspension timeout)"); while (ds.hostedCount != TABLETS) { Thread.sleep(1000); ds = TabletLocations.retrieve(ctx, tableName); @@ -349,49 +294,6 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { THREAD_POOL.shutdownNow(); } - /** - * A version of {@link HostRegexTableLoadBalancer} that includes the tablet server port in - * addition to the host name when checking regular expressions. This is useful for testing when - * multiple tablet servers are running on the same host and one wishes to make pools from the - * tablet servers on that host. 
- */ - @SuppressWarnings("deprecation") - public static class HostAndPortRegexTableLoadBalancer extends HostRegexTableLoadBalancer { - private static final Logger LOG = - LoggerFactory.getLogger(HostAndPortRegexTableLoadBalancer.class.getName()); - - @Override - protected List<String> getPoolNamesForHost(TabletServerId tabletServerId) { - final String host = tabletServerId.getHost(); - String test = host; - if (!isIpBasedRegex()) { - try { - test = getNameFromIp(host); - } catch (UnknownHostException e1) { - LOG.error("Unable to determine host name for IP: " + host + ", setting to default pool", - e1); - return Collections.singletonList(DEFAULT_POOL); - } - } - - // Add the port on the end - final String hostString = test + ":" + tabletServerId.getPort(); - List<String> pools = getPoolNameToRegexPattern().entrySet().stream() - .filter(e -> e.getValue().matcher(hostString).matches()).map(Map.Entry::getKey) - .collect(Collectors.toList()); - if (pools.isEmpty()) { - pools.add(DEFAULT_POOL); - } - return pools; - } - - @Override - public long balance(BalanceParameters params) { - super.balance(params); - return 1000L; // Balance once per second during the test - } - } - private static class TabletLocations { public final Map<KeyExtent,TabletMetadata> locationStates = new HashMap<>(); public final SetMultimap<HostAndPort,KeyExtent> hosted = HashMultimap.create(); @@ -415,7 +317,7 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { THREAD_POOL.execute(tlsFuture); return tlsFuture.get(5, SECONDS); } catch (TimeoutException ex) { - log.debug("Retrieval timed out", ex); + log.debug("Retrieval timed out."); } catch (Exception ex) { log.warn("Failed to scan metadata", ex); } @@ -451,8 +353,6 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { } else if (tm.getLocation() != null && tm.getLocation().getType().equals(LocationType.FUTURE)) { ++assignedCount; - } else { - // unassigned case } } }