This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
     new 97b08d0b32 Makes DefaultCompactionPlanner emit multiple jobs (#3662)
97b08d0b32 is described below

commit 97b08d0b32f0d173e27dcb1dd46dc6080bd2cdef
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Jul 27 16:45:08 2023 -0400

    Makes DefaultCompactionPlanner emit multiple jobs (#3662)

    The DefaultCompactionPlanner would only emit one compaction job per
    tablet. This commit changes it to be able to emit multiple compactions
    for a single tablet as long as doing so is optimal w.r.t. the
    compaction ratio.
---
 .../spi/compaction/DefaultCompactionPlanner.java   | 188 ++++++++++++---------
 .../compaction/DefaultCompactionPlannerTest.java   | 167 +++++++++++++++++-
 .../accumulo/manager/TabletGroupWatcher.java       |   2 +-
 .../coordinator/CompactionCoordinator.java         |  18 --
 .../accumulo/test/functional/CompactionIT.java     |  13 +-
 5 files changed, 276 insertions(+), 112 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
index a3c76ed4ce..e3f5f6535a 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
@@ -180,6 +180,22 @@ public class DefaultCompactionPlanner implements CompactionPlanner {
     }
   }
 
+  private static class FakeFileGenerator {
+
+    private int count = 0;
+
+    public CompactableFile create(long size) {
+      try {
+        count++;
+        return CompactableFile.create(
+            new URI("hdfs://fake/accumulo/tables/adef/t-zzFAKEzz/FAKE-0000" + count + ".rf"), size,
+            0);
+      } catch (URISyntaxException e) {
+        throw new IllegalStateException(e);
+      }
+    }
+  }
+
   private List<Executor> executors;
   private int maxFilesToCompact;
 
@@ -258,76 +274,93 @@ public class DefaultCompactionPlanner implements CompactionPlanner {
 
     Set<CompactableFile> filesCopy = new HashSet<>(params.getCandidates());
 
+    FakeFileGenerator fakeFileGenerator = new FakeFileGenerator();
+
     long maxSizeToCompact = getMaxSizeToCompact(params.getKind());
 
-    Collection<CompactableFile> group;
-    if (params.getRunningCompactions().isEmpty()) {
-      group =
-          findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact);
+    // This set represents future files that will be produced by running compactions. If the
+    // optimal set of files to compact is computed and contains one of these files, then it is
+    // optimal to wait for that compaction to finish.
+    Set<CompactableFile> expectedFiles = new HashSet<>();
+    params.getRunningCompactions().stream().filter(job -> job.getKind() == params.getKind())
+        .map(job -> getExpected(job.getFiles(), fakeFileGenerator))
+        .forEach(compactableFile -> Preconditions.checkState(expectedFiles.add(compactableFile)));
+    Preconditions.checkState(Collections.disjoint(expectedFiles, filesCopy));
+    filesCopy.addAll(expectedFiles);
 
-      if (!group.isEmpty() && group.size() < params.getCandidates().size()
-          && params.getCandidates().size() <= maxFilesToCompact
-          && (params.getKind() == CompactionKind.USER
-              || params.getKind() == CompactionKind.SELECTOR)) {
-        // USER and SELECTOR compactions must eventually compact all files. When a subset of files
-        // that meets the compaction ratio is selected, look ahead and see if the next compaction
-        // would also meet the compaction ratio. If not then compact everything to avoid doing
-        // more than logarithmic work across multiple comapctions.
-
-        filesCopy.removeAll(group);
-        filesCopy.add(getExpected(group, 0));
-
-        if (findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact,
-            maxSizeToCompact).isEmpty()) {
-          // The next possible compaction does not meet the compaction ratio, so compact
-          // everything.
-          group = Set.copyOf(params.getCandidates());
-        }
+    List<Collection<CompactableFile>> compactionJobs = new ArrayList<>();
+
+    while (true) {
+      var filesToCompact =
+          findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact);
+      if (!Collections.disjoint(filesToCompact, expectedFiles)) {
+        // The optimal set of files to compact includes the output of a running compaction, so
+        // let's wait for that running compaction to finish.
+        break;
+      }
+      if (filesToCompact.isEmpty()) {
+        break;
       }
-    } else if (params.getKind() == CompactionKind.SYSTEM) {
-      // This code determines if once the files compacting finish would they be included in a
-      // compaction with the files smaller than them? If so, then wait for the running compaction
-      // to complete.
+      filesCopy.removeAll(filesToCompact);
+
+      // A compaction job will be created for these files, so let's add an expected file for that
+      // planned compaction job. Then, if a future iteration of this loop includes that file in
+      // its optimal set, it will not compact and will instead wait.
+      var expectedFile = getExpected(filesToCompact, fakeFileGenerator);
+      Preconditions.checkState(expectedFiles.add(expectedFile));
+      Preconditions.checkState(filesCopy.add(expectedFile));
 
-      // The set of files running compactions may produce
-      var expectedFiles = getExpected(params.getRunningCompactions());
+      compactionJobs.add(filesToCompact);
 
-      if (!Collections.disjoint(filesCopy, expectedFiles)) {
-        throw new AssertionError();
+      if (filesToCompact.size() < maxFilesToCompact) {
+        // Only continue looking for more compaction jobs when the set of files found equals
+        // maxFilesToCompact in size. When fewer files are found, it is an indication that the
+        // compaction ratio was no longer met, and therefore it would be suboptimal to look for
+        // more jobs because the smallest optimal set was found.
+        break;
       }
+    }
 
-      filesCopy.addAll(expectedFiles);
+    if (compactionJobs.size() == 1
+        && (params.getKind() == CompactionKind.USER || params.getKind() == CompactionKind.SELECTOR)
+        && compactionJobs.get(0).size() < params.getCandidates().size()
+        && compactionJobs.get(0).size() <= maxFilesToCompact) {
+      // USER and SELECTOR compactions must eventually compact all files. When a subset of files
+      // that meets the compaction ratio is selected, look ahead and see if the next compaction
+      // would also meet the compaction ratio. If not, then compact everything to avoid doing
+      // more than logarithmic work across multiple compactions.
 
-      group =
-          findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact);
+      var group = compactionJobs.get(0);
+      var candidatesCopy = new HashSet<>(params.getCandidates());
+
+      candidatesCopy.removeAll(group);
+      Preconditions.checkState(candidatesCopy.add(getExpected(group, fakeFileGenerator)));
 
-      if (!Collections.disjoint(group, expectedFiles)) {
-        // file produced by running compaction will eventually compact with existing files, so
-        // wait.
-        group = Set.of();
+      if (findDataFilesToCompact(candidatesCopy, params.getRatio(), maxFilesToCompact,
+          maxSizeToCompact).isEmpty()) {
+        // The next possible compaction does not meet the compaction ratio, so compact
+        // everything.
+        compactionJobs.set(0, Set.copyOf(params.getCandidates()));
       }
-    } else {
-      group = Set.of();
     }
 
-    if (group.isEmpty()
+    if (compactionJobs.isEmpty()
         && (params.getKind() == CompactionKind.USER || params.getKind() == CompactionKind.SELECTOR
             || params.getKind() == CompactionKind.CHOP)
         && params.getRunningCompactions().stream()
            .noneMatch(job -> job.getKind() == params.getKind())) {
-      group = findMaximalRequiredSetToCompact(params.getCandidates(), maxFilesToCompact);
+      // These kinds of compaction require files to compact even if none of the files meet the
+      // compaction ratio. No files were found using the compaction ratio and no compactions are
+      // running, so force a compaction.
+      compactionJobs = findMaximalRequiredSetToCompact(params.getCandidates(), maxFilesToCompact);
     }
 
-    if (group.isEmpty()) {
-      return params.createPlanBuilder().build();
-    } else {
-      // determine which executor to use based on the size of the files
-      var ceid = getExecutor(group);
-
-      return params.createPlanBuilder().addJob(createPriority(params, group), ceid, group).build();
-    }
+    var builder = params.createPlanBuilder();
+    compactionJobs.forEach(jobFiles -> builder.addJob(createPriority(params, jobFiles),
+        getExecutor(jobFiles), jobFiles));
+    return builder.build();
   }
 
   private static short createPriority(PlanningParameters params,
@@ -346,51 +379,42 @@ public class DefaultCompactionPlanner implements CompactionPlanner {
     return Long.MAX_VALUE;
   }
 
-  private CompactableFile getExpected(Collection<CompactableFile> files, int count) {
+  private CompactableFile getExpected(Collection<CompactableFile> files,
+      FakeFileGenerator fakeFileGenerator) {
     long size = files.stream().mapToLong(CompactableFile::getEstimatedSize).sum();
-    try {
-      return CompactableFile.create(
-          new URI("hdfs://fake/accumulo/tables/adef/t-zzFAKEzz/FAKE-0000" + count + ".rf"), size,
-          0);
-    } catch (URISyntaxException e) {
-      throw new IllegalStateException(e);
-    }
+    return fakeFileGenerator.create(size);
   }
 
-  /**
-   * @return the expected files sizes for sets of compacting files.
-   */
-  private Set<CompactableFile> getExpected(Collection<CompactionJob> compacting) {
-
-    Set<CompactableFile> expected = new HashSet<>();
-
-    int count = 0;
-
-    for (CompactionJob job : compacting) {
-      count++;
-      expected.add(getExpected(job.getFiles(), count));
-    }
-
-    return expected;
-  }
-
-  private static Collection<CompactableFile>
+  private static List<Collection<CompactableFile>>
      findMaximalRequiredSetToCompact(Collection<CompactableFile> files, int maxFilesToCompact) {
 
     if (files.size() <= maxFilesToCompact) {
-      return files;
+      return List.of(files);
     }
 
     List<CompactableFile> sortedFiles = sortByFileSize(files);
 
-    int numToCompact = maxFilesToCompact;
+    // Compute the number of compaction jobs that could run with a full set of files, then
+    // subtract 1. The 1 is subtracted because the last job is a special case.
+    int batches = sortedFiles.size() / maxFilesToCompact - 1;
 
-    if (sortedFiles.size() > maxFilesToCompact && sortedFiles.size() < 2 * maxFilesToCompact) {
-      // on the second to last compaction pass, compact the minimum amount of files possible
-      numToCompact = sortedFiles.size() - maxFilesToCompact + 1;
-    }
+    if (batches > 0) {
+      ArrayList<Collection<CompactableFile>> jobs = new ArrayList<>();
+      for (int i = 0; i < batches; i++) {
+        jobs.add(sortedFiles.subList(i * maxFilesToCompact, (i + 1) * maxFilesToCompact));
+      }
+      return jobs;
+    } else {
+      int numToCompact = maxFilesToCompact;
+
+      if (sortedFiles.size() > maxFilesToCompact && sortedFiles.size() < 2 * maxFilesToCompact) {
+        // On the second to last compaction pass, compact the minimum number of files possible.
+        // This is done to avoid unnecessarily compacting the largest files more than once.
+        numToCompact = sortedFiles.size() - maxFilesToCompact + 1;
+      }
 
-    return sortedFiles.subList(0, numToCompact);
+      return List.of(sortedFiles.subList(0, numToCompact));
+    }
   }
 
   static Collection<CompactableFile> findDataFilesToCompact(Set<CompactableFile> files,
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
index 4fcd6e5714..76563555f6 100644
--- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.core.spi.compaction;
 
 import static com.google.common.collect.MoreCollectors.onlyElement;
+import static java.util.stream.Collectors.toSet;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -27,10 +28,12 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
@@ -239,6 +242,149 @@ public class DefaultCompactionPlannerTest {
     assertEquals(CompactionExecutorIdImpl.externalId("large"), job.getExecutor());
   }
 
+  @Test
+  public void testMultipleCompactions() {
+    // This test validates that when a tablet has many files, multiple compaction jobs can be
+    // issued at the same time.
+    for (var kind : List.of(CompactionKind.USER, CompactionKind.SYSTEM)) {
+      var planner = createPlanner(false);
+      var all = IntStream.range(0, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet());
+      // Simulate 10 larger files; these should not compact at the same time as the smaller files.
+      // It is more optimal to wait for all of the smaller files to compact and then compact the
+      // output of compacting the smaller files with the larger files.
+      IntStream.range(990, 1000).mapToObj(i -> createCF("C" + i, 20000)).forEach(all::add);
+      var params = createPlanningParams(all, all, Set.of(), 2, kind);
+      var plan = planner.makePlan(params);
+
+      // There are 990 smaller files to compact. Should produce 66 jobs of 15 smaller files each.
+      assertEquals(66, plan.getJobs().size());
+      Set<CompactableFile> filesSeen = new HashSet<>();
+      plan.getJobs().forEach(job -> {
+        assertEquals(15, job.getFiles().size());
+        assertEquals(kind, job.getKind());
+        assertEquals(CompactionExecutorIdImpl.externalId("small"), job.getExecutor());
+        // ensure the files across all of the jobs are disjoint
+        job.getFiles().forEach(cf -> assertTrue(filesSeen.add(cf)));
+      });
+
+      // Ensure all of the smaller files are scheduled for compaction. Should not see any of the
+      // larger files.
+      assertEquals(IntStream.range(0, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()),
+          filesSeen);
+    }
+  }
+
+  @Test
+  public void testMultipleCompactionsAndLargeCompactionRatio() {
+    var planner = createPlanner(false);
+    var all = IntStream.range(0, 65).mapToObj(i -> createCF("F" + i, i + 1)).collect(toSet());
+    // This compaction ratio would not cause a system compaction, however a user compaction must
+    // compact all of the files, so it should generate some compactions.
+    var params = createPlanningParams(all, all, Set.of(), 100, CompactionKind.USER);
+    var plan = planner.makePlan(params);
+
+    assertEquals(3, plan.getJobs().size());
+
+    var iterator = plan.getJobs().iterator();
+    var job1 = iterator.next();
+    var job2 = iterator.next();
+    var job3 = iterator.next();
+    assertTrue(Collections.disjoint(job1.getFiles(), job2.getFiles()));
+    assertTrue(Collections.disjoint(job1.getFiles(), job3.getFiles()));
+    assertTrue(Collections.disjoint(job2.getFiles(), job3.getFiles()));
+
+    for (var job : plan.getJobs()) {
+      assertEquals(15, job.getFiles().size());
+      assertEquals(CompactionKind.USER, job.getKind());
+      assertTrue(all.containsAll(job.getFiles()));
+      // Should select three sets of files that are from the smallest 45 files.
+      assertTrue(job.getFiles().stream().mapToLong(CompactableFile::getEstimatedSize).sum()
+          <= IntStream.range(1, 46).sum());
+    }
+  }
+
+  @Test
+  public void testMultipleCompactionsAndRunningCompactions() {
+    // This test validates that when a tablet has many files, multiple compaction jobs can be
+    // issued at the same time even if there are running compactions, as long as everything meets
+    // the compaction ratio.
+    for (var kind : List.of(CompactionKind.USER, CompactionKind.SYSTEM)) {
+      var planner = createPlanner(false);
+      var all = IntStream.range(0, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet());
+      // Simulate 10 larger files; these should not compact at the same time as the smaller files.
+      // It is more optimal to wait for all of the smaller files to compact and then compact the
+      // output of compacting the smaller files with the larger files.
+      IntStream.range(990, 1000).mapToObj(i -> createCF("C" + i, 20000)).forEach(all::add);
+      // 30 files are compacting, so they will not be in the candidate set.
+      var candidates =
+          IntStream.range(30, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet());
+      // create two jobs covering the first 30 files
+      var job1 = createJob(kind, all,
+          IntStream.range(0, 15).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()));
+      var job2 = createJob(kind, all,
+          IntStream.range(15, 30).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()));
+      var params = createPlanningParams(all, candidates, Set.of(job1, job2), 2, kind);
+      var plan = planner.makePlan(params);
+
+      // There are 960 smaller files left to compact. Should produce 64 jobs of 15 smaller files
+      // each.
+      assertEquals(64, plan.getJobs().size());
+      Set<CompactableFile> filesSeen = new HashSet<>();
+      plan.getJobs().forEach(job -> {
+        assertEquals(15, job.getFiles().size());
+        assertEquals(kind, job.getKind());
+        assertEquals(CompactionExecutorIdImpl.externalId("small"), job.getExecutor());
+        // ensure the files across all of the jobs are disjoint
+        job.getFiles().forEach(cf -> assertTrue(filesSeen.add(cf)));
+      });
+
+      // Ensure all of the smaller files are scheduled for compaction. Should not see any of the
+      // larger files.
+      assertEquals(IntStream.range(30, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()),
+          filesSeen);
+    }
+  }
+
+  @Test
+  public void testUserCompactionDoesNotWaitOnSystemCompaction() {
+    // This test ensures user compactions do not wait on system compactions to complete.
+    var planner = createPlanner(true);
+    var all = createCFs("F1", "1M", "F2", "1M", "F3", "1M", "F4", "3M", "F5", "3M", "F6", "3M",
+        "F7", "20M");
+    var candidates = createCFs("F4", "3M", "F5", "3M", "F6", "3M", "F7", "20M");
+    var compacting = Set
+        .of(createJob(CompactionKind.SYSTEM, all, createCFs("F1", "1M", "F2", "1M", "F3", "1M")));
+    var params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.SYSTEM);
+    var plan = planner.makePlan(params);
+    // The planning of the system compaction should find it most optimal to wait on the running
+    // system compaction and emit zero jobs.
+    assertEquals(0, plan.getJobs().size());
+
+    params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.USER);
+    plan = planner.makePlan(params);
+    // The planning of the user compaction should not take the running system compaction into
+    // consideration and should create a compaction job.
+    assertEquals(1, plan.getJobs().size());
+    assertEquals(createCFs("F4", "3M", "F5", "3M", "F6", "3M", "F7", "20M"),
+        getOnlyElement(plan.getJobs()).getFiles());
+
+    // Reverse the situation and turn the running compaction into a user compaction.
+    compacting =
+        Set.of(createJob(CompactionKind.USER, all, createCFs("F1", "1M", "F2", "1M", "F3", "1M")));
+    params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.SYSTEM);
+    plan = planner.makePlan(params);
+    // The planning of a system compaction should not take the running user compaction into
+    // account and should emit a job.
+    assertEquals(1, plan.getJobs().size());
+    assertEquals(createCFs("F4", "3M", "F5", "3M", "F6", "3M"),
+        getOnlyElement(plan.getJobs()).getFiles());
+
+    params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.USER);
+    plan = planner.makePlan(params);
+    // The planning of the user compaction should decide that the most optimal thing to do is to
+    // wait on the running user compaction and should not emit any jobs.
+    assertEquals(0, plan.getJobs().size());
+  }
+
   /**
    * Tests internal type executor with no numThreads set throws error
    */
@@ -393,15 +539,22 @@ public class DefaultCompactionPlannerTest {
         .getJobs().iterator().next();
   }
 
-  private static Set<CompactableFile> createCFs(String... namesSizePairs)
-      throws URISyntaxException {
+  private static CompactableFile createCF(String name, long size) {
+    try {
+      return CompactableFile
+          .create(new URI("hdfs://fake/accumulo/tables/1/t-0000000z/" + name + ".rf"), size, 0);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static Set<CompactableFile> createCFs(String... namesSizePairs) {
     Set<CompactableFile> files = new HashSet<>();
 
     for (int i = 0; i < namesSizePairs.length; i += 2) {
       String name = namesSizePairs[i];
       long size = ConfigurationTypeHelper.getFixedMemoryAsBytes(namesSizePairs[i + 1]);
-      files.add(CompactableFile
-          .create(new URI("hdfs://fake/accumulo/tables/1/t-0000000z/" + name + ".rf"), size, 0));
+      files.add(createCF(name, size));
     }
 
     return files;
@@ -425,9 +578,9 @@ public class DefaultCompactionPlannerTest {
       double ratio, int maxFiles, long maxSize) {
     var result = DefaultCompactionPlanner.findDataFilesToCompact(files, ratio, maxFiles, maxSize);
     var expectedNames = expected.stream().map(CompactableFile::getUri).map(URI::getPath)
-        .map(path -> path.split("/")).map(t -> t[t.length - 1]).collect(Collectors.toSet());
+        .map(path -> path.split("/")).map(t -> t[t.length - 1]).collect(toSet());
     var resultNames = result.stream().map(CompactableFile::getUri).map(URI::getPath)
-        .map(path -> path.split("/")).map(t -> t[t.length - 1]).collect(Collectors.toSet());
+        .map(path -> path.split("/")).map(t -> t[t.length - 1]).collect(toSet());
     assertEquals(expectedNames, resultNames);
   }
 
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 025d3ccf34..ffc88ccf6c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -391,7 +391,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread {
       if (actions.contains(ManagementAction.NEEDS_COMPACTING)) {
         var jobs = compactionGenerator.generateJobs(tm,
             TabletManagementIterator.determineCompactionKinds(actions));
-        LOG.debug("{} may need compacting.", tm.getExtent());
+        LOG.debug("{} may need compacting, adding {} jobs", tm.getExtent(), jobs.size());
         manager.getCompactionQueues().add(tm, jobs);
       }
 
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 49a27ef7db..636156a624 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -90,7 +90,6 @@ import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata;
 import org.apache.accumulo.core.metadata.schema.SelectedFiles;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.rpc.ThriftUtil;
-import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
@@ -119,8 +118,6 @@ import org.apache.accumulo.server.tablets.TabletNameGenerator;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -434,21 +431,6 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface
 
   }
 
-  /**
-   * Return the Thrift client for the TServer
-   *
-   * @param tserver tserver instance
-   * @return thrift client
-   * @throws TTransportException thrift error
-   */
-  protected TabletServerClientService.Client getTabletServerConnection(TServerInstance tserver)
-      throws TTransportException {
-    LiveTServerSet.TServerConnection connection = tserverSet.getConnection(tserver);
-    TTransport transport =
-        this.ctx.getTransportPool().getTransport(connection.getAddress(), 0, this.ctx);
-    return ThriftUtil.createClient(ThriftClientTypes.TABLET_SERVER, transport);
-  }
-
   // ELASTICITY_TODO unit test this code
   private boolean canReserveCompaction(TabletMetadata tablet, CompactionJob job,
       Set<StoredTabletFile> jobFiles) {
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 53ded2699c..037488c46f 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -182,8 +182,9 @@ public class CompactionIT extends AccumuloClusterHarness {
   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
     cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
-    cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "5ms");
-    cfg.setProperty(Property.COMPACTOR_MIN_JOB_WAIT_TIME, "10ms");
+    cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "1s");
+    cfg.setProperty(Property.COMPACTOR_MIN_JOB_WAIT_TIME, "100ms");
+    cfg.setProperty(Property.COMPACTOR_MAX_JOB_WAIT_TIME, "1s");
     // use raw local file system so walogs sync and flush will work
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
@@ -313,6 +314,9 @@ public class CompactionIT extends AccumuloClusterHarness {
       client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.getKey(), "1001");
       TableId tid = TableId.of(client.tableOperations().tableIdMap().get(table1));
 
+      // In addition to testing errors in compactions, this test also exercises creating lots of
+      // files to compact. The following will create 1000 files to compact. When changing this
+      // test, try to keep both or create a new test for lots of files to compact.
       ReadWriteIT.ingest(client, MAX_DATA, 1, 1, 0, "colf", table1, 1);
 
       Ample ample = ((ClientContext) client).getAmple();
@@ -325,12 +329,13 @@
       client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorScope.majc));
       client.tableOperations().compact(table1, new CompactionConfig().setWait(true));
 
-      tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build();
+      tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES, ColumnType.ECOMP).build();
       tm = tms.iterator().next();
       assertEquals(1, tm.getFiles().size());
+      // ensure the failed compactions did not leave anything in the metadata table
+      assertEquals(0, tm.getExternalCompactions().size());
 
       ReadWriteIT.verify(client, MAX_DATA, 1, 1, 0, table1);
-
   }
 }
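
A note for readers following the planner changes above: the commit message and the new comments rely on a set of files "meeting the compaction ratio", but the criterion itself is not shown in these hunks. Below is a minimal, hypothetical sketch of that check, assuming the usual convention that a candidate set qualifies when its combined size is at least the ratio times the size of its largest file. The class, record, and method names are invented for illustration and are not part of this patch or of Accumulo's API.

    import java.util.Collection;
    import java.util.List;

    public class RatioCheckSketch {

      // Hypothetical stand-in for a file with an estimated size; not a type from the patch.
      record SizedFile(String name, long size) {}

      // Assumed criterion: compacting a set is worthwhile when its combined size is at least
      // ratio times the size of its largest member, so the rewrite of the largest file pays off.
      static boolean meetsCompactionRatio(Collection<SizedFile> candidateSet, double ratio) {
        long totalSize = candidateSet.stream().mapToLong(SizedFile::size).sum();
        long largest = candidateSet.stream().mapToLong(SizedFile::size).max().orElse(0L);
        return totalSize >= ratio * largest;
      }

      public static void main(String[] args) {
        var smallFiles = List.of(new SizedFile("F1", 1000), new SizedFile("F2", 1000),
            new SizedFile("F3", 1000));
        // 3000 >= 2.0 * 1000, so a set like this would compact under a ratio of 2.
        System.out.println(meetsCompactionRatio(smallFiles, 2.0));
      }
    }

Read with that criterion in mind, the new while loop in makePlan can be summarized as: repeatedly take the best set of files that meets the ratio, stand in a synthetic "expected" output file for it, and stop once the ratio can no longer be met or the optimal set would require the output of a running or already planned compaction.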