This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit d7348ae5219e40f7e960eb585b14960a1a242bcd
Merge: 58be770cb5 ea5afb9d85
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Feb 22 19:24:44 2024 -0500

    Merge branch 'main' into elasticity

 .../server/manager/state/MetaDataStateStore.java   |   9 +-
 .../accumulo/test/manager/SuspendedTabletsIT.java  | 160 ++++++++++++++-------
 2 files changed, 112 insertions(+), 57 deletions(-)

diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
index 66f9b87a0a,ccb5dc2747..689a667e4b
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
@@@ -64,22 -67,29 +64,21 @@@ class MetaDataStateStore extends Abstra
    }
  
    @Override
 -  public void setLocations(Collection<Assignment> assignments) throws 
DistributedStoreException {
 -    try (var tabletsMutator = ample.mutateTablets()) {
 -      for (Assignment assignment : assignments) {
 -        TabletMutator tabletMutator = 
tabletsMutator.mutateTablet(assignment.tablet);
 -        tabletMutator.putLocation(Location.current(assignment.server));
 -        ManagerMetadataUtil.updateLastForAssignmentMode(context, 
tabletMutator, assignment.server,
 -            assignment.lastLocation);
 -        tabletMutator.deleteLocation(Location.future(assignment.server));
 -        tabletMutator.deleteSuspension();
 -        tabletMutator.mutate();
 +  public void unsuspend(Collection<TabletMetadata> tablets) throws 
DistributedStoreException {
 +    try (var tabletsMutator = ample.conditionallyMutateTablets()) {
 +      for (TabletMetadata tm : tablets) {
 +        if (tm.getSuspend() != null) {
-           continue;
++          // ELASTICITY_TODO add conditional mutation check that tls.suspend 
is what currently
++          // exists in the tablet
++          
tabletsMutator.mutateTablet(tm.getExtent()).requireAbsentOperation().deleteSuspension()
++              .submit(tabletMetadata -> tabletMetadata.getSuspend() == null);
 +        }
- 
-         // ELASTICITY_TODO pending #3314, add conditional mutation check that 
tls.suspend exists
-         
tabletsMutator.mutateTablet(tm.getExtent()).requireAbsentOperation().deleteSuspension()
-             .submit(tabletMetadata -> tabletMetadata.getSuspend() == null);
        }
 -    } catch (RuntimeException ex) {
 -      throw new DistributedStoreException(ex);
 -    }
 -  }
  
 -  @Override
 -  public void setFutureLocations(Collection<Assignment> assignments)
 -      throws DistributedStoreException {
 -    try (var tabletsMutator = ample.mutateTablets()) {
 -      for (Assignment assignment : assignments) {
 -        tabletsMutator.mutateTablet(assignment.tablet).deleteSuspension()
 -            .putLocation(Location.future(assignment.server)).mutate();
 +      boolean unacceptedConditions = 
tabletsMutator.process().values().stream()
 +          .anyMatch(conditionalResult -> conditionalResult.getStatus() != 
Status.ACCEPTED);
 +      if (unacceptedConditions) {
 +        throw new DistributedStoreException("Some mutations failed to satisfy 
conditions");
        }
      } catch (RuntimeException ex) {
        throw new DistributedStoreException(ex);
diff --cc test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
index 9c8dfccc0b,8ebb75378b..03f42fa253
--- a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
@@@ -19,11 -19,16 +19,12 @@@
  package org.apache.accumulo.test.manager;
  
  import static java.util.concurrent.TimeUnit.SECONDS;
 -import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
  import static org.junit.jupiter.api.Assertions.assertEquals;
 +import static org.junit.jupiter.api.Assertions.assertNotNull;
+ import static org.junit.jupiter.api.Assertions.assertTrue;
  import static org.junit.jupiter.api.Assertions.fail;
  
 -import java.net.UnknownHostException;
  import java.time.Duration;
 -import java.util.ArrayList;
 -import java.util.Collection;
 -import java.util.Collections;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.List;
@@@ -77,18 -78,14 +78,17 @@@ import com.google.common.collect.HashMu
  import com.google.common.collect.SetMultimap;
  import com.google.common.net.HostAndPort;
  
 -public class SuspendedTabletsIT extends ConfigurableMacBase {
 +public class SuspendedTabletsIT extends AccumuloClusterHarness {
    private static final Logger log = 
LoggerFactory.getLogger(SuspendedTabletsIT.class);
    private static ExecutorService THREAD_POOL;
 +  private static final String TEST_GROUP_NAME = "SUSPEND_TEST";
  
    public static final int TSERVERS = 3;
-   public static final long SUSPEND_DURATION = 80;
    public static final int TABLETS = 30;
  
 -  private ProcessReference metadataTserverProcess;
 +  private String defaultGroup;
 +  private Set<String> testGroup = new HashSet<>();
 +  private List<ProcessReference> tabletServerProcesses;
  
    @Override
    protected Duration defaultTimeout() {
@@@ -96,47 -93,76 +96,59 @@@
    }
  
    @Override
 -  public void configure(MiniAccumuloConfigImpl cfg, Configuration fsConf) {
 +  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
fsConf) {
 +    cfg.setProperty(Property.MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT, "2");
 +    cfg.setProperty(Property.MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT, "10s");
 +    cfg.setProperty(Property.TSERV_MIGRATE_MAXCONCURRENT, "50");
-     cfg.setProperty(Property.TABLE_SUSPEND_DURATION, SUSPEND_DURATION + "s");
      cfg.setClientProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT, "5s");
      cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
 -    // Start with 1 tserver, we'll increase that later
 -    cfg.setNumTservers(1);
 -    // config custom balancer to keep all metadata on one server
 -    cfg.setProperty(HostRegexTableLoadBalancer.HOST_BALANCER_OOB_CHECK_KEY, 
"1ms");
 -    cfg.setProperty(Property.MANAGER_TABLET_BALANCER.getKey(),
 -        HostAndPortRegexTableLoadBalancer.class.getName());
 +    cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1);
    }
  
 -  @Override
    @BeforeEach
    public void setUp() throws Exception {
 -    super.setUp();
 -
 -    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProperties()).build()) {
 -      // Wait for all tablet servers to come online and then choose the first 
server in the list.
 -      // Update the balancer configuration to assign all metadata tablets to 
that server (and
 -      // everything else to other servers).
 -      InstanceOperations iops = client.instanceOperations();
 -      List<String> tservers = iops.getTabletServers();
 -      while (tservers == null || tservers.size() < 1) {
 -        Thread.sleep(1000L);
 -        tservers = client.instanceOperations().getTabletServers();
 -      }
 -      HostAndPort metadataServer = HostAndPort.fromString(tservers.get(0));
 -      log.info("Configuring balancer to assign all metadata tablets to {}", 
metadataServer);
 -      iops.setProperty(
 -          HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + 
AccumuloTable.METADATA.tableName(),
 -          metadataServer.toString());
  
 -      // Wait for the balancer to assign all metadata tablets to the chosen 
server.
 -      ClientContext ctx = (ClientContext) client;
 -      TabletLocations tl = TabletLocations.retrieve(ctx, 
AccumuloTable.METADATA.tableName(),
 -          AccumuloTable.ROOT.tableName());
 -      while (tl.hosted.keySet().size() != 1 || 
!tl.hosted.containsKey(metadataServer)) {
 -        log.info("Metadata tablets are not hosted on the correct server. 
Waiting for balancer...");
 -        Thread.sleep(1000L);
 -        tl = TabletLocations.retrieve(ctx, AccumuloTable.METADATA.tableName(),
 -            AccumuloTable.ROOT.tableName());
 +    MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) getCluster();
 +    ProcessReference defaultTabletServer =
 +        mac.getProcesses().get(ServerType.TABLET_SERVER).iterator().next();
 +    assertNotNull(defaultTabletServer);
 +
 +    
mac.getConfig().getClusterServerConfiguration().addTabletServerResourceGroup(TEST_GROUP_NAME,
 +        2);
 +    getCluster().start();
 +
 +    tabletServerProcesses = 
mac.getProcesses().get(ServerType.TABLET_SERVER).stream()
 +        .filter(p -> 
!p.equals(defaultTabletServer)).collect(Collectors.toList());
 +
 +    Map<String,String> hostAndGroup = 
TabletResourceGroupBalanceIT.getTServerGroups(mac);
 +    hostAndGroup.forEach((k, v) -> {
 +      if (v.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME)) {
 +        defaultGroup = k;
 +      } else {
 +        testGroup.add(k);
        }
 -      log.info("Metadata tablets are now hosted on {}", metadataServer);
 -    }
 +    });
  
 -    // Since we started only a single tablet server, we know it's the one 
hosting the
 -    // metadata table. Save its process reference off so we can exclude it 
later when
 -    // killing tablet servers.
 -    Collection<ProcessReference> procs = 
getCluster().getProcesses().get(ServerType.TABLET_SERVER);
 -    assertEquals(1, procs.size(), "Expected a single tserver process");
 -    metadataTserverProcess = procs.iterator().next();
 +    assertNotNull(defaultGroup);
 +    assertEquals(2, testGroup.size());
  
 -    // Update the number of tservers and start the new tservers.
 -    getCluster().getConfig().setNumTservers(TSERVERS);
 -    getCluster().start();
 +    log.info("TabletServers in default group: {}", defaultGroup);
 +    log.info("TabletServers in {} group: {}", TEST_GROUP_NAME, testGroup);
    }
  
+   enum AfterSuspendAction {
+     RESUME("80s"),
+     // Set a long suspend time for testing offline table, want the suspension 
to be cleared because
+     // the tablet went offline and not the because the suspension timed out.
+     OFFLINE("800s");
+ 
+     public final String suspendTime;
+ 
+     AfterSuspendAction(String suspendTime) {
+       this.suspendTime = suspendTime;
+     }
+   }
+ 
    @Test
    public void crashAndResumeTserver() throws Exception {
      // Run the test body. When we get to the point where we need a tserver to 
go away, get rid of it
@@@ -184,9 -196,9 +182,10 @@@
     *
     * @param serverStopper callback which shuts down some tablet servers.
     */
-   private void suspensionTestBody(TServerKiller serverStopper) throws 
Exception {
+   private void suspensionTestBody(TServerKiller serverStopper, 
AfterSuspendAction action)
+       throws Exception {
 -    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProperties()).build()) {
 +    try (AccumuloClient client =
 +        
Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
        ClientContext ctx = (ClientContext) client;
  
        String tableName = getUniqueNames(1)[0];
@@@ -196,12 -208,8 +195,13 @@@
          splitPoints.add(new Text("" + i));
        }
        log.info("Creating table " + tableName);
 -      NewTableConfiguration ntc = new 
NewTableConfiguration().withSplits(splitPoints);
 -      ntc.setProperties(Map.of(Property.TABLE_SUSPEND_DURATION.getKey(), 
action.suspendTime));
 +      Map<String,String> properties = new HashMap<>();
 +      properties.put("table.custom.assignment.group", TEST_GROUP_NAME);
++      properties.put(Property.TABLE_SUSPEND_DURATION.getKey(), 
action.suspendTime);
 +
 +      NewTableConfiguration ntc = new 
NewTableConfiguration().withSplits(splitPoints)
 +          .withInitialTabletAvailability(TabletAvailability.HOSTED);
 +      ntc.setProperties(properties);
        ctx.tableOperations().create(tableName, ntc);
  
        // Wait for all of the tablets to hosted ...
@@@ -251,27 -257,43 +251,42 @@@
          assertEquals(beforeDeathState.hosted.get(server), 
deadTabletsByServer.get(server));
        }
        assertEquals(TABLETS, ds.hostedCount + ds.suspendedCount);
-       // Restart the first tablet server, making sure it ends up on the same 
port
-       HostAndPort restartedServer = 
deadTabletsByServer.keySet().iterator().next();
-       log.info("Restarting " + restartedServer);
-       ((MiniAccumuloClusterImpl) getCluster())._exec(TabletServer.class, 
ServerType.TABLET_SERVER,
-           Map.of(Property.TSERV_CLIENTPORT.getKey(), "" + 
restartedServer.getPort(),
-               Property.TSERV_PORTSEARCH.getKey(), "false"),
-           "-o", Property.TSERV_GROUP_NAME.getKey() + "=" + TEST_GROUP_NAME);
- 
-       // Eventually, the suspended tablets should be reassigned to the newly 
alive tserver.
-       log.info("Awaiting tablet unsuspension for tablets belonging to " + 
restartedServer);
-       while (ds.suspended.containsKey(restartedServer) || ds.assignedCount != 
0) {
-         Thread.sleep(1000);
-         ds = TabletLocations.retrieve(ctx, tableName);
-       }
-       assertEquals(deadTabletsByServer.get(restartedServer), 
ds.hosted.get(restartedServer));
  
-       // Finally, after much longer, remaining suspended tablets should be 
reassigned.
-       log.info("Awaiting tablet reassignment for remaining tablets 
(suspension timeout)");
-       while (ds.hostedCount != TABLETS) {
-         Thread.sleep(1000);
-         ds = TabletLocations.retrieve(ctx, tableName);
+       assertTrue(ds.suspendedCount > 0);
+ 
+       if (action == AfterSuspendAction.OFFLINE) {
+         client.tableOperations().offline(tableName, true);
+ 
+         while (ds.suspendedCount > 0) {
+           Thread.sleep(1000);
+           ds = TabletLocations.retrieve(ctx, tableName);
+           log.info("Waiting for suspended {}", ds.suspended);
+         }
+       } else if (action == AfterSuspendAction.RESUME) {
+         // Restart the first tablet server, making sure it ends up on the 
same port
+         HostAndPort restartedServer = 
deadTabletsByServer.keySet().iterator().next();
+         log.info("Restarting " + restartedServer);
 -        getCluster().getClusterControl()
 -            .start(
 -                ServerType.TABLET_SERVER, 
Map.of(Property.TSERV_CLIENTPORT.getKey(),
 -                    "" + restartedServer.getPort(), 
Property.TSERV_PORTSEARCH.getKey(), "false"),
 -                1);
++        ((MiniAccumuloClusterImpl) getCluster())._exec(TabletServer.class, 
ServerType.TABLET_SERVER,
++            Map.of(Property.TSERV_CLIENTPORT.getKey(), "" + 
restartedServer.getPort(),
++                Property.TSERV_PORTSEARCH.getKey(), "false"),
++            "-o", Property.TSERV_GROUP_NAME.getKey() + "=" + TEST_GROUP_NAME);
+ 
+         // Eventually, the suspended tablets should be reassigned to the 
newly alive tserver.
+         log.info("Awaiting tablet unsuspension for tablets belonging to " + 
restartedServer);
+         while (ds.suspended.containsKey(restartedServer) || ds.assignedCount 
!= 0) {
+           Thread.sleep(1000);
+           ds = TabletLocations.retrieve(ctx, tableName);
+         }
+         assertEquals(deadTabletsByServer.get(restartedServer), 
ds.hosted.get(restartedServer));
+ 
+         // Finally, after much longer, remaining suspended tablets should be 
reassigned.
 -        log.info("Awaiting tablet reassignment for remaining tablets");
++        log.info("Awaiting tablet reassignment for remaining tablets 
(suspension timeout)");
+         while (ds.hostedCount != TABLETS) {
+           Thread.sleep(1000);
+           ds = TabletLocations.retrieve(ctx, tableName);
+         }
+       } else {
+         throw new IllegalStateException("Unknown action " + action);
        }
      }
    }
@@@ -281,6 -303,93 +296,47 @@@
          throws Exception;
    }
  
+   private class ShutdownTserverKiller implements TServerKiller {
+ 
+     @Override
+     public void eliminateTabletServers(ClientContext ctx, TabletLocations 
locs, int count)
+         throws Exception {
 -      Set<TServerInstance> tserverSet = new HashSet<>();
 -      Set<TServerInstance> metadataServerSet = new HashSet<>();
 -
 -      TabletLocator tl = TabletLocator.getLocator(ctx, 
AccumuloTable.METADATA.tableId());
 -      for (TabletLocationState tls : locs.locationStates.values()) {
 -        if (tls.current != null) {
 -          // add to set of all servers
 -          tserverSet.add(tls.current.getServerInstance());
 -
 -          // get server that the current tablets metadata is on
 -          TabletLocator.TabletLocation tab =
 -              tl.locateTablet(ctx, tls.extent.toMetaRow(), false, false);
 -          // add it to the set of servers with metadata
 -          metadataServerSet.add(new TServerInstance(tab.getTserverLocation(),
 -              Long.valueOf(tab.getTserverSession(), 16)));
++      testGroup.forEach(ts -> {
++        try {
++          ThriftClientTypes.MANAGER.executeVoid(ctx, client -> {
++            log.info("Sending shutdown command to {} via 
ManagerClientService", ts);
++            client.shutdownTabletServer(null, ctx.rpcCreds(), ts, false);
++          });
++        } catch (AccumuloSecurityException | AccumuloException e) {
++          throw new RuntimeException("Error calling shutdownTabletServer for 
" + ts, e);
+         }
 -      }
 -
 -      // remove servers with metadata on them from the list of servers to be 
shutdown
 -      assertEquals(1, metadataServerSet.size(), "Expecting a single tServer 
in metadataServerSet");
 -      tserverSet.removeAll(metadataServerSet);
 -
 -      assertEquals(TSERVERS - 1, tserverSet.size(),
 -          "Expecting " + (TSERVERS - 1) + " tServers in shutdown-list");
++      });
+ 
 -      List<TServerInstance> tserversList = new ArrayList<>(tserverSet);
 -      Collections.shuffle(tserversList, RANDOM.get());
 -
 -      for (int i1 = 0; i1 < count; ++i1) {
 -        final String tserverName = tserversList.get(i1).getHostPortSession();
 -        ThriftClientTypes.MANAGER.executeVoid(ctx, client -> {
 -          log.info("Sending shutdown command to {} via ManagerClientService", 
tserverName);
 -          client.shutdownTabletServer(null, ctx.rpcCreds(), tserverName, 
false);
 -        });
++      try (AccumuloClient client =
++          
Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
++        Wait.waitFor(() -> 
client.instanceOperations().getTabletServers().size() == 1);
+       }
+ 
 -      log.info("Waiting for tserver process{} to die", count == 1 ? "" : 
"es");
 -      for (int i2 = 0; i2 < 10; ++i2) {
 -        List<ProcessReference> deadProcs = new ArrayList<>();
 -        for (ProcessReference pr1 : 
getCluster().getProcesses().get(ServerType.TABLET_SERVER)) {
 -          Process p = pr1.getProcess();
 -          if (!p.isAlive()) {
 -            deadProcs.add(pr1);
 -          }
 -        }
 -        for (ProcessReference pr2 : deadProcs) {
 -          log.info("Process {} is dead, informing cluster control about 
this", pr2.getProcess());
 -          
getCluster().getClusterControl().killProcess(ServerType.TABLET_SERVER, pr2);
 -          --count;
 -        }
 -        if (count == 0) {
 -          return;
 -        } else {
 -          Thread.sleep(SECONDS.toMillis(2));
 -        }
 -      }
 -      throw new IllegalStateException("Tablet servers didn't die!");
+     }
+   }
+ 
+   private class CrashTserverKiller implements TServerKiller {
+ 
+     @Override
+     public void eliminateTabletServers(ClientContext ctx, TabletLocations 
locs, int count)
+         throws Exception {
 -      // Exclude the tablet server hosting the metadata table from the list 
and only
 -      // kill tablet servers that are not hosting the metadata table.
 -      List<ProcessReference> procs = 
getCluster().getProcesses().get(ServerType.TABLET_SERVER)
 -          .stream().filter(p -> 
!metadataTserverProcess.equals(p)).collect(Collectors.toList());
 -      Collections.shuffle(procs, RANDOM.get());
 -      assertEquals(TSERVERS - 1, procs.size(), "Not enough tservers exist");
 -      assertTrue(procs.size() >= count, "Attempting to kill more tservers (" 
+ count
 -          + ") than exist in the cluster (" + procs.size() + ")");
 -
 -      for (int i = 0; i < count; ++i) {
 -        ProcessReference pr = procs.get(i);
 -        log.info("Crashing {}", pr.getProcess());
 -        getCluster().killProcess(ServerType.TABLET_SERVER, pr);
 -      }
++      tabletServerProcesses.forEach(proc -> {
++        try {
++          log.info("Killing processes: {}", proc);
++          ((MiniAccumuloClusterImpl) getCluster()).getClusterControl()
++              .killProcess(ServerType.TABLET_SERVER, proc);
++        } catch (ProcessNotFoundException | InterruptedException e) {
++          throw new RuntimeException("Error killing process: " + proc, e);
++        }
++      });
+     }
+   }
+ 
    private static final AtomicInteger threadCounter = new AtomicInteger(0);
  
    @BeforeAll

Reply via email to