This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 5c5150242d Stop waiting on fate when reporting compaction complete (#4339) 5c5150242d is described below commit 5c5150242d160160bfcf8aed3cfe6fdcefd81da5 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Mar 8 19:58:45 2024 -0500 Stop waiting on fate when reporting compaction complete (#4339) Compactors were waiting on the Fate transaction that does compaction commit. They were doing this so that the dead compaction detector would not kill the compaction during commit. This change removes the need for compactors to wait on compaction commit, freeing them to run another compaction. To accomplish this, the dead compaction detector was modified to look for fate operations committing compactions. This work leveraged the new FateKey. A bonus of this change is that duplicate RPC messages reporting a compaction as completed will no longer spin up duplicate Fate operations to commit it. The Fate operations would likely have been harmless, but would have consumed resources. A new method was added to Fate to list transactions with a certain FateKeyType; usage sketches appear after the diff below. Tests were added for this new Fate method. An integration test was added to ensure the dead compaction detector does not delete compactions that no compactor knows of but that do have a Fate transaction committing them. --- .../java/org/apache/accumulo/core/fate/Fate.java | 8 ++ .../accumulo/core/fate/ReadOnlyFateStore.java | 5 ++ .../org/apache/accumulo/core/fate/ZooStore.java | 6 ++ .../accumulo/core/fate/accumulo/AccumuloStore.java | 14 ++++ .../apache/accumulo/core/logging/FateLogger.java | 5 ++ .../org/apache/accumulo/core/fate/TestStore.java | 5 ++ .../coordinator/CompactionCoordinator.java | 33 ++------ .../coordinator/DeadCompactionDetector.java | 97 ++++++++++++++++++++-- .../coordinator/commit/PutGcCandidates.java | 11 +-- .../coordinator/commit/RefreshTablet.java | 7 +- .../test/compaction/ExternalCompaction_1_IT.java | 97 ++++++++++++++++++++++ .../accumulo/test/fate/accumulo/FateStoreIT.java | 65 +++++++++++++++ 12 files changed, 318 insertions(+), 35 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 0b82f73f11..e5be68dbb2 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TransferQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.stream.Stream; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -482,6 +483,13 @@ public class Fate<T> { } } + /** + * Lists transactions for a given fate key type. + */ + public Stream<FateKey> list(FateKey.FateKeyType type) { + return store.list(type); + } + /** * Initiates shutdown of background threads and optionally waits on them.
*/ diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java index bdbb7739f9..b2aa4999b2 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java @@ -138,6 +138,11 @@ public interface ReadOnlyFateStore<T> { */ Stream<FateIdStatus> list(); + /** + * Lists transactions in the store that have a given fate key type. + */ + Stream<FateKey> list(FateKey.FateKeyType type); + /** * Finds all fate ops that are (IN_PROGRESS, SUBMITTED, or FAILED_IN_PROGRESS) and unreserved. Ids * that are found are passed to the consumer. This method will block until at least one runnable diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index 6813e727c5..af6fd233de 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -377,6 +377,12 @@ public class ZooStore<T> extends AbstractFateStore<T> { } } + @Override + public Stream<FateKey> list(FateKey.FateKeyType type) { + return getTransactions().flatMap(fis -> getKey(fis.getFateId()).stream()) + .filter(fateKey -> fateKey.getType() == type); + } + protected static class NodeValue { final TStatus status; final Optional<FateKey> fateKey; diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java index 0d65f3e5d1..24ccb43baf 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java @@ -173,6 +173,20 @@ public class AccumuloStore<T> extends AbstractFateStore<T> { } } + @Override + public Stream<FateKey> list(FateKey.FateKeyType type) { + try { + Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY); + scanner.setRange(new Range()); + TxColumnFamily.TX_KEY_COLUMN.fetch(scanner); + return scanner.stream().onClose(scanner::close) + .map(e -> FateKey.deserialize(e.getValue().get())) + .filter(fateKey -> fateKey.getType() == type); + } catch (TableNotFoundException e) { + throw new IllegalStateException(tableName + " not found!", e); + } + } + @Override protected TStatus _getStatus(FateId fateId) { return scanTx(scanner -> { diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index d646389f92..a3c1b9cfd9 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -120,6 +120,11 @@ public class FateLogger { return store.list(); } + @Override + public Stream<FateKey> list(FateKey.FateKeyType type) { + return store.list(type); + } + @Override public void runnable(AtomicBoolean keepWaiting, Consumer<FateId> idConsumer) { store.runnable(keepWaiting, idConsumer); diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index 6c69de60ef..50046a4b9b 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -216,6 +216,11 @@ public class TestStore implements FateStore<String> { }); } + @Override +
public Stream<FateKey> list(FateKey.FateKeyType type) { + throw new UnsupportedOperationException(); + } + @Override public void runnable(AtomicBoolean keepWaiting, Consumer<FateId> idConsumer) { throw new UnsupportedOperationException(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index d36ec662d5..2a12f08708 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -71,6 +71,7 @@ import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; @@ -202,7 +203,8 @@ public class CompactionCoordinator tabletDirCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_DIR_CACHE, true) .maximumWeight(10485760L).weigher(weigher).build(); - deadCompactionDetector = new DeadCompactionDetector(this.ctx, this, schedExecutor); + deadCompactionDetector = + new DeadCompactionDetector(this.ctx, this, schedExecutor, fateInstances); // At this point the manager does not have its lock so no actions should be taken yet } @@ -684,31 +686,14 @@ public class CompactionCoordinator return; } + // Start a fate transaction to commit the compaction. CompactionMetadata ecm = tabletMeta.getExternalCompactions().get(ecid); var renameOp = new RenameCompactionFile(new CompactionCommitData(ecid, extent, ecm, stats)); + var txid = localFate.seedTransaction("COMMIT_COMPACTION", FateKey.forCompactionCommit(ecid), + renameOp, true, "Commit compaction " + ecid); - // ELASTICITY_TODO add tag to fate that ECID can be added to. This solves two problem. First it - // defends against starting multiple fate txs for the same thing. This will help the split code - // also. Second the tag can be used by the dead compaction detector to ignore committing - // compactions. The imple coould hash the key to produce the fate tx id. - var txid = localFate.startTransaction(); - localFate.seedTransaction("COMMIT_COMPACTION", txid, renameOp, true, - "Commit compaction " + ecid); - - // ELASTICITY_TODO need to remove this wait. It is here because when the dead compaction - // detector ask a compactor what its currently running it expects that cover commit. To remove - // this wait would need another way for the dead compaction detector to know about committing - // compactions. Could add a tag to the fate tx with the ecid and have dead compaction detector - // scan these tags. This wait makes the code running in fate not be fault tolerant because in - // the - // case of faults the dead compaction detector may remove the compaction entry. - localFate.waitForCompletion(txid); - - // It's possible that RUNNING might not have an entry for this ecid in the case - // of a coordinator restart when the Coordinator can't find the TServer for the - // corresponding external compaction. - recordCompletion(ecid); - // ELASTICITY_TODO should above call move into fate code? 
+ txid.ifPresentOrElse(fateId -> LOG.debug("initiated compaction commit {} {}", ecid, fateId), + () -> LOG.debug("compaction commit already initiated for {}", ecid)); } @Override @@ -844,7 +829,7 @@ public class CompactionCoordinator } } - private void recordCompletion(ExternalCompactionId ecid) { + public void recordCompletion(ExternalCompactionId ecid) { var rc = RUNNING_CACHE.remove(ecid); if (rc != null) { completed.put(ecid, rc); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java index 1de507f2a3..0857385239 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java @@ -27,11 +27,16 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; @@ -39,6 +44,7 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.manager.Manager; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.util.FindCompactionTmpFiles; import org.apache.accumulo.server.util.FindCompactionTmpFiles.DeleteStats; @@ -55,13 +61,16 @@ public class DeadCompactionDetector { private final ScheduledThreadPoolExecutor schedExecutor; private final ConcurrentHashMap<ExternalCompactionId,Long> deadCompactions; private final Set<TableId> tablesWithUnreferencedTmpFiles = new HashSet<>(); + private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances; public DeadCompactionDetector(ServerContext context, CompactionCoordinator coordinator, - ScheduledThreadPoolExecutor stpe) { + ScheduledThreadPoolExecutor stpe, + AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances) { this.context = context; this.coordinator = coordinator; this.schedExecutor = stpe; this.deadCompactions = new ConcurrentHashMap<>(); + this.fateInstances = fateInstances; } public void addTableId(TableId tableWithUnreferencedTmpFiles) { @@ -72,12 +81,69 @@ public class DeadCompactionDetector { private void detectDeadCompactions() { - // The order of obtaining information is very important to avoid race conditions. - + /* + * The order of obtaining information is very important to avoid race conditions. This algorithm + * asks compactors for information about what they are running. Compactors do the following. + * + * 1. Generate a compaction UUID. + * + * 2.
Set the UUID as what they are currently working on. This is reported to any other process + * that asks, like this dead compaction detection code. + * + * 3. Request work from the coordinator under the UUID. The coordinator will use this UUID to + * create a compaction entry in the metadata table. + * + * 4. Run the compaction. + * + * 5. Ask the coordinator to commit the compaction. The coordinator will seed the fate operation + * that commits the compaction. + * + * 6. Clear the UUID they are currently working on. + * + * Given the fact that compactors report they are running a UUID until after it has been seeded in + * fate, we can deduce the following for compactions that succeed. + * + * - There is a time range from T1 to T2 where only the compactor will report a UUID. + * + * - There is a time range T2 to T3 where the compactor and fate will report a UUID. + * + * - There is a time range T3 to T4 where only fate will report a UUID. + * + * - After time T4 the compaction is complete and nothing will report the UUID. + * + * This algorithm does the following. + * + * 1. Scan the metadata table looking for compaction UUIDs. + * + * 2. Ask compactors what they are running. + * + * 3. Ask Fate what compactions it is committing. + * + * 4. Consider anything it saw in the metadata table that compactors or fate did not report as a + * possible dead compaction. + * + * When we see a compaction id in the metadata table, we know we are already at a time + * greater than T1 because the compactor generates and advertises ids prior to placing them in + * the metadata table. + * + * If this process asks a compactor if it's running a compaction uuid and it says yes, then that + * implies we are in the time range T1 to T3. + * + * If this process asks a compactor if it's running a compaction uuid and it says no, then that + * implies we are in the time range >T3 defined above. So if the compaction is still active then + * it will be reported by fate. If the time is >T4, then the compaction is finished and not + * dead. + * + * If a time gap existed between when a compactor reported and when fate reported, then it could + * result in false positives for dead compaction detection. If fate was queried before + * compactors, then it could result in false positives. If compactors were queried before the + * metadata table, then it could cause false positives.
+ */ log.debug("Starting to look for dead compactions"); + // ELASTICITY_TODO not looking for dead compactions in the metadata table Map<ExternalCompactionId,KeyExtent> tabletCompactions = new HashMap<>(); - // + // find what external compactions tablets think are running try (TabletsMetadata tabletsMetadata = context.getAmple().readTablets().forLevel(DataLevel.USER) .filter(new HasExternalCompactionsFilter()).fetch(ColumnType.ECOMP, ColumnType.PREV_ROW) @@ -111,7 +177,7 @@ public class DeadCompactionDetector { Collection<ExternalCompactionId> running = ExternalCompactionUtil.getCompactionIdsRunningOnCompactors(context); - running.forEach((ecid) -> { + running.forEach(ecid -> { if (tabletCompactions.remove(ecid) != null) { log.debug("Ignoring compaction {} that is running on a compactor", ecid); } @@ -120,6 +186,27 @@ public class DeadCompactionDetector { } }); + if (!tabletCompactions.isEmpty()) { + // look for any compactions committing in fate and remove those + var fateMap = fateInstances.get(); + if (fateMap == null) { + log.warn("Fate is not present, can not look for dead compactions"); + return; + } + // ELASTICITY_TODO need to handle metadata + var fate = fateMap.get(FateInstanceType.USER); + try (Stream<FateKey> keyStream = fate.list(FateKey.FateKeyType.COMPACTION_COMMIT)) { + keyStream.map(fateKey -> fateKey.getCompactionId().orElseThrow()).forEach(ecid -> { + if (tabletCompactions.remove(ecid) != null) { + log.debug("Ignoring compaction {} that is committing in a fate", ecid); + } + if (this.deadCompactions.remove(ecid) != null) { + log.debug("Removed {} from the dead compaction map, it's committing in fate", ecid); + } + }); + } + } + tabletCompactions.forEach((ecid, extent) -> { log.info("Possible dead compaction detected {} {}", ecid, extent); this.deadCompactions.merge(ecid, 1L, Long::sum); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java index b0cefe3dfe..3e5b62caa9 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java @@ -20,7 +20,7 @@ package org.apache.accumulo.manager.compaction.coordinator.commit; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; -import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; @@ -41,12 +41,13 @@ public class PutGcCandidates extends ManagerRepo { manager.getContext().getAmple().putGcCandidates(commitData.getTableId(), commitData.getJobFiles()); - if (commitData.kind == CompactionKind.USER || refreshLocation == null) { - // user compactions will refresh tablets as part of the FATE operation driving the user - // compaction, so no need to do it here + if (refreshLocation == null) { + manager.getCompactionCoordinator().recordCompletion(ExternalCompactionId.of(commitData.ecid)); return null; } - return new RefreshTablet(commitData.textent, refreshLocation); + // For user initiated table compactions, the fate operation will refresh tablets. Can also + // refresh as part of this compaction commit as it may run sooner. 
+ return new RefreshTablet(commitData.ecid, commitData.textent, refreshLocation); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RefreshTablet.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RefreshTablet.java index 9cc486411b..bf5900ad4a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RefreshTablet.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RefreshTablet.java @@ -27,6 +27,7 @@ import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; @@ -38,8 +39,10 @@ public class RefreshTablet extends ManagerRepo { private static final long serialVersionUID = 1L; private final TKeyExtent extent; private final String tserverInstance; + private final String compactionId; - public RefreshTablet(TKeyExtent extent, String tserverInstance) { + public RefreshTablet(String ecid, TKeyExtent extent, String tserverInstance) { + this.compactionId = ecid; this.extent = extent; this.tserverInstance = tserverInstance; } @@ -60,6 +63,8 @@ public class RefreshTablet extends ManagerRepo { executorService.shutdownNow(); } + manager.getCompactionCoordinator().recordCompletion(ExternalCompactionId.of(compactionId)); + return null; } } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index 5d3d7b18fe..6a8dbd5040 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -48,6 +48,8 @@ import java.util.Map.Entry; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv; @@ -69,20 +71,31 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.accumulo.AccumuloStore; import org.apache.accumulo.core.iterators.DevNull; import org.apache.accumulo.core.iterators.Filter; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.schema.CompactionMetadata; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.spi.compaction.CompactorGroupId; import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher; import 
org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.manager.Manager; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.server.util.FindCompactionTmpFiles; import org.apache.accumulo.test.functional.CompactionIT.ErrorThrowingSelector; import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -206,6 +219,90 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { } } + /** + * This test verifies the dead compaction detector does not remove compactions that are committing + * in fate. + */ + @Test + public void testCompactionCommitAndDeadDetection() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + final String tableName = getUniqueNames(1)[0]; + + SortedSet<Text> splits = new TreeSet<>(); + splits.add(new Text(row(MAX_DATA / 2))); + + c.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits)); + writeData(c, tableName); + c.tableOperations().flush(tableName, null, null, true); + + var ctx = getCluster().getServerContext(); + var tableId = ctx.getTableId(tableName); + + // Create two random compaction ids + var cids = List.of(ExternalCompactionId.generate(UUID.randomUUID()), + ExternalCompactionId.generate(UUID.randomUUID())); + AccumuloStore<Manager> accumuloStore = new AccumuloStore<>(ctx); + + // Create a fate transaction for one of the compaction ids that is in the new state, it should + // never run. Its purpose is to prevent the dead compaction detector from deleting the id. + FateStore.FateTxStore<Manager> fateTx = + accumuloStore.createAndReserve(FateKey.forCompactionCommit(cids.get(0))).orElseThrow(); + var fateId = fateTx.getID(); + fateTx.unreserve(0, TimeUnit.MILLISECONDS); + + // Read the tablet metadata + var tabletsMeta = ctx.getAmple().readTablets().forTable(tableId).build().stream() + .collect(Collectors.toList()); + assertEquals(2, tabletsMeta.size()); + + // Insert fake compaction entries in the metadata table. No compactor will report ownership of + // these, so they should look like dead compactions and be removed. However, one of them has + // an associated fate tx that should prevent its removal. 
+ try (var mutator = ctx.getAmple().mutateTablets()) { + for (int i = 0; i < tabletsMeta.size(); i++) { + var tabletMeta = tabletsMeta.get(0); + var tabletDir = + tabletMeta.getFiles().stream().findFirst().orElseThrow().getPath().getParent(); + var tmpFile = new Path(tabletDir, "C1234.rf_tmp"); + + CompactionMetadata cm = new CompactionMetadata(tabletMeta.getFiles(), + ReferencedTabletFile.of(tmpFile), "localhost:16789", CompactionKind.SYSTEM, + (short) 10, CompactorGroupId.of(GROUP1), false, null); + + mutator.mutateTablet(tabletMeta.getExtent()).putExternalCompaction(cids.get(i), cm) + .mutate(); + } + } + + // Wait until the compaction id w/o a fate transaction is removed, should still see the one + // with a fate transaction + Wait.waitFor(() -> { + Set<ExternalCompactionId> currentIds = ctx.getAmple().readTablets().forTable(tableId) + .build().stream().map(TabletMetadata::getExternalCompactions) + .flatMap(ecm -> ecm.keySet().stream()).collect(Collectors.toSet()); + System.out.println("currentIds1:" + currentIds); + assertTrue(currentIds.size() == 1 || currentIds.size() == 2); + return currentIds.equals(Set.of(cids.get(0))); + }); + + // Delete the fate transaction, should allow the dead compaction detector to clean up the + // remaining external compaction id + fateTx = accumuloStore.reserve(fateId); + fateTx.delete(); + fateTx.unreserve(0, TimeUnit.MILLISECONDS); + + // wait for the remaining compaction id to be removed + Wait.waitFor(() -> { + Set<ExternalCompactionId> currentIds = ctx.getAmple().readTablets().forTable(tableId) + .build().stream().map(TabletMetadata::getExternalCompactions) + .flatMap(ecm -> ecm.keySet().stream()).collect(Collectors.toSet()); + System.out.println("currentIds2:" + currentIds); + assertTrue(currentIds.size() <= 1); + return currentIds.isEmpty(); + }); + } + } + @Test public void testCompactionAndCompactorDies() throws Exception { String table1 = this.getUniqueNames(1)[0]; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java index 3edd8717e9..63e8d64703 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java @@ -29,8 +29,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.lang.reflect.Method; import java.time.Duration; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -38,6 +40,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -47,6 +50,7 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.ReadOnlyRepo; import org.apache.accumulo.core.fate.StackOverflowException; @@ -421,6 +425,67 @@ public abstract class FateStoreIT extends SharedMiniClusterBase implements FateT } + @Test + public void testListFateKeys() throws 
Exception { + executeTest(this::testListFateKeys); + } + + protected void testListFateKeys(FateStore<TestEnv> store, ServerContext sctx) throws Exception { + + // this should not be seen when listing by key type because it has no key + var id1 = store.create(); + + TableId tid1 = TableId.of("test"); + var extent1 = new KeyExtent(tid1, new Text("m"), null); + var extent2 = new KeyExtent(tid1, null, new Text("m")); + var fateKey1 = FateKey.forSplit(extent1); + var fateKey2 = FateKey.forSplit(extent2); + + var cid1 = ExternalCompactionId.generate(UUID.randomUUID()); + var cid2 = ExternalCompactionId.generate(UUID.randomUUID()); + + assertNotEquals(cid1, cid2); + + var fateKey3 = FateKey.forCompactionCommit(cid1); + var fateKey4 = FateKey.forCompactionCommit(cid2); + + Map<FateKey,FateId> fateKeyIds = new HashMap<>(); + for (FateKey fateKey : List.of(fateKey1, fateKey2, fateKey3, fateKey4)) { + var fateTx = store.createAndReserve(fateKey).orElseThrow(); + fateKeyIds.put(fateKey, fateTx.getID()); + fateTx.unreserve(0, TimeUnit.MILLISECONDS); + } + + HashSet<FateId> allIds = new HashSet<>(); + allIds.addAll(fateKeyIds.values()); + allIds.add(id1); + assertEquals(allIds, store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet())); + assertEquals(5, allIds.size()); + + assertEquals(4, fateKeyIds.size()); + assertEquals(4, fateKeyIds.values().stream().distinct().count()); + + HashSet<KeyExtent> seenExtents = new HashSet<>(); + store.list(FateKey.FateKeyType.SPLIT).forEach(fateKey -> { + assertEquals(FateKey.FateKeyType.SPLIT, fateKey.getType()); + assertNotNull(fateKeyIds.remove(fateKey)); + assertTrue(seenExtents.add(fateKey.getKeyExtent().orElseThrow())); + }); + + assertEquals(2, fateKeyIds.size()); + assertEquals(Set.of(extent1, extent2), seenExtents); + + HashSet<ExternalCompactionId> seenCids = new HashSet<>(); + store.list(FateKey.FateKeyType.COMPACTION_COMMIT).forEach(fateKey -> { + assertEquals(FateKey.FateKeyType.COMPACTION_COMMIT, fateKey.getType()); + assertNotNull(fateKeyIds.remove(fateKey)); + assertTrue(seenCids.add(fateKey.getCompactionId().orElseThrow())); + }); + + assertEquals(0, fateKeyIds.size()); + assertEquals(Set.of(cid1, cid2), seenCids); + } + // create(fateKey) method is private so expose for testing to check error states @SuppressWarnings("unchecked") protected Optional<FateId> create(FateStore<TestEnv> store, FateKey fateKey) throws Exception {
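To make the new listing API concrete, here is a minimal sketch of how a caller might collect the compactions that currently have a fate transaction committing them, mirroring the DeadCompactionDetector change above. The helper class and method names are illustrative only; the sketch assumes the Fate.list(FateKey.FateKeyType) method and FateKey accessors shown in the diff, and it closes the stream with try-with-resources because the AccumuloStore implementation keeps a scanner open until the stream is closed.

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;

public class CommittingCompactionsSketch {
  /**
   * Collects the ids of external compactions that currently have a fate transaction
   * committing them. Closing the stream releases the underlying scanner when the
   * store is backed by the metadata table.
   */
  public static <T> Set<ExternalCompactionId> committing(Fate<T> fate) {
    try (Stream<FateKey> keys = fate.list(FateKey.FateKeyType.COMPACTION_COMMIT)) {
      return keys.map(fateKey -> fateKey.getCompactionId().orElseThrow())
          .collect(Collectors.toSet());
    }
  }
}

The dead compaction detector uses exactly this kind of result: any compaction id found in the metadata table but reported by neither a compactor nor this set is a candidate for being dead.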
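The deduplication of repeated "compaction complete" RPCs can be sketched as well. This assumes, as the ifPresentOrElse usage in CompactionCoordinator suggests, that seedTransaction returns an Optional<FateId> and accepts the repo that will commit the compaction; the method name seedTwice, the variable names, and the Repo<Manager> parameter type are assumptions for illustration, not part of the commit.

import java.util.Optional;
import java.util.UUID;

import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.manager.Manager;

public class SeedCommitSketch {
  /**
   * Seeds the compaction commit transaction twice with the same FateKey to show that
   * the second attempt is a no-op, which is how a duplicate completion RPC avoids
   * creating a second fate operation for the same compaction.
   */
  static void seedTwice(Fate<Manager> fate, Repo<Manager> commitRepo) {
    ExternalCompactionId ecid = ExternalCompactionId.generate(UUID.randomUUID());
    FateKey key = FateKey.forCompactionCommit(ecid);

    Optional<FateId> first = fate.seedTransaction("COMMIT_COMPACTION", key, commitRepo, true,
        "Commit compaction " + ecid);
    // Expected to be present: a new fate transaction was created for this key.

    Optional<FateId> second = fate.seedTransaction("COMMIT_COMPACTION", key, commitRepo, true,
        "Commit compaction " + ecid);
    // Expected to be empty: a transaction keyed by this FateKey already exists.

    assert first.isPresent() && second.isEmpty();
  }
}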