This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 1bf7dcb184cb2c2ed9e9a4768370a7980a6c22bd
Merge: 8960452814 b8869fb72e
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Wed Dec 27 23:37:54 2023 +0000

    Merge branch 'main' into elasticity

 .../spi/compaction/DefaultCompactionPlanner.java   |   6 +-
 .../compaction/CompactionPlannerInitParams.java    |   2 +-
 .../compaction/DefaultCompactionPlannerTest.java   | 365 +++++++--------------
 3 files changed, 120 insertions(+), 253 deletions(-)

diff --cc core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
index e25b7c519b,bafda93e3e..6447a2d147
--- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
@@@ -29,15 -27,16 +28,18 @@@ import static org.junit.jupiter.api.Ass
  import java.net.URI;
  import java.net.URISyntaxException;
  import java.util.Collection;
 +import java.util.Collections;
+ import java.util.HashMap;
  import java.util.HashSet;
 +import java.util.List;
  import java.util.Map;
  import java.util.Set;
 -import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
  
  import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+ import org.apache.accumulo.core.conf.ConfigurationCopy;
  import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+ import org.apache.accumulo.core.conf.DefaultConfiguration;
  import org.apache.accumulo.core.conf.Property;
  import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.spi.common.ServiceEnvironment;
@@@ -243,152 -268,9 +271,167 @@@ public class DefaultCompactionPlannerTe
      plan = planner.makePlan(params);
      job = getOnlyElement(plan.getJobs());
      assertEquals(all, job.getFiles());
-     assertEquals(CompactionExecutorIdImpl.externalId("large"), job.getExecutor());
+     assertEquals(CompactionExecutorIdImpl.internalId(csid, "large"), job.getExecutor());
    }
  
 +  @Test
 +  public void testMultipleCompactions() {
 +    // This test validates that when a tablet has many files, multiple compaction jobs can be
 +    // issued at the same time.
++    String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
++        + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
++        + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3}]";
++
 +    for (var kind : List.of(CompactionKind.USER, CompactionKind.SYSTEM)) {
-       var planner = createPlanner(false);
++      var planner = createPlanner(defaultConf, executors);
 +      var all = IntStream.range(0, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet());
 +      // simulate 10 larger files, these should not compact at the same time as the smaller files.
 +      // It's more optimal to wait for all of the smaller files to compact and then compact the
 +      // output of compacting the smaller files with the larger files.
 +      IntStream.range(990, 1000).mapToObj(i -> createCF("C" + i, 20000)).forEach(all::add);
 +      var params = createPlanningParams(all, all, Set.of(), 2, kind);
 +      var plan = planner.makePlan(params);
 +
 +      // There are 990 smaller files to compact. Should produce 66 jobs of 15 smaller files each.
 +      assertEquals(66, plan.getJobs().size());
 +      Set<CompactableFile> filesSeen = new HashSet<>();
 +      plan.getJobs().forEach(job -> {
 +        assertEquals(15, job.getFiles().size());
 +        assertEquals(kind, job.getKind());
 -         assertEquals(CompactionExecutorIdImpl.externalId("small"), job.getExecutor());
++        assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), job.getExecutor());
 +        // ensure the files across all of the jobs are disjoint
 +        job.getFiles().forEach(cf -> assertTrue(filesSeen.add(cf)));
 +      });
 +
 +      // Ensure all of the smaller files are scheduled for compaction. Should not see any of the
 +      // larger files.
 +      assertEquals(IntStream.range(0, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()),
 +          filesSeen);
 +    }
 +  }
 +
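
The 66 jobs asserted above follow directly from the planner's "maxOpen" option,
which is set to 15 in getInitParams below. A minimal arithmetic sketch, assuming
the planner caps each job at maxOpen files (consistent with the 15-file jobs
asserted in this test):

    // Back-of-envelope check for the expected job count: 990 equally sized
    // candidate files split into full batches of maxOpen (15) files each.
    public class JobCountSketch {
      public static void main(String[] args) {
        int smallFiles = 990; // files F0..F989, 1000 bytes each
        int maxOpen = 15;     // "maxOpen" option from getInitParams
        System.out.println(smallFiles / maxOpen); // prints 66
      }
    }
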
 +  @Test
 +  public void testMultipleCompactionsAndLargeCompactionRatio() {
-     var planner = createPlanner(false);
++
++    String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
++        + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
++        + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3}]";
++    var planner = createPlanner(defaultConf, executors);
 +    var all = IntStream.range(0, 65).mapToObj(i -> createCF("F" + i, i + 1)).collect(toSet());
 +    // This compaction ratio would not cause a system compaction; however, a user compaction must
 +    // compact all of the files, so it should generate some compactions.
 +    var params = createPlanningParams(all, all, Set.of(), 100, CompactionKind.USER);
 +    var plan = planner.makePlan(params);
 +
 +    assertEquals(3, plan.getJobs().size());
 +
 +    var iterator = plan.getJobs().iterator();
 +    var job1 = iterator.next();
 +    var job2 = iterator.next();
 +    var job3 = iterator.next();
 +    assertTrue(Collections.disjoint(job1.getFiles(), job2.getFiles()));
 +    assertTrue(Collections.disjoint(job1.getFiles(), job3.getFiles()));
 +    assertTrue(Collections.disjoint(job2.getFiles(), job3.getFiles()));
 +
 +    for (var job : plan.getJobs()) {
 +      assertEquals(15, job.getFiles().size());
 +      assertEquals(CompactionKind.USER, job.getKind());
 +      assertTrue(all.containsAll(job.getFiles()));
 +      // Should select three sets of files that are from the smallest 45 files.
 +      assertTrue(job.getFiles().stream().mapToLong(CompactableFile::getEstimatedSize).sum()
 +          <= IntStream.range(1, 46).sum());
 +    }
 +  }
 +
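
For context on why ratio 100 yields no system compaction here: these tests appear
to rely on the usual size-ratio criterion, where a set of files is only eligible
when the sum of its sizes is at least the ratio times the largest file in the set.
A minimal sketch under that assumption, using the sizes from this test:

    import java.util.stream.LongStream;

    // With sizes 1..65 and ratio 100, even the full set fails the assumed
    // check sum(sizes) >= largestSize * ratio (2145 < 6500), so only a USER
    // compaction, which must compact everything, produces jobs.
    public class RatioSketch {
      public static void main(String[] args) {
        long total = LongStream.rangeClosed(1, 65).sum(); // 2145
        long largest = 65;
        double ratio = 100;
        System.out.println(total >= largest * ratio); // prints false
      }
    }
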
 +  @Test
 +  public void testMultipleCompactionsAndRunningCompactions() {
 +    // This test validates that when a tablet has many files, multiple compaction jobs can be
 +    // issued at the same time even if there are running compactions, as long as everything meets the
 +    // compaction ratio.
++    String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
++        + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
++        + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3}]";
 +    for (var kind : List.of(CompactionKind.USER, CompactionKind.SYSTEM)) {
-       var planner = createPlanner(false);
++      var planner = createPlanner(defaultConf, executors);
 +      var all = IntStream.range(0, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet());
 +      // simulate 10 larger files, these should not compact at the same time as the smaller files.
 +      // It's more optimal to wait for all of the smaller files to compact and then compact the
 +      // output of compacting the smaller files with the larger files.
 +      IntStream.range(990, 1000).mapToObj(i -> createCF("C" + i, 20000)).forEach(all::add);
 +      // 30 files are compacting, so they will not be in the candidate set.
 +      var candidates =
 +          IntStream.range(30, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet());
 +      // create two jobs covering the first 30 files
 +      var job1 = createJob(kind, all,
 +          IntStream.range(0, 15).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()));
 +      var job2 = createJob(kind, all,
 +          IntStream.range(15, 30).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()));
 +      var params = createPlanningParams(all, candidates, Set.of(job1, job2), 2, kind);
 +      var plan = planner.makePlan(params);
 +
 +      // 960 smaller files remain as candidates (30 are already compacting). Should produce 64
 +      // jobs of 15 smaller files each.
 +      assertEquals(64, plan.getJobs().size());
 +      Set<CompactableFile> filesSeen = new HashSet<>();
 +      plan.getJobs().forEach(job -> {
 +        assertEquals(15, job.getFiles().size());
 +        assertEquals(kind, job.getKind());
 -         assertEquals(CompactionExecutorIdImpl.externalId("small"), job.getExecutor());
++        assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), job.getExecutor());
 +        // ensure the files across all of the jobs are disjoint
 +        job.getFiles().forEach(cf -> assertTrue(filesSeen.add(cf)));
 +      });
 +
 +      // Ensure all of the smaller files are scheduled for compaction. Should not see any of the
 +      // larger files.
 +      assertEquals(IntStream.range(30, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()),
 +          filesSeen);
 +    }
 +  }
 +
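
The 64-job count is the same arithmetic as the earlier test, minus the 30 files
held by the two running jobs; a quick sketch:

    // 30 of the 990 small files are already compacting, leaving 960
    // candidates that split into 64 full batches of 15 (maxOpen) files.
    public class RunningJobCountSketch {
      public static void main(String[] args) {
        int candidates = 990 - 30; // F30..F989
        System.out.println(candidates / 15); // prints 64
      }
    }
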
 +  @Test
 +  public void testUserCompactionDoesNotWaitOnSystemCompaction() {
 +    // this test ensures user compactions do not wait on system compactions to complete
-     var planner = createPlanner(true);
++
++    String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
++        + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
++        + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3}]";
++    var planner = createPlanner(defaultConf, executors);
 +    var all = createCFs("F1", "1M", "F2", "1M", "F3", "1M", "F4", "3M", "F5", 
"3M", "F6", "3M",
 +        "F7", "20M");
 +    var candidates = createCFs("F4", "3M", "F5", "3M", "F6", "3M", "F7", 
"20M");
 +    var compacting = Set
 +        .of(createJob(CompactionKind.SYSTEM, all, createCFs("F1", "1M", "F2", "1M", "F3", "1M")));
 +    var params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.SYSTEM);
 +    var plan = planner.makePlan(params);
 +    // The planning of the system compaction should find it most optimal to wait on the running
 +    // system compaction and emit zero jobs.
 +    assertEquals(0, plan.getJobs().size());
 +
 +    params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.USER);
 +    plan = planner.makePlan(params);
 +    // The planning of the user compaction should not take the running system compaction into
 +    // consideration and should create a compaction job.
 +    assertEquals(1, plan.getJobs().size());
 +    assertEquals(createCFs("F4", "3M", "F5", "3M", "F6", "3M", "F7", "20M"),
 +        getOnlyElement(plan.getJobs()).getFiles());
 +
 +    // Reverse the situation and turn the running compaction into a user compaction
 +    compacting =
 +        Set.of(createJob(CompactionKind.USER, all, createCFs("F1", "1M", "F2", "1M", "F3", "1M")));
 +    params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.SYSTEM);
 +    plan = planner.makePlan(params);
 +    // The planning of a system compaction should not take the running user compaction into account
 +    // and should emit a job
 +    assertEquals(1, plan.getJobs().size());
 +    assertEquals(createCFs("F4", "3M", "F5", "3M", "F6", "3M"),
 +        getOnlyElement(plan.getJobs()).getFiles());
 +
 +    params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.USER);
 +    plan = planner.makePlan(params);
 +    // The planning of the user compaction should decide that the most optimal thing to do is to
 +    // wait on the running user compaction and should not emit any jobs.
 +    assertEquals(0, plan.getJobs().size());
 +  }
 +
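
The internalId(csid, "small") assertions in these tests reflect how the planner
appears to map a job to an executor: the first configured executor whose maxSize
can hold the job's total size. A hedged sketch of that selection, using the sizes
above (F4 + F5 + F6 + F7 = 29M fits under the small executor's 32M cap):

    // Walk the executors in ascending maxSize order and pick the first one
    // whose cap is large enough for the job; sizes mirror the test config.
    public class ExecutorPickSketch {
      record Executor(String name, long maxSizeBytes) {}

      public static void main(String[] args) {
        Executor[] executors = {new Executor("small", 32L << 20),
            new Executor("medium", 128L << 20), new Executor("large", 512L << 20)};
        long jobSize = 29L << 20; // 3M + 3M + 3M + 20M
        for (Executor e : executors) {
          if (jobSize <= e.maxSizeBytes()) {
            System.out.println(e.name()); // prints small
            break;
          }
        }
      }
    }
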
    @Test
    public void testQueueCreation() throws Exception {
      DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
@@@ -505,19 -362,13 +523,13 @@@
    @Test
    public void testErrorExternalNoQueue() {
      DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-     Configuration conf = EasyMock.createMock(Configuration.class);
-     EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
- 
-     ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-     EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-     EasyMock.replay(conf, senv);
+     String executors = "[{'name':'small','type':'internal','maxSize':'32M', 
'numThreads':1},"
+         + 
"{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},"
+         + "{'name':'large','type':'external','maxSize':'512M'}]";
  
-     String executors = getExecutors("'type': 'internal','maxSize':'32M','numThreads':1",
-         "'type': 'internal','maxSize':'128M','numThreads':2",
-         "'type': 'external','maxSize':'512M'");
      var e = assertThrows(NullPointerException.class,
-         () -> planner.init(getInitParams(senv, executors)), "Failed to throw error");
+         () -> planner.init(getInitParams(defaultConf, executors)), "Failed to throw error");
 -    assertTrue(e.getMessage().contains("queue"), "Error message didn't 
contain queue");
 +    assertTrue(e.getMessage().contains("group"), "Error message didn't 
contain group");
    }
  
    /**
@@@ -579,17 -416,12 +577,11 @@@
    @Test
    public void testErrorOnlyOneMaxSize() {
      DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-     Configuration conf = EasyMock.createMock(Configuration.class);
-     EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
- 
-     ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-     EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-     EasyMock.replay(conf, senv);
- 
-     String executors = getExecutors("'type': 'internal','maxSize':'32M','numThreads':1",
-         "'type': 'internal','numThreads':2", "'type': 'external','group':'q1'");
+     String executors = "[{'name':'small','type':'internal','maxSize':'32M', 
'numThreads':1},"
+         + "{'name':'medium','type':'internal','numThreads':2},"
 -        + "{'name':'large','type':'external','queue':'q1'}]";
 -
++        + "{'name':'large','type':'internal','numThreads':'3'}]";
      var e = assertThrows(IllegalArgumentException.class,
-         () -> planner.init(getInitParams(senv, executors)), "Failed to throw error");
+         () -> planner.init(getInitParams(defaultConf, executors)), "Failed to throw error");
      assertTrue(e.getMessage().contains("maxSize"), "Error message didn't 
contain maxSize");
    }
  
@@@ -599,118 -431,24 +591,32 @@@
    @Test
    public void testErrorDuplicateMaxSize() {
      DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-     Configuration conf = EasyMock.createMock(Configuration.class);
-     EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
- 
-     ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-     EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-     EasyMock.replay(conf, senv);
 -    String executors = "[{'name':'small','type':'internal','maxSize':'32M', 
'numThreads':1},"
 -        + 
"{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},"
 -        + "{'name':'large','type':'external','maxSize':'128M','queue':'q1'}]";
++    String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
++        + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
++        + "{'name':'large','type': 
'internal','maxSize':'128M','numThreads':3}]";
  
-     String executors = getExecutors("'type': 'internal','maxSize':'32M','numThreads':1",
-         "'type': 'internal','maxSize':'128M','numThreads':2",
-         "'type': 'external','maxSize':'128M','group':'q1'");
      var e = assertThrows(IllegalArgumentException.class,
-         () -> planner.init(getInitParams(senv, executors)), "Failed to throw error");
+         () -> planner.init(getInitParams(defaultConf, executors)), "Failed to throw error");
      assertTrue(e.getMessage().contains("maxSize"), "Error message didn't 
contain maxSize");
    }
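
The two maxSize error tests above expect init to reject configurations where all
but one executor declare a maxSize, or where two executors declare the same one.
A minimal sketch of the duplicate check, assuming the planner simply tracks each
declared maxSize as it parses the executors:

    import java.util.HashSet;
    import java.util.Set;

    // A repeated value ("128M" twice, as in the test above) triggers an
    // IllegalArgumentException whose message mentions "maxSize".
    public class DuplicateMaxSizeSketch {
      public static void main(String[] args) {
        Set<String> seen = new HashSet<>();
        for (String maxSize : new String[] {"32M", "128M", "128M"}) {
          if (!seen.add(maxSize)) {
            throw new IllegalArgumentException("Duplicate maxSize " + maxSize);
          }
        }
      }
    }
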
  
   private CompactionJob createJob(CompactionKind kind, Set<CompactableFile> all,
        Set<CompactableFile> files) {
      return new CompactionPlanImpl.BuilderImpl(kind, all, all)
-         .addJob((short) all.size(), CompactionExecutorIdImpl.externalId("small"), files).build()
-         .getJobs().iterator().next();
+         .addJob((short) all.size(), CompactionExecutorIdImpl.internalId(csid, "small"), files)
+         .build().getJobs().iterator().next();
    }
  
 -  private static Set<CompactableFile> createCFs(String... namesSizePairs)
 -      throws URISyntaxException {
 +  private static CompactableFile createCF(String name, long size) {
 +    try {
 +      return CompactableFile
 +          .create(new URI("hdfs://fake/accumulo/tables/1/t-0000000z/" + name + ".rf"), size, 0);
 +    } catch (URISyntaxException e) {
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
 +  private static Set<CompactableFile> createCFs(String... namesSizePairs) {
      Set<CompactableFile> files = new HashSet<>();
  
      for (int i = 0; i < namesSizePairs.length; i += 2) {
@@@ -798,80 -537,47 +704,39 @@@
      };
    }
  
-   private static DefaultCompactionPlanner createPlanner(boolean withHugeExecutor) {
-     DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-     Configuration conf = EasyMock.createMock(Configuration.class);
-     EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
+   private static CompactionPlanner.InitParameters getInitParamQueues(Configuration conf,
+       String queues) {
+ 
 -    String maxOpen = conf.get(prefix + "cs1.planner.opts.maxOpen");
+     Map<String,String> options = new HashMap<>();
+     options.put("queues", queues.replaceAll("'", "\""));
 -
 -    if (maxOpen != null) {
 -      options.put("maxOpen", maxOpen);
 -    }
++    options.put("maxOpen", "15");
  
      ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
      EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
+     EasyMock.replay(senv);
  
-     EasyMock.replay(conf, senv);
- 
-     StringBuilder execBldr =
-         new StringBuilder("[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1},"
-             + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2},"
-             + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}");
- 
-     if (withHugeExecutor) {
-       execBldr.append(",{'name':'huge','type': 'internal','numThreads':4}]");
-     } else {
-       execBldr.append("]");
-     }
+     return new CompactionPlannerInitParams(csid, prefix, options, senv);
+   }
  
-     String executors = execBldr.toString().replaceAll("'", "\"");
+   private static CompactionPlanner.InitParameters getInitParams(Configuration conf,
+       String executors) {
  
-     planner.init(new CompactionPlanner.InitParameters() {
 -    String maxOpen = conf.get(prefix + "cs1.planner.opts.maxOpen");
+     Map<String,String> options = new HashMap<>();
+     options.put("executors", executors.replaceAll("'", "\""));
 -
 -    if (maxOpen != null) {
 -      options.put("maxOpen", maxOpen);
 -    }
++    options.put("maxOpen", "15");
  
-       @Override
-       public ServiceEnvironment getServiceEnvironment() {
-         return senv;
-       }
- 
-       @Override
-       public Map<String,String> getOptions() {
-         return Map.of("executors", executors, "maxOpen", "15");
-       }
+     ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
+     EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
+     EasyMock.replay(senv);
  
-       @Override
-       public String getFullyQualifiedOption(String key) {
-         assertEquals("maxOpen", key);
-         return Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts." + key;
-       }
+     return new CompactionPlannerInitParams(csid, prefix, options, senv);
+   }
  
-       @Override
-       public ExecutorManager getExecutorManager() {
-         return new ExecutorManager() {
-           @Override
-           public CompactionExecutorId createExecutor(String name, int threads) {
-             switch (name) {
-               case "small":
-                 assertEquals(1, threads);
-                 break;
-               case "medium":
-                 assertEquals(2, threads);
-                 break;
-               case "large":
-                 assertEquals(3, threads);
-                 break;
-               case "huge":
-                 assertEquals(4, threads);
-                 break;
-               default:
-                 fail("Unexpected name " + name);
-                 break;
-             }
-             return CompactionExecutorIdImpl.externalId(name);
-           }
- 
-           @Override
-           public CompactionExecutorId getExternalExecutor(String name) {
-             throw new UnsupportedOperationException();
-           }
-         };
-       }
-     });
+   private static DefaultCompactionPlanner createPlanner(Configuration conf, String executors) {
+     DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
+     var initParams = getInitParams(conf, executors);
  
+     planner.init(initParams);
      return planner;
    }
  }
