This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 5cb7f7b87b Call requireSame(SUSPEND) in MetaDataStateStore to resolve todo (#4381) 5cb7f7b87b is described below commit 5cb7f7b87b6f6b7b69dbdd6e170e537274d1d717 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Mar 19 10:25:04 2024 -0400 Call requireSame(SUSPEND) in MetaDataStateStore to resolve todo (#4381) --- .../MiniAccumuloClusterControl.java | 20 +++++ .../server/manager/state/MetaDataStateStore.java | 7 +- .../metadata/ConditionalTabletMutatorImpl.java | 11 +++ .../test/functional/AmpleConditionalWriterIT.java | 85 ++++++++++++++++++++++ 4 files changed, 120 insertions(+), 3 deletions(-) diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java index 9bf50cff2e..1e10be5faa 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java @@ -274,6 +274,26 @@ public class MiniAccumuloClusterControl implements ClusterControl { } } + public void stopTabletServerGroup(String tserverResourceGroup) { + synchronized (tabletServerProcesses) { + var group = tabletServerProcesses.get(tserverResourceGroup); + if (group == null) { + return; + } + group.forEach(process -> { + try { + cluster.stopProcessWithTimeout(process, 30, TimeUnit.SECONDS); + } catch (ExecutionException | TimeoutException e) { + log.warn("TabletServer did not fully stop after 30 seconds", e); + throw new RuntimeException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + tabletServerProcesses.remove(tserverResourceGroup); + } + } + @Override public synchronized void stop(ServerType server, String hostname) throws IOException { switch (server) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java index 689a667e4b..f69f280ed4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.server.manager.state; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND; + import java.util.Collection; import java.util.List; @@ -68,9 +70,8 @@ class MetaDataStateStore extends AbstractTabletStateStore implements TabletState try (var tabletsMutator = ample.conditionallyMutateTablets()) { for (TabletMetadata tm : tablets) { if (tm.getSuspend() != null) { - // ELASTICITY_TODO add conditional mutation check that tls.suspend is what currently - // exists in the tablet - tabletsMutator.mutateTablet(tm.getExtent()).requireAbsentOperation().deleteSuspension() + tabletsMutator.mutateTablet(tm.getExtent()).requireAbsentOperation() + .requireSame(tm, SUSPEND).deleteSuspension() .submit(tabletMetadata -> tabletMetadata.getSuspend() == null); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index 6003fd73e6..6995941089 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -24,6 +24,7 @@ import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.SELECTED_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.AVAILABILITY_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.encodePrevEndRow; @@ -239,6 +240,16 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit mutation.addCondition(c); } break; + case SUSPEND: { + Condition c = + new Condition(SUSPEND_COLUMN.getColumnFamily(), SUSPEND_COLUMN.getColumnQualifier()); + if (tabletMetadata.getSuspend() != null) { + c.setValue(tabletMetadata.getSuspend().server + "|" + + tabletMetadata.getSuspend().suspensionTime); + } + mutation.addCondition(c); + } + break; default: throw new UnsupportedOperationException("Column type " + type + " is not supported."); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index 48255dc82a..c6445d62a6 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -30,12 +30,14 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED; import static org.apache.accumulo.core.util.LazySingletons.GSON; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -61,7 +63,9 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.client.admin.TimeType; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; @@ -75,6 +79,7 @@ import org.apache.accumulo.core.iterators.user.TabletMetadataFilter; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.SuspendingTServer; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; @@ -89,17 +94,22 @@ import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.spi.balancer.TableLoadBalancer; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.metadata.AsyncConditionalTabletsMutatorImpl; import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl; +import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import com.google.common.collect.Iterables; import com.google.common.collect.Sets; public class AmpleConditionalWriterIT extends AccumuloClusterHarness { @@ -1335,4 +1345,79 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { } } + @Test + public void testSuspendMarker() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + + final String SUSPEND_RG = "SUSPEND"; + + MiniAccumuloClusterImpl cluster = (MiniAccumuloClusterImpl) getCluster(); + cluster.getConfig().getClusterServerConfiguration().addTabletServerResourceGroup(SUSPEND_RG, + 1); + cluster.getClusterControl().start(ServerType.TABLET_SERVER); + List<Process> processes = cluster.getClusterControl().getTabletServers(SUSPEND_RG); + assertNotNull(processes); + assertEquals(1, processes.size()); + + String tableName = getUniqueNames(2)[1]; + NewTableConfiguration ntc = new NewTableConfiguration(); + ntc.withInitialTabletAvailability(TabletAvailability.HOSTED); + ntc.setProperties(Map.of(Property.TABLE_SUSPEND_DURATION.getKey(), "30s", + TableLoadBalancer.TABLE_ASSIGNMENT_GROUP_PROPERTY, SUSPEND_RG)); + c.tableOperations().create(tableName, ntc); + + c.instanceOperations().waitForBalance(); + + TableId suspendTableTid = TableId.of(c.tableOperations().tableIdMap().get(tableName)); + + TabletMetadata originalTM = null; + try (TabletsMetadata tms = TabletsMetadata.builder(c).forTable(suspendTableTid).build()) { + assertEquals(1, Iterables.size(tms)); + originalTM = tms.iterator().next(); + assertNull(originalTM.getSuspend()); + } + + cluster.getClusterControl().stopTabletServerGroup(SUSPEND_RG); + + Wait.waitFor(() -> getSuspendedColumn(c, suspendTableTid) != null, 60_000); + + try (var tabletsMutator = getServerContext().getAmple().conditionallyMutateTablets()) { + tabletsMutator.mutateTablet(originalTM.getExtent()).requireAbsentOperation() + .requireSame(originalTM, SUSPEND).putTabletAvailability(TabletAvailability.ONDEMAND) + .submit(tabletMetadata -> false); + + // This should fail because the original tablet metadata does not have a suspend column + // and the current tablet metadata does. + assertTrue(tabletsMutator.process().get(originalTM.getExtent()).getStatus() + .equals(Status.REJECTED)); + + } + + cluster.getClusterControl().start(ServerType.TABLET_SERVER); + + Wait.waitFor(() -> getSuspendedColumn(c, suspendTableTid) == null, 60_000); + + try (var tabletsMutator = getServerContext().getAmple().conditionallyMutateTablets()) { + tabletsMutator.mutateTablet(originalTM.getExtent()).requireAbsentOperation() + .requireSame(originalTM, SUSPEND).putTabletAvailability(TabletAvailability.ONDEMAND) + .submit(tabletMetadata -> false); + + // This should succeed because the original tablet metadata does not have a suspend column + // and the current tablet metadata does not also because the tablet server for the SUSPEND + // resource group was restarted. + assertTrue(tabletsMutator.process().get(originalTM.getExtent()).getStatus() + .equals(Status.ACCEPTED)); + + } + } + } + + private static SuspendingTServer getSuspendedColumn(AccumuloClient c, TableId tid) { + try (TabletsMetadata tms = TabletsMetadata.builder(c).forTable(tid).build()) { + assertEquals(1, Iterables.size(tms)); + TabletMetadata tm = tms.iterator().next(); + return tm.getSuspend(); + } + } + }