This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 96390ac1427 HDDS-12984. Use InodeID to identify the SST files inside the tarball. (#8477)
96390ac1427 is described below
commit 96390ac142725195315906400a633f316c899702
Author: Sadanand Shenoy <[email protected]>
AuthorDate: Wed Jun 25 21:28:38 2025 +0530
HDDS-12984. Use InodeID to identify the SST files inside the tarball. (#8477)
---
.../org/apache/hadoop/hdds/utils/Archiver.java | 43 ++
.../hadoop/hdds/utils/DBCheckpointServlet.java | 12 +-
.../org/apache/hadoop/hdds/utils/TestArchiver.java | 89 +++++
.../hdds/scm/TestSCMDbCheckpointServlet.java | 2 +
.../hadoop/ozone/om/TestOMDbCheckpointServlet.java | 2 +
.../TestOMDbCheckpointServletInodeBasedXfer.java | 368 +++++++++++++++++
.../om/OMDBCheckpointServletInodeBasedXfer.java | 436 +++++++++++++++++++++
.../apache/hadoop/ozone/om/OmSnapshotManager.java | 9 +-
.../hadoop/ozone/om/snapshot/OmSnapshotUtils.java | 21 +
9 files changed, 977 insertions(+), 5 deletions(-)
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/Archiver.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/Archiver.java
index 20bdc5d7629..eafe85853db 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/Archiver.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/Archiver.java
@@ -38,12 +38,15 @@
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Create and extract archives. */
public final class Archiver {
static final int MIN_BUFFER_SIZE = 8 * (int) OzoneConsts.KB; // same as IOUtils.DEFAULT_BUFFER_SIZE
static final int MAX_BUFFER_SIZE = (int) OzoneConsts.MB;
+ private static final Logger LOG = LoggerFactory.getLogger(Archiver.class);
private Archiver() {
// no instances (for now)
@@ -111,6 +114,46 @@ public static long includeFile(File file, String entryName,
return bytes;
}
+ /**
+ * Creates a hard link to the specified file in the provided temporary directory,
+ * adds the linked file as an entry to the archive with the given entry name, writes
+ * its contents to the archive output, and then deletes the temporary hard link.
+ * <p>
+ * This approach avoids altering the original file and works around limitations
+ * of certain archiving libraries that may require the source file to be present
+ * in a specific location or have a specific name. Any errors during the hardlink
+ * creation or archiving process are logged.
+ * </p>
+ *
+ * @param file the file to be included in the archive
+ * @param entryName the name/path under which the file should appear in the archive
+ * @param archiveOutput the output stream for the archive (e.g., tar)
+ * @param tmpDir the temporary directory in which to create the hard link
+ * @return number of bytes copied to the archive for this file
+ * @throws IOException if an I/O error occurs other than hardlink creation failure
+ */
+ public static long linkAndIncludeFile(File file, String entryName,
+ ArchiveOutputStream<TarArchiveEntry> archiveOutput, Path tmpDir) throws IOException {
+ File link = tmpDir.resolve(entryName).toFile();
+ long bytes = 0;
+ try {
+ Files.createLink(link.toPath(), file.toPath());
+ TarArchiveEntry entry = archiveOutput.createArchiveEntry(link, entryName);
+ archiveOutput.putArchiveEntry(entry);
+ try (InputStream input = Files.newInputStream(link.toPath())) {
+ bytes = IOUtils.copyLarge(input, archiveOutput);
+ }
+ archiveOutput.closeArchiveEntry();
+ } catch (IOException ioe) {
+ LOG.error("Couldn't create hardlink for file {} while including it in tarball.",
+ file.getAbsolutePath(), ioe);
+ throw ioe;
+ } finally {
+ Files.deleteIfExists(link.toPath());
+ }
+ return bytes;
+ }
+
public static void extractEntry(ArchiveEntry entry, InputStream input, long size,
Path ancestor, Path path) throws IOException {
HddsUtils.validatePath(path, ancestor);
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
index 629fba1772a..118a17fbb5d 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
@@ -22,6 +22,7 @@
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST;
import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
@@ -122,6 +123,10 @@ public void initialize(DBStore store, DBCheckpointMetrics metrics,
}
}
+ public File getBootstrapTempData() {
+ return bootstrapTempData;
+ }
+
private boolean hasPermission(UserGroupInformation user) {
// Check ACL for dbCheckpoint only when global Ozone ACL and SPNEGO is
// enabled
@@ -132,7 +137,7 @@ private boolean hasPermission(UserGroupInformation user) {
}
}
- private static void logSstFileList(Collection<String> sstList, String msg, int sampleSize) {
+ protected static void logSstFileList(Collection<String> sstList, String msg, int sampleSize) {
int count = sstList.size();
if (LOG.isDebugEnabled()) {
LOG.debug(msg, count, "", sstList);
@@ -199,7 +204,8 @@ private void generateSnapshotCheckpoint(HttpServletRequest request,
processMetadataSnapshotRequest(request, response, isFormData, flush);
}
- private void processMetadataSnapshotRequest(HttpServletRequest request, HttpServletResponse response,
+ @VisibleForTesting
+ public void processMetadataSnapshotRequest(HttpServletRequest request, HttpServletResponse response,
boolean isFormData, boolean flush) {
List<String> excludedSstList = new ArrayList<>();
String[] sstParam = isFormData ?
@@ -292,7 +298,7 @@ public DBCheckpoint getCheckpoint(Path ignoredTmpdir, boolean flush)
* @param request the HTTP servlet request
* @return array of parsed sst form data parameters for exclusion
*/
- private static String[] parseFormDataParameters(HttpServletRequest request) {
+ protected static String[] parseFormDataParameters(HttpServletRequest request) {
ServletFileUpload upload = new ServletFileUpload();
List<String> sstParam = new ArrayList<>();
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestArchiver.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestArchiver.java
index e175f957355..6c69f6fbaf5 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestArchiver.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestArchiver.java
@@ -18,9 +18,31 @@
package org.apache.hadoop.hdds.utils;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.MockedStatic;
/** Test {@link Archiver}. */
class TestArchiver {
@@ -46,4 +68,71 @@ void bufferSizeAboveMaximum(long fileSize) {
.isEqualTo(Archiver.MAX_BUFFER_SIZE);
}
+ @Test
+ void testLinkAndIncludeFileSuccessfulHardLink() throws IOException {
+ Path tmpDir = Files.createTempDirectory("archiver-test");
+ File tempFile = File.createTempFile("test-file", ".txt");
+ String entryName = "test-entry";
+ Files.write(tempFile.toPath(), "Test Content".getBytes(StandardCharsets.UTF_8));
+
+ TarArchiveOutputStream mockArchiveOutput = mock(TarArchiveOutputStream.class);
+ TarArchiveEntry mockEntry = new TarArchiveEntry(entryName);
+ AtomicBoolean isHardLinkCreated = new AtomicBoolean(false);
+ when(mockArchiveOutput.createArchiveEntry(any(File.class), eq(entryName)))
+ .thenAnswer(invocation -> {
+ File linkFile = invocation.getArgument(0);
+ isHardLinkCreated.set(Files.isSameFile(tempFile.toPath(), linkFile.toPath()));
+ return mockEntry;
+ });
+
+ // Call method under test
+ long bytesCopied = Archiver.linkAndIncludeFile(tempFile, entryName, mockArchiveOutput, tmpDir);
+ assertEquals(Files.size(tempFile.toPath()), bytesCopied);
+ // Verify archive interactions
+ verify(mockArchiveOutput, times(1)).putArchiveEntry(mockEntry);
+ verify(mockArchiveOutput, times(1)).closeArchiveEntry();
+ assertTrue(isHardLinkCreated.get());
+ assertFalse(Files.exists(tmpDir.resolve(entryName)));
+ // Cleanup
+ assertTrue(tempFile.delete());
+ Files.deleteIfExists(tmpDir);
+ }
+
+ @Test
+ void testLinkAndIncludeFileFailedHardLink() throws IOException {
+ Path tmpDir = Files.createTempDirectory("archiver-test");
+ File tempFile = File.createTempFile("test-file", ".txt");
+ String entryName = "test-entry";
+ Files.write(tempFile.toPath(), "Test Content".getBytes(StandardCharsets.UTF_8));
+
+ TarArchiveOutputStream mockArchiveOutput =
+ mock(TarArchiveOutputStream.class);
+ TarArchiveEntry mockEntry = new TarArchiveEntry("test-entry");
+ AtomicBoolean isHardLinkCreated = new AtomicBoolean(false);
+ when(mockArchiveOutput.createArchiveEntry(any(File.class), eq(entryName)))
+ .thenAnswer(invocation -> {
+ File linkFile = invocation.getArgument(0);
+ isHardLinkCreated.set(Files.isSameFile(tempFile.toPath(), linkFile.toPath()));
+ return mockEntry;
+ });
+
+ // Mock static Files.createLink to throw IOException
+ try (MockedStatic<Files> mockedFiles = mockStatic(Files.class, CALLS_REAL_METHODS)) {
+ Path linkPath = tmpDir.resolve(entryName);
+ String errorMessage = "Failed to create hardlink";
+ mockedFiles.when(() -> Files.createLink(linkPath, tempFile.toPath()))
+ .thenThrow(new IOException(errorMessage));
+
+ IOException thrown = assertThrows(IOException.class, () ->
+ Archiver.linkAndIncludeFile(tempFile, entryName, mockArchiveOutput, tmpDir)
+ );
+
+ assertTrue(thrown.getMessage().contains(errorMessage));
+ }
+ assertFalse(isHardLinkCreated.get());
+
+ assertTrue(tempFile.delete());
+ Files.deleteIfExists(tmpDir);
+ }
+
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
index fbd02d6f8e5..74bbbb7f8c1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
@@ -122,6 +122,8 @@ public void init() throws Exception {
responseMock);
doCallRealMethod().when(scmDbCheckpointServletMock).getCheckpoint(any(), anyBoolean());
+ doCallRealMethod().when(scmDbCheckpointServletMock)
+ .processMetadataSnapshotRequest(any(), any(), anyBoolean(), anyBoolean());
servletContextMock = mock(ServletContext.class);
when(scmDbCheckpointServletMock.getServletContext())
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
index a9440ef1515..d820822bf78 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
@@ -229,6 +229,8 @@ public void write(int b) throws IOException {
doCallRealMethod().when(omDbCheckpointServletMock).getCheckpoint(any(), anyBoolean());
+ doCallRealMethod().when(omDbCheckpointServletMock)
+ .processMetadataSnapshotRequest(any(), any(), anyBoolean(), anyBoolean());
}
@Test
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
new file mode 100644
index 00000000000..37086b2c840
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.WriteListener;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Class used for testing the OM DB Checkpoint provider servlet using inode based transfer logic.
+ */
+public class TestOMDbCheckpointServletInodeBasedXfer {
+
+ private MiniOzoneCluster cluster;
+ private OzoneClient client;
+ private OzoneManager om;
+ private OzoneConfiguration conf;
+ @TempDir
+ private Path folder;
+ private HttpServletRequest requestMock = null;
+ private HttpServletResponse responseMock = null;
+ private OMDBCheckpointServletInodeBasedXfer omDbCheckpointServletMock = null;
+ private ServletOutputStream servletOutputStream;
+ private File tempFile;
+ private static final AtomicInteger COUNTER = new AtomicInteger();
+
+ @BeforeEach
+ void init() throws Exception {
+ conf = new OzoneConfiguration();
+ }
+
+ @AfterEach
+ void shutdown() {
+ IOUtils.closeQuietly(client, cluster);
+ }
+
+ private void setupCluster() throws Exception {
+ cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
+ cluster.waitForClusterToBeReady();
+ client = cluster.newClient();
+ om = cluster.getOzoneManager();
+ conf.setBoolean(OZONE_ACL_ENABLED, false);
+ conf.set(OZONE_ADMINISTRATORS, OZONE_ADMINISTRATORS_WILDCARD);
+ }
+
+ private void setupMocks() throws Exception {
+ final Path tempPath = folder.resolve("temp" + COUNTER.incrementAndGet() + ".tar");
+ tempFile = tempPath.toFile();
+
+ servletOutputStream = new ServletOutputStream() {
+ private final OutputStream fileOutputStream = Files.newOutputStream(tempPath);
+
+ @Override
+ public boolean isReady() {
+ return true;
+ }
+
+ @Override
+ public void setWriteListener(WriteListener writeListener) {
+ }
+
+ @Override
+ public void close() throws IOException {
+ fileOutputStream.close();
+ super.close();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ fileOutputStream.write(b);
+ }
+ };
+
+ omDbCheckpointServletMock = mock(OMDBCheckpointServletInodeBasedXfer.class);
+
+ BootstrapStateHandler.Lock lock = new OMDBCheckpointServlet.Lock(om);
+ doCallRealMethod().when(omDbCheckpointServletMock).init();
+ assertNull(doCallRealMethod().when(omDbCheckpointServletMock).getDbStore());
+
+ requestMock = mock(HttpServletRequest.class);
+ // Return current user short name when asked
+ when(requestMock.getRemoteUser())
+ .thenReturn(UserGroupInformation.getCurrentUser().getShortUserName());
+ responseMock = mock(HttpServletResponse.class);
+
+ ServletContext servletContextMock = mock(ServletContext.class);
+ when(omDbCheckpointServletMock.getServletContext())
+ .thenReturn(servletContextMock);
+
+ when(servletContextMock.getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE))
+ .thenReturn(om);
+ when(requestMock.getParameter(OZONE_DB_CHECKPOINT_REQUEST_FLUSH))
+ .thenReturn("true");
+
+ doCallRealMethod().when(omDbCheckpointServletMock).doGet(requestMock,
+ responseMock);
+ doCallRealMethod().when(omDbCheckpointServletMock).doPost(requestMock,
+ responseMock);
+
+ doCallRealMethod().when(omDbCheckpointServletMock)
+ .writeDbDataToStream(any(), any(), any(), any(), any());
+
+ when(omDbCheckpointServletMock.getBootstrapStateLock())
+ .thenReturn(lock);
+ doCallRealMethod().when(omDbCheckpointServletMock).getCheckpoint(any(), anyBoolean());
+ assertNull(doCallRealMethod().when(omDbCheckpointServletMock).getBootstrapTempData());
+ doCallRealMethod().when(omDbCheckpointServletMock).getSnapshotDirs(any());
+ doCallRealMethod().when(omDbCheckpointServletMock).
+ processMetadataSnapshotRequest(any(), any(), anyBoolean(), anyBoolean());
+ doCallRealMethod().when(omDbCheckpointServletMock).writeDbDataToStream(any(), any(), any(), any());
+ doCallRealMethod().when(omDbCheckpointServletMock).getCompactionLogDir();
+ doCallRealMethod().when(omDbCheckpointServletMock).getSstBackupDir();
+ }
+
+ @Test
+ void testContentsOfTarballWithSnapshot() throws Exception {
+ setupCluster();
+ setupMocks();
+ when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)).thenReturn("true");
+ String volumeName = "vol" + RandomStringUtils.secure().nextNumeric(5);
+ String bucketName = "buck" + RandomStringUtils.secure().nextNumeric(5);
+ // Create a "spy" dbstore keep track of the checkpoint.
+ writeData(volumeName, bucketName, true);
+ DBStore dbStore = om.getMetadataManager().getStore();
+ DBStore spyDbStore = spy(dbStore);
+ AtomicReference<DBCheckpoint> realCheckpoint = new AtomicReference<>();
+ when(spyDbStore.getCheckpoint(true)).thenAnswer(b -> {
+ DBCheckpoint checkpoint = spy(dbStore.getCheckpoint(true));
+ // Don't delete the checkpoint, because we need to compare it
+ // with the snapshot data.
+ doNothing().when(checkpoint).cleanupCheckpoint();
+ realCheckpoint.set(checkpoint);
+ return checkpoint;
+ });
+ // Init the mock with the spyDbstore
+ doCallRealMethod().when(omDbCheckpointServletMock).initialize(any(), any(),
+ eq(false), any(), any(), eq(false));
+ omDbCheckpointServletMock.initialize(spyDbStore, om.getMetrics().getDBCheckpointMetrics(),
+ false,
+ om.getOmAdminUsernames(), om.getOmAdminGroups(), false);
+
+ // Get the tarball.
+ when(responseMock.getOutputStream()).thenReturn(servletOutputStream);
+ omDbCheckpointServletMock.doGet(requestMock, responseMock);
+ String testDirName = folder.resolve("testDir").toString();
+ String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME;
+ File newDbDir = new File(newDbDirName);
+ assertTrue(newDbDir.mkdirs());
+ FileUtil.unTar(tempFile, newDbDir);
+ List<String> snapshotPaths = new ArrayList<>();
+ client.getObjectStore().listSnapshot(volumeName, bucketName, "", null)
+ .forEachRemaining(snapInfo -> snapshotPaths.add(getSnapshotDBPath(snapInfo.getCheckpointDir())));
+ Set<String> inodesFromOmDataDir = new HashSet<>();
+ Set<String> inodesFromTarball = new HashSet<>();
+ Set<Path> allPathsInTarball = new HashSet<>();
+ try (Stream<Path> filesInTarball = Files.list(newDbDir.toPath())) {
+ List<Path> files = filesInTarball.collect(Collectors.toList());
+ for (Path p : files) {
+ File file = p.toFile();
+ if (file.getName().equals(OmSnapshotManager.OM_HARDLINK_FILE)) {
+ continue;
+ }
+ String inode = getInode(file.getName());
+ inodesFromTarball.add(inode);
+ allPathsInTarball.add(p);
+ }
+ }
+ Map<String, List<String>> hardLinkMapFromOmData = new HashMap<>();
+ Path checkpointLocation = realCheckpoint.get().getCheckpointLocation();
+ populateInodesOfFilesInDirectory(dbStore, checkpointLocation,
+ inodesFromOmDataDir, hardLinkMapFromOmData);
+ for (String snapshotPath : snapshotPaths) {
+ populateInodesOfFilesInDirectory(dbStore, Paths.get(snapshotPath),
+ inodesFromOmDataDir, hardLinkMapFromOmData);
+ }
+ Path hardlinkFilePath =
+ newDbDir.toPath().resolve(OmSnapshotManager.OM_HARDLINK_FILE);
+ Map<String, List<String>> hardlinkMapFromTarball = readFileToMap(hardlinkFilePath.toString());
+
+ // verify that all entries in hardLinkMapFromOmData are present in hardlinkMapFromTarball.
+ // entries in hardLinkMapFromOmData are from the snapshots + OM db checkpoint so they
+ // should be present in the tarball.
+
+ for (Map.Entry<String, List<String>> entry : hardLinkMapFromOmData.entrySet()) {
+ String key = entry.getKey();
+ List<String> value = entry.getValue();
+ assertTrue(hardlinkMapFromTarball.containsKey(key));
+ assertEquals(value, hardlinkMapFromTarball.get(key));
+ }
+ // all files from the checkpoint should be in the tarball
+ assertFalse(inodesFromTarball.isEmpty());
+ assertTrue(inodesFromTarball.containsAll(inodesFromOmDataDir));
+
+ // create hardlinks now
+ OmSnapshotUtils.createHardLinks(newDbDir.toPath());
+ for (Path old : allPathsInTarball) {
+ assertTrue(old.toFile().delete());
+ }
+ assertFalse(hardlinkFilePath.toFile().exists());
+ }
+
+ public static Map<String, List<String>> readFileToMap(String filePath) throws IOException {
+ Map<String, List<String>> dataMap = new HashMap<>();
+ try (BufferedReader reader = Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8)) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ String trimmedLine = line.trim();
+ if (trimmedLine.isEmpty() || !trimmedLine.contains("\t")) {
+ continue;
+ }
+ int tabIndex = trimmedLine.indexOf("\t");
+ if (tabIndex > 0) {
+ // value is the full path that needs to be constructed
+ String value = trimmedLine.substring(0, tabIndex).trim();
+ // key is the inodeID
+ String key = getInode(trimmedLine.substring(tabIndex + 1).trim());
+ if (!key.isEmpty() && !value.isEmpty()) {
+ dataMap.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
+ }
+ }
+ }
+ }
+ for (Map.Entry<String, List<String>> entry : dataMap.entrySet()) {
+ Collections.sort(entry.getValue());
+ }
+ return dataMap;
+ }
+
+ private void populateInodesOfFilesInDirectory(DBStore dbStore, Path dbLocation,
+ Set<String> inodesFromOmDbCheckpoint, Map<String, List<String>> hardlinkMap) throws IOException {
+ try (Stream<Path> filesInOmDb = Files.list(dbLocation)) {
+ List<Path> files = filesInOmDb.collect(Collectors.toList());
+ for (Path p : files) {
+ if (Files.isDirectory(p) || p.toFile().getName().equals(OmSnapshotManager.OM_HARDLINK_FILE)) {
+ continue;
+ }
+ String inode = getInode(OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(p));
+ Path metadataDir = OMStorage.getOmDbDir(conf).toPath();
+ String path = metadataDir.relativize(p).toString();
+ if (path.contains(OM_CHECKPOINT_DIR)) {
+ path = metadataDir.relativize(dbStore.getDbLocation().toPath().resolve(p.getFileName())).toString();
+ }
+ if (path.startsWith(OM_DB_NAME)) {
+ Path fileName = Paths.get(path).getFileName();
+ // fileName will not be null, added null check for findbugs
+ if (fileName != null) {
+ path = fileName.toString();
+ }
+ }
+ hardlinkMap.computeIfAbsent(inode, k -> new ArrayList<>()).add(path);
+ inodesFromOmDbCheckpoint.add(inode);
+ }
+ }
+ for (Map.Entry<String, List<String>> entry : hardlinkMap.entrySet()) {
+ Collections.sort(entry.getValue());
+ }
+ }
+
+ private String getSnapshotDBPath(String checkPointDir) {
+ return OMStorage.getOmDbDir(cluster.getConf()) +
+ OM_KEY_PREFIX + OM_SNAPSHOT_CHECKPOINT_DIR + OM_KEY_PREFIX +
+ OM_DB_NAME + checkPointDir;
+ }
+
+ private static String getInode(String inodeAndMtime) {
+ String inode = inodeAndMtime.split("-")[0];
+ return inode;
+ }
+
+ private void writeData(String volumeName, String bucketName, boolean includeSnapshots) throws Exception {
+ OzoneBucket bucket = TestDataUtil.createVolumeAndBucket(client, volumeName, bucketName);
+ for (int i = 0; i < 10; i++) {
+ TestDataUtil.createKey(bucket, "key" + i,
+ ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE),
+ "sample".getBytes(StandardCharsets.UTF_8));
+ om.getMetadataManager().getStore().flushDB();
+ }
+ if (includeSnapshots) {
+ TestDataUtil.createKey(bucket, "keysnap1",
+ ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE),
+ "sample".getBytes(StandardCharsets.UTF_8));
+ TestDataUtil.createKey(bucket, "keysnap2",
+ ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE),
+ "sample".getBytes(StandardCharsets.UTF_8));
+ client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot10");
+ client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot20");
+ }
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
new file mode 100644
index 00000000000..3fe5aca7a91
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static org.apache.hadoop.hdds.utils.Archiver.includeFile;
+import static org.apache.hadoop.hdds.utils.Archiver.linkAndIncludeFile;
+import static org.apache.hadoop.hdds.utils.Archiver.tar;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeRatisSnapshotCompleteFlag;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY;
+import static org.apache.hadoop.ozone.om.snapshot.OMDBCheckpointUtils.includeSnapshotData;
+import static org.apache.hadoop.ozone.om.snapshot.OMDBCheckpointUtils.logEstimatedTarballSize;
+import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_PREFIX;
+import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.recon.ReconConfig;
+import org.apache.hadoop.hdds.utils.DBCheckpointServlet;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Specialized OMDBCheckpointServlet implementation that transfers Ozone Manager
+ * database checkpoints using inode-based deduplication.
+ * <p>
+ * This servlet constructs checkpoint archives by examining file inodes,
+ * ensuring that files with the same inode (i.e., hardlinks or duplicates)
+ * are only transferred once. It maintains mappings from inode IDs to file
+ * paths, manages hardlink information, and enforces snapshot and SST file
+ * size constraints as needed.
+ * <p>
+ * This approach optimizes checkpoint streaming by reducing redundant data
+ * transfer, especially in environments where RocksDB and snapshotting result
+ * in multiple hardlinks to the same physical data.
+ */
+public class OMDBCheckpointServletInodeBasedXfer extends DBCheckpointServlet {
+
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(OMDBCheckpointServletInodeBasedXfer.class);
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void init() throws ServletException {
+ OzoneManager om = (OzoneManager) getServletContext()
+ .getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE);
+
+ if (om == null) {
+ LOG.error("Unable to initialize OMDBCheckpointServlet. OM is null");
+ return;
+ }
+
+ OzoneConfiguration conf = getConf();
+ // Only Ozone Admins and Recon are allowed
+ Collection<String> allowedUsers =
+ new LinkedHashSet<>(om.getOmAdminUsernames());
+ Collection<String> allowedGroups = om.getOmAdminGroups();
+ ReconConfig reconConfig = conf.getObject(ReconConfig.class);
+ String reconPrincipal = reconConfig.getKerberosPrincipal();
+ if (!reconPrincipal.isEmpty()) {
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(reconPrincipal);
+ allowedUsers.add(ugi.getShortUserName());
+ }
+
+ initialize(om.getMetadataManager().getStore(),
+ om.getMetrics().getDBCheckpointMetrics(),
+ om.getAclsEnabled(),
+ allowedUsers,
+ allowedGroups,
+ om.isSpnegoEnabled());
+ }
+
+ @Override
+ public void processMetadataSnapshotRequest(HttpServletRequest request, HttpServletResponse response,
+ boolean isFormData, boolean flush) {
+ List<String> excludedSstList = new ArrayList<>();
+ String[] sstParam = isFormData ?
+ parseFormDataParameters(request) : request.getParameterValues(
+ OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST);
+ Set<String> receivedSstFiles = extractSstFilesToExclude(sstParam);
+ Path tmpdir = null;
+ try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
+ tmpdir = Files.createTempDirectory(getBootstrapTempData().toPath(),
+ "bootstrap-data-");
+ if (tmpdir == null) {
+ throw new IOException("tmp dir is null");
+ }
+ String tarName = "om.data-" + System.currentTimeMillis() + ".tar";
+ response.setContentType("application/x-tar");
+ response.setHeader("Content-Disposition", "attachment; filename=\"" + tarName + "\"");
+ Instant start = Instant.now();
+ writeDbDataToStream(request, response.getOutputStream(), receivedSstFiles, tmpdir);
+ Instant end = Instant.now();
+ long duration = Duration.between(start, end).toMillis();
+ LOG.info("Time taken to write the checkpoint to response output " +
+ "stream: {} milliseconds", duration);
+ logSstFileList(excludedSstList,
+ "Excluded {} SST files from the latest checkpoint{}: {}", 5);
+ } catch (Exception e) {
+ LOG.error(
+ "Unable to process metadata snapshot request. ", e);
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ } finally {
+ try {
+ if (tmpdir != null) {
+ FileUtils.deleteDirectory(tmpdir.toFile());
+ }
+ } catch (IOException e) {
+ LOG.error("unable to delete: " + tmpdir, e.toString());
+ }
+ }
+ }
+
+ Path getSstBackupDir() {
+ RocksDBCheckpointDiffer differ = getDbStore().getRocksDBCheckpointDiffer();
+ return new File(differ.getSSTBackupDir()).toPath();
+ }
+
+ Path getCompactionLogDir() {
+ RocksDBCheckpointDiffer differ = getDbStore().getRocksDBCheckpointDiffer();
+ return new File(differ.getCompactionLogDir()).toPath();
+ }
+
+ /**
+ * Streams the Ozone Manager database checkpoint and (optionally) snapshot-related data
+ * as a tar archive to the provided output stream. This method handles deduplication
+ * based on file inodes to avoid transferring duplicate files (such as hardlinks),
+ * supports excluding specific SST files, enforces maximum total SST file size limits,
+ * and manages temporary directories for processing.
+ *
+ * The method processes snapshot directories and backup/compaction logs (if requested),
+ * then finally the active OM database. It also writes a hardlink mapping file
+ * and includes a completion flag for Ratis snapshot streaming.
+ *
+ * @param request The HTTP servlet request containing parameters for the snapshot.
+ * @param destination The output stream to which the tar archive is written.
+ * @param sstFilesToExclude Set of SST file identifiers to exclude from the archive.
+ * @param tmpdir Temporary directory for staging files during archiving.
+ * @throws IOException if an I/O error occurs during processing or streaming.
+ */
+
+ public void writeDbDataToStream(HttpServletRequest request, OutputStream destination,
+ Set<String> sstFilesToExclude, Path tmpdir) throws IOException {
+ DBCheckpoint checkpoint = null;
+ OzoneManager om = (OzoneManager) getServletContext().getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE);
+ OMMetadataManager omMetadataManager = om.getMetadataManager();
+ boolean includeSnapshotData = includeSnapshotData(request);
+ AtomicLong maxTotalSstSize = new AtomicLong(getConf().getLong(OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY,
+ OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT));
+
+ Set<Path> snapshotPaths = Collections.emptySet();
+
+ if (!includeSnapshotData) {
+ maxTotalSstSize.set(Long.MAX_VALUE);
+ } else {
+ snapshotPaths = getSnapshotDirs(omMetadataManager);
+ }
+
+ if (sstFilesToExclude.isEmpty()) {
+ logEstimatedTarballSize(getDbStore().getDbLocation().toPath(), snapshotPaths);
+ }
+
+ boolean shouldContinue = true;
+
+ Map<String, String> hardLinkFileMap = new HashMap<>();
+ try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) {
+ if (includeSnapshotData) {
+ // Process each snapshot db path and write it to archive
+ for (Path snapshotDbPath : snapshotPaths) {
+ if (!shouldContinue) {
+ break;
+ }
+ shouldContinue = writeDBToArchive(sstFilesToExclude, snapshotDbPath,
+ maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
+ }
+
+
+ if (shouldContinue) {
+ shouldContinue = writeDBToArchive(sstFilesToExclude, getSstBackupDir(),
+ maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
+ }
+
+ if (shouldContinue) {
+ shouldContinue = writeDBToArchive(sstFilesToExclude, getCompactionLogDir(),
+ maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
+ }
+ }
+
+ if (shouldContinue) {
+ // we finished transferring files from snapshot DB's by now and
+ // this is the last step where we transfer the active om.db contents
+ checkpoint = createAndPrepareCheckpoint(tmpdir, true);
+ // unlimited files as we want the Active DB contents to be transferred in a single batch
+ maxTotalSstSize.set(Long.MAX_VALUE);
+ Path checkpointDir = checkpoint.getCheckpointLocation();
+ writeDBToArchive(sstFilesToExclude, checkpointDir,
+ maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
+ if (includeSnapshotData) {
+ Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName());
+ Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName());
+ writeDBToArchive(sstFilesToExclude, tmpCompactionLogDir, maxTotalSstSize, archiveOutputStream, tmpdir,
+ hardLinkFileMap, getCompactionLogDir());
+ writeDBToArchive(sstFilesToExclude, tmpSstBackupDir, maxTotalSstSize, archiveOutputStream, tmpdir,
+ hardLinkFileMap, getSstBackupDir());
+ }
+ writeHardlinkFile(getConf(), hardLinkFileMap, archiveOutputStream);
+ includeRatisSnapshotCompleteFlag(archiveOutputStream);
+ }
+
+ } catch (IOException ioe) {
+ LOG.error("got exception writing to archive " + ioe);
+ throw ioe;
+ } finally {
+ cleanupCheckpoint(checkpoint);
+ }
+ }
+
+ private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dir,
+ AtomicLong maxTotalSstSize, ArchiveOutputStream<TarArchiveEntry> archiveOutputStream,
+ Path tmpdir, Map<String, String> hardLinkFileMap) throws IOException {
+ return writeDBToArchive(sstFilesToExclude, dir, maxTotalSstSize,
+ archiveOutputStream, tmpdir, hardLinkFileMap, null);
+ }
+
+ private static void cleanupCheckpoint(DBCheckpoint checkpoint) {
+ if (checkpoint != null) {
+ try {
+ checkpoint.cleanupCheckpoint();
+ } catch (IOException e) {
+ LOG.error("Error trying to clean checkpoint at {} .",
+ checkpoint.getCheckpointLocation().toString());
+ }
+ }
+ }
+
+ /**
+ * Writes a hardlink mapping file to the archive, which maps file IDs to their
+ * relative paths. This method generates the mapping file based on the provided
+ * hardlink metadata and adds it to the archive output stream.
+ *
+ * @param conf Ozone configuration for the OM instance.
+ * @param hardlinkFileMap A map where the key is the absolute file path
+ * and the value is its corresponding file ID.
+ * @param archiveOutputStream The archive output stream to which the hardlink
+ * file should be written.
+ * @throws IOException If an I/O error occurs while creating or writing the
+ * hardlink file.
+ */
+ private static void writeHardlinkFile(OzoneConfiguration conf, Map<String, String> hardlinkFileMap,
+ ArchiveOutputStream<TarArchiveEntry> archiveOutputStream) throws IOException {
+ Path data = Files.createTempFile(DATA_PREFIX, DATA_SUFFIX);
+ Path metaDirPath = OMStorage.getOmDbDir(conf).toPath();
+ StringBuilder sb = new StringBuilder();
+
+ for (Map.Entry<String, String> entry : hardlinkFileMap.entrySet()) {
+ Path p = Paths.get(entry.getKey());
+ String fileId = entry.getValue();
+ Path relativePath = metaDirPath.relativize(p);
+ // if the file is in "om.db" directory, strip off the 'om.db' name from the path
+ // and only keep the file name as this would be created in the current dir of the untarred dir
+ // on the follower.
+ if (relativePath.startsWith(OM_DB_NAME)) {
+ relativePath = relativePath.getFileName();
+ }
+ sb.append(relativePath).append('\t').append(fileId).append('\n');
+ }
+ Files.write(data, sb.toString().getBytes(StandardCharsets.UTF_8), StandardOpenOption.TRUNCATE_EXISTING);
+ includeFile(data.toFile(), OmSnapshotManager.OM_HARDLINK_FILE, archiveOutputStream);
+ }
+
+ /**
+ * Gets the configuration from the OzoneManager context.
+ *
+ * @return OzoneConfiguration instance
+ */
+ private OzoneConfiguration getConf() {
+ return ((OzoneManager) getServletContext()
+ .getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE))
+ .getConfiguration();
+ }
+
+ /**
+ * Collects paths to all snapshot databases.
+ *
+ * @param omMetadataManager OMMetadataManager instance
+ * @return Set of paths to snapshot databases
+ * @throws IOException if an I/O error occurs
+ */
+ Set<Path> getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOException {
+ Set<Path> snapshotPaths = new HashSet<>();
+ SnapshotChainManager snapshotChainManager = new SnapshotChainManager(omMetadataManager);
+ for (SnapshotChainInfo snapInfo : snapshotChainManager.getGlobalSnapshotChain().values()) {
+ String snapshotDir =
+ OmSnapshotManager.getSnapshotPath(getConf(), SnapshotInfo.getCheckpointDirName(snapInfo.getSnapshotId()));
+ Path path = Paths.get(snapshotDir);
+ snapshotPaths.add(path);
+ }
+ return snapshotPaths;
+ }
+
+ /**
+ * Writes database files to the archive, handling deduplication based on inode IDs.
+ * Here the dbDir could be a snapshot db directory, the active om.db,
+ * the compaction log dir, or the sst backup dir.
+ *
+ * @param sstFilesToExclude Set of SST file IDs to exclude from the archive
+ * @param dbDir Directory containing database files to archive
+ * @param maxTotalSstSize Maximum total size of SST files to include
+ * @param archiveOutputStream Archive output stream
+ * @param tmpDir Temporary directory for processing
+ * @return true if processing should continue, false if size limit reached
+ * @throws IOException if an I/O error occurs
+ */
+ private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, AtomicLong maxTotalSstSize,
+ ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir,
+ Map<String, String> hardLinkFileMap, Path destDir) throws IOException {
+ if (!Files.exists(dbDir)) {
+ LOG.warn("DB directory {} does not exist. Skipping.", dbDir);
+ return true;
+ }
+ long bytesWritten = 0L;
+ int filesWritten = 0;
+ long lastLoggedTime = Time.monotonicNow();
+ try (Stream<Path> files = Files.list(dbDir)) {
+ Iterable<Path> iterable = files::iterator;
+ for (Path dbFile : iterable) {
+ if (!Files.isDirectory(dbFile)) {
+ String fileId = OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(dbFile);
+ String path = dbFile.toFile().getAbsolutePath();
+ if (destDir != null) {
+ path = destDir.resolve(dbFile.getFileName()).toString();
+ }
+ // if the file is in the om checkpoint dir, then we need to change the path to point to the OM DB.
+ if (path.contains(OM_CHECKPOINT_DIR)) {
+ path = getDbStore().getDbLocation().toPath().resolve(dbFile.getFileName()).toAbsolutePath().toString();
+ }
+ hardLinkFileMap.put(path, fileId);
+ if (!sstFilesToExclude.contains(fileId)) {
+ long fileSize = Files.size(dbFile);
+ if (maxTotalSstSize.get() - fileSize <= 0) {
+ return false;
+ }
+ bytesWritten += linkAndIncludeFile(dbFile.toFile(), fileId, archiveOutputStream, tmpDir);
+ filesWritten++;
+ maxTotalSstSize.addAndGet(-fileSize);
+ sstFilesToExclude.add(fileId);
+ if (Time.monotonicNow() - lastLoggedTime >= 30000) {
+ LOG.info("Transferred {} KB, #files {} to checkpoint tarball
stream...",
+ bytesWritten / (1024), filesWritten);
+ lastLoggedTime = Time.monotonicNow();
+ }
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Creates a database checkpoint and copies compaction log and SST backup files
+ * into the given temporary directory.
+ * The copy to the temporary directory for compaction log and SST backup files
+ * is done to maintain a consistent view of the files in these directories.
+ *
+ * @param tmpdir Temporary directory for storing checkpoint-related files.
+ * @param flush If true, flushes in-memory data to disk before checkpointing.
+ * @return The created database checkpoint.
+ * @throws IOException If an error occurs during checkpoint creation or file copying.
+ */
+ private DBCheckpoint createAndPrepareCheckpoint(Path tmpdir, boolean flush) throws IOException {
+ // make tmp directories to contain the copies
+ Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName());
+ Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName());
+
+ // Create checkpoint and then copy the files so that it has all the compaction entries and files.
+ DBCheckpoint dbCheckpoint = getDbStore().getCheckpoint(flush);
+ FileUtils.copyDirectory(getCompactionLogDir().toFile(), tmpCompactionLogDir.toFile());
+ OmSnapshotUtils.linkFiles(getSstBackupDir().toFile(), tmpSstBackupDir.toFile());
+
+ return dbCheckpoint;
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 1de509e9939..93070fcbe05 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -758,10 +758,15 @@ public static Path getSnapshotPath(OMMetadataManager omMetadataManager, Snapshot
}
public static String getSnapshotPath(OzoneConfiguration conf,
- SnapshotInfo snapshotInfo) {
+ SnapshotInfo snapshotInfo) {
+ return getSnapshotPath(conf, snapshotInfo.getCheckpointDirName());
+ }
+
+ public static String getSnapshotPath(OzoneConfiguration conf,
+ String checkpointDirName) {
return OMStorage.getOmDbDir(conf) +
OM_KEY_PREFIX + OM_SNAPSHOT_CHECKPOINT_DIR + OM_KEY_PREFIX +
- OM_DB_NAME + snapshotInfo.getCheckpointDirName();
+ OM_DB_NAME + checkpointDirName;
}
public static boolean isSnapshotKey(String[] keyParts) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
index f5805044b7f..848384ce3e2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
@@ -27,6 +27,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.FileTime;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -65,6 +66,26 @@ public static Object getINode(Path file) throws IOException {
return Files.readAttributes(file, BasicFileAttributes.class).fileKey();
}
+ /**
+ * Returns a string combining the inode (fileKey) and the last modification time (mtime) of the given file.
+ * <p>
+ * The returned string is formatted as "{inode}-{mtime}", where:
+ * <ul>
+ * <li>{@code inode} is the unique file key obtained from the file system, typically representing
+ * the inode on POSIX systems</li>
+ * <li>{@code mtime} is the last modified time of the file in milliseconds since the epoch</li>
+ * </ul>
+ *
+ * @param file the {@link Path} to the file whose inode and modification time are to be retrieved
+ * @return a string in the format "{inode}-{mtime}"
+ * @throws IOException if an I/O error occurs
+ */
+ public static String getFileInodeAndLastModifiedTimeString(Path file) throws IOException {
+ Object inode = Files.readAttributes(file, BasicFileAttributes.class).fileKey();
+ FileTime mTime = Files.getLastModifiedTime(file);
+ return String.format("%s-%s", inode, mTime.toMillis());
+ }
+
/**
* Create file of links to add to tarball.
* Format of entries are either:
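
Note for readers of this patch (not part of the commit): the core idea above is that each file is keyed by its inode plus mtime, paths that share an inode are shipped only once, and a tab-separated "path<TAB>id" map lets the follower recreate the hardlinks after untarring. A minimal standalone sketch of that idea follows, using only JDK APIs; the class and method names here are made up for illustration and do not exist in the Ozone codebase.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Stream;

public final class InodeDedupSketch {

  // Same "{inode}-{mtime}" form as OmSnapshotUtils.getFileInodeAndLastModifiedTimeString.
  static String fileId(Path file) throws IOException {
    Object inode = Files.readAttributes(file, BasicFileAttributes.class).fileKey();
    long mtime = Files.getLastModifiedTime(file).toMillis();
    return inode + "-" + mtime;
  }

  // Collects one representative path per inode-based id; hardlinked duplicates
  // collapse to a single entry, so an archive only needs one copy per id.
  static Map<String, Path> dedupeByInode(Path dir) throws IOException {
    Map<String, Path> byId = new LinkedHashMap<>();
    try (Stream<Path> files = Files.list(dir)) {
      for (Path p : (Iterable<Path>) files::iterator) {
        if (Files.isRegularFile(p)) {
          byId.putIfAbsent(fileId(p), p);
        }
      }
    }
    return byId;
  }

  // Writes "relativePath<TAB>fileId" lines, mirroring the format of the hardlink
  // map file that the servlet adds to the tarball.
  static void writeHardlinkMap(Map<String, Path> byId, Path baseDir, Path out) throws IOException {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, Path> e : byId.entrySet()) {
      sb.append(baseDir.relativize(e.getValue())).append('\t').append(e.getKey()).append('\n');
    }
    Files.write(out, sb.toString().getBytes(StandardCharsets.UTF_8));
  }
}

In the patch itself, the servlet streams each representative file through Archiver.linkAndIncludeFile while building the map, and the receiving side replays the map with OmSnapshotUtils.createHardLinks once the tarball is extracted, as exercised by TestOMDbCheckpointServletInodeBasedXfer above.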
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]