This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new a646b85b82 HDDS-12557. Add progress indicator for checkpoint tarball in leader OM (#8085)
a646b85b82 is described below
commit a646b85b82c5158e15744ae849bb91c8072bbe66
Author: SaketaChalamchala <[email protected]>
AuthorDate: Thu Mar 20 04:52:46 2025 -0700
HDDS-12557. Add progress indicator for checkpoint tarball in leader OM (#8085)
---
.../apache/hadoop/hdds/utils/HddsServerUtil.java | 6 +-
.../hadoop/ozone/om/OMDBCheckpointServlet.java | 90 ++++++++++++++++++----
2 files changed, 79 insertions(+), 17 deletions(-)
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index f554990fc9..739a764939 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -625,18 +625,20 @@ public static void writeDBCheckpointToStream(
}
}
- public static void includeFile(File file, String entryName,
+ public static long includeFile(File file, String entryName,
ArchiveOutputStream archiveOutputStream)
throws IOException {
ArchiveEntry archiveEntry =
archiveOutputStream.createArchiveEntry(file, entryName);
archiveOutputStream.putArchiveEntry(archiveEntry);
+ long bytesWritten;
try (InputStream fis = Files.newInputStream(file.toPath())) {
- IOUtils.copy(fis, archiveOutputStream);
+ bytesWritten = IOUtils.copy(fis, archiveOutputStream);
archiveOutputStream.flush();
} finally {
archiveOutputStream.closeArchiveEntry();
}
+ return bytesWritten;
}
// Mark tarball completed.
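
For context, a minimal sketch of the new includeFile contract (the caller,
file list, and output path below are invented for illustration, not part of
this commit): includeFile now returns the byte count that IOUtils.copy
reported for the entry, so a caller can keep a running total across entries.

    import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeFile;

    import java.io.File;
    import java.io.FileOutputStream;
    import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;

    public final class TarballTotalSketch {
      public static void main(String[] args) throws Exception {
        long totalBytes = 0L;
        try (TarArchiveOutputStream tarOut = new TarArchiveOutputStream(
            new FileOutputStream("checkpoint.tar"))) { // hypothetical output
          for (String name : args) {                   // hypothetical inputs
            File f = new File(name);
            // Accumulate the per-entry byte count that includeFile now returns.
            totalBytes += includeFile(f, f.getName(), tarOut);
          }
        }
        System.out.printf("Wrote %d KB to tarball%n", totalBytes / 1024);
      }
    }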
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
index 7c02df4beb..e7df608aed 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.om;
+import static org.apache.commons.io.filefilter.TrueFileFilter.TRUE;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeFile;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeRatisSnapshotCompleteFlag;
import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
@@ -56,6 +57,11 @@
import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOCase;
+import org.apache.commons.io.file.Counters;
+import org.apache.commons.io.file.CountingPathVisitor;
+import org.apache.commons.io.file.PathFilter;
+import org.apache.commons.io.filefilter.SuffixFileFilter;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.recon.ReconConfig;
import org.apache.hadoop.hdds.utils.DBCheckpointServlet;
@@ -69,6 +75,7 @@
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,6 +100,8 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
private static final long serialVersionUID = 1L;
private transient BootstrapStateHandler.Lock lock;
private long maxTotalSstSize = 0;
+ private static final PathFilter SST_FILE_FILTER =
+ new SuffixFileFilter(ROCKSDB_SST_SUFFIX, IOCase.INSENSITIVE);
@Override
public void init() throws ServletException {
@@ -167,6 +176,7 @@ public void writeDbDataToStream(DBCheckpoint checkpoint,
// Files to be excluded from tarball
Map<String, Map<Path, Path>> sstFilesToExclude =
normalizeExcludeList(toExcludeList,
checkpoint.getCheckpointLocation(), sstBackupDir);
+
boolean completed = getFilesForArchive(checkpoint, copyFiles,
hardLinkFiles, sstFilesToExclude, includeSnapshotData(request),
excludedList, sstBackupDir, compactionLogDir);
@@ -270,13 +280,13 @@ public File getTmpDir() {
@SuppressWarnings("checkstyle:ParameterNumber")
private boolean getFilesForArchive(DBCheckpoint checkpoint,
- Map<String, Map<Path, Path>> copyFiles,
- Map<Path, Path> hardLinkFiles,
- Map<String, Map<Path, Path>> sstFilesToExclude,
- boolean includeSnapshotData,
- List<String> excluded,
- DirectoryData sstBackupDir,
- DirectoryData compactionLogDir)
+ Map<String, Map<Path, Path>> copyFiles,
+ Map<Path, Path> hardLinkFiles,
+ Map<String, Map<Path, Path>> sstFilesToExclude,
+ boolean includeSnapshotData,
+ List<String> excluded,
+ DirectoryData sstBackupDir,
+ DirectoryData compactionLogDir)
throws IOException {
maxTotalSstSize = getConf().getLong(
@@ -290,6 +300,12 @@ private boolean getFilesForArchive(DBCheckpoint checkpoint,
}
AtomicLong copySize = new AtomicLong(0L);
+
+ // Log estimated total data transferred on first request.
+ if (sstFilesToExclude.isEmpty()) {
+ logEstimatedTarballSize(checkpoint, includeSnapshotData);
+ }
+
// Get the active fs files.
Path dir = checkpoint.getCheckpointLocation();
if (!processDir(dir, copyFiles, hardLinkFiles, sstFilesToExclude,
@@ -302,7 +318,7 @@ private boolean getFilesForArchive(DBCheckpoint checkpoint,
}
// Get the snapshot files.
- Set<Path> snapshotPaths = waitForSnapshotDirs(checkpoint);
+ Set<Path> snapshotPaths = getSnapshotDirs(checkpoint, true);
Path snapshotDir = getSnapshotDir();
if (!processDir(snapshotDir, copyFiles, hardLinkFiles, sstFilesToExclude,
snapshotPaths, excluded, copySize, null)) {
@@ -320,16 +336,40 @@ private boolean getFilesForArchive(DBCheckpoint checkpoint,
hardLinkFiles, sstFilesToExclude,
new HashSet<>(), excluded, copySize,
compactionLogDir.getOriginalDir().toPath());
+ }
+ private void logEstimatedTarballSize(
+ DBCheckpoint checkpoint, boolean includeSnapshotData) {
+ try {
+ Counters.PathCounters counters = Counters.longPathCounters();
+ CountingPathVisitor visitor = new CountingPathVisitor(
+ counters, SST_FILE_FILTER, TRUE);
+ Files.walkFileTree(checkpoint.getCheckpointLocation(), visitor);
+ int totalSnapshots = 0;
+ if (includeSnapshotData) {
+ Set<Path> snapshotPaths = getSnapshotDirs(checkpoint, false);
+ totalSnapshots = snapshotPaths.size();
+ for (Path snapshotDir: snapshotPaths) {
+ Files.walkFileTree(snapshotDir, visitor);
+ }
+ }
+ LOG.info("Estimates for Checkpoint Tarball Stream - Data size: {} KB, "
+ "SST files: {}{}",
+ counters.getByteCounter().get() / (1024),
+ counters.getFileCounter().get(),
+ (includeSnapshotData ? ", snapshots: " + totalSnapshots : ""));
+ } catch (Exception e) {
+ LOG.error("Could not estimate size of transfer to Checkpoint Tarball
Stream.", e);
+ }
}
/**
* The snapshotInfo table may contain a snapshot that
* doesn't yet exist on the fs, so wait a few seconds for it.
* @param checkpoint Checkpoint containing snapshot entries expected.
+ * @param waitForDir Wait for dir to exist on fs.
* @return Set of expected snapshot dirs.
*/
- private Set<Path> waitForSnapshotDirs(DBCheckpoint checkpoint)
+ private Set<Path> getSnapshotDirs(DBCheckpoint checkpoint, boolean waitForDir)
throws IOException {
OzoneConfiguration conf = getConf();
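
The estimate above uses commons-io's counting walk. A standalone sketch of
the same pattern (assuming commons-io 2.9+ so the suffix filter works as a
PathFilter; the directory argument is hypothetical): one walkFileTree pass
counts only *.sst files, case-insensitively, while descending everywhere.

    import static org.apache.commons.io.filefilter.TrueFileFilter.TRUE;

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import org.apache.commons.io.IOCase;
    import org.apache.commons.io.file.Counters;
    import org.apache.commons.io.file.CountingPathVisitor;
    import org.apache.commons.io.filefilter.SuffixFileFilter;

    public final class SstEstimateSketch {
      public static void main(String[] args) throws Exception {
        Counters.PathCounters counters = Counters.longPathCounters();
        // Count bytes/files only for *.sst entries; TRUE admits every
        // directory so the whole tree is visited.
        CountingPathVisitor visitor = new CountingPathVisitor(
            counters, new SuffixFileFilter(".sst", IOCase.INSENSITIVE), TRUE);
        Files.walkFileTree(Paths.get(args[0]), visitor); // hypothetical dir
        System.out.printf("SST files: %d, total: %d KB%n",
            counters.getFileCounter().get(),
            counters.getByteCounter().get() / 1024);
      }
    }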
@@ -348,7 +388,9 @@ private Set<Path> waitForSnapshotDirs(DBCheckpoint checkpoint)
while (iterator.hasNext()) {
Table.KeyValue<String, SnapshotInfo> entry = iterator.next();
Path path = Paths.get(getSnapshotPath(conf, entry.getValue()));
- waitForDirToExist(path);
+ if (waitForDir) {
+ waitForDirToExist(path);
+ }
snapshotPaths.add(path);
}
} finally {
@@ -552,17 +594,21 @@ private void writeFilesToArchive(
e.getKey().getFileName().toString().toLowerCase().endsWith(".sst")).
collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ long bytesWritten = 0L;
+ int filesWritten = 0;
+ long lastLoggedTime = Time.monotonicNow();
+
// Go through each of the files to be copied and add to archive.
for (Map.Entry<Path, Path> entry : filteredCopyFiles.entrySet()) {
- Path file = entry.getValue();
+ Path path = entry.getValue();
// Confirm the data is in the right place.
- if (!file.toString().startsWith(metaDirPath.toString())) {
+ if (!path.toString().startsWith(metaDirPath.toString())) {
throw new IOException("tarball file not in metadata dir: "
- + file + ": " + metaDirPath);
+ + path + ": " + metaDirPath);
}
- String fixedFile = truncateFileName(truncateLength, file);
+ String fixedFile = truncateFileName(truncateLength, path);
if (fixedFile.startsWith(OM_CHECKPOINT_DIR)) {
// checkpoint files go to root of tarball
Path f = Paths.get(fixedFile).getFileName();
@@ -570,7 +616,17 @@ private void writeFilesToArchive(
fixedFile = f.toString();
}
}
- includeFile(entry.getKey().toFile(), fixedFile, archiveOutputStream);
+ File file = entry.getKey().toFile();
+ if (!file.isDirectory()) {
+ filesWritten++;
+ }
+ bytesWritten += includeFile(file, fixedFile, archiveOutputStream);
+ // Log progress every 30 seconds
+ if (Time.monotonicNow() - lastLoggedTime >= 30000) {
+ LOG.info("Transferred {} KB, #files {} to checkpoint tarball
stream...",
+ bytesWritten / (1024), filesWritten);
+ lastLoggedTime = Time.monotonicNow();
+ }
}
if (completed) {
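
The progress line above is throttled by wall-clock time. A minimal sketch of
the same pattern (the work loop and its 4 KB unit are invented; only
Time.monotonicNow and the 30-second interval come from this commit):

    import org.apache.hadoop.util.Time;

    public final class ProgressThrottleSketch {
      public static void main(String[] args) throws Exception {
        long bytesWritten = 0L;
        long lastLoggedTime = Time.monotonicNow();
        for (int i = 0; i < 1000; i++) {
          bytesWritten += copyOneFile();   // hypothetical unit of work
          // Emit at most one progress line every 30 seconds.
          if (Time.monotonicNow() - lastLoggedTime >= 30000) {
            System.out.printf("Transferred %d KB so far%n",
                bytesWritten / 1024);
            lastLoggedTime = Time.monotonicNow();
          }
        }
      }

      private static long copyOneFile() throws InterruptedException {
        Thread.sleep(50); // stand-in for copying one file
        return 4096;      // pretend 4 KB were written
      }
    }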
@@ -595,6 +651,10 @@ private void writeFilesToArchive(
// Mark tarball completed.
includeRatisSnapshotCompleteFlag(archiveOutputStream);
}
+ LOG.info("Completed transfer of {} KB, #files {} " +
+ "to checkpoint tarball stream.{}",
+ bytesWritten / (1024), filesWritten, (completed) ?
+ " Checkpoint tarball is complete." : "");
}
@Nonnull