This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new f4776170d6 adds admin check for dangling fate references (#4686) f4776170d6 is described below commit f4776170d64b59dc20fc9cc97dda65d8981046eb Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Jun 25 13:56:38 2024 -0400 adds admin check for dangling fate references (#4686) Added the ability to check for tablets that reference fate operations that are no longer active. This was added to the `accumulo admin checkTablets` command. A unit test was added that validates the algorithm and the extraction of fate ids from tablet metadata. Manual testing was done to validate end-to-end functionality. For the manual test, the following commands were run in the shell ``` grant Table.WRITE -t accumulo.metadata -u root insert 1< srv opid SPLITTING:FATE:USER:dfdb85a6-65a0-47d2-a9e2-4c671b499829 ``` and then the following was run ``` $ accumulo admin checkTablets *** Looking for offline tablets *** Scanning zookeeper Scanning accumulo.root Scanning accumulo.metadata 1<< is UNASSIGNED #walogs:0 *** Looking for missing files *** Scanning : accumulo.root (-inf,~ : [] 9223372036854775807 false) Scan finished, 0 files of 2 missing Scanning : accumulo.metadata (-inf,~ : [] 9223372036854775807 false) Scan finished, 0 files of 0 missing *** Looking for dangling fate operations *** FATE:USER:dfdb85a6-65a0-47d2-a9e2-4c671b499829 1<< Found 1 dangling references to fate operations ``` --- .../core/metadata/schema/TabletOperationId.java | 7 + .../org/apache/accumulo/server/util/Admin.java | 169 ++++++++++++++++++++- .../accumulo/server/util/FindOfflineTablets.java | 4 +- .../org/apache/accumulo/server/util/AdminTest.java | 125 +++++++++++++++ 4 files changed, 302 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java index 8da831eacd..089b40802c 100644 --- 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java @@ -57,6 +57,12 @@ public class TabletOperationId extends AbstractId<TabletOperationId> { return TabletOperationType.valueOf(fields[0]); } + public FateId getFateId() { + var fields = canonical().split(":", 2); + Preconditions.checkState(fields.length == 2); + return FateId.from(fields[1]); + } + public static TabletOperationId from(String opid) { return new TabletOperationId(validate(opid)); } @@ -64,4 +70,5 @@ public class TabletOperationId extends AbstractId<TabletOperationId> { public static TabletOperationId from(TabletOperationType type, FateId fateId) { return new TabletOperationId(type + ":" + fateId); } + } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index e40fc316df..b1be48007f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -27,18 +27,27 @@ import java.io.FileWriter; import java.io.IOException; import java.text.MessageFormat; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.Formatter; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import 
org.apache.accumulo.core.Constants; @@ -53,6 +62,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.AdminUtil; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; @@ -66,6 +76,8 @@ import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.manager.thrift.FateService; import org.apache.accumulo.core.manager.thrift.TFateId; import org.apache.accumulo.core.metadata.AccumuloTable; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.security.Authorizations; @@ -89,6 +101,8 @@ import org.slf4j.LoggerFactory; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Lists; @@ -118,7 +132,8 @@ public class Admin implements KeywordExecutable { List<String> args = new ArrayList<>(); } - @Parameters(commandDescription = "print tablets that are offline in online tables") + @Parameters(commandDescription = "Looks for tablets that are unexpectedly offline, tablets that " + + "reference missing files, or tablets that reference absent fate operations.") static class CheckTabletsCommand { @Parameter(names = "--fixFiles", description = "Remove dangling file pointers") boolean fixFiles = false; @@ -367,7 +382,10 @@ public class Admin implements 
KeywordExecutable { rc = 6; } } - + System.out.println("\n*** Looking for dangling fate operations ***\n"); + if (printDanglingFateOperations(context, checkTabletsCommand.tableName) > 0) { + rc = 7; + } } else if (cl.getParsedCommand().equals("stop")) { stopTabletServer(context, stopOpts.args, opts.force); } else if (cl.getParsedCommand().equals("dumpConfig")) { @@ -948,4 +966,151 @@ public class Admin implements KeywordExecutable { } return typesFilter; } + + private static long printDanglingFateOperations(ServerContext context, String tableName) + throws Exception { + long totalDanglingSeen = 0; + if (tableName == null) { + for (var dataLevel : Ample.DataLevel.values()) { + try (var tablets = context.getAmple().readTablets().forLevel(dataLevel).build()) { + totalDanglingSeen += printDanglingFateOperations(context, tablets); + } + } + } else { + var tableId = context.getTableId(tableName); + try (var tablets = context.getAmple().readTablets().forTable(tableId).build()) { + totalDanglingSeen += printDanglingFateOperations(context, tablets); + } + } + + System.out.printf("\nFound %,d dangling references to fate operations\n", totalDanglingSeen); + + return totalDanglingSeen; + } + + private static long printDanglingFateOperations(ServerContext context, + Iterable<TabletMetadata> tablets) throws Exception { + Function<Collection<KeyExtent>,Map<KeyExtent,TabletMetadata>> tabletLookup = extents -> { + try (var lookedupTablets = + context.getAmple().readTablets().forTablets(extents, Optional.empty()).build()) { + Map<KeyExtent,TabletMetadata> tabletMap = new HashMap<>(); + lookedupTablets + .forEach(tabletMetadata -> tabletMap.put(tabletMetadata.getExtent(), tabletMetadata)); + return tabletMap; + } + }; + + UserFateStore<?> ufs = new UserFateStore<>(context); + MetaFateStore<?> mfs = new MetaFateStore<>(context.getZooKeeperRoot() + Constants.ZFATE, + context.getZooReaderWriter()); + LoadingCache<FateId,ReadOnlyFateStore.TStatus> fateStatusCache = Caffeine.newBuilder() 
+ .maximumSize(100_000).expireAfterWrite(10, TimeUnit.SECONDS).build(fateId -> { + if (fateId.getType() == FateInstanceType.META) { + return mfs.read(fateId).getStatus(); + } else { + return ufs.read(fateId).getStatus(); + } + }); + + Predicate<FateId> activePredicate = fateId -> { + var status = fateStatusCache.get(fateId); + switch (status) { + case NEW: + case IN_PROGRESS: + case SUBMITTED: + case FAILED_IN_PROGRESS: + return true; + case FAILED: + case SUCCESSFUL: + case UNKNOWN: + return false; + default: + throw new IllegalStateException("Unexpected status: " + status); + } + }; + + AtomicLong danglingSeen = new AtomicLong(); + BiConsumer<KeyExtent,Set<FateId>> danglingConsumer = (extent, fateIds) -> { + danglingSeen.addAndGet(fateIds.size()); + fateIds.forEach(fateId -> System.out.println(fateId + " " + extent)); + }; + + findDanglingFateOperations(tablets, tabletLookup, activePredicate, danglingConsumer, 10_000); + return danglingSeen.get(); + } + + /** + * Finds tablets that point to fate operations that do not exists or are complete. 
+ * + * @param tablets the tablets to inspect + * @param tabletLookup a function that can lookup a tablets latest metadata + * @param activePredicate a predicate that can determine if a fate id is currently active + * @param danglingConsumer a consumer that tablets with inactive fate ids will be sent to + */ + static void findDanglingFateOperations(Iterable<TabletMetadata> tablets, + Function<Collection<KeyExtent>,Map<KeyExtent,TabletMetadata>> tabletLookup, + Predicate<FateId> activePredicate, BiConsumer<KeyExtent,Set<FateId>> danglingConsumer, + int bufferSize) { + + ArrayList<FateId> fateIds = new ArrayList<>(); + Map<KeyExtent,Set<FateId>> candidates = new HashMap<>(); + for (TabletMetadata tablet : tablets) { + fateIds.clear(); + getAllFateIds(tablet, fateIds::add); + fateIds.removeIf(activePredicate); + if (!fateIds.isEmpty()) { + candidates.put(tablet.getExtent(), new HashSet<>(fateIds)); + if (candidates.size() > bufferSize) { + processCandidates(candidates, tabletLookup, danglingConsumer); + candidates.clear(); + } + } + } + + processCandidates(candidates, tabletLookup, danglingConsumer); + } + + private static void processCandidates(Map<KeyExtent,Set<FateId>> candidates, + Function<Collection<KeyExtent>,Map<KeyExtent,TabletMetadata>> tabletLookup, + BiConsumer<KeyExtent,Set<FateId>> danglingConsumer) { + // Perform a 2nd check of the tablet to avoid race conditions like the following. + // 1. THREAD 1 : TabletMetadata is read and points to active fate operation + // 2. THREAD 2 : The fate operation is deleted from the tablet + // 3. THREAD 2 : The fate operation completes + // 4. 
THREAD 1 : Checks if the fate operation read in step 1 is active and finds it is not + + Map<KeyExtent,TabletMetadata> currentTablets = tabletLookup.apply(candidates.keySet()); + HashSet<FateId> currentFateIds = new HashSet<>(); + candidates.forEach((extent, fateIds) -> { + var currentTablet = currentTablets.get(extent); + if (currentTablet != null) { + currentFateIds.clear(); + getAllFateIds(currentTablet, currentFateIds::add); + // Only keep fate ids that are still present in the tablet. Any new fate ids in + // currentFateIds that were not seen on the first pass are not considered here. To check + // those new ones, the entire two-step process would need to be rerun. + fateIds.retainAll(currentFateIds); + + if (!fateIds.isEmpty()) { + // the fateIds in this set were found to be inactive and still exist in the tablet + // metadata after being found inactive + danglingConsumer.accept(extent, fateIds); + } + } // else the tablet no longer exist so nothing to report + }); + } + + /** + * Extracts all fate ids that a tablet points to from any field. 
+ */ + private static void getAllFateIds(TabletMetadata tabletMetadata, + Consumer<FateId> fateIdConsumer) { + tabletMetadata.getLoaded().values().forEach(fateIdConsumer); + if (tabletMetadata.getSelectedFiles() != null) { + fateIdConsumer.accept(tabletMetadata.getSelectedFiles().getFateId()); + } + if (tabletMetadata.getOperationId() != null) { + fateIdConsumer.accept(tabletMetadata.getOperationId().getFateId()); + } + } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java b/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java index cc69b67ffa..d09436e5bd 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.TServerInstance; @@ -117,8 +118,9 @@ public class FindOfflineTablets { Set<TServerInstance> liveTServers = tservers.getCurrentServers(); TabletState state = TabletState.compute(tabletMetadata, liveTServers); if (state != null && state != TabletState.HOSTED + && tabletMetadata.getTabletAvailability() == TabletAvailability.HOSTED && context.getTableManager().getTableState(tabletMetadata.getTableId()) - != TableState.OFFLINE) { + == TableState.ONLINE) { System.out.println(tabletMetadata.getExtent() + " is " + state + " #walogs:" + tabletMetadata.getLogs().size()); offline++; diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java index 2e6754176e..714d3fafe3 100644 --- 
a/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java @@ -18,16 +18,40 @@ */ package org.apache.accumulo.server.util; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.SelectedFiles; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.core.util.time.SteadyTime; +import org.apache.hadoop.fs.Path; import org.easymock.EasyMock; import org.junit.jupiter.api.Test; @@ -93,4 +117,105 @@ public class AdminTest { EasyMock.verify(zc); } + @Test 
+ public void testDanglingFate() { + KeyExtent[] extents = new KeyExtent[10]; + for (int i = 0; i < extents.length; i++) { + extents[i] = new KeyExtent(TableId.of("" + i), null, null); + } + + FateId[] fateIds = new FateId[10]; + for (int i = 0; i < fateIds.length; i++) { + fateIds[i] = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + } + + var opid1 = TabletOperationId.from(TabletOperationType.SPLITTING, fateIds[0]); + var opid2 = TabletOperationId.from(TabletOperationType.MERGING, fateIds[1]); + var opid3 = TabletOperationId.from(TabletOperationType.MERGING, fateIds[5]); + + var files = Set.of(StoredTabletFile.of(new Path("file:///accumulo/tables/4/t-1/f4.rf"))); + var sf1 = + new SelectedFiles(files, true, fateIds[6], SteadyTime.from(100_100, TimeUnit.NANOSECONDS)); + var sf2 = + new SelectedFiles(files, true, fateIds[7], SteadyTime.from(100_100, TimeUnit.NANOSECONDS)); + + var tm1 = TabletMetadata.builder(extents[0]).putOperation(opid1).build(LOADED, SELECTED); + var tm2 = TabletMetadata.builder(extents[1]).putOperation(opid2).build(LOADED, SELECTED); + var tm3 = TabletMetadata.builder(extents[2]) + .putBulkFile(ReferencedTabletFile.of(new Path("file:///accumulo/tables/1/t-1/f1.rf")), + fateIds[2]) + .build(OPID, SELECTED); + var tm4 = TabletMetadata.builder(extents[3]) + .putBulkFile(ReferencedTabletFile.of(new Path("file:///accumulo/tables/2/t-1/f2.rf")), + fateIds[3]) + .build(OPID, SELECTED); + var tm5 = TabletMetadata.builder(extents[4]) + .putBulkFile(ReferencedTabletFile.of(new Path("file:///accumulo/tables/3/t-1/f3.rf")), + fateIds[4]) + .putOperation(opid3).build(SELECTED); + var tm6 = TabletMetadata.builder(extents[5]).putSelectedFiles(sf1).build(OPID, LOADED); + var tm7 = TabletMetadata.builder(extents[6]).putSelectedFiles(sf2).build(OPID, LOADED); + + var tablets1 = Map.of(tm1.getExtent(), tm1, tm2.getExtent(), tm2, tm3.getExtent(), tm3, + tm4.getExtent(), tm4, tm5.getExtent(), tm5, tm6.getExtent(), tm6, tm7.getExtent(), tm7); + var tablets2 
= new HashMap<>(tablets1); + var found = new HashMap<KeyExtent,Set<FateId>>(); + Function<Collection<KeyExtent>,Map<KeyExtent,TabletMetadata>> tabletLookup = lookups -> { + var results = new HashMap<KeyExtent,TabletMetadata>(); + lookups.forEach(extent -> { + assertTrue(tablets1.containsKey(extent)); + if (tablets2.containsKey(extent)) { + results.put(extent, tablets2.get(extent)); + } + }); + return results; + }; + + // run test where every fate id is considered inactive + Admin.findDanglingFateOperations(tablets1.values(), tabletLookup, fateId -> false, found::put, + 3); + assertEquals(Map.of(tm1.getExtent(), Set.of(fateIds[0]), tm2.getExtent(), Set.of(fateIds[1]), + tm3.getExtent(), Set.of(fateIds[2]), tm4.getExtent(), Set.of(fateIds[3]), tm5.getExtent(), + Set.of(fateIds[4], fateIds[5]), tm6.getExtent(), Set.of(fateIds[6]), tm7.getExtent(), + Set.of(fateIds[7])), found); + + // run test where some of the fate ids are active + Set<FateId> active = Set.of(fateIds[0], fateIds[2], fateIds[4], fateIds[6]); + found.clear(); + Admin.findDanglingFateOperations(tablets1.values(), tabletLookup, active::contains, found::put, + 3); + assertEquals(Map.of(tm2.getExtent(), Set.of(fateIds[1]), tm4.getExtent(), Set.of(fateIds[3]), + tm5.getExtent(), Set.of(fateIds[5]), tm7.getExtent(), Set.of(fateIds[7])), found); + + // run test where tablets change on 2nd read simulating race condition + var tm2_1 = TabletMetadata.builder(tm2.getExtent()).build(OPID, LOADED, SELECTED); + var tm4_1 = TabletMetadata.builder(tm4.getExtent()) + .putBulkFile(ReferencedTabletFile.of(new Path("file:///accumulo/tables/2/t-1/f2.rf")), + fateIds[8]) + .build(OPID, SELECTED); + tablets2.put(tm2_1.getExtent(), tm2_1); + tablets2.put(tm4_1.getExtent(), tm4_1); + tablets2.remove(tm5.getExtent()); + found.clear(); + Admin.findDanglingFateOperations(tablets1.values(), tabletLookup, active::contains, found::put, + 3); + assertEquals(Map.of(tm7.getExtent(), Set.of(fateIds[7])), found); + found.clear(); + + // 
run a test where all are active on second look + var tm7_1 = TabletMetadata.builder(tm7.getExtent()).putSelectedFiles(sf1).build(OPID, LOADED); + tablets2.put(tm7_1.getExtent(), tm7_1); + Admin.findDanglingFateOperations(tablets1.values(), tabletLookup, active::contains, found::put, + 3); + assertEquals(Map.of(), found); + + // run a test where all active on the first look + active = Arrays.stream(fateIds).collect(Collectors.toSet()); + found.clear(); + Admin.findDanglingFateOperations(tablets1.values(), le -> { + assertTrue(le.isEmpty()); + return Map.of(); + }, active::contains, found::put, 3); + assertEquals(Map.of(), found); + } }