This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 22621ea501 Update dead compaction detector to handle metadata/root (#4354) 22621ea501 is described below commit 22621ea50190389131146b92546bf5b9c2ab47cf Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Wed Mar 13 11:43:02 2024 -0400 Update dead compaction detector to handle metadata/root (#4354) This updates DeadCompactionDetector to also look at the root and metadata table metadata for dead compactions on those tables This closes #4340 Co-authored-by: Keith Turner <ktur...@apache.org> --- .../coordinator/CompactionCoordinator.java | 19 +- .../coordinator/DeadCompactionDetector.java | 20 +- .../test/compaction/ExternalCompaction_1_IT.java | 231 +++++++++++++++------ 3 files changed, 197 insertions(+), 73 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 2a12f08708..97397c019b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -20,6 +20,9 @@ package org.apache.accumulo.manager.compaction.coordinator; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; @@ -81,6 +84,7 @@ import org.apache.accumulo.core.metadata.CompactableFileImpl; 
import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler; import org.apache.accumulo.core.metadata.schema.CompactionMetadata; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; @@ -577,7 +581,7 @@ public class CompactionCoordinator var dfv = metaJob.getTabletMetadata().getFilesMap().get(storedTabletFile); return new InputFile(storedTabletFile.getMetadata(), dfv.getSize(), dfv.getNumEntries(), dfv.getTime()); - }).collect(Collectors.toList()); + }).collect(toList()); FateInstanceType type = FateInstanceType.fromTableId(metaJob.getTabletMetadata().getTableId()); FateId fateId = FateId.from(type, 0); @@ -707,10 +711,19 @@ public class CompactionCoordinator KeyExtent fromThriftExtent = KeyExtent.fromThrift(extent); LOG.info("Compaction failed, id: {}, extent: {}", externalCompactionId, fromThriftExtent); final var ecid = ExternalCompactionId.of(externalCompactionId); - compactionFailed(Map.of(ecid, KeyExtent.fromThrift(extent))); + compactionsFailed(Map.of(ecid, KeyExtent.fromThrift(extent))); } - void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) { + void compactionsFailed(Map<ExternalCompactionId,KeyExtent> compactions) { + // Need to process each level by itself because the conditional tablet mutator does not support + // mutating multiple data levels at the same time + compactions.entrySet().stream() + .collect(groupingBy(entry -> DataLevel.of(entry.getValue().tableId()), + Collectors.toMap(Entry::getKey, Entry::getValue))) + .forEach((level, compactionsByLevel) -> compactionFailedForLevel(compactionsByLevel)); + } + + void compactionFailedForLevel(Map<ExternalCompactionId,KeyExtent> compactions) { try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { 
compactions.forEach((ecid, extent) -> { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java index 0857385239..87a4cef78b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java @@ -40,6 +40,7 @@ import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; @@ -141,13 +142,17 @@ public class DeadCompactionDetector { */ log.debug("Starting to look for dead compactions"); - // ELASTICITY_TODO not looking for dead compactions in the metadata table Map<ExternalCompactionId,KeyExtent> tabletCompactions = new HashMap<>(); // find what external compactions tablets think are running - try (TabletsMetadata tabletsMetadata = context.getAmple().readTablets().forLevel(DataLevel.USER) - .filter(new HasExternalCompactionsFilter()).fetch(ColumnType.ECOMP, ColumnType.PREV_ROW) - .build()) { + try (Stream<TabletMetadata> tabletsMetadata = Stream + // Listing the data levels vs using DataLevel.values() prevents unexpected + // behavior if a new DataLevel is added + .of(DataLevel.ROOT, DataLevel.METADATA, DataLevel.USER) + .map(dataLevel -> context.getAmple().readTablets().forLevel(dataLevel) + .filter(new HasExternalCompactionsFilter()).fetch(ColumnType.ECOMP, 
ColumnType.PREV_ROW) + .build()) + .flatMap(TabletsMetadata::stream)) { tabletsMetadata.forEach(tm -> { tm.getExternalCompactions().keySet().forEach(ecid -> { tabletCompactions.put(ecid, tm.getExtent()); @@ -193,9 +198,8 @@ public class DeadCompactionDetector { log.warn("Fate is not present, can not look for dead compactions"); return; } - // ELASTICITY_TODO need to handle metadata - var fate = fateMap.get(FateInstanceType.USER); - try (Stream<FateKey> keyStream = fate.list(FateKey.FateKeyType.COMPACTION_COMMIT)) { + try (Stream<FateKey> keyStream = fateMap.values().stream() + .flatMap(fate -> fate.list(FateKey.FateKeyType.COMPACTION_COMMIT))) { keyStream.map(fateKey -> fateKey.getCompactionId().orElseThrow()).forEach(ecid -> { if (tabletCompactions.remove(ecid) != null) { log.debug("Ignoring compaction {} that is committing in a fate", ecid); @@ -222,7 +226,7 @@ public class DeadCompactionDetector { tabletCompactions.forEach((ecid, extent) -> { log.warn("Compaction believed to be dead, failing it: id: {}, extent: {}", ecid, extent); }); - coordinator.compactionFailed(tabletCompactions); + coordinator.compactionsFailed(tabletCompactions); this.deadCompactions.keySet().removeAll(toFail); } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index 6a8dbd5040..16dd39b09f 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -42,6 +42,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -53,6 +54,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv; +import 
org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; @@ -71,14 +73,18 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.ZooStore; import org.apache.accumulo.core.fate.accumulo.AccumuloStore; import org.apache.accumulo.core.iterators.DevNull; import org.apache.accumulo.core.iterators.Filter; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.schema.CompactionMetadata; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; @@ -221,86 +227,187 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { /** * This test verifies the dead compaction detector does not remove compactions that are committing - * in fate. + * in fate for the Root table. 
*/ @Test - public void testCompactionCommitAndDeadDetection() throws Exception { + public void testCompactionCommitAndDeadDetectionRoot() throws Exception { + var ctx = getCluster().getServerContext(); + FateStore<Manager> zkStore = + new ZooStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, ctx.getZooReaderWriter()); + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - final String tableName = getUniqueNames(1)[0]; + var tableId = ctx.getTableId(AccumuloTable.ROOT.tableName()); + var allCids = new HashMap<TableId,List<ExternalCompactionId>>(); + var fateId = createCompactionCommitAndDeadMetadata(c, zkStore, AccumuloTable.ROOT.tableName(), + allCids); + verifyCompactionCommitAndDead(zkStore, tableId, fateId, allCids.get(tableId)); + } + } + /** + * This test verifies the dead compaction detector does not remove compactions that are committing + * in fate for the Metadata table. + */ + @Test + public void testCompactionCommitAndDeadDetectionMeta() throws Exception { + var ctx = getCluster().getServerContext(); + FateStore<Manager> zkStore = + new ZooStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, ctx.getZooReaderWriter()); + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + // Metadata table by default already has 2 tablets + var tableId = ctx.getTableId(AccumuloTable.METADATA.tableName()); + var allCids = new HashMap<TableId,List<ExternalCompactionId>>(); + var fateId = createCompactionCommitAndDeadMetadata(c, zkStore, + AccumuloTable.METADATA.tableName(), allCids); + verifyCompactionCommitAndDead(zkStore, tableId, fateId, allCids.get(tableId)); + } + } + + /** + * This test verifies the dead compaction detector does not remove compactions that are committing + * in fate for a User table. 
+ */ + @Test + public void testCompactionCommitAndDeadDetectionUser() throws Exception { + var ctx = getCluster().getServerContext(); + final String tableName = getUniqueNames(1)[0]; + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + AccumuloStore<Manager> accumuloStore = new AccumuloStore<>(ctx); SortedSet<Text> splits = new TreeSet<>(); splits.add(new Text(row(MAX_DATA / 2))); - c.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits)); writeData(c, tableName); - c.tableOperations().flush(tableName, null, null, true); - var ctx = getCluster().getServerContext(); var tableId = ctx.getTableId(tableName); + var allCids = new HashMap<TableId,List<ExternalCompactionId>>(); + var fateId = createCompactionCommitAndDeadMetadata(c, accumuloStore, tableName, allCids); + verifyCompactionCommitAndDead(accumuloStore, tableId, fateId, allCids.get(tableId)); + } + } - // Create two random compaction ids - var cids = List.of(ExternalCompactionId.generate(UUID.randomUUID()), - ExternalCompactionId.generate(UUID.randomUUID())); + /** + * This test verifies the dead compaction detector does not remove compactions that are committing + * in fate when all data levels have compactions + */ + @Test + public void testCompactionCommitAndDeadDetectionAll() throws Exception { + var ctx = getCluster().getServerContext(); + final String userTable = getUniqueNames(1)[0]; + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { AccumuloStore<Manager> accumuloStore = new AccumuloStore<>(ctx); + FateStore<Manager> zkStore = + new ZooStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, ctx.getZooReaderWriter()); - // Create a fate transaction for one of the compaction ids that is in the new state, it should - // never run. Its purpose is to prevent the dead compaction detector from deleting the id. 
- FateStore.FateTxStore<Manager> fateTx = - accumuloStore.createAndReserve(FateKey.forCompactionCommit(cids.get(0))).orElseThrow(); - var fateId = fateTx.getID(); - fateTx.unreserve(0, TimeUnit.MILLISECONDS); + SortedSet<Text> splits = new TreeSet<>(); + splits.add(new Text(row(MAX_DATA / 2))); + c.tableOperations().create(userTable, new NewTableConfiguration().withSplits(splits)); + writeData(c, userTable); + + Map<TableId,FateId> fateIds = new HashMap<>(); + Map<TableId,List<ExternalCompactionId>> allCids = new HashMap<>(); + + // create compaction metadata for each data level to test + for (String tableName : List.of(AccumuloTable.ROOT.tableName(), + AccumuloTable.METADATA.tableName(), userTable)) { + var tableId = ctx.getTableId(tableName); + var fateStore = FateInstanceType.fromTableId(tableId) == FateInstanceType.USER + ? accumuloStore : zkStore; + fateIds.put(tableId, + createCompactionCommitAndDeadMetadata(c, fateStore, tableName, allCids)); + } - // Read the tablet metadata - var tabletsMeta = ctx.getAmple().readTablets().forTable(tableId).build().stream() - .collect(Collectors.toList()); + // verify the dead compaction was removed for each level + // but not the compaction associated with a fate id + for (Entry<TableId,FateId> entry : fateIds.entrySet()) { + var tableId = entry.getKey(); + var fateStore = FateInstanceType.fromTableId(tableId) == FateInstanceType.USER + ? 
accumuloStore : zkStore; + verifyCompactionCommitAndDead(fateStore, tableId, entry.getValue(), allCids.get(tableId)); + } + } + } + + private FateId createCompactionCommitAndDeadMetadata(AccumuloClient c, + FateStore<Manager> fateStore, String tableName, + Map<TableId,List<ExternalCompactionId>> allCids) throws Exception { + var ctx = getCluster().getServerContext(); + c.tableOperations().flush(tableName, null, null, true); + var tableId = ctx.getTableId(tableName); + + allCids.put(tableId, List.of(ExternalCompactionId.generate(UUID.randomUUID()), + ExternalCompactionId.generate(UUID.randomUUID()))); + + // Create a fate transaction for one of the compaction ids that is in the new state, it + // should never run. Its purpose is to prevent the dead compaction detector + // from deleting the id. + FateStore.FateTxStore<Manager> fateTx = fateStore + .createAndReserve(FateKey.forCompactionCommit(allCids.get(tableId).get(0))).orElseThrow(); + var fateId = fateTx.getID(); + fateTx.unreserve(0, TimeUnit.MILLISECONDS); + + // Read the tablet metadata + var tabletsMeta = ctx.getAmple().readTablets().forTable(tableId).build().stream() + .collect(Collectors.toList()); + // Root is always 1 tablet + if (!tableId.equals(AccumuloTable.ROOT.tableId())) { assertEquals(2, tabletsMeta.size()); + } - // Insert fake compaction entries in the metadata table. No compactor will report ownership of - // these, so they should look like dead compactions and be removed. However, one of them has - // an associated fate tx that should prevent its removal. 
- try (var mutator = ctx.getAmple().mutateTablets()) { - for (int i = 0; i < tabletsMeta.size(); i++) { - var tabletMeta = tabletsMeta.get(0); - var tabletDir = - tabletMeta.getFiles().stream().findFirst().orElseThrow().getPath().getParent(); - var tmpFile = new Path(tabletDir, "C1234.rf_tmp"); - - CompactionMetadata cm = new CompactionMetadata(tabletMeta.getFiles(), - ReferencedTabletFile.of(tmpFile), "localhost:16789", CompactionKind.SYSTEM, - (short) 10, CompactorGroupId.of(GROUP1), false, null); - - mutator.mutateTablet(tabletMeta.getExtent()).putExternalCompaction(cids.get(i), cm) - .mutate(); - } + // Insert fake compaction entries in the metadata table. No compactor will report ownership + // of these, so they should look like dead compactions and be removed. However, one of + // them has an associated fate tx that should prevent its removal. + try (var mutator = ctx.getAmple().mutateTablets()) { + for (int i = 0; i < tabletsMeta.size(); i++) { + var tabletMeta = tabletsMeta.get(0); + var tabletDir = + tabletMeta.getFiles().stream().findFirst().orElseThrow().getPath().getParent(); + var tmpFile = new Path(tabletDir, "C1234.rf_tmp"); + + CompactionMetadata cm = new CompactionMetadata(tabletMeta.getFiles(), + ReferencedTabletFile.of(tmpFile), "localhost:16789", CompactionKind.SYSTEM, (short) 10, + CompactorGroupId.of(GROUP1), false, null); + + mutator.mutateTablet(tabletMeta.getExtent()) + .putExternalCompaction(allCids.get(tableId).get(i), cm).mutate(); } - - // Wait until the compaction id w/o a fate transaction is removed, should still see the one - // with a fate transaction - Wait.waitFor(() -> { - Set<ExternalCompactionId> currentIds = ctx.getAmple().readTablets().forTable(tableId) - .build().stream().map(TabletMetadata::getExternalCompactions) - .flatMap(ecm -> ecm.keySet().stream()).collect(Collectors.toSet()); - System.out.println("currentIds1:" + currentIds); - assertTrue(currentIds.size() == 1 || currentIds.size() == 2); - return 
currentIds.equals(Set.of(cids.get(0))); - }); - - // Delete the fate transaction, should allow the dead compaction detector to clean up the - // remaining external compaction id - fateTx = accumuloStore.reserve(fateId); - fateTx.delete(); - fateTx.unreserve(0, TimeUnit.MILLISECONDS); - - // wait for the remaining compaction id to be removed - Wait.waitFor(() -> { - Set<ExternalCompactionId> currentIds = ctx.getAmple().readTablets().forTable(tableId) - .build().stream().map(TabletMetadata::getExternalCompactions) - .flatMap(ecm -> ecm.keySet().stream()).collect(Collectors.toSet()); - System.out.println("currentIds2:" + currentIds); - assertTrue(currentIds.size() <= 1); - return currentIds.isEmpty(); - }); } + + return fateId; + } + + private void verifyCompactionCommitAndDead(FateStore<Manager> fateStore, TableId tableId, + FateId fateId, List<ExternalCompactionId> cids) { + var ctx = getCluster().getServerContext(); + + // Wait until the compaction id w/o a fate transaction is removed, should still see the one + // with a fate transaction + Wait.waitFor(() -> { + Set<ExternalCompactionId> currentIds = ctx.getAmple().readTablets().forTable(tableId).build() + .stream().map(TabletMetadata::getExternalCompactions) + .flatMap(ecm -> ecm.keySet().stream()).collect(Collectors.toSet()); + System.out.println("currentIds1:" + currentIds); + assertTrue(currentIds.size() == 1 || currentIds.size() == 2); + return currentIds.equals(Set.of(cids.get(0))); + }); + + // Delete the fate transaction, should allow the dead compaction detector to clean up the + // remaining external compaction id + var fateTx = fateStore.reserve(fateId); + fateTx.delete(); + fateTx.unreserve(0, TimeUnit.MILLISECONDS); + + // wait for the remaining compaction id to be removed + Wait.waitFor(() -> { + Set<ExternalCompactionId> currentIds = ctx.getAmple().readTablets().forTable(tableId).build() + .stream().map(TabletMetadata::getExternalCompactions) + .flatMap(ecm -> 
ecm.keySet().stream()).collect(Collectors.toSet()); + System.out.println("currentIds2:" + currentIds); + assertTrue(currentIds.size() <= 1); + return currentIds.isEmpty(); + }); } @Test