This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new c7deced5e0 uses table id in compaction prioritization (#4142) c7deced5e0 is described below commit c7deced5e0190dd297b6168a58eacd83ba86c74e Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Jan 9 16:34:00 2024 -0500 uses table id in compaction prioritization (#4142) Makes the default compaction planner use the table id when prioritizing compactions. The root table id is always given the highest priority, followed by the metadata table, then user tables. Fixes #4033 --- .../spi/compaction/DefaultCompactionPlanner.java | 4 +- .../util/compaction/CompactionJobPrioritizer.java | 73 +++++++++---- .../compaction/DefaultCompactionPlannerTest.java | 3 + .../util/compaction/CompactionPrioritizerTest.java | 118 +++++++++++++++++---- 4 files changed, 159 insertions(+), 39 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java index 4cb1f3a478..2396ad85d3 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java @@ -432,8 +432,8 @@ public class DefaultCompactionPlanner implements CompactionPlanner { private static short createPriority(PlanningParameters params, Collection<CompactableFile> group) { - return CompactionJobPrioritizer.createPriority(params.getKind(), params.getAll().size(), - group.size()); + return CompactionJobPrioritizer.createPriority(params.getTableId(), params.getKind(), + params.getAll().size(), group.size()); } private long getMaxSizeToCompact(CompactionKind kind) { diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionJobPrioritizer.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionJobPrioritizer.java index 261349cb6f..0232a6bcc7 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionJobPrioritizer.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionJobPrioritizer.java @@ -20,40 +20,77 @@ package org.apache.accumulo.core.util.compaction; import java.util.Comparator; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactionKind; +import com.google.common.base.Preconditions; + public class CompactionJobPrioritizer { public static final Comparator<CompactionJob> JOB_COMPARATOR = Comparator.comparingInt(CompactionJob::getPriority) .thenComparingInt(job -> job.getFiles().size()).reversed(); - public static short createPriority(CompactionKind kind, int totalFiles, int compactingFiles) { + private static final short ROOT_USER_MAX = Short.MAX_VALUE; + private static final short ROOT_USER_MIN = ROOT_USER_MAX - 1000; + private static final short ROOT_SYSTEM_MAX = ROOT_USER_MIN - 1; + private static final short ROOT_SYSTEM_MIN = ROOT_SYSTEM_MAX - 1000; + private static final short METADATA_USER_MAX = ROOT_SYSTEM_MIN - 1; + private static final short METADATA_USER_MIN = METADATA_USER_MAX - 1000; + private static final short METADATA_SYSTEM_MAX = METADATA_USER_MIN - 1; + private static final short METADATA_SYSTEM_MIN = METADATA_SYSTEM_MAX - 1000; + private static final short USER_USER_MAX = METADATA_SYSTEM_MIN - 1; + private static final short USER_USER_MIN = USER_USER_MAX - 30768; + private static final short USER_SYSTEM_MAX = USER_USER_MIN - 1; + private static final short USER_SYSTEM_MIN = Short.MIN_VALUE; - int prio = totalFiles + compactingFiles; + public static short createPriority(TableId tableId, CompactionKind kind, int totalFiles, + int compactingFiles) { - switch (kind) { - case USER: - // user-initiated compactions will have a positive priority - // based on number of files - if (prio > Short.MAX_VALUE) { - return Short.MAX_VALUE; + Preconditions.checkArgument(totalFiles >= 0, "totalFiles is negative %s", totalFiles); + Preconditions.checkArgument(compactingFiles >= 0, "compactingFiles is negative %s", + compactingFiles); + + int min; + int max; + // This holds the two bits used to encode the priority of the table. + int tablePrefix; + + switch (Ample.DataLevel.of(tableId)) { + case ROOT: + if (kind == CompactionKind.USER) { + min = ROOT_USER_MIN; + max = ROOT_USER_MAX; + } else { + min = ROOT_SYSTEM_MIN; + max = ROOT_SYSTEM_MAX; } - return (short) prio; - case SELECTOR: - case SYSTEM: - // system-initiated compactions will have a negative priority - // starting at -32768 and increasing based on number of files - // maxing out at -1 - if (prio > Short.MAX_VALUE) { - return -1; + break; + case METADATA: + if (kind == CompactionKind.USER) { + min = METADATA_USER_MIN; + max = METADATA_USER_MAX; } else { - return (short) (Short.MIN_VALUE + prio); + min = METADATA_SYSTEM_MIN; + max = METADATA_SYSTEM_MAX; } + break; + case USER: + if (kind == CompactionKind.USER) { + min = USER_USER_MIN; + max = USER_USER_MAX; + } else { + min = USER_SYSTEM_MIN; + max = USER_SYSTEM_MAX; + } + break; default: - throw new AssertionError("Unknown kind " + kind); + throw new IllegalStateException("Unknown data level" + Ample.DataLevel.of(tableId)); } + + return (short) Math.min(max, min + totalFiles + compactingFiles); } } diff --git a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java index 265778f1af..d0f3e77eac 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java @@ -51,6 +51,7 @@ import org.apache.accumulo.core.spi.compaction.CompactionPlan.Builder; import org.apache.accumulo.core.spi.compaction.CompactionPlanner.InitParameters; import org.apache.accumulo.core.util.ConfigurationImpl; import org.apache.accumulo.core.util.compaction.CompactionJobImpl; +import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer; import org.apache.accumulo.core.util.compaction.CompactionPlanImpl; import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams; import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl; @@ -201,6 +202,8 @@ public class DefaultCompactionPlannerTest { var job = getOnlyElement(plan.getJobs()); assertEquals(candidates, job.getFiles()); assertEquals(CompactorGroupIdImpl.groupId("medium"), job.getGroup()); + assertEquals(CompactionJobPrioritizer.createPriority(TableId.of("42"), CompactionKind.USER, + all.size(), job.getFiles().size()), job.getPriority()); // should only run one user compaction at a time compacting = Set.of(createJob(CompactionKind.USER, all, createCFs("F1", "3M", "F2", "3M"))); diff --git a/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionPrioritizerTest.java b/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionPrioritizerTest.java index a112dbd6b1..f727b8512b 100644 --- a/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionPrioritizerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionPrioritizerTest.java @@ -18,7 +18,10 @@ */ package org.apache.accumulo.core.util.compaction; +import static org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer.createPriority; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.net.URI; import java.util.ArrayList; @@ -28,6 +31,9 @@ import java.util.List; import java.util.Optional; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.junit.jupiter.api.Test; @@ -42,29 +48,103 @@ public class CompactionPrioritizerTest { .create(URI.create("hdfs://foonn/accumulo/tables/5/" + tablet + "/" + i + ".rf"), 4, 4)); } // TODO pass numFiles - return new CompactionJobImpl( - CompactionJobPrioritizer.createPriority(kind, totalFiles, numFiles), + return new CompactionJobImpl(createPriority(TableId.of("1"), kind, totalFiles, numFiles), CompactorGroupIdImpl.groupId("test"), files, kind, Optional.of(false)); } @Test - public void testPrioritizer() throws Exception { - assertEquals((short) 0, CompactionJobPrioritizer.createPriority(CompactionKind.USER, 0, 0)); - assertEquals((short) 10000, - CompactionJobPrioritizer.createPriority(CompactionKind.USER, 10000, 0)); - assertEquals((short) 32767, - CompactionJobPrioritizer.createPriority(CompactionKind.USER, 32767, 0)); - assertEquals((short) 32767, - CompactionJobPrioritizer.createPriority(CompactionKind.USER, Integer.MAX_VALUE, 0)); - - assertEquals((short) -32768, - CompactionJobPrioritizer.createPriority(CompactionKind.SYSTEM, 0, 0)); - assertEquals((short) -22768, - CompactionJobPrioritizer.createPriority(CompactionKind.SYSTEM, 10000, 0)); - assertEquals((short) -1, - CompactionJobPrioritizer.createPriority(CompactionKind.SYSTEM, 32767, 0)); - assertEquals((short) -1, - CompactionJobPrioritizer.createPriority(CompactionKind.SYSTEM, Integer.MAX_VALUE, 0)); + public void testOrdering() { + short pr1 = createPriority(RootTable.ID, CompactionKind.USER, 10000, 1); + assertEquals(Short.MAX_VALUE, pr1); + short pr2 = createPriority(RootTable.ID, CompactionKind.USER, 100, 30); + assertTrue(pr1 > pr2); + short pr3 = createPriority(RootTable.ID, CompactionKind.USER, 100, 1); + assertTrue(pr2 > pr3); + short pr4 = createPriority(RootTable.ID, CompactionKind.USER, 1, 1); + assertTrue(pr3 > pr4); + short pr5 = createPriority(RootTable.ID, CompactionKind.SYSTEM, 10000, 1); + assertTrue(pr4 > pr5); + short pr6 = createPriority(RootTable.ID, CompactionKind.SYSTEM, 100, 30); + assertTrue(pr5 > pr6); + short pr7 = createPriority(RootTable.ID, CompactionKind.SYSTEM, 100, 1); + assertTrue(pr6 > pr7); + short pr8 = createPriority(RootTable.ID, CompactionKind.SYSTEM, 1, 1); + assertTrue(pr7 > pr8); + + short pm1 = createPriority(MetadataTable.ID, CompactionKind.USER, 10000, 1); + assertTrue(pr8 > pm1); + short pm2 = createPriority(MetadataTable.ID, CompactionKind.USER, 100, 30); + assertTrue(pm1 > pm2); + short pm3 = createPriority(MetadataTable.ID, CompactionKind.USER, 100, 1); + assertTrue(pm2 > pm3); + short pm4 = createPriority(MetadataTable.ID, CompactionKind.USER, 1, 1); + assertTrue(pm3 > pm4); + short pm5 = createPriority(MetadataTable.ID, CompactionKind.SYSTEM, 10000, 1); + assertTrue(pm4 > pm5); + short pm6 = createPriority(MetadataTable.ID, CompactionKind.SYSTEM, 100, 30); + assertTrue(pm5 > pm6); + short pm7 = createPriority(MetadataTable.ID, CompactionKind.SYSTEM, 100, 1); + assertTrue(pm6 > pm7); + short pm8 = createPriority(MetadataTable.ID, CompactionKind.SYSTEM, 1, 1); + assertTrue(pm7 > pm8); + + var userTable1 = TableId.of("1"); + var userTable2 = TableId.of("2"); + + short pu1 = createPriority(userTable1, CompactionKind.USER, 10000, 1); + assertTrue(pm8 > pu1); + short pu2 = createPriority(userTable2, CompactionKind.USER, 1000, 30); + assertTrue(pu1 > pu2); + short pu3 = createPriority(userTable1, CompactionKind.USER, 1000, 1); + assertTrue(pu2 > pu3); + short pu4 = createPriority(userTable2, CompactionKind.USER, 1, 1); + assertTrue(pu3 > pu4); + short pu5 = createPriority(userTable1, CompactionKind.SYSTEM, 10000, 1); + assertTrue(pu4 > pu5); + short pu6 = createPriority(userTable2, CompactionKind.SYSTEM, 1000, 30); + assertTrue(pu5 > pu6); + short pu7 = createPriority(userTable1, CompactionKind.SYSTEM, 1000, 1); + assertTrue(pu6 > pu7); + short pu8 = createPriority(userTable2, CompactionKind.SYSTEM, 1, 1); + assertTrue(pu7 > pu8); + assertEquals(Short.MIN_VALUE + 2, pu8); + } + + @Test + public void testBoundary() { + var userTable = TableId.of("1"); + + short minRootUser = createPriority(RootTable.ID, CompactionKind.USER, 1, 1); + short minRootSystem = createPriority(RootTable.ID, CompactionKind.SYSTEM, 1, 1); + short minMetaUser = createPriority(MetadataTable.ID, CompactionKind.USER, 1, 1); + short minMetaSystem = createPriority(MetadataTable.ID, CompactionKind.SYSTEM, 1, 1); + short minUserUser = createPriority(userTable, CompactionKind.USER, 1, 1); + + // Test the boundary condition around the max number of files to encode. Ensure the next level + // is always greater no matter how many files. + for (int files = 1; files < 100_000; files += 1) { + short rootSystem = createPriority(RootTable.ID, CompactionKind.SYSTEM, files, 1); + assertTrue(minRootUser > rootSystem); + short metaUser = createPriority(MetadataTable.ID, CompactionKind.USER, files, 1); + assertTrue(minRootSystem > metaUser); + short metaSystem = createPriority(MetadataTable.ID, CompactionKind.SYSTEM, files, 1); + assertTrue(minMetaUser > metaSystem); + short userUser = createPriority(userTable, CompactionKind.USER, files, 1); + assertTrue(minMetaSystem > userUser); + short userSystem = createPriority(userTable, CompactionKind.SYSTEM, files, 1); + assertTrue(minUserUser > userSystem); + } + + } + + @Test + public void testNegative() { + for (var tableId : List.of(TableId.of("1"), TableId.of("2"), RootTable.ID, MetadataTable.ID)) { + for (var kind : CompactionKind.values()) { + assertThrows(IllegalArgumentException.class, () -> createPriority(tableId, kind, -5, 2)); + assertThrows(IllegalArgumentException.class, () -> createPriority(tableId, kind, 10, -5)); + } + } } @Test