This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git

The following commit(s) were added to refs/heads/2.1 by this push:
     new fdd73fb8b2  Fix bug in compaction props (#4117)
fdd73fb8b2 is described below

commit fdd73fb8b27c5e49f91f0c1008457658a588a411
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Wed Dec 27 14:38:10 2023 -0500

    Fix bug in compaction props (#4117)

    This fixes a bug in the compaction properties to ensure that the
    replacement property is always preferred over the deprecated open.max
    compaction property when the replacement is set.

    Adds tests for maxOpen overriding open.max:

    * Adds a test to ensure that setting the maxOpen option for a compaction
      service overrides the deprecated `open.max` property if it is set
    * Condenses helper methods in the planner tests
    * Uses CompactionPlannerInitParams for tests instead of custom test code
    * Adds a test case for the default compaction service used with the
      deprecated property
    * Replaces the hardcoded maxOpen value with a reference to the default
      property value
    * Modifies getFullyQualifiedOption to return the correct path for the
      `<service>.planner.opts.` properties

    This is a reapplication of #4092 after it was reverted, updated to use
    SiteConfiguration for testing overrides rather than modifying
    ConfigurationCopy.

    Changes made by ctubbsii that diverge from #4092:

    * Update the commit log message to add detail and format it
    * Omit the changes to ConfigurationCopy that add a parent (including the
      changes in #4112, which is now OBE), to preserve its role as a simple
      "flat" configuration object for testing and simple operations
    * Use SiteConfiguration with overrides, instead of ConfigurationCopy with
      a parent, to test override behavior in DefaultCompactionPlannerTest

    Co-authored-by: Daniel Roberts <ddani...@gmail.com>
---
 .../spi/compaction/DefaultCompactionPlanner.java  |   3 +-
 .../compaction/CompactionPlannerInitParams.java   |   2 +-
 .../compaction/DefaultCompactionPlannerTest.java  | 374 +++++++++++----------
 3 files changed, 204 insertions(+), 175 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java index be8e25299b..9385806831 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java @@ -221,7 +221,8 @@ public class DefaultCompactionPlanner implements CompactionPlanner { this.maxFilesToCompact = Integer.parseInt(params.getServiceEnvironment().getConfiguration() .get(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey())); } else { - this.maxFilesToCompact = Integer.parseInt(params.getOptions().getOrDefault("maxOpen", "10")); + this.maxFilesToCompact = Integer.parseInt(params.getOptions().getOrDefault("maxOpen", + Property.TSERV_COMPACTION_SERVICE_DEFAULT_MAX_OPEN.getDefaultValue())); } } diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java index 0f79ce4df0..eb287153d6 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java @@ -60,7 +60,7 @@ public class CompactionPlannerInitParams implements CompactionPlanner.InitParame @Override public String getFullyQualifiedOption(String key) { - return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceId + ".opts." 
+ key; + return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceId + ".planner.opts." + key; } @Override diff --git a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java index 568b57cd2d..ab2003841b 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java @@ -22,27 +22,35 @@ import static com.google.common.collect.MoreCollectors.onlyElement; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; +import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; +import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.core.spi.common.ServiceEnvironment.Configuration; import org.apache.accumulo.core.spi.compaction.CompactionPlan.Builder; +import org.apache.accumulo.core.util.ConfigurationImpl; import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl; import org.apache.accumulo.core.util.compaction.CompactionPlanImpl; +import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams; +import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; import org.easymock.EasyMock; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DefaultCompactionPlannerTest { @@ -50,6 +58,12 @@ public class DefaultCompactionPlannerTest { return c.stream().collect(onlyElement()); } + private static final Configuration defaultConf = + new ConfigurationImpl(DefaultConfiguration.getInstance()); + private static final CompactionServiceId csid = CompactionServiceId.of("cs1"); + + private static final Logger log = LoggerFactory.getLogger(DefaultCompactionPlannerTest.class); + @Test public void testFindFilesToCompact() { @@ -130,7 +144,13 @@ public class DefaultCompactionPlannerTest { @Test public void testRunningCompaction() { - var planner = createPlanner(true); + String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," + + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2}," + + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}," + + "{'name':'huge','type': 'internal','numThreads':4}]"; + + var planner = createPlanner(defaultConf, executors); + var all = createCFs("F1", "3M", "F2", "3M", "F3", "11M", "F4", "12M", "F5", "13M"); var candidates = createCFs("F3", "11M", "F4", "12M", "F5", "13M"); var compacting = @@ -152,12 +172,138 @@ public class DefaultCompactionPlannerTest { // planner should compact. 
var job = getOnlyElement(plan.getJobs()); assertEquals(candidates, job.getFiles()); - assertEquals(CompactionExecutorIdImpl.externalId("medium"), job.getExecutor()); + assertEquals(CompactionExecutorIdImpl.internalId(csid, "medium"), job.getExecutor()); + } + + /** + * Tests that the maxOpen property overrides the deprecated open.max property with the default + * service + */ + @Test + @SuppressWarnings("removal") + public void testOverrideMaxOpenDefaultService() { + Map<String,String> overrides = new HashMap<>(); + // Set old property and use that for max open files. + overrides.put(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "17"); + SiteConfiguration aconf = SiteConfiguration.empty().withOverrides(overrides).build(); + ConfigurationImpl config = new ConfigurationImpl(aconf); + + ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); + EasyMock.expect(senv.getConfiguration()).andReturn(config).anyTimes(); + EasyMock.replay(senv); + + // Use the CompactionServicesConfig to create options based on default property values + var compactionServices = new CompactionServicesConfig(aconf, log::warn); + var options = compactionServices.getOptions().get("default"); + + var initParams = + new CompactionPlannerInitParams(CompactionServiceId.of("default"), options, senv); + + var planner = new DefaultCompactionPlanner(); + planner.init(initParams); + + var all = createCFs("F1", "10M", "F2", "11M", "F3", "12M", "F4", "13M", "F5", "14M", "F6", + "15M", "F7", "16M", "F8", "17M", "F9", "18M", "FA", "19M", "FB", "20M", "FC", "21M", "FD", + "22M", "FE", "23M", "FF", "24M", "FG", "25M", "FH", "26M"); + Set<CompactionJob> compacting = Set.of(); + var params = createPlanningParams(all, all, compacting, 2, CompactionKind.USER); + var plan = planner.makePlan(params); + var job = getOnlyElement(plan.getJobs()); + assertEquals(all, job.getFiles()); + assertEquals(CompactionExecutorIdImpl.internalId(CompactionServiceId.of("default"), "large"), + job.getExecutor()); + + overrides.put(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "5"); + aconf = SiteConfiguration.empty().withOverrides(overrides).build(); + config = new ConfigurationImpl(aconf); + senv = EasyMock.createMock(ServiceEnvironment.class); + EasyMock.expect(senv.getConfiguration()).andReturn(config).anyTimes(); + EasyMock.replay(senv); + + // Create new initParams so executor IDs can be reused + initParams = new CompactionPlannerInitParams(CompactionServiceId.of("default"), options, senv); + planner = new DefaultCompactionPlanner(); + planner.init(initParams); + + params = createPlanningParams(all, all, compacting, 2, CompactionKind.USER); + plan = planner.makePlan(params); + job = getOnlyElement(plan.getJobs()); + assertEquals(createCFs("F1", "10M", "F2", "11M", "F3", "12M", "F4", "13M", "F5", "14M"), + job.getFiles()); + assertEquals(CompactionExecutorIdImpl.internalId(CompactionServiceId.of("default"), "medium"), + job.getExecutor()); + } + + /** + * Tests that the maxOpen property overrides the deprecated open.max property + */ + @Test + @SuppressWarnings("removal") + public void testOverrideMaxOpen() { + Map<String,String> overrides = new HashMap<>(); + // Set old property and use that for max open files. 
+ overrides.put(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "17"); + SiteConfiguration aconf = SiteConfiguration.empty().withOverrides(overrides).build(); + ConfigurationImpl config = new ConfigurationImpl(aconf); + + String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," + + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2}," + + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}," + + "{'name':'huge','type': 'internal','numThreads':4}]"; + + var planner = createPlanner(config, executors); + var all = createCFs("F1", "1M", "F2", "2M", "F3", "4M", "F4", "8M", "F5", "16M", "F6", "32M", + "F7", "64M", "F8", "128M", "F9", "256M", "FA", "512M", "FB", "1G", "FC", "2G", "FD", "4G", + "FE", "8G", "FF", "16G", "FG", "32G", "FH", "64G"); + Set<CompactionJob> compacting = Set.of(); + var params = createPlanningParams(all, all, compacting, 2, CompactionKind.USER); + var plan = planner.makePlan(params); + var job = getOnlyElement(plan.getJobs()); + assertEquals(all, job.getFiles()); + assertEquals(CompactionExecutorIdImpl.internalId(csid, "huge"), job.getExecutor()); + + // Set new property that overrides the old property. + overrides.put(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen", + "15"); + aconf = SiteConfiguration.empty().withOverrides(overrides).build(); + config = new ConfigurationImpl(aconf); + planner = createPlanner(config, executors); + params = createPlanningParams(all, all, compacting, 2, CompactionKind.USER); + plan = planner.makePlan(params); + + // 17 files that do not meet the compaction ratio. When max files to compact is 15, + // the plan should do 3 files then 15 + job = getOnlyElement(plan.getJobs()); + assertEquals(createCFs("F1", "1M", "F2", "2M", "F3", "4M"), job.getFiles()); + assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), job.getExecutor()); + + overrides.put(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen", + "5"); + aconf = SiteConfiguration.empty().withOverrides(overrides).build(); + // 17 files that do not meet the compaction ratio. When max files to compact is 5 should do 5, + // files then another 5, then the final 5. 
+ config = new ConfigurationImpl(aconf); + planner = createPlanner(config, executors); + params = createPlanningParams(all, all, compacting, 2, CompactionKind.USER); + plan = planner.makePlan(params); + job = getOnlyElement(plan.getJobs()); + assertEquals(createCFs("F4", "8M", "F3", "4M", "F2", "2M", "F1", "1M", "F5", "16M"), + job.getFiles()); + assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), job.getExecutor()); } @Test public void testUserCompaction() { - var planner = createPlanner(true); + ConfigurationCopy aconf = new ConfigurationCopy(DefaultConfiguration.getInstance()); + aconf.set(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen", "15"); + ConfigurationImpl config = new ConfigurationImpl(aconf); + + String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," + + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2}," + + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}," + + "{'name':'huge','type': 'internal','numThreads':4}]"; + + var planner = createPlanner(config, executors); var all = createCFs("F1", "3M", "F2", "3M", "F3", "11M", "F4", "12M", "F5", "13M"); var candidates = createCFs("F3", "11M", "F4", "12M", "F5", "13M"); var compacting = @@ -168,7 +314,7 @@ public class DefaultCompactionPlannerTest { // a running non-user compaction should not prevent a user compaction var job = getOnlyElement(plan.getJobs()); assertEquals(candidates, job.getFiles()); - assertEquals(CompactionExecutorIdImpl.externalId("medium"), job.getExecutor()); + assertEquals(CompactionExecutorIdImpl.internalId(csid, "medium"), job.getExecutor()); // should only run one user compaction at a time compacting = Set.of(createJob(CompactionKind.USER, all, createCFs("F1", "3M", "F2", "3M"))); @@ -186,7 +332,7 @@ public class DefaultCompactionPlannerTest { plan = planner.makePlan(params); job = getOnlyElement(plan.getJobs()); assertEquals(createCFs("F1", "1M", "F2", "2M", "F3", "4M"), job.getFiles()); - assertEquals(CompactionExecutorIdImpl.externalId("small"), job.getExecutor()); + assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), job.getExecutor()); // should compact all 15 all = createCFs("FI", "7M", "F4", "8M", "F5", "16M", "F6", "32M", "F7", "64M", "F8", "128M", @@ -196,7 +342,7 @@ public class DefaultCompactionPlannerTest { plan = planner.makePlan(params); job = getOnlyElement(plan.getJobs()); assertEquals(all, job.getFiles()); - assertEquals(CompactionExecutorIdImpl.externalId("huge"), job.getExecutor()); + assertEquals(CompactionExecutorIdImpl.internalId(csid, "huge"), job.getExecutor()); // For user compaction, can compact a subset that meets the compaction ratio if there is also a // larger set of files the meets the compaction ratio @@ -206,7 +352,7 @@ public class DefaultCompactionPlannerTest { plan = planner.makePlan(params); job = getOnlyElement(plan.getJobs()); assertEquals(createCFs("F1", "3M", "F2", "4M", "F3", "5M", "F4", "6M"), job.getFiles()); - assertEquals(CompactionExecutorIdImpl.externalId("small"), job.getExecutor()); + assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), job.getExecutor()); // There is a subset of small files that meets the compaction ratio, but the larger set does not // so compact everything to avoid doing more than logarithmic work @@ -215,13 +361,17 @@ public class DefaultCompactionPlannerTest { plan = planner.makePlan(params); job = getOnlyElement(plan.getJobs()); assertEquals(all, job.getFiles()); - 
assertEquals(CompactionExecutorIdImpl.externalId("medium"), job.getExecutor()); + assertEquals(CompactionExecutorIdImpl.internalId(csid, "medium"), job.getExecutor()); } @Test public void testMaxSize() { - var planner = createPlanner(false); + String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," + + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2}," + + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}]"; + + var planner = createPlanner(defaultConf, executors); var all = createCFs("F1", "128M", "F2", "129M", "F3", "130M", "F4", "131M", "F5", "132M"); var params = createPlanningParams(all, all, Set.of(), 2, CompactionKind.SYSTEM); var plan = planner.makePlan(params); @@ -229,14 +379,14 @@ public class DefaultCompactionPlannerTest { // should only compact files less than max size var job = getOnlyElement(plan.getJobs()); assertEquals(createCFs("F1", "128M", "F2", "129M", "F3", "130M"), job.getFiles()); - assertEquals(CompactionExecutorIdImpl.externalId("large"), job.getExecutor()); + assertEquals(CompactionExecutorIdImpl.internalId(csid, "large"), job.getExecutor()); // user compaction can exceed the max size params = createPlanningParams(all, all, Set.of(), 2, CompactionKind.USER); plan = planner.makePlan(params); job = getOnlyElement(plan.getJobs()); assertEquals(all, job.getFiles()); - assertEquals(CompactionExecutorIdImpl.externalId("large"), job.getExecutor()); + assertEquals(CompactionExecutorIdImpl.internalId(csid, "large"), job.getExecutor()); } /** @@ -245,18 +395,12 @@ public class DefaultCompactionPlannerTest { @Test public void testErrorInternalTypeNoNumThreads() { DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); - Configuration conf = EasyMock.createMock(Configuration.class); - EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); - - ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); - EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); - EasyMock.replay(conf, senv); + String executors = "[{'name':'small','type':'internal','maxSize':'32M'}," + + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':2}," + + "{'name':'large','type':'internal','maxSize':'512M','numThreads':3}]"; - String executors = getExecutors("'type': 'internal','maxSize':'32M'", - "'type': 'internal','maxSize':'128M','numThreads':2", - "'type': 'internal','maxSize':'512M','numThreads':3"); var e = assertThrows(NullPointerException.class, - () -> planner.init(getInitParams(senv, executors)), "Failed to throw error"); + () -> planner.init(getInitParams(defaultConf, executors)), "Failed to throw error"); assertTrue(e.getMessage().contains("numThreads"), "Error message didn't contain numThreads"); } @@ -266,18 +410,12 @@ public class DefaultCompactionPlannerTest { @Test public void testErrorExternalTypeNumThreads() { DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); - Configuration conf = EasyMock.createMock(Configuration.class); - EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); - - ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); - EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); - EasyMock.replay(conf, senv); + String executors = "[{'name':'small','type':'internal','maxSize':'32M', 'numThreads':1}," + + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':2}," + + "{'name':'large','type':'external','maxSize':'512M','numThreads':3}]"; - 
String executors = getExecutors("'type': 'internal','maxSize':'32M','numThreads':1", - "'type': 'internal','maxSize':'128M','numThreads':2", - "'type': 'external','maxSize':'512M','numThreads':3"); var e = assertThrows(IllegalArgumentException.class, - () -> planner.init(getInitParams(senv, executors)), "Failed to throw error"); + () -> planner.init(getInitParams(defaultConf, executors)), "Failed to throw error"); assertTrue(e.getMessage().contains("numThreads"), "Error message didn't contain numThreads"); } @@ -287,18 +425,12 @@ public class DefaultCompactionPlannerTest { @Test public void testErrorExternalNoQueue() { DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); - Configuration conf = EasyMock.createMock(Configuration.class); - EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); + String executors = "[{'name':'small','type':'internal','maxSize':'32M', 'numThreads':1}," + + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':2}," + + "{'name':'large','type':'external','maxSize':'512M'}]"; - ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); - EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); - EasyMock.replay(conf, senv); - - String executors = getExecutors("'type': 'internal','maxSize':'32M','numThreads':1", - "'type': 'internal','maxSize':'128M','numThreads':2", - "'type': 'external','maxSize':'512M'"); var e = assertThrows(NullPointerException.class, - () -> planner.init(getInitParams(senv, executors)), "Failed to throw error"); + () -> planner.init(getInitParams(defaultConf, executors)), "Failed to throw error"); assertTrue(e.getMessage().contains("queue"), "Error message didn't contain queue"); } @@ -308,17 +440,12 @@ public class DefaultCompactionPlannerTest { @Test public void testErrorOnlyOneMaxSize() { DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); - Configuration conf = EasyMock.createMock(Configuration.class); - EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); + String executors = "[{'name':'small','type':'internal','maxSize':'32M', 'numThreads':1}," + + "{'name':'medium','type':'internal','numThreads':2}," + + "{'name':'large','type':'external','queue':'q1'}]"; - ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); - EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); - EasyMock.replay(conf, senv); - - String executors = getExecutors("'type': 'internal','maxSize':'32M','numThreads':1", - "'type': 'internal','numThreads':2", "'type': 'external','queue':'q1'"); var e = assertThrows(IllegalArgumentException.class, - () -> planner.init(getInitParams(senv, executors)), "Failed to throw error"); + () -> planner.init(getInitParams(defaultConf, executors)), "Failed to throw error"); assertTrue(e.getMessage().contains("maxSize"), "Error message didn't contain maxSize"); } @@ -328,69 +455,20 @@ public class DefaultCompactionPlannerTest { @Test public void testErrorDuplicateMaxSize() { DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); - Configuration conf = EasyMock.createMock(Configuration.class); - EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); + String executors = "[{'name':'small','type':'internal','maxSize':'32M', 'numThreads':1}," + + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':2}," + + "{'name':'large','type':'external','maxSize':'128M','queue':'q1'}]"; - ServiceEnvironment senv = 
EasyMock.createMock(ServiceEnvironment.class); - EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); - EasyMock.replay(conf, senv); - - String executors = getExecutors("'type': 'internal','maxSize':'32M','numThreads':1", - "'type': 'internal','maxSize':'128M','numThreads':2", - "'type': 'external','maxSize':'128M','queue':'q1'"); var e = assertThrows(IllegalArgumentException.class, - () -> planner.init(getInitParams(senv, executors)), "Failed to throw error"); + () -> planner.init(getInitParams(defaultConf, executors)), "Failed to throw error"); assertTrue(e.getMessage().contains("maxSize"), "Error message didn't contain maxSize"); } - private CompactionPlanner.InitParameters getInitParams(ServiceEnvironment senv, - String executors) { - return new CompactionPlanner.InitParameters() { - - @Override - public ServiceEnvironment getServiceEnvironment() { - return senv; - } - - @Override - public Map<String,String> getOptions() { - return Map.of("executors", executors, "maxOpen", "15"); - } - - @Override - public String getFullyQualifiedOption(String key) { - assertEquals("maxOpen", key); - return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts." + key; - } - - @Override - public ExecutorManager getExecutorManager() { - return new ExecutorManager() { - @Override - public CompactionExecutorId createExecutor(String name, int threads) { - return CompactionExecutorIdImpl.externalId(name); - } - - @Override - public CompactionExecutorId getExternalExecutor(String name) { - return CompactionExecutorIdImpl.externalId(name); - } - }; - } - }; - } - - private String getExecutors(String small, String medium, String large) { - String execBldr = "[{'name':'small'," + small + "},{'name':'medium'," + medium + "}," - + "{'name':'large'," + large + "}]"; - return execBldr.replaceAll("'", "\""); - } - private CompactionJob createJob(CompactionKind kind, Set<CompactableFile> all, Set<CompactableFile> files) { return new CompactionPlanImpl.BuilderImpl(kind, all, all) - .addJob((short) all.size(), CompactionExecutorIdImpl.externalId("small"), files).build() - .getJobs().iterator().next(); + .addJob((short) all.size(), CompactionExecutorIdImpl.internalId(csid, "small"), files) + .build().getJobs().iterator().next(); } private static Set<CompactableFile> createCFs(String... 
namesSizePairs) { @@ -486,80 +564,30 @@ public class DefaultCompactionPlannerTest { }; } - private static DefaultCompactionPlanner createPlanner(boolean withHugeExecutor) { - DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); - Configuration conf = EasyMock.createMock(Configuration.class); - EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); - - ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); - EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); - - EasyMock.replay(conf, senv); + private static CompactionPlanner.InitParameters getInitParams(Configuration conf, + String executors) { - StringBuilder execBldr = - new StringBuilder("[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," - + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2}," - + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}"); + String maxOpen = + conf.get(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen"); + Map<String,String> options = new HashMap<>(); + options.put("executors", executors.replaceAll("'", "\"")); - if (withHugeExecutor) { - execBldr.append(",{'name':'huge','type': 'internal','numThreads':4}]"); - } else { - execBldr.append("]"); + if (maxOpen != null) { + options.put("maxOpen", maxOpen); } - String executors = execBldr.toString().replaceAll("'", "\""); - - planner.init(new CompactionPlanner.InitParameters() { - - @Override - public ServiceEnvironment getServiceEnvironment() { - return senv; - } - - @Override - public Map<String,String> getOptions() { - return Map.of("executors", executors, "maxOpen", "15"); - } + ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); + EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); + EasyMock.replay(senv); - @Override - public String getFullyQualifiedOption(String key) { - assertEquals("maxOpen", key); - return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts." + key; - } + return new CompactionPlannerInitParams(csid, options, senv); + } - @Override - public ExecutorManager getExecutorManager() { - return new ExecutorManager() { - @Override - public CompactionExecutorId createExecutor(String name, int threads) { - switch (name) { - case "small": - assertEquals(1, threads); - break; - case "medium": - assertEquals(2, threads); - break; - case "large": - assertEquals(3, threads); - break; - case "huge": - assertEquals(4, threads); - break; - default: - fail("Unexpected name " + name); - break; - } - return CompactionExecutorIdImpl.externalId(name); - } - - @Override - public CompactionExecutorId getExternalExecutor(String name) { - throw new UnsupportedOperationException(); - } - }; - } - }); + private static DefaultCompactionPlanner createPlanner(Configuration conf, String executors) { + DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); + var initParams = getInitParams(conf, executors); + planner.init(initParams); return planner; } }
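
For readers skimming the patch: the if/else shown in the DefaultCompactionPlanner hunk near the top only includes the fallback branches, since the surrounding condition lies outside the hunk context. Below is a minimal, illustrative sketch of the precedence this commit is meant to guarantee when resolving the maximum number of files to compact. It is not the actual DefaultCompactionPlanner code: the helper name resolveMaxOpen and its Map-based parameters are hypothetical, while the Property constants and the "maxOpen" option key are taken from the diff above.

import java.util.Map;

import org.apache.accumulo.core.conf.Property;

public class MaxOpenPrecedenceSketch {

  // Hypothetical helper: resolves the max-open-files limit with the precedence
  // exercised by testOverrideMaxOpen and testOverrideMaxOpenDefaultService.
  @SuppressWarnings("removal") // TSERV_MAJC_THREAD_MAXOPEN is deprecated
  static int resolveMaxOpen(Map<String,String> plannerOptions, Map<String,String> systemConfig) {
    // 1. The replacement option (<service>.planner.opts.maxOpen) always wins when it is set.
    String maxOpen = plannerOptions.get("maxOpen");
    if (maxOpen != null) {
      return Integer.parseInt(maxOpen);
    }
    // 2. Otherwise, fall back to the deprecated open.max property if the user set it.
    String deprecated = systemConfig.get(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey());
    if (deprecated != null) {
      return Integer.parseInt(deprecated);
    }
    // 3. Otherwise, use the default value of the new per-service property.
    return Integer.parseInt(Property.TSERV_COMPACTION_SERVICE_DEFAULT_MAX_OPEN.getDefaultValue());
  }
}

Under this ordering, the new tests first set only the deprecated property and then add the cs1.planner.opts.maxOpen override to confirm that the override takes effect.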