This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 198a991533 Removes in memory set from dead compaction detector (#6283)
198a991533 is described below
commit 198a99153316d1a8ad0dfe99b7dd627dba40627d
Author: Keith Turner <[email protected]>
AuthorDate: Wed Apr 15 10:13:23 2026 -0700
Removes in memory set from dead compaction detector (#6283)
Removed an in-memory set of table ids from the dead compaction detector.
The set tracked tables that may have compaction tmp files needing
cleanup. This set would be hard to maintain across multiple managers.
Also, the set could lose track of tables if the process died.
Replaced the in-memory set with a set stored in the metadata table. This
set is directly populated by the split and merge fate operations, so there
is no chance of losing track of things when a process dies. Also, this set
is narrower: each entry names a compaction id, table, and tablet dir, which
allows looking for tmp files to clean up in a single tablet's dir rather
than scanning an entire table's dir.
Also changed the order in which tmp files are deleted for failed
compactions. They used to be deleted after the metadata for the
compaction was cleaned up. This could lead to losing track of the
cleanup if the process died after deleting the metadata but before
deleting the tmp file. Now the tmp files are deleted before the
metadata entry, so the cleanup should no longer be lost on process death.
This change is needed by #6217
Co-authored-by: Daniel Roberts <[email protected]>
---
.../accumulo/core/metadata/schema/Ample.java | 21 ++++
.../core/metadata/schema/MetadataSchema.java | 32 ++++++
.../util/compaction/ExternalCompactionUtil.java | 3 +-
.../metadata/OrphanedCompactionStoreImpl.java | 103 +++++++++++++++++
.../accumulo/server/metadata/ServerAmpleImpl.java | 5 +
.../server/util/FindCompactionTmpFiles.java | 128 ++++++++++++++-------
.../coordinator/CompactionCoordinator.java | 82 +++----------
.../coordinator/DeadCompactionDetector.java | 77 ++++++-------
.../manager/tableOps/merge/MergeTablets.java | 17 +++
.../manager/tableOps/split/UpdateTablets.java | 10 ++
.../manager/tableOps/merge/MergeTabletsTest.java | 38 +++++-
.../manager/tableOps/split/UpdateTabletsTest.java | 26 +++++
.../test/compaction/ExternalCompaction2ITBase.java | 4 +-
.../test/compaction/ExternalCompaction_3_IT.java | 12 +-
14 files changed, 401 insertions(+), 157 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 4d06f3b1aa..7e66f37068 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -25,6 +25,7 @@ import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
+import java.util.stream.Stream;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.admin.TabletAvailability;
@@ -669,4 +670,24 @@ public interface Ample {
default ScanServerRefStore scanServerRefs() {
throw new UnsupportedOperationException();
}
+
+ record OrphanedCompaction(ExternalCompactionId id, TableId table, String
dir) {
+ }
+
+ /**
+ * Tracks compactions that were removed from the metadata table but may
still be running on
+ * compactors. The tmp files associated with these compactions can
eventually be removed when the
+ * compaction is no longer running.
+ */
+ interface OrphanedCompactionStore {
+ Stream<OrphanedCompaction> list();
+
+ void add(Collection<OrphanedCompaction> orphanedCompactions);
+
+ void delete(Collection<OrphanedCompaction> orphanedCompactions);
+ }
+
+ default OrphanedCompactionStore orphanedCompactions() {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index 39e1c35e3b..fa7a98ce38 100644
---
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -547,4 +547,36 @@ public class MetadataSchema {
}
+ /**
+ * Holds information about compactions that were deleted from tablets
metadata by split or merge
+ * operations. These may still be running and may have tmp files that need
to be cleaned up.
+ */
+ public static class OrphanedCompactionSection {
+ private static final Section section =
+ new Section(RESERVED_PREFIX + "ocomp", true, RESERVED_PREFIX +
"ocomq", false);
+
+ public static Range getRange() {
+ return section.getRange();
+ }
+
+ public static String getRowPrefix() {
+ return section.getRowPrefix();
+ }
+
+ public static Ample.OrphanedCompaction decodeRow(String row) {
+ String[] fields = row.split("#");
+ Preconditions.checkArgument(fields.length == 4);
+ Preconditions.checkArgument(getRowPrefix().equals(fields[0]));
+ return new Ample.OrphanedCompaction(ExternalCompactionId.from(fields[1]),
+ TableId.of(fields[2]), fields[3]);
+ }
+
+ public static String encodeRow(Ample.OrphanedCompaction rc) {
+ // put the compaction id first in the row because its uuid will spread
out nicely and avoid
+ // hot spotting
+ return getRowPrefix() + "#" + rc.id().canonical() + "#" +
rc.table().canonical() + "#"
+ + rc.dir();
+
+ }
+ }
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index 149c12e12b..4e842f3681 100644
---
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@ -24,7 +24,6 @@ import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RU
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTION_IDS_POOL;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -262,7 +261,7 @@ public class ExternalCompactionUtil {
}
}
- public static Collection<ExternalCompactionId>
+ public static Set<ExternalCompactionId>
getCompactionIdsRunningOnCompactors(ClientContext context) {
final ExecutorService executor = ThreadPools.getServerThreadPools()
.getPoolBuilder(COMPACTOR_RUNNING_COMPACTION_IDS_POOL).numCoreThreads(16).build();
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/OrphanedCompactionStoreImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/OrphanedCompactionStoreImpl.java
new file mode 100644
index 0000000000..4fe09966b9
--- /dev/null
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/OrphanedCompactionStoreImpl.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.metadata;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.OrphanedCompactionSection;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.server.ServerContext;
+
+import com.google.common.base.Preconditions;
+
+public class OrphanedCompactionStoreImpl implements
Ample.OrphanedCompactionStore {
+ private final ServerContext context;
+
+ public OrphanedCompactionStoreImpl(ServerContext context) {
+ this.context = context;
+ }
+
+ private Stream<Ample.OrphanedCompaction> createStream(String tableName) {
+ Scanner scanner = null;
+ try {
+ scanner = context.createScanner(tableName, Authorizations.EMPTY);
+ } catch (TableNotFoundException e) {
+ throw new IllegalStateException(e);
+ }
+ scanner.setRange(OrphanedCompactionSection.getRange());
+ return scanner.stream().map(e -> e.getKey().getRowData().toString())
+ .map(OrphanedCompactionSection::decodeRow).onClose(scanner::close);
+ }
+
+ @Override
+ public Stream<Ample.OrphanedCompaction> list() {
+ return Stream.concat(createStream(Ample.DataLevel.METADATA.metaTable()),
+ createStream(Ample.DataLevel.USER.metaTable()));
+ }
+
+ private void write(Collection<Ample.OrphanedCompaction> orphanedCompactions,
+ Function<Ample.OrphanedCompaction,Mutation> converter) {
+ if (orphanedCompactions.isEmpty()) {
+ return;
+ }
+
+ Map<Ample.DataLevel,List<Ample.OrphanedCompaction>> byLevel =
orphanedCompactions.stream()
+ .collect(Collectors.groupingBy(rc -> Ample.DataLevel.of(rc.table())));
+ // Do not expect the root to split or merge so it should never have this
data
+ Preconditions.checkArgument(!byLevel.containsKey(Ample.DataLevel.ROOT));
+ byLevel.forEach((dl, removed) -> {
+ try (var writer = context.createBatchWriter(dl.metaTable())) {
+ for (var rc : removed) {
+ writer.addMutation(converter.apply(rc));
+ }
+ } catch (TableNotFoundException | MutationsRejectedException e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ }
+
+ @Override
+ public void add(Collection<Ample.OrphanedCompaction> orphanedCompactions) {
+ write(orphanedCompactions, oc -> {
+ Mutation m = new Mutation(OrphanedCompactionSection.encodeRow(oc));
+ m.put("", "", "");
+ return m;
+ });
+
+ }
+
+ @Override
+ public void delete(Collection<Ample.OrphanedCompaction> orphanedCompactions)
{
+ write(orphanedCompactions, oc -> {
+ Mutation m = new Mutation(OrphanedCompactionSection.encodeRow(oc));
+ m.putDelete("", "");
+ return m;
+ });
+ }
+}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index a4eb470c66..ee17d962de 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -281,6 +281,11 @@ public class ServerAmpleImpl extends AmpleImpl implements
Ample {
return scanServerRefStore;
}
+ @Override
+ public OrphanedCompactionStore orphanedCompactions() {
+ return new OrphanedCompactionStoreImpl(getContext());
+ }
+
@VisibleForTesting
protected ServerContext getContext() {
return context;
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java
b/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java
index 06755d1807..7064b12de3 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java
@@ -18,11 +18,13 @@
*/
package org.apache.accumulo.server.util;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -31,8 +33,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.cli.ServerOpts;
+import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
@@ -46,6 +51,7 @@ import org.apache.accumulo.start.spi.CommandGroup;
import org.apache.accumulo.start.spi.CommandGroups;
import org.apache.accumulo.start.spi.KeywordExecutable;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +59,7 @@ import org.slf4j.LoggerFactory;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.auto.service.AutoService;
+import com.google.common.util.concurrent.MoreExecutors;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
@@ -127,6 +134,7 @@ public class FindCompactionTmpFiles extends
ServerKeywordExecutable<FindOpts> {
});
}
}
+
LOG.trace("Final set of compaction tmp files after removing active
compactions: {}", matches);
return matches;
}
@@ -137,57 +145,99 @@ public class FindCompactionTmpFiles extends
ServerKeywordExecutable<FindOpts> {
public int error = 0;
}
- public static DeleteStats deleteTempFiles(ServerContext context, Set<Path>
filesToDelete)
- throws InterruptedException {
+ public static void findTmpFiles(ServerContext ctx, TableId tableId, String
dirName,
+ Set<ExternalCompactionId> ecidsForTablet, Consumer<Path> findConsumer) {
+ final Collection<Volume> vols = ctx.getVolumeManager().getVolumes();
+ for (Volume vol : vols) {
+ try {
+ final String volPath = vol.getBasePath() + Constants.HDFS_TABLES_DIR +
Path.SEPARATOR
+ + tableId.canonical() + Path.SEPARATOR + dirName;
+ final FileSystem fs = vol.getFileSystem();
+ for (ExternalCompactionId ecid : ecidsForTablet) {
+ final String fileSuffix = "_tmp_" + ecid.canonical();
+ FileStatus[] files = null;
+ try {
+ files = fs.listStatus(new Path(volPath), (path) ->
path.getName().endsWith(fileSuffix));
+ } catch (FileNotFoundException e) {
+ LOG.trace("Failed to list tablet dir {}", volPath, e);
+ }
+ if (files != null) {
+ for (FileStatus file : files) {
+ findConsumer.accept(file.getPath());
+ }
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Exception deleting compaction tmp files for table: {}",
tableId, e);
+ }
+ }
+ }
+
+ private static boolean deleteTmpFile(ServerContext context, Path p) throws
IOException {
+ if (context.getVolumeManager().exists(p)) {
+ boolean result = context.getVolumeManager().delete(p);
+ if (result) {
+ LOG.debug("Removed old temp file {}", p);
+ } else {
+ LOG.error("Unable to remove old temp file {}, operation returned false
with no exception",
+ p);
+ }
+ return result;
+ }
+ return true;
+ }
+
+ public static DeleteStats deleteTempFiles(ServerContext context, Set<Path>
filesToDelete) {
+
+ final ExecutorService delSvc;
+ if (filesToDelete.size() < 4) {
+ // Do not bother creating a thread pool and threads for a few files.
+ delSvc = MoreExecutors.newDirectExecutorService();
+ } else {
+ delSvc = Executors.newFixedThreadPool(8);
+ }
- final ExecutorService delSvc = Executors.newFixedThreadPool(8);
- final List<Future<Boolean>> futures = new
ArrayList<>(filesToDelete.size());
final DeleteStats stats = new DeleteStats();
+ // use a linked list to make removal from the middle of the list quick
+ final List<Future<Boolean>> futures = new LinkedList<>();
+
filesToDelete.forEach(p -> {
- futures.add(delSvc.submit(() -> {
- if (context.getVolumeManager().exists(p)) {
- boolean result = context.getVolumeManager().delete(p);
- if (result) {
- LOG.debug("Removed old temp file {}", p);
- } else {
- LOG.error(
- "Unable to remove old temp file {}, operation returned false
with no exception", p);
- }
- return result;
- }
- return true;
- }));
+ futures.add(delSvc.submit(() -> deleteTmpFile(context, p)));
});
delSvc.shutdown();
- int expectedResponses = filesToDelete.size();
- while (expectedResponses > 0) {
- Iterator<Future<Boolean>> iter = futures.iterator();
- while (iter.hasNext()) {
- Future<Boolean> future = iter.next();
- if (future.isDone()) {
- expectedResponses--;
- iter.remove();
- try {
- if (future.get()) {
- stats.success++;
- } else {
- stats.failure++;
+ try {
+ int expectedResponses = filesToDelete.size();
+ while (expectedResponses > 0) {
+ Iterator<Future<Boolean>> iter = futures.iterator();
+ while (iter.hasNext()) {
+ Future<Boolean> future = iter.next();
+ if (future.isDone()) {
+ expectedResponses--;
+ iter.remove();
+ try {
+ if (future.get()) {
+ stats.success++;
+ } else {
+ stats.failure++;
+ }
+ } catch (ExecutionException e) {
+ stats.error++;
+ LOG.error("Error deleting a compaction tmp file", e);
}
- } catch (ExecutionException e) {
- stats.error++;
- LOG.error("Error deleting a compaction tmp file", e);
}
}
+ if (expectedResponses > 0) {
+ LOG.debug("Waiting on {} background delete operations",
expectedResponses);
+ UtilWaitThread.sleep(1_000);
+ }
}
- LOG.debug("Waiting on {} background delete operations",
expectedResponses);
- if (expectedResponses > 0) {
- UtilWaitThread.sleep(3_000);
- }
+ delSvc.awaitTermination(10, TimeUnit.MINUTES);
+ return stats;
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
}
- delSvc.awaitTermination(10, TimeUnit.MINUTES);
- return stats;
}
public FindCompactionTmpFiles() {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index cf157fe8ca..fe36b4a3c9 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -93,8 +93,6 @@ import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
-import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
-import
org.apache.accumulo.core.metadata.schema.filters.HasExternalCompactionsFilter;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
@@ -108,7 +106,6 @@ import org.apache.accumulo.core.util.cache.Caches.CacheName;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
-import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.manager.Manager;
import
org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction;
import
org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData;
@@ -122,8 +119,8 @@ import
org.apache.accumulo.server.compaction.CompactionConfigStorage;
import org.apache.accumulo.server.compaction.CompactionPluginUtils;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.accumulo.server.tablets.TabletNameGenerator;
+import org.apache.accumulo.server.util.FindCompactionTmpFiles;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -765,6 +762,23 @@ public class CompactionCoordinator
void compactionFailedForLevel(Map<KeyExtent,Set<ExternalCompactionId>>
compactions) {
+ // CompactionFailed is called from the Compactor when either a compaction
fails or is cancelled
+ // and it's called from the DeadCompactionDetector. Remove compaction tmp
files from the tablet
+ // directory that have a corresponding ecid in the name. Must delete any
tmp files before
+ // removing compaction entry from metadata table. This ensures that in the
event of process
+ // death that the dead compaction will be detected in the future and the
files removed then.
+ try (var tablets = ctx.getAmple().readTablets()
+ .forTablets(compactions.keySet(),
Optional.empty()).fetch(ColumnType.DIR).build()) {
+ Set<Path> tmpFilesToDelete = new HashSet<>();
+ for (TabletMetadata tm : tablets) {
+ var extent = tm.getExtent();
+ var ecidsForTablet = compactions.get(extent);
+ FindCompactionTmpFiles.findTmpFiles(ctx, extent.tableId(),
tm.getDirName(), ecidsForTablet,
+ tmpFilesToDelete::add);
+ }
+ FindCompactionTmpFiles.deleteTempFiles(ctx, tmpFilesToDelete);
+ }
+
try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
compactions.forEach((extent, ecids) -> {
try {
@@ -791,78 +805,18 @@ public class CompactionCoordinator
}
});
- final List<ExternalCompactionId> ecidsForTablet = new ArrayList<>();
tabletsMutator.process().forEach((extent, result) -> {
if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) {
-
// this should try again later when the dead compaction detector
runs, lets log it in case
// its a persistent problem
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to remove failed compaction {} {}", extent,
compactions.get(extent));
}
- } else {
- // compactionFailed is called from the Compactor when either a
compaction fails or
- // is cancelled and it's called from the DeadCompactionDetector.
This block is
- // entered when the conditional mutator above successfully deletes
an ecid from
- // the tablet metadata. Remove compaction tmp files from the tablet
directory
- // that have a corresponding ecid in the name.
-
- ecidsForTablet.clear();
- ecidsForTablet.addAll(compactions.get(extent));
-
- if (!ecidsForTablet.isEmpty()) {
- final TabletMetadata tm = ctx.getAmple().readTablet(extent,
ColumnType.DIR);
- if (tm != null) {
- final Collection<Volume> vols =
ctx.getVolumeManager().getVolumes();
- for (Volume vol : vols) {
- try {
- final String volPath =
- vol.getBasePath() + Constants.HDFS_TABLES_DIR +
Path.SEPARATOR
- + extent.tableId().canonical() + Path.SEPARATOR +
tm.getDirName();
- final FileSystem fs = vol.getFileSystem();
- for (ExternalCompactionId ecid : ecidsForTablet) {
- final String fileSuffix = "_tmp_" + ecid.canonical();
- FileStatus[] files = null;
- try {
- files = fs.listStatus(new Path(volPath),
- (path) -> path.getName().endsWith(fileSuffix));
- } catch (FileNotFoundException e) {
- LOG.trace("Failed to list tablet dir {}", volPath, e);
- }
- if (files != null) {
- for (FileStatus file : files) {
- if (!fs.delete(file.getPath(), false)) {
- LOG.warn("Unable to delete ecid tmp file: {}: ",
file.getPath());
- } else {
- LOG.debug("Deleted ecid tmp file: {}",
file.getPath());
- }
- }
- }
- }
- } catch (IOException e) {
- LOG.error("Exception deleting compaction tmp files for
tablet: {}", extent, e);
- }
- }
- } else {
- // TabletMetadata does not exist for the extent. This could be
due to a merge or
- // split operation. Use the utility to find tmp files at the
table level
- deadCompactionDetector.addTableId(extent.tableId());
- }
- }
}
});
}
}
- protected Set<ExternalCompactionId> readExternalCompactionIds() {
- try (TabletsMetadata tabletsMetadata =
- this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER)
- .filter(new HasExternalCompactionsFilter()).fetch(ECOMP).build()) {
- return tabletsMetadata.stream().flatMap(tm ->
tm.getExternalCompactions().keySet().stream())
- .collect(Collectors.toSet());
- }
- }
-
/* Method exists to be called from test */
public CompactionJobQueues getJobQueues() {
return jobQueues;
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
index ce04296a61..7f09a3efbc 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
@@ -18,11 +18,13 @@
*/
package org.apache.accumulo.manager.compaction.coordinator;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
@@ -33,11 +35,11 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateClient;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateKey;
+import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
@@ -49,7 +51,6 @@ import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.manager.tableOps.FateEnv;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.util.FindCompactionTmpFiles;
-import org.apache.accumulo.server.util.FindCompactionTmpFiles.DeleteStats;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,7 +63,6 @@ public class DeadCompactionDetector {
private final CompactionCoordinator coordinator;
private final ScheduledThreadPoolExecutor schedExecutor;
private final ConcurrentHashMap<ExternalCompactionId,Long> deadCompactions;
- private final Set<TableId> tablesWithUnreferencedTmpFiles = new HashSet<>();
private final Function<FateInstanceType,FateClient<FateEnv>> fateClients;
public DeadCompactionDetector(ServerContext context, CompactionCoordinator
coordinator,
@@ -75,12 +75,6 @@ public class DeadCompactionDetector {
this.fateClients = fateClients;
}
- public void addTableId(TableId tableWithUnreferencedTmpFiles) {
- synchronized (tablesWithUnreferencedTmpFiles) {
- tablesWithUnreferencedTmpFiles.add(tableWithUnreferencedTmpFiles);
- }
- }
-
private void detectDeadCompactions() {
/*
@@ -162,6 +156,35 @@ public class DeadCompactionDetector {
});
}
+ // Get the list of compaction entries that were removed from the metadata
table by a split or
+ // merge operation. Must get this data before getting the running set of
compactions.
+ List<Ample.OrphanedCompaction> orphanedCompactions;
+ try (Stream<Ample.OrphanedCompaction> listing =
+ context.getAmple().orphanedCompactions().list()) {
+ orphanedCompactions =
listing.collect(Collectors.toCollection(ArrayList::new));
+ }
+
+ // Must get the set of running compactions after reading compaction ids
from the metadata table
+ Set<ExternalCompactionId> running = null;
+ if (!orphanedCompactions.isEmpty() || !tabletCompactions.isEmpty()) {
+ running =
ExternalCompactionUtil.getCompactionIdsRunningOnCompactors(context);
+ }
+
+ // Delete any tmp files related to compaction metadata entries that were
removed by split or
+ // merge and are no longer running.
+ if (!orphanedCompactions.isEmpty()) {
+ var runningSet = Objects.requireNonNull(running);
+ orphanedCompactions.removeIf(rc -> runningSet.contains(rc.id()));
+ Set<Path> tmpFilesToDelete = new HashSet<>();
+ orphanedCompactions.forEach(rc -> {
+ log.trace("attempting to find tmp files for removed compaction {}",
rc);
+ FindCompactionTmpFiles.findTmpFiles(context, rc.table(), rc.dir(),
Set.of(rc.id()),
+ tmpFilesToDelete::add);
+ });
+ FindCompactionTmpFiles.deleteTempFiles(context, tmpFilesToDelete);
+ context.getAmple().orphanedCompactions().delete(orphanedCompactions);
+ }
+
if (tabletCompactions.isEmpty()) {
// Clear out dead compactions, tservers don't think anything is running
log.trace("Clearing the dead compaction map, no tablets have compactions
running");
@@ -183,9 +206,6 @@ public class DeadCompactionDetector {
// In order for this overall algorithm to be correct and avoid race
conditions, the compactor
// must return ids covering the time period from before reservation
until after commit. If the
// ids do not cover this time period then legitimate running compactions
could be canceled.
- Collection<ExternalCompactionId> running =
- ExternalCompactionUtil.getCompactionIdsRunningOnCompactors(context);
-
running.forEach(ecid -> {
if (tabletCompactions.remove(ecid) != null) {
log.debug("Ignoring compaction {} that is running on a compactor",
ecid);
@@ -230,37 +250,6 @@ public class DeadCompactionDetector {
coordinator.compactionsFailed(tabletCompactions);
this.deadCompactions.keySet().removeAll(toFail);
}
-
- // Find and delete compaction tmp files that are unreferenced
- if (!tablesWithUnreferencedTmpFiles.isEmpty()) {
-
- Set<TableId> copy = new HashSet<>();
- synchronized (tablesWithUnreferencedTmpFiles) {
- copy.addAll(tablesWithUnreferencedTmpFiles);
- tablesWithUnreferencedTmpFiles.clear();
- }
-
- log.debug("Tables that may have unreferenced compaction tmp files: {}",
copy);
- for (TableId tid : copy) {
- try {
- final Set<Path> matches =
FindCompactionTmpFiles.findTempFiles(context, tid.canonical());
- log.debug("Found the following compaction tmp files for table {}:",
tid);
- matches.forEach(p -> log.debug("{}", p));
-
- if (!matches.isEmpty()) {
- log.debug("Deleting compaction tmp files for table {}...", tid);
- DeleteStats stats =
FindCompactionTmpFiles.deleteTempFiles(context, matches);
- log.debug(
- "Deletion of compaction tmp files for table {} complete.
Success:{}, Failure:{}, Error:{}",
- tid, stats.success, stats.failure, stats.error);
- }
- } catch (InterruptedException e) {
- log.error("Interrupted while finding compaction tmp files for table:
{}", tid.canonical(),
- e);
- }
- }
- }
-
}
public void start() {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java
index 8e8241fb92..d184b39cb4 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java
@@ -75,6 +75,7 @@ public class MergeTablets extends AbstractFateOperation {
Map<StoredTabletFile,DataFileValue> newFiles = new HashMap<>();
TabletMetadata firstTabletMeta = null;
TabletMetadata lastTabletMeta = null;
+ List<Ample.OrphanedCompaction> orphanedCompactions = new ArrayList<>();
try (var tabletsMetadata =
env.getContext().getAmple().readTablets().forTable(range.tableId())
.overlapping(range.prevEndRow(), range.endRow()).build()) {
@@ -141,6 +142,19 @@ public class MergeTablets extends AbstractFateOperation {
dirs.clear();
}
}
+
+ // These compaction metadata entries will be deleted, queue up removal
of the tmp file once
+ // the compaction is no longer running
+ tabletMeta.getExternalCompactions().keySet().stream()
+ .map(ecid -> new Ample.OrphanedCompaction(ecid,
tabletMeta.getExtent().tableId(),
+ tabletMeta.getDirName()))
+ .forEach(orphanedCompactions::add);
+ if (orphanedCompactions.size() > 1000 && tabletsSeen > 1) {
+ orphanedCompactions
+ .forEach(rc -> log.trace("{} adding removed compaction {}",
fateId, rc));
+
env.getContext().getAmple().orphanedCompactions().add(orphanedCompactions);
+ orphanedCompactions.clear();
+ }
}
if (tabletsSeen == 1) {
@@ -154,6 +168,9 @@ public class MergeTablets extends AbstractFateOperation {
lastTabletMeta);
}
+ orphanedCompactions.forEach(rc -> log.trace("{} adding removed compaction {}", fateId, rc));
+ env.getContext().getAmple().orphanedCompactions().add(orphanedCompactions);
+
log.info("{} merge low tablet {}", fateId, firstTabletMeta.getExtent());
log.info("{} merge high tablet {}", fateId, lastTabletMeta.getExtent());
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
index c12386a372..f4d2d6df00 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
@@ -266,6 +266,16 @@ public class UpdateTablets extends AbstractFateOperation {
private void updateExistingTablet(FateId fateId, ServerContext ctx,
TabletMetadata tabletMetadata,
TabletOperationId opid, NavigableMap<KeyExtent,TabletMergeability>
newTablets,
Map<KeyExtent,Map<StoredTabletFile,DataFileValue>> newTabletsFiles) {
+
+ // queue up the tmp files related to these compaction metadata entries to be eventually deleted
+ // once the compaction is no longer running
+ var removedCompactions =
tabletMetadata.getExternalCompactions().keySet().stream()
+ .map(ecid -> new Ample.OrphanedCompaction(ecid,
tabletMetadata.getExtent().tableId(),
+ tabletMetadata.getDirName()))
+ .toList();
+ removedCompactions.forEach(rc -> log.trace("{} adding removed compaction {}", fateId, rc));
+ ctx.getAmple().orphanedCompactions().add(removedCompactions);
+
try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
var newExtent = newTablets.navigableKeySet().last();
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
index 7bd6824f2f..002ffbeea8 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
@@ -44,11 +44,15 @@ import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.UNSPLITTABLE;
import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED;
import static
org.apache.accumulo.manager.tableOps.split.UpdateTabletsTest.newSTF;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
@@ -56,6 +60,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import java.util.stream.Stream;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.data.NamespaceId;
@@ -202,9 +207,12 @@ public class MergeTabletsTest {
EasyMock.expect(lastTabletMeta.getUnSplittable()).andReturn(unsplittableMeta).atLeastOnce();
EasyMock.expect(lastTabletMeta.getTabletMergeability()).andReturn(mergeability).atLeastOnce();
EasyMock.expect(lastTabletMeta.getMigration()).andReturn(migration).atLeastOnce();
+
EasyMock.expect(lastTabletMeta.getDirName()).andReturn("td3").atLeastOnce();
EasyMock.replay(lastTabletMeta, compactions);
+ Set<Ample.OrphanedCompaction> orphanedCompactions = new HashSet<>();
+
testMerge(List.of(tablet1, tablet2, lastTabletMeta), tableId, null, null,
tabletMutator -> {
EasyMock.expect(tabletMutator.putTime(MetadataTime.parse("L30"))).andReturn(tabletMutator)
.once();
@@ -243,9 +251,13 @@ public class MergeTabletsTest {
.andReturn(tabletMutator).once();
EasyMock.expect(tabletMutator.deleteMigration()).andReturn(tabletMutator);
- });
+ }, orphanedCompactions::add);
EasyMock.verify(lastTabletMeta, compactions);
+
+ assertEquals(Set.of(new Ample.OrphanedCompaction(cid1, tableId, "td3"),
+ new Ample.OrphanedCompaction(cid2, tableId, "td3"),
+ new Ample.OrphanedCompaction(cid3, tableId, "td3")),
orphanedCompactions);
}
@Test
@@ -420,6 +432,12 @@ public class MergeTabletsTest {
private static void testMerge(List<TabletMetadata> inputTablets, TableId
tableId, String start,
String end, Consumer<ConditionalTabletMutatorImpl> expectationsSetter)
throws Exception {
+ testMerge(inputTablets, tableId, start, end, expectationsSetter,
removedCompaction -> fail());
+ }
+
+ private static void testMerge(List<TabletMetadata> inputTablets, TableId
tableId, String start,
+ String end, Consumer<ConditionalTabletMutatorImpl> expectationsSetter,
+ Consumer<Ample.OrphanedCompaction> orphanedCompactionConsumer) throws
Exception {
MergeInfo mergeInfo =
new MergeInfo(tableId, NamespaceId.of("1"), start == null ? null :
start.getBytes(UTF_8),
end == null ? null : end.getBytes(UTF_8),
MergeInfo.Operation.MERGE);
@@ -434,6 +452,24 @@ public class MergeTabletsTest {
EasyMock.mock(ConditionalTabletsMutatorImpl.class);
ConditionalTabletMutatorImpl tabletMutator =
EasyMock.mock(ConditionalTabletMutatorImpl.class);
+ Ample.OrphanedCompactionStore orphanedCompactionStore = new
Ample.OrphanedCompactionStore() {
+ @Override
+ public Stream<Ample.OrphanedCompaction> list() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void add(Collection<Ample.OrphanedCompaction> removedCompactions)
{
+ removedCompactions.forEach(orphanedCompactionConsumer);
+ }
+
+ @Override
+ public void delete(Collection<Ample.OrphanedCompaction>
removedCompactions) {
+ throw new UnsupportedOperationException();
+ }
+ };
+
EasyMock.expect(ample.orphanedCompactions()).andReturn(orphanedCompactionStore);
+
ServiceLock managerLock = EasyMock.mock(ServiceLock.class);
EasyMock.expect(context.getServiceLock()).andReturn(managerLock).anyTimes();
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java
index dad40a0cb1..13ad3bde70 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java
@@ -24,7 +24,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
@@ -37,6 +39,7 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletMergeability;
@@ -250,6 +253,24 @@ public class UpdateTabletsTest {
EasyMock.expect(fateEnv.getFileRangeCache()).andReturn(fileRangeCache).atLeastOnce();
EasyMock.expect(fateEnv.getSteadyTime()).andReturn(SteadyTime.from(100_000,
TimeUnit.SECONDS))
.atLeastOnce();
+ Set<Ample.OrphanedCompaction> orphanedCompactionSet = new HashSet<>();
+ Ample.OrphanedCompactionStore ocs = new Ample.OrphanedCompactionStore() {
+ @Override
+ public Stream<Ample.OrphanedCompaction> list() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void add(Collection<Ample.OrphanedCompaction> removedCompactions)
{
+ orphanedCompactionSet.addAll(removedCompactions);
+ }
+
+ @Override
+ public void delete(Collection<Ample.OrphanedCompaction>
removedCompactions) {
+ throw new UnsupportedOperationException();
+ }
+ };
+ EasyMock.expect(ample.orphanedCompactions()).andReturn(ocs).atLeastOnce();
ServiceLock managerLock = EasyMock.mock(ServiceLock.class);
EasyMock.expect(context.getServiceLock()).andReturn(managerLock).anyTimes();
@@ -289,6 +310,7 @@ public class UpdateTabletsTest {
UnSplittableMetadata.toUnSplittable(origExtent, 1000, 1001, 1002,
tabletFiles.keySet());
EasyMock.expect(tabletMeta.getUnSplittable()).andReturn(usm).atLeastOnce();
EasyMock.expect(tabletMeta.getMigration()).andReturn(migration).atLeastOnce();
+ EasyMock.expect(tabletMeta.getDirName()).andReturn("td1").atLeastOnce();
EasyMock.expect(ample.readTablet(origExtent)).andReturn(tabletMeta);
@@ -408,6 +430,10 @@ public class UpdateTabletsTest {
EasyMock.verify(fateEnv, context, ample, tabletMeta, fileRangeCache,
tabletsMutator,
tablet1Mutator, tablet2Mutator, tablet3Mutator, cr, compactions);
+
+ assertEquals(Set.of(new Ample.OrphanedCompaction(cid1, tableId, "td1"),
+ new Ample.OrphanedCompaction(cid2, tableId, "td1"),
+ new Ample.OrphanedCompaction(cid3, tableId, "td1")),
orphanedCompactionSet);
}
@Test
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction2ITBase.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction2ITBase.java
index ad7b19325b..fd4d228f6c 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction2ITBase.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction2ITBase.java
@@ -138,7 +138,7 @@ public abstract class ExternalCompaction2ITBase extends
SharedMiniClusterBase {
// Verify that the tmp file are cleaned up
Wait.waitFor(() -> FindCompactionTmpFiles
- .findTempFiles(getCluster().getServerContext(),
tid.canonical()).size() == 0, 60_000);
+ .findTempFiles(getCluster().getServerContext(),
tid.canonical()).isEmpty(), 60_000);
}
}
@@ -200,7 +200,7 @@ public abstract class ExternalCompaction2ITBase extends
SharedMiniClusterBase {
// Verify that the tmp file are cleaned up
Wait.waitFor(() -> FindCompactionTmpFiles
- .findTempFiles(getCluster().getServerContext(),
tid.canonical()).size() == 0);
+ .findTempFiles(getCluster().getServerContext(),
tid.canonical()).isEmpty());
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
index 0101865b3f..b011f3fb9f 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
@@ -150,14 +150,16 @@ public class ExternalCompaction_3_IT extends
SharedMiniClusterBase {
.collect(Collectors.toSet());
}
}
- // We need to cancel the compaction or delete the table here because we initiate a user
- // compaction above in the test. Even though the external compaction was cancelled
- // because we split the table, FaTE will continue to queue up a compaction
- client.tableOperations().delete(table1);
// Verify that the tmp file are cleaned up
Wait.waitFor(() -> FindCompactionTmpFiles
- .findTempFiles(getCluster().getServerContext(),
tid.canonical()).size() == 0);
+ .findTempFiles(getCluster().getServerContext(),
tid.canonical()).isEmpty());
+
+ // We need to cancel the compaction or delete the table here because we initiate a user
+ // compaction above in the test. Even though the external compaction was cancelled
+ // because we split the table, FaTE would continue to queue up a compaction
+ client.tableOperations().cancelCompaction(table1);
+
} finally {
getCluster().getClusterControl().stop(ServerType.COMPACTOR);
getCluster().getClusterControl().start(ServerType.COMPACTOR);