This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 3cc53f50cd66ee92a869b55fad36b7f7d82712d3
Merge: f0bd0069ea 5cd2b91f37
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Apr 14 20:55:52 2025 +0000

    Merge branch '2.1'

 .../org/apache/accumulo/core/conf/Property.java    |   8 ++
 .../accumulo/server/compaction/FileCompactor.java  |  53 ++++++++--
 .../apache/accumulo/server/fs/FileTypePrefix.java  | 114 +++++++++++++++++++++
 .../accumulo/server/fs/FileTypePrefixTest.java     |  95 +++++++++++++++++
 .../tableOps/tableImport/MapImportFileNames.java   |   4 +-
 .../org/apache/accumulo/tserver/tablet/Tablet.java |   1 -
 6 files changed, 265 insertions(+), 10 deletions(-)

diff --cc server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index c273391e08,b09050c1fb..54960779de
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@@ -70,12 -69,15 +72,13 @@@ import org.apache.accumulo.core.tablets
  import org.apache.accumulo.core.trace.TraceUtil;
  import org.apache.accumulo.core.util.LocalityGroupUtil;
  import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
 -import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 +import org.apache.accumulo.core.util.Timer;
  import org.apache.accumulo.server.ServerContext;
+ import org.apache.accumulo.server.fs.FileTypePrefix;
  import org.apache.accumulo.server.fs.VolumeManager;
  import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
 -import org.apache.accumulo.server.problems.ProblemReport;
 +import org.apache.accumulo.server.mem.LowMemoryDetector.DetectionScope;
  import org.apache.accumulo.server.problems.ProblemReportingIterator;
 -import org.apache.accumulo.server.problems.ProblemReports;
 -import org.apache.accumulo.server.problems.ProblemType;
  import org.apache.hadoop.fs.FileSystem;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -334,17 -336,31 +337,32 @@@ public class FileCompactor implements C
        FileOperations fileFactory = FileOperations.getInstance();
        FileSystem ns = this.fs.getFileSystemByPath(outputFile.getPath());
  
+       // Normally you would not want the DataNode to continue to
+       // cache blocks in the page cache for compaction input files
+       // as these files are normally marked for deletion after a
+       // compaction occurs. However there can be cases where the
+       // compaction input files will continue to be used, like in
+       // the case of bulk import files which may be assigned to many
+       // tablets and will still be needed until all of the tablets
+       // have compacted, or in the case of cloned tables where one
+       // of the tables has compacted the input file but the other
+       // has not.
+       String dropCachePrefixProperty =
+           acuTableConf.get(Property.TABLE_COMPACTION_INPUT_DROP_CACHE_BEHIND);
+       final EnumSet<FileTypePrefix> dropCacheFilePrefixes =
+           FileTypePrefix.typesFromList(dropCachePrefixProperty);
+ 
        final boolean isMinC = env.getIteratorScope() == 
IteratorUtil.IteratorScope.minc;
  
 -      final boolean dropCacheBehindOutput = 
!RootTable.ID.equals(this.extent.tableId())
 -          && !MetadataTable.ID.equals(this.extent.tableId())
 -          && ((isMinC && 
acuTableConf.getBoolean(Property.TABLE_MINC_OUTPUT_DROP_CACHE))
 -              || (!isMinC && 
acuTableConf.getBoolean(Property.TABLE_MAJC_OUTPUT_DROP_CACHE)));
 +      final boolean dropCacheBehindOutput =
 +          !AccumuloTable.ROOT.tableId().equals(this.extent.tableId())
 +              && 
!AccumuloTable.METADATA.tableId().equals(this.extent.tableId())
 +              && ((isMinC && 
acuTableConf.getBoolean(Property.TABLE_MINC_OUTPUT_DROP_CACHE))
 +                  || (!isMinC && 
acuTableConf.getBoolean(Property.TABLE_MAJC_OUTPUT_DROP_CACHE)));
  
 -      WriterBuilder outBuilder = fileFactory.newWriterBuilder()
 -          .forFile(outputFile.getMetaInsert(), ns, ns.getConf(), 
cryptoService)
 -          
.withTableConfiguration(acuTableConf).withRateLimiter(env.getWriteLimiter());
 +      WriterBuilder outBuilder =
 +          fileFactory.newWriterBuilder().forFile(outputFile, ns, 
ns.getConf(), cryptoService)
 +              .withTableConfiguration(acuTableConf);
        if (dropCacheBehindOutput) {
          outBuilder.dropCachesBehind();
        }
@@@ -459,11 -476,26 +479,26 @@@
        try {
  
          FileOperations fileFactory = FileOperations.getInstance();
 -        FileSystem fs = this.fs.getFileSystemByPath(mapFile.getPath());
 +        FileSystem fs = this.fs.getFileSystemByPath(dataFile.getPath());
          FileSKVIterator reader;
  
-         reader = fileFactory.newReaderBuilder().forFile(dataFile, fs, 
fs.getConf(), cryptoService)
-             .withTableConfiguration(acuTableConf).dropCachesBehind().build();
+         boolean dropCacheBehindCompactionInputFile = false;
+         if (dropCacheFilePrefixes.contains(FileTypePrefix.ALL)) {
+           dropCacheBehindCompactionInputFile = true;
+         } else {
 -          FileTypePrefix type = FileTypePrefix.fromFileName(mapFile.getFileName());
++          FileTypePrefix type = FileTypePrefix.fromFileName(dataFile.getFileName());
+           if (dropCacheFilePrefixes.contains(type)) {
+             dropCacheBehindCompactionInputFile = true;
+           }
+         }
+ 
 -        ReaderBuilder readerBuilder = fileFactory.newReaderBuilder()
 -            .forFile(mapFile.getPathStr(), fs, fs.getConf(), cryptoService)
 -            
.withTableConfiguration(acuTableConf).withRateLimiter(env.getReadLimiter());
++        ReaderBuilder readerBuilder =
++            fileFactory.newReaderBuilder().forFile(dataFile, fs, 
fs.getConf(), cryptoService)
++                .withTableConfiguration(acuTableConf);
+         if (dropCacheBehindCompactionInputFile) {
+           readerBuilder.dropCachesBehind();
+         }
+         reader = readerBuilder.build();
  
          readers.add(reader);
  
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MapImportFileNames.java
index f0fefaa3ad,7032b578c7..73a81e4db2
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MapImportFileNames.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MapImportFileNames.java
@@@ -83,11 -83,12 +84,12 @@@ class MapImportFileNames extends Manage
                continue;
              }
            } else {
 -            // assume it is a map file
 -            extension = Constants.MAPFILE_EXTENSION;
 +            // skip files without an extension
 +            continue;
            }
  
-           String newName = "I" + namer.getNextName() + "." + extension;
+           String newName =
+               FileTypePrefix.BULK_IMPORT.createFileName(namer.getNextName() + 
"." + extension);
  
            mappingsWriter.append(fileName);
            mappingsWriter.append(':');
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 3456bd943e,dd599b7dcf..4f1bd9bb31
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@@ -220,30 -256,43 +220,29 @@@ public class Tablet extends TabletBase 
      public boolean closed = false;
    }
  
 -  private String chooseTabletDir() throws IOException {
 -    VolumeChooserEnvironment chooserEnv =
 -        new VolumeChooserEnvironmentImpl(extent.tableId(), extent.endRow(), 
context);
 -    String dirUri = tabletServer.getVolumeManager().choose(chooserEnv, 
context.getBaseUris())
 -        + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + extent.tableId() + 
Path.SEPARATOR + dirName;
 -    checkTabletDir(new Path(dirUri));
 -    return dirUri;
 -  }
 -
 -  TabletFile getNextMapFilename(FileTypePrefix prefix) throws IOException {
 -    String extension = FileOperations.getNewFileExtension(tableConfiguration);
 -    return new TabletFile(new Path(chooseTabletDir() + "/"
 -        + 
prefix.createFileName(context.getUniqueNameAllocator().getNextName() + "." + 
extension)));
 +  ReferencedTabletFile getNextDataFilename(FilePrefix prefix) throws IOException {
 +    return TabletNameGenerator.getNextDataFilename(prefix, context, extent,
 +        getMetadata().getDirName(), dir -> checkTabletDir(new Path(dir)));
    }
  
 -  TabletFile getNextMapFilenameForMajc(boolean propagateDeletes) throws 
IOException {
 -    FileTypePrefix prefix =
 -        !propagateDeletes ? FileTypePrefix.FULL_COMPACTION : 
FileTypePrefix.COMPACTION;
 -    String tmpFileName = getNextMapFilename(prefix).getMetaInsert() + "_tmp";
 -    return new TabletFile(new Path(tmpFileName));
 -  }
 -
 -  private void checkTabletDir(Path path) throws IOException {
 -    if (!checkedTabletDirs.contains(path)) {
 -      FileStatus[] files = null;
 -      try {
 -        files = getTabletServer().getVolumeManager().listStatus(path);
 -      } catch (FileNotFoundException ex) {
 -        // ignored
 -      }
 -
 -      if (files == null) {
 -        log.debug("Tablet {} had no dir, creating {}", extent, path);
 +  private void checkTabletDir(Path path) {
 +    try {
 +      if (!checkedTabletDirs.contains(path)) {
 +        FileStatus[] files = null;
 +        try {
 +          files = getTabletServer().getVolumeManager().listStatus(path);
 +        } catch (FileNotFoundException ex) {
 +          // ignored
 +        }
  
 -        getTabletServer().getVolumeManager().mkdirs(path);
 +        if (files == null) {
 +          log.debug("Tablet {} had no dir, creating {}", extent, path);
- 
 +          getTabletServer().getVolumeManager().mkdirs(path);
 +        }
 +        checkedTabletDirs.add(path);
        }
 -      checkedTabletDirs.add(path);
 +    } catch (IOException e) {
 +      throw new UncheckedIOException(e);
      }
    }
  

Reply via email to