This is an automated email from the ASF dual-hosted git repository.
ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 4836824a91 fixes external compaction race condition (#5570)
4836824a91 is described below
commit 4836824a91768e93e06375419a0f02ddcf106f2f
Author: Keith Turner <[email protected]>
AuthorDate: Thu May 22 11:41:41 2025 -0400
fixes external compaction race condition (#5570)
* fixes external compaction race condition
In the tablet server there was a race condition between the set of
online tablets and the set of external compactions. When a tablet loaded
with existing external compactions, it would add to the external compaction
set first and to the online tablet set second. This could cause the code that
cleans up external compactions to remove something it saw in one set
but not the other. The cleanup code now also considers opening tablets to
avoid this race. Also adjusted the RPC code to avoid the race.
* fix race condition in fix and improve logging
* improve logging
* avoid copying immutable set while lock is held
---
.../tserver/OpeningAndOnlineCompactables.java | 36 +++++++++++++
.../org/apache/accumulo/tserver/TabletServer.java | 17 +++++--
.../tserver/compactions/CompactionManager.java | 59 ++++++++++++++++++----
.../accumulo/tserver/tablet/CompactableImpl.java | 10 ++--
4 files changed, 103 insertions(+), 19 deletions(-)
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/OpeningAndOnlineCompactables.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/OpeningAndOnlineCompactables.java
new file mode 100644
index 0000000000..728ac38efd
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/OpeningAndOnlineCompactables.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.tserver.tablet.Tablet;
+
+public class OpeningAndOnlineCompactables {
+ public final Set<KeyExtent> opening;
+ public final Map<KeyExtent,Tablet> online;
+
+ public OpeningAndOnlineCompactables(Set<KeyExtent> openingSnapshot,
+ Map<KeyExtent,Tablet> onlineSnapshot) {
+ this.opening = openingSnapshot;
+ this.online = onlineSnapshot;
+ }
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index c70f5f48e4..4ef6d7781f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -63,6 +63,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Durability;
@@ -169,7 +170,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterators;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
@@ -767,9 +767,18 @@ public class TabletServer extends AbstractServer
metricsInfo.init(MetricsInfo.serviceTags(context.getInstanceName(),
getApplicationName(),
clientAddress, ""));
- this.compactionManager = new CompactionManager(() -> Iterators
- .transform(onlineTablets.snapshot().values().iterator(),
Tablet::asCompactable),
- getContext(), ceMetrics);
+ Supplier<OpeningAndOnlineCompactables> compactables = () -> {
+ // synchronize on both sets to get a consistent snapshot across both
sets, this avoids missing
+ // any tablets that are moving between sets
+ synchronized (openingTablets) {
+ synchronized (onlineTablets) {
+ return new OpeningAndOnlineCompactables(Set.copyOf(openingTablets),
+ onlineTablets.snapshot());
+ }
+ }
+ };
+
+ this.compactionManager = new CompactionManager(compactables, getContext(),
ceMetrics);
compactionManager.start();
announceExistence();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index e9dc2c5d5d..5b890f8387 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -31,6 +31,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.accumulo.core.conf.Property;
@@ -48,6 +49,7 @@ import
org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl;
import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.tserver.OpeningAndOnlineCompactables;
import org.apache.accumulo.tserver.compactions.CompactionExecutor.CType;
import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics;
import org.apache.accumulo.tserver.tablet.Tablet;
@@ -63,7 +65,7 @@ public class CompactionManager {
private static final Logger log =
LoggerFactory.getLogger(CompactionManager.class);
- private final Iterable<Compactable> compactables;
+ private final Supplier<OpeningAndOnlineCompactables> compactables;
private volatile Map<CompactionServiceId,CompactionService> services;
private final LinkedBlockingQueue<Compactable> compactablesToCheck = new
LinkedBlockingQueue<>();
@@ -120,18 +122,33 @@ public class CompactionManager {
long passed = NANOSECONDS.toMillis(System.nanoTime() -
lastCheckAllTime);
if (passed >= maxTimeBetweenChecks) {
// take a snapshot of what is currently running
- HashSet<ExternalCompactionId> runningEcids =
- new HashSet<>(runningExternalCompactions.keySet());
- for (Compactable compactable : compactables) {
+ HashMap<ExternalCompactionId,ExtCompInfo> runningEcids =
+ new HashMap<>(runningExternalCompactions);
+ // Get a snapshot of the tablets that are online and opening, this
must be obtained after
+ // getting the runningExternalCompactions snapshot above. If it were
obtained before then
+ // an opening tablet could add itself to runningExternalCompactions
after this code
+ // obtained the snapshot of opening and online tablets and this code
would remove it.
+ var compactablesSnapshot = compactables.get();
+ for (Tablet tablet : compactablesSnapshot.online.values()) {
+ Compactable compactable = tablet.asCompactable();
last = compactable;
submitCompaction(compactable);
// remove anything from snapshot that tablets know are running
compactable.getExternalCompactionIds(runningEcids::remove);
}
lastCheckAllTime = System.nanoTime();
+
+ // remove any tablets that are currently opening, these may have
been added to
+ // runningExternalCompactions while in the process of opening
+ runningEcids.values()
+ .removeIf(extCompInfo ->
compactablesSnapshot.opening.contains(extCompInfo.extent));
+
// anything left in the snapshot is unknown to any tablet and should
be removed if it
// still exists
- runningExternalCompactions.keySet().removeAll(runningEcids);
+ runningEcids.forEach((ecid, info) -> log.debug(
+ "Removing unknown external compaction {} {} from
runningExternalCompactions", ecid,
+ info.extent));
+ runningExternalCompactions.keySet().removeAll(runningEcids.keySet());
} else {
var compactable = compactablesToCheck.poll(maxTimeBetweenChecks -
passed, MILLISECONDS);
if (compactable != null) {
@@ -192,8 +209,8 @@ public class CompactionManager {
}
}
- public CompactionManager(Iterable<Compactable> compactables, ServerContext
context,
- CompactionExecutorsMetrics ceMetrics) {
+ public CompactionManager(Supplier<OpeningAndOnlineCompactables> compactables,
+ ServerContext context, CompactionExecutorsMetrics ceMetrics) {
this.compactables = compactables;
this.currentCfg =
@@ -338,7 +355,8 @@ public class CompactionManager {
if (ecJob != null) {
runningExternalCompactions.put(ecJob.getExternalCompactionId(),
new ExtCompInfo(ecJob.getExtent(), extCE.getId()));
- log.debug("Reserved external compaction {}",
ecJob.getExternalCompactionId());
+ log.debug("Reserved external compaction {} {}",
ecJob.getExternalCompactionId(),
+ ecJob.getExtent());
}
return ecJob;
}
@@ -354,6 +372,7 @@ public class CompactionManager {
public void registerExternalCompaction(ExternalCompactionId ecid, KeyExtent
extent,
CompactionExecutorId ceid) {
runningExternalCompactions.put(ecid, new ExtCompInfo(extent, ceid));
+ log.trace("registered external compaction {} {}", ecid, extent);
}
public void commitExternalCompaction(ExternalCompactionId extCompactionId,
@@ -365,10 +384,19 @@ public class CompactionManager {
"Unexpected extent seen on compaction commit %s %s", ecInfo.extent,
extentCompacted);
Tablet tablet = currentTablets.get(ecInfo.extent);
if (tablet != null) {
+ log.debug("Attempting to commit external compaction {} {}",
extCompactionId,
+ tablet.getExtent());
tablet.asCompactable().commitExternalCompaction(extCompactionId,
fileSize, entries);
compactablesToCheck.add(tablet.asCompactable());
+ runningExternalCompactions.remove(extCompactionId);
+ log.trace("Committed external compaction {} {}", extCompactionId,
tablet.getExtent());
+ } else {
+ log.debug("Ignoring request to commit {} {} because its not in set of
known tablets",
+ extCompactionId, ecInfo.extent);
}
- runningExternalCompactions.remove(extCompactionId);
+ } else {
+ log.debug("Ignoring request to commit {} because its not in
runningExternalCompactions",
+ extCompactionId);
}
}
@@ -380,10 +408,17 @@ public class CompactionManager {
"Unexpected extent seen on compaction commit %s %s", ecInfo.extent,
extentCompacted);
Tablet tablet = currentTablets.get(ecInfo.extent);
if (tablet != null) {
+ log.debug("Attempting to fail external compaction {} {}", ecid,
tablet.getExtent());
tablet.asCompactable().externalCompactionFailed(ecid);
compactablesToCheck.add(tablet.asCompactable());
+ runningExternalCompactions.remove(ecid);
+ log.trace("Failed external compaction {} {}", ecid,
tablet.getExtent());
+ } else {
+ log.debug("Ignoring request to fail {} {} because its not in set of
known tablets", ecid,
+ ecInfo.extent);
}
- runningExternalCompactions.remove(ecid);
+ } else {
+ log.debug("Ignoring request to fail {} because its not in
runningExternalCompactions", ecid);
}
}
@@ -424,6 +459,10 @@ public class CompactionManager {
public void compactableClosed(KeyExtent extent, Set<CompactionServiceId>
servicesUsed,
Set<ExternalCompactionId> ecids) {
runningExternalCompactions.keySet().removeAll(ecids);
+ if (log.isTraceEnabled()) {
+ ecids.forEach(ecid -> log.trace(
+ "Removed {} from runningExternalCompactions for {} as part of
close", ecid, extent));
+ }
servicesUsed.stream().map(services::get).filter(Objects::nonNull)
.forEach(compService -> compService.compactableClosed(extent));
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index 6d0d4b3446..258529fba2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -1429,7 +1429,6 @@ public class CompactableImpl implements Compactable {
ExternalCompactionInfo ecInfo = externalCompactions.get(extCompactionId);
if (ecInfo != null) {
- log.debug("Attempting to commit external compaction {}",
extCompactionId);
Optional<StoredTabletFile> metaFile = Optional.empty();
boolean successful = false;
try {
@@ -1448,11 +1447,11 @@ public class CompactableImpl implements Compactable {
} finally {
completeCompaction(ecInfo.job, ecInfo.meta.getJobFiles(), metaFile,
successful);
externalCompactions.remove(extCompactionId);
- log.debug("Completed commit of external compaction {}",
extCompactionId);
+ log.debug("Completed commit of external compaction {} {}",
extCompactionId, getExtent());
}
} else {
- log.debug("Ignoring request to commit external compaction that is
unknown {}",
- extCompactionId);
+ log.debug("Ignoring request to commit external compaction that is
unknown {} {}",
+ extCompactionId, getExtent());
}
tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(extCompactionId));
@@ -1487,7 +1486,8 @@ public class CompactableImpl implements Compactable {
externalCompactions.remove(ecid);
log.debug("Processed external compaction failure: id: {}, extent: {}",
ecid, getExtent());
} else {
- log.debug("Ignoring request to fail external compaction that is
unknown {}", ecid);
+ log.debug("Ignoring request to fail external compaction that is
unknown {} {}", ecid,
+ getExtent());
}
tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(ecid));