This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b595f3cf72 HDDS-13770. SstBackup Directory can have orphan files after bootstrap not present in compaction log table (#9169)
1b595f3cf72 is described below

commit 1b595f3cf721ea53844bc6973e56eefbb235cc6f
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Thu Oct 23 07:37:45 2025 -0700

    HDDS-13770. SstBackup Directory can have orphan files after bootstrap not present in compaction log table (#9169)
    
    Reviewed-by: Swaminathan Balachandran <[email protected]>
    Reviewed-by: Sadanand Shenoy <[email protected]>
---
 .../om/OMDBCheckpointServletInodeBasedXfer.java    | 141 +++++++++++----------
 1 file changed, 76 insertions(+), 65 deletions(-)

diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
index f967e30ec52..27e7f1c2d6d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
@@ -44,11 +44,13 @@
 import java.nio.file.StandardOpenOption;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -64,6 +66,7 @@
 import org.apache.hadoop.hdds.recon.ReconConfig;
 import org.apache.hadoop.hdds.utils.DBCheckpointServlet;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
@@ -71,6 +74,7 @@
 import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
+import org.apache.ozone.compaction.log.CompactionLogEntry;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -249,7 +253,9 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
       if (shouldContinue) {
         // we finished transferring files from snapshot DB's by now and
         // this is the last step where we transfer the active om.db contents
-        checkpoint = createAndPrepareCheckpoint(tmpdir, true);
+        // get the list of sst files of the checkpoint.
+        checkpoint = createAndPrepareCheckpoint(true);
+        List<Path> sstBackupFiles = 
extractSSTFilesFromCompactionLog(checkpoint);
         // unlimited files as we want the Active DB contents to be transferred 
in a single batch
         maxTotalSstSize.set(Long.MAX_VALUE);
         Path checkpointDir = checkpoint.getCheckpointLocation();
@@ -257,12 +263,10 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
         writeDBToArchive(sstFilesToExclude, checkpointDir,
             maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, 
false);
         if (includeSnapshotData) {
-          Path tmpCompactionLogDir = 
tmpdir.resolve(getCompactionLogDir().getFileName());
-          Path tmpSstBackupDir = 
tmpdir.resolve(getSstBackupDir().getFileName());
-          writeDBToArchive(sstFilesToExclude, tmpCompactionLogDir, 
maxTotalSstSize, archiveOutputStream, tmpdir,
-              hardLinkFileMap, getCompactionLogDir(), false);
-          writeDBToArchive(sstFilesToExclude, tmpSstBackupDir, 
maxTotalSstSize, archiveOutputStream, tmpdir,
-              hardLinkFileMap, getSstBackupDir(), false);
+          writeDBToArchive(sstFilesToExclude, getCompactionLogDir(), 
maxTotalSstSize, archiveOutputStream, tmpdir,
+              hardLinkFileMap, false);
+          writeDBToArchive(sstFilesToExclude, sstBackupFiles.stream(),
+              maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, 
false);
           // This is done to ensure all data to be copied correctly is flushed 
in the snapshot DB
           transferSnapshotData(sstFilesToExclude, tmpdir, snapshotPaths, 
maxTotalSstSize,
               archiveOutputStream, hardLinkFileMap);
@@ -317,14 +321,6 @@ private void transferSnapshotData(Set<String> sstFilesToExclude, Path tmpdir, Se
     }
   }
 
-  @VisibleForTesting
-  boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dir,
-      AtomicLong maxTotalSstSize, ArchiveOutputStream<TarArchiveEntry> 
archiveOutputStream,
-      Path tmpdir, Map<String, String> hardLinkFileMap, boolean onlySstFile) 
throws IOException {
-    return writeDBToArchive(sstFilesToExclude, dir, maxTotalSstSize,
-        archiveOutputStream, tmpdir, hardLinkFileMap, null, onlySstFile);
-  }
-
   private static void cleanupCheckpoint(DBCheckpoint checkpoint) {
     if (checkpoint != null) {
       try {
@@ -402,18 +398,30 @@ Set<Path> getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOExceptio
     return snapshotPaths;
   }
 
+  @VisibleForTesting
+  boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, 
AtomicLong maxTotalSstSize,
+      ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir,
+      Map<String, String> hardLinkFileMap, boolean onlySstFile) throws 
IOException {
+    if (!Files.exists(dbDir)) {
+      LOG.warn("DB directory {} does not exist. Skipping.", dbDir);
+      return true;
+    }
+    Stream<Path> files = Files.list(dbDir);
+    return writeDBToArchive(sstFilesToExclude, files,
+        maxTotalSstSize, archiveOutputStream, tmpDir, hardLinkFileMap, 
onlySstFile);
+  }
+
   /**
    * Writes database files to the archive, handling deduplication based on 
inode IDs.
    * Here the dbDir could either be a snapshot db directory, the active om.db,
    * compaction log dir, sst backup dir.
    *
    * @param sstFilesToExclude Set of SST file IDs to exclude from the archive
-   * @param dbDir Directory containing database files to archive
+   * @param files Stream of files to archive
    * @param maxTotalSstSize Maximum total size of SST files to include
    * @param archiveOutputStream Archive output stream
    * @param tmpDir Temporary directory for processing
    * @param hardLinkFileMap Map of hardlink file paths to their unique 
identifiers for deduplication
-   * @param destDir Destination directory for the archived files. If null,
    * the archived files are not moved to this directory.
    * @param onlySstFile If true, only SST files are processed. If false, all 
files are processed.
    * <p>
@@ -424,49 +432,40 @@ Set<Path> getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOExceptio
    * @throws IOException if an I/O error occurs
    */
   @SuppressWarnings("checkstyle:ParameterNumber")
-  private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, 
AtomicLong maxTotalSstSize,
+  private boolean writeDBToArchive(Set<String> sstFilesToExclude, Stream<Path> 
files, AtomicLong maxTotalSstSize,
       ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir,
-      Map<String, String> hardLinkFileMap, Path destDir, boolean onlySstFile) 
throws IOException {
-    if (!Files.exists(dbDir)) {
-      LOG.warn("DB directory {} does not exist. Skipping.", dbDir);
-      return true;
-    }
+      Map<String, String> hardLinkFileMap, boolean onlySstFile) throws 
IOException {
     long bytesWritten = 0L;
     int filesWritten = 0;
     long lastLoggedTime = Time.monotonicNow();
-    try (Stream<Path> files = Files.list(dbDir)) {
-      Iterable<Path> iterable = files::iterator;
-      for (Path dbFile : iterable) {
-        if (!Files.isDirectory(dbFile)) {
-          if (onlySstFile && !dbFile.toString().endsWith(ROCKSDB_SST_SUFFIX)) {
-            continue;
+    Iterable<Path> iterable = files::iterator;
+    for (Path dbFile : iterable) {
+      if (!Files.isDirectory(dbFile)) {
+        if (onlySstFile && !dbFile.toString().endsWith(ROCKSDB_SST_SUFFIX)) {
+          continue;
+        }
+        String fileId = 
OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(dbFile);
+        if (hardLinkFileMap != null) {
+          String path = dbFile.toFile().getAbsolutePath();
+          // if the file is in the om checkpoint dir, then we need to change 
the path to point to the OM DB.
+          if (path.contains(OM_CHECKPOINT_DIR)) {
+            path = 
getDbStore().getDbLocation().toPath().resolve(dbFile.getFileName()).toAbsolutePath().toString();
           }
-          String fileId = 
OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(dbFile);
-          if (hardLinkFileMap != null) {
-            String path = dbFile.toFile().getAbsolutePath();
-            if (destDir != null) {
-              path = destDir.resolve(dbFile.getFileName()).toString();
-            }
-            // if the file is in the om checkpoint dir, then we need to change 
the path to point to the OM DB.
-            if (path.contains(OM_CHECKPOINT_DIR)) {
-              path = 
getDbStore().getDbLocation().toPath().resolve(dbFile.getFileName()).toAbsolutePath().toString();
-            }
-            hardLinkFileMap.put(path, fileId);
+          hardLinkFileMap.put(path, fileId);
+        }
+        if (!sstFilesToExclude.contains(fileId)) {
+          long fileSize = Files.size(dbFile);
+          if (maxTotalSstSize.get() - fileSize <= 0) {
+            return false;
           }
-          if (!sstFilesToExclude.contains(fileId)) {
-            long fileSize = Files.size(dbFile);
-            if (maxTotalSstSize.get() - fileSize <= 0) {
-              return false;
-            }
-            bytesWritten += linkAndIncludeFile(dbFile.toFile(), fileId, 
archiveOutputStream, tmpDir);
-            filesWritten++;
-            maxTotalSstSize.addAndGet(-fileSize);
-            sstFilesToExclude.add(fileId);
-            if (Time.monotonicNow() - lastLoggedTime >= 30000) {
-              LOG.info("Transferred {} KB, #files {} to checkpoint tarball 
stream...",
-                  bytesWritten / (1024), filesWritten);
-              lastLoggedTime = Time.monotonicNow();
-            }
+          bytesWritten += linkAndIncludeFile(dbFile.toFile(), fileId, 
archiveOutputStream, tmpDir);
+          filesWritten++;
+          maxTotalSstSize.addAndGet(-fileSize);
+          sstFilesToExclude.add(fileId);
+          if (Time.monotonicNow() - lastLoggedTime >= 30000) {
+            LOG.info("Transferred {} KB, #files {} to checkpoint tarball 
stream...",
+                bytesWritten / (1024), filesWritten);
+            lastLoggedTime = Time.monotonicNow();
           }
         }
       }
@@ -480,21 +479,33 @@ private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, Atom
    * The copy to the temporary directory for compaction log and SST backup 
files
    * is done to maintain a consistent view of the files in these directories.
    *
-   * @param tmpdir Temporary directory for storing checkpoint-related files.
    * @param flush  If true, flushes in-memory data to disk before 
checkpointing.
-   * @return The created database checkpoint.
    * @throws IOException If an error occurs during checkpoint creation or file 
copying.
    */
-  private DBCheckpoint createAndPrepareCheckpoint(Path tmpdir, boolean flush) 
throws IOException {
-    // make tmp directories to contain the copies
-    Path tmpCompactionLogDir = 
tmpdir.resolve(getCompactionLogDir().getFileName());
-    Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName());
+  private DBCheckpoint createAndPrepareCheckpoint(boolean flush) throws 
IOException {
+    // Create & return the checkpoint.
+    return getDbStore().getCheckpoint(flush);
+  }
+
+  private List<Path> extractSSTFilesFromCompactionLog(DBCheckpoint 
dbCheckpoint) throws IOException {
+    List<Path> sstFiles = new ArrayList<>();
+    try (OmMetadataManagerImpl checkpointMetadataManager =
+             OmMetadataManagerImpl.createCheckpointMetadataManager(getConf(), 
dbCheckpoint)) {
+      try (Table.KeyValueIterator<String, CompactionLogEntry>
+               iterator = 
checkpointMetadataManager.getCompactionLogTable().iterator()) {
+        iterator.seekToFirst();
 
-    // Create checkpoint and then copy the files so that it has all the 
compaction entries and files.
-    DBCheckpoint dbCheckpoint = getDbStore().getCheckpoint(flush);
-    FileUtils.copyDirectory(getCompactionLogDir().toFile(), 
tmpCompactionLogDir.toFile());
-    OmSnapshotUtils.linkFiles(getSstBackupDir().toFile(), 
tmpSstBackupDir.toFile());
+        Path sstBackupDir = getSstBackupDir();
 
-    return dbCheckpoint;
+        while (iterator.hasNext()) {
+          CompactionLogEntry logEntry = iterator.next().getValue();
+          logEntry.getInputFileInfoList().forEach(f ->
+              sstFiles.add(sstBackupDir.resolve(f.getFileName() + 
ROCKSDB_SST_SUFFIX)));
+        }
+      }
+    } catch (Exception e) {
+      throw new IOException("Error reading compaction log from checkpoint", e);
+    }
+    return sstFiles;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to