This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 22621ea501 Update dead compaction detector to handle metadata/root
(#4354)
22621ea501 is described below
commit 22621ea50190389131146b92546bf5b9c2ab47cf
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Wed Mar 13 11:43:02 2024 -0400
Update dead compaction detector to handle metadata/root (#4354)
This updates DeadCompactionDetector to also look at the tablet metadata of the
root and metadata tables for dead compactions on those tables.
This closes #4340
Co-authored-by: Keith Turner <[email protected]>
---
.../coordinator/CompactionCoordinator.java | 19 +-
.../coordinator/DeadCompactionDetector.java | 20 +-
.../test/compaction/ExternalCompaction_1_IT.java | 231 +++++++++++++++------
3 files changed, 197 insertions(+), 73 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 2a12f08708..97397c019b 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -20,6 +20,9 @@ package org.apache.accumulo.manager.compaction.coordinator;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED;
import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
@@ -81,6 +84,7 @@ import org.apache.accumulo.core.metadata.CompactableFileImpl;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler;
import org.apache.accumulo.core.metadata.schema.CompactionMetadata;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@ -577,7 +581,7 @@ public class CompactionCoordinator
var dfv =
metaJob.getTabletMetadata().getFilesMap().get(storedTabletFile);
return new InputFile(storedTabletFile.getMetadata(), dfv.getSize(),
dfv.getNumEntries(),
dfv.getTime());
- }).collect(Collectors.toList());
+ }).collect(toList());
FateInstanceType type =
FateInstanceType.fromTableId(metaJob.getTabletMetadata().getTableId());
FateId fateId = FateId.from(type, 0);
@@ -707,10 +711,19 @@ public class CompactionCoordinator
KeyExtent fromThriftExtent = KeyExtent.fromThrift(extent);
LOG.info("Compaction failed, id: {}, extent: {}", externalCompactionId,
fromThriftExtent);
final var ecid = ExternalCompactionId.of(externalCompactionId);
- compactionFailed(Map.of(ecid, KeyExtent.fromThrift(extent)));
+ compactionsFailed(Map.of(ecid, KeyExtent.fromThrift(extent)));
}
- void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
+ void compactionsFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
+ // Need to process each level by itself because the conditional tablet
mutator does not support
+ // mutating multiple data levels at the same time
+ compactions.entrySet().stream()
+ .collect(groupingBy(entry -> DataLevel.of(entry.getValue().tableId()),
+ Collectors.toMap(Entry::getKey, Entry::getValue)))
+ .forEach((level, compactionsByLevel) ->
compactionFailedForLevel(compactionsByLevel));
+ }
+
+ void compactionFailedForLevel(Map<ExternalCompactionId,KeyExtent>
compactions) {
try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
compactions.forEach((ecid, extent) -> {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
index 0857385239..87a4cef78b 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
@@ -40,6 +40,7 @@ import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
@@ -141,13 +142,17 @@ public class DeadCompactionDetector {
*/
log.debug("Starting to look for dead compactions");
- // ELASTICITY_TODO not looking for dead compactions in the metadata table
Map<ExternalCompactionId,KeyExtent> tabletCompactions = new HashMap<>();
// find what external compactions tablets think are running
- try (TabletsMetadata tabletsMetadata =
context.getAmple().readTablets().forLevel(DataLevel.USER)
- .filter(new HasExternalCompactionsFilter()).fetch(ColumnType.ECOMP,
ColumnType.PREV_ROW)
- .build()) {
+ try (Stream<TabletMetadata> tabletsMetadata = Stream
+ // Listing the data levels vs using DataLevel.values() prevents
unexpected
+ // behavior if a new DataLevel is added
+ .of(DataLevel.ROOT, DataLevel.METADATA, DataLevel.USER)
+ .map(dataLevel -> context.getAmple().readTablets().forLevel(dataLevel)
+ .filter(new
HasExternalCompactionsFilter()).fetch(ColumnType.ECOMP, ColumnType.PREV_ROW)
+ .build())
+ .flatMap(TabletsMetadata::stream)) {
tabletsMetadata.forEach(tm -> {
tm.getExternalCompactions().keySet().forEach(ecid -> {
tabletCompactions.put(ecid, tm.getExtent());
@@ -193,9 +198,8 @@ public class DeadCompactionDetector {
log.warn("Fate is not present, can not look for dead compactions");
return;
}
- // ELASTICITY_TODO need to handle metadata
- var fate = fateMap.get(FateInstanceType.USER);
- try (Stream<FateKey> keyStream =
fate.list(FateKey.FateKeyType.COMPACTION_COMMIT)) {
+ try (Stream<FateKey> keyStream = fateMap.values().stream()
+ .flatMap(fate ->
fate.list(FateKey.FateKeyType.COMPACTION_COMMIT))) {
keyStream.map(fateKey ->
fateKey.getCompactionId().orElseThrow()).forEach(ecid -> {
if (tabletCompactions.remove(ecid) != null) {
log.debug("Ignoring compaction {} that is committing in a fate",
ecid);
@@ -222,7 +226,7 @@ public class DeadCompactionDetector {
tabletCompactions.forEach((ecid, extent) -> {
log.warn("Compaction believed to be dead, failing it: id: {}, extent:
{}", ecid, extent);
});
- coordinator.compactionFailed(tabletCompactions);
+ coordinator.compactionsFailed(tabletCompactions);
this.deadCompactions.keySet().removeAll(toFail);
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
index 6a8dbd5040..16dd39b09f 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
@@ -42,6 +42,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -53,6 +54,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
@@ -71,14 +73,18 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.ZooStore;
import org.apache.accumulo.core.fate.accumulo.AccumuloStore;
import org.apache.accumulo.core.iterators.DevNull;
import org.apache.accumulo.core.iterators.Filter;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.schema.CompactionMetadata;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@ -221,86 +227,187 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
/**
* This test verifies the dead compaction detector does not remove
compactions that are committing
- * in fate.
+ * in fate for the Root table.
*/
@Test
- public void testCompactionCommitAndDeadDetection() throws Exception {
+ public void testCompactionCommitAndDeadDetectionRoot() throws Exception {
+ var ctx = getCluster().getServerContext();
+ FateStore<Manager> zkStore =
+ new ZooStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE,
ctx.getZooReaderWriter());
+
try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
- final String tableName = getUniqueNames(1)[0];
+ var tableId = ctx.getTableId(AccumuloTable.ROOT.tableName());
+ var allCids = new HashMap<TableId,List<ExternalCompactionId>>();
+ var fateId = createCompactionCommitAndDeadMetadata(c, zkStore,
AccumuloTable.ROOT.tableName(),
+ allCids);
+ verifyCompactionCommitAndDead(zkStore, tableId, fateId,
allCids.get(tableId));
+ }
+ }
+ /**
+ * This test verifies the dead compaction detector does not remove
compactions that are committing
+ * in fate for the Metadata table.
+ */
+ @Test
+ public void testCompactionCommitAndDeadDetectionMeta() throws Exception {
+ var ctx = getCluster().getServerContext();
+ FateStore<Manager> zkStore =
+ new ZooStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE,
ctx.getZooReaderWriter());
+
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
+ // Metadata table by default already has 2 tablets
+ var tableId = ctx.getTableId(AccumuloTable.METADATA.tableName());
+ var allCids = new HashMap<TableId,List<ExternalCompactionId>>();
+ var fateId = createCompactionCommitAndDeadMetadata(c, zkStore,
+ AccumuloTable.METADATA.tableName(), allCids);
+ verifyCompactionCommitAndDead(zkStore, tableId, fateId,
allCids.get(tableId));
+ }
+ }
+
+ /**
+ * This test verifies the dead compaction detector does not remove
compactions that are committing
+ * in fate for a User table.
+ */
+ @Test
+ public void testCompactionCommitAndDeadDetectionUser() throws Exception {
+ var ctx = getCluster().getServerContext();
+ final String tableName = getUniqueNames(1)[0];
+
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
+ AccumuloStore<Manager> accumuloStore = new AccumuloStore<>(ctx);
SortedSet<Text> splits = new TreeSet<>();
splits.add(new Text(row(MAX_DATA / 2)));
-
c.tableOperations().create(tableName, new
NewTableConfiguration().withSplits(splits));
writeData(c, tableName);
- c.tableOperations().flush(tableName, null, null, true);
- var ctx = getCluster().getServerContext();
var tableId = ctx.getTableId(tableName);
+ var allCids = new HashMap<TableId,List<ExternalCompactionId>>();
+ var fateId = createCompactionCommitAndDeadMetadata(c, accumuloStore,
tableName, allCids);
+ verifyCompactionCommitAndDead(accumuloStore, tableId, fateId,
allCids.get(tableId));
+ }
+ }
- // Create two random compaction ids
- var cids = List.of(ExternalCompactionId.generate(UUID.randomUUID()),
- ExternalCompactionId.generate(UUID.randomUUID()));
+ /**
+ * This test verifies the dead compaction detector does not remove
compactions that are committing
+ * in fate when all data levels have compactions.
+ */
+ @Test
+ public void testCompactionCommitAndDeadDetectionAll() throws Exception {
+ var ctx = getCluster().getServerContext();
+ final String userTable = getUniqueNames(1)[0];
+
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
AccumuloStore<Manager> accumuloStore = new AccumuloStore<>(ctx);
+ FateStore<Manager> zkStore =
+ new ZooStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE,
ctx.getZooReaderWriter());
- // Create a fate transaction for one of the compaction ids that is in
the new state, it should
- // never run. Its purpose is to prevent the dead compaction detector
from deleting the id.
- FateStore.FateTxStore<Manager> fateTx =
-
accumuloStore.createAndReserve(FateKey.forCompactionCommit(cids.get(0))).orElseThrow();
- var fateId = fateTx.getID();
- fateTx.unreserve(0, TimeUnit.MILLISECONDS);
+ SortedSet<Text> splits = new TreeSet<>();
+ splits.add(new Text(row(MAX_DATA / 2)));
+ c.tableOperations().create(userTable, new
NewTableConfiguration().withSplits(splits));
+ writeData(c, userTable);
+
+ Map<TableId,FateId> fateIds = new HashMap<>();
+ Map<TableId,List<ExternalCompactionId>> allCids = new HashMap<>();
+
+ // create compaction metadata for each data level to test
+ for (String tableName : List.of(AccumuloTable.ROOT.tableName(),
+ AccumuloTable.METADATA.tableName(), userTable)) {
+ var tableId = ctx.getTableId(tableName);
+ var fateStore = FateInstanceType.fromTableId(tableId) ==
FateInstanceType.USER
+ ? accumuloStore : zkStore;
+ fateIds.put(tableId,
+ createCompactionCommitAndDeadMetadata(c, fateStore, tableName,
allCids));
+ }
- // Read the tablet metadata
- var tabletsMeta =
ctx.getAmple().readTablets().forTable(tableId).build().stream()
- .collect(Collectors.toList());
+ // verify the dead compaction was removed for each level
+ // but not the compaction associated with a fate id
+ for (Entry<TableId,FateId> entry : fateIds.entrySet()) {
+ var tableId = entry.getKey();
+ var fateStore = FateInstanceType.fromTableId(tableId) ==
FateInstanceType.USER
+ ? accumuloStore : zkStore;
+ verifyCompactionCommitAndDead(fateStore, tableId, entry.getValue(),
allCids.get(tableId));
+ }
+ }
+ }
+
+ private FateId createCompactionCommitAndDeadMetadata(AccumuloClient c,
+ FateStore<Manager> fateStore, String tableName,
+ Map<TableId,List<ExternalCompactionId>> allCids) throws Exception {
+ var ctx = getCluster().getServerContext();
+ c.tableOperations().flush(tableName, null, null, true);
+ var tableId = ctx.getTableId(tableName);
+
+ allCids.put(tableId,
List.of(ExternalCompactionId.generate(UUID.randomUUID()),
+ ExternalCompactionId.generate(UUID.randomUUID())));
+
+ // Create a fate transaction for one of the compaction ids that is in the
new state, it
+ // should never run. Its purpose is to prevent the dead compaction detector
+ // from deleting the id.
+ FateStore.FateTxStore<Manager> fateTx = fateStore
+
.createAndReserve(FateKey.forCompactionCommit(allCids.get(tableId).get(0))).orElseThrow();
+ var fateId = fateTx.getID();
+ fateTx.unreserve(0, TimeUnit.MILLISECONDS);
+
+ // Read the tablet metadata
+ var tabletsMeta =
ctx.getAmple().readTablets().forTable(tableId).build().stream()
+ .collect(Collectors.toList());
+ // Root is always 1 tablet
+ if (!tableId.equals(AccumuloTable.ROOT.tableId())) {
assertEquals(2, tabletsMeta.size());
+ }
- // Insert fake compaction entries in the metadata table. No compactor
will report ownership of
- // these, so they should look like dead compactions and be removed.
However, one of them has
- // an associated fate tx that should prevent its removal.
- try (var mutator = ctx.getAmple().mutateTablets()) {
- for (int i = 0; i < tabletsMeta.size(); i++) {
- var tabletMeta = tabletsMeta.get(0);
- var tabletDir =
-
tabletMeta.getFiles().stream().findFirst().orElseThrow().getPath().getParent();
- var tmpFile = new Path(tabletDir, "C1234.rf_tmp");
-
- CompactionMetadata cm = new CompactionMetadata(tabletMeta.getFiles(),
- ReferencedTabletFile.of(tmpFile), "localhost:16789",
CompactionKind.SYSTEM,
- (short) 10, CompactorGroupId.of(GROUP1), false, null);
-
-
mutator.mutateTablet(tabletMeta.getExtent()).putExternalCompaction(cids.get(i),
cm)
- .mutate();
- }
+ // Insert fake compaction entries in the metadata table. No compactor will
report ownership
+ // of these, so they should look like dead compactions and be removed.
However, one of
+ // them has an associated fate tx that should prevent its removal.
+ try (var mutator = ctx.getAmple().mutateTablets()) {
+ for (int i = 0; i < tabletsMeta.size(); i++) {
+ var tabletMeta = tabletsMeta.get(0);
+ var tabletDir =
+
tabletMeta.getFiles().stream().findFirst().orElseThrow().getPath().getParent();
+ var tmpFile = new Path(tabletDir, "C1234.rf_tmp");
+
+ CompactionMetadata cm = new CompactionMetadata(tabletMeta.getFiles(),
+ ReferencedTabletFile.of(tmpFile), "localhost:16789",
CompactionKind.SYSTEM, (short) 10,
+ CompactorGroupId.of(GROUP1), false, null);
+
+ mutator.mutateTablet(tabletMeta.getExtent())
+ .putExternalCompaction(allCids.get(tableId).get(i), cm).mutate();
}
-
- // Wait until the compaction id w/o a fate transaction is removed,
should still see the one
- // with a fate transaction
- Wait.waitFor(() -> {
- Set<ExternalCompactionId> currentIds =
ctx.getAmple().readTablets().forTable(tableId)
- .build().stream().map(TabletMetadata::getExternalCompactions)
- .flatMap(ecm -> ecm.keySet().stream()).collect(Collectors.toSet());
- System.out.println("currentIds1:" + currentIds);
- assertTrue(currentIds.size() == 1 || currentIds.size() == 2);
- return currentIds.equals(Set.of(cids.get(0)));
- });
-
- // Delete the fate transaction, should allow the dead compaction
detector to clean up the
- // remaining external compaction id
- fateTx = accumuloStore.reserve(fateId);
- fateTx.delete();
- fateTx.unreserve(0, TimeUnit.MILLISECONDS);
-
- // wait for the remaining compaction id to be removed
- Wait.waitFor(() -> {
- Set<ExternalCompactionId> currentIds =
ctx.getAmple().readTablets().forTable(tableId)
- .build().stream().map(TabletMetadata::getExternalCompactions)
- .flatMap(ecm -> ecm.keySet().stream()).collect(Collectors.toSet());
- System.out.println("currentIds2:" + currentIds);
- assertTrue(currentIds.size() <= 1);
- return currentIds.isEmpty();
- });
}
+
+ return fateId;
+ }
+
+ private void verifyCompactionCommitAndDead(FateStore<Manager> fateStore,
TableId tableId,
+ FateId fateId, List<ExternalCompactionId> cids) {
+ var ctx = getCluster().getServerContext();
+
+ // Wait until the compaction id w/o a fate transaction is removed, should
still see the one
+ // with a fate transaction
+ Wait.waitFor(() -> {
+ Set<ExternalCompactionId> currentIds =
ctx.getAmple().readTablets().forTable(tableId).build()
+ .stream().map(TabletMetadata::getExternalCompactions)
+ .flatMap(ecm -> ecm.keySet().stream()).collect(Collectors.toSet());
+ System.out.println("currentIds1:" + currentIds);
+ assertTrue(currentIds.size() == 1 || currentIds.size() == 2);
+ return currentIds.equals(Set.of(cids.get(0)));
+ });
+
+ // Delete the fate transaction, should allow the dead compaction detector
to clean up the
+ // remaining external compaction id
+ var fateTx = fateStore.reserve(fateId);
+ fateTx.delete();
+ fateTx.unreserve(0, TimeUnit.MILLISECONDS);
+
+ // wait for the remaining compaction id to be removed
+ Wait.waitFor(() -> {
+ Set<ExternalCompactionId> currentIds =
ctx.getAmple().readTablets().forTable(tableId).build()
+ .stream().map(TabletMetadata::getExternalCompactions)
+ .flatMap(ecm -> ecm.keySet().stream()).collect(Collectors.toSet());
+ System.out.println("currentIds2:" + currentIds);
+ assertTrue(currentIds.size() <= 1);
+ return currentIds.isEmpty();
+ });
}
@Test