This is an automated email from the ASF dual-hosted git repository.

sshenoy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2870126afbf HDDS-13500. Transfer Non SST Files in the last batch of the tarball transfer. (#8857)
2870126afbf is described below

commit 2870126afbf96514a210b59204977a0e0d064e9d
Author: Sadanand Shenoy <[email protected]>
AuthorDate: Wed Jul 30 13:56:28 2025 +0530

    HDDS-13500. Transfer Non SST Files in the last batch of the tarball transfer. (#8857)
---
 .../TestOMDbCheckpointServletInodeBasedXfer.java   | 73 +++++++++++++++++++++-
 .../hadoop/ozone/om/TestOMRatisSnapshots.java      | 13 ++--
 .../om/OMDBCheckpointServletInodeBasedXfer.java    | 38 +++++++----
 3 files changed, 107 insertions(+), 17 deletions(-)
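
For readers skimming the diff below: the patch threads a new onlySstFile
flag through writeDBToArchive so that the size-capped early batches carry
only RocksDB .sst files, while the final batch (sent after the cap is
lifted to Long.MAX_VALUE) picks up all remaining files. A minimal
standalone sketch of that policy follows; writeDirToArchive, addToArchive
and SST_SUFFIX are hypothetical stand-ins, not the actual Ozone APIs.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicLong;

final class SstFirstBatchingSketch {

  static final String SST_SUFFIX = ".sst"; // mirrors OzoneConsts.ROCKSDB_SST_SUFFIX

  // Archives the regular files under dir, skipping non-SST files while
  // onlySstFile is true. Returns false once the size budget is spent,
  // signalling the caller to close this tarball and resume in the next
  // batch.
  static boolean writeDirToArchive(Path dir, AtomicLong budget,
      boolean onlySstFile) throws IOException {
    try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
      for (Path f : files) {
        if (Files.isDirectory(f)) {
          continue;
        }
        if (onlySstFile && !f.toString().endsWith(SST_SUFFIX)) {
          continue; // non-SST files wait for the final batch
        }
        if (budget.addAndGet(-Files.size(f)) < 0) {
          return false; // budget exhausted
        }
        addToArchive(f);
      }
    }
    return true;
  }

  private static void addToArchive(Path f) {
    // placeholder for the hard-link + tar-entry logic that
    // Archiver.linkAndIncludeFile performs in the real servlet
  }
}

In the servlet itself the budget is maxTotalSstSize and the filter is the
endsWith(ROCKSDB_SST_SUFFIX) check visible in the last hunk.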

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
index 454f7a91811..724a26c4cb3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
@@ -33,13 +33,17 @@
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.BufferedReader;
@@ -59,6 +63,7 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -67,6 +72,8 @@
 import javax.servlet.WriteListener;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -74,6 +81,7 @@
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.Archiver;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.hdds.utils.db.DBStore;
@@ -94,6 +102,9 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.MockedStatic;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.DBOptions;
@@ -169,7 +180,10 @@ public void write(int b) throws IOException {
 
     omDbCheckpointServletMock = mock(OMDBCheckpointServletInodeBasedXfer.class);
 
-    BootstrapStateHandler.Lock lock = new OMDBCheckpointServlet.Lock(om);
+    BootstrapStateHandler.Lock lock = null;
+    if (om != null) {
+      lock = new OMDBCheckpointServlet.Lock(om);
+    }
     doCallRealMethod().when(omDbCheckpointServletMock).init();
     assertNull(doCallRealMethod().when(omDbCheckpointServletMock).getDbStore());
 
@@ -195,6 +209,8 @@ public void write(int b) throws IOException {
 
     doCallRealMethod().when(omDbCheckpointServletMock)
         .writeDbDataToStream(any(), any(), any(), any(), any());
+    doCallRealMethod().when(omDbCheckpointServletMock)
+        .writeDBToArchive(any(), any(), any(), any(), any(), any(), anyBoolean());
 
     when(omDbCheckpointServletMock.getBootstrapStateLock())
         .thenReturn(lock);
@@ -308,6 +324,61 @@ public void testSnapshotDBConsistency() throws Exception {
     assertNotNull(value);
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testWriteDBToArchive(boolean expectOnlySstFiles) throws Exception {
+    setupMocks();
+    Path dbDir = folder.resolve("db_data");
+    Files.createDirectories(dbDir);
+    // Create dummy files: one SST, one non-SST
+    Path sstFile = dbDir.resolve("test.sst");
+    Files.write(sstFile, "sst content".getBytes(StandardCharsets.UTF_8)); // Write some content to make it non-empty
+
+    Path nonSstFile = dbDir.resolve("test.log");
+    Files.write(nonSstFile, "log content".getBytes(StandardCharsets.UTF_8));
+    Set<String> sstFilesToExclude = new HashSet<>();
+    AtomicLong maxTotalSstSize = new AtomicLong(1000000); // Sufficient size
+    Map<String, String> hardLinkFileMap = new java.util.HashMap<>();
+    Path tmpDir = folder.resolve("tmp");
+    Files.createDirectories(tmpDir);
+    TarArchiveOutputStream mockArchiveOutputStream = mock(TarArchiveOutputStream.class);
+    List<String> fileNames = new ArrayList<>();
+    try (MockedStatic<Archiver> archiverMock = mockStatic(Archiver.class)) {
+      archiverMock.when(() -> Archiver.linkAndIncludeFile(any(), any(), any(), any())).thenAnswer(invocation -> {
+        // Get the actual mockArchiveOutputStream passed from writeDBToArchive
+        TarArchiveOutputStream aos = invocation.getArgument(2);
+        File sourceFile = invocation.getArgument(0);
+        String fileId = invocation.getArgument(1);
+        fileNames.add(sourceFile.getName());
+        aos.putArchiveEntry(new TarArchiveEntry(sourceFile, fileId));
+        aos.write(new byte[100], 0, 100); // Simulate writing
+        aos.closeArchiveEntry();
+        return 100L;
+      });
+      boolean success = omDbCheckpointServletMock.writeDBToArchive(
+          sstFilesToExclude, dbDir, maxTotalSstSize, mockArchiveOutputStream,
+              tmpDir, hardLinkFileMap, expectOnlySstFiles);
+      assertTrue(success);
+      verify(mockArchiveOutputStream, times(fileNames.size())).putArchiveEntry(any());
+      verify(mockArchiveOutputStream, times(fileNames.size())).closeArchiveEntry();
+      verify(mockArchiveOutputStream, times(fileNames.size())).write(any(byte[].class), anyInt(),
+          anyInt()); // verify write was called once per file
+
+      boolean containsNonSstFile = false;
+      for (String fileName : fileNames) {
+        if (expectOnlySstFiles) {
+          assertTrue(fileName.endsWith(".sst"), "Expected only SST files but found: " + fileName);
+        } else if (!fileName.endsWith(".sst")) {
+          containsNonSstFile = true;
+        }
+      }
+
+      if (!expectOnlySstFiles) {
+        assertTrue(containsNonSstFile, "Expected at least one non-SST file in the archive");
+      }
+    }
+  }
+
   private static void deleteWalFiles(Path snapshotDbDir) throws IOException {
     try (Stream<Path> filesInTarball = Files.list(snapshotDbDir)) {
      List<Path> files = filesInTarball.filter(p -> p.toString().contains(".log"))
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index 814294dd36c..f6a8438db4f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -1158,7 +1158,7 @@ public void pause() throws IOException {
       // max size config.  That way next time through, we get multiple
       // tarballs.
       if (count == 1) {
-        long sstSize = getSizeOfFiles(tarball);
+        long sstSize = getSizeOfSstFiles(tarball);
         om.getConfiguration().setLong(
             OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY, sstSize / 2);
         // Now empty the tarball to restart the download
@@ -1172,13 +1172,16 @@ public void pause() throws IOException {
     }
 
     // Get Size of sstfiles in tarball.
-    private long getSizeOfFiles(File tarball) throws IOException {
+    private long getSizeOfSstFiles(File tarball) throws IOException {
       FileUtil.unTar(tarball, tempDir.toFile());
-      List<Path> sstPaths = Files.walk(tempDir).
-          collect(Collectors.toList());
+      OmSnapshotUtils.createHardLinks(tempDir, true);
+      List<Path> sstPaths = Files.list(tempDir).collect(Collectors.toList());
       long totalFileSize = 0;
       for (Path sstPath : sstPaths) {
-        totalFileSize += Files.size(sstPath);
+        File file = sstPath.toFile();
+        if (file.isFile() && file.getName().endsWith(".sst")) {
+          totalFileSize += Files.size(sstPath);
+        }
       }
       return totalFileSize;
     }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
index c0e9d74ec54..97e345e7677 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
@@ -24,6 +24,7 @@
 import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST;
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK;
@@ -32,6 +33,7 @@
 import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_PREFIX;
 import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_SUFFIX;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -223,18 +225,18 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
             break;
           }
           shouldContinue = writeDBToArchive(sstFilesToExclude, snapshotDbPath,
-              maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
+              maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, true);
         }
 
 
         if (shouldContinue) {
           shouldContinue = writeDBToArchive(sstFilesToExclude, getSstBackupDir(),
-              maxTotalSstSize, archiveOutputStream,  tmpdir, hardLinkFileMap);
+              maxTotalSstSize, archiveOutputStream,  tmpdir, hardLinkFileMap, true);
         }
 
         if (shouldContinue) {
           shouldContinue = writeDBToArchive(sstFilesToExclude, getCompactionLogDir(),
-              maxTotalSstSize, archiveOutputStream,  tmpdir, hardLinkFileMap);
+              maxTotalSstSize, archiveOutputStream,  tmpdir, hardLinkFileMap, true);
         }
       }
 
@@ -246,14 +248,14 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
         maxTotalSstSize.set(Long.MAX_VALUE);
         Path checkpointDir = checkpoint.getCheckpointLocation();
         writeDBToArchive(sstFilesToExclude, checkpointDir,
-            maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
+            maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false);
         if (includeSnapshotData) {
           Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName());
           Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName());
           writeDBToArchive(sstFilesToExclude, tmpCompactionLogDir, maxTotalSstSize, archiveOutputStream, tmpdir,
-              hardLinkFileMap, getCompactionLogDir());
+              hardLinkFileMap, getCompactionLogDir(), false);
           writeDBToArchive(sstFilesToExclude, tmpSstBackupDir, maxTotalSstSize, archiveOutputStream, tmpdir,
-              hardLinkFileMap, getSstBackupDir());
+              hardLinkFileMap, getSstBackupDir(), false);
           // This is done to ensure all data to be copied correctly is flushed in the snapshot DB
           transferSnapshotData(sstFilesToExclude, tmpdir, snapshotPaths, maxTotalSstSize,
               archiveOutputStream, hardLinkFileMap);
@@ -293,18 +295,20 @@ private void transferSnapshotData(Set<String> sstFilesToExclude, Path tmpdir, Se
       try {
         // invalidate closes the snapshot DB
         om.getOmSnapshotManager().invalidateCacheEntry(UUID.fromString(snapshotId));
-        writeDBToArchive(sstFilesToExclude, snapshotDir, maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
+        writeDBToArchive(sstFilesToExclude, snapshotDir, maxTotalSstSize, archiveOutputStream, tmpdir,
+            hardLinkFileMap, false);
       } finally {
         omMetadataManager.getLock().releaseReadLock(SNAPSHOT_DB_LOCK, snapshotId);
       }
     }
   }
 
-  private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dir,
+  @VisibleForTesting
+  boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dir,
       AtomicLong maxTotalSstSize, ArchiveOutputStream<TarArchiveEntry> archiveOutputStream,
-      Path tmpdir, Map<String, String> hardLinkFileMap) throws IOException {
+      Path tmpdir, Map<String, String> hardLinkFileMap, boolean onlySstFile) throws IOException {
     return writeDBToArchive(sstFilesToExclude, dir, maxTotalSstSize,
-        archiveOutputStream, tmpdir, hardLinkFileMap, null);
+        archiveOutputStream, tmpdir, hardLinkFileMap, null, onlySstFile);
   }
 
   private static void cleanupCheckpoint(DBCheckpoint checkpoint) {
@@ -394,12 +398,21 @@ Set<Path> getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOExceptio
    * @param maxTotalSstSize Maximum total size of SST files to include
    * @param archiveOutputStream Archive output stream
    * @param tmpDir Temporary directory for processing
+   * @param hardLinkFileMap Map of hardlink file paths to their unique identifiers for deduplication
+   * @param destDir Destination directory the archived files should be recorded
+   * under; if null, files are recorded at their original location.
+   * @param onlySstFile If true, only SST files are processed. If false, all files are processed.
+   * <p>
+   * This parameter is typically set to {@code true} for initial iterations to
+   * prioritize SST file transfer, and then set to {@code false} only for the
+   * final iteration to ensure all remaining file types are transferred.
    * @return true if processing should continue, false if size limit reached
    * @throws IOException if an I/O error occurs
    */
+  @SuppressWarnings("checkstyle:ParameterNumber")
   private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, AtomicLong maxTotalSstSize,
       ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir,
-      Map<String, String> hardLinkFileMap, Path destDir) throws IOException {
+      Map<String, String> hardLinkFileMap, Path destDir, boolean onlySstFile) throws IOException {
     if (!Files.exists(dbDir)) {
       LOG.warn("DB directory {} does not exist. Skipping.", dbDir);
       return true;
@@ -411,6 +424,9 @@ private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, Atom
       Iterable<Path> iterable = files::iterator;
       for (Path dbFile : iterable) {
         if (!Files.isDirectory(dbFile)) {
+          if (onlySstFile && !dbFile.toString().endsWith(ROCKSDB_SST_SUFFIX)) {
+            continue;
+          }
           String fileId = OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(dbFile);
           String path = dbFile.toFile().getAbsolutePath();
           if (destDir != null) {
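
The new testWriteDBToArchive above stubs the static
Archiver.linkAndIncludeFile with Mockito's MockedStatic. For reference,
a self-contained sketch of that pattern; Hasher is a made-up stand-in
class, and the inline mock maker (mockito-inline, or Mockito 5+) is
assumed on the test classpath.

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mockStatic;

import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;

class MockedStaticSketchTest {

  static class Hasher {
    static long checksum(byte[] data) {
      return data.length; // the real logic is irrelevant to the sketch
    }
  }

  @Test
  void staticStubIsScopedToTheTryBlock() {
    try (MockedStatic<Hasher> mocked = mockStatic(Hasher.class)) {
      // thenAnswer exposes the invocation arguments, which is how the
      // test above records each file passed to linkAndIncludeFile.
      mocked.when(() -> Hasher.checksum(any())).thenAnswer(inv -> 42L);
      assertEquals(42L, Hasher.checksum(new byte[0]));
    }
    // Outside the try-with-resources scope the real method runs again.
    assertEquals(3L, Hasher.checksum(new byte[3]));
  }
}

The stubbing applies only inside the try-with-resources scope, which is
why the test wraps both the writeDBToArchive call and its verifications
in a single try block.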


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
