This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a646b85b82 HDDS-12557. Add progress indicator for checkpoint tarball in leader OM (#8085)
a646b85b82 is described below

commit a646b85b82c5158e15744ae849bb91c8072bbe66
Author: SaketaChalamchala <[email protected]>
AuthorDate: Thu Mar 20 04:52:46 2025 -0700

    HDDS-12557. Add progress indicator for checkpoint tarball in leader OM (#8085)
---
 .../apache/hadoop/hdds/utils/HddsServerUtil.java   |  6 +-
 .../hadoop/ozone/om/OMDBCheckpointServlet.java     | 90 ++++++++++++++++++----
 2 files changed, 79 insertions(+), 17 deletions(-)

diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index f554990fc9..739a764939 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -625,18 +625,20 @@ public static void writeDBCheckpointToStream(
     }
   }
 
-  public static void includeFile(File file, String entryName,
+  public static long includeFile(File file, String entryName,
                                  ArchiveOutputStream archiveOutputStream)
       throws IOException {
     ArchiveEntry archiveEntry =
         archiveOutputStream.createArchiveEntry(file, entryName);
     archiveOutputStream.putArchiveEntry(archiveEntry);
+    long bytesWritten;
     try (InputStream fis = Files.newInputStream(file.toPath())) {
-      IOUtils.copy(fis, archiveOutputStream);
+      bytesWritten = IOUtils.copy(fis, archiveOutputStream);
       archiveOutputStream.flush();
     } finally {
       archiveOutputStream.closeArchiveEntry();
     }
+    return bytesWritten;
   }
 
   // Mark tarball completed.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
index 7c02df4beb..e7df608aed 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.om;
 
+import static org.apache.commons.io.filefilter.TrueFileFilter.TRUE;
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeFile;
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeRatisSnapshotCompleteFlag;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
@@ -56,6 +57,11 @@
 import org.apache.commons.compress.archivers.ArchiveOutputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOCase;
+import org.apache.commons.io.file.Counters;
+import org.apache.commons.io.file.CountingPathVisitor;
+import org.apache.commons.io.file.PathFilter;
+import org.apache.commons.io.filefilter.SuffixFileFilter;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.recon.ReconConfig;
 import org.apache.hadoop.hdds.utils.DBCheckpointServlet;
@@ -69,6 +75,7 @@
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 @@ -93,6 +100,8 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
   private static final long serialVersionUID = 1L;
   private transient BootstrapStateHandler.Lock lock;
   private long maxTotalSstSize = 0;
+  private static final PathFilter SST_FILE_FILTER =
+      new SuffixFileFilter(ROCKSDB_SST_SUFFIX, IOCase.INSENSITIVE);
 
   @Override
   public void init() throws ServletException {
@@ -167,6 +176,7 @@ public void writeDbDataToStream(DBCheckpoint checkpoint,
       // Files to be excluded from tarball
       Map<String, Map<Path, Path>> sstFilesToExclude = normalizeExcludeList(toExcludeList,
           checkpoint.getCheckpointLocation(), sstBackupDir);
+
       boolean completed = getFilesForArchive(checkpoint, copyFiles,
           hardLinkFiles, sstFilesToExclude, includeSnapshotData(request),
           excludedList, sstBackupDir, compactionLogDir);
@@ -270,13 +280,13 @@ public File getTmpDir() {
 
   @SuppressWarnings("checkstyle:ParameterNumber")
   private boolean getFilesForArchive(DBCheckpoint checkpoint,
-                                  Map<String, Map<Path, Path>> copyFiles,
-                                  Map<Path, Path> hardLinkFiles,
-                                  Map<String, Map<Path, Path>> sstFilesToExclude,
-                                  boolean includeSnapshotData,
-                                  List<String> excluded,
-                                  DirectoryData sstBackupDir,
-                                  DirectoryData compactionLogDir)
+      Map<String, Map<Path, Path>> copyFiles,
+      Map<Path, Path> hardLinkFiles,
+      Map<String, Map<Path, Path>> sstFilesToExclude,
+      boolean includeSnapshotData,
+      List<String> excluded,
+      DirectoryData sstBackupDir,
+      DirectoryData compactionLogDir)
       throws IOException {
 
     maxTotalSstSize = getConf().getLong(
@@ -290,6 +300,12 @@ private boolean getFilesForArchive(DBCheckpoint checkpoint,
     }
 
     AtomicLong copySize = new AtomicLong(0L);
+
+    // Log estimated total data transferred on first request.
+    if (sstFilesToExclude.isEmpty()) {
+      logEstimatedTarballSize(checkpoint, includeSnapshotData);
+    }
+
     // Get the active fs files.
     Path dir = checkpoint.getCheckpointLocation();
     if (!processDir(dir, copyFiles, hardLinkFiles, sstFilesToExclude,
@@ -302,7 +318,7 @@ private boolean getFilesForArchive(DBCheckpoint checkpoint,
     }
 
     // Get the snapshot files.
-    Set<Path> snapshotPaths = waitForSnapshotDirs(checkpoint);
+    Set<Path> snapshotPaths = getSnapshotDirs(checkpoint, true);
     Path snapshotDir = getSnapshotDir();
     if (!processDir(snapshotDir, copyFiles, hardLinkFiles, sstFilesToExclude,
         snapshotPaths, excluded, copySize, null)) {
 @@ -320,16 +336,40 @@ private boolean getFilesForArchive(DBCheckpoint checkpoint,
         hardLinkFiles, sstFilesToExclude,
         new HashSet<>(), excluded, copySize,
         compactionLogDir.getOriginalDir().toPath());
+  }
 
+  private void logEstimatedTarballSize(
+      DBCheckpoint checkpoint, boolean includeSnapshotData) {
+    try {
+      Counters.PathCounters counters = Counters.longPathCounters();
+      CountingPathVisitor visitor = new CountingPathVisitor(
+          counters, SST_FILE_FILTER, TRUE);
+      Files.walkFileTree(checkpoint.getCheckpointLocation(), visitor);
+      int totalSnapshots = 0;
+      if (includeSnapshotData) {
+        Set<Path> snapshotPaths = getSnapshotDirs(checkpoint, false);
+        totalSnapshots = snapshotPaths.size();
+        for (Path snapshotDir: snapshotPaths) {
+          Files.walkFileTree(snapshotDir, visitor);
+        }
+      }
+      LOG.info("Estimates for Checkpoint Tarball Stream - Data size: {} KB, " + "SST files: {}{}",
+          counters.getByteCounter().get() / (1024),
+          counters.getFileCounter().get(),
+          (includeSnapshotData ? ", snapshots: " + totalSnapshots : ""));
+    } catch (Exception e) {
+      LOG.error("Could not estimate size of transfer to Checkpoint Tarball Stream.", e);
+    }
   }
 
   /**
    * The snapshotInfo table may contain a snapshot that
    * doesn't yet exist on the fs, so wait a few seconds for it.
    * @param checkpoint Checkpoint containing snapshot entries expected.
+   * @param waitForDir Wait for dir to exist on fs.
    * @return Set of expected snapshot dirs.
    */
-  private Set<Path> waitForSnapshotDirs(DBCheckpoint checkpoint)
+  private Set<Path> getSnapshotDirs(DBCheckpoint checkpoint, boolean waitForDir)
       throws IOException {
 
     OzoneConfiguration conf = getConf();
 @@ -348,7 +388,9 @@ private Set<Path> waitForSnapshotDirs(DBCheckpoint checkpoint)
       while (iterator.hasNext()) {
         Table.KeyValue<String, SnapshotInfo> entry = iterator.next();
         Path path = Paths.get(getSnapshotPath(conf, entry.getValue()));
-        waitForDirToExist(path);
+        if (waitForDir) {
+          waitForDirToExist(path);
+        }
         snapshotPaths.add(path);
       }
     } finally {
@@ -552,17 +594,21 @@ private void writeFilesToArchive(
         e.getKey().getFileName().toString().toLowerCase().endsWith(".sst")).
         collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
+    long bytesWritten = 0L;
+    int filesWritten = 0;
+    long lastLoggedTime = Time.monotonicNow();
+
     // Go through each of the files to be copied and add to archive.
     for (Map.Entry<Path, Path> entry : filteredCopyFiles.entrySet()) {
-      Path file = entry.getValue();
+      Path path = entry.getValue();
 
       // Confirm the data is in the right place.
-      if (!file.toString().startsWith(metaDirPath.toString())) {
+      if (!path.toString().startsWith(metaDirPath.toString())) {
         throw new IOException("tarball file not in metadata dir: "
-            + file + ": " + metaDirPath);
+            + path + ": " + metaDirPath);
       }
 
-      String fixedFile = truncateFileName(truncateLength, file);
+      String fixedFile = truncateFileName(truncateLength, path);
       if (fixedFile.startsWith(OM_CHECKPOINT_DIR)) {
         // checkpoint files go to root of tarball
         Path f = Paths.get(fixedFile).getFileName();
@@ -570,7 +616,17 @@ private void writeFilesToArchive(
           fixedFile = f.toString();
         }
       }
-      includeFile(entry.getKey().toFile(), fixedFile, archiveOutputStream);
+      File file = entry.getKey().toFile();
+      if (!file.isDirectory()) {
+        filesWritten++;
+      }
+      bytesWritten += includeFile(file, fixedFile, archiveOutputStream);
+      // Log progress every 30 seconds
+      if (Time.monotonicNow() - lastLoggedTime >= 30000) {
+        LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...",
+            bytesWritten / (1024), filesWritten);
+        lastLoggedTime = Time.monotonicNow();
+      }
     }
 
     if (completed) {
@@ -595,6 +651,10 @@ private void writeFilesToArchive(
       // Mark tarball completed.
       includeRatisSnapshotCompleteFlag(archiveOutputStream);
     }
+    LOG.info("Completed transfer of {} KB, #files {} " +
+            "to checkpoint tarball stream.{}",
+        bytesWritten / (1024), filesWritten, (completed) ?
+            " Checkpoint tarball is complete." : "");
   }
 
   @Nonnull


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to