This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 094c9b09d5 Add tests for CompactionDriver cleanupTabletMetadata (#4798) 094c9b09d5 is described below commit 094c9b09d5c0004c833a8b0de289b6f824206790 Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Mon Aug 12 08:09:51 2024 -0400 Add tests for CompactionDriver cleanupTabletMetadata (#4798) This commit adds tests using TestAmple to verify that the cleanupTabletMetadata method, which is called from undo, properly cleans up any compaction metadata for a given fate Id across the tablets that match the range provided to the CompactionDriver. This closes #4795 --- .../manager/tableOps/compact/CompactionDriver.java | 2 +- .../compaction/ExternalCompactionTestUtils.java | 27 +++-- .../apache/accumulo/test/fate/ManagerRepoIT.java | 134 +++++++++++++++++++++ 3 files changed, 152 insertions(+), 11 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index 74fd66b49d..990b1a0d74 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -69,7 +69,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -class CompactionDriver extends ManagerRepo { +public class CompactionDriver extends ManagerRepo { private static final Logger log = LoggerFactory.getLogger(CompactionDriver.class); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java index 20e6efc499..256751b036 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java +++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java @@ -370,18 +370,25 @@ public class ExternalCompactionTestUtils { public static void assertNoCompactionMetadata(ServerContext ctx, String tableName) { var tableId = TableId.of(ctx.tableOperations().tableIdMap().get(tableName)); try (var tabletsMetadata = ctx.getAmple().readTablets().forTable(tableId).build()) { + assertNoCompactionMetadata(tabletsMetadata); + } + } - int count = 0; + public static void assertNoCompactionMetadata(TabletsMetadata tabletsMetadata) { + int count = 0; - for (var tabletMetadata : tabletsMetadata) { - assertEquals(Set.of(), tabletMetadata.getCompacted()); - assertNull(tabletMetadata.getSelectedFiles()); - assertEquals(Set.of(), tabletMetadata.getExternalCompactions().keySet()); - assertEquals(Set.of(), tabletMetadata.getUserCompactionsRequested()); - count++; - } - - assertTrue(count > 0); + for (var tabletMetadata : tabletsMetadata) { + assertNoCompactionMetadata(tabletMetadata); + count++; } + + assertTrue(count > 0); + } + + public static void assertNoCompactionMetadata(TabletMetadata tabletMetadata) { + assertEquals(Set.of(), tabletMetadata.getCompacted()); + assertNull(tabletMetadata.getSelectedFiles()); + assertEquals(Set.of(), tabletMetadata.getExternalCompactions().keySet()); + assertEquals(Set.of(), tabletMetadata.getUserCompactionsRequested()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java index 50ee816e86..a0cf45ee79 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java @@ -23,6 +23,7 @@ import static org.apache.accumulo.core.client.ConditionalWriter.Status.UNKNOWN; import static org.apache.accumulo.test.ample.TestAmpleUtil.mockWithAmple; import static org.apache.accumulo.test.ample.metadata.ConditionalWriterInterceptor.withStatus; 
import static org.apache.accumulo.test.ample.metadata.TestAmple.not; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.assertNoCompactionMetadata; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -31,10 +32,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.nio.file.Path; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.BatchWriter; @@ -43,19 +49,28 @@ import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SplitColumnFamily; +import org.apache.accumulo.core.metadata.schema.SelectedFiles; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import 
org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.manager.tableOps.compact.CompactionDriver; import org.apache.accumulo.manager.tableOps.merge.DeleteRows; import org.apache.accumulo.manager.tableOps.merge.MergeInfo; import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation; @@ -74,6 +89,9 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; + +import com.google.common.collect.Sets; public class ManagerRepoIT extends SharedMiniClusterBase { @@ -307,6 +325,122 @@ public class ManagerRepoIT extends SharedMiniClusterBase { } } + @ParameterizedTest + @MethodSource("compactionDriverRanges") + public void testCompactionDriverCleanup(Pair<Text,Text> rangeText) throws Exception { + // Create a range for the test and generate table names + String[] tableNames = getUniqueNames(2); + var range = new Range(rangeText.getFirst(), true, rangeText.getSecond(), false); + var tableSuffix = (range.isInfiniteStartKey() ? "" : rangeText.getFirst().toString()) + + (range.isInfiniteStopKey() ? 
"" : rangeText.getSecond().toString()); + String metadataTable = tableNames[0] + tableSuffix; + String userTable = tableNames[1] + tableSuffix; + + try (ClientContext client = + (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + + // Create a table with 4 splits + var splits = Sets.newTreeSet(Arrays.asList(new Text("d"), new Text("m"), new Text("s"))); + client.tableOperations().create(userTable, new NewTableConfiguration().withSplits(splits)); + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(userTable)); + + // Set up Test ample and manager + TestAmple.createMetadataTable(client, metadataTable); + TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple + .create(getCluster().getServerContext(), Map.of(DataLevel.USER, metadataTable)); + testAmple.createMetadataFromExisting(client, tableId); + Manager manager = mockWithAmple(getCluster().getServerContext(), testAmple); + var ctx = manager.getContext(); + + // Create the CompactionDriver to test with the given range passed into the method + final ManagerRepo repo = new CompactionDriver(ctx.getNamespaceId(tableId), tableId, + !range.isInfiniteStartKey() ? range.getStartKey().getRow().getBytes() : null, + !range.isInfiniteStopKey() ? 
range.getEndKey().getRow().getBytes() : null); + + // Create a couple random fateIds and generate compaction metadata for + // the first fateId for all 4 tablets in the table + var fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + var fateId2 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + createCompactionMetadata(testAmple, tableId, fateId1); + + // Verify there are 4 tablets and each tablet has compaction metadata generated + // for the first fateId and store all the extents in a set + Set<KeyExtent> extents = new HashSet<>(); + try (TabletsMetadata tabletsMetadata = testAmple.readTablets().forTable(tableId).build()) { + assertEquals(4, tabletsMetadata.stream().count()); + tabletsMetadata.forEach(tm -> { + extents.add(tm.getExtent()); + assertHasCompactionMetadata(fateId1, tm); + }); + } + assertEquals(4, extents.size()); + + // First call undo using the second fateId and verify there's still metadata for the first one + repo.undo(fateId2, manager); + try (TabletsMetadata tabletsMetadata = testAmple.readTablets().forTable(tableId).build()) { + tabletsMetadata.forEach(tm -> { + assertHasCompactionMetadata(fateId1, tm); + }); + } + + // Now call undo on the first fateId which would clean up all the metadata for all the + // tablets that overlap with the given range that was provided to the CompactionDriver + // during the creation of the repo + repo.undo(fateId1, manager); + + // First, iterate over only the overlapping tablets and verify that those tablets + // were cleaned up and remove any visited tablets from the extents set + try (var tabletsMetadata = testAmple.readTablets().forTable(tableId) + .overlapping(rangeText.getFirst(), rangeText.getSecond()).build()) { + tabletsMetadata.forEach(tm -> { + extents.remove(tm.getExtent()); + assertNoCompactionMetadata(tm); + }); + } + + // Second, for any remaining tablets that did not overlap the range provided, + // verify that metadata still exists as the CompactionDriver should not have 
cleaned + // up tablets that did not overlap the given range + try (var tabletsMetadata = + testAmple.readTablets().forTablets(extents, Optional.empty()).build()) { + tabletsMetadata.forEach(tm -> { + extents.remove(tm.getExtent()); + assertHasCompactionMetadata(fateId1, tm); + }); + } + + // Verify all the tablets in the table were checked for correct metadata after undo + assertTrue(extents.isEmpty()); + } + } + + private void createCompactionMetadata(Ample testAmple, TableId tableId, FateId fateId) { + var stf1 = StoredTabletFile.of(new org.apache.hadoop.fs.Path( + "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000050.rf")); + var stf2 = StoredTabletFile.of(new org.apache.hadoop.fs.Path( + "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf")); + + try (TabletsMetadata tabletsMetadata = testAmple.readTablets().forTable(tableId).build(); + TabletsMutator tabletsMutator = testAmple.mutateTablets()) { + var selectedFiles = new SelectedFiles(Set.of(stf1, stf2), true, fateId, + SteadyTime.from(System.currentTimeMillis(), TimeUnit.MILLISECONDS)); + tabletsMetadata.forEach(tm -> tabletsMutator.mutateTablet(tm.getExtent()).putCompacted(fateId) + .putUserCompactionRequested(fateId).putSelectedFiles(selectedFiles).mutate()); + } + } + + private void assertHasCompactionMetadata(FateId fateId, TabletMetadata tm) { + assertEquals(Set.of(fateId), tm.getCompacted()); + assertNotNull(tm.getSelectedFiles()); + assertEquals(Set.of(fateId), tm.getUserCompactionsRequested()); + } + + // Create a few ranges to test the CompactionDriver cleanup against + private static Stream<Pair<Text,Text>> compactionDriverRanges() { + return Stream.of(new Pair<>(null, null), new Pair<>(null, new Text("d")), + new Pair<>(new Text("dd"), new Text("mm")), new Pair<>(new Text("s"), null)); + } + private void createUnsplittableTable(ClientContext client, String table) throws Exception { // make a table and lower the configuration properties // @formatter:off