This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 22621ea501 Update dead compaction detector to handle metadata/root 
(#4354)
22621ea501 is described below

commit 22621ea50190389131146b92546bf5b9c2ab47cf
Author: Christopher L. Shannon <cshan...@apache.org>
AuthorDate: Wed Mar 13 11:43:02 2024 -0400

    Update dead compaction detector to handle metadata/root (#4354)
    
    This updates DeadCompactionDetector to also scan the tablet metadata
    of the root and metadata tables for dead compactions on those tables.
    
    This closes #4340
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../coordinator/CompactionCoordinator.java         |  19 +-
 .../coordinator/DeadCompactionDetector.java        |  20 +-
 .../test/compaction/ExternalCompaction_1_IT.java   | 231 +++++++++++++++------
 3 files changed, 197 insertions(+), 73 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 2a12f08708..97397c019b 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -20,6 +20,9 @@ package org.apache.accumulo.manager.compaction.coordinator;
 
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
@@ -81,6 +84,7 @@ import org.apache.accumulo.core.metadata.CompactableFileImpl;
 import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler;
 import org.apache.accumulo.core.metadata.schema.CompactionMetadata;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@ -577,7 +581,7 @@ public class CompactionCoordinator
       var dfv = 
metaJob.getTabletMetadata().getFilesMap().get(storedTabletFile);
       return new InputFile(storedTabletFile.getMetadata(), dfv.getSize(), 
dfv.getNumEntries(),
           dfv.getTime());
-    }).collect(Collectors.toList());
+    }).collect(toList());
 
     FateInstanceType type = 
FateInstanceType.fromTableId(metaJob.getTabletMetadata().getTableId());
     FateId fateId = FateId.from(type, 0);
@@ -707,10 +711,19 @@ public class CompactionCoordinator
     KeyExtent fromThriftExtent = KeyExtent.fromThrift(extent);
     LOG.info("Compaction failed, id: {}, extent: {}", externalCompactionId, 
fromThriftExtent);
     final var ecid = ExternalCompactionId.of(externalCompactionId);
-    compactionFailed(Map.of(ecid, KeyExtent.fromThrift(extent)));
+    compactionsFailed(Map.of(ecid, KeyExtent.fromThrift(extent)));
   }
 
-  void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
+  void compactionsFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
+    // Need to process each level by itself because the conditional tablet 
mutator does not support
+    // mutating multiple data levels at the same time
+    compactions.entrySet().stream()
+        .collect(groupingBy(entry -> DataLevel.of(entry.getValue().tableId()),
+            Collectors.toMap(Entry::getKey, Entry::getValue)))
+        .forEach((level, compactionsByLevel) -> 
compactionFailedForLevel(compactionsByLevel));
+  }
+
+  void compactionFailedForLevel(Map<ExternalCompactionId,KeyExtent> 
compactions) {
 
     try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
       compactions.forEach((ecid, extent) -> {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
index 0857385239..87a4cef78b 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
@@ -40,6 +40,7 @@ import org.apache.accumulo.core.fate.FateKey;
 import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
@@ -141,13 +142,17 @@ public class DeadCompactionDetector {
      */
     log.debug("Starting to look for dead compactions");
 
-    // ELASTICITY_TODO not looking for dead compactions in the metadata table
     Map<ExternalCompactionId,KeyExtent> tabletCompactions = new HashMap<>();
 
     // find what external compactions tablets think are running
-    try (TabletsMetadata tabletsMetadata = 
context.getAmple().readTablets().forLevel(DataLevel.USER)
-        .filter(new HasExternalCompactionsFilter()).fetch(ColumnType.ECOMP, 
ColumnType.PREV_ROW)
-        .build()) {
+    try (Stream<TabletMetadata> tabletsMetadata = Stream
+        // Listing the data levels vs using DataLevel.values() prevents 
unexpected
+        // behavior if a new DataLevel is added
+        .of(DataLevel.ROOT, DataLevel.METADATA, DataLevel.USER)
+        .map(dataLevel -> context.getAmple().readTablets().forLevel(dataLevel)
+            .filter(new 
HasExternalCompactionsFilter()).fetch(ColumnType.ECOMP, ColumnType.PREV_ROW)
+            .build())
+        .flatMap(TabletsMetadata::stream)) {
       tabletsMetadata.forEach(tm -> {
         tm.getExternalCompactions().keySet().forEach(ecid -> {
           tabletCompactions.put(ecid, tm.getExtent());
@@ -193,9 +198,8 @@ public class DeadCompactionDetector {
           log.warn("Fate is not present, can not look for dead compactions");
           return;
         }
-        // ELASTICITY_TODO need to handle metadata
-        var fate = fateMap.get(FateInstanceType.USER);
-        try (Stream<FateKey> keyStream = 
fate.list(FateKey.FateKeyType.COMPACTION_COMMIT)) {
+        try (Stream<FateKey> keyStream = fateMap.values().stream()
+            .flatMap(fate -> 
fate.list(FateKey.FateKeyType.COMPACTION_COMMIT))) {
           keyStream.map(fateKey -> 
fateKey.getCompactionId().orElseThrow()).forEach(ecid -> {
             if (tabletCompactions.remove(ecid) != null) {
               log.debug("Ignoring compaction {} that is committing in a fate", 
ecid);
@@ -222,7 +226,7 @@ public class DeadCompactionDetector {
       tabletCompactions.forEach((ecid, extent) -> {
         log.warn("Compaction believed to be dead, failing it: id: {}, extent: 
{}", ecid, extent);
       });
-      coordinator.compactionFailed(tabletCompactions);
+      coordinator.compactionsFailed(tabletCompactions);
       this.deadCompactions.keySet().removeAll(toFail);
     }
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
index 6a8dbd5040..16dd39b09f 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
@@ -42,6 +42,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -53,6 +54,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv;
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -71,14 +73,18 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FateKey;
 import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.ZooStore;
 import org.apache.accumulo.core.fate.accumulo.AccumuloStore;
 import org.apache.accumulo.core.iterators.DevNull;
 import org.apache.accumulo.core.iterators.Filter;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 import org.apache.accumulo.core.metadata.schema.CompactionMetadata;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@ -221,86 +227,187 @@ public class ExternalCompaction_1_IT extends 
SharedMiniClusterBase {
 
   /**
    * This test verifies the dead compaction detector does not remove 
compactions that are committing
-   * in fate.
+   * in fate for the Root table.
    */
   @Test
-  public void testCompactionCommitAndDeadDetection() throws Exception {
+  public void testCompactionCommitAndDeadDetectionRoot() throws Exception {
+    var ctx = getCluster().getServerContext();
+    FateStore<Manager> zkStore =
+        new ZooStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, 
ctx.getZooReaderWriter());
+
     try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
-      final String tableName = getUniqueNames(1)[0];
+      var tableId = ctx.getTableId(AccumuloTable.ROOT.tableName());
+      var allCids = new HashMap<TableId,List<ExternalCompactionId>>();
+      var fateId = createCompactionCommitAndDeadMetadata(c, zkStore, 
AccumuloTable.ROOT.tableName(),
+          allCids);
+      verifyCompactionCommitAndDead(zkStore, tableId, fateId, 
allCids.get(tableId));
+    }
+  }
 
+  /**
+   * This test verifies the dead compaction detector does not remove 
compactions that are committing
+   * in fate for the Metadata table.
+   */
+  @Test
+  public void testCompactionCommitAndDeadDetectionMeta() throws Exception {
+    var ctx = getCluster().getServerContext();
+    FateStore<Manager> zkStore =
+        new ZooStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, 
ctx.getZooReaderWriter());
+
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      // Metadata table by default already has 2 tablets
+      var tableId = ctx.getTableId(AccumuloTable.METADATA.tableName());
+      var allCids = new HashMap<TableId,List<ExternalCompactionId>>();
+      var fateId = createCompactionCommitAndDeadMetadata(c, zkStore,
+          AccumuloTable.METADATA.tableName(), allCids);
+      verifyCompactionCommitAndDead(zkStore, tableId, fateId, 
allCids.get(tableId));
+    }
+  }
+
+  /**
+   * This test verifies the dead compaction detector does not remove 
compactions that are committing
+   * in fate for a User table.
+   */
+  @Test
+  public void testCompactionCommitAndDeadDetectionUser() throws Exception {
+    var ctx = getCluster().getServerContext();
+    final String tableName = getUniqueNames(1)[0];
+
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      AccumuloStore<Manager> accumuloStore = new AccumuloStore<>(ctx);
       SortedSet<Text> splits = new TreeSet<>();
       splits.add(new Text(row(MAX_DATA / 2)));
-
       c.tableOperations().create(tableName, new 
NewTableConfiguration().withSplits(splits));
       writeData(c, tableName);
-      c.tableOperations().flush(tableName, null, null, true);
 
-      var ctx = getCluster().getServerContext();
       var tableId = ctx.getTableId(tableName);
+      var allCids = new HashMap<TableId,List<ExternalCompactionId>>();
+      var fateId = createCompactionCommitAndDeadMetadata(c, accumuloStore, 
tableName, allCids);
+      verifyCompactionCommitAndDead(accumuloStore, tableId, fateId, 
allCids.get(tableId));
+    }
+  }
 
-      // Create two random compaction ids
-      var cids = List.of(ExternalCompactionId.generate(UUID.randomUUID()),
-          ExternalCompactionId.generate(UUID.randomUUID()));
+  /**
+   * This test verifies the dead compaction detector does not remove 
compactions that are committing
+   * in fate when all data levels have compactions.
+   */
+  @Test
+  public void testCompactionCommitAndDeadDetectionAll() throws Exception {
+    var ctx = getCluster().getServerContext();
+    final String userTable = getUniqueNames(1)[0];
+
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
       AccumuloStore<Manager> accumuloStore = new AccumuloStore<>(ctx);
+      FateStore<Manager> zkStore =
+          new ZooStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, 
ctx.getZooReaderWriter());
 
-      // Create a fate transaction for one of the compaction ids that is in 
the new state, it should
-      // never run. Its purpose is to prevent the dead compaction detector 
from deleting the id.
-      FateStore.FateTxStore<Manager> fateTx =
-          
accumuloStore.createAndReserve(FateKey.forCompactionCommit(cids.get(0))).orElseThrow();
-      var fateId = fateTx.getID();
-      fateTx.unreserve(0, TimeUnit.MILLISECONDS);
+      SortedSet<Text> splits = new TreeSet<>();
+      splits.add(new Text(row(MAX_DATA / 2)));
+      c.tableOperations().create(userTable, new 
NewTableConfiguration().withSplits(splits));
+      writeData(c, userTable);
+
+      Map<TableId,FateId> fateIds = new HashMap<>();
+      Map<TableId,List<ExternalCompactionId>> allCids = new HashMap<>();
+
+      // create compaction metadata for each data level to test
+      for (String tableName : List.of(AccumuloTable.ROOT.tableName(),
+          AccumuloTable.METADATA.tableName(), userTable)) {
+        var tableId = ctx.getTableId(tableName);
+        var fateStore = FateInstanceType.fromTableId(tableId) == 
FateInstanceType.USER
+            ? accumuloStore : zkStore;
+        fateIds.put(tableId,
+            createCompactionCommitAndDeadMetadata(c, fateStore, tableName, 
allCids));
+      }
 
-      // Read the tablet metadata
-      var tabletsMeta = 
ctx.getAmple().readTablets().forTable(tableId).build().stream()
-          .collect(Collectors.toList());
+      // verify the dead compaction was removed for each level
+      // but not the compaction associated with a fate id
+      for (Entry<TableId,FateId> entry : fateIds.entrySet()) {
+        var tableId = entry.getKey();
+        var fateStore = FateInstanceType.fromTableId(tableId) == 
FateInstanceType.USER
+            ? accumuloStore : zkStore;
+        verifyCompactionCommitAndDead(fateStore, tableId, entry.getValue(), 
allCids.get(tableId));
+      }
+    }
+  }
+
+  private FateId createCompactionCommitAndDeadMetadata(AccumuloClient c,
+      FateStore<Manager> fateStore, String tableName,
+      Map<TableId,List<ExternalCompactionId>> allCids) throws Exception {
+    var ctx = getCluster().getServerContext();
+    c.tableOperations().flush(tableName, null, null, true);
+    var tableId = ctx.getTableId(tableName);
+
+    allCids.put(tableId, 
List.of(ExternalCompactionId.generate(UUID.randomUUID()),
+        ExternalCompactionId.generate(UUID.randomUUID())));
+
+    // Create a fate transaction for one of the compaction ids that is in the 
new state, it
+    // should never run. Its purpose is to prevent the dead compaction detector
+    // from deleting the id.
+    FateStore.FateTxStore<Manager> fateTx = fateStore
+        
.createAndReserve(FateKey.forCompactionCommit(allCids.get(tableId).get(0))).orElseThrow();
+    var fateId = fateTx.getID();
+    fateTx.unreserve(0, TimeUnit.MILLISECONDS);
+
+    // Read the tablet metadata
+    var tabletsMeta = 
ctx.getAmple().readTablets().forTable(tableId).build().stream()
+        .collect(Collectors.toList());
+    // Root is always 1 tablet
+    if (!tableId.equals(AccumuloTable.ROOT.tableId())) {
       assertEquals(2, tabletsMeta.size());
+    }
 
-      // Insert fake compaction entries in the metadata table. No compactor 
will report ownership of
-      // these, so they should look like dead compactions and be removed. 
However, one of them has
-      // an associated fate tx that should prevent its removal.
-      try (var mutator = ctx.getAmple().mutateTablets()) {
-        for (int i = 0; i < tabletsMeta.size(); i++) {
-          var tabletMeta = tabletsMeta.get(0);
-          var tabletDir =
-              
tabletMeta.getFiles().stream().findFirst().orElseThrow().getPath().getParent();
-          var tmpFile = new Path(tabletDir, "C1234.rf_tmp");
-
-          CompactionMetadata cm = new CompactionMetadata(tabletMeta.getFiles(),
-              ReferencedTabletFile.of(tmpFile), "localhost:16789", 
CompactionKind.SYSTEM,
-              (short) 10, CompactorGroupId.of(GROUP1), false, null);
-
-          
mutator.mutateTablet(tabletMeta.getExtent()).putExternalCompaction(cids.get(i), 
cm)
-              .mutate();
-        }
+    // Insert fake compaction entries in the metadata table. No compactor will 
report ownership
+    // of these, so they should look like dead compactions and be removed. 
However, one of
+    // them has an associated fate tx that should prevent its removal.
+    try (var mutator = ctx.getAmple().mutateTablets()) {
+      for (int i = 0; i < tabletsMeta.size(); i++) {
+        var tabletMeta = tabletsMeta.get(0);
+        var tabletDir =
+            
tabletMeta.getFiles().stream().findFirst().orElseThrow().getPath().getParent();
+        var tmpFile = new Path(tabletDir, "C1234.rf_tmp");
+
+        CompactionMetadata cm = new CompactionMetadata(tabletMeta.getFiles(),
+            ReferencedTabletFile.of(tmpFile), "localhost:16789", 
CompactionKind.SYSTEM, (short) 10,
+            CompactorGroupId.of(GROUP1), false, null);
+
+        mutator.mutateTablet(tabletMeta.getExtent())
+            .putExternalCompaction(allCids.get(tableId).get(i), cm).mutate();
       }
-
-      // Wait until the compaction id w/o a fate transaction is removed, 
should still see the one
-      // with a fate transaction
-      Wait.waitFor(() -> {
-        Set<ExternalCompactionId> currentIds = 
ctx.getAmple().readTablets().forTable(tableId)
-            .build().stream().map(TabletMetadata::getExternalCompactions)
-            .flatMap(ecm -> ecm.keySet().stream()).collect(Collectors.toSet());
-        System.out.println("currentIds1:" + currentIds);
-        assertTrue(currentIds.size() == 1 || currentIds.size() == 2);
-        return currentIds.equals(Set.of(cids.get(0)));
-      });
-
-      // Delete the fate transaction, should allow the dead compaction 
detector to clean up the
-      // remaining external compaction id
-      fateTx = accumuloStore.reserve(fateId);
-      fateTx.delete();
-      fateTx.unreserve(0, TimeUnit.MILLISECONDS);
-
-      // wait for the remaining compaction id to be removed
-      Wait.waitFor(() -> {
-        Set<ExternalCompactionId> currentIds = 
ctx.getAmple().readTablets().forTable(tableId)
-            .build().stream().map(TabletMetadata::getExternalCompactions)
-            .flatMap(ecm -> ecm.keySet().stream()).collect(Collectors.toSet());
-        System.out.println("currentIds2:" + currentIds);
-        assertTrue(currentIds.size() <= 1);
-        return currentIds.isEmpty();
-      });
     }
+
+    return fateId;
+  }
+
+  private void verifyCompactionCommitAndDead(FateStore<Manager> fateStore, 
TableId tableId,
+      FateId fateId, List<ExternalCompactionId> cids) {
+    var ctx = getCluster().getServerContext();
+
+    // Wait until the compaction id w/o a fate transaction is removed, should 
still see the one
+    // with a fate transaction
+    Wait.waitFor(() -> {
+      Set<ExternalCompactionId> currentIds = 
ctx.getAmple().readTablets().forTable(tableId).build()
+          .stream().map(TabletMetadata::getExternalCompactions)
+          .flatMap(ecm -> ecm.keySet().stream()).collect(Collectors.toSet());
+      System.out.println("currentIds1:" + currentIds);
+      assertTrue(currentIds.size() == 1 || currentIds.size() == 2);
+      return currentIds.equals(Set.of(cids.get(0)));
+    });
+
+    // Delete the fate transaction, should allow the dead compaction detector 
to clean up the
+    // remaining external compaction id
+    var fateTx = fateStore.reserve(fateId);
+    fateTx.delete();
+    fateTx.unreserve(0, TimeUnit.MILLISECONDS);
+
+    // wait for the remaining compaction id to be removed
+    Wait.waitFor(() -> {
+      Set<ExternalCompactionId> currentIds = 
ctx.getAmple().readTablets().forTable(tableId).build()
+          .stream().map(TabletMetadata::getExternalCompactions)
+          .flatMap(ecm -> ecm.keySet().stream()).collect(Collectors.toSet());
+      System.out.println("currentIds2:" + currentIds);
+      assertTrue(currentIds.size() <= 1);
+      return currentIds.isEmpty();
+    });
   }
 
   @Test

Reply via email to