This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 4836824a91 fixes external compaction race condition (#5570)
4836824a91 is described below

commit 4836824a91768e93e06375419a0f02ddcf106f2f
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu May 22 11:41:41 2025 -0400

    fixes external compaction race condition (#5570)
    
    * fixes external compaction race condition
    
    In the tablet server there was a race condition between the set of
    online tablets and set of external compactions.  When a tablet loaded w/
    existing external compaction it would add to the external compaction set
    first and the online tablet set second. This could cause code that
    cleans up external compactions to remove something it saw in one set
    and not the other.  The cleanup code now also considers opening tablets
    to avoid this race.  Also adjusted rpc code to avoid the race.
    
    * fix race condition in the fix and improve logging
    
    * improve logging
    
    * avoid copying immutable set while lock is held
---
 .../tserver/OpeningAndOnlineCompactables.java      | 36 +++++++++++++
 .../org/apache/accumulo/tserver/TabletServer.java  | 17 +++++--
 .../tserver/compactions/CompactionManager.java     | 59 ++++++++++++++++++----
 .../accumulo/tserver/tablet/CompactableImpl.java   | 10 ++--
 4 files changed, 103 insertions(+), 19 deletions(-)

diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/OpeningAndOnlineCompactables.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/OpeningAndOnlineCompactables.java
new file mode 100644
index 0000000000..728ac38efd
--- /dev/null
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/OpeningAndOnlineCompactables.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.tserver.tablet.Tablet;
+
+public class OpeningAndOnlineCompactables {
+  public final Set<KeyExtent> opening;
+  public final Map<KeyExtent,Tablet> online;
+
+  public OpeningAndOnlineCompactables(Set<KeyExtent> openingSnapshot,
+      Map<KeyExtent,Tablet> onlineSnapshot) {
+    this.opening = openingSnapshot;
+    this.online = onlineSnapshot;
+  }
+}
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index c70f5f48e4..4ef6d7781f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -63,6 +63,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Durability;
@@ -169,7 +170,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterators;
 
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.context.Scope;
@@ -767,9 +767,18 @@ public class TabletServer extends AbstractServer
     metricsInfo.init(MetricsInfo.serviceTags(context.getInstanceName(), 
getApplicationName(),
         clientAddress, ""));
 
-    this.compactionManager = new CompactionManager(() -> Iterators
-        .transform(onlineTablets.snapshot().values().iterator(), 
Tablet::asCompactable),
-        getContext(), ceMetrics);
+    Supplier<OpeningAndOnlineCompactables> compactables = () -> {
+      // synchronize on both sets to get a consistent snapshot across both 
sets, this avoids missing
+      // any tablets that are moving between sets
+      synchronized (openingTablets) {
+        synchronized (onlineTablets) {
+          return new OpeningAndOnlineCompactables(Set.copyOf(openingTablets),
+              onlineTablets.snapshot());
+        }
+      }
+    };
+
+    this.compactionManager = new CompactionManager(compactables, getContext(), 
ceMetrics);
     compactionManager.start();
 
     announceExistence();
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index e9dc2c5d5d..5b890f8387 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -31,6 +31,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.conf.Property;
@@ -48,6 +49,7 @@ import 
org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl;
 import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.tserver.OpeningAndOnlineCompactables;
 import org.apache.accumulo.tserver.compactions.CompactionExecutor.CType;
 import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics;
 import org.apache.accumulo.tserver.tablet.Tablet;
@@ -63,7 +65,7 @@ public class CompactionManager {
 
   private static final Logger log = 
LoggerFactory.getLogger(CompactionManager.class);
 
-  private final Iterable<Compactable> compactables;
+  private final Supplier<OpeningAndOnlineCompactables> compactables;
   private volatile Map<CompactionServiceId,CompactionService> services;
 
   private final LinkedBlockingQueue<Compactable> compactablesToCheck = new 
LinkedBlockingQueue<>();
@@ -120,18 +122,33 @@ public class CompactionManager {
         long passed = NANOSECONDS.toMillis(System.nanoTime() - 
lastCheckAllTime);
         if (passed >= maxTimeBetweenChecks) {
           // take a snapshot of what is currently running
-          HashSet<ExternalCompactionId> runningEcids =
-              new HashSet<>(runningExternalCompactions.keySet());
-          for (Compactable compactable : compactables) {
+          HashMap<ExternalCompactionId,ExtCompInfo> runningEcids =
+              new HashMap<>(runningExternalCompactions);
+          // Get a snapshot of the tablets that are online and opening, this 
must be obtained after
+          // getting the runningExternalCompactions snapshot above. If it were 
obtained before then
+          // an opening tablet could add itself to runningExternalCompactions 
after this code
+          // obtained the snapshot of opening and online tablets and this code 
would remove it.
+          var compactablesSnapshot = compactables.get();
+          for (Tablet tablet : compactablesSnapshot.online.values()) {
+            Compactable compactable = tablet.asCompactable();
             last = compactable;
             submitCompaction(compactable);
             // remove anything from snapshot that tablets know are running
             compactable.getExternalCompactionIds(runningEcids::remove);
           }
           lastCheckAllTime = System.nanoTime();
+
+          // remove any tablets that are currently opening, these may have 
been added to
+          // runningExternalCompactions while in the process of opening
+          runningEcids.values()
+              .removeIf(extCompInfo -> 
compactablesSnapshot.opening.contains(extCompInfo.extent));
+
           // anything left in the snapshot is unknown to any tablet and should 
be removed if it
           // still exists
-          runningExternalCompactions.keySet().removeAll(runningEcids);
+          runningEcids.forEach((ecid, info) -> log.debug(
+              "Removing unknown external compaction {} {} from 
runningExternalCompactions", ecid,
+              info.extent));
+          runningExternalCompactions.keySet().removeAll(runningEcids.keySet());
         } else {
           var compactable = compactablesToCheck.poll(maxTimeBetweenChecks - 
passed, MILLISECONDS);
           if (compactable != null) {
@@ -192,8 +209,8 @@ public class CompactionManager {
     }
   }
 
-  public CompactionManager(Iterable<Compactable> compactables, ServerContext 
context,
-      CompactionExecutorsMetrics ceMetrics) {
+  public CompactionManager(Supplier<OpeningAndOnlineCompactables> compactables,
+      ServerContext context, CompactionExecutorsMetrics ceMetrics) {
     this.compactables = compactables;
 
     this.currentCfg =
@@ -338,7 +355,8 @@ public class CompactionManager {
     if (ecJob != null) {
       runningExternalCompactions.put(ecJob.getExternalCompactionId(),
           new ExtCompInfo(ecJob.getExtent(), extCE.getId()));
-      log.debug("Reserved external compaction {}", 
ecJob.getExternalCompactionId());
+      log.debug("Reserved external compaction {} {}", 
ecJob.getExternalCompactionId(),
+          ecJob.getExtent());
     }
     return ecJob;
   }
@@ -354,6 +372,7 @@ public class CompactionManager {
   public void registerExternalCompaction(ExternalCompactionId ecid, KeyExtent 
extent,
       CompactionExecutorId ceid) {
     runningExternalCompactions.put(ecid, new ExtCompInfo(extent, ceid));
+    log.trace("registered external compaction {} {}", ecid, extent);
   }
 
   public void commitExternalCompaction(ExternalCompactionId extCompactionId,
@@ -365,10 +384,19 @@ public class CompactionManager {
           "Unexpected extent seen on compaction commit %s %s", ecInfo.extent, 
extentCompacted);
       Tablet tablet = currentTablets.get(ecInfo.extent);
       if (tablet != null) {
+        log.debug("Attempting to commit external compaction {} {}", 
extCompactionId,
+            tablet.getExtent());
         tablet.asCompactable().commitExternalCompaction(extCompactionId, 
fileSize, entries);
         compactablesToCheck.add(tablet.asCompactable());
+        runningExternalCompactions.remove(extCompactionId);
+        log.trace("Committed external compaction {} {}", extCompactionId, 
tablet.getExtent());
+      } else {
+        log.debug("Ignoring request to commit {} {} because its not in set of 
known tablets",
+            extCompactionId, ecInfo.extent);
       }
-      runningExternalCompactions.remove(extCompactionId);
+    } else {
+      log.debug("Ignoring request to commit {} because its not in 
runningExternalCompactions",
+          extCompactionId);
     }
   }
 
@@ -380,10 +408,17 @@ public class CompactionManager {
           "Unexpected extent seen on compaction commit %s %s", ecInfo.extent, 
extentCompacted);
       Tablet tablet = currentTablets.get(ecInfo.extent);
       if (tablet != null) {
+        log.debug("Attempting to fail external compaction {} {}", ecid, 
tablet.getExtent());
         tablet.asCompactable().externalCompactionFailed(ecid);
         compactablesToCheck.add(tablet.asCompactable());
+        runningExternalCompactions.remove(ecid);
+        log.trace("Failed external compaction {} {}", ecid, 
tablet.getExtent());
+      } else {
+        log.debug("Ignoring request to fail {} {} because its not in set of 
known tablets", ecid,
+            ecInfo.extent);
       }
-      runningExternalCompactions.remove(ecid);
+    } else {
+      log.debug("Ignoring request to fail {} because its not in 
runningExternalCompactions", ecid);
     }
   }
 
@@ -424,6 +459,10 @@ public class CompactionManager {
   public void compactableClosed(KeyExtent extent, Set<CompactionServiceId> 
servicesUsed,
       Set<ExternalCompactionId> ecids) {
     runningExternalCompactions.keySet().removeAll(ecids);
+    if (log.isTraceEnabled()) {
+      ecids.forEach(ecid -> log.trace(
+          "Removed {} from runningExternalCompactions for {} as part of 
close", ecid, extent));
+    }
     servicesUsed.stream().map(services::get).filter(Objects::nonNull)
         .forEach(compService -> compService.compactableClosed(extent));
   }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index 6d0d4b3446..258529fba2 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -1429,7 +1429,6 @@ public class CompactableImpl implements Compactable {
       ExternalCompactionInfo ecInfo = externalCompactions.get(extCompactionId);
 
       if (ecInfo != null) {
-        log.debug("Attempting to commit external compaction {}", 
extCompactionId);
         Optional<StoredTabletFile> metaFile = Optional.empty();
         boolean successful = false;
         try {
@@ -1448,11 +1447,11 @@ public class CompactableImpl implements Compactable {
         } finally {
           completeCompaction(ecInfo.job, ecInfo.meta.getJobFiles(), metaFile, 
successful);
           externalCompactions.remove(extCompactionId);
-          log.debug("Completed commit of external compaction {}", 
extCompactionId);
+          log.debug("Completed commit of external compaction {} {}", 
extCompactionId, getExtent());
         }
       } else {
-        log.debug("Ignoring request to commit external compaction that is 
unknown {}",
-            extCompactionId);
+        log.debug("Ignoring request to commit external compaction that is 
unknown {} {}",
+            extCompactionId, getExtent());
       }
 
       
tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(extCompactionId));
@@ -1487,7 +1486,8 @@ public class CompactableImpl implements Compactable {
         externalCompactions.remove(ecid);
         log.debug("Processed external compaction failure: id: {}, extent: {}", 
ecid, getExtent());
       } else {
-        log.debug("Ignoring request to fail external compaction that is 
unknown {}", ecid);
+        log.debug("Ignoring request to fail external compaction that is 
unknown {} {}", ecid,
+            getExtent());
       }
 
       
tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(ecid));

Reply via email to