This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 094c9b09d5 Add tests for CompactionDriver cleanupTabletMetadata (#4798)
094c9b09d5 is described below

commit 094c9b09d5c0004c833a8b0de289b6f824206790
Author: Christopher L. Shannon <cshan...@apache.org>
AuthorDate: Mon Aug 12 08:09:51 2024 -0400

    Add tests for CompactionDriver cleanupTabletMetadata (#4798)
    
    This commit adds tests using TestAmple to verify that the
    cleanupTabletMetadata method, which is called from undo, properly cleans
    up any compaction metadata for a given fate Id across the tablets that
    match the range provided to the CompactionDriver.
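    
    In essence, the new test exercises the cleanup path as follows. This is
    a condensed sketch, not the full test: the TestAmple setup, the mock
    Manager, namespaceId/tableId, and fateId1/fateId2 from the diff below
    are assumed to be in scope, and startRow/endRow stand for the range
    endpoints as byte arrays (null when unbounded).
    
        // Compaction metadata was created for fateId1 on every tablet.
        ManagerRepo repo = new CompactionDriver(namespaceId, tableId, startRow, endRow);
        repo.undo(fateId2, manager); // non-matching fateId: metadata must survive
        repo.undo(fateId1, manager); // matching fateId: metadata is cleaned up
        // Every tablet overlapping the range should now carry no compaction metadata.
        tabletsMetadata.forEach(ExternalCompactionTestUtils::assertNoCompactionMetadata);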
    
    This closes #4795
---
 .../manager/tableOps/compact/CompactionDriver.java |   2 +-
 .../compaction/ExternalCompactionTestUtils.java    |  27 +++--
 .../apache/accumulo/test/fate/ManagerRepoIT.java   | 134 +++++++++++++++++++++
 3 files changed, 152 insertions(+), 11 deletions(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index 74fd66b49d..990b1a0d74 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@ -69,7 +69,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
-class CompactionDriver extends ManagerRepo {
+public class CompactionDriver extends ManagerRepo {
 
   private static final Logger log = LoggerFactory.getLogger(CompactionDriver.class);
 
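
The one-line change above widens CompactionDriver from package-private to
public so that the new integration test, which lives in
org.apache.accumulo.test.fate rather than in the manager package, can
construct the repo directly. A minimal sketch, assuming namespaceId and
tableId are in scope (null row endpoints mean the compaction covers the
whole table):

    ManagerRepo repo = new CompactionDriver(namespaceId, tableId, null, null);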
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
index 20e6efc499..256751b036 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
@@ -370,18 +370,25 @@ public class ExternalCompactionTestUtils {
   public static void assertNoCompactionMetadata(ServerContext ctx, String tableName) {
     var tableId = TableId.of(ctx.tableOperations().tableIdMap().get(tableName));
     try (var tabletsMetadata = ctx.getAmple().readTablets().forTable(tableId).build()) {
+      assertNoCompactionMetadata(tabletsMetadata);
+    }
+  }
 
-      int count = 0;
+  public static void assertNoCompactionMetadata(TabletsMetadata tabletsMetadata) {
+    int count = 0;
 
-      for (var tabletMetadata : tabletsMetadata) {
-        assertEquals(Set.of(), tabletMetadata.getCompacted());
-        assertNull(tabletMetadata.getSelectedFiles());
-        assertEquals(Set.of(), tabletMetadata.getExternalCompactions().keySet());
-        assertEquals(Set.of(), tabletMetadata.getUserCompactionsRequested());
-        count++;
-      }
-
-      assertTrue(count > 0);
+    for (var tabletMetadata : tabletsMetadata) {
+      assertNoCompactionMetadata(tabletMetadata);
+      count++;
     }
+
+    assertTrue(count > 0);
+  }
+
+  public static void assertNoCompactionMetadata(TabletMetadata tabletMetadata) {
+    assertEquals(Set.of(), tabletMetadata.getCompacted());
+    assertNull(tabletMetadata.getSelectedFiles());
+    assertEquals(Set.of(), tabletMetadata.getExternalCompactions().keySet());
+    assertEquals(Set.of(), tabletMetadata.getUserCompactionsRequested());
   }
 }
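
The refactor above splits assertNoCompactionMetadata into three overloads
(by table name, by TabletsMetadata, and by a single TabletMetadata) so that
tests which build their own TabletsMetadata, such as the new IT below, can
reuse the per-tablet checks. A hypothetical caller, assuming an Ample
instance and a TableId are in scope:

    try (TabletsMetadata tablets = ample.readTablets().forTable(tableId).build()) {
      // Per-tablet overload introduced in this commit
      tablets.forEach(ExternalCompactionTestUtils::assertNoCompactionMetadata);
    }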
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
index 50ee816e86..a0cf45ee79 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
@@ -23,6 +23,7 @@ import static org.apache.accumulo.core.client.ConditionalWriter.Status.UNKNOWN;
 import static org.apache.accumulo.test.ample.TestAmpleUtil.mockWithAmple;
 import static org.apache.accumulo.test.ample.metadata.ConditionalWriterInterceptor.withStatus;
 import static org.apache.accumulo.test.ample.metadata.TestAmple.not;
+import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.assertNoCompactionMetadata;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -31,10 +32,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.file.Path;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.BatchWriter;
@@ -43,19 +49,28 @@ import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SplitColumnFamily;
+import org.apache.accumulo.core.metadata.schema.SelectedFiles;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletOperationId;
 import org.apache.accumulo.core.metadata.schema.TabletOperationType;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.time.SteadyTime;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.apache.accumulo.manager.tableOps.compact.CompactionDriver;
 import org.apache.accumulo.manager.tableOps.merge.DeleteRows;
 import org.apache.accumulo.manager.tableOps.merge.MergeInfo;
 import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation;
@@ -74,6 +89,9 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import com.google.common.collect.Sets;
 
 public class ManagerRepoIT extends SharedMiniClusterBase {
 
@@ -307,6 +325,122 @@ public class ManagerRepoIT extends SharedMiniClusterBase {
     }
   }
 
+  @ParameterizedTest
+  @MethodSource("compactionDriverRanges")
+  public void testCompactionDriverCleanup(Pair<Text,Text> rangeText) throws Exception {
+    // Create a range for the test and generate table names
+    String[] tableNames = getUniqueNames(2);
+    var range = new Range(rangeText.getFirst(), true, rangeText.getSecond(), false);
+    var tableSuffix = (range.isInfiniteStartKey() ? "" : rangeText.getFirst().toString())
+        + (range.isInfiniteStopKey() ? "" : rangeText.getSecond().toString());
+    String metadataTable = tableNames[0] + tableSuffix;
+    String userTable = tableNames[1] + tableSuffix;
+
+    try (ClientContext client =
+        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+
+      // Create a table with 4 splits
+      var splits = Sets.newTreeSet(Arrays.asList(new Text("d"), new Text("m"), new Text("s")));
+      client.tableOperations().create(userTable, new NewTableConfiguration().withSplits(splits));
+      TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(userTable));
+
+      // Set up Test ample and manager
+      TestAmple.createMetadataTable(client, metadataTable);
+      TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple
+          .create(getCluster().getServerContext(), Map.of(DataLevel.USER, metadataTable));
+      testAmple.createMetadataFromExisting(client, tableId);
+      Manager manager = mockWithAmple(getCluster().getServerContext(), testAmple);
+      var ctx = manager.getContext();
+
+      // Create the CompactionDriver to test with the given range passed into the method
+      final ManagerRepo repo = new CompactionDriver(ctx.getNamespaceId(tableId), tableId,
+          !range.isInfiniteStartKey() ? range.getStartKey().getRow().getBytes() : null,
+          !range.isInfiniteStopKey() ? range.getEndKey().getRow().getBytes() : null);
+
+      // Create a couple random fateIds and generate compaction metadata for
+      // the first fateId for all 4 tablets in the table
+      var fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
+      var fateId2 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
+      createCompactionMetadata(testAmple, tableId, fateId1);
+
+      // Verify there are 4 tablets and each tablet has compaction metadata generated
+      // for the first fateId and store all the extents in a set
+      Set<KeyExtent> extents = new HashSet<>();
+      try (TabletsMetadata tabletsMetadata = testAmple.readTablets().forTable(tableId).build()) {
+        assertEquals(4, tabletsMetadata.stream().count());
+        tabletsMetadata.forEach(tm -> {
+          extents.add(tm.getExtent());
+          assertHasCompactionMetadata(fateId1, tm);
+        });
+      }
+      assertEquals(4, extents.size());
+
+      // First call undo using the second fateId and verify there's still metadata for the first one
+      repo.undo(fateId2, manager);
+      try (TabletsMetadata tabletsMetadata = testAmple.readTablets().forTable(tableId).build()) {
+        tabletsMetadata.forEach(tm -> {
+          assertHasCompactionMetadata(fateId1, tm);
+        });
+      }
+
+      // Now call undo on the first fateId which would clean up all the metadata for all the
+      // tablets that overlap with the given range that was provided to the CompactionDriver
+      // during the creation of the repo
+      repo.undo(fateId1, manager);
+
+      // First, iterate over only the overlapping tablets and verify that those tablets
+      // were cleaned up and remove any visited tablets from the extents set
+      try (var tabletsMetadata = testAmple.readTablets().forTable(tableId)
+          .overlapping(rangeText.getFirst(), rangeText.getSecond()).build()) {
+        tabletsMetadata.forEach(tm -> {
+          extents.remove(tm.getExtent());
+          assertNoCompactionMetadata(tm);
+        });
+      }
+
+      // Second, for any remaining tablets that did not overlap the range provided,
+      // verify that metadata still exists as the CompactionDriver should not have cleaned
+      // up tablets that did not overlap the given range
+      try (var tabletsMetadata =
+          testAmple.readTablets().forTablets(extents, Optional.empty()).build()) {
+        tabletsMetadata.forEach(tm -> {
+          extents.remove(tm.getExtent());
+          assertHasCompactionMetadata(fateId1, tm);
+        });
+      }
+
+      // Verify all the tablets in the table were checked for correct metadata after undo
+      assertTrue(extents.isEmpty());
+    }
+  }
+
+  private void createCompactionMetadata(Ample testAmple, TableId tableId, FateId fateId) {
+    var stf1 = StoredTabletFile.of(new org.apache.hadoop.fs.Path(
+        "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000050.rf"));
+    var stf2 = StoredTabletFile.of(new org.apache.hadoop.fs.Path(
+        "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf"));
+
+    try (TabletsMetadata tabletsMetadata = testAmple.readTablets().forTable(tableId).build();
+        TabletsMutator tabletsMutator = testAmple.mutateTablets()) {
+      var selectedFiles = new SelectedFiles(Set.of(stf1, stf2), true, fateId,
+          SteadyTime.from(System.currentTimeMillis(), TimeUnit.MILLISECONDS));
+      tabletsMetadata.forEach(tm -> tabletsMutator.mutateTablet(tm.getExtent()).putCompacted(fateId)
+          .putUserCompactionRequested(fateId).putSelectedFiles(selectedFiles).mutate());
+    }
+  }
+
+  private void assertHasCompactionMetadata(FateId fateId, TabletMetadata tm) {
+    assertEquals(Set.of(fateId), tm.getCompacted());
+    assertNotNull(tm.getSelectedFiles());
+    assertEquals(Set.of(fateId), tm.getUserCompactionsRequested());
+  }
+
+  // Create a few ranges to test the CompactionDriver cleanup against
+  private static Stream<Pair<Text,Text>> compactionDriverRanges() {
+    return Stream.of(new Pair<>(null, null), new Pair<>(null, new Text("d")),
+        new Pair<>(new Text("dd"), new Text("mm")), new Pair<>(new Text("s"), 
null));
+  }
+
   private void createUnsplittableTable(ClientContext client, String table) throws Exception {
     // make a table and lower the configuration properties
     // @formatter:off
