This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 3cc53f50cd66ee92a869b55fad36b7f7d82712d3 Merge: f0bd0069ea 5cd2b91f37 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Apr 14 20:55:52 2025 +0000 Merge branch '2.1' .../org/apache/accumulo/core/conf/Property.java | 8 ++ .../accumulo/server/compaction/FileCompactor.java | 53 ++++++++-- .../apache/accumulo/server/fs/FileTypePrefix.java | 114 +++++++++++++++++++++ .../accumulo/server/fs/FileTypePrefixTest.java | 95 +++++++++++++++++ .../tableOps/tableImport/MapImportFileNames.java | 4 +- .../org/apache/accumulo/tserver/tablet/Tablet.java | 1 - 6 files changed, 265 insertions(+), 10 deletions(-) diff --cc server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java index c273391e08,b09050c1fb..54960779de --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java @@@ -70,12 -69,15 +72,13 @@@ import org.apache.accumulo.core.tablets import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; -import org.apache.accumulo.core.util.ratelimit.RateLimiter; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.server.ServerContext; + import org.apache.accumulo.server.fs.FileTypePrefix; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; -import org.apache.accumulo.server.problems.ProblemReport; +import org.apache.accumulo.server.mem.LowMemoryDetector.DetectionScope; import org.apache.accumulo.server.problems.ProblemReportingIterator; -import org.apache.accumulo.server.problems.ProblemReports; -import org.apache.accumulo.server.problems.ProblemType; import org.apache.hadoop.fs.FileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -334,17 -336,31 +337,32 @@@ public class FileCompactor implements C FileOperations fileFactory = FileOperations.getInstance(); FileSystem ns = this.fs.getFileSystemByPath(outputFile.getPath()); + // Normally you would not want the DataNode to continue to + // cache blocks in the page cache for compaction input files + // as these files are normally marked for deletion after a + // compaction occurs. However there can be cases where the + // compaction input files will continue to be used, like in + // the case of bulk import files which may be assigned to many + // tablets and will still be needed until all of the tablets + // have compacted, or in the case of cloned tables where one + // of the tables has compacted the input file but the other + // has not. + String dropCachePrefixProperty = + acuTableConf.get(Property.TABLE_COMPACTION_INPUT_DROP_CACHE_BEHIND); + final EnumSet<FileTypePrefix> dropCacheFilePrefixes = + FileTypePrefix.typesFromList(dropCachePrefixProperty); + final boolean isMinC = env.getIteratorScope() == IteratorUtil.IteratorScope.minc; - final boolean dropCacheBehindOutput = !RootTable.ID.equals(this.extent.tableId()) - && !MetadataTable.ID.equals(this.extent.tableId()) - && ((isMinC && acuTableConf.getBoolean(Property.TABLE_MINC_OUTPUT_DROP_CACHE)) - || (!isMinC && acuTableConf.getBoolean(Property.TABLE_MAJC_OUTPUT_DROP_CACHE))); + final boolean dropCacheBehindOutput = + !AccumuloTable.ROOT.tableId().equals(this.extent.tableId()) + && !AccumuloTable.METADATA.tableId().equals(this.extent.tableId()) + && ((isMinC && acuTableConf.getBoolean(Property.TABLE_MINC_OUTPUT_DROP_CACHE)) + || (!isMinC && acuTableConf.getBoolean(Property.TABLE_MAJC_OUTPUT_DROP_CACHE))); - WriterBuilder outBuilder = fileFactory.newWriterBuilder() - .forFile(outputFile.getMetaInsert(), ns, ns.getConf(), cryptoService) - .withTableConfiguration(acuTableConf).withRateLimiter(env.getWriteLimiter()); + WriterBuilder outBuilder = + fileFactory.newWriterBuilder().forFile(outputFile, ns, ns.getConf(), cryptoService) + .withTableConfiguration(acuTableConf); if (dropCacheBehindOutput) { outBuilder.dropCachesBehind(); } @@@ -459,11 -476,26 +479,26 @@@ try { FileOperations fileFactory = FileOperations.getInstance(); - FileSystem fs = this.fs.getFileSystemByPath(mapFile.getPath()); + FileSystem fs = this.fs.getFileSystemByPath(dataFile.getPath()); FileSKVIterator reader; - reader = fileFactory.newReaderBuilder().forFile(dataFile, fs, fs.getConf(), cryptoService) - .withTableConfiguration(acuTableConf).dropCachesBehind().build(); + boolean dropCacheBehindCompactionInputFile = false; + if (dropCacheFilePrefixes.contains(FileTypePrefix.ALL)) { + dropCacheBehindCompactionInputFile = true; + } else { - FileTypePrefix type = FileTypePrefix.fromFileName(mapFile.getFileName()); ++ FileTypePrefix type = FileTypePrefix.fromFileName(dataFile.getFileName()); + if (dropCacheFilePrefixes.contains(type)) { + dropCacheBehindCompactionInputFile = true; + } + } + - ReaderBuilder readerBuilder = fileFactory.newReaderBuilder() - .forFile(mapFile.getPathStr(), fs, fs.getConf(), cryptoService) - .withTableConfiguration(acuTableConf).withRateLimiter(env.getReadLimiter()); ++ ReaderBuilder readerBuilder = ++ fileFactory.newReaderBuilder().forFile(dataFile, fs, fs.getConf(), cryptoService) ++ .withTableConfiguration(acuTableConf); + if (dropCacheBehindCompactionInputFile) { + readerBuilder.dropCachesBehind(); + } + reader = readerBuilder.build(); readers.add(reader); diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MapImportFileNames.java index f0fefaa3ad,7032b578c7..73a81e4db2 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MapImportFileNames.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MapImportFileNames.java @@@ -83,11 -83,12 +84,12 @@@ class MapImportFileNames extends Manage continue; } } else { - // assume it is a map file - extension = Constants.MAPFILE_EXTENSION; + // skip files without an extension + continue; } - String newName = "I" + namer.getNextName() + "." + extension; + String newName = + FileTypePrefix.BULK_IMPORT.createFileName(namer.getNextName() + "." + extension); mappingsWriter.append(fileName); mappingsWriter.append(':'); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 3456bd943e,dd599b7dcf..4f1bd9bb31 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@@ -220,30 -256,43 +220,29 @@@ public class Tablet extends TabletBase public boolean closed = false; } - private String chooseTabletDir() throws IOException { - VolumeChooserEnvironment chooserEnv = - new VolumeChooserEnvironmentImpl(extent.tableId(), extent.endRow(), context); - String dirUri = tabletServer.getVolumeManager().choose(chooserEnv, context.getBaseUris()) - + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + extent.tableId() + Path.SEPARATOR + dirName; - checkTabletDir(new Path(dirUri)); - return dirUri; - } - - TabletFile getNextMapFilename(FileTypePrefix prefix) throws IOException { - String extension = FileOperations.getNewFileExtension(tableConfiguration); - return new TabletFile(new Path(chooseTabletDir() + "/" - + prefix.createFileName(context.getUniqueNameAllocator().getNextName() + "." + extension))); + ReferencedTabletFile getNextDataFilename(FilePrefix prefix) throws IOException { + return TabletNameGenerator.getNextDataFilename(prefix, context, extent, + getMetadata().getDirName(), dir -> checkTabletDir(new Path(dir))); } - TabletFile getNextMapFilenameForMajc(boolean propagateDeletes) throws IOException { - FileTypePrefix prefix = - !propagateDeletes ? FileTypePrefix.FULL_COMPACTION : FileTypePrefix.COMPACTION; - String tmpFileName = getNextMapFilename(prefix).getMetaInsert() + "_tmp"; - return new TabletFile(new Path(tmpFileName)); - } - - private void checkTabletDir(Path path) throws IOException { - if (!checkedTabletDirs.contains(path)) { - FileStatus[] files = null; - try { - files = getTabletServer().getVolumeManager().listStatus(path); - } catch (FileNotFoundException ex) { - // ignored - } - - if (files == null) { - log.debug("Tablet {} had no dir, creating {}", extent, path); + private void checkTabletDir(Path path) { + try { + if (!checkedTabletDirs.contains(path)) { + FileStatus[] files = null; + try { + files = getTabletServer().getVolumeManager().listStatus(path); + } catch (FileNotFoundException ex) { + // ignored + } - getTabletServer().getVolumeManager().mkdirs(path); + if (files == null) { + log.debug("Tablet {} had no dir, creating {}", extent, path); - + getTabletServer().getVolumeManager().mkdirs(path); + } + checkedTabletDirs.add(path); } - checkedTabletDirs.add(path); + } catch (IOException e) { + throw new UncheckedIOException(e); } }