This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 1b595f3cf72 HDDS-13770. SstBackup Directory can have orphan files after bootstrap not present in compaction log table (#9169)
1b595f3cf72 is described below
commit 1b595f3cf721ea53844bc6973e56eefbb235cc6f
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Thu Oct 23 07:37:45 2025 -0700
HDDS-13770. SstBackup Directory can have orphan files after bootstrap not present in compaction log table (#9169)
Reviewed-by: Swaminathan Balachandran <[email protected]>
Reviewed-by: Sadanand Shenoy <[email protected]>
---
.../om/OMDBCheckpointServletInodeBasedXfer.java | 141 +++++++++++----------
1 file changed, 76 insertions(+), 65 deletions(-)
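Background for the change below: before this patch the servlet copied the whole compactionLog and sstBackup directories into a temporary directory and streamed everything found there, so a bootstrapping follower could receive SST backup files that no entry in the compaction log table refers to. The patch instead walks the checkpoint's compaction log table and transfers only the SST files it references. The following is a minimal, self-contained sketch of that filtering idea; the LogEntry record, method names and example path are illustrative stand-ins, not the Ozone classes used in the diff (OmMetadataManagerImpl, CompactionLogEntry, etc.):

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public final class CompactionLogSstFilterSketch {

  /** Illustrative stand-in for one compaction log entry's input file names (without ".sst"). */
  record LogEntry(List<String> inputFileNames) { }

  /**
   * Resolve each SST file referenced by the compaction log against the sstBackup
   * directory, mirroring what extractSSTFilesFromCompactionLog does in the patch.
   */
  static List<Path> referencedSstFiles(List<LogEntry> compactionLog, Path sstBackupDir) {
    List<Path> sstFiles = new ArrayList<>();
    for (LogEntry entry : compactionLog) {
      for (String name : entry.inputFileNames()) {
        sstFiles.add(sstBackupDir.resolve(name + ".sst"));
      }
    }
    return sstFiles;
  }

  public static void main(String[] args) {
    // Assumed example location; in the servlet the real directory comes from getSstBackupDir().
    Path sstBackupDir = Paths.get("/tmp/om/db.snapshots/sstBackup");
    List<LogEntry> log = List.of(new LogEntry(List.of("000123", "000124")));
    referencedSstFiles(log, sstBackupDir).forEach(System.out::println);
  }
}

Only the paths produced this way are handed to writeDBToArchive for the sstBackup directory, so files that sit under sstBackup but are absent from the compaction log table are never shipped to the follower.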
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
index f967e30ec52..27e7f1c2d6d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
@@ -44,11 +44,13 @@
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -64,6 +66,7 @@
import org.apache.hadoop.hdds.recon.ReconConfig;
import org.apache.hadoop.hdds.utils.DBCheckpointServlet;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
@@ -71,6 +74,7 @@
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
+import org.apache.ozone.compaction.log.CompactionLogEntry;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -249,7 +253,9 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
if (shouldContinue) {
// we finished transferring files from snapshot DB's by now and
// this is the last step where we transfer the active om.db contents
- checkpoint = createAndPrepareCheckpoint(tmpdir, true);
+ // get the list of sst files of the checkpoint.
+ checkpoint = createAndPrepareCheckpoint(true);
+ List<Path> sstBackupFiles = extractSSTFilesFromCompactionLog(checkpoint);
// unlimited files as we want the Active DB contents to be transferred in a single batch
maxTotalSstSize.set(Long.MAX_VALUE);
Path checkpointDir = checkpoint.getCheckpointLocation();
@@ -257,12 +263,10 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
writeDBToArchive(sstFilesToExclude, checkpointDir,
maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false);
if (includeSnapshotData) {
- Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName());
- Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName());
- writeDBToArchive(sstFilesToExclude, tmpCompactionLogDir, maxTotalSstSize, archiveOutputStream, tmpdir,
- hardLinkFileMap, getCompactionLogDir(), false);
- writeDBToArchive(sstFilesToExclude, tmpSstBackupDir, maxTotalSstSize, archiveOutputStream, tmpdir,
- hardLinkFileMap, getSstBackupDir(), false);
+ writeDBToArchive(sstFilesToExclude, getCompactionLogDir(), maxTotalSstSize, archiveOutputStream, tmpdir,
+ hardLinkFileMap, false);
+ writeDBToArchive(sstFilesToExclude, sstBackupFiles.stream(),
+ maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false);
// This is done to ensure all data to be copied correctly is flushed in the snapshot DB
transferSnapshotData(sstFilesToExclude, tmpdir, snapshotPaths, maxTotalSstSize,
archiveOutputStream, hardLinkFileMap);
@@ -317,14 +321,6 @@ private void transferSnapshotData(Set<String> sstFilesToExclude, Path tmpdir, Se
}
}
- @VisibleForTesting
- boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dir,
- AtomicLong maxTotalSstSize, ArchiveOutputStream<TarArchiveEntry> archiveOutputStream,
- Path tmpdir, Map<String, String> hardLinkFileMap, boolean onlySstFile) throws IOException {
- return writeDBToArchive(sstFilesToExclude, dir, maxTotalSstSize,
- archiveOutputStream, tmpdir, hardLinkFileMap, null, onlySstFile);
- }
-
private static void cleanupCheckpoint(DBCheckpoint checkpoint) {
if (checkpoint != null) {
try {
@@ -402,18 +398,30 @@ Set<Path> getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOExceptio
return snapshotPaths;
}
+ @VisibleForTesting
+ boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, AtomicLong maxTotalSstSize,
+ ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir,
+ Map<String, String> hardLinkFileMap, boolean onlySstFile) throws IOException {
+ if (!Files.exists(dbDir)) {
+ LOG.warn("DB directory {} does not exist. Skipping.", dbDir);
+ return true;
+ }
+ Stream<Path> files = Files.list(dbDir);
+ return writeDBToArchive(sstFilesToExclude, files,
+ maxTotalSstSize, archiveOutputStream, tmpDir, hardLinkFileMap, onlySstFile);
+ }
+
/**
* Writes database files to the archive, handling deduplication based on inode IDs.
* Here the dbDir could either be a snapshot db directory, the active om.db,
* compaction log dir, sst backup dir.
*
* @param sstFilesToExclude Set of SST file IDs to exclude from the archive
- * @param dbDir Directory containing database files to archive
+ * @param files Stream of files to archive
* @param maxTotalSstSize Maximum total size of SST files to include
* @param archiveOutputStream Archive output stream
* @param tmpDir Temporary directory for processing
* @param hardLinkFileMap Map of hardlink file paths to their unique identifiers for deduplication
- * @param destDir Destination directory for the archived files. If null,
* the archived files are not moved to this directory.
* @param onlySstFile If true, only SST files are processed. If false, all files are processed.
* <p>
@@ -424,49 +432,40 @@ Set<Path> getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOExceptio
* @throws IOException if an I/O error occurs
*/
@SuppressWarnings("checkstyle:ParameterNumber")
- private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, AtomicLong maxTotalSstSize,
+ private boolean writeDBToArchive(Set<String> sstFilesToExclude, Stream<Path> files, AtomicLong maxTotalSstSize,
ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir,
- Map<String, String> hardLinkFileMap, Path destDir, boolean onlySstFile) throws IOException {
- if (!Files.exists(dbDir)) {
- LOG.warn("DB directory {} does not exist. Skipping.", dbDir);
- return true;
- }
+ Map<String, String> hardLinkFileMap, boolean onlySstFile) throws IOException {
long bytesWritten = 0L;
int filesWritten = 0;
long lastLoggedTime = Time.monotonicNow();
- try (Stream<Path> files = Files.list(dbDir)) {
- Iterable<Path> iterable = files::iterator;
- for (Path dbFile : iterable) {
- if (!Files.isDirectory(dbFile)) {
- if (onlySstFile && !dbFile.toString().endsWith(ROCKSDB_SST_SUFFIX)) {
- continue;
+ Iterable<Path> iterable = files::iterator;
+ for (Path dbFile : iterable) {
+ if (!Files.isDirectory(dbFile)) {
+ if (onlySstFile && !dbFile.toString().endsWith(ROCKSDB_SST_SUFFIX)) {
+ continue;
+ }
+ String fileId = OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(dbFile);
+ if (hardLinkFileMap != null) {
+ String path = dbFile.toFile().getAbsolutePath();
+ // if the file is in the om checkpoint dir, then we need to change the path to point to the OM DB.
+ if (path.contains(OM_CHECKPOINT_DIR)) {
+ path = getDbStore().getDbLocation().toPath().resolve(dbFile.getFileName()).toAbsolutePath().toString();
}
- String fileId = OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(dbFile);
- if (hardLinkFileMap != null) {
- String path = dbFile.toFile().getAbsolutePath();
- if (destDir != null) {
- path = destDir.resolve(dbFile.getFileName()).toString();
- }
- // if the file is in the om checkpoint dir, then we need to change the path to point to the OM DB.
- if (path.contains(OM_CHECKPOINT_DIR)) {
- path = getDbStore().getDbLocation().toPath().resolve(dbFile.getFileName()).toAbsolutePath().toString();
- }
- hardLinkFileMap.put(path, fileId);
+ hardLinkFileMap.put(path, fileId);
+ }
+ if (!sstFilesToExclude.contains(fileId)) {
+ long fileSize = Files.size(dbFile);
+ if (maxTotalSstSize.get() - fileSize <= 0) {
+ return false;
}
- if (!sstFilesToExclude.contains(fileId)) {
- long fileSize = Files.size(dbFile);
- if (maxTotalSstSize.get() - fileSize <= 0) {
- return false;
- }
- bytesWritten += linkAndIncludeFile(dbFile.toFile(), fileId, archiveOutputStream, tmpDir);
- filesWritten++;
- maxTotalSstSize.addAndGet(-fileSize);
- sstFilesToExclude.add(fileId);
- if (Time.monotonicNow() - lastLoggedTime >= 30000) {
- LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...",
- bytesWritten / (1024), filesWritten);
- lastLoggedTime = Time.monotonicNow();
- }
+ bytesWritten += linkAndIncludeFile(dbFile.toFile(), fileId, archiveOutputStream, tmpDir);
+ filesWritten++;
+ maxTotalSstSize.addAndGet(-fileSize);
+ sstFilesToExclude.add(fileId);
+ if (Time.monotonicNow() - lastLoggedTime >= 30000) {
+ LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...",
+ bytesWritten / (1024), filesWritten);
+ lastLoggedTime = Time.monotonicNow();
}
}
}
@@ -480,21 +479,33 @@ private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, Atom
* The copy to the temporary directory for compaction log and SST backup files
* is done to maintain a consistent view of the files in these directories.
*
- * @param tmpdir Temporary directory for storing checkpoint-related files.
* @param flush If true, flushes in-memory data to disk before checkpointing.
- * @return The created database checkpoint.
* @throws IOException If an error occurs during checkpoint creation or file copying.
*/
- private DBCheckpoint createAndPrepareCheckpoint(Path tmpdir, boolean flush) throws IOException {
- // make tmp directories to contain the copies
- Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName());
- Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName());
+ private DBCheckpoint createAndPrepareCheckpoint(boolean flush) throws IOException {
+ // Create & return the checkpoint.
+ return getDbStore().getCheckpoint(flush);
+ }
+
+ private List<Path> extractSSTFilesFromCompactionLog(DBCheckpoint dbCheckpoint) throws IOException {
+ List<Path> sstFiles = new ArrayList<>();
+ try (OmMetadataManagerImpl checkpointMetadataManager =
+ OmMetadataManagerImpl.createCheckpointMetadataManager(getConf(), dbCheckpoint)) {
+ try (Table.KeyValueIterator<String, CompactionLogEntry>
+ iterator = checkpointMetadataManager.getCompactionLogTable().iterator()) {
+ iterator.seekToFirst();
- // Create checkpoint and then copy the files so that it has all the compaction entries and files.
- DBCheckpoint dbCheckpoint = getDbStore().getCheckpoint(flush);
- FileUtils.copyDirectory(getCompactionLogDir().toFile(), tmpCompactionLogDir.toFile());
- OmSnapshotUtils.linkFiles(getSstBackupDir().toFile(), tmpSstBackupDir.toFile());
+ Path sstBackupDir = getSstBackupDir();
- return dbCheckpoint;
+ while (iterator.hasNext()) {
+ CompactionLogEntry logEntry = iterator.next().getValue();
+ logEntry.getInputFileInfoList().forEach(f ->
+ sstFiles.add(sstBackupDir.resolve(f.getFileName() + ROCKSDB_SST_SUFFIX)));
+ }
+ }
+ } catch (Exception e) {
+ throw new IOException("Error reading compaction log from checkpoint", e);
+ }
+ return sstFiles;
}
}
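For readers skimming the refactor of writeDBToArchive: the patch splits it into a Path-based overload that lists a directory and a Stream<Path>-based core that does the actual archiving, which is what lets the sstBackup transfer pass an explicit, compaction-log-derived file list instead of a whole directory. A simplified sketch of that shape follows; the class and method names here are made up, and the tar logic is reduced to a placeholder rather than the servlet's linkAndIncludeFile path:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

final class ArchiveWriterSketch {

  /** Directory flavour: tolerate a missing directory, otherwise archive the files it contains. */
  boolean writeToArchive(Path dbDir) throws IOException {
    if (!Files.exists(dbDir)) {
      return true;  // nothing to do; the patch logs a warning and treats this as success
    }
    try (Stream<Path> files = Files.list(dbDir)) {
      return writeToArchive(files);
    }
  }

  /** Core flavour: archive exactly the files the caller supplies (e.g. a pre-filtered SST list). */
  boolean writeToArchive(Stream<Path> files) {
    files.filter(Files::isRegularFile)
        .forEach(f -> System.out.println("would add to tarball: " + f));  // placeholder for tar logic
    return true;
  }
}

Callers that already know exactly which files they want, such as the sstBackupFiles list built from the compaction log table above, can call the Stream-based overload directly.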
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]