This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 4652c5c479 Looks ahead more than once in compaction planning. (#5588)
4652c5c479 is described below
commit 4652c5c479e61d34d76bad9baf8806cbefc893c7
Author: Keith Turner <[email protected]>
AuthorDate: Mon Jun 9 16:00:04 2025 -0400
Looks ahead more than once in compaction planning. (#5588)
When planning compactions for a tablet that currently has compactions
running, then planning process will estimate what the size of the file
the running compactions would produce and see if its more optimal to
wait for these. When doing this it only looks ahead once. This change
makes the planning process look ahead more for files to compact.
---
.../spi/compaction/DefaultCompactionPlanner.java | 36 ++++++++----
.../compaction/DefaultCompactionPlannerTest.java | 66 ++++++++++++++++++++++
2 files changed, 91 insertions(+), 11 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
index e0985329eb..38af298b6e 100644
---
a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
+++
b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -285,7 +286,7 @@ public class DefaultCompactionPlanner implements
CompactionPlanner {
// more than logarithmic work across multiple comapctions.
filesCopy.removeAll(group);
- filesCopy.add(getExpected(group, 0));
+ filesCopy.add(getExpectedFile(group, new AtomicInteger(0)));
if (findDataFilesToCompact(filesCopy, params.getRatio(),
maxFilesToCompact,
maxSizeToCompact).isEmpty()) {
@@ -302,7 +303,8 @@ public class DefaultCompactionPlanner implements
CompactionPlanner {
// to complete.
// The set of files running compactions may produce
- var expectedFiles = getExpected(params.getRunningCompactions());
+ AtomicInteger nextExpected = new AtomicInteger(0);
+ var expectedFiles = getExpected(params.getRunningCompactions(),
nextExpected);
if (!Collections.disjoint(filesCopy, expectedFiles)) {
throw new AssertionError();
@@ -313,6 +315,20 @@ public class DefaultCompactionPlanner implements
CompactionPlanner {
group = findDataFilesToCompact(filesCopy, params.getRatio(),
maxFilesToCompact,
maxSizeToCompact);
+ while (!group.isEmpty() && !Collections.disjoint(group,
expectedFiles)) {
+ // remove these files as compaction candidates because they include
a file that a running
+ // compaction would produce
+ filesCopy.removeAll(group);
+ // Create a fake file+size entry that predicts what this projected
compaction would
+ // produce
+ var futureFile = getExpectedFile(group, nextExpected);
+ Preconditions.checkState(expectedFiles.add(futureFile), "Unexpected
duplicate %s in %s",
+ futureFile, expectedFiles);
+ // look for any compaction work in the remaining set of files
+ group = findDataFilesToCompact(filesCopy, params.getRatio(),
maxFilesToCompact,
+ maxSizeToCompact);
+ }
+
if (!Collections.disjoint(group, expectedFiles)) {
// file produced by running compaction will eventually compact with
existing files, so
// wait.
@@ -461,12 +477,12 @@ public class DefaultCompactionPlanner implements
CompactionPlanner {
return Long.MAX_VALUE;
}
- private CompactableFile getExpected(Collection<CompactableFile> files, int
count) {
+ private CompactableFile getExpectedFile(Collection<CompactableFile> files,
AtomicInteger next) {
long size =
files.stream().mapToLong(CompactableFile::getEstimatedSize).sum();
try {
- return CompactableFile.create(
- new URI("hdfs://fake/accumulo/tables/adef/t-zzFAKEzz/FAKE-0000" +
count + ".rf"), size,
- 0);
+ return CompactableFile.create(new URI(
+ "hdfs://fake/accumulo/tables/adef/t-zzFAKEzz/FAKE-0000" +
next.getAndIncrement() + ".rf"),
+ size, 0);
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
@@ -475,15 +491,13 @@ public class DefaultCompactionPlanner implements
CompactionPlanner {
/**
* @return the expected files sizes for sets of compacting files.
*/
- private Set<CompactableFile> getExpected(Collection<CompactionJob>
compacting) {
+ private Set<CompactableFile> getExpected(Collection<CompactionJob>
compacting,
+ AtomicInteger next) {
Set<CompactableFile> expected = new HashSet<>();
- int count = 0;
-
for (CompactionJob job : compacting) {
- count++;
- expected.add(getExpected(job.getFiles(), count));
+ expected.add(getExpectedFile(job.getFiles(), next));
}
return expected;
diff --git
a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
index b2cf3287f5..009909f106 100644
---
a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
@@ -63,6 +63,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
public class DefaultCompactionPlannerTest {
@@ -187,6 +188,71 @@ public class DefaultCompactionPlannerTest {
assertEquals(CompactionExecutorIdImpl.internalId(csid, "medium"),
job.getExecutor());
}
+ @Test
+ public void testRunningCompactionLookAhead() {
+ String executors = "[{'name':'small','type':
'internal','maxSize':'32M','numThreads':1},"
+ + "{'name':'medium','type':
'internal','maxSize':'128M','numThreads':2},"
+ + "{'name':'large','type':
'internal','maxSize':'512M','numThreads':3},"
+ + "{'name':'huge','type': 'internal','numThreads':4}]";
+
+ var planner = createPlanner(defaultConf, executors);
+
+ int count = 0;
+
+ // create 4 files of size 10 as compacting
+ List<String> compactingString = new ArrayList<>();
+ for (int i = 0; i < 4; i++) {
+ compactingString.add("F" + count++);
+ compactingString.add(10 + "");
+ }
+
+ // create 4 files of size 100,1000,10_000, and 100_000 as the tablets files
+ List<String> candidateStrings = new ArrayList<>();
+ for (int size = 100; size < 1_000_000; size *= 10) {
+ for (int i = 0; i < 4; i++) {
+ candidateStrings.add("F" + count++);
+ candidateStrings.add(size + "");
+ }
+ }
+
+ var compacting = createCFs(compactingString.toArray(new String[0]));
+ var candidates = createCFs(candidateStrings.toArray(new String[0]));
+ var all = Sets.union(compacting, candidates);
+ var jobs = Set.of(createJob(CompactionKind.SYSTEM, all, compacting));
+ var params = createPlanningParams(all, candidates, jobs, 2,
CompactionKind.SYSTEM);
+ var plan = planner.makePlan(params);
+
+ // the size 100 files should be excluded because the job running over size
10 files will produce
+ // a file in their size range, so should see the 1000 size files planned
for compaction
+ var job = getOnlyElement(plan.getJobs());
+ assertEquals(4, job.getFiles().size());
+ assertTrue(job.getFiles().stream().allMatch(f -> f.getEstimatedSize() ==
1_000));
+
+ // try planning again incorporating the job returned from previous plan
+ var jobs2 = Sets.union(jobs, Set.copyOf(plan.getJobs()));
+ var candidates2 = new HashSet<>(candidates);
+ candidates2.removeAll(job.getFiles());
+ params = createPlanningParams(all, candidates2, jobs2, 2,
CompactionKind.SYSTEM);
+ plan = planner.makePlan(params);
+
+ // The two running jobs are over 10 and 1000 sized files. The jobs should
exclude 100 and 10_000
+ // sized files because they would produce a file in those size ranges.
This leaves the 100_000
+ // sized files available to compact.
+ job = getOnlyElement(plan.getJobs());
+ assertEquals(4, job.getFiles().size());
+ assertTrue(job.getFiles().stream().allMatch(f -> f.getEstimatedSize() ==
100_000));
+
+ // try planning again incorporating the job returned from previous plan
+ var jobs3 = Sets.union(jobs2, Set.copyOf(plan.getJobs()));
+ var candidates3 = new HashSet<>(candidates2);
+ candidates3.removeAll(job.getFiles());
+ params = createPlanningParams(all, candidates3, jobs3, 2,
CompactionKind.SYSTEM);
+ plan = planner.makePlan(params);
+
+ // should find nothing to compact at this point
+ assertEquals(0, plan.getJobs().size());
+ }
+
/**
* Tests that the maxOpen property overrides the deprecated open.max
property with the default
* service