This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 9049e22abe fixes bug with cleaning up multiple dead compactions in single tablet (#5083) 9049e22abe is described below commit 9049e22abefda919ae0aafff15c8339569c9899e Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Nov 20 08:37:32 2024 -0500 fixes bug with cleaning up multiple dead compactions in single tablet (#5083) When a single tablet had multiple dead compactions the dead compaction cleanup code would attempt to write separate conditonal mutations for the same tablet. For simplicity Ample does not support this case and throws an exception. Reworked the code to only write a single conditional mutation per tablet for dead compaction detection. --- .../coordinator/CompactionCoordinator.java | 62 ++++++++++++---------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 941d6647d0..925da279fc 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -19,7 +19,6 @@ package org.apache.accumulo.manager.compaction.coordinator; import static java.util.concurrent.TimeUnit.SECONDS; -import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.toList; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; @@ -37,11 +36,11 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -779,34 +778,43 @@ public class CompactionCoordinator void compactionsFailed(Map<ExternalCompactionId,KeyExtent> compactions) { // Need to process each level by itself because the conditional tablet mutator does not support - // mutating multiple data levels at the same time - compactions.entrySet().stream() - .collect(groupingBy(entry -> DataLevel.of(entry.getValue().tableId()), - Collectors.toMap(Entry::getKey, Entry::getValue))) - .forEach((level, compactionsByLevel) -> compactionFailedForLevel(compactionsByLevel)); + // mutating multiple data levels at the same time. Also the conditional tablet mutator does not + // support submitting multiple mutations for a single tablet, so need to group by extent. + + Map<DataLevel,Map<KeyExtent,Set<ExternalCompactionId>>> groupedCompactions = + new EnumMap<>(DataLevel.class); + + compactions.forEach((ecid, extent) -> { + groupedCompactions.computeIfAbsent(DataLevel.of(extent.tableId()), dl -> new HashMap<>()) + .computeIfAbsent(extent, e -> new HashSet<>()).add(ecid); + }); + + groupedCompactions + .forEach((dataLevel, levelCompactions) -> compactionFailedForLevel(levelCompactions)); } - void compactionFailedForLevel(Map<ExternalCompactionId,KeyExtent> compactions) { + void compactionFailedForLevel(Map<KeyExtent,Set<ExternalCompactionId>> compactions) { try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { - compactions.forEach((ecid, extent) -> { + compactions.forEach((extent, ecids) -> { try { ctx.requireNotDeleted(extent.tableId()); - tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid) - .deleteExternalCompaction(ecid).submit(new RejectionHandler() { - - @Override - public boolean callWhenTabletDoesNotExists() { - return true; - } + var mutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation(); + ecids.forEach(mutator::requireCompaction); + ecids.forEach(mutator::deleteExternalCompaction); + mutator.submit(new RejectionHandler() { + @Override + public boolean callWhenTabletDoesNotExists() { + return true; + } - @Override - public boolean test(TabletMetadata tabletMetadata) { - return tabletMetadata == null - || !tabletMetadata.getExternalCompactions().containsKey(ecid); - } + @Override + public boolean test(TabletMetadata tabletMetadata) { + return tabletMetadata == null + || Collections.disjoint(tabletMetadata.getExternalCompactions().keySet(), ecids); + } - }); + }); } catch (TableDeletedException e) { LOG.warn("Table {} was deleted, unable to update metadata for compaction failure.", extent.tableId()); @@ -820,10 +828,7 @@ public class CompactionCoordinator // this should try again later when the dead compaction detector runs, lets log it in case // its a persistent problem if (LOG.isDebugEnabled()) { - var ecid = - compactions.entrySet().stream().filter(entry -> entry.getValue().equals(extent)) - .findFirst().map(Map.Entry::getKey).orElse(null); - LOG.debug("Unable to remove failed compaction {} {}", extent, ecid); + LOG.debug("Unable to remove failed compaction {} {}", extent, compactions.get(extent)); } } else { // compactionFailed is called from the Compactor when either a compaction fails or @@ -833,8 +838,7 @@ public class CompactionCoordinator // that have a corresponding ecid in the name. ecidsForTablet.clear(); - compactions.entrySet().stream().filter(e -> e.getValue().compareTo(extent) == 0) - .map(Entry::getKey).forEach(ecidsForTablet::add); + ecidsForTablet.addAll(compactions.get(extent)); if (!ecidsForTablet.isEmpty()) { final TabletMetadata tm = ctx.getAmple().readTablet(extent, ColumnType.DIR); @@ -875,7 +879,7 @@ public class CompactionCoordinator }); } - compactions.forEach((k, v) -> recordCompletion(k)); + compactions.values().forEach(ecids -> ecids.forEach(this::recordCompletion)); } /**