This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new d02340a7ee Added Indication of Intermediate Compactions to CompactionConfigurer (#4118)

d02340a7ee is described below

commit d02340a7eece992b85937876674d8aa2f51baaf6
Author: Kevin Rathbun <43969518+kevinrr...@users.noreply.github.com>
AuthorDate: Thu Jan 4 18:45:11 2024 -0500

Added Indication of Intermediate Compactions to CompactionConfigurer (#4118)

Changes
- Added getSelectedFiles() method to CompactionConfigurer.InputParameters
- Implemented the method in the two implementations of this interface: ShellCompactCommandConfigurerTest and CompactableUtils
- Tested functionality in new IT: testGetSelectedFilesForCompaction() in CompactionIT
- Added new method getCompressionType() to PrintBCInfo (used in the test)
- Changed several method signatures to pass around necessary info for getSelectedFiles() implementations
- Renamed 'files' variable to 'inputFiles' in CompactableUtils and CompactableImpl for a clearer distinction between inputFiles and the (new) selectedFiles.

Potentially still left TODO:
- Add this functionality for external compactions?
- Move CompressionTypeConfigurer to its own class? 
* Moved 'fsin' to try-with-resources * Added @since to getSelectedFiles() --- .../admin/compaction/CompactionConfigurer.java | 23 +++ .../core/file/rfile/bcfile/PrintBCInfo.java | 14 +- .../ShellCompactCommandConfigurerTest.java | 5 + .../accumulo/tserver/tablet/CompactableImpl.java | 7 +- .../accumulo/tserver/tablet/CompactableUtils.java | 44 +++-- .../accumulo/test/functional/CompactionIT.java | 189 +++++++++++++++++++++ 6 files changed, 264 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionConfigurer.java b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionConfigurer.java index 6a7e48edd5..1e0e47c0f8 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionConfigurer.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionConfigurer.java @@ -20,6 +20,7 @@ package org.apache.accumulo.core.client.admin.compaction; import java.util.Collection; import java.util.Map; +import java.util.Set; import org.apache.accumulo.core.client.PluginEnvironment; import org.apache.accumulo.core.data.TableId; @@ -59,6 +60,28 @@ public interface CompactionConfigurer { public Collection<CompactableFile> getInputFiles(); + /** + * For user and selector compactions: + * <ul> + * <li>Returns the selected set of files to be compacted.</li> + * <li>When getInputFiles() (inputFiles) and getSelectedFiles() (selectedFiles) are equal, then + * this is the final compaction.</li> + * <li>When they are not equal, this is an intermediate compaction.</li> + * <li>Intermediate compactions are compactions whose resultant RFile will be consumed by + * another compaction.</li> + * <li>inputFiles and selectedFiles can be compared using: <code> + * selectedFiles.equals(inputFiles instanceof Set ? 
inputFiles : Set.copyOf(inputFiles)) + * </code></li> + * </ul> + * For system compactions: + * <ul> + * <li>There is no selected set of files so the empty set is returned.</li> + * </ul> + * + * @since 3.1 + */ + public Set<CompactableFile> getSelectedFiles(); + PluginEnvironment getEnvironment(); } diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintBCInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintBCInfo.java index fd8356a026..55d4338cb3 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintBCInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintBCInfo.java @@ -47,8 +47,7 @@ public class PrintBCInfo { CryptoService cryptoService = NoCryptoServiceFactory.NONE; public void printMetaBlockInfo() throws IOException { - FSDataInputStream fsin = fs.open(path); - try (BCFile.Reader bcfr = + try (FSDataInputStream fsin = fs.open(path); BCFile.Reader bcfr = new BCFile.Reader(fsin, fs.getFileStatus(path).getLen(), conf, cryptoService)) { Set<Entry<String,MetaIndexEntry>> es = bcfr.metaIndex.index.entrySet(); @@ -67,6 +66,17 @@ public class PrintBCInfo { } } + public String getCompressionType() throws IOException { + try (FSDataInputStream fsin = fs.open(path); BCFile.Reader bcfr = + new BCFile.Reader(fsin, fs.getFileStatus(path).getLen(), conf, cryptoService)) { + + Set<Entry<String,MetaIndexEntry>> es = bcfr.metaIndex.index.entrySet(); + + return es.stream().filter(entry -> entry.getKey().equals("RFile.index")).findFirst() + .map(entry -> entry.getValue().getCompressionAlgorithm().getName()).orElse(null); + } + } + static class Opts extends ConfigOpts { @Parameter(description = " <file>") String file; diff --git a/core/src/test/java/org/apache/accumulo/core/compaction/ShellCompactCommandConfigurerTest.java b/core/src/test/java/org/apache/accumulo/core/compaction/ShellCompactCommandConfigurerTest.java index 0d423d0924..ef68da90bb 100644 --- 
a/core/src/test/java/org/apache/accumulo/core/compaction/ShellCompactCommandConfigurerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/compaction/ShellCompactCommandConfigurerTest.java @@ -88,6 +88,11 @@ public class ShellCompactCommandConfigurerTest { return files; } + @Override + public Set<CompactableFile> getSelectedFiles() { + throw new UnsupportedOperationException(); + } + @Override public PluginEnvironment getEnvironment() { return null; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java index ce91f5d3ac..ab969fbdda 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java @@ -134,7 +134,8 @@ public class CompactableImpl implements Compactable { public interface CompactionHelper { Set<StoredTabletFile> selectFiles(SortedMap<StoredTabletFile,DataFileValue> allFiles); - Map<String,String> getConfigOverrides(Set<CompactableFile> files); + Map<String,String> getConfigOverrides(Set<CompactableFile> inputFiles, + Set<StoredTabletFile> selectedFiles, CompactionKind kind); } @@ -1128,8 +1129,8 @@ public class CompactableImpl implements Compactable { var cInfo = ocInfo.orElseThrow(); try { - Map<String,String> overrides = - CompactableUtils.getOverrides(job.getKind(), tablet, cInfo.localHelper, job.getFiles()); + Map<String,String> overrides = CompactableUtils.getOverrides(job.getKind(), tablet, + cInfo.localHelper, job.getFiles(), cInfo.selectedFiles); ReferencedTabletFile compactTmpName = tablet.getNextDataFilenameForMajc(cInfo.propagateDeletes); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java index 321c07a714..c1d2a2c8a0 100644 --- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java @@ -81,7 +81,8 @@ import com.google.common.collect.Collections2; public class CompactableUtils { - static Map<String,String> computeOverrides(Tablet tablet, Set<CompactableFile> files) { + static Map<String,String> computeOverrides(Tablet tablet, Set<CompactableFile> inputFiles, + Set<StoredTabletFile> selectedFiles, CompactionKind kind) { var tconf = tablet.getTableConfiguration(); var configurorClass = tconf.get(Property.TABLE_COMPACTION_CONFIGURER); @@ -91,11 +92,12 @@ public class CompactableUtils { var opts = tconf.getAllPropertiesWithPrefixStripped(Property.TABLE_COMPACTION_CONFIGURER_OPTS); - return computeOverrides(tablet, files, new PluginConfig(configurorClass, opts)); + return computeOverrides(tablet, inputFiles, selectedFiles, + new PluginConfig(configurorClass, opts), kind); } - static Map<String,String> computeOverrides(Tablet tablet, Set<CompactableFile> files, - PluginConfig cfg) { + static Map<String,String> computeOverrides(Tablet tablet, Set<CompactableFile> inputFiles, + Set<StoredTabletFile> selectedFiles, PluginConfig cfg, CompactionKind kind) { CompactionConfigurer configurer = CompactableUtils.newInstance(tablet.getTableConfiguration(), cfg.getClassName(), CompactionConfigurer.class); @@ -121,7 +123,18 @@ public class CompactableUtils { var overrides = configurer.override(new CompactionConfigurer.InputParameters() { @Override public Collection<CompactableFile> getInputFiles() { - return files; + return inputFiles; + } + + @Override + public Set<CompactableFile> getSelectedFiles() { + if (kind == CompactionKind.USER || kind == CompactionKind.SELECTOR) { + var dataFileSizes = tablet.getDatafileManager().getDatafileSizes(); + return selectedFiles.stream().map(f -> new CompactableFileImpl(f, dataFileSizes.get(f))) + .collect(Collectors.toSet()); + } else { // kind == 
CompactionKind.SYSTEM + return Collections.emptySet(); + } } @Override @@ -266,7 +279,8 @@ public class CompactableUtils { } @Override - public Map<String,String> getConfigOverrides(Set<CompactableFile> files) { + public Map<String,String> getConfigOverrides(Set<CompactableFile> inputFiles, + Set<StoredTabletFile> selectedFiles, CompactionKind kind) { return null; } } @@ -306,9 +320,11 @@ public class CompactableUtils { } @Override - public Map<String,String> getConfigOverrides(Set<CompactableFile> files) { + public Map<String,String> getConfigOverrides(Set<CompactableFile> inputFiles, + Set<StoredTabletFile> selectedFiles, CompactionKind kind) { if (!UserCompactionUtils.isDefault(compactionConfig.getConfigurer())) { - return computeOverrides(tablet, files, compactionConfig.getConfigurer()); + return computeOverrides(tablet, inputFiles, selectedFiles, compactionConfig.getConfigurer(), + kind); } return null; @@ -340,16 +356,17 @@ public class CompactableUtils { } public static Map<String,String> getOverrides(CompactionKind kind, Tablet tablet, - CompactionHelper driver, Set<CompactableFile> files) { + CompactionHelper driver, Set<CompactableFile> inputFiles, + Set<StoredTabletFile> selectedFiles) { Map<String,String> overrides = null; if (kind == CompactionKind.USER || kind == CompactionKind.SELECTOR) { - overrides = driver.getConfigOverrides(files); + overrides = driver.getConfigOverrides(inputFiles, selectedFiles, kind); } if (overrides == null) { - overrides = computeOverrides(tablet, files); + overrides = computeOverrides(tablet, inputFiles, selectedFiles, kind); } if (overrides == null) { @@ -380,8 +397,9 @@ public class CompactableUtils { throws IOException, CompactionCanceledException { TableConfiguration tableConf = tablet.getTableConfiguration(); - AccumuloConfiguration compactionConfig = getCompactionConfig(tableConf, - getOverrides(job.getKind(), tablet, cInfo.localHelper, job.getFiles())); + AccumuloConfiguration compactionConfig = + 
getCompactionConfig(tableConf, getOverrides(job.getKind(), tablet, cInfo.localHelper, + job.getFiles(), cInfo.selectedFiles)); FileCompactor compactor = new FileCompactor(tablet.getContext(), tablet.getExtent(), compactFiles, tmpFileName, cInfo.propagateDeletes, cenv, cInfo.iters, compactionConfig, diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index 6e48bf1cf0..c6b194e8c7 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@ -28,10 +28,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -56,6 +63,7 @@ import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.PluginConfig; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; +import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer; import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer; import org.apache.accumulo.core.clientImpl.ClientContext; @@ -65,6 +73,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; import 
org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.rfile.bcfile.PrintBCInfo; import org.apache.accumulo.core.iterators.DevNull; import org.apache.accumulo.core.iterators.Filter; import org.apache.accumulo.core.iterators.IteratorEnvironment; @@ -73,6 +82,7 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.user.AgeOffFilter; import org.apache.accumulo.core.iterators.user.GrepIterator; import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; @@ -81,6 +91,7 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.VerifyIngest; import org.apache.accumulo.test.VerifyIngest.VerifyParams; @@ -179,6 +190,51 @@ public class CompactionIT extends AccumuloClusterHarness { } } + /** + * A CompactionConfigurer that can be used to configure the compression type used for intermediate + * and final compactions. An intermediate compaction is a compaction whose result is short-lived. + * For instance, if 10 files are selected for compaction, 5 of these files are compacted to a file + * 'f0', then f0 is compacted with the 5 remaining files creating file 'f1', then f0 would be an + * intermediate file and f1 would be the final file. 
+ */ + public static class CompressionTypeConfigurer implements CompactionConfigurer { + public static final String COMPRESS_TYPE_KEY = Property.TABLE_FILE_COMPRESSION_TYPE.getKey(); + public static final String FINAL_COMPRESS_TYPE_KEY = "final.compress.type"; + public static final String INTERMEDIATE_COMPRESS_TYPE_KEY = "intermediate.compress.type"; + private String finalCompressTypeVal; + private String interCompressTypeVal; + + @Override + public void init(InitParameters iparams) { + var options = iparams.getOptions(); + String finalCompressTypeVal = options.get(FINAL_COMPRESS_TYPE_KEY); + String interCompressTypeVal = options.get(INTERMEDIATE_COMPRESS_TYPE_KEY); + if (finalCompressTypeVal != null && interCompressTypeVal != null) { + this.finalCompressTypeVal = finalCompressTypeVal; + this.interCompressTypeVal = interCompressTypeVal; + } else { + throw new IllegalArgumentException( + "Must set " + FINAL_COMPRESS_TYPE_KEY + " and " + INTERMEDIATE_COMPRESS_TYPE_KEY); + } + } + + @Override + public Overrides override(InputParameters params) { + var inputFiles = params.getInputFiles(); + var selectedFiles = params.getSelectedFiles(); + // If this is the final compaction, set the compression type to the value set for + // finalCompressTypeVal + // If this is an intermediate compaction, set the compression type to the value set for + // interCompressTypeVal + if (selectedFiles.equals(inputFiles instanceof Set ? 
inputFiles : Set.copyOf(inputFiles))) { + return new Overrides(Map.of(COMPRESS_TYPE_KEY, finalCompressTypeVal)); + } else { + return new Overrides(Map.of(COMPRESS_TYPE_KEY, interCompressTypeVal)); + } + } + + } + private static final Logger log = LoggerFactory.getLogger(CompactionIT.class); private static final int MAX_DATA = 1000; @@ -699,6 +755,139 @@ public class CompactionIT extends AccumuloClusterHarness { } } + @Test + public void testGetSelectedFilesForCompaction() throws Exception { + + // Tests CompactionConfigurer.InputParameters.getSelectedFiles() + + String tableName = this.getUniqueNames(1)[0]; + // Disable GC so intermediate compaction files are not deleted + getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR); + + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + Map<String,String> props = new HashMap<>(); + props.put(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none"); + // This is done to avoid system compactions - we want to do all the compactions ourselves + props.put("table.compaction.dispatcher.opts.service.system", "nonexitant"); + NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props); + client.tableOperations().create(tableName, ntc); + + // The following will create 4 small and 4 large RFiles + // The 4 small files will be compacted into one file (an "intermediate compaction" file) + // Then, this file will be compacted with the 4 large files, creating the final compaction + // file + byte[] largeData = new byte[1_000_000]; + byte[] smallData = new byte[100_000]; + final int numFiles = 8; + Arrays.fill(largeData, (byte) 65); + Arrays.fill(smallData, (byte) 65); + try (var writer = client.createBatchWriter(tableName)) { + for (int i = 0; i < numFiles; i++) { + Mutation mut = new Mutation("r" + i); + if (i < numFiles / 2) { + mut.at().family("f").qualifier("q").put(largeData); + } else { + mut.at().family("f").qualifier("q").put(smallData); + } + 
writer.addMutation(mut); + writer.flush(); + client.tableOperations().flush(tableName, null, null, true); + } + } + + client.tableOperations().compact(tableName, + new CompactionConfig().setWait(true) + .setConfigurer(new PluginConfig(CompressionTypeConfigurer.class.getName(), + Map.of(CompressionTypeConfigurer.FINAL_COMPRESS_TYPE_KEY, "snappy", + CompressionTypeConfigurer.INTERMEDIATE_COMPRESS_TYPE_KEY, "gz")))); + + var tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + // The directory of the RFiles + java.nio.file.Path rootPath = null; + // The path to the final compaction RFile (located within rootPath) + java.nio.file.Path finalCompactionFilePath = null; + int count = 0; + try (var tabletsMeta = + TabletsMetadata.builder(client).forTable(tableId).fetch(ColumnType.FILES).build()) { + for (TabletMetadata tm : tabletsMeta) { + for (StoredTabletFile stf : tm.getFiles()) { + // Since the 8 files should be compacted down to 1 file, these should only be set once + finalCompactionFilePath = Paths.get(stf.getPath().toUri().getRawPath()); + rootPath = Paths.get(stf.getPath().getParent().toUri().getRawPath()); + count++; + } + } + } + assertEquals(1, count); + assertNotNull(finalCompactionFilePath); + assertNotNull(rootPath); + String finalCompactionFile = finalCompactionFilePath.toString(); + // The following will find the intermediate compaction file in the root path. + // Intermediate compaction files begin with 'C' and end with '.rf' + final String[] interCompactionFile = {null}; + Files.walkFileTree(rootPath, new SimpleFileVisitor<java.nio.file.Path>() { + @Override + public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) + throws IOException { + String regex = "^C.*\\.rf$"; + java.nio.file.Path fileName = (file != null) ? 
file.getFileName() : null; + if (fileName != null && fileName.toString().matches(regex)) { + interCompactionFile[0] = file.toString(); + return FileVisitResult.TERMINATE; + } + return FileVisitResult.CONTINUE; + } + }); + assertNotNull(interCompactionFile[0]); + String[] args = new String[3]; + args[0] = "--props"; + args[1] = getCluster().getAccumuloPropertiesPath(); + args[2] = finalCompactionFile; + PrintBCInfo bcInfo = new PrintBCInfo(args); + String finalCompressionType = bcInfo.getCompressionType(); + // The compression type used on the final compaction file should be 'snappy' + assertEquals("snappy", finalCompressionType); + args[2] = interCompactionFile[0]; + bcInfo = new PrintBCInfo(args); + String interCompressionType = bcInfo.getCompressionType(); + // The compression type used on the intermediate compaction file should be 'gz' + assertEquals("gz", interCompressionType); + } finally { + // Re-enable GC + getCluster().getClusterControl().startAllServers(ServerType.GARBAGE_COLLECTOR); + } + } + + /** + * Was used in debugging {@link #testGetSelectedFilesForCompaction}. May be useful later. 
+ * + * @param client An accumulo client + * @param tableName The name of the table + * @return a map of the RFiles to their size in bytes + */ + private Map<String,Long> getFileSizeMap(AccumuloClient client, String tableName) { + var tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + Map<String,Long> map = new HashMap<>(); + + try (var tabletsMeta = + TabletsMetadata.builder(client).forTable(tableId).fetch(ColumnType.FILES).build()) { + for (TabletMetadata tm : tabletsMeta) { + for (StoredTabletFile stf : tm.getFiles()) { + try { + String filePath = stf.getPath().toString(); + Long fileSize = + FileSystem.getLocal(new Configuration()).getFileStatus(stf.getPath()).getLen(); + map.put(filePath, fileSize); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + return map; + } + } + private int countFiles(AccumuloClient c) throws Exception { try (Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { s.fetchColumnFamily(new Text(TabletColumnFamily.NAME));