This is an automated email from the ASF dual-hosted git repository.
sshenoy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 2870126afbf HDDS-13500. Transfer Non SST Files in the last batch of
the tarball transfer. (#8857)
2870126afbf is described below
commit 2870126afbf96514a210b59204977a0e0d064e9d
Author: Sadanand Shenoy <[email protected]>
AuthorDate: Wed Jul 30 13:56:28 2025 +0530
HDDS-13500. Transfer Non SST Files in the last batch of the tarball
transfer. (#8857)
---
.../TestOMDbCheckpointServletInodeBasedXfer.java | 73 +++++++++++++++++++++-
.../hadoop/ozone/om/TestOMRatisSnapshots.java | 13 ++--
.../om/OMDBCheckpointServletInodeBasedXfer.java | 38 +++++++----
3 files changed, 107 insertions(+), 17 deletions(-)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
index 454f7a91811..724a26c4cb3 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
@@ -33,13 +33,17 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.BufferedReader;
@@ -59,6 +63,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -67,6 +72,8 @@
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -74,6 +81,7 @@
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.Archiver;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.hdds.utils.db.DBStore;
@@ -94,6 +102,9 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.MockedStatic;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
@@ -169,7 +180,10 @@ public void write(int b) throws IOException {
omDbCheckpointServletMock =
mock(OMDBCheckpointServletInodeBasedXfer.class);
- BootstrapStateHandler.Lock lock = new OMDBCheckpointServlet.Lock(om);
+ BootstrapStateHandler.Lock lock = null;
+ if (om != null) {
+ lock = new OMDBCheckpointServlet.Lock(om);
+ }
doCallRealMethod().when(omDbCheckpointServletMock).init();
assertNull(doCallRealMethod().when(omDbCheckpointServletMock).getDbStore());
@@ -195,6 +209,8 @@ public void write(int b) throws IOException {
doCallRealMethod().when(omDbCheckpointServletMock)
.writeDbDataToStream(any(), any(), any(), any(), any());
+ doCallRealMethod().when(omDbCheckpointServletMock)
+ .writeDBToArchive(any(), any(), any(), any(), any(), any(),
anyBoolean());
when(omDbCheckpointServletMock.getBootstrapStateLock())
.thenReturn(lock);
@@ -308,6 +324,61 @@ public void testSnapshotDBConsistency() throws Exception {
assertNotNull(value);
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testWriteDBToArchive(boolean expectOnlySstFiles) throws
Exception {
+ setupMocks();
+ Path dbDir = folder.resolve("db_data");
+ Files.createDirectories(dbDir);
+ // Create dummy files: one SST, one non-SST
+ Path sstFile = dbDir.resolve("test.sst");
+ Files.write(sstFile, "sst content".getBytes(StandardCharsets.UTF_8)); //
Write some content to make it non-empty
+
+ Path nonSstFile = dbDir.resolve("test.log");
+ Files.write(nonSstFile, "log content".getBytes(StandardCharsets.UTF_8));
+ Set<String> sstFilesToExclude = new HashSet<>();
+ AtomicLong maxTotalSstSize = new AtomicLong(1000000); // Sufficient size
+ Map<String, String> hardLinkFileMap = new java.util.HashMap<>();
+ Path tmpDir = folder.resolve("tmp");
+ Files.createDirectories(tmpDir);
+ TarArchiveOutputStream mockArchiveOutputStream =
mock(TarArchiveOutputStream.class);
+ List<String> fileNames = new ArrayList<>();
+ try (MockedStatic<Archiver> archiverMock = mockStatic(Archiver.class)) {
+ archiverMock.when(() -> Archiver.linkAndIncludeFile(any(), any(), any(),
any())).thenAnswer(invocation -> {
+ // Get the actual mockArchiveOutputStream passed from writeDBToArchive
+ TarArchiveOutputStream aos = invocation.getArgument(2);
+ File sourceFile = invocation.getArgument(0);
+ String fileId = invocation.getArgument(1);
+ fileNames.add(sourceFile.getName());
+ aos.putArchiveEntry(new TarArchiveEntry(sourceFile, fileId));
+ aos.write(new byte[100], 0, 100); // Simulate writing
+ aos.closeArchiveEntry();
+ return 100L;
+ });
+ boolean success = omDbCheckpointServletMock.writeDBToArchive(
+ sstFilesToExclude, dbDir, maxTotalSstSize, mockArchiveOutputStream,
+ tmpDir, hardLinkFileMap, expectOnlySstFiles);
+ assertTrue(success);
+ verify(mockArchiveOutputStream,
times(fileNames.size())).putArchiveEntry(any());
+ verify(mockArchiveOutputStream,
times(fileNames.size())).closeArchiveEntry();
+ verify(mockArchiveOutputStream,
times(fileNames.size())).write(any(byte[].class), anyInt(),
+ anyInt()); // verify write was called once per archived file
+
+ boolean containsNonSstFile = false;
+ for (String fileName : fileNames) {
+ if (expectOnlySstFiles) {
+ assertTrue(fileName.endsWith(".sst"), "File is not an SST File");
+ } else {
+ containsNonSstFile = true;
+ }
+ }
+
+ if (!expectOnlySstFiles) {
+ assertTrue(containsNonSstFile, "SST File is not expected");
+ }
+ }
+ }
+
private static void deleteWalFiles(Path snapshotDbDir) throws IOException {
try (Stream<Path> filesInTarball = Files.list(snapshotDbDir)) {
List<Path> files = filesInTarball.filter(p ->
p.toString().contains(".log"))
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index 814294dd36c..f6a8438db4f 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -1158,7 +1158,7 @@ public void pause() throws IOException {
// max size config. That way next time through, we get multiple
// tarballs.
if (count == 1) {
- long sstSize = getSizeOfFiles(tarball);
+ long sstSize = getSizeOfSstFiles(tarball);
om.getConfiguration().setLong(
OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY, sstSize / 2);
// Now empty the tarball to restart the download
@@ -1172,13 +1172,16 @@ public void pause() throws IOException {
}
// Get Size of sstfiles in tarball.
- private long getSizeOfFiles(File tarball) throws IOException {
+ private long getSizeOfSstFiles(File tarball) throws IOException {
FileUtil.unTar(tarball, tempDir.toFile());
- List<Path> sstPaths = Files.walk(tempDir).
- collect(Collectors.toList());
+ OmSnapshotUtils.createHardLinks(tempDir, true);
+ List<Path> sstPaths = Files.list(tempDir).collect(Collectors.toList());
long totalFileSize = 0;
for (Path sstPath : sstPaths) {
- totalFileSize += Files.size(sstPath);
+ File file = sstPath.toFile();
+ if (file.isFile() && file.getName().endsWith(".sst")) {
+ totalFileSize += Files.size(sstPath);
+ }
}
return totalFileSize;
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
index c0e9d74ec54..97e345e7677 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
@@ -24,6 +24,7 @@
import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
import static
org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST;
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY;
import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK;
@@ -32,6 +33,7 @@
import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_PREFIX;
import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_SUFFIX;
+import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
@@ -223,18 +225,18 @@ public void writeDbDataToStream(HttpServletRequest
request, OutputStream destina
break;
}
shouldContinue = writeDBToArchive(sstFilesToExclude, snapshotDbPath,
- maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
+ maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap,
true);
}
if (shouldContinue) {
shouldContinue = writeDBToArchive(sstFilesToExclude,
getSstBackupDir(),
- maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
+ maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap,
true);
}
if (shouldContinue) {
shouldContinue = writeDBToArchive(sstFilesToExclude,
getCompactionLogDir(),
- maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
+ maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap,
true);
}
}
@@ -246,14 +248,14 @@ public void writeDbDataToStream(HttpServletRequest
request, OutputStream destina
maxTotalSstSize.set(Long.MAX_VALUE);
Path checkpointDir = checkpoint.getCheckpointLocation();
writeDBToArchive(sstFilesToExclude, checkpointDir,
- maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
+ maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap,
false);
if (includeSnapshotData) {
Path tmpCompactionLogDir =
tmpdir.resolve(getCompactionLogDir().getFileName());
Path tmpSstBackupDir =
tmpdir.resolve(getSstBackupDir().getFileName());
writeDBToArchive(sstFilesToExclude, tmpCompactionLogDir,
maxTotalSstSize, archiveOutputStream, tmpdir,
- hardLinkFileMap, getCompactionLogDir());
+ hardLinkFileMap, getCompactionLogDir(), false);
writeDBToArchive(sstFilesToExclude, tmpSstBackupDir,
maxTotalSstSize, archiveOutputStream, tmpdir,
- hardLinkFileMap, getSstBackupDir());
+ hardLinkFileMap, getSstBackupDir(), false);
// This is done to ensure all data to be copied correctly is flushed
in the snapshot DB
transferSnapshotData(sstFilesToExclude, tmpdir, snapshotPaths,
maxTotalSstSize,
archiveOutputStream, hardLinkFileMap);
@@ -293,18 +295,20 @@ private void transferSnapshotData(Set<String>
sstFilesToExclude, Path tmpdir, Se
try {
// invalidate closes the snapshot DB
om.getOmSnapshotManager().invalidateCacheEntry(UUID.fromString(snapshotId));
- writeDBToArchive(sstFilesToExclude, snapshotDir, maxTotalSstSize,
archiveOutputStream, tmpdir, hardLinkFileMap);
+ writeDBToArchive(sstFilesToExclude, snapshotDir, maxTotalSstSize,
archiveOutputStream, tmpdir,
+ hardLinkFileMap, false);
} finally {
omMetadataManager.getLock().releaseReadLock(SNAPSHOT_DB_LOCK,
snapshotId);
}
}
}
- private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dir,
+ @VisibleForTesting
+ boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dir,
AtomicLong maxTotalSstSize, ArchiveOutputStream<TarArchiveEntry>
archiveOutputStream,
- Path tmpdir, Map<String, String> hardLinkFileMap) throws IOException {
+ Path tmpdir, Map<String, String> hardLinkFileMap, boolean onlySstFile)
throws IOException {
return writeDBToArchive(sstFilesToExclude, dir, maxTotalSstSize,
- archiveOutputStream, tmpdir, hardLinkFileMap, null);
+ archiveOutputStream, tmpdir, hardLinkFileMap, null, onlySstFile);
}
private static void cleanupCheckpoint(DBCheckpoint checkpoint) {
@@ -394,12 +398,21 @@ Set<Path> getSnapshotDirs(OMMetadataManager
omMetadataManager) throws IOExceptio
* @param maxTotalSstSize Maximum total size of SST files to include
* @param archiveOutputStream Archive output stream
* @param tmpDir Temporary directory for processing
+ * @param hardLinkFileMap Map of hardlink file paths to their unique
identifiers for deduplication
+ * @param destDir Destination directory for the archived files. If null,
+ * the archived files are not moved to this directory.
+ * @param onlySstFile If true, only SST files are processed. If false, all
files are processed.
+ * <p>
+ * This parameter is typically set to {@code true} for initial iterations to
+ * prioritize SST file transfer, and then set to {@code false} only for the
+ * final iteration to ensure all remaining file types are transferred.
* @return true if processing should continue, false if size limit reached
* @throws IOException if an I/O error occurs
*/
+ @SuppressWarnings("checkstyle:ParameterNumber")
private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir,
AtomicLong maxTotalSstSize,
ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir,
- Map<String, String> hardLinkFileMap, Path destDir) throws IOException {
+ Map<String, String> hardLinkFileMap, Path destDir, boolean onlySstFile)
throws IOException {
if (!Files.exists(dbDir)) {
LOG.warn("DB directory {} does not exist. Skipping.", dbDir);
return true;
@@ -411,6 +424,9 @@ private boolean writeDBToArchive(Set<String>
sstFilesToExclude, Path dbDir, Atom
Iterable<Path> iterable = files::iterator;
for (Path dbFile : iterable) {
if (!Files.isDirectory(dbFile)) {
+ if (onlySstFile && !dbFile.toString().endsWith(ROCKSDB_SST_SUFFIX)) {
+ continue;
+ }
String fileId =
OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(dbFile);
String path = dbFile.toFile().getAbsolutePath();
if (destDir != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]