This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 5cb7f7b87b Call requireSame(SUSPEND) in MetaDataStateStore to resolve todo (#4381)
5cb7f7b87b is described below

commit 5cb7f7b87b6f6b7b69dbdd6e170e537274d1d717
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Tue Mar 19 10:25:04 2024 -0400

    Call requireSame(SUSPEND) in MetaDataStateStore to resolve todo (#4381)
---
 .../MiniAccumuloClusterControl.java                | 20 +++++
 .../server/manager/state/MetaDataStateStore.java   |  7 +-
 .../metadata/ConditionalTabletMutatorImpl.java     | 11 +++
 .../test/functional/AmpleConditionalWriterIT.java  | 85 ++++++++++++++++++++++
 4 files changed, 120 insertions(+), 3 deletions(-)

diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
index 9bf50cff2e..1e10be5faa 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
@@ -274,6 +274,26 @@ public class MiniAccumuloClusterControl implements ClusterControl {
     }
   }
 
+  /**
+   * Stops every tablet server process registered under the given resource group and then drops
+   * the group from {@code tabletServerProcesses}. Does nothing when no entry exists for the group.
+   *
+   * @param tserverResourceGroup name of the resource group whose tablet servers should be stopped
+   * @throws RuntimeException wrapping the {@code ExecutionException} or {@code TimeoutException}
+   *         raised while stopping a process; in that case the group entry is left in place
+   */
+  public void stopTabletServerGroup(String tserverResourceGroup) {
+    synchronized (tabletServerProcesses) {
+      var groupProcesses = tabletServerProcesses.get(tserverResourceGroup);
+      if (groupProcesses == null) {
+        return;
+      }
+      for (var tserverProcess : groupProcesses) {
+        try {
+          cluster.stopProcessWithTimeout(tserverProcess, 30, TimeUnit.SECONDS);
+        } catch (InterruptedException interrupted) {
+          // Restore the interrupt flag and carry on with the remaining processes.
+          Thread.currentThread().interrupt();
+        } catch (ExecutionException | TimeoutException failure) {
+          log.warn("TabletServer did not fully stop after 30 seconds", failure);
+          throw new RuntimeException(failure);
+        }
+      }
+      tabletServerProcesses.remove(tserverResourceGroup);
+    }
+  }
+
   @Override
   public synchronized void stop(ServerType server, String hostname) throws 
IOException {
     switch (server) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
index 689a667e4b..f69f280ed4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
@@ -18,6 +18,8 @@
  */
 package org.apache.accumulo.server.manager.state;
 
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND;
+
 import java.util.Collection;
 import java.util.List;
 
@@ -68,9 +70,8 @@ class MetaDataStateStore extends AbstractTabletStateStore implements TabletState
     try (var tabletsMutator = ample.conditionallyMutateTablets()) {
       for (TabletMetadata tm : tablets) {
         if (tm.getSuspend() != null) {
-          // ELASTICITY_TODO add conditional mutation check that tls.suspend 
is what currently
-          // exists in the tablet
-          
tabletsMutator.mutateTablet(tm.getExtent()).requireAbsentOperation().deleteSuspension()
+          tabletsMutator.mutateTablet(tm.getExtent()).requireAbsentOperation()
+              .requireSame(tm, SUSPEND).deleteSuspension()
               .submit(tabletMetadata -> tabletMetadata.getSuspend() == null);
         }
       }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
index 6003fd73e6..6995941089 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
@@ -24,6 +24,7 @@ import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN;
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.SELECTED_COLUMN;
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
+import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN;
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.AVAILABILITY_COLUMN;
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.encodePrevEndRow;
@@ -239,6 +240,16 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit
         mutation.addCondition(c);
       }
         break;
+      case SUSPEND: {
+        Condition c =
+            new Condition(SUSPEND_COLUMN.getColumnFamily(), 
SUSPEND_COLUMN.getColumnQualifier());
+        if (tabletMetadata.getSuspend() != null) {
+          c.setValue(tabletMetadata.getSuspend().server + "|"
+              + tabletMetadata.getSuspend().suspensionTime);
+        }
+        mutation.addCondition(c);
+      }
+        break;
       default:
         throw new UnsupportedOperationException("Column type " + type + " is 
not supported.");
     }
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index 48255dc82a..c6445d62a6 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -30,12 +30,14 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED;
 import static org.apache.accumulo.core.util.LazySingletons.GSON;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -61,7 +63,9 @@ import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletAvailability;
 import org.apache.accumulo.core.client.admin.TimeType;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
@@ -75,6 +79,7 @@ import org.apache.accumulo.core.iterators.user.TabletMetadataFilter;
 import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.SuspendingTServer;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
@@ -89,17 +94,22 @@ import org.apache.accumulo.core.metadata.schema.TabletOperationId;
 import org.apache.accumulo.core.metadata.schema.TabletOperationType;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.spi.balancer.TableLoadBalancer;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.metadata.AsyncConditionalTabletsMutatorImpl;
 import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl;
+import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
 public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
@@ -1335,4 +1345,79 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
     }
   }
 
+  /**
+   * Verifies that {@code requireSame(tm, SUSPEND)} compares the suspend column of the supplied
+   * tablet metadata against what is currently stored for the tablet.
+   *
+   * <p>
+   * The test adds a dedicated tablet server resource group with one server, creates a table
+   * assigned to that group, and captures the tablet metadata while no suspend column is set. It
+   * then stops the group and waits for the suspend column to appear, at which point a conditional
+   * mutation based on the captured metadata must be rejected. After the tablet servers are started
+   * again and the suspend column is gone, the same conditional mutation must be accepted.
+   */
+  @Test
+  public void testSuspendMarker() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      final String SUSPEND_RG = "SUSPEND";
+
+      // Add a resource group with a single tablet server and start it.
+      MiniAccumuloClusterImpl cluster = (MiniAccumuloClusterImpl) getCluster();
+      cluster.getConfig().getClusterServerConfiguration().addTabletServerResourceGroup(SUSPEND_RG,
+          1);
+      cluster.getClusterControl().start(ServerType.TABLET_SERVER);
+      List<Process> processes = cluster.getClusterControl().getTabletServers(SUSPEND_RG);
+      assertNotNull(processes);
+      assertEquals(1, processes.size());
+
+      // Create a hosted table that is assigned to the new resource group.
+      String tableName = getUniqueNames(2)[1];
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      ntc.withInitialTabletAvailability(TabletAvailability.HOSTED);
+      ntc.setProperties(Map.of(Property.TABLE_SUSPEND_DURATION.getKey(), "30s",
+          TableLoadBalancer.TABLE_ASSIGNMENT_GROUP_PROPERTY, SUSPEND_RG));
+      c.tableOperations().create(tableName, ntc);
+
+      c.instanceOperations().waitForBalance();
+
+      TableId suspendTableTid = TableId.of(c.tableOperations().tableIdMap().get(tableName));
+
+      // Capture the metadata of the table's single tablet before any suspension happens.
+      TabletMetadata originalTM = null;
+      try (TabletsMetadata tms = TabletsMetadata.builder(c).forTable(suspendTableTid).build()) {
+        assertEquals(1, Iterables.size(tms));
+        originalTM = tms.iterator().next();
+        assertNull(originalTM.getSuspend());
+      }
+
+      cluster.getClusterControl().stopTabletServerGroup(SUSPEND_RG);
+
+      Wait.waitFor(() -> getSuspendedColumn(c, suspendTableTid) != null, 60_000);
+
+      try (var tabletsMutator = getServerContext().getAmple().conditionallyMutateTablets()) {
+        tabletsMutator.mutateTablet(originalTM.getExtent()).requireAbsentOperation()
+            .requireSame(originalTM, SUSPEND).putTabletAvailability(TabletAvailability.ONDEMAND)
+            .submit(tabletMetadata -> false);
+
+        // This should fail because the original tablet metadata does not have a suspend column
+        // and the current tablet metadata does.
+        assertEquals(Status.REJECTED,
+            tabletsMutator.process().get(originalTM.getExtent()).getStatus());
+
+      }
+
+      cluster.getClusterControl().start(ServerType.TABLET_SERVER);
+
+      Wait.waitFor(() -> getSuspendedColumn(c, suspendTableTid) == null, 60_000);
+
+      try (var tabletsMutator = getServerContext().getAmple().conditionallyMutateTablets()) {
+        tabletsMutator.mutateTablet(originalTM.getExtent()).requireAbsentOperation()
+            .requireSame(originalTM, SUSPEND).putTabletAvailability(TabletAvailability.ONDEMAND)
+            .submit(tabletMetadata -> false);
+
+        // This should succeed because neither the original tablet metadata nor the current tablet
+        // metadata has a suspend column: the tablet server for the SUSPEND resource group was
+        // restarted.
+        assertEquals(Status.ACCEPTED,
+            tabletsMutator.process().get(originalTM.getExtent()).getStatus());
+
+      }
+    }
+  }
+
+  /**
+   * Looks up the suspend column of the single tablet belonging to the given table.
+   *
+   * @param c client used to read the tablet metadata
+   * @param tid id of the table to inspect; it is asserted to have exactly one tablet
+   * @return the result of {@code getSuspend()} for that tablet, which callers compare against
+   *         {@code null}
+   */
+  private static SuspendingTServer getSuspendedColumn(AccumuloClient c, TableId tid) {
+    try (var tablets = TabletsMetadata.builder(c).forTable(tid).build()) {
+      assertEquals(1, Iterables.size(tablets));
+      return tablets.iterator().next().getSuspend();
+    }
+  }
+
 }

Reply via email to