This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 9049e22abe fixes bug with cleaning up multiple dead compactions in 
single tablet (#5083)
9049e22abe is described below

commit 9049e22abefda919ae0aafff15c8339569c9899e
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Wed Nov 20 08:37:32 2024 -0500

    fixes bug with cleaning up multiple dead compactions in single tablet 
(#5083)
    
    When a single tablet had multiple dead compactions the dead compaction
    cleanup code would attempt to write separate conditonal mutations for
    the same tablet.  For simplicity Ample does not support this case and
    throws an exception.  Reworked the code to only write a single
    conditional mutation per tablet for dead compaction detection.
---
 .../coordinator/CompactionCoordinator.java         | 62 ++++++++++++----------
 1 file changed, 33 insertions(+), 29 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 941d6647d0..925da279fc 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -19,7 +19,6 @@
 package org.apache.accumulo.manager.compaction.coordinator;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
@@ -37,11 +36,11 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -779,34 +778,43 @@ public class CompactionCoordinator
 
   void compactionsFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
     // Need to process each level by itself because the conditional tablet 
mutator does not support
-    // mutating multiple data levels at the same time
-    compactions.entrySet().stream()
-        .collect(groupingBy(entry -> DataLevel.of(entry.getValue().tableId()),
-            Collectors.toMap(Entry::getKey, Entry::getValue)))
-        .forEach((level, compactionsByLevel) -> 
compactionFailedForLevel(compactionsByLevel));
+    // mutating multiple data levels at the same time. Also the conditional 
tablet mutator does not
+    // support submitting multiple mutations for a single tablet, so need to 
group by extent.
+
+    Map<DataLevel,Map<KeyExtent,Set<ExternalCompactionId>>> groupedCompactions 
=
+        new EnumMap<>(DataLevel.class);
+
+    compactions.forEach((ecid, extent) -> {
+      groupedCompactions.computeIfAbsent(DataLevel.of(extent.tableId()), dl -> 
new HashMap<>())
+          .computeIfAbsent(extent, e -> new HashSet<>()).add(ecid);
+    });
+
+    groupedCompactions
+        .forEach((dataLevel, levelCompactions) -> 
compactionFailedForLevel(levelCompactions));
   }
 
-  void compactionFailedForLevel(Map<ExternalCompactionId,KeyExtent> 
compactions) {
+  void compactionFailedForLevel(Map<KeyExtent,Set<ExternalCompactionId>> 
compactions) {
 
     try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
-      compactions.forEach((ecid, extent) -> {
+      compactions.forEach((extent, ecids) -> {
         try {
           ctx.requireNotDeleted(extent.tableId());
-          
tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid)
-              .deleteExternalCompaction(ecid).submit(new RejectionHandler() {
-
-                @Override
-                public boolean callWhenTabletDoesNotExists() {
-                  return true;
-                }
+          var mutator = 
tabletsMutator.mutateTablet(extent).requireAbsentOperation();
+          ecids.forEach(mutator::requireCompaction);
+          ecids.forEach(mutator::deleteExternalCompaction);
+          mutator.submit(new RejectionHandler() {
+            @Override
+            public boolean callWhenTabletDoesNotExists() {
+              return true;
+            }
 
-                @Override
-                public boolean test(TabletMetadata tabletMetadata) {
-                  return tabletMetadata == null
-                      || 
!tabletMetadata.getExternalCompactions().containsKey(ecid);
-                }
+            @Override
+            public boolean test(TabletMetadata tabletMetadata) {
+              return tabletMetadata == null
+                  || 
Collections.disjoint(tabletMetadata.getExternalCompactions().keySet(), ecids);
+            }
 
-              });
+          });
         } catch (TableDeletedException e) {
           LOG.warn("Table {} was deleted, unable to update metadata for 
compaction failure.",
               extent.tableId());
@@ -820,10 +828,7 @@ public class CompactionCoordinator
           // this should try again later when the dead compaction detector 
runs, lets log it in case
           // its a persistent problem
           if (LOG.isDebugEnabled()) {
-            var ecid =
-                compactions.entrySet().stream().filter(entry -> 
entry.getValue().equals(extent))
-                    .findFirst().map(Map.Entry::getKey).orElse(null);
-            LOG.debug("Unable to remove failed compaction {} {}", extent, 
ecid);
+            LOG.debug("Unable to remove failed compaction {} {}", extent, 
compactions.get(extent));
           }
         } else {
           // compactionFailed is called from the Compactor when either a 
compaction fails or
@@ -833,8 +838,7 @@ public class CompactionCoordinator
           // that have a corresponding ecid in the name.
 
           ecidsForTablet.clear();
-          compactions.entrySet().stream().filter(e -> 
e.getValue().compareTo(extent) == 0)
-              .map(Entry::getKey).forEach(ecidsForTablet::add);
+          ecidsForTablet.addAll(compactions.get(extent));
 
           if (!ecidsForTablet.isEmpty()) {
             final TabletMetadata tm = ctx.getAmple().readTablet(extent, 
ColumnType.DIR);
@@ -875,7 +879,7 @@ public class CompactionCoordinator
       });
     }
 
-    compactions.forEach((k, v) -> recordCompletion(k));
+    compactions.values().forEach(ecids -> 
ecids.forEach(this::recordCompletion));
   }
 
   /**

Reply via email to