This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 1bf7dcb184cb2c2ed9e9a4768370a7980a6c22bd
Merge: 8960452814 b8869fb72e
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Wed Dec 27 23:37:54 2023 +0000

    Merge branch 'main' into elasticity

 .../spi/compaction/DefaultCompactionPlanner.java   |   6 +-
 .../compaction/CompactionPlannerInitParams.java    |   2 +-
 .../compaction/DefaultCompactionPlannerTest.java   | 365 +++++++--------------
 3 files changed, 120 insertions(+), 253 deletions(-)

diff --cc core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
index e25b7c519b,bafda93e3e..6447a2d147
--- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
@@@ -29,15 -27,16 +28,18 @@@ import static org.junit.jupiter.api.Ass
  import java.net.URI;
  import java.net.URISyntaxException;
  import java.util.Collection;
 +import java.util.Collections;
+ import java.util.HashMap;
  import java.util.HashSet;
 +import java.util.List;
  import java.util.Map;
  import java.util.Set;
 -import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
 
  import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+ import org.apache.accumulo.core.conf.ConfigurationCopy;
  import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+ import org.apache.accumulo.core.conf.DefaultConfiguration;
  import org.apache.accumulo.core.conf.Property;
  import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.spi.common.ServiceEnvironment;
@@@ -243,152 -268,9 +271,167 @@@ public class DefaultCompactionPlannerTe
      plan = planner.makePlan(params);
      job = getOnlyElement(plan.getJobs());
      assertEquals(all, job.getFiles());
 -    assertEquals(CompactionExecutorIdImpl.externalId("large"), job.getExecutor());
 +    assertEquals(CompactionExecutorIdImpl.internalId(csid, "large"), job.getExecutor());
    }
 
 +  @Test
 +  public void testMultipleCompactions() {
 +    // This test validates that when a tablet has many files, multiple compaction jobs can be
 +    // issued at the same time.
++    String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1},"
++        + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2},"
++        + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}]";
++
 +    for (var kind : List.of(CompactionKind.USER, CompactionKind.SYSTEM)) {
 -      var planner = createPlanner(false);
++      var planner = createPlanner(defaultConf, executors);
 +      var all = IntStream.range(0, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet());
 +      // simulate 10 larger files, these should not compact at the same time as the smaller files.
 +      // It's more optimal to wait for all of the smaller files to compact and then compact the
 +      // output of compacting the smaller files with the larger files.
 +      IntStream.range(990, 1000).mapToObj(i -> createCF("C" + i, 20000)).forEach(all::add);
 +      var params = createPlanningParams(all, all, Set.of(), 2, kind);
 +      var plan = planner.makePlan(params);
 +
 +      // There are 990 smaller files to compact. Should produce 66 jobs of 15 smaller files each.
 +      assertEquals(66, plan.getJobs().size());
 +      Set<CompactableFile> filesSeen = new HashSet<>();
 +      plan.getJobs().forEach(job -> {
 +        assertEquals(15, job.getFiles().size());
 +        assertEquals(kind, job.getKind());
 -        assertEquals(CompactionExecutorIdImpl.externalId("small"), job.getExecutor());
++        assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), job.getExecutor());
 +        // ensure the files across all of the jobs are disjoint
 +        job.getFiles().forEach(cf -> assertTrue(filesSeen.add(cf)));
 +      });
 +
 +      // Ensure all of the smaller files are scheduled for compaction. Should not see any of the
 +      // larger files.
 +      assertEquals(IntStream.range(0, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()),
 +          filesSeen);
 +    }
 +  }
 +
 +  @Test
 +  public void testMultipleCompactionsAndLargeCompactionRatio() {
 -    var planner = createPlanner(false);
++
++    String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1},"
++        + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2},"
++        + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}]";
++    var planner = createPlanner(defaultConf, executors);
 +    var all = IntStream.range(0, 65).mapToObj(i -> createCF("F" + i, i + 1)).collect(toSet());
 +    // This compaction ratio would not cause a system compaction; however, a user compaction must
 +    // compact all of the files, so it should generate some compactions.
 +    var params = createPlanningParams(all, all, Set.of(), 100, CompactionKind.USER);
 +    var plan = planner.makePlan(params);
 +
 +    assertEquals(3, plan.getJobs().size());
 +
 +    var iterator = plan.getJobs().iterator();
 +    var job1 = iterator.next();
 +    var job2 = iterator.next();
 +    var job3 = iterator.next();
 +    assertTrue(Collections.disjoint(job1.getFiles(), job2.getFiles()));
 +    assertTrue(Collections.disjoint(job1.getFiles(), job3.getFiles()));
 +    assertTrue(Collections.disjoint(job2.getFiles(), job3.getFiles()));
 +
 +    for (var job : plan.getJobs()) {
 +      assertEquals(15, job.getFiles().size());
 +      assertEquals(CompactionKind.USER, job.getKind());
 +      assertTrue(all.containsAll(job.getFiles()));
 +      // Should select three sets of files that are from the smallest 45 files.
 +      assertTrue(job.getFiles().stream().mapToLong(CompactableFile::getEstimatedSize).sum()
 +          <= IntStream.range(1, 46).sum());
 +    }
 +  }
 +
 +  @Test
 +  public void testMultipleCompactionsAndRunningCompactions() {
 +    // This test validates that when a tablet has many files, multiple compaction jobs can be
 +    // issued at the same time, even if there are running compactions, as long as everything
 +    // meets the compaction ratio.
++    String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1},"
++        + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2},"
++        + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}]";
 +    for (var kind : List.of(CompactionKind.USER, CompactionKind.SYSTEM)) {
 -      var planner = createPlanner(false);
++      var planner = createPlanner(defaultConf, executors);
 +      var all = IntStream.range(0, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet());
 +      // simulate 10 larger files, these should not compact at the same time as the smaller files.
 +      // It's more optimal to wait for all of the smaller files to compact and then compact the
 +      // output of compacting the smaller files with the larger files.
 +      IntStream.range(990, 1000).mapToObj(i -> createCF("C" + i, 20000)).forEach(all::add);
 +      // 30 files are compacting, so they will not be in the candidate set.
 +      var candidates =
 +          IntStream.range(30, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet());
 +      // create two jobs covering the first 30 files
 +      var job1 = createJob(kind, all,
 +          IntStream.range(0, 15).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()));
 +      var job2 = createJob(kind, all,
 +          IntStream.range(15, 30).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()));
 +      var params = createPlanningParams(all, candidates, Set.of(job1, job2), 2, kind);
 +      var plan = planner.makePlan(params);
 +
 +      // There are 960 smaller files left to compact. Should produce 64 jobs of 15 smaller files
 +      // each.
 +      assertEquals(64, plan.getJobs().size());
 +      Set<CompactableFile> filesSeen = new HashSet<>();
 +      plan.getJobs().forEach(job -> {
 +        assertEquals(15, job.getFiles().size());
 +        assertEquals(kind, job.getKind());
 -        assertEquals(CompactionExecutorIdImpl.externalId("small"), job.getExecutor());
++        assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), job.getExecutor());
 +        // ensure the files across all of the jobs are disjoint
 +        job.getFiles().forEach(cf -> assertTrue(filesSeen.add(cf)));
 +      });
 +
 +      // Ensure all of the smaller files are scheduled for compaction. Should not see any of the
 +      // larger files.
 +      assertEquals(IntStream.range(30, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()),
 +          filesSeen);
 +    }
 +  }
 +
 +  @Test
 +  public void testUserCompactionDoesNotWaitOnSystemCompaction() {
 +    // this test ensures user compactions do not wait on system compactions to complete
 -    var planner = createPlanner(true);
++
++    String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1},"
++        + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2},"
++        + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}]";
++    var planner = createPlanner(defaultConf, executors);
 +    var all = createCFs("F1", "1M", "F2", "1M", "F3", "1M", "F4", "3M", "F5", "3M", "F6", "3M",
 +        "F7", "20M");
 +    var candidates = createCFs("F4", "3M", "F5", "3M", "F6", "3M", "F7", "20M");
 +    var compacting = Set
 +        .of(createJob(CompactionKind.SYSTEM, all, createCFs("F1", "1M", "F2", "1M", "F3", "1M")));
 +    var params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.SYSTEM);
 +    var plan = planner.makePlan(params);
 +    // The planning of the system compaction should find it most optimal to wait on the running
 +    // system compaction and emit zero jobs.
 +    assertEquals(0, plan.getJobs().size());
 +
 +    params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.USER);
 +    plan = planner.makePlan(params);
 +    // The planning of the user compaction should not take the running system compaction into
 +    // consideration and should create a compaction job.
 +    assertEquals(1, plan.getJobs().size());
 +    assertEquals(createCFs("F4", "3M", "F5", "3M", "F6", "3M", "F7", "20M"),
 +        getOnlyElement(plan.getJobs()).getFiles());
 +
 +    // Reverse the situation and turn the running compaction into a user compaction
 +    compacting =
 +        Set.of(createJob(CompactionKind.USER, all, createCFs("F1", "1M", "F2", "1M", "F3", "1M")));
 +    params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.SYSTEM);
 +    plan = planner.makePlan(params);
 +    // The planning of a system compaction should not take the running user compaction into
 +    // account and should emit a job.
 +    assertEquals(1, plan.getJobs().size());
 +    assertEquals(createCFs("F4", "3M", "F5", "3M", "F6", "3M"),
 +        getOnlyElement(plan.getJobs()).getFiles());
 +
 +    params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.USER);
 +    plan = planner.makePlan(params);
 +    // The planning of the user compaction should decide the most optimal thing to do is to wait
 +    // on the running user compaction and should not emit any jobs.
 +    assertEquals(0, plan.getJobs().size());
 +  }
 +
    @Test
    public void testQueueCreation() throws Exception {
      DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
@@@ -505,19 -362,13 +523,13 @@@
    @Test
    public void testErrorExternalNoQueue() {
      DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
 -    Configuration conf = EasyMock.createMock(Configuration.class);
 -    EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
 -
 -    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
 -    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
 -    EasyMock.replay(conf, senv);
 +    String executors = "[{'name':'small','type':'internal','maxSize':'32M', 'numThreads':1},"
 +        + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},"
 +        + "{'name':'large','type':'external','maxSize':'512M'}]";
 -    String executors = getExecutors("'type': 'internal','maxSize':'32M','numThreads':1",
 -        "'type': 'internal','maxSize':'128M','numThreads':2",
 -        "'type': 'external','maxSize':'512M'");
      var e = assertThrows(NullPointerException.class,
 -        () -> planner.init(getInitParams(senv, executors)), "Failed to throw error");
 +        () -> planner.init(getInitParams(defaultConf, executors)), "Failed to throw error");
 -    assertTrue(e.getMessage().contains("queue"), "Error message didn't contain queue");
 +    assertTrue(e.getMessage().contains("group"), "Error message didn't contain group");
    }
 
    /**
@@@ -579,17 -416,12 +577,11 @@@
    @Test
    public void testErrorOnlyOneMaxSize() {
      DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
 -    Configuration conf = EasyMock.createMock(Configuration.class);
 -    EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
 -
 -    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
 -    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
 -    EasyMock.replay(conf, senv);
 -
 -    String executors = getExecutors("'type': 'internal','maxSize':'32M','numThreads':1",
 -        "'type': 'internal','numThreads':2", "'type': 'external','group':'q1'");
 +    String executors = "[{'name':'small','type':'internal','maxSize':'32M', 'numThreads':1},"
 +        + "{'name':'medium','type':'internal','numThreads':2},"
 -        + "{'name':'large','type':'external','queue':'q1'}]";
 -
++        + "{'name':'large','type':'internal','numThreads':'3'}]";
      var e = assertThrows(IllegalArgumentException.class,
 -        () -> planner.init(getInitParams(senv, executors)), "Failed to throw error");
 +        () -> planner.init(getInitParams(defaultConf, executors)), "Failed to throw error");
      assertTrue(e.getMessage().contains("maxSize"), "Error message didn't contain maxSize");
    }
 
@@@ -599,118 -431,24 +591,32 @@@
    @Test
    public void testErrorDuplicateMaxSize() {
      DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
 -    Configuration conf = EasyMock.createMock(Configuration.class);
 -    EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
 -
 -    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
 -    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
 -    EasyMock.replay(conf, senv);
 -    String executors = "[{'name':'small','type':'internal','maxSize':'32M', 'numThreads':1},"
 -        + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},"
 -        + "{'name':'large','type':'external','maxSize':'128M','queue':'q1'}]";
++    String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1},"
++        + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2},"
++        + "{'name':'large','type': 'internal','maxSize':'128M','numThreads':3}]";
 -    String executors = getExecutors("'type': 'internal','maxSize':'32M','numThreads':1",
 -        "'type': 'internal','maxSize':'128M','numThreads':2",
 -        "'type': 'external','maxSize':'128M','group':'q1'");
      var e = assertThrows(IllegalArgumentException.class,
 -        () -> planner.init(getInitParams(senv, executors)), "Failed to throw error");
 +        () -> planner.init(getInitParams(defaultConf, executors)), "Failed to throw error");
      assertTrue(e.getMessage().contains("maxSize"), "Error message didn't contain maxSize");
    }
 
    private CompactionJob createJob(CompactionKind kind, Set<CompactableFile> all,
        Set<CompactableFile> files) {
      return new CompactionPlanImpl.BuilderImpl(kind, all, all)
 -        .addJob((short) all.size(), CompactionExecutorIdImpl.externalId("small"), files).build()
 -        .getJobs().iterator().next();
 +        .addJob((short) all.size(), CompactionExecutorIdImpl.internalId(csid, "small"), files)
 +        .build().getJobs().iterator().next();
    }
 
 -  private static Set<CompactableFile> createCFs(String... namesSizePairs)
 -      throws URISyntaxException {
 +  private static CompactableFile createCF(String name, long size) {
 +    try {
 +      return CompactableFile
 +          .create(new URI("hdfs://fake/accumulo/tables/1/t-0000000z/" + name + ".rf"), size, 0);
 +    } catch (URISyntaxException e) {
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
 +  private static Set<CompactableFile> createCFs(String... namesSizePairs) {
      Set<CompactableFile> files = new HashSet<>();
 
      for (int i = 0; i < namesSizePairs.length; i += 2) {
@@@ -798,80 -537,47 +704,39 @@@
      };
    }
 
 -  private static DefaultCompactionPlanner createPlanner(boolean withHugeExecutor) {
 -    DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
 -    Configuration conf = EasyMock.createMock(Configuration.class);
 -    EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
 +  private static CompactionPlanner.InitParameters getInitParamQueues(Configuration conf,
 +      String queues) {
 +
 -    String maxOpen = conf.get(prefix + "cs1.planner.opts.maxOpen");
 +    Map<String,String> options = new HashMap<>();
 +    options.put("queues", queues.replaceAll("'", "\""));
 -
 -    if (maxOpen != null) {
 -      options.put("maxOpen", maxOpen);
 -    }
++    options.put("maxOpen", "15");
 
      ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
      EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
 +    EasyMock.replay(senv);
 -    EasyMock.replay(conf, senv);
 -
 -    StringBuilder execBldr =
 -        new StringBuilder("[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1},"
 -            + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2},"
 -            + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}");
 -
 -    if (withHugeExecutor) {
 -      execBldr.append(",{'name':'huge','type': 'internal','numThreads':4}]");
 -    } else {
 -      execBldr.append("]");
 -    }
 +    return new CompactionPlannerInitParams(csid, prefix, options, senv);
 +  }
 
 -    String executors = execBldr.toString().replaceAll("'", "\"");
 +  private static CompactionPlanner.InitParameters getInitParams(Configuration conf,
 +      String executors) {
 
 -    planner.init(new CompactionPlanner.InitParameters() {
 -    String maxOpen = conf.get(prefix + "cs1.planner.opts.maxOpen");
 +    Map<String,String> options = new HashMap<>();
 +    options.put("executors", executors.replaceAll("'", "\""));
 -
 -    if (maxOpen != null) {
 -      options.put("maxOpen", maxOpen);
 -    }
++    options.put("maxOpen", "15");
 
 -      @Override
 -      public ServiceEnvironment getServiceEnvironment() {
 -        return senv;
 -      }
 -
 -      @Override
 -      public Map<String,String> getOptions() {
 -        return Map.of("executors", executors, "maxOpen", "15");
 -      }
 +    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
 +    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
 +    EasyMock.replay(senv);
 
 -      @Override
 -      public String getFullyQualifiedOption(String key) {
 -        assertEquals("maxOpen", key);
 -        return Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts." + key;
 -      }
 +    return new CompactionPlannerInitParams(csid, prefix, options, senv);
 +  }
 
 -      @Override
 -      public ExecutorManager getExecutorManager() {
 -        return new ExecutorManager() {
 -          @Override
 -          public CompactionExecutorId createExecutor(String name, int threads) {
 -            switch (name) {
 -              case "small":
 -                assertEquals(1, threads);
 -                break;
 -              case "medium":
 -                assertEquals(2, threads);
 -                break;
 -              case "large":
 -                assertEquals(3, threads);
 -                break;
 -              case "huge":
 -                assertEquals(4, threads);
 -                break;
 -              default:
 -                fail("Unexpected name " + name);
 -                break;
 -            }
 -            return CompactionExecutorIdImpl.externalId(name);
 -          }
 -
 -          @Override
 -          public CompactionExecutorId getExternalExecutor(String name) {
 -            throw new UnsupportedOperationException();
 -          }
 -        };
 -      }
 -    });
 +  private static DefaultCompactionPlanner createPlanner(Configuration conf, String executors) {
 +    DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
 +    var initParams = getInitParams(conf, executors);
 +    planner.init(initParams);
      return planner;
    }
  }
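
For context, a minimal sketch of how the refactored test helpers are intended to compose; this is illustrative only and not part of the commit. It assumes the test-class members shown in the diff above (defaultConf, csid, prefix) and the helpers createCF, createPlanningParams, getInitParams, and createPlanner introduced or updated by this merge.

    // Hypothetical usage, assuming the fields and helpers from the diff above.
    String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1},"
        + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2},"
        + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}]";

    // createPlanner(conf, executors) replaces the old createPlanner(boolean) overload:
    // getInitParams(conf, executors) rewrites the single-quoted JSON to real JSON, fixes
    // maxOpen at 15, and wraps everything in CompactionPlannerInitParams before init().
    var planner = createPlanner(defaultConf, executors);

    // Plan a SYSTEM compaction over 30 files of 1000 bytes with compaction ratio 2.
    var all = IntStream.range(0, 30).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet());
    var params = createPlanningParams(all, all, Set.of(), 2, CompactionKind.SYSTEM);
    var plan = planner.makePlan(params);
    // With maxOpen fixed at 15, jobs should hold at most 15 files each and, for files this
    // small, be routed to the "small" executor, as the tests above demonstrate.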