This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new f4776170d6 adds admin check for dangling fate references (#4686)
f4776170d6 is described below

commit f4776170d64b59dc20fc9cc97dda65d8981046eb
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue Jun 25 13:56:38 2024 -0400

    adds admin check for dangling fate references (#4686)
    
    Added the ability to check for tablets that reference
    fate operations that are no longer active.  This was
    added to the `accumulo admin checkTablets` command.
    
    A unit test was added that validates the algorithm and
    the extraction of fate ids from tablet metadata. Manual
    testing was done to validate end-to-end functionality.
    
    For manual testing, the following commands were run in the shell:
    
    ```
    grant Table.WRITE -t accumulo.metadata -u root
    insert 1< srv opid SPLITTING:FATE:USER:dfdb85a6-65a0-47d2-a9e2-4c671b499829
    ```
    
    and then the following was run
    
    ```
    $ accumulo admin checkTablets
    
    *** Looking for offline tablets ***
    
    Scanning zookeeper
    Scanning accumulo.root
    Scanning accumulo.metadata
    1<< is UNASSIGNED  #walogs:0
    
    *** Looking for missing files ***
    
    Scanning : accumulo.root (-inf,~ : [] 9223372036854775807 false)
    Scan finished, 0 files of 2 missing
    
    Scanning : accumulo.metadata (-inf,~ : [] 9223372036854775807 false)
    Scan finished, 0 files of 0 missing
    
    *** Looking for dangling fate operations ***
    
    FATE:USER:dfdb85a6-65a0-47d2-a9e2-4c671b499829 1<<
    
    Found 1 dangling references to fate operations
    ```
---
 .../core/metadata/schema/TabletOperationId.java    |   7 +
 .../org/apache/accumulo/server/util/Admin.java     | 169 ++++++++++++++++++++-
 .../accumulo/server/util/FindOfflineTablets.java   |   4 +-
 .../org/apache/accumulo/server/util/AdminTest.java | 125 +++++++++++++++
 4 files changed, 302 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java
 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java
index 8da831eacd..089b40802c 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java
@@ -57,6 +57,12 @@ public class TabletOperationId extends 
AbstractId<TabletOperationId> {
     return TabletOperationType.valueOf(fields[0]);
   }
 
+  public FateId getFateId() {
+    var fields = canonical().split(":", 2); // limit 2: the FateId part itself contains ':'
+    Preconditions.checkState(fields.length == 2);
+    return FateId.from(fields[1]);
+  }
+
   public static TabletOperationId from(String opid) {
     return new TabletOperationId(validate(opid));
   }
@@ -64,4 +70,5 @@ public class TabletOperationId extends 
AbstractId<TabletOperationId> {
   public static TabletOperationId from(TabletOperationType type, FateId 
fateId) {
     return new TabletOperationId(type + ":" + fateId);
   }
+
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java 
b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
index e40fc316df..b1be48007f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
@@ -27,18 +27,27 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.text.MessageFormat;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Formatter;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.Constants;
@@ -53,6 +62,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.AdminUtil;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
@@ -66,6 +76,8 @@ import org.apache.accumulo.core.lock.ServiceLock;
 import org.apache.accumulo.core.manager.thrift.FateService;
 import org.apache.accumulo.core.manager.thrift.TFateId;
 import org.apache.accumulo.core.metadata.AccumuloTable;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 import org.apache.accumulo.core.security.Authorizations;
@@ -89,6 +101,8 @@ import org.slf4j.LoggerFactory;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.Lists;
@@ -118,7 +132,8 @@ public class Admin implements KeywordExecutable {
     List<String> args = new ArrayList<>();
   }
 
-  @Parameters(commandDescription = "print tablets that are offline in online 
tables")
+  @Parameters(commandDescription = "Looks for tablets that are unexpectedly 
offline, tablets that "
+      + "reference missing files, or tablets that reference absent fate 
operations.")
   static class CheckTabletsCommand {
     @Parameter(names = "--fixFiles", description = "Remove dangling file 
pointers")
     boolean fixFiles = false;
@@ -367,7 +382,10 @@ public class Admin implements KeywordExecutable {
             rc = 6;
           }
         }
-
+        System.out.println("\n*** Looking for dangling fate operations ***\n");
+        if (printDanglingFateOperations(context, 
checkTabletsCommand.tableName) > 0) {
+          rc = 7;
+        }
       } else if (cl.getParsedCommand().equals("stop")) {
         stopTabletServer(context, stopOpts.args, opts.force);
       } else if (cl.getParsedCommand().equals("dumpConfig")) {
@@ -948,4 +966,151 @@ public class Admin implements KeywordExecutable {
     }
     return typesFilter;
   }
+
+  private static long printDanglingFateOperations(ServerContext context, 
String tableName)
+      throws Exception {
+    long totalDanglingSeen = 0;
+    if (tableName == null) { // no table given: scan every data level
+      for (var dataLevel : Ample.DataLevel.values()) {
+        try (var tablets = 
context.getAmple().readTablets().forLevel(dataLevel).build()) {
+          totalDanglingSeen += printDanglingFateOperations(context, tablets);
+        }
+      }
+    } else { // only scan the tablets of the named table
+      var tableId = context.getTableId(tableName);
+      try (var tablets = 
context.getAmple().readTablets().forTable(tableId).build()) {
+        totalDanglingSeen += printDanglingFateOperations(context, tablets);
+      }
+    }
+
+    System.out.printf("\nFound %,d dangling references to fate operations\n", 
totalDanglingSeen);
+
+    return totalDanglingSeen;
+  }
+
+  private static long printDanglingFateOperations(ServerContext context,
+      Iterable<TabletMetadata> tablets) throws Exception {
+    Function<Collection<KeyExtent>,Map<KeyExtent,TabletMetadata>> tabletLookup 
= extents -> {
+      try (var lookedupTablets =
+          context.getAmple().readTablets().forTablets(extents, 
Optional.empty()).build()) {
+        Map<KeyExtent,TabletMetadata> tabletMap = new HashMap<>();
+        lookedupTablets
+            .forEach(tabletMetadata -> 
tabletMap.put(tabletMetadata.getExtent(), tabletMetadata));
+        return tabletMap;
+      }
+    };
+
+    UserFateStore<?> ufs = new UserFateStore<>(context); // reads non-META fate ids
+    MetaFateStore<?> mfs = new MetaFateStore<>(context.getZooKeeperRoot() + 
Constants.ZFATE,
+        context.getZooReaderWriter());
+    LoadingCache<FateId,ReadOnlyFateStore.TStatus> fateStatusCache = 
Caffeine.newBuilder()
+        .maximumSize(100_000).expireAfterWrite(10, 
TimeUnit.SECONDS).build(fateId -> {
+          if (fateId.getType() == FateInstanceType.META) {
+            return mfs.read(fateId).getStatus();
+          } else {
+            return ufs.read(fateId).getStatus();
+          }
+        });
+
+    Predicate<FateId> activePredicate = fateId -> {
+      var status = fateStatusCache.get(fateId); // cached for up to 10s after loading
+      switch (status) {
+        case NEW:
+        case IN_PROGRESS:
+        case SUBMITTED:
+        case FAILED_IN_PROGRESS:
+          return true;
+        case FAILED:
+        case SUCCESSFUL:
+        case UNKNOWN:
+          return false;
+        default:
+          throw new IllegalStateException("Unexpected status: " + status);
+      }
+    };
+
+    AtomicLong danglingSeen = new AtomicLong(); // counter mutated from the lambda below
+    BiConsumer<KeyExtent,Set<FateId>> danglingConsumer = (extent, fateIds) -> {
+      danglingSeen.addAndGet(fateIds.size());
+      fateIds.forEach(fateId -> System.out.println(fateId + " " + extent));
+    };
+
+    findDanglingFateOperations(tablets, tabletLookup, activePredicate, 
danglingConsumer, 10_000);
+    return danglingSeen.get();
+  }
+
+  /**
+   * Finds tablets that point to fate operations that do not exist or are 
complete.
+   *
+   * @param tablets the tablets to inspect
+   * @param tabletLookup a function that looks up the latest metadata for a set of tablets
+   * @param activePredicate a predicate that can determine if a fate id is 
currently active
+   * @param danglingConsumer a consumer that receives each tablet with its inactive fate ids 
that are still present on a second read
+   */
+  static void findDanglingFateOperations(Iterable<TabletMetadata> tablets,
+      Function<Collection<KeyExtent>,Map<KeyExtent,TabletMetadata>> 
tabletLookup,
+      Predicate<FateId> activePredicate, BiConsumer<KeyExtent,Set<FateId>> 
danglingConsumer,
+      int bufferSize) { // candidates are flushed once more than bufferSize accumulate
+
+    ArrayList<FateId> fateIds = new ArrayList<>();
+    Map<KeyExtent,Set<FateId>> candidates = new HashMap<>();
+    for (TabletMetadata tablet : tablets) {
+      fateIds.clear();
+      getAllFateIds(tablet, fateIds::add);
+      fateIds.removeIf(activePredicate); // keep only the inactive fate ids
+      if (!fateIds.isEmpty()) {
+        candidates.put(tablet.getExtent(), new HashSet<>(fateIds));
+        if (candidates.size() > bufferSize) {
+          processCandidates(candidates, tabletLookup, danglingConsumer);
+          candidates.clear();
+        }
+      }
+    }
+
+    processCandidates(candidates, tabletLookup, danglingConsumer); // remainder; may be empty
+  }
+
+  private static void processCandidates(Map<KeyExtent,Set<FateId>> candidates,
+      Function<Collection<KeyExtent>,Map<KeyExtent,TabletMetadata>> 
tabletLookup,
+      BiConsumer<KeyExtent,Set<FateId>> danglingConsumer) {
+    // Perform a 2nd check of the tablet to avoid race conditions like the 
following.
+    // 1. THREAD 1 : TabletMetadata is read and points to an active fate operation
+    // 2. THREAD 2 : The fate operation is deleted from the tablet
+    // 3. THREAD 2 : The fate operation completes
+    // 4. THREAD 1 : Checks if the fate operation read in step 1 is active and 
finds it is not, so without a 2nd read it would be wrongly reported as dangling
+
+    Map<KeyExtent,TabletMetadata> currentTablets = 
tabletLookup.apply(candidates.keySet());
+    HashSet<FateId> currentFateIds = new HashSet<>();
+    candidates.forEach((extent, fateIds) -> {
+      var currentTablet = currentTablets.get(extent);
+      if (currentTablet != null) {
+        currentFateIds.clear();
+        getAllFateIds(currentTablet, currentFateIds::add);
+        // Only keep fate ids that are still present in the tablet. Any new 
fate ids in
+        // currentFateIds that were not seen on the first pass are not 
considered here. To check
+        // those new ones, the entire two-step process would need to be rerun.
+        fateIds.retainAll(currentFateIds);
+
+        if (!fateIds.isEmpty()) {
+          // these fate ids were inactive on the first read and are still present 
in the tablet
+          // metadata on this second read, so they are dangling
+          danglingConsumer.accept(extent, fateIds);
+        }
+      } // else the tablet no longer exists, so there is nothing to report
+    });
+  }
+
+  /**
+   * Sends the fate ids of a tablet's loaded files, selected files and opid to the consumer (no de-duplication).
+   */
+  private static void getAllFateIds(TabletMetadata tabletMetadata,
+      Consumer<FateId> fateIdConsumer) {
+    tabletMetadata.getLoaded().values().forEach(fateIdConsumer);
+    if (tabletMetadata.getSelectedFiles() != null) {
+      fateIdConsumer.accept(tabletMetadata.getSelectedFiles().getFateId());
+    }
+    if (tabletMetadata.getOperationId() != null) {
+      fateIdConsumer.accept(tabletMetadata.getOperationId().getFateId());
+    }
+  }
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
 
b/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
index cc69b67ffa..d09436e5bd 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
@@ -23,6 +23,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TabletAvailability;
 import org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.metadata.TServerInstance;
@@ -117,8 +118,9 @@ public class FindOfflineTablets {
       Set<TServerInstance> liveTServers = tservers.getCurrentServers();
       TabletState state = TabletState.compute(tabletMetadata, liveTServers);
       if (state != null && state != TabletState.HOSTED
+          && tabletMetadata.getTabletAvailability() == 
TabletAvailability.HOSTED
           && 
context.getTableManager().getTableState(tabletMetadata.getTableId())
-              != TableState.OFFLINE) {
+              == TableState.ONLINE) {
         System.out.println(tabletMetadata.getExtent() + " is " + state + "  
#walogs:"
             + tabletMetadata.getLogs().size());
         offline++;
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java 
b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
index 2e6754176e..714d3fafe3 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
@@ -18,16 +18,40 @@
  */
 package org.apache.accumulo.server.util;
 
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.zookeeper.ZooCache;
 import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
+import org.apache.accumulo.core.metadata.ReferencedTabletFile;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.SelectedFiles;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.metadata.schema.TabletOperationType;
+import org.apache.accumulo.core.util.time.SteadyTime;
+import org.apache.hadoop.fs.Path;
 import org.easymock.EasyMock;
 import org.junit.jupiter.api.Test;
 
@@ -93,4 +117,105 @@ public class AdminTest {
     EasyMock.verify(zc);
   }
 
+  @Test
+  public void testDanglingFate() {
+    KeyExtent[] extents = new KeyExtent[10];
+    for (int i = 0; i < extents.length; i++) {
+      extents[i] = new KeyExtent(TableId.of("" + i), null, null);
+    }
+
+    FateId[] fateIds = new FateId[10];
+    for (int i = 0; i < fateIds.length; i++) {
+      fateIds[i] = FateId.from(FateInstanceType.USER, UUID.randomUUID());
+    }
+
+    var opid1 = TabletOperationId.from(TabletOperationType.SPLITTING, 
fateIds[0]);
+    var opid2 = TabletOperationId.from(TabletOperationType.MERGING, 
fateIds[1]);
+    var opid3 = TabletOperationId.from(TabletOperationType.MERGING, 
fateIds[5]);
+
+    var files = Set.of(StoredTabletFile.of(new 
Path("file:///accumulo/tables/4/t-1/f4.rf")));
+    var sf1 =
+        new SelectedFiles(files, true, fateIds[6], SteadyTime.from(100_100, 
TimeUnit.NANOSECONDS));
+    var sf2 =
+        new SelectedFiles(files, true, fateIds[7], SteadyTime.from(100_100, 
TimeUnit.NANOSECONDS));
+
+    var tm1 = 
TabletMetadata.builder(extents[0]).putOperation(opid1).build(LOADED, SELECTED);
+    var tm2 = 
TabletMetadata.builder(extents[1]).putOperation(opid2).build(LOADED, SELECTED);
+    var tm3 = TabletMetadata.builder(extents[2])
+        .putBulkFile(ReferencedTabletFile.of(new 
Path("file:///accumulo/tables/1/t-1/f1.rf")),
+            fateIds[2])
+        .build(OPID, SELECTED);
+    var tm4 = TabletMetadata.builder(extents[3])
+        .putBulkFile(ReferencedTabletFile.of(new 
Path("file:///accumulo/tables/2/t-1/f2.rf")),
+            fateIds[3])
+        .build(OPID, SELECTED);
+    var tm5 = TabletMetadata.builder(extents[4])
+        .putBulkFile(ReferencedTabletFile.of(new 
Path("file:///accumulo/tables/3/t-1/f3.rf")),
+            fateIds[4])
+        .putOperation(opid3).build(SELECTED);
+    var tm6 = 
TabletMetadata.builder(extents[5]).putSelectedFiles(sf1).build(OPID, LOADED);
+    var tm7 = 
TabletMetadata.builder(extents[6]).putSelectedFiles(sf2).build(OPID, LOADED);
+
+    var tablets1 = Map.of(tm1.getExtent(), tm1, tm2.getExtent(), tm2, 
tm3.getExtent(), tm3,
+        tm4.getExtent(), tm4, tm5.getExtent(), tm5, tm6.getExtent(), tm6, 
tm7.getExtent(), tm7);
+    var tablets2 = new HashMap<>(tablets1);
+    var found = new HashMap<KeyExtent,Set<FateId>>();
+    Function<Collection<KeyExtent>,Map<KeyExtent,TabletMetadata>> tabletLookup 
= lookups -> {
+      var results = new HashMap<KeyExtent,TabletMetadata>();
+      lookups.forEach(extent -> {
+        assertTrue(tablets1.containsKey(extent));
+        if (tablets2.containsKey(extent)) {
+          results.put(extent, tablets2.get(extent));
+        }
+      });
+      return results;
+    };
+
+    // run test where every fate id is considered inactive
+    Admin.findDanglingFateOperations(tablets1.values(), tabletLookup, fateId 
-> false, found::put,
+        3);
+    assertEquals(Map.of(tm1.getExtent(), Set.of(fateIds[0]), tm2.getExtent(), 
Set.of(fateIds[1]),
+        tm3.getExtent(), Set.of(fateIds[2]), tm4.getExtent(), 
Set.of(fateIds[3]), tm5.getExtent(),
+        Set.of(fateIds[4], fateIds[5]), tm6.getExtent(), Set.of(fateIds[6]), 
tm7.getExtent(),
+        Set.of(fateIds[7])), found);
+
+    // run test where some of the fate ids are active
+    Set<FateId> active = Set.of(fateIds[0], fateIds[2], fateIds[4], 
fateIds[6]);
+    found.clear();
+    Admin.findDanglingFateOperations(tablets1.values(), tabletLookup, 
active::contains, found::put,
+        3);
+    assertEquals(Map.of(tm2.getExtent(), Set.of(fateIds[1]), tm4.getExtent(), 
Set.of(fateIds[3]),
+        tm5.getExtent(), Set.of(fateIds[5]), tm7.getExtent(), 
Set.of(fateIds[7])), found);
+
+    // run test where tablets change on the 2nd read, simulating a race condition
+    var tm2_1 = TabletMetadata.builder(tm2.getExtent()).build(OPID, LOADED, 
SELECTED);
+    var tm4_1 = TabletMetadata.builder(tm4.getExtent())
+        .putBulkFile(ReferencedTabletFile.of(new 
Path("file:///accumulo/tables/2/t-1/f2.rf")),
+            fateIds[8])
+        .build(OPID, SELECTED);
+    tablets2.put(tm2_1.getExtent(), tm2_1);
+    tablets2.put(tm4_1.getExtent(), tm4_1);
+    tablets2.remove(tm5.getExtent());
+    found.clear();
+    Admin.findDanglingFateOperations(tablets1.values(), tabletLookup, 
active::contains, found::put,
+        3);
+    assertEquals(Map.of(tm7.getExtent(), Set.of(fateIds[7])), found);
+    found.clear();
+
+    // run a test where all remaining fate ids are active on the second look
+    var tm7_1 = 
TabletMetadata.builder(tm7.getExtent()).putSelectedFiles(sf1).build(OPID, 
LOADED);
+    tablets2.put(tm7_1.getExtent(), tm7_1);
+    Admin.findDanglingFateOperations(tablets1.values(), tabletLookup, 
active::contains, found::put,
+        3);
+    assertEquals(Map.of(), found);
+
+    // run a test where all fate ids are active on the first look, so any lookup must be empty
+    active = Arrays.stream(fateIds).collect(Collectors.toSet());
+    found.clear();
+    Admin.findDanglingFateOperations(tablets1.values(), le -> {
+      assertTrue(le.isEmpty());
+      return Map.of();
+    }, active::contains, found::put, 3);
+    assertEquals(Map.of(), found);
+  }
 }

Reply via email to