This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new fdd73fb8b2 Fix bug in compaction props (#4117)
fdd73fb8b2 is described below

commit fdd73fb8b27c5e49f91f0c1008457658a588a411
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Wed Dec 27 14:38:10 2023 -0500

    Fix bug in compaction props (#4117)
    
    This fixes a bug in the compaction properties to ensure that the
    replacement maxOpen planner option, whenever it is set, is preferred
    over the deprecated open.max compaction property.
    
    Add tests for maxOpen to override open.max:
    * Adds a test to ensure that setting the maxOpen option for a compaction
      service will override the deprecated `open.max` property if set
    * Condenses helper methods in planner tests
    * Uses CompactionPlannerInitParams for tests instead of custom test code
    * Adds test case for default compaction service used with deprecated
      property
    * Replaces the hardcoded maxOpen value with a reference to the default
      property value
    * Modifies the getFullyQualifiedOption to return the correct path for
      the `<service>.planner.opts.` properties
    
    This is a reapplication of #4092 after it was reverted, to use
    SiteConfiguration for testing overrides rather than modifications to
    ConfigurationCopy
    
    Changes made by ctubbsii that diverge from #4092:
    * Update commit log message to add detail and format it
    * Omit the changes to ConfigurationCopy that added a parent (including
      those in #4112, which is now OBE, i.e. overtaken by events), so that
      ConfigurationCopy keeps its role as a simple "flat" configuration
      object for testing and simple operations
    * Use SiteConfiguration with overrides, instead of ConfigurationCopy
      with a parent, to test override behavior for
      DefaultCompactionPlannerTest
    
    Co-authored-by: Daniel Roberts <ddani...@gmail.com>
---
 .../spi/compaction/DefaultCompactionPlanner.java   |   3 +-
 .../compaction/CompactionPlannerInitParams.java    |   2 +-
 .../compaction/DefaultCompactionPlannerTest.java   | 374 +++++++++++----------
 3 files changed, 204 insertions(+), 175 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
 
b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
index be8e25299b..9385806831 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
@@ -221,7 +221,8 @@ public class DefaultCompactionPlanner implements 
CompactionPlanner {
       this.maxFilesToCompact = 
Integer.parseInt(params.getServiceEnvironment().getConfiguration()
           .get(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey()));
     } else {
-      this.maxFilesToCompact = 
Integer.parseInt(params.getOptions().getOrDefault("maxOpen", "10"));
+      this.maxFilesToCompact = 
Integer.parseInt(params.getOptions().getOrDefault("maxOpen",
+          
Property.TSERV_COMPACTION_SERVICE_DEFAULT_MAX_OPEN.getDefaultValue()));
     }
   }
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java
 
b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java
index 0f79ce4df0..eb287153d6 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java
@@ -60,7 +60,7 @@ public class CompactionPlannerInitParams implements 
CompactionPlanner.InitParame
 
   @Override
   public String getFullyQualifiedOption(String key) {
-    return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceId + 
".opts." + key;
+    return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceId + 
".planner.opts." + key;
   }
 
   @Override
diff --git 
a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
 
b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
index 568b57cd2d..ab2003841b 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
@@ -22,27 +22,35 @@ import static 
com.google.common.collect.MoreCollectors.onlyElement;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.spi.common.ServiceEnvironment;
 import org.apache.accumulo.core.spi.common.ServiceEnvironment.Configuration;
 import org.apache.accumulo.core.spi.compaction.CompactionPlan.Builder;
+import org.apache.accumulo.core.util.ConfigurationImpl;
 import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl;
 import org.apache.accumulo.core.util.compaction.CompactionPlanImpl;
+import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
+import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
 import org.easymock.EasyMock;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DefaultCompactionPlannerTest {
 
@@ -50,6 +58,12 @@ public class DefaultCompactionPlannerTest {
     return c.stream().collect(onlyElement());
   }
 
+  private static final Configuration defaultConf =
+      new ConfigurationImpl(DefaultConfiguration.getInstance());
+  private static final CompactionServiceId csid = 
CompactionServiceId.of("cs1");
+
+  private static final Logger log = 
LoggerFactory.getLogger(DefaultCompactionPlannerTest.class);
+
   @Test
   public void testFindFilesToCompact() {
 
@@ -130,7 +144,13 @@ public class DefaultCompactionPlannerTest {
 
   @Test
   public void testRunningCompaction() {
-    var planner = createPlanner(true);
+    String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
+        + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
+        + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3},"
+        + "{'name':'huge','type': 'internal','numThreads':4}]";
+
+    var planner = createPlanner(defaultConf, executors);
+
     var all = createCFs("F1", "3M", "F2", "3M", "F3", "11M", "F4", "12M", 
"F5", "13M");
     var candidates = createCFs("F3", "11M", "F4", "12M", "F5", "13M");
     var compacting =
@@ -152,12 +172,138 @@ public class DefaultCompactionPlannerTest {
     // planner should compact.
     var job = getOnlyElement(plan.getJobs());
     assertEquals(candidates, job.getFiles());
-    assertEquals(CompactionExecutorIdImpl.externalId("medium"), 
job.getExecutor());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "medium"), 
job.getExecutor());
+  }
+
+  /**
+   * Tests that the maxOpen property overrides the deprecated open.max 
property with the default
+   * service
+   */
+  @Test
+  @SuppressWarnings("removal")
+  public void testOverrideMaxOpenDefaultService() {
+    Map<String,String> overrides = new HashMap<>();
+    // Set old property and use that for max open files.
+    overrides.put(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "17");
+    SiteConfiguration aconf = 
SiteConfiguration.empty().withOverrides(overrides).build();
+    ConfigurationImpl config = new ConfigurationImpl(aconf);
+
+    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
+    EasyMock.expect(senv.getConfiguration()).andReturn(config).anyTimes();
+    EasyMock.replay(senv);
+
+    // Use the CompactionServicesConfig to create options based on default 
property values
+    var compactionServices = new CompactionServicesConfig(aconf, log::warn);
+    var options = compactionServices.getOptions().get("default");
+
+    var initParams =
+        new CompactionPlannerInitParams(CompactionServiceId.of("default"), 
options, senv);
+
+    var planner = new DefaultCompactionPlanner();
+    planner.init(initParams);
+
+    var all = createCFs("F1", "10M", "F2", "11M", "F3", "12M", "F4", "13M", 
"F5", "14M", "F6",
+        "15M", "F7", "16M", "F8", "17M", "F9", "18M", "FA", "19M", "FB", 
"20M", "FC", "21M", "FD",
+        "22M", "FE", "23M", "FF", "24M", "FG", "25M", "FH", "26M");
+    Set<CompactionJob> compacting = Set.of();
+    var params = createPlanningParams(all, all, compacting, 2, 
CompactionKind.USER);
+    var plan = planner.makePlan(params);
+    var job = getOnlyElement(plan.getJobs());
+    assertEquals(all, job.getFiles());
+    
assertEquals(CompactionExecutorIdImpl.internalId(CompactionServiceId.of("default"),
 "large"),
+        job.getExecutor());
+
+    overrides.put(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "5");
+    aconf = SiteConfiguration.empty().withOverrides(overrides).build();
+    config = new ConfigurationImpl(aconf);
+    senv = EasyMock.createMock(ServiceEnvironment.class);
+    EasyMock.expect(senv.getConfiguration()).andReturn(config).anyTimes();
+    EasyMock.replay(senv);
+
+    // Create new initParams so executor IDs can be reused
+    initParams = new 
CompactionPlannerInitParams(CompactionServiceId.of("default"), options, senv);
+    planner = new DefaultCompactionPlanner();
+    planner.init(initParams);
+
+    params = createPlanningParams(all, all, compacting, 2, 
CompactionKind.USER);
+    plan = planner.makePlan(params);
+    job = getOnlyElement(plan.getJobs());
+    assertEquals(createCFs("F1", "10M", "F2", "11M", "F3", "12M", "F4", "13M", 
"F5", "14M"),
+        job.getFiles());
+    
assertEquals(CompactionExecutorIdImpl.internalId(CompactionServiceId.of("default"),
 "medium"),
+        job.getExecutor());
+  }
+
+  /**
+   * Tests that the maxOpen property overrides the deprecated open.max property
+   */
+  @Test
+  @SuppressWarnings("removal")
+  public void testOverrideMaxOpen() {
+    Map<String,String> overrides = new HashMap<>();
+    // Set old property and use that for max open files.
+    overrides.put(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "17");
+    SiteConfiguration aconf = 
SiteConfiguration.empty().withOverrides(overrides).build();
+    ConfigurationImpl config = new ConfigurationImpl(aconf);
+
+    String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
+        + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
+        + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3},"
+        + "{'name':'huge','type': 'internal','numThreads':4}]";
+
+    var planner = createPlanner(config, executors);
+    var all = createCFs("F1", "1M", "F2", "2M", "F3", "4M", "F4", "8M", "F5", 
"16M", "F6", "32M",
+        "F7", "64M", "F8", "128M", "F9", "256M", "FA", "512M", "FB", "1G", 
"FC", "2G", "FD", "4G",
+        "FE", "8G", "FF", "16G", "FG", "32G", "FH", "64G");
+    Set<CompactionJob> compacting = Set.of();
+    var params = createPlanningParams(all, all, compacting, 2, 
CompactionKind.USER);
+    var plan = planner.makePlan(params);
+    var job = getOnlyElement(plan.getJobs());
+    assertEquals(all, job.getFiles());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "huge"), 
job.getExecutor());
+
+    // Set new property that overrides the old property.
+    overrides.put(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts.maxOpen",
+        "15");
+    aconf = SiteConfiguration.empty().withOverrides(overrides).build();
+    config = new ConfigurationImpl(aconf);
+    planner = createPlanner(config, executors);
+    params = createPlanningParams(all, all, compacting, 2, 
CompactionKind.USER);
+    plan = planner.makePlan(params);
+
+    // 17 files that do not meet the compaction ratio. When max files to 
compact is 15,
+    // the plan should do 3 files then 15
+    job = getOnlyElement(plan.getJobs());
+    assertEquals(createCFs("F1", "1M", "F2", "2M", "F3", "4M"), 
job.getFiles());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), 
job.getExecutor());
+
+    overrides.put(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts.maxOpen",
+        "5");
+    aconf = SiteConfiguration.empty().withOverrides(overrides).build();
+    // 17 files that do not meet the compaction ratio. When max files to 
compact is 5 should do 5,
+    // files then another 5, then the final 5.
+    config = new ConfigurationImpl(aconf);
+    planner = createPlanner(config, executors);
+    params = createPlanningParams(all, all, compacting, 2, 
CompactionKind.USER);
+    plan = planner.makePlan(params);
+    job = getOnlyElement(plan.getJobs());
+    assertEquals(createCFs("F4", "8M", "F3", "4M", "F2", "2M", "F1", "1M", 
"F5", "16M"),
+        job.getFiles());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), 
job.getExecutor());
   }
 
   @Test
   public void testUserCompaction() {
-    var planner = createPlanner(true);
+    ConfigurationCopy aconf = new 
ConfigurationCopy(DefaultConfiguration.getInstance());
+    aconf.set(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts.maxOpen", "15");
+    ConfigurationImpl config = new ConfigurationImpl(aconf);
+
+    String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
+        + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
+        + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3},"
+        + "{'name':'huge','type': 'internal','numThreads':4}]";
+
+    var planner = createPlanner(config, executors);
     var all = createCFs("F1", "3M", "F2", "3M", "F3", "11M", "F4", "12M", 
"F5", "13M");
     var candidates = createCFs("F3", "11M", "F4", "12M", "F5", "13M");
     var compacting =
@@ -168,7 +314,7 @@ public class DefaultCompactionPlannerTest {
     // a running non-user compaction should not prevent a user compaction
     var job = getOnlyElement(plan.getJobs());
     assertEquals(candidates, job.getFiles());
-    assertEquals(CompactionExecutorIdImpl.externalId("medium"), 
job.getExecutor());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "medium"), 
job.getExecutor());
 
     // should only run one user compaction at a time
     compacting = Set.of(createJob(CompactionKind.USER, all, createCFs("F1", 
"3M", "F2", "3M")));
@@ -186,7 +332,7 @@ public class DefaultCompactionPlannerTest {
     plan = planner.makePlan(params);
     job = getOnlyElement(plan.getJobs());
     assertEquals(createCFs("F1", "1M", "F2", "2M", "F3", "4M"), 
job.getFiles());
-    assertEquals(CompactionExecutorIdImpl.externalId("small"), 
job.getExecutor());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), 
job.getExecutor());
 
     // should compact all 15
     all = createCFs("FI", "7M", "F4", "8M", "F5", "16M", "F6", "32M", "F7", 
"64M", "F8", "128M",
@@ -196,7 +342,7 @@ public class DefaultCompactionPlannerTest {
     plan = planner.makePlan(params);
     job = getOnlyElement(plan.getJobs());
     assertEquals(all, job.getFiles());
-    assertEquals(CompactionExecutorIdImpl.externalId("huge"), 
job.getExecutor());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "huge"), 
job.getExecutor());
 
     // For user compaction, can compact a subset that meets the compaction 
ratio if there is also a
     // larger set of files the meets the compaction ratio
@@ -206,7 +352,7 @@ public class DefaultCompactionPlannerTest {
     plan = planner.makePlan(params);
     job = getOnlyElement(plan.getJobs());
     assertEquals(createCFs("F1", "3M", "F2", "4M", "F3", "5M", "F4", "6M"), 
job.getFiles());
-    assertEquals(CompactionExecutorIdImpl.externalId("small"), 
job.getExecutor());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), 
job.getExecutor());
 
     // There is a subset of small files that meets the compaction ratio, but 
the larger set does not
     // so compact everything to avoid doing more than logarithmic work
@@ -215,13 +361,17 @@ public class DefaultCompactionPlannerTest {
     plan = planner.makePlan(params);
     job = getOnlyElement(plan.getJobs());
     assertEquals(all, job.getFiles());
-    assertEquals(CompactionExecutorIdImpl.externalId("medium"), 
job.getExecutor());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "medium"), 
job.getExecutor());
 
   }
 
   @Test
   public void testMaxSize() {
-    var planner = createPlanner(false);
+    String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
+        + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
+        + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3}]";
+
+    var planner = createPlanner(defaultConf, executors);
     var all = createCFs("F1", "128M", "F2", "129M", "F3", "130M", "F4", 
"131M", "F5", "132M");
     var params = createPlanningParams(all, all, Set.of(), 2, 
CompactionKind.SYSTEM);
     var plan = planner.makePlan(params);
@@ -229,14 +379,14 @@ public class DefaultCompactionPlannerTest {
     // should only compact files less than max size
     var job = getOnlyElement(plan.getJobs());
     assertEquals(createCFs("F1", "128M", "F2", "129M", "F3", "130M"), 
job.getFiles());
-    assertEquals(CompactionExecutorIdImpl.externalId("large"), 
job.getExecutor());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "large"), 
job.getExecutor());
 
     // user compaction can exceed the max size
     params = createPlanningParams(all, all, Set.of(), 2, CompactionKind.USER);
     plan = planner.makePlan(params);
     job = getOnlyElement(plan.getJobs());
     assertEquals(all, job.getFiles());
-    assertEquals(CompactionExecutorIdImpl.externalId("large"), 
job.getExecutor());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "large"), 
job.getExecutor());
   }
 
   /**
@@ -245,18 +395,12 @@ public class DefaultCompactionPlannerTest {
   @Test
   public void testErrorInternalTypeNoNumThreads() {
     DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-    Configuration conf = EasyMock.createMock(Configuration.class);
-    
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
-
-    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-    EasyMock.replay(conf, senv);
+    String executors = "[{'name':'small','type':'internal','maxSize':'32M'},"
+        + 
"{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},"
+        + 
"{'name':'large','type':'internal','maxSize':'512M','numThreads':3}]";
 
-    String executors = getExecutors("'type': 'internal','maxSize':'32M'",
-        "'type': 'internal','maxSize':'128M','numThreads':2",
-        "'type': 'internal','maxSize':'512M','numThreads':3");
     var e = assertThrows(NullPointerException.class,
-        () -> planner.init(getInitParams(senv, executors)), "Failed to throw 
error");
+        () -> planner.init(getInitParams(defaultConf, executors)), "Failed to 
throw error");
     assertTrue(e.getMessage().contains("numThreads"), "Error message didn't 
contain numThreads");
   }
 
@@ -266,18 +410,12 @@ public class DefaultCompactionPlannerTest {
   @Test
   public void testErrorExternalTypeNumThreads() {
     DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-    Configuration conf = EasyMock.createMock(Configuration.class);
-    
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
-
-    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-    EasyMock.replay(conf, senv);
+    String executors = "[{'name':'small','type':'internal','maxSize':'32M', 
'numThreads':1},"
+        + 
"{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},"
+        + 
"{'name':'large','type':'external','maxSize':'512M','numThreads':3}]";
 
-    String executors = getExecutors("'type': 
'internal','maxSize':'32M','numThreads':1",
-        "'type': 'internal','maxSize':'128M','numThreads':2",
-        "'type': 'external','maxSize':'512M','numThreads':3");
     var e = assertThrows(IllegalArgumentException.class,
-        () -> planner.init(getInitParams(senv, executors)), "Failed to throw 
error");
+        () -> planner.init(getInitParams(defaultConf, executors)), "Failed to 
throw error");
     assertTrue(e.getMessage().contains("numThreads"), "Error message didn't 
contain numThreads");
   }
 
@@ -287,18 +425,12 @@ public class DefaultCompactionPlannerTest {
   @Test
   public void testErrorExternalNoQueue() {
     DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-    Configuration conf = EasyMock.createMock(Configuration.class);
-    
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
+    String executors = "[{'name':'small','type':'internal','maxSize':'32M', 
'numThreads':1},"
+        + 
"{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},"
+        + "{'name':'large','type':'external','maxSize':'512M'}]";
 
-    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-    EasyMock.replay(conf, senv);
-
-    String executors = getExecutors("'type': 
'internal','maxSize':'32M','numThreads':1",
-        "'type': 'internal','maxSize':'128M','numThreads':2",
-        "'type': 'external','maxSize':'512M'");
     var e = assertThrows(NullPointerException.class,
-        () -> planner.init(getInitParams(senv, executors)), "Failed to throw 
error");
+        () -> planner.init(getInitParams(defaultConf, executors)), "Failed to 
throw error");
     assertTrue(e.getMessage().contains("queue"), "Error message didn't contain 
queue");
   }
 
@@ -308,17 +440,12 @@ public class DefaultCompactionPlannerTest {
   @Test
   public void testErrorOnlyOneMaxSize() {
     DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-    Configuration conf = EasyMock.createMock(Configuration.class);
-    
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
+    String executors = "[{'name':'small','type':'internal','maxSize':'32M', 
'numThreads':1},"
+        + "{'name':'medium','type':'internal','numThreads':2},"
+        + "{'name':'large','type':'external','queue':'q1'}]";
 
-    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-    EasyMock.replay(conf, senv);
-
-    String executors = getExecutors("'type': 
'internal','maxSize':'32M','numThreads':1",
-        "'type': 'internal','numThreads':2", "'type': 
'external','queue':'q1'");
     var e = assertThrows(IllegalArgumentException.class,
-        () -> planner.init(getInitParams(senv, executors)), "Failed to throw 
error");
+        () -> planner.init(getInitParams(defaultConf, executors)), "Failed to 
throw error");
     assertTrue(e.getMessage().contains("maxSize"), "Error message didn't 
contain maxSize");
   }
 
@@ -328,69 +455,20 @@ public class DefaultCompactionPlannerTest {
   @Test
   public void testErrorDuplicateMaxSize() {
     DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-    Configuration conf = EasyMock.createMock(Configuration.class);
-    
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
+    String executors = "[{'name':'small','type':'internal','maxSize':'32M', 
'numThreads':1},"
+        + 
"{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},"
+        + "{'name':'large','type':'external','maxSize':'128M','queue':'q1'}]";
 
-    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-    EasyMock.replay(conf, senv);
-
-    String executors = getExecutors("'type': 
'internal','maxSize':'32M','numThreads':1",
-        "'type': 'internal','maxSize':'128M','numThreads':2",
-        "'type': 'external','maxSize':'128M','queue':'q1'");
     var e = assertThrows(IllegalArgumentException.class,
-        () -> planner.init(getInitParams(senv, executors)), "Failed to throw 
error");
+        () -> planner.init(getInitParams(defaultConf, executors)), "Failed to 
throw error");
     assertTrue(e.getMessage().contains("maxSize"), "Error message didn't 
contain maxSize");
   }
 
-  private CompactionPlanner.InitParameters getInitParams(ServiceEnvironment 
senv,
-      String executors) {
-    return new CompactionPlanner.InitParameters() {
-
-      @Override
-      public ServiceEnvironment getServiceEnvironment() {
-        return senv;
-      }
-
-      @Override
-      public Map<String,String> getOptions() {
-        return Map.of("executors", executors, "maxOpen", "15");
-      }
-
-      @Override
-      public String getFullyQualifiedOption(String key) {
-        assertEquals("maxOpen", key);
-        return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts." + key;
-      }
-
-      @Override
-      public ExecutorManager getExecutorManager() {
-        return new ExecutorManager() {
-          @Override
-          public CompactionExecutorId createExecutor(String name, int threads) 
{
-            return CompactionExecutorIdImpl.externalId(name);
-          }
-
-          @Override
-          public CompactionExecutorId getExternalExecutor(String name) {
-            return CompactionExecutorIdImpl.externalId(name);
-          }
-        };
-      }
-    };
-  }
-
-  private String getExecutors(String small, String medium, String large) {
-    String execBldr = "[{'name':'small'," + small + "},{'name':'medium'," + 
medium + "},"
-        + "{'name':'large'," + large + "}]";
-    return execBldr.replaceAll("'", "\"");
-  }
-
   private CompactionJob createJob(CompactionKind kind, Set<CompactableFile> 
all,
       Set<CompactableFile> files) {
     return new CompactionPlanImpl.BuilderImpl(kind, all, all)
-        .addJob((short) all.size(), 
CompactionExecutorIdImpl.externalId("small"), files).build()
-        .getJobs().iterator().next();
+        .addJob((short) all.size(), CompactionExecutorIdImpl.internalId(csid, 
"small"), files)
+        .build().getJobs().iterator().next();
   }
 
   private static Set<CompactableFile> createCFs(String... namesSizePairs) {
@@ -486,80 +564,30 @@ public class DefaultCompactionPlannerTest {
     };
   }
 
-  private static DefaultCompactionPlanner createPlanner(boolean 
withHugeExecutor) {
-    DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-    Configuration conf = EasyMock.createMock(Configuration.class);
-    
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
-
-    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-
-    EasyMock.replay(conf, senv);
+  private static CompactionPlanner.InitParameters getInitParams(Configuration 
conf,
+      String executors) {
 
-    StringBuilder execBldr =
-        new StringBuilder("[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
-            + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
-            + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3}");
+    String maxOpen =
+        conf.get(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts.maxOpen");
+    Map<String,String> options = new HashMap<>();
+    options.put("executors", executors.replaceAll("'", "\""));
 
-    if (withHugeExecutor) {
-      execBldr.append(",{'name':'huge','type': 'internal','numThreads':4}]");
-    } else {
-      execBldr.append("]");
+    if (maxOpen != null) {
+      options.put("maxOpen", maxOpen);
     }
 
-    String executors = execBldr.toString().replaceAll("'", "\"");
-
-    planner.init(new CompactionPlanner.InitParameters() {
-
-      @Override
-      public ServiceEnvironment getServiceEnvironment() {
-        return senv;
-      }
-
-      @Override
-      public Map<String,String> getOptions() {
-        return Map.of("executors", executors, "maxOpen", "15");
-      }
+    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
+    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
+    EasyMock.replay(senv);
 
-      @Override
-      public String getFullyQualifiedOption(String key) {
-        assertEquals("maxOpen", key);
-        return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts." + key;
-      }
+    return new CompactionPlannerInitParams(csid, options, senv);
+  }
 
-      @Override
-      public ExecutorManager getExecutorManager() {
-        return new ExecutorManager() {
-          @Override
-          public CompactionExecutorId createExecutor(String name, int threads) 
{
-            switch (name) {
-              case "small":
-                assertEquals(1, threads);
-                break;
-              case "medium":
-                assertEquals(2, threads);
-                break;
-              case "large":
-                assertEquals(3, threads);
-                break;
-              case "huge":
-                assertEquals(4, threads);
-                break;
-              default:
-                fail("Unexpected name " + name);
-                break;
-            }
-            return CompactionExecutorIdImpl.externalId(name);
-          }
-
-          @Override
-          public CompactionExecutorId getExternalExecutor(String name) {
-            throw new UnsupportedOperationException();
-          }
-        };
-      }
-    });
+  private static DefaultCompactionPlanner createPlanner(Configuration conf, 
String executors) {
+    DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
+    var initParams = getInitParams(conf, executors);
 
+    planner.init(initParams);
     return planner;
   }
 }

Reply via email to