This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new dc2b5585818 HDDS-13867. SnapshotDiff delta file computation should 
happen based on LocalDataYaml file (#9268)
dc2b5585818 is described below

commit dc2b55858180dd719205073a30db0e7cf84aa96f
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Mon Nov 17 21:37:39 2025 -0500

    HDDS-13867. SnapshotDiff delta file computation should happen based on 
LocalDataYaml file (#9268)
---
 .../org/apache/ozone/rocksdb/util/RdbUtil.java     |  18 +-
 .../org/apache/ozone/rocksdb/util/SstFileInfo.java |   6 +
 .../apache/ozone/rocksdiff/DifferSnapshotInfo.java |  72 ++--
 .../ozone/rocksdiff/RocksDBCheckpointDiffer.java   | 271 +++++++-------
 .../org/apache/ozone/rocksdiff/RocksDiffUtils.java |  61 ++--
 .../rocksdiff/TestRocksDBCheckpointDiffer.java     | 403 +++++++++++++--------
 .../apache/ozone/rocksdiff/TestRocksDiffUtils.java |  98 +----
 .../hadoop/ozone/freon/TestOMSnapshotDAG.java      | 110 +++---
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  |   2 +-
 .../ozone/om/snapshot/SnapshotDiffManager.java     | 139 ++++---
 .../ozone/om/snapshot/TestSnapshotDiffManager.java | 129 ++++---
 11 files changed, 711 insertions(+), 598 deletions(-)

diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
index 95c4a4aa2bb..ac88102f800 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
@@ -17,7 +17,6 @@
 
 package org.apache.ozone.rocksdb.util;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -41,28 +40,25 @@ public final class RdbUtil {
 
   private RdbUtil() { }
 
-  public static List<LiveFileMetaData> getLiveSSTFilesForCFs(
-      final ManagedRocksDB rocksDB, Set<String> cfs) {
+  public static List<LiveFileMetaData> getLiveSSTFilesForCFs(final 
ManagedRocksDB rocksDB, Set<String> cfs) {
     return rocksDB.get().getLiveFilesMetaData().stream()
         .filter(lfm -> 
cfs.contains(StringUtils.bytes2String(lfm.columnFamilyName())))
         .collect(Collectors.toList());
   }
 
-  public static Set<String> getSSTFilesForComparison(
-      final ManagedRocksDB rocksDB, Set<String> cfs) {
-    return getLiveSSTFilesForCFs(rocksDB, cfs).stream()
-        .map(lfm -> new File(lfm.path(), lfm.fileName()).getPath())
+  public static Set<SstFileInfo> getSSTFilesForComparison(final ManagedRocksDB 
rocksDB, Set<String> cfs) {
+    return getLiveSSTFilesForCFs(rocksDB, cfs).stream().map(SstFileInfo::new)
         .collect(Collectors.toCollection(HashSet::new));
   }
 
-  public static Map<Object, String> getSSTFilesWithInodesForComparison(final 
ManagedRocksDB rocksDB, Set<String> cfs)
-      throws IOException {
+  public static Map<Object, SstFileInfo> getSSTFilesWithInodesForComparison(
+      final ManagedRocksDB rocksDB, Set<String> cfs) throws IOException {
     List<LiveFileMetaData> liveSSTFilesForCFs = getLiveSSTFilesForCFs(rocksDB, 
cfs);
-    Map<Object, String> inodeToSstMap = new HashMap<>();
+    Map<Object, SstFileInfo> inodeToSstMap = new HashMap<>();
     for (LiveFileMetaData lfm : liveSSTFilesForCFs) {
       Path sstFilePath = Paths.get(lfm.path(), lfm.fileName());
       Object inode = Files.readAttributes(sstFilePath, 
BasicFileAttributes.class).fileKey();
-      inodeToSstMap.put(inode, sstFilePath.toString());
+      inodeToSstMap.put(inode, new SstFileInfo(lfm));
     }
     return inodeToSstMap;
   }
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileInfo.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileInfo.java
index 50f8c4c54d0..bc4975f9868 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileInfo.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileInfo.java
@@ -18,7 +18,9 @@
 package org.apache.ozone.rocksdb.util;
 
 import static org.apache.commons.io.FilenameUtils.getBaseName;
+import static 
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.SST_FILE_EXTENSION;
 
+import java.nio.file.Path;
 import java.util.Objects;
 import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.utils.db.CopyObject;
@@ -89,6 +91,10 @@ public int hashCode() {
     return Objects.hash(fileName, startKey, endKey, columnFamily);
   }
 
+  public Path getFilePath(Path directoryPath) {
+    return directoryPath.resolve(fileName + SST_FILE_EXTENSION);
+  }
+
   @Override
   public SstFileInfo copyObject() {
     return new SstFileInfo(fileName, startKey, endKey, columnFamily);
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java
index c72f56d5f11..840ed37a246 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java
@@ -17,56 +17,68 @@
 
 package org.apache.ozone.rocksdiff;
 
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Set;
 import java.util.UUID;
-import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ozone.rocksdb.util.SstFileInfo;
 
 /**
  * Snapshot information node class for the differ.
  */
 public class DifferSnapshotInfo {
-  private final String dbPath;
-  private final UUID snapshotId;
-  private final long snapshotGeneration;
+  private final UUID id;
+  private final long generation;
+  private final Function<Integer, Path> dbPathFunction;
+  private final NavigableMap<Integer, List<SstFileInfo>> versionSstFiles;
 
-  private final TablePrefixInfo tablePrefixes;
+  public DifferSnapshotInfo(Function<Integer, Path> dbPathFunction, UUID id, 
long gen,
+                            NavigableMap<Integer, List<SstFileInfo>> sstFiles) 
{
+    this.dbPathFunction = dbPathFunction;
+    this.id = id;
+    generation = gen;
+    this.versionSstFiles = sstFiles;
+  }
 
-  private final ManagedRocksDB rocksDB;
+  public Path getDbPath(int version) {
+    return dbPathFunction.apply(version);
+  }
 
-  public DifferSnapshotInfo(String db, UUID id, long gen,
-                            TablePrefixInfo tablePrefixInfo,
-                            ManagedRocksDB rocksDB) {
-    dbPath = db;
-    snapshotId = id;
-    snapshotGeneration = gen;
-    tablePrefixes = tablePrefixInfo;
-    this.rocksDB = rocksDB;
+  public UUID getId() {
+    return id;
   }
 
-  public String getDbPath() {
-    return dbPath;
+  public long getGeneration() {
+    return generation;
   }
 
-  public UUID getSnapshotId() {
-    return snapshotId;
+  List<SstFileInfo> getSstFiles(int version, Set<String> tablesToLookup) {
+    return versionSstFiles.get(version).stream()
+        .filter(sstFileInfo -> 
tablesToLookup.contains(sstFileInfo.getColumnFamily()))
+        .collect(Collectors.toList());
   }
 
-  public long getSnapshotGeneration() {
-    return snapshotGeneration;
+  @VisibleForTesting
+  SstFileInfo getSstFile(int version, String fileName) {
+    return versionSstFiles.get(version).stream()
+        .filter(sstFileInfo -> sstFileInfo.getFileName().equals(fileName))
+        .findFirst().orElse(null);
   }
 
-  public TablePrefixInfo getTablePrefixes() {
-    return tablePrefixes;
+  Integer getMaxVersion() {
+    return versionSstFiles.lastKey();
   }
 
   @Override
   public String toString() {
-    return String.format("DifferSnapshotInfo{dbPath='%s', snapshotID='%s', " +
-            "snapshotGeneration=%d, tablePrefixes size=%s}",
-        dbPath, snapshotId, snapshotGeneration, tablePrefixes.size());
-  }
-
-  public ManagedRocksDB getRocksDB() {
-    return rocksDB;
+    return String.format("DifferSnapshotInfo{dbPath='%s', id='%s', 
generation=%d}",
+        versionSstFiles.keySet().stream().collect(toMap(identity(), 
dbPathFunction::apply)), id, generation);
   }
 }
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index 9de00948da7..2a3ae61fa56 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -18,6 +18,7 @@
 package org.apache.ozone.rocksdiff;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.function.Function.identity;
 import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_DEFAULT;
@@ -32,7 +33,6 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
 import com.google.common.graph.MutableGraph;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.BufferedWriter;
@@ -73,6 +73,7 @@
 import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
 import org.apache.hadoop.hdds.utils.Scheduler;
 import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedEnvOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
@@ -84,7 +85,7 @@
 import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
 import org.apache.ozone.compaction.log.CompactionFileInfo;
 import org.apache.ozone.compaction.log.CompactionLogEntry;
-import org.apache.ozone.rocksdb.util.RdbUtil;
+import org.apache.ozone.rocksdb.util.SstFileInfo;
 import org.apache.ratis.util.UncheckedAutoCloseable;
 import org.rocksdb.AbstractEventListener;
 import org.rocksdb.ColumnFamilyHandle;
@@ -153,7 +154,7 @@ public class RocksDBCheckpointDiffer implements 
AutoCloseable,
    * Used to trim the file extension when writing compaction entries to the log
    * to save space.
    */
-  static final String SST_FILE_EXTENSION = ".sst";
+  public static final String SST_FILE_EXTENSION = ".sst";
   public static final int SST_FILE_EXTENSION_LENGTH =
       SST_FILE_EXTENSION.length();
   static final String PRUNED_SST_FILE_TEMP = "pruned.sst.tmp";
@@ -597,57 +598,6 @@ private void createLink(Path link, Path source) {
     }
   }
 
-  /**
-   * Helper method to trim the filename retrieved from LiveFileMetaData.
-   */
-  private String trimSSTFilename(String filename) {
-    if (!filename.startsWith("/")) {
-      final String errorMsg = String.format(
-          "Invalid start of filename: '%s'. Expected '/'", filename);
-      LOG.error(errorMsg);
-      throw new RuntimeException(errorMsg);
-    }
-    if (!filename.endsWith(SST_FILE_EXTENSION)) {
-      final String errorMsg = String.format(
-          "Invalid extension of file: '%s'. Expected '%s'",
-          filename, SST_FILE_EXTENSION_LENGTH);
-      LOG.error(errorMsg);
-      throw new RuntimeException(errorMsg);
-    }
-    return filename.substring("/".length(),
-        filename.length() - SST_FILE_EXTENSION_LENGTH);
-  }
-
-  /**
-   * Read the current Live manifest for a given RocksDB instance (Active or
-   * Checkpoint).
-   * @param rocksDB open rocksDB instance.
-   * @param tableFilter set of column-family/table names to include when 
collecting live SSTs.
-   * @return a list of SST files (without extension) in the DB.
-   */
-  public Set<String> readRocksDBLiveFiles(ManagedRocksDB rocksDB, Set<String> 
tableFilter) {
-    HashSet<String> liveFiles = new HashSet<>();
-
-    final Set<String> cfs = Sets.newHashSet(
-        org.apache.hadoop.hdds.StringUtils.bytes2String(
-            RocksDB.DEFAULT_COLUMN_FAMILY), "keyTable", "directoryTable",
-        "fileTable");
-    // Note it retrieves only the selected column families by the descriptor
-    // i.e. keyTable, directoryTable, fileTable
-    List<LiveFileMetaData> liveFileMetaDataList =
-        RdbUtil.getLiveSSTFilesForCFs(rocksDB, cfs);
-    LOG.debug("SST File Metadata for DB: " + rocksDB.get().getName());
-    for (LiveFileMetaData m : liveFileMetaDataList) {
-      if 
(!tableFilter.contains(StringUtils.bytes2String(m.columnFamilyName()))) {
-        continue;
-      }
-      LOG.debug("File: {}, Level: {}", m.fileName(), m.level());
-      final String trimmedFilename = trimSSTFilename(m.fileName());
-      liveFiles.add(trimmedFilename);
-    }
-    return liveFiles;
-  }
-
   /**
    * Process log line of compaction log text file input and populate the DAG.
    * It also adds the compaction log entry to compaction log table.
@@ -791,12 +741,10 @@ private void preconditionChecksForLoadAllCompactionLogs() 
{
    * exist in backup directory before being involved in compactions),
    * and appends the extension '.sst'.
    */
-  private String getSSTFullPath(String sstFilenameWithoutExtension,
-      String... dbPaths) {
+  private String getSSTFullPath(String sstFilenameWithoutExtension, Path... 
dbPaths) {
 
     // Try to locate the SST in the backup dir first
-    final Path sstPathInBackupDir = Paths.get(sstBackupDir,
-        sstFilenameWithoutExtension + SST_FILE_EXTENSION);
+    final Path sstPathInBackupDir = Paths.get(sstBackupDir, 
sstFilenameWithoutExtension + SST_FILE_EXTENSION);
     if (Files.exists(sstPathInBackupDir)) {
       return sstPathInBackupDir.toString();
     }
@@ -804,17 +752,15 @@ private String getSSTFullPath(String 
sstFilenameWithoutExtension,
     // SST file does not exist in the SST backup dir, this means the SST file
     // has not gone through any compactions yet and is only available in the
     // src DB directory or destDB directory
-    for (String dbPath : dbPaths) {
-      final Path sstPathInDBDir = Paths.get(dbPath,
-          sstFilenameWithoutExtension + SST_FILE_EXTENSION);
+    for (Path dbPath : dbPaths) {
+      final Path sstPathInDBDir = dbPath.resolve(sstFilenameWithoutExtension + 
SST_FILE_EXTENSION);
       if (Files.exists(sstPathInDBDir)) {
         return sstPathInDBDir.toString();
       }
     }
 
     // TODO: More graceful error handling?
-    throw new RuntimeException("Unable to locate SST file: " +
-        sstFilenameWithoutExtension);
+    throw new RuntimeException("Unable to locate SST file: " + 
sstFilenameWithoutExtension);
   }
 
   /**
@@ -824,6 +770,7 @@ private String getSSTFullPath(String 
sstFilenameWithoutExtension,
    *
    * @param src source snapshot
    * @param dest destination snapshot
+   * @param versionMap version map containing the connection between source 
snapshot version and dest snapshot version.
    * @param tablesToLookup tablesToLookup set of table (column family) names 
used to restrict which SST files to return.
    * @param sstFilesDirForSnapDiffJob dir to create hardlinks for SST files
    *                                 for snapDiff job.
@@ -832,22 +779,31 @@ private String getSSTFullPath(String 
sstFilenameWithoutExtension,
    *               "/path/to/sstBackupDir/000060.sst"]
    */
   public synchronized Optional<List<String>> 
getSSTDiffListWithFullPath(DifferSnapshotInfo src,
-      DifferSnapshotInfo dest, Set<String> tablesToLookup,
-      String sstFilesDirForSnapDiffJob) {
+      DifferSnapshotInfo dest, Map<Integer, Integer> versionMap, 
TablePrefixInfo prefixInfo,
+      Set<String> tablesToLookup, String sstFilesDirForSnapDiffJob) throws 
IOException {
+    int srcVersion = src.getMaxVersion();
+    if (!versionMap.containsKey(srcVersion)) {
+      throw new IOException("No corresponding dest version corresponding 
srcVersion : " + srcVersion + " in " +
+          "versionMap : " + versionMap);
+    }
+    int destVersion = versionMap.get(srcVersion);
+    DifferSnapshotVersion srcSnapshotVersion = new DifferSnapshotVersion(src, 
src.getMaxVersion(), tablesToLookup);
+    DifferSnapshotVersion destSnapshotVersion = new 
DifferSnapshotVersion(dest, destVersion, tablesToLookup);
 
-    Optional<List<String>> sstDiffList = getSSTDiffList(src, dest, 
tablesToLookup);
+    // If the source snapshot version is 0, use the compaction DAG path 
otherwise performs a full diff on the basis
+    // of the sst file names.
+    Optional<List<SstFileInfo>> sstDiffList = 
getSSTDiffList(srcSnapshotVersion, destSnapshotVersion, prefixInfo,
+        tablesToLookup, srcVersion == 0);
 
     return sstDiffList.map(diffList -> diffList.stream()
-        .map(
-            sst -> {
-              String sstFullPath = getSSTFullPath(sst, src.getDbPath(), 
dest.getDbPath());
-              Path link = Paths.get(sstFilesDirForSnapDiffJob,
-                  sst + SST_FILE_EXTENSION);
-              Path srcFile = Paths.get(sstFullPath);
-              createLink(link, srcFile);
-              return link.toString();
-            })
-        .collect(Collectors.toList()));
+        .map(sst -> {
+          String sstFullPath = getSSTFullPath(sst.getFileName(), 
srcSnapshotVersion.getDbPath(),
+              destSnapshotVersion.getDbPath());
+          Path link = sst.getFilePath(Paths.get(sstFilesDirForSnapDiffJob));
+          Path srcFile = Paths.get(sstFullPath);
+          createLink(link, srcFile);
+          return link.toString();
+        }).collect(Collectors.toList()));
   }
 
   /**
@@ -859,57 +815,116 @@ public synchronized Optional<List<String>> 
getSSTDiffListWithFullPath(DifferSnap
    *
    * @param src source snapshot
    * @param dest destination snapshot
+   * @param prefixInfo TablePrefixInfo to filter irrelevant SST files; can be 
null.
    * @param tablesToLookup tablesToLookup Set of column-family (table) names 
to include when reading SST files;
    *                       must be non-null.
+   * @param useCompactionDag If true, the method uses the compaction history 
to produce the incremental diff,
+   *                         otherwise a full diff would be performed on the 
basis of the sst file names.
    * @return A list of SST files without extension. e.g. ["000050", "000060"]
    */
-  public synchronized Optional<List<String>> getSSTDiffList(DifferSnapshotInfo 
src,
-      DifferSnapshotInfo dest, Set<String> tablesToLookup) {
+  public synchronized Optional<List<SstFileInfo>> 
getSSTDiffList(DifferSnapshotVersion src,
+      DifferSnapshotVersion dest, TablePrefixInfo prefixInfo, Set<String> 
tablesToLookup, boolean useCompactionDag) {
 
     // TODO: Reject or swap if dest is taken after src, once snapshot chain
     //  integration is done.
-    Set<String> srcSnapFiles = readRocksDBLiveFiles(src.getRocksDB(), 
tablesToLookup);
-    Set<String> destSnapFiles = readRocksDBLiveFiles(dest.getRocksDB(), 
tablesToLookup);
-
-    Set<String> fwdDAGSameFiles = new HashSet<>();
-    Set<String> fwdDAGDifferentFiles = new HashSet<>();
-
-    LOG.debug("Doing forward diff from src '{}' to dest '{}'",
-        src.getDbPath(), dest.getDbPath());
-    internalGetSSTDiffList(src, dest, srcSnapFiles, destSnapFiles,
-        fwdDAGSameFiles, fwdDAGDifferentFiles);
+    Map<String, SstFileInfo>  fwdDAGSameFiles = new HashMap<>();
+    Map<String, SstFileInfo>  fwdDAGDifferentFiles = new HashMap<>();
 
+    if (useCompactionDag) {
+      LOG.debug("Doing forward diff from src '{}' to dest '{}'", 
src.getDbPath(), dest.getDbPath());
+      internalGetSSTDiffList(src, dest, fwdDAGSameFiles, fwdDAGDifferentFiles);
+    } else {
+      Set<SstFileInfo> srcSstFileInfos = new 
HashSet<>(src.getSstFileMap().values());
+      Set<SstFileInfo> destSstFileInfos = new 
HashSet<>(dest.getSstFileMap().values());
+      for (SstFileInfo srcSstFileInfo : srcSstFileInfos) {
+        if (destSstFileInfos.contains(srcSstFileInfo)) {
+          fwdDAGSameFiles.put(srcSstFileInfo.getFileName(), srcSstFileInfo);
+        } else {
+          fwdDAGDifferentFiles.put(srcSstFileInfo.getFileName(), 
srcSstFileInfo);
+        }
+      }
+      for (SstFileInfo destSstFileInfo : destSstFileInfos) {
+        if (srcSstFileInfos.contains(destSstFileInfo)) {
+          fwdDAGSameFiles.put(destSstFileInfo.getFileName(), destSstFileInfo);
+        } else {
+          fwdDAGDifferentFiles.put(destSstFileInfo.getFileName(), 
destSstFileInfo);
+        }
+      }
+    }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Result of diff from src '" + src.getDbPath() + "' to dest '" +
           dest.getDbPath() + "':");
       StringBuilder logSB = new StringBuilder();
 
       logSB.append("Fwd DAG same SST files:      ");
-      for (String file : fwdDAGSameFiles) {
+      for (String file : fwdDAGSameFiles.keySet()) {
         logSB.append(file).append(SPACE_DELIMITER);
       }
       LOG.debug(logSB.toString());
 
       logSB.setLength(0);
       logSB.append("Fwd DAG different SST files: ");
-      for (String file : fwdDAGDifferentFiles) {
+      for (String file : fwdDAGDifferentFiles.keySet()) {
         logSB.append(file).append(SPACE_DELIMITER);
       }
       LOG.debug("{}", logSB);
     }
 
     // Check if the DAG traversal was able to reach all the destination SST 
files.
-    for (String destSnapFile : destSnapFiles) {
-      if (!fwdDAGSameFiles.contains(destSnapFile) && 
!fwdDAGDifferentFiles.contains(destSnapFile)) {
+    for (String destSnapFile : dest.getSstFiles()) {
+      if (!fwdDAGSameFiles.containsKey(destSnapFile) && 
!fwdDAGDifferentFiles.containsKey(destSnapFile)) {
         return Optional.empty();
       }
     }
 
-    if (src.getTablePrefixes() != null && src.getTablePrefixes().size() != 0) {
-      RocksDiffUtils.filterRelevantSstFiles(fwdDAGDifferentFiles, 
src.getTablePrefixes(),
-          compactionDag.getCompactionMap(), tablesToLookup, src.getRocksDB(), 
dest.getRocksDB());
+    if (prefixInfo != null && prefixInfo.size() != 0) {
+      RocksDiffUtils.filterRelevantSstFiles(fwdDAGDifferentFiles, 
tablesToLookup, prefixInfo);
+    }
+    return Optional.of(new ArrayList<>(fwdDAGDifferentFiles.values()));
+  }
+
+  /**
+   * This class represents a version of a snapshot in a database differ 
operation.
+   * It contains metadata associated with a specific snapshot version, 
including
+   * SST file information, generation id, and the database path for the given 
version.
+   *
+   * Designed to work with `DifferSnapshotInfo`, this class allows the 
retrieval of
+   * snapshot-related metadata and facilitates mapping of SST files for 
version comparison
+   * and other operations.
+   *
+   * The core functionality is to store and provide read-only access to:
+   * - SST file information for a specified snapshot version.
+   * - Snapshot generation identifier.
+   * - Path to the database directory corresponding to the snapshot version.
+   */
+  public static class DifferSnapshotVersion {
+    private Map<String, SstFileInfo> sstFiles;
+    private long generation;
+    private Path dbPath;
+
+    public DifferSnapshotVersion(DifferSnapshotInfo differSnapshotInfo, int 
version,
+        Set<String> tablesToLookup) {
+      this.sstFiles = differSnapshotInfo.getSstFiles(version, tablesToLookup)
+          .stream().collect(Collectors.toMap(SstFileInfo::getFileName, 
identity()));
+      this.generation = differSnapshotInfo.getGeneration();
+      this.dbPath = differSnapshotInfo.getDbPath(version);
+    }
+
+    private Path getDbPath() {
+      return dbPath;
+    }
+
+    private long getGeneration() {
+      return generation;
+    }
+
+    private Set<String> getSstFiles() {
+      return sstFiles.keySet();
+    }
+
+    private Map<String, SstFileInfo> getSstFileMap() {
+      return Collections.unmodifiableMap(sstFiles);
     }
-    return Optional.of(new ArrayList<>(fwdDAGDifferentFiles));
   }
 
   /**
@@ -921,30 +936,26 @@ public synchronized Optional<List<String>> 
getSSTDiffList(DifferSnapshotInfo src
    * diffing).  Otherwise, add it to the differentFiles map, as it will
    * need further diffing.
    */
-  synchronized void internalGetSSTDiffList(
-      DifferSnapshotInfo src,
-      DifferSnapshotInfo dest,
-      Set<String> srcSnapFiles,
-      Set<String> destSnapFiles,
-      Set<String> sameFiles,
-      Set<String> differentFiles) {
+  synchronized void internalGetSSTDiffList(DifferSnapshotVersion src, 
DifferSnapshotVersion dest,
+      Map<String, SstFileInfo> sameFiles, Map<String, SstFileInfo> 
differentFiles) {
 
     Preconditions.checkArgument(sameFiles.isEmpty(), "Set must be empty");
     Preconditions.checkArgument(differentFiles.isEmpty(), "Set must be empty");
-
-    for (String fileName : srcSnapFiles) {
-      if (destSnapFiles.contains(fileName)) {
+    Map<String, SstFileInfo> destSnapFiles = dest.getSstFileMap();
+    for (Map.Entry<String, SstFileInfo> sstFileEntry : 
src.getSstFileMap().entrySet()) {
+      String fileName =  sstFileEntry.getKey();
+      SstFileInfo sstFileInfo = sstFileEntry.getValue();
+      if (destSnapFiles.containsKey(fileName)) {
         LOG.debug("Source '{}' and destination '{}' share the same SST '{}'",
             src.getDbPath(), dest.getDbPath(), fileName);
-        sameFiles.add(fileName);
+        sameFiles.put(fileName, sstFileInfo);
         continue;
       }
 
       CompactionNode infileNode = compactionDag.getCompactionNode(fileName);
       if (infileNode == null) {
-        LOG.debug("Source '{}' SST file '{}' is never compacted",
-            src.getDbPath(), fileName);
-        differentFiles.add(fileName);
+        LOG.debug("Source '{}' SST file '{}' is never compacted", 
src.getDbPath(), fileName);
+        differentFiles.put(fileName, sstFileInfo);
         continue;
       }
 
@@ -954,15 +965,12 @@ synchronized void internalGetSSTDiffList(
       // Traversal level/depth indicator for debug print
       int level = 1;
       while (!currentLevel.isEmpty()) {
-        LOG.debug("Traversal level: {}. Current level has {} nodes.",
-            level++, currentLevel.size());
+        LOG.debug("Traversal level: {}. Current level has {} nodes.", level++, 
currentLevel.size());
 
         if (level >= 1000000) {
-          final String errorMsg = String.format(
-                  "Graph traversal level exceeded allowed maximum (%d). "
-                  + "This could be due to invalid input generating a "
-                  + "loop in the traversal path. Same SSTs found so far: %s, "
-                  + "different SSTs: %s", level, sameFiles, differentFiles);
+          final String errorMsg = String.format("Graph traversal level 
exceeded allowed maximum (%d). " +
+              "This could be due to invalid input generating a loop in the 
traversal path. Same SSTs found so " +
+              "far: %s, different SSTs: %s", level, sameFiles, differentFiles);
           LOG.error(errorMsg);
           // Clear output in case of error. Expect fall back to full diff
           sameFiles.clear();
@@ -974,43 +982,42 @@ synchronized void internalGetSSTDiffList(
         final Set<CompactionNode> nextLevel = new HashSet<>();
         for (CompactionNode current : currentLevel) {
           LOG.debug("Processing node: '{}'", current.getFileName());
-          if (current.getSnapshotGeneration() < dest.getSnapshotGeneration()) {
+          if (current.getSnapshotGeneration() < dest.getGeneration()) {
             LOG.debug("Current node's snapshot generation '{}' "
                     + "reached destination snapshot's '{}'. "
                     + "Src '{}' and dest '{}' have different SST file: '{}'",
-                current.getSnapshotGeneration(), dest.getSnapshotGeneration(),
+                current.getSnapshotGeneration(), dest.getGeneration(),
                 src.getDbPath(), dest.getDbPath(), current.getFileName());
-            differentFiles.add(current.getFileName());
+            differentFiles.put(current.getFileName(), current);
             continue;
           }
 
           Set<CompactionNode> successors = 
compactionDag.getForwardCompactionDAG().successors(current);
           if (successors.isEmpty()) {
-            LOG.debug("No further compaction happened to the current file. " +
-                "Src '{}' and dest '{}' have different file: {}",
-                src.getDbPath(), dest.getDbPath(), current.getFileName());
-            differentFiles.add(current.getFileName());
+            LOG.debug("No further compaction happened to the current file. Src 
'{}' and dest '{}' " +
+                    "have different file: {}", src.getDbPath(), 
dest.getDbPath(), current.getFileName());
+            differentFiles.put(current.getFileName(), current);
             continue;
           }
 
           for (CompactionNode nextNode : successors) {
-            if (sameFiles.contains(nextNode.getFileName()) ||
-                differentFiles.contains(nextNode.getFileName())) {
+            if (sameFiles.containsKey(nextNode.getFileName()) ||
+                differentFiles.containsKey(nextNode.getFileName())) {
               LOG.debug("Skipping known processed SST: {}",
                   nextNode.getFileName());
               continue;
             }
 
-            if (destSnapFiles.contains(nextNode.getFileName())) {
-              LOG.debug("Src '{}' and dest '{}' have the same SST: {}",
-                  src.getDbPath(), dest.getDbPath(), nextNode.getFileName());
-              sameFiles.add(nextNode.getFileName());
+            if (destSnapFiles.containsKey(nextNode.getFileName())) {
+              LOG.debug("Src '{}' and dest '{}' have the same SST: {}", 
src.getDbPath(), dest.getDbPath(),
+                  nextNode.getFileName());
+              sameFiles.put(nextNode.getFileName(), 
destSnapFiles.get(nextNode.getFileName()));
               continue;
             }
 
             // Queue different SST to the next level
-            LOG.debug("Src '{}' and dest '{}' have a different SST: {}",
-                src.getDbPath(), dest.getDbPath(), nextNode.getFileName());
+            LOG.debug("Src '{}' and dest '{}' have a different SST: {}", 
src.getDbPath(), dest.getDbPath(),
+                nextNode.getFileName());
             nextLevel.add(nextNode);
           }
         }
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
index 7d9512768bc..c85c02f4c3b 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
@@ -20,17 +20,11 @@
 import static org.apache.hadoop.hdds.StringUtils.getFirstNChars;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
-import org.apache.ozone.compaction.log.CompactionFileInfo;
 import org.apache.ozone.rocksdb.util.SstFileInfo;
-import org.rocksdb.LiveFileMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,40 +48,41 @@ public static boolean isKeyWithPrefixPresent(String 
prefixForColumnFamily,
         && prefixForColumnFamily.compareTo(endKeyPrefix) <= 0;
   }
 
-  public static void filterRelevantSstFiles(Set<String> inputFiles,
-                                            TablePrefixInfo tablePrefixInfo,
-                                            Set<String> columnFamiliesToLookup,
-                                            ManagedRocksDB... dbs) {
-    filterRelevantSstFiles(inputFiles, tablePrefixInfo, 
Collections.emptyMap(), columnFamiliesToLookup, dbs);
+  /**
+   * Filter sst files based on prefixes. The map of sst files to be filtered 
would be mutated.
+   * @param <T> Type of the key in the map.
+   * @param filesMapToBeFiltered Map of sst files to be filtered.
+   * @param tablesToLookup Set of column families to be included in the diff.
+   * @param tablePrefixInfo TablePrefixInfo to filter irrelevant SST files.
+   */
+  public static <T> Map<T, SstFileInfo> filterRelevantSstFiles(Map<T, 
SstFileInfo> filesMapToBeFiltered,
+      Set<String> tablesToLookup, TablePrefixInfo tablePrefixInfo) {
+    for (Iterator<Map.Entry<T, SstFileInfo>> fileIterator = 
filesMapToBeFiltered.entrySet().iterator();
+         fileIterator.hasNext();) {
+      SstFileInfo sstFileInfo = fileIterator.next().getValue();
+      if (shouldSkipNode(sstFileInfo, tablePrefixInfo, tablesToLookup)) {
+        fileIterator.remove();
+      }
+    }
+    return filesMapToBeFiltered;
   }
 
   /**
-   * Filter sst files based on prefixes.
+   * Filter sst files based on prefixes. The set of sst files to be filtered 
would be mutated.
+   * @param <T> Type of the key in the map.
+   * @param filesToBeFiltered sst files to be filtered.
+   * @param tablesToLookup Set of column families to be included in the diff.
+   * @param tablePrefixInfo TablePrefixInfo to filter irrelevant SST files.
    */
-  public static void filterRelevantSstFiles(Set<String> inputFiles,
-                                            TablePrefixInfo tablePrefixInfo,
-                                            Map<String, CompactionNode> 
preExistingCompactionNodes,
-                                            Set<String> columnFamiliesToLookup,
-                                            ManagedRocksDB... dbs) {
-    Map<String, LiveFileMetaData> liveFileMetaDataMap = new HashMap<>();
-    int dbIdx = 0;
-    for (Iterator<String> fileIterator =
-         inputFiles.iterator(); fileIterator.hasNext();) {
-      String filename = FilenameUtils.getBaseName(fileIterator.next());
-      while (!preExistingCompactionNodes.containsKey(filename) && 
!liveFileMetaDataMap.containsKey(filename)
-          && dbIdx < dbs.length) {
-        liveFileMetaDataMap.putAll(dbs[dbIdx].getLiveMetadataForSSTFiles());
-        dbIdx += 1;
-      }
-      CompactionNode compactionNode = preExistingCompactionNodes.get(filename);
-      if (compactionNode == null) {
-        compactionNode = new CompactionNode(new 
CompactionFileInfo.Builder(filename)
-            .setValues(liveFileMetaDataMap.get(filename)).build());
-      }
-      if (shouldSkipNode(compactionNode, tablePrefixInfo, 
columnFamiliesToLookup)) {
+  public static <T> Set<SstFileInfo> filterRelevantSstFiles(Set<SstFileInfo> 
filesToBeFiltered,
+      Set<String> tablesToLookup, TablePrefixInfo tablePrefixInfo) {
+    for (Iterator<SstFileInfo> fileIterator = filesToBeFiltered.iterator(); 
fileIterator.hasNext();) {
+      SstFileInfo sstFileInfo = fileIterator.next();
+      if (shouldSkipNode(sstFileInfo, tablePrefixInfo, tablesToLookup)) {
         fileIterator.remove();
       }
     }
+    return filesToBeFiltered;
   }
 
   @VisibleForTesting
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index d24790dfcfc..8af38c5454b 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -41,11 +41,14 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.graph.MutableGraph;
@@ -67,6 +70,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -76,10 +80,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.StringUtils;
@@ -104,7 +106,8 @@
 import org.apache.hadoop.util.Time;
 import org.apache.ozone.compaction.log.CompactionFileInfo;
 import org.apache.ozone.compaction.log.CompactionLogEntry;
-import org.apache.ozone.rocksdb.util.RdbUtil;
+import org.apache.ozone.rocksdb.util.SstFileInfo;
+import 
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DifferSnapshotVersion;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.NodeComparator;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ratis.util.UncheckedAutoCloseable;
@@ -112,6 +115,7 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -131,6 +135,8 @@
  * Test RocksDBCheckpointDiffer basic functionality.
  */
 public class TestRocksDBCheckpointDiffer {
+  @TempDir
+  private static File dbDir;
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestRocksDBCheckpointDiffer.class);
 
@@ -279,8 +285,6 @@ public class TestRocksDBCheckpointDiffer {
 
   private final List<File> cpDirList = new ArrayList<>();
 
-  private final List<List<ColumnFamilyHandle>> colHandles = new ArrayList<>();
-
   private static final String ACTIVE_DB_DIR_NAME = "./rocksdb-data";
   private static final String METADATA_DIR_NAME = "./metadata";
   private static final String COMPACTION_LOG_DIR_NAME = "compaction-log";
@@ -408,7 +412,7 @@ public void cleanUp() {
     }
   }
 
-  private static List<CompactionLogEntry> getPrunedCompactionEntries(boolean 
prune, Map<String, String[]> metadata) {
+  private static List<CompactionLogEntry> getPrunedCompactionEntries(boolean 
prune, Map<String, SstFileInfo> metadata) {
     List<CompactionLogEntry> entries = new ArrayList<>();
     if (!prune) {
       entries.add(createCompactionEntry(1,
@@ -431,6 +435,122 @@ private static List<CompactionLogEntry> 
getPrunedCompactionEntries(boolean prune
     return entries;
   }
 
+  private static DifferSnapshotInfo mockDifferSnapshotVersion(String dbPath, 
long generation) {
+    DifferSnapshotInfo differSnapshotInfo = mock(DifferSnapshotInfo.class);
+    when(differSnapshotInfo.getDbPath(anyInt())).thenReturn(Paths.get(dbPath));
+    when(differSnapshotInfo.getGeneration()).thenReturn(generation);
+    return differSnapshotInfo;
+  }
+
+  private static Stream<Arguments> getSSTDiffListWithoutCompactionDAGCase() {
+    return Stream.of(
+        Arguments.of("Delta File with same source and target",
+        ImmutableList.of(
+            new SstFileInfo("1", "ac", "ae", "cf1"),
+            new SstFileInfo("2", "ad", "ag", "cf1")),
+        ImmutableList.of(
+            new SstFileInfo("1", "ac", "ae", "cf1"),
+            new SstFileInfo("2", "ad", "ag", "cf1")),
+        ImmutableMap.of("cf1", "a", "cf2", "z"), ImmutableSet.of("cf1"), 
Collections.emptyList()),
+        Arguments.of("Delta File with source having more files",
+            ImmutableList.of(
+                new SstFileInfo("2", "ad", "ag", "cf1"),
+                new SstFileInfo("3", "af", "ah", "cf1")),
+            ImmutableList.of(
+                new SstFileInfo("1", "ac", "ae", "cf1"),
+                new SstFileInfo("2", "ad", "ag", "cf1"),
+                new SstFileInfo("3", "af", "ah", "cf1")),
+            ImmutableMap.of("cf1", "a", "cf2", "z"),
+            ImmutableSet.of("cf1"),
+            ImmutableList.of(new SstFileInfo("1", "ac", "ae", "cf1"))),
+        Arguments.of("Delta File with target having more files",
+            ImmutableList.of(
+                new SstFileInfo("1", "ac", "ae", "cf1"),
+                new SstFileInfo("2", "ad", "ag", "cf1"),
+                new SstFileInfo("3", "af", "ah", "cf1")),
+            ImmutableList.of(
+                new SstFileInfo("2", "ad", "ag", "cf1"),
+                new SstFileInfo("3", "af", "ah", "cf1")),
+            ImmutableMap.of("cf1", "a", "cf2", "z"),
+            ImmutableSet.of("cf1"),
+            ImmutableList.of(new SstFileInfo("1", "ac", "ae", "cf1"))),
+        Arguments.of("Delta File computation with source files with invalid 
prefix",
+            ImmutableList.of(
+                new SstFileInfo("1", "ac", "ae", "cf1"),
+                new SstFileInfo("2", "bh", "bi", "cf1")),
+            ImmutableList.of(
+                new SstFileInfo("1", "ac", "ae", "cf1"),
+                new SstFileInfo("4", "af", "ai", "cf1")),
+            ImmutableMap.of("cf1", "a", "cf2", "z"),
+            ImmutableSet.of("cf1"),
+            ImmutableList.of(new SstFileInfo("4", "af", "ai", "cf1"))),
+        Arguments.of("Delta File computation with target files with invalid 
prefix",
+            ImmutableList.of(
+                new SstFileInfo("1", "ac", "ae", "cf1"),
+                new SstFileInfo("2", "ah", "ai", "cf1")),
+            ImmutableList.of(
+                new SstFileInfo("1", "ac", "ae", "cf1"),
+                new SstFileInfo("4", "bf", "bi", "cf1")),
+            ImmutableMap.of("cf1", "a", "cf2", "z"),
+            ImmutableSet.of("cf1"),
+            ImmutableList.of(new SstFileInfo("2", "ah", "ai", "cf1"))),
+        Arguments.of("Delta File computation with target files with multiple 
tables",
+            ImmutableList.of(
+                new SstFileInfo("1", "ac", "ae", "cf1"),
+                new SstFileInfo("2", "ah", "ai", "cf1"),
+                new SstFileInfo("3", "ah", "ai", "cf3")),
+            ImmutableList.of(
+                new SstFileInfo("1", "ac", "ae", "cf1"),
+                new SstFileInfo("2", "ah", "ai", "cf1"),
+                new SstFileInfo("5", "af", "ai", "cf4")),
+            ImmutableMap.of("cf1", "a", "cf2", "z"), ImmutableSet.of("cf1"), 
Collections.emptyList()),
+        Arguments.of("Delta File computation with target files with multiple 
tables to lookup on source",
+            ImmutableList.of(
+                new SstFileInfo("1", "ac", "ae", "cf1"),
+                new SstFileInfo("2", "ah", "ai", "cf1"),
+                new SstFileInfo("3", "ah", "ai", "cf3")),
+            ImmutableList.of(
+                new SstFileInfo("1", "ac", "ae", "cf1"),
+                new SstFileInfo("2", "ah", "ai", "cf1"),
+                new SstFileInfo("5", "af", "ai", "cf4")),
+            ImmutableMap.of("cf1", "a", "cf2", "z"),
+            ImmutableSet.of("cf1", "cf3"),
+            ImmutableList.of(new SstFileInfo("3", "ah", "ai", "cf3"))),
+        Arguments.of("Delta File computation with target files with multiple 
tables to lookup on target",
+            ImmutableList.of(
+                new SstFileInfo("1", "ac", "ae", "cf1"),
+                new SstFileInfo("2", "ah", "ai", "cf1"),
+                new SstFileInfo("3", "ah", "ai", "cf3")),
+            ImmutableList.of(
+                new SstFileInfo("1", "ac", "ae", "cf1"),
+                new SstFileInfo("2", "ah", "ai", "cf1"),
+                new SstFileInfo("5", "af", "ai", "cf4")),
+            ImmutableMap.of("cf1", "a", "cf2", "z"),
+            ImmutableSet.of("cf1", "cf4"),
+            ImmutableList.of(new SstFileInfo("5", "af", "ai", "cf4")))
+        );
+  }
+
+  private DifferSnapshotInfo getDifferSnapshotInfoForVersion(List<SstFileInfo> 
sstFiles, int version) {
+    TreeMap<Integer, List<SstFileInfo>> sourceSstFileMap = new TreeMap<>();
+    sourceSstFileMap.put(version, sstFiles);
+    return new DifferSnapshotInfo(v -> Paths.get("src"), UUID.randomUUID(), 0, 
sourceSstFileMap);
+  }
+
+  @ParameterizedTest
+  @MethodSource("getSSTDiffListWithoutCompactionDAGCase")
+  public void testGetSSTDiffListWithoutCompactionDag(String description, 
List<SstFileInfo> sourceSstFiles,
+      List<SstFileInfo> destSstFiles, Map<String, String> prefixMap, 
Set<String> tablesToLookup,
+      List<SstFileInfo> expectedDiffList) {
+    DifferSnapshotInfo sourceDSI = 
getDifferSnapshotInfoForVersion(sourceSstFiles, 0);
+    DifferSnapshotVersion sourceVersion = new DifferSnapshotVersion(sourceDSI, 
0, tablesToLookup);
+    DifferSnapshotInfo destDSI = getDifferSnapshotInfoForVersion(destSstFiles, 
1);
+    DifferSnapshotVersion destVersion = new DifferSnapshotVersion(destDSI, 1, 
tablesToLookup);
+    List<SstFileInfo> diffList = 
rocksDBCheckpointDiffer.getSSTDiffList(sourceVersion, destVersion,
+        new TablePrefixInfo(prefixMap), tablesToLookup, false).orElse(null);
+    assertEquals(expectedDiffList, diffList);
+  }
+
   /**
    * Test cases for testGetSSTDiffListWithoutDB.
    */
@@ -504,21 +624,15 @@ private static Stream<Arguments> 
casesGetSSTDiffListWithoutDB() {
             Arrays.asList("000105", "000095", "000088"),
             Collections.singletonList("000107"))
     );
-
-    DifferSnapshotInfo snapshotInfo1 = new DifferSnapshotInfo(
-        "/path/to/dbcp1", UUID.randomUUID(), 3008L, null, 
Mockito.mock(ManagedRocksDB.class));
-    DifferSnapshotInfo snapshotInfo2 = new DifferSnapshotInfo(
-        "/path/to/dbcp2", UUID.randomUUID(), 14980L, null, 
Mockito.mock(ManagedRocksDB.class));
-    DifferSnapshotInfo snapshotInfo3 = new DifferSnapshotInfo(
-        "/path/to/dbcp3", UUID.randomUUID(), 17975L, null, 
Mockito.mock(ManagedRocksDB.class));
-    DifferSnapshotInfo snapshotInfo4 = new DifferSnapshotInfo(
-        "/path/to/dbcp4", UUID.randomUUID(), 18000L, null, 
Mockito.mock(ManagedRocksDB.class));
+    Path baseDir = 
dbDir.toPath().resolve("path").resolve("to").toAbsolutePath();
+    DifferSnapshotInfo snapshotInfo1 = 
mockDifferSnapshotVersion(baseDir.resolve("dbcp1").toString(), 3008L);
+    DifferSnapshotInfo snapshotInfo2 = 
mockDifferSnapshotVersion(baseDir.resolve("dbcp2").toString(), 14980L);
+    DifferSnapshotInfo snapshotInfo3 = 
mockDifferSnapshotVersion(baseDir.resolve("dbcp3").toString(), 17975L);
+    DifferSnapshotInfo snapshotInfo4 = 
mockDifferSnapshotVersion(baseDir.resolve("dbcp4").toString(), 18000L);
 
     TablePrefixInfo prefixMap = new TablePrefixInfo(ImmutableMap.of("col1", 
"c", "col2", "d"));
-    DifferSnapshotInfo snapshotInfo5 = new DifferSnapshotInfo(
-        "/path/to/dbcp2", UUID.randomUUID(), 0L, prefixMap, 
Mockito.mock(ManagedRocksDB.class));
-    DifferSnapshotInfo snapshotInfo6 = new DifferSnapshotInfo(
-        "/path/to/dbcp2", UUID.randomUUID(), 100L, prefixMap, 
Mockito.mock(ManagedRocksDB.class));
+    DifferSnapshotInfo snapshotInfo5 = 
mockDifferSnapshotVersion(baseDir.resolve("dbcp2").toString(), 0L);
+    DifferSnapshotInfo snapshotInfo6 = 
mockDifferSnapshotVersion(baseDir.resolve("dbcp2").toString(), 100L);
 
     Set<String> snapshotSstFiles1 = ImmutableSet.of("000059", "000053");
     Set<String> snapshotSstFiles2 = ImmutableSet.of("000088", "000059",
@@ -550,7 +664,7 @@ private static Stream<Arguments> 
casesGetSSTDiffListWithoutDB() {
                 "000095"),
             ImmutableSet.of("000066", "000105", "000080", "000087", "000073",
                 "000095"),
-            false, Collections.emptyMap()),
+            false, Collections.emptyMap(), null),
         Arguments.of("Test 2: Compaction log file crafted input: " +
                 "One source ('to' snapshot) SST file is never compacted " +
                 "(newly flushed)",
@@ -563,7 +677,7 @@ private static Stream<Arguments> 
casesGetSSTDiffListWithoutDB() {
             ImmutableSet.of("000088", "000105", "000059", "000053", "000095"),
             ImmutableSet.of("000108"),
             ImmutableSet.of("000108"),
-            false, Collections.emptyMap()),
+            false, Collections.emptyMap(), null),
         Arguments.of("Test 3: Compaction log file crafted input: " +
                 "Same SST files found during SST expansion",
             compactionLog,
@@ -575,7 +689,7 @@ private static Stream<Arguments> 
casesGetSSTDiffListWithoutDB() {
             ImmutableSet.of("000066", "000059", "000053"),
             ImmutableSet.of("000080", "000087", "000073", "000095"),
             ImmutableSet.of("000080", "000087", "000073", "000095"),
-            false, Collections.emptyMap()),
+            false, Collections.emptyMap(), null),
         Arguments.of("Test 4: Compaction log file crafted input: " +
                 "Skipping known processed SST.",
             compactionLog,
@@ -587,7 +701,7 @@ private static Stream<Arguments> 
casesGetSSTDiffListWithoutDB() {
             Collections.emptySet(),
             Collections.emptySet(),
             Collections.emptySet(),
-            true, Collections.emptyMap()),
+            true, Collections.emptyMap(), null),
         Arguments.of("Test 5: Compaction log file hit snapshot" +
                 " generation early exit condition",
             compactionLog,
@@ -599,7 +713,7 @@ private static Stream<Arguments> 
casesGetSSTDiffListWithoutDB() {
             ImmutableSet.of("000059", "000053"),
             ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
             ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
-            false, Collections.emptyMap()),
+            false, Collections.emptyMap(), null),
         Arguments.of("Test 6: Compaction log table regular case. " +
                 "Expands expandable SSTs in the initial diff.",
             null,
@@ -613,7 +727,7 @@ private static Stream<Arguments> 
casesGetSSTDiffListWithoutDB() {
                 "000095"),
             ImmutableSet.of("000066", "000105", "000080", "000087", "000073",
                 "000095"),
-            false, Collections.emptyMap()),
+            false, Collections.emptyMap(), null),
         Arguments.of("Test 7: Compaction log table crafted input: " +
                 "One source ('to' snapshot) SST file is never compacted " +
                 "(newly flushed)",
@@ -626,7 +740,7 @@ private static Stream<Arguments> 
casesGetSSTDiffListWithoutDB() {
             ImmutableSet.of("000088", "000105", "000059", "000053", "000095"),
             ImmutableSet.of("000108"),
             ImmutableSet.of("000108"),
-            false, Collections.emptyMap()),
+            false, Collections.emptyMap(), null),
         Arguments.of("Test 8: Compaction log table crafted input: " +
                 "Same SST files found during SST expansion",
             null,
@@ -638,7 +752,7 @@ private static Stream<Arguments> 
casesGetSSTDiffListWithoutDB() {
             ImmutableSet.of("000066", "000059", "000053"),
             ImmutableSet.of("000080", "000087", "000073", "000095"),
             ImmutableSet.of("000080", "000087", "000073", "000095"),
-            false, Collections.emptyMap()),
+            false, Collections.emptyMap(), null),
         Arguments.of("Test 9: Compaction log table crafted input: " +
                 "Skipping known processed SST.",
             null,
@@ -650,7 +764,7 @@ private static Stream<Arguments> 
casesGetSSTDiffListWithoutDB() {
             Collections.emptySet(),
             Collections.emptySet(),
             Collections.emptySet(),
-            true, Collections.emptyMap()),
+            true, Collections.emptyMap(), null),
         Arguments.of("Test 10: Compaction log table hit snapshot " +
                 "generation early exit condition",
             null,
@@ -662,7 +776,7 @@ private static Stream<Arguments> 
casesGetSSTDiffListWithoutDB() {
             ImmutableSet.of("000059", "000053"),
             ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
             ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
-            false, Collections.emptyMap()),
+            false, Collections.emptyMap(), null),
         Arguments.of("Test 11: Older Compaction log got pruned and source 
snapshot delta files would be " +
                 "unreachable",
             null,
@@ -674,7 +788,7 @@ private static Stream<Arguments> 
casesGetSSTDiffListWithoutDB() {
             ImmutableSet.of("1", "3", "13", "14"),
             ImmutableSet.of("2", "8", "9", "12"),
             ImmutableSet.of("2", "8", "9", "12"),
-            false, Collections.emptyMap()),
+            false, Collections.emptyMap(), prefixMap),
         Arguments.of("Test 12: Older Compaction log got pruned and source 
snapshot delta files would be " +
                 "unreachable",
             null,
@@ -686,22 +800,22 @@ private static Stream<Arguments> 
casesGetSSTDiffListWithoutDB() {
             ImmutableSet.of("3", "13", "14"),
             ImmutableSet.of("4", "5", "8", "9", "12"),
             null,
-            false, Collections.emptyMap()),
+            false, Collections.emptyMap(), prefixMap),
         Arguments.of("Test 13: Compaction log to test filtering logic based on 
range and column family",
             null,
             getPrunedCompactionEntries(false,
-                new HashMap<String, String[]>() {{
-                  put("1", new String[]{"a", "c", "col1"});
-                  put("3", new String[]{"a", "d", "col2"});
-                  put("13", new String[]{"a", "c", "col13"});
-                  put("14", new String[]{"a", "c", "col1"});
-                  put("2", new String[]{"a", "c", "col1"});
-                  put("4", new String[]{"a", "b", "col1"});
-                  put("5", new String[]{"b", "b", "col1"});
-                  put("10", new String[]{"a", "b", "col1"});
-                  put("8", new String[]{"a", "b", "col1"});
-                  put("6", new String[]{"a", "z", "col13"});
-                  put("7", new String[]{"a", "z", "col13"});
+                new HashMap<String, SstFileInfo>() {{
+                  put("1", new SstFileInfo("1", "a", "c", "col1"));
+                  put("3", new SstFileInfo("3", "a", "d", "col2"));
+                  put("13", new SstFileInfo("13", "a", "c", "col13"));
+                  put("14", new SstFileInfo("14", "a", "c", "col1"));
+                  put("2", new SstFileInfo("2", "a", "c", "col1"));
+                  put("4", new SstFileInfo("4", "a", "b", "col1"));
+                  put("5", new SstFileInfo("5", "b", "b", "col1"));
+                  put("10", new SstFileInfo("10", "a", "b", "col1"));
+                  put("8", new SstFileInfo("8", "a", "b", "col1"));
+                  put("6", new SstFileInfo("6", "a", "z", "col13"));
+                  put("7", new SstFileInfo("7", "a", "z", "col13"));
                 }}),
             snapshotInfo6,
             snapshotInfo5,
@@ -712,12 +826,12 @@ private static Stream<Arguments> 
casesGetSSTDiffListWithoutDB() {
             ImmutableSet.of("2", "9", "12"),
             false,
             ImmutableMap.of(
-                "2", new String[]{"a", "b", "col1"},
-                "12", new String[]{"a", "d", "col2"},
-                "8", new String[]{"a", "b", "col1"},
-                "9", new String[]{"a", "c", "col1"},
-                "15", new String[]{"a", "z", "col13"}
-            ))
+                "2", new SstFileInfo("2", "a", "b", "col1"),
+                "12", new SstFileInfo("12", "a", "d", "col2"),
+                "8", new SstFileInfo("8", "a", "b", "col1"),
+                "9", new SstFileInfo("9", "a", "c", "col1"),
+                "15", new SstFileInfo("15", "a", "z", "col13")
+            ), prefixMap)
 
     );
   }
@@ -740,7 +854,8 @@ public void testGetSSTDiffListWithoutDB(String description,
       Set<String> expectedDiffSstFiles,
       Set<String> expectedSSTDiffFiles,
       boolean expectingException,
-      Map<String, String[]> metaDataMap) {
+      Map<String, SstFileInfo> metaDataMap,
+      TablePrefixInfo prefixInfo) {
 
     boolean exceptionThrown = false;
     if (compactionLog != null) {
@@ -756,15 +871,33 @@ public void testGetSSTDiffListWithoutDB(String 
description,
     }
     rocksDBCheckpointDiffer.loadAllCompactionLogs();
 
-    Set<String> actualSameSstFiles = new HashSet<>();
-    Set<String> actualDiffSstFiles = new HashSet<>();
+    Set<String> tablesToLookup;
+    String dummyTable;
+    if (prefixInfo != null) {
+      tablesToLookup = prefixInfo.getTableNames();
+      dummyTable = tablesToLookup.stream().findAny().get();
+    } else {
+      tablesToLookup = mock(Set.class);
+      when(tablesToLookup.contains(anyString())).thenReturn(true);
+      dummyTable = "dummy";
+    }
 
+    Map<String, SstFileInfo> actualSameSstFiles = new HashMap<>();
+    Map<String, SstFileInfo> actualDiffSstFiles = new HashMap<>();
+    List<SstFileInfo> sourceSnapshotFiles = srcSnapshotSstFiles.stream()
+        .map(fileName -> new SstFileInfo(fileName, "", "", dummyTable))
+        .collect(Collectors.toList());
+    List<SstFileInfo> destSnapshotFiles = destSnapshotSstFiles.stream()
+        .map(fileName -> new SstFileInfo(fileName, "", "", dummyTable))
+        .collect(Collectors.toList());
+    when(srcSnapshot.getSstFiles(eq(0), 
eq(tablesToLookup))).thenReturn(sourceSnapshotFiles);
+    when(destSnapshot.getSstFiles(eq(0), 
eq(tablesToLookup))).thenReturn(destSnapshotFiles);
+    DifferSnapshotVersion srcVersion = new DifferSnapshotVersion(srcSnapshot, 
0, tablesToLookup);
+    DifferSnapshotVersion destVersion = new 
DifferSnapshotVersion(destSnapshot, 0, tablesToLookup);
     try {
       rocksDBCheckpointDiffer.internalGetSSTDiffList(
-          srcSnapshot,
-          destSnapshot,
-          srcSnapshotSstFiles,
-          destSnapshotSstFiles,
+          srcVersion,
+          destVersion,
           actualSameSstFiles,
           actualDiffSstFiles);
     } catch (RuntimeException rtEx) {
@@ -780,57 +913,30 @@ public void testGetSSTDiffListWithoutDB(String 
description,
     }
 
     // Check same and different SST files result
-    assertEquals(expectedSameSstFiles, actualSameSstFiles);
-    assertEquals(expectedDiffSstFiles, actualDiffSstFiles);
-    try (MockedStatic<RdbUtil> mockedHandler = 
Mockito.mockStatic(RdbUtil.class, Mockito.CALLS_REAL_METHODS)) {
-      RocksDB rocksDB = Mockito.mock(RocksDB.class);
-      Mockito.when(rocksDB.getName()).thenReturn("dummy");
-      Mockito.when(srcSnapshot.getRocksDB().get()).thenReturn(rocksDB);
-      Mockito.when(destSnapshot.getRocksDB().get()).thenReturn(rocksDB);
-      Mockito.when(srcSnapshot.getRocksDB().getLiveMetadataForSSTFiles())
-          .thenAnswer(invocation -> 
srcSnapshotSstFiles.stream().filter(metaDataMap::containsKey).map(file -> {
-            LiveFileMetaData liveFileMetaData = 
Mockito.mock(LiveFileMetaData.class);
-            String[] metaData = metaDataMap.get(file);
-            Mockito.when(liveFileMetaData.fileName()).thenReturn("/" + file + 
SST_FILE_EXTENSION);
-            
Mockito.when(liveFileMetaData.smallestKey()).thenReturn(metaData[0].getBytes(UTF_8));
-            
Mockito.when(liveFileMetaData.largestKey()).thenReturn(metaData[1].getBytes(UTF_8));
-            
Mockito.when(liveFileMetaData.columnFamilyName()).thenReturn(metaData[2].getBytes(UTF_8));
-            return liveFileMetaData;
-          }).collect(Collectors.toMap(liveFileMetaData -> 
FilenameUtils.getBaseName(liveFileMetaData.fileName()),
-              Function.identity())));
-      Set<String> tablesToLookup;
-      String dummyTable;
-      if (srcSnapshot.getTablePrefixes() != null) {
-        tablesToLookup = srcSnapshot.getTablePrefixes().getTableNames();
-        dummyTable = tablesToLookup.stream().findAny().get();
+    assertEquals(expectedSameSstFiles, actualSameSstFiles.keySet());
+    assertEquals(expectedDiffSstFiles, actualDiffSstFiles.keySet());
+    when(srcSnapshot.getSstFiles(eq(0), eq(tablesToLookup)))
+        .thenAnswer(invocation -> srcSnapshotSstFiles.stream()
+            .map(file -> metaDataMap.getOrDefault(file, new SstFileInfo(file, 
null, null, null)))
+            .collect(Collectors.toList()));
+    when(destSnapshot.getSstFiles(eq(0), eq(tablesToLookup)))
+        .thenAnswer(invocation -> destSnapshotSstFiles.stream()
+            .map(file -> metaDataMap.getOrDefault(file, new SstFileInfo(file, 
null, null, null)))
+            .collect(Collectors.toList()));
+
+    try {
+      Assertions.assertEquals(Optional.ofNullable(expectedSSTDiffFiles)
+              .map(files -> 
files.stream().sorted().collect(Collectors.toList())).orElse(null),
+          rocksDBCheckpointDiffer.getSSTDiffList(
+                  new DifferSnapshotVersion(srcSnapshot, 0, tablesToLookup),
+                  new DifferSnapshotVersion(destSnapshot, 0, tablesToLookup), 
prefixInfo, tablesToLookup,
+                  true)
+              .map(i -> 
i.stream().map(SstFileInfo::getFileName).sorted().collect(Collectors.toList())).orElse(null));
+    } catch (RuntimeException rtEx) {
+      if (!expectingException) {
+        fail("Unexpected exception thrown in test.");
       } else {
-        tablesToLookup = mock(Set.class);
-        when(tablesToLookup.contains(anyString())).thenReturn(true);
-        dummyTable = "dummy";
-      }
-      mockedHandler.when(() -> RdbUtil.getLiveSSTFilesForCFs(any(), any()))
-          .thenAnswer(i -> {
-            Set<String> sstFiles = 
i.getArgument(0).equals(srcSnapshot.getRocksDB()) ? srcSnapshotSstFiles
-                : destSnapshotSstFiles;
-            return sstFiles.stream().map(fileName -> {
-              LiveFileMetaData liveFileMetaData = mock(LiveFileMetaData.class);
-              when(liveFileMetaData.fileName()).thenReturn("/" + fileName + 
SST_FILE_EXTENSION);
-              
when(liveFileMetaData.columnFamilyName()).thenReturn(dummyTable.getBytes(UTF_8));
-              return liveFileMetaData;
-            }).collect(Collectors.toList());
-          });
-      try {
-        Assertions.assertEquals(Optional.ofNullable(expectedSSTDiffFiles)
-                .map(files -> 
files.stream().sorted().collect(Collectors.toList())).orElse(null),
-            rocksDBCheckpointDiffer.getSSTDiffList(srcSnapshot, destSnapshot, 
tablesToLookup)
-                .map(i -> 
i.stream().sorted().collect(Collectors.toList())).orElse(null));
-      } catch (RuntimeException rtEx) {
-        if (!expectingException) {
-          rtEx.printStackTrace();
-          fail("Unexpected exception thrown in test.");
-        } else {
-          exceptionThrown = true;
-        }
+        exceptionThrown = true;
       }
     }
     if (expectingException && !exceptionThrown) {
@@ -878,19 +984,6 @@ void testDifferWithDB() throws Exception {
     if (LOG.isDebugEnabled()) {
       rocksDBCheckpointDiffer.dumpCompactionNodeTable();
     }
-
-    cleanUpSnapshots();
-  }
-
-  public void cleanUpSnapshots() {
-    for (DifferSnapshotInfo snap : snapshots) {
-      snap.getRocksDB().close();
-    }
-    for (List<ColumnFamilyHandle> colHandle : colHandles) {
-      for (ColumnFamilyHandle handle : colHandle) {
-        handle.close();
-      }
-    }
   }
 
   private static List<ColumnFamilyDescriptor> getColumnFamilyDescriptors() {
@@ -941,18 +1034,21 @@ void diffAllSnapshots(RocksDBCheckpointDiffer differ)
           if 
(rocksDBCheckpointDiffer.getCompactionNodeMap().containsKey(diffFile)) {
             columnFamily = 
rocksDBCheckpointDiffer.getCompactionNodeMap().get(diffFile).getColumnFamily();
           } else {
-            columnFamily = 
bytes2String(src.getRocksDB().getLiveMetadataForSSTFiles().get(diffFile).columnFamilyName());
+            columnFamily = src.getSstFile(0, diffFile).getColumnFamily();
           }
           if (columnFamily == null || tableToLookUp.contains(columnFamily)) {
             expectedDiffFiles.add(diffFile);
           }
         }
-        List<String> sstDiffList = differ.getSSTDiffList(src, snap, 
tableToLookUp).orElse(Collections.emptyList());
+        DifferSnapshotVersion srcSnapVersion = new DifferSnapshotVersion(src, 
0, tableToLookUp);
+        DifferSnapshotVersion destSnapVersion = new 
DifferSnapshotVersion(snap, 0, tableToLookUp);
+        List<SstFileInfo> sstDiffList = differ.getSSTDiffList(srcSnapVersion, 
destSnapVersion, null,
+                tableToLookUp, true).orElse(Collections.emptyList());
         LOG.info("SST diff list from '{}' to '{}': {} tables: {}",
-            src.getDbPath(), snap.getDbPath(), sstDiffList, tableToLookUp);
-
-        assertEquals(expectedDiffFiles, sstDiffList);
+            src.getDbPath(0), snap.getDbPath(0), sstDiffList, tableToLookUp);
 
+        assertEquals(expectedDiffFiles, 
sstDiffList.stream().map(SstFileInfo::getFileName)
+            .collect(Collectors.toList()));
       }
 
       ++index;
@@ -980,12 +1076,14 @@ private void createCheckpoint(ManagedRocksDB rocksDB) 
throws RocksDBException {
     createCheckPoint(ACTIVE_DB_DIR_NAME, cpPath, rocksDB);
     final UUID snapshotId = UUID.randomUUID();
     List<ColumnFamilyHandle> colHandle = new ArrayList<>();
-    colHandles.add(colHandle);
-    final DifferSnapshotInfo currentSnapshot =
-        new DifferSnapshotInfo(cpPath, snapshotId, snapshotGeneration, null,
-            ManagedRocksDB.openReadOnly(cpPath, getColumnFamilyDescriptors(),
-                colHandle));
-    this.snapshots.add(currentSnapshot);
+    try (ManagedRocksDB rdb = ManagedRocksDB.openReadOnly(cpPath, 
getColumnFamilyDescriptors(), colHandle)) {
+      TreeMap<Integer, List<SstFileInfo>> versionSstFilesMap = new TreeMap<>();
+      versionSstFilesMap.put(0, 
rdb.getLiveMetadataForSSTFiles().values().stream().map(SstFileInfo::new)
+          .collect(Collectors.toList()));
+      final DifferSnapshotInfo currentSnapshot = new 
DifferSnapshotInfo((version) -> Paths.get(cpPath),
+          snapshotId, snapshotGeneration, versionSstFilesMap);
+      this.snapshots.add(currentSnapshot);
+    }
 
     long t2 = Time.monotonicNow();
     LOG.trace("Current time: " + t2);
@@ -1347,18 +1445,18 @@ private static CompactionLogEntry 
createCompactionEntry(long dbSequenceNumber,
                                                           long compactionTime,
                                                           List<String> 
inputFiles,
                                                           List<String> 
outputFiles,
-                                                          Map<String, 
String[]> metadata) {
+                                                          Map<String, 
SstFileInfo> metadata) {
     return new CompactionLogEntry.Builder(dbSequenceNumber, compactionTime,
         toFileInfoList(inputFiles, metadata), toFileInfoList(outputFiles, 
metadata)).build();
   }
 
   private static List<CompactionFileInfo> toFileInfoList(List<String> files,
-                                                         Map<String, String[]> 
metadata) {
+                                                         Map<String, 
SstFileInfo> metadata) {
     return files.stream()
         .map(fileName -> new CompactionFileInfo.Builder(fileName)
-            
.setStartRange(Optional.ofNullable(metadata.get(fileName)).map(meta -> 
meta[0]).orElse(null))
-            .setEndRange(Optional.ofNullable(metadata.get(fileName)).map(meta 
-> meta[1]).orElse(null))
-            
.setColumnFamily(Optional.ofNullable(metadata.get(fileName)).map(meta -> 
meta[2]).orElse(null))
+            
.setStartRange(Optional.ofNullable(metadata.get(fileName)).map(SstFileInfo::getStartKey).orElse(null))
+            
.setEndRange(Optional.ofNullable(metadata.get(fileName)).map(SstFileInfo::getEndKey).orElse(null))
+            
.setColumnFamily(Optional.ofNullable(metadata.get(fileName)).map(SstFileInfo::getColumnFamily).orElse(null))
             .build())
         .collect(Collectors.toList());
   }
@@ -1621,25 +1719,36 @@ public void testGetSSTDiffListWithoutDB2(
 
     // Snapshot is used for logging purpose and short-circuiting traversal.
     // Using gen 0 for this test.
+    List<SstFileInfo> srcSnapshotSstFileInfoSet = srcSnapshotSstFiles.stream()
+        .map(fileName -> new SstFileInfo(fileName, "", "", 
"cf1")).collect(Collectors.toList());
+    List<SstFileInfo> destSnapshotSstFileInfoSet = 
destSnapshotSstFiles.stream()
+        .map(fileName -> new SstFileInfo(fileName, "", "", 
"cf1")).collect(Collectors.toList());
+    TreeMap<Integer, List<SstFileInfo>> srcSnapshotSstFileInfoMap = new 
TreeMap<>();
+    srcSnapshotSstFileInfoMap.put(0, srcSnapshotSstFileInfoSet);
+    TreeMap<Integer, List<SstFileInfo>> destSnapshotSstFileInfoMap = new 
TreeMap<>();
+    destSnapshotSstFileInfoMap.put(0, destSnapshotSstFileInfoSet);
+    Path path1 = 
dbDir.toPath().resolve("path").resolve("to").resolve("dbcp1").toAbsolutePath();
+    Path path2 = 
dbDir.toPath().resolve("path").resolve("to").resolve("dbcp2").toAbsolutePath();
     DifferSnapshotInfo mockedSourceSnapshot = new DifferSnapshotInfo(
-        "/path/to/dbcp1", UUID.randomUUID(), 0L, columnFamilyPrefixInfo, null);
+        (version) -> path1, UUID.randomUUID(), 0L, srcSnapshotSstFileInfoMap);
     DifferSnapshotInfo mockedDestinationSnapshot = new DifferSnapshotInfo(
-        "/path/to/dbcp2", UUID.randomUUID(), 0L, columnFamilyPrefixInfo, null);
-
-    Set<String> actualSameSstFiles = new HashSet<>();
-    Set<String> actualDiffSstFiles = new HashSet<>();
-
+        (version) -> path2, UUID.randomUUID(), 0L, destSnapshotSstFileInfoMap);
+
+    Map<String, SstFileInfo> actualSameSstFiles = new HashMap<>();
+    Map<String, SstFileInfo> actualDiffSstFiles = new HashMap<>();
+    DifferSnapshotVersion srcSnapshotVersion = new 
DifferSnapshotVersion(mockedSourceSnapshot, 0,
+        Collections.singleton("cf1"));
+    DifferSnapshotVersion destSnapshotVersion = new 
DifferSnapshotVersion(mockedDestinationSnapshot, 0,
+        Collections.singleton("cf1"));
     rocksDBCheckpointDiffer.internalGetSSTDiffList(
-        mockedSourceSnapshot,
-        mockedDestinationSnapshot,
-        srcSnapshotSstFiles,
-        destSnapshotSstFiles,
+        srcSnapshotVersion,
+        destSnapshotVersion,
         actualSameSstFiles,
         actualDiffSstFiles);
 
     // Check same and different SST files result
-    assertEquals(expectedSameSstFiles, actualSameSstFiles);
-    assertEquals(expectedDiffSstFiles, actualDiffSstFiles);
+    assertEquals(expectedSameSstFiles, actualSameSstFiles.keySet());
+    assertEquals(expectedDiffSstFiles, actualDiffSstFiles.keySet());
   }
 
   private static Stream<Arguments> shouldSkipNodeCases() {
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java
index a44baf1905f..08ff90ab6cc 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java
@@ -24,27 +24,21 @@
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.ozone.rocksdb.util.SstFileInfo;
 import org.assertj.core.util.Sets;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
-import org.mockito.Mockito;
-import org.rocksdb.LiveFileMetaData;
-import org.rocksdb.RocksDB;
 
 /**
  * Class to test RocksDiffUtils.
@@ -80,9 +74,6 @@ public void testFilterFunction() {
 
   public static Stream<Arguments> values() {
     return Stream.of(
-        arguments("validColumnFamily", "invalidColumnFamily", "a", "d", "b", 
"f"),
-        arguments("validColumnFamily", "invalidColumnFamily", "a", "d", "e", 
"f"),
-        arguments("validColumnFamily", "invalidColumnFamily", "a", "d", "a", 
"f"),
         arguments("validColumnFamily", "validColumnFamily", "a", "d", "e", 
"g"),
         arguments("validColumnFamily", "validColumnFamily", "e", "g", "a", 
"d"),
         arguments("validColumnFamily", "validColumnFamily", "b", "b", "e", 
"g"),
@@ -92,95 +83,40 @@ public static Stream<Arguments> values() {
 
   @ParameterizedTest
   @MethodSource("values")
-  public void testFilterRelevantSstFilesWithPreExistingCompactionInfo(String 
validSSTColumnFamilyName,
-                                                                      String 
invalidColumnFamilyName,
-                                                                      String 
validSSTFileStartRange,
-                                                                      String 
validSSTFileEndRange,
-                                                                      String 
invalidSSTFileStartRange,
-                                                                      String 
invalidSSTFileEndRange) {
+  public void testFilterRelevantSstFilesMap(String validSSTColumnFamilyName, 
String invalidColumnFamilyName,
+      String validSSTFileStartRange, String validSSTFileEndRange, String 
invalidSSTFileStartRange,
+      String invalidSSTFileEndRange) {
     String validSstFile = "filePath/validSSTFile.sst";
     String invalidSstFile = "filePath/invalidSSTFile.sst";
     String untrackedSstFile = "filePath/untrackedSSTFile.sst";
     String expectedPrefix = 
String.valueOf((char)(((int)validSSTFileEndRange.charAt(0) +
         validSSTFileStartRange.charAt(0)) / 2));
-    Set<String> sstFile = Sets.newTreeSet(validSstFile, invalidSstFile, 
untrackedSstFile);
-    Set<String> inputSstFiles = new HashSet<>();
+    Map<String, SstFileInfo> sstFile = ImmutableMap.of(
+        validSstFile, new SstFileInfo(validSstFile, validSSTFileStartRange, 
validSSTFileEndRange,
+            validSSTColumnFamilyName), invalidSstFile, new 
SstFileInfo(invalidSstFile, invalidSSTFileStartRange,
+            invalidSSTFileEndRange, invalidColumnFamilyName), untrackedSstFile,
+        new SstFileInfo(untrackedSstFile, null, null, null));
+    Map<String, SstFileInfo> inputSstFiles = new HashMap<>();
     List<Set<String>> tablesToLookupSet = 
Arrays.asList(ImmutableSet.of(validSSTColumnFamilyName),
         ImmutableSet.of(invalidColumnFamilyName), 
ImmutableSet.of(validSSTColumnFamilyName, invalidColumnFamilyName),
         Collections.emptySet());
     for (Set<String> tablesToLookup : tablesToLookupSet) {
       inputSstFiles.clear();
-      inputSstFiles.addAll(sstFile);
+      inputSstFiles.putAll(sstFile);
       RocksDiffUtils.filterRelevantSstFiles(inputSstFiles,
+          tablesToLookup,
           new TablePrefixInfo(
               new HashMap<String, String>() {{
                 put(invalidColumnFamilyName, 
getLexicographicallyHigherString(invalidSSTFileEndRange));
                 put(validSSTColumnFamilyName, expectedPrefix);
-              }}), ImmutableMap.of("validSSTFile", new 
CompactionNode(validSstFile, 0, validSSTFileStartRange,
-                  validSSTFileEndRange, validSSTColumnFamilyName), 
"invalidSSTFile",
-                new CompactionNode(invalidSstFile, 0, invalidSSTFileStartRange,
-                  invalidSSTFileEndRange, invalidColumnFamilyName)), 
tablesToLookup);
+              }}));
       if (tablesToLookup.contains(validSSTColumnFamilyName)) {
-        Assertions.assertEquals(Sets.newTreeSet(validSstFile, 
untrackedSstFile), inputSstFiles,
+        Assertions.assertEquals(Sets.newTreeSet(validSstFile, 
untrackedSstFile), inputSstFiles.keySet(),
             "Failed for " + tablesToLookup);
       } else {
-        Assertions.assertEquals(Sets.newTreeSet(untrackedSstFile), 
inputSstFiles, "Failed for " + tablesToLookup);
-      }
-    }
-  }
-
-  private LiveFileMetaData getMockedLiveFileMetadata(String columnFamilyName, 
String startRange,
-                                                     String endRange,
-                                                     String name) {
-    LiveFileMetaData liveFileMetaData = Mockito.mock(LiveFileMetaData.class);
-    
Mockito.when(liveFileMetaData.largestKey()).thenReturn(endRange.getBytes(StandardCharsets.UTF_8));
-    
Mockito.when(liveFileMetaData.columnFamilyName()).thenReturn(columnFamilyName.getBytes(StandardCharsets.UTF_8));
-    
Mockito.when(liveFileMetaData.smallestKey()).thenReturn(startRange.getBytes(StandardCharsets.UTF_8));
-    Mockito.when(liveFileMetaData.fileName()).thenReturn("basePath/" + name + 
".sst");
-    return liveFileMetaData;
-  }
-
-  @ParameterizedTest
-  @MethodSource("values")
-  public void testFilterRelevantSstFilesFromDB(String validSSTColumnFamilyName,
-                                               String invalidColumnFamilyName,
-                                               String validSSTFileStartRange,
-                                               String validSSTFileEndRange,
-                                               String invalidSSTFileStartRange,
-                                               String invalidSSTFileEndRange) {
-    for (int numberOfDBs = 1; numberOfDBs < 10; numberOfDBs++) {
-      String validSstFile = "filePath/validSSTFile.sst";
-      String invalidSstFile = "filePath/invalidSSTFile.sst";
-      String untrackedSstFile = "filePath/untrackedSSTFile.sst";
-      int expectedDBKeyIndex = numberOfDBs / 2;
-      ManagedRocksDB[] rocksDBs =
-          IntStream.range(0, numberOfDBs).mapToObj(i -> 
Mockito.mock(ManagedRocksDB.class))
-              .collect(Collectors.toList()).toArray(new 
ManagedRocksDB[numberOfDBs]);
-      for (int i = 0; i < numberOfDBs; i++) {
-        ManagedRocksDB managedRocksDB = rocksDBs[i];
-        RocksDB mockedRocksDB = Mockito.mock(RocksDB.class);
-        Mockito.when(managedRocksDB.get()).thenReturn(mockedRocksDB);
-        if (i == expectedDBKeyIndex) {
-          LiveFileMetaData validLiveFileMetaData = 
getMockedLiveFileMetadata(validSSTColumnFamilyName,
-              validSSTFileStartRange, validSSTFileEndRange, "validSSTFile");
-          LiveFileMetaData invalidLiveFileMetaData = 
getMockedLiveFileMetadata(invalidColumnFamilyName,
-              invalidSSTFileStartRange, invalidSSTFileEndRange, 
"invalidSSTFile");
-          List<LiveFileMetaData> liveFileMetaDatas = 
Arrays.asList(validLiveFileMetaData, invalidLiveFileMetaData);
-          
Mockito.when(mockedRocksDB.getLiveFilesMetaData()).thenReturn(liveFileMetaDatas);
-        } else {
-          
Mockito.when(mockedRocksDB.getLiveFilesMetaData()).thenReturn(Collections.emptyList());
-        }
-        Mockito.when(managedRocksDB.getLiveMetadataForSSTFiles())
-            .thenAnswer(invocation -> 
ManagedRocksDB.getLiveMetadataForSSTFiles(mockedRocksDB));
+        Assertions.assertEquals(Sets.newTreeSet(untrackedSstFile), 
inputSstFiles.keySet(),
+            "Failed for " + tablesToLookup);
       }
-
-      String expectedPrefix = 
String.valueOf((char)(((int)validSSTFileEndRange.charAt(0) +
-          validSSTFileStartRange.charAt(0)) / 2));
-      Set<String> sstFile = Sets.newTreeSet(validSstFile, invalidSstFile, 
untrackedSstFile);
-      RocksDiffUtils.filterRelevantSstFiles(sstFile, new 
TablePrefixInfo(ImmutableMap.of(validSSTColumnFamilyName,
-              expectedPrefix)), Collections.emptyMap(),
-          ImmutableSet.of(validSSTColumnFamilyName), rocksDBs);
-      Assertions.assertEquals(Sets.newTreeSet(validSstFile, untrackedSstFile), 
sstFile);
     }
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
index 069ecccdd60..3901eeeb0e4 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
@@ -17,11 +17,13 @@
 
 package org.apache.hadoop.ozone.freon;
 
+import static java.util.stream.Collectors.toMap;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_S3_VOLUME_NAME_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.DB_COMPACTION_LOG_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.DB_COMPACTION_SST_BACKUP_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIFF_DIR;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL;
 import static 
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COLUMN_FAMILIES_TO_TRACK_IN_DAG;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -34,14 +36,16 @@
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.TimeoutException;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -56,8 +60,11 @@
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
+import org.apache.ozone.rocksdb.util.SstFileInfo;
 import org.apache.ozone.rocksdiff.DifferSnapshotInfo;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
+import 
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DifferSnapshotVersion;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.raftlog.RaftLog;
@@ -98,6 +105,7 @@ public static void init() throws Exception {
     conf.setFromObject(raftClientConfig);
     // Enable filesystem snapshot feature for the test regardless of the 
default
     conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true);
+    conf.setInt(OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL, -1);
 
     // Set DB CF write buffer to a much lower value so that flush and 
compaction
     // happens much more frequently without having to create a lot of keys.
@@ -133,9 +141,9 @@ private static String getSnapshotDBKey(String volumeName, 
String bucketName,
     return dbKeyPrefix + OM_KEY_PREFIX + snapshotName;
   }
 
-  private DifferSnapshotInfo getDifferSnapshotInfo(
-      OMMetadataManager omMetadataManager, String volumeName, String 
bucketName,
-      String snapshotName, ManagedRocksDB snapshotDB) throws IOException {
+  private DifferSnapshotVersion getDifferSnapshotInfo(
+      OMMetadataManager omMetadataManager, OmSnapshotLocalDataManager 
localDataManager,
+      String volumeName, String bucketName, String snapshotName) throws 
IOException {
 
     final String dbKey = getSnapshotDBKey(volumeName, bucketName, 
snapshotName);
     final SnapshotInfo snapshotInfo =
@@ -144,16 +152,22 @@ private DifferSnapshotInfo getDifferSnapshotInfo(
 
     // Use RocksDB transaction sequence number in SnapshotInfo, which is
     // persisted at the time of snapshot creation, as the snapshot generation
-    return new DifferSnapshotInfo(checkpointPath, snapshotInfo.getSnapshotId(),
-        snapshotInfo.getDbTxSequenceNumber(),
-        omMetadataManager.getTableBucketPrefix(volumeName, bucketName),
-        snapshotDB);
+    try (OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider 
snapshotLocalData =
+             localDataManager.getOmSnapshotLocalData(snapshotInfo)) {
+      NavigableMap<Integer, List<SstFileInfo>> versionSstFiles = 
snapshotLocalData.getSnapshotLocalData()
+          .getVersionSstFileInfos().entrySet().stream()
+          .collect(toMap(Map.Entry::getKey, entry -> 
entry.getValue().getSstFiles(),
+              (u, v) -> {
+              throw new IllegalStateException(String.format("Duplicate key 
%s", u));
+            }, TreeMap::new));
+      DifferSnapshotInfo dsi = new DifferSnapshotInfo((version) -> 
Paths.get(checkpointPath),
+          snapshotInfo.getSnapshotId(), snapshotInfo.getDbTxSequenceNumber(), 
versionSstFiles);
+      return new DifferSnapshotVersion(dsi, 0, 
COLUMN_FAMILIES_TO_TRACK_IN_DAG);
+    }
   }
 
   @Test
-  public void testDAGReconstruction()
-      throws IOException, InterruptedException, TimeoutException {
-
+  public void testDAGReconstruction() throws IOException {
     // Generate keys
     RandomKeyGenerator randomKeyGenerator =
         new RandomKeyGenerator(cluster.getConf());
@@ -200,28 +214,22 @@ public void testDAGReconstruction()
     // Get snapshot SST diff list
     OzoneManager ozoneManager = cluster.getOzoneManager();
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    TablePrefixInfo bucketPrefix = 
omMetadataManager.getTableBucketPrefix(volumeName, bucketName);
+    OmSnapshotLocalDataManager localDataManager = 
ozoneManager.getOmSnapshotManager().getSnapshotLocalDataManager();
     RDBStore rdbStore = (RDBStore) omMetadataManager.getStore();
     RocksDBCheckpointDiffer differ = rdbStore.getRocksDBCheckpointDiffer();
     UncheckedAutoCloseableSupplier<OmSnapshot> snapDB1 = 
ozoneManager.getOmSnapshotManager()
         .getActiveSnapshot(volumeName, bucketName, "snap1");
     UncheckedAutoCloseableSupplier<OmSnapshot> snapDB2 = 
ozoneManager.getOmSnapshotManager()
         .getActiveSnapshot(volumeName, bucketName, "snap2");
-    DifferSnapshotInfo snap1 = getDifferSnapshotInfo(omMetadataManager,
-        volumeName, bucketName, "snap1",
-        ((RDBStore) snapDB1.get()
-            .getMetadataManager().getStore()).getDb().getManagedRocksDb());
-    DifferSnapshotInfo snap2 = getDifferSnapshotInfo(omMetadataManager,
-        volumeName, bucketName, "snap2", ((RDBStore) snapDB2.get()
-            .getMetadataManager().getStore()).getDb().getManagedRocksDb());
+    DifferSnapshotVersion snap1 = getDifferSnapshotInfo(omMetadataManager, 
localDataManager,
+        volumeName, bucketName, "snap1");
+    DifferSnapshotVersion snap2 = getDifferSnapshotInfo(omMetadataManager, 
localDataManager,
+        volumeName, bucketName, "snap2");
 
       // RocksDB does checkpointing in a separate thread, wait for it
-    final File checkpointSnap1 = new File(snap1.getDbPath());
-    GenericTestUtils.waitFor(checkpointSnap1::exists, 2000, 20000);
-    final File checkpointSnap2 = new File(snap2.getDbPath());
-    GenericTestUtils.waitFor(checkpointSnap2::exists, 2000, 20000);
-
-    List<String> sstDiffList21 = differ.getSSTDiffList(snap2, snap1, 
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
-        .orElse(Collections.emptyList());
+    List<SstFileInfo> sstDiffList21 = differ.getSSTDiffList(snap2, snap1, 
bucketPrefix,
+            COLUMN_FAMILIES_TO_TRACK_IN_DAG, 
true).orElse(Collections.emptyList());
     LOG.debug("Got diff list: {}", sstDiffList21);
 
     // Delete 1000 keys, take a 3rd snapshot, and do another diff
@@ -233,23 +241,19 @@ public void testDAGReconstruction()
     LOG.debug("Snapshot created: {}", resp);
     UncheckedAutoCloseableSupplier<OmSnapshot> snapDB3 = 
ozoneManager.getOmSnapshotManager()
         .getActiveSnapshot(volumeName, bucketName, "snap3");
-    DifferSnapshotInfo snap3 = getDifferSnapshotInfo(omMetadataManager,
-        volumeName, bucketName, "snap3",
-        ((RDBStore) snapDB3.get()
-            .getMetadataManager().getStore()).getDb().getManagedRocksDb());
-    final File checkpointSnap3 = new File(snap3.getDbPath());
-    GenericTestUtils.waitFor(checkpointSnap3::exists, 2000, 20000);
+    DifferSnapshotVersion snap3 = getDifferSnapshotInfo(omMetadataManager, 
localDataManager, volumeName, bucketName,
+        "snap3");
 
-    List<String> sstDiffList32 = differ.getSSTDiffList(snap3, snap2, 
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
-        .orElse(Collections.emptyList());
+    List<SstFileInfo> sstDiffList32 = differ.getSSTDiffList(snap3, snap2, 
bucketPrefix,
+            COLUMN_FAMILIES_TO_TRACK_IN_DAG, 
true).orElse(Collections.emptyList());
 
     // snap3-snap1 diff result is a combination of snap3-snap2 and snap2-snap1
-    List<String> sstDiffList31 = differ.getSSTDiffList(snap3, snap1, 
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
-        .orElse(Collections.emptyList());
+    List<SstFileInfo> sstDiffList31 = differ.getSSTDiffList(snap3, snap1, 
bucketPrefix,
+            COLUMN_FAMILIES_TO_TRACK_IN_DAG, 
true).orElse(Collections.emptyList());
 
     // Same snapshot. Result should be empty list
-    List<String> sstDiffList22 = differ.getSSTDiffList(snap2, snap2, 
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
-        .orElse(Collections.emptyList());
+    List<SstFileInfo> sstDiffList22 = differ.getSSTDiffList(snap2, snap2, 
bucketPrefix,
+            COLUMN_FAMILIES_TO_TRACK_IN_DAG, 
true).orElse(Collections.emptyList());
     assertThat(sstDiffList22).isEmpty();
     snapDB1.close();
     snapDB2.close();
@@ -258,33 +262,29 @@ public void testDAGReconstruction()
     cluster.restartOzoneManager();
     ozoneManager = cluster.getOzoneManager();
     omMetadataManager = ozoneManager.getMetadataManager();
+    localDataManager = 
ozoneManager.getOmSnapshotManager().getSnapshotLocalDataManager();
     snapDB1 = ozoneManager.getOmSnapshotManager()
         .getActiveSnapshot(volumeName, bucketName, "snap1");
     snapDB2 = ozoneManager.getOmSnapshotManager()
         .getActiveSnapshot(volumeName, bucketName, "snap2");
-    snap1 = getDifferSnapshotInfo(omMetadataManager,
-        volumeName, bucketName, "snap1",
-        ((RDBStore) snapDB1.get()
-            .getMetadataManager().getStore()).getDb().getManagedRocksDb());
-    snap2 = getDifferSnapshotInfo(omMetadataManager,
-        volumeName, bucketName, "snap2", ((RDBStore) snapDB2.get()
-            .getMetadataManager().getStore()).getDb().getManagedRocksDb());
+    snap1 = getDifferSnapshotInfo(omMetadataManager, localDataManager,
+        volumeName, bucketName, "snap1");
+    snap2 = getDifferSnapshotInfo(omMetadataManager, localDataManager,
+        volumeName, bucketName, "snap2");
     snapDB3 = ozoneManager.getOmSnapshotManager()
         .getActiveSnapshot(volumeName, bucketName, "snap3");
-    snap3 = getDifferSnapshotInfo(omMetadataManager,
-        volumeName, bucketName, "snap3",
-        ((RDBStore) snapDB3.get()
-            .getMetadataManager().getStore()).getDb().getManagedRocksDb());
-    List<String> sstDiffList21Run2 = differ.getSSTDiffList(snap2, snap1, 
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
-        .orElse(Collections.emptyList());
+    snap3 = getDifferSnapshotInfo(omMetadataManager, localDataManager,
+        volumeName, bucketName, "snap3");
+    List<SstFileInfo> sstDiffList21Run2 = differ.getSSTDiffList(snap2, snap1, 
bucketPrefix,
+            COLUMN_FAMILIES_TO_TRACK_IN_DAG, 
true).orElse(Collections.emptyList());
     assertEquals(sstDiffList21, sstDiffList21Run2);
 
-    List<String> sstDiffList32Run2 = differ.getSSTDiffList(snap3, snap2, 
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
-        .orElse(Collections.emptyList());
+    List<SstFileInfo> sstDiffList32Run2 = differ.getSSTDiffList(snap3, snap2, 
bucketPrefix,
+            COLUMN_FAMILIES_TO_TRACK_IN_DAG, 
true).orElse(Collections.emptyList());
     assertEquals(sstDiffList32, sstDiffList32Run2);
 
-    List<String> sstDiffList31Run2 = differ.getSSTDiffList(snap3, snap1, 
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
-        .orElse(Collections.emptyList());
+    List<SstFileInfo> sstDiffList31Run2 = differ.getSSTDiffList(snap3, snap1, 
bucketPrefix,
+            COLUMN_FAMILIES_TO_TRACK_IN_DAG, 
true).orElse(Collections.emptyList());
     assertEquals(sstDiffList31, sstDiffList31Run2);
     snapDB1.close();
     snapDB2.close();
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index db5ac4d6340..3e6ccf771dc 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -313,7 +313,7 @@ public OmSnapshotManager(OzoneManager ozoneManager) throws 
IOException {
         cacheCleanupServiceInterval, compactNonSnapshotDiffTables, 
ozoneManager.getMetadataManager().getLock());
 
     this.snapshotDiffManager = new SnapshotDiffManager(snapshotDiffDb, differ,
-        ozoneManager, snapDiffJobCf, snapDiffReportCf,
+        ozoneManager, snapshotLocalDataManager, snapDiffJobCf, 
snapDiffReportCf,
         columnFamilyOptions, codecRegistry);
 
     diffCleanupServiceInterval = ozoneManager.getConfiguration()
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
index e5bc8dcfa91..219fc01f0a5 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.om.snapshot;
 
+import static java.util.stream.Collectors.toMap;
 import static org.apache.commons.lang3.StringUtils.leftPad;
 import static 
org.apache.hadoop.hdds.StringUtils.getLexicographicallyHigherString;
 import static 
org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.CREATE;
@@ -60,6 +61,7 @@
 import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.OBJECT_ID_MAP_GEN_OBS;
 import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.SST_FILE_DELTA_DAG_WALK;
 import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.SST_FILE_DELTA_FULL_DIFF;
+import static org.apache.ozone.rocksdiff.RocksDiffUtils.filterRelevantSstFiles;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
@@ -78,9 +80,11 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
@@ -90,7 +94,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.io.file.PathUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -109,6 +112,8 @@
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotLocalData;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
@@ -117,6 +122,7 @@
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.helpers.WithObjectID;
 import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
+import 
org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider;
 import org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse;
 import org.apache.hadoop.ozone.snapshot.ListSnapshotDiffJobResponse;
 import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone;
@@ -125,10 +131,10 @@
 import org.apache.hadoop.ozone.util.ClosableIterator;
 import org.apache.logging.log4j.util.Strings;
 import org.apache.ozone.rocksdb.util.RdbUtil;
+import org.apache.ozone.rocksdb.util.SstFileInfo;
 import org.apache.ozone.rocksdb.util.SstFileSetReader;
 import org.apache.ozone.rocksdiff.DifferSnapshotInfo;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
-import org.apache.ozone.rocksdiff.RocksDiffUtils;
 import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
@@ -153,6 +159,7 @@ public class SnapshotDiffManager implements AutoCloseable {
   private final ManagedRocksDB db;
   private final RocksDBCheckpointDiffer differ;
   private final OzoneManager ozoneManager;
+  private final OMMetadataManager activeOmMetadataManager;
   private final CodecRegistry codecRegistry;
   private final ManagedColumnFamilyOptions familyOptions;
   // TODO: [SNAPSHOT] Use different wait time based of job status.
@@ -194,11 +201,13 @@ public class SnapshotDiffManager implements AutoCloseable 
{
           (SnapshotInfo fromSnapshotInfo, SnapshotInfo toSnapshotInfo) ->
               fromSnapshotInfo.getSnapshotId() + DELIMITER +
                   toSnapshotInfo.getSnapshotId();
+  private final OmSnapshotLocalDataManager snapshotLocalDataManager;
 
   @SuppressWarnings("parameternumber")
   public SnapshotDiffManager(ManagedRocksDB db,
                              RocksDBCheckpointDiffer differ,
                              OzoneManager ozoneManager,
+                             OmSnapshotLocalDataManager 
snapshotLocalDataManager,
                              ColumnFamilyHandle snapDiffJobCfh,
                              ColumnFamilyHandle snapDiffReportCfh,
                              ManagedColumnFamilyOptions familyOptions,
@@ -206,6 +215,8 @@ public SnapshotDiffManager(ManagedRocksDB db,
     this.db = db;
     this.differ = differ;
     this.ozoneManager = ozoneManager;
+    this.activeOmMetadataManager = ozoneManager.getMetadataManager();
+    this.snapshotLocalDataManager = snapshotLocalDataManager;
     this.familyOptions = familyOptions;
     this.codecRegistry = codecRegistry;
     this.defaultWaitTime = ozoneManager.getConfiguration().getTimeDuration(
@@ -350,37 +361,34 @@ private void deleteDir(Path path) {
   /**
    * Convert from SnapshotInfo to DifferSnapshotInfo.
    */
-  private DifferSnapshotInfo getDSIFromSI(SnapshotInfo snapshotInfo,
-      OmSnapshot omSnapshot, final String volumeName, final String bucketName)
-      throws IOException {
-
-    final OMMetadataManager snapshotOMMM = omSnapshot.getMetadataManager();
-    final String checkpointPath =
-        snapshotOMMM.getStore().getDbLocation().getPath();
+  private static DifferSnapshotInfo getDSIFromSI(OMMetadataManager 
activeOmMetadataManager,
+      SnapshotInfo snapshotInfo, OmSnapshotLocalData snapshotLocalData) throws 
IOException {
     final UUID snapshotId = snapshotInfo.getSnapshotId();
     final long dbTxSequenceNumber = snapshotInfo.getDbTxSequenceNumber();
+    NavigableMap<Integer, List<SstFileInfo>> versionSstFiles = 
snapshotLocalData.getVersionSstFileInfos()
+        .entrySet().stream().collect(toMap(Map.Entry::getKey, entry -> 
entry.getValue().getSstFiles(),
+            (u, v) -> {
+            throw new IllegalStateException(String.format("Duplicate key %s", 
u));
+          }, TreeMap::new));
+    if (versionSstFiles.isEmpty()) {
+      throw new IOException(String.format("No versions found corresponding to 
%s", snapshotId));
+    }
     return new DifferSnapshotInfo(
-        checkpointPath,
-        snapshotId,
-        dbTxSequenceNumber,
-        snapshotOMMM.getTableBucketPrefix(volumeName, bucketName),
-        ((RDBStore)snapshotOMMM.getStore()).getDb().getManagedRocksDb());
+        version -> OmSnapshotManager.getSnapshotPath(activeOmMetadataManager, 
snapshotId, version),
+        snapshotId, dbTxSequenceNumber, versionSstFiles);
   }
 
   @VisibleForTesting
-  protected Set<String> getSSTFileListForSnapshot(OmSnapshot snapshot,
-                                                  Set<String> tablesToLookUp) {
-    return RdbUtil.getSSTFilesForComparison(((RDBStore)snapshot
-        .getMetadataManager().getStore()).getDb().getManagedRocksDb(),
-        tablesToLookUp);
+  protected Set<SstFileInfo> getSSTFileSetForSnapshot(OmSnapshot snapshot, 
Set<String> tablesToLookUp) {
+    return RdbUtil.getSSTFilesForComparison(
+        
((RDBStore)snapshot.getMetadataManager().getStore()).getDb().getManagedRocksDb(),
 tablesToLookUp);
   }
 
   @VisibleForTesting
-  protected Map<Object, String> getSSTFileMapForSnapshot(OmSnapshot snapshot,
+  protected Map<Object, SstFileInfo> getSSTFileMapForSnapshot(OmSnapshot 
snapshot,
       Set<String> tablesToLookUp) throws IOException {
     return RdbUtil.getSSTFilesWithInodesForComparison(((RDBStore)snapshot
-            .getMetadataManager().getStore()).getDb().getManagedRocksDb(),
-        tablesToLookUp);
+        .getMetadataManager().getStore()).getDb().getManagedRocksDb(), 
tablesToLookUp);
   }
 
   /**
@@ -1061,10 +1069,12 @@ private void getDeltaFilesAndDiffKeysToObjectIdToKeyMap(
     // tombstone is not loaded.
     // TODO: [SNAPSHOT] Update Rocksdb SSTFileIterator to read tombstone
     if (skipNativeDiff || !isNativeLibsLoaded) {
-      Set<String> inputFiles = getSSTFileListForSnapshot(fromSnapshot, 
tablesToLookUp);
-      ManagedRocksDB fromDB = 
((RDBStore)fromSnapshot.getMetadataManager().getStore()).getDb().getManagedRocksDb();
-      RocksDiffUtils.filterRelevantSstFiles(inputFiles, tablePrefixes, 
tablesToLookUp, fromDB);
-      deltaFiles.addAll(inputFiles);
+      Set<SstFileInfo> inputFiles = 
filterRelevantSstFiles(getSSTFileSetForSnapshot(fromSnapshot, tablesToLookUp),
+          tablesToLookUp, tablePrefixes);
+      Path fromSnapshotPath = 
fromSnapshot.getMetadataManager().getStore().getDbLocation().getAbsoluteFile().toPath();
+      for (SstFileInfo sstFileInfo : inputFiles) {
+        
deltaFiles.add(sstFileInfo.getFilePath(fromSnapshotPath).toAbsolutePath().toString());
+      }
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Computed Delta SST File Set, Total count = {} ", 
deltaFiles.size());
@@ -1171,21 +1181,22 @@ Set<String> getDeltaFiles(OmSnapshot fromSnapshot,
       throws IOException {
     // TODO: [SNAPSHOT] Refactor the parameter list
     Optional<Set<String>> deltaFiles = Optional.empty();
-
     // Check if compaction DAG is available, use that if so
     if (differ != null && fsInfo != null && tsInfo != null && !useFullDiff) {
-      String volume = fsInfo.getVolumeName();
-      String bucket = fsInfo.getBucketName();
-      // Construct DifferSnapshotInfo
-      final DifferSnapshotInfo fromDSI =
-          getDSIFromSI(fsInfo, fromSnapshot, volume, bucket);
-      final DifferSnapshotInfo toDSI =
-          getDSIFromSI(tsInfo, toSnapshot, volume, bucket);
-
-      recordActivity(jobKey, SST_FILE_DELTA_DAG_WALK);
-      LOG.debug("Calling RocksDBCheckpointDiffer");
-      try {
-        deltaFiles = differ.getSSTDiffListWithFullPath(toDSI, fromDSI, 
tablesToLookUp, diffDir).map(HashSet::new);
+      try (ReadableOmSnapshotLocalDataProvider snapLocalDataProvider = 
snapshotLocalDataManager.getOmSnapshotLocalData(
+          toSnapshot.getSnapshotID(), fromSnapshot.getSnapshotID())) {
+        OmSnapshotLocalData toSnapshotLocalData = 
snapLocalDataProvider.getSnapshotLocalData();
+        OmSnapshotLocalData fromSnapshotLocalData = 
snapLocalDataProvider.getPreviousSnapshotLocalData();
+        // Construct DifferSnapshotInfo
+        final DifferSnapshotInfo fromDSI = 
getDSIFromSI(activeOmMetadataManager, fsInfo, fromSnapshotLocalData);
+        final DifferSnapshotInfo toDSI = getDSIFromSI(activeOmMetadataManager, 
tsInfo, toSnapshotLocalData);
+
+        recordActivity(jobKey, SST_FILE_DELTA_DAG_WALK);
+        LOG.debug("Calling RocksDBCheckpointDiffer");
+        final Map<Integer, Integer> versionMap = 
toSnapshotLocalData.getVersionSstFileInfos().entrySet()
+            .stream().collect(toMap(Map.Entry::getKey, entry -> 
entry.getValue().getPreviousSnapshotVersion()));
+        deltaFiles = differ.getSSTDiffListWithFullPath(toDSI, fromDSI, 
versionMap, tablePrefixInfo, tablesToLookUp,
+            diffDir).map(HashSet::new);
       } catch (Exception exception) {
         recordActivity(jobKey, SST_FILE_DELTA_FULL_DIFF);
         LOG.warn("Failed to get SST diff file using RocksDBCheckpointDiffer. " 
+
@@ -1198,15 +1209,10 @@ Set<String> getDeltaFiles(OmSnapshot fromSnapshot,
       //  the slower approach.
       if (!useFullDiff) {
         LOG.warn("RocksDBCheckpointDiffer is not available, falling back to" +
-                " slow path");
+            " slow path");
       }
       recordActivity(jobKey, SST_FILE_DELTA_FULL_DIFF);
-      ManagedRocksDB fromDB = 
((RDBStore)fromSnapshot.getMetadataManager().getStore())
-          .getDb().getManagedRocksDb();
-      ManagedRocksDB toDB = 
((RDBStore)toSnapshot.getMetadataManager().getStore())
-          .getDb().getManagedRocksDb();
-      Set<String> diffFiles = getDiffFiles(fromSnapshot, toSnapshot, 
tablesToLookUp);
-      RocksDiffUtils.filterRelevantSstFiles(diffFiles, tablePrefixInfo, 
tablesToLookUp, fromDB, toDB);
+      Set<String> diffFiles = getDiffFiles(fromSnapshot, toSnapshot, 
tablesToLookUp, tablePrefixInfo);
       deltaFiles = Optional.of(diffFiles);
     }
 
@@ -1215,25 +1221,42 @@ Set<String> getDeltaFiles(OmSnapshot fromSnapshot,
             toSnapshot.getSnapshotTableKey()));
   }
 
-  private Set<String> getDiffFiles(OmSnapshot fromSnapshot, OmSnapshot 
toSnapshot, Set<String> tablesToLookUp) {
+  private Set<String> getDiffFiles(OmSnapshot fromSnapshot, OmSnapshot 
toSnapshot, Set<String> tablesToLookUp,
+      TablePrefixInfo tablePrefixInfo) {
     Set<String> diffFiles;
+    Path fromSnapshotPath = 
fromSnapshot.getMetadataManager().getStore().getDbLocation().getAbsoluteFile().toPath();
+    Path toSnapshotPath = 
toSnapshot.getMetadataManager().getStore().getDbLocation().getAbsoluteFile().toPath();
     try {
-      Map<Object, String> fromSnapshotFiles = 
getSSTFileMapForSnapshot(fromSnapshot, tablesToLookUp);
-      Map<Object, String> toSnapshotFiles = 
getSSTFileMapForSnapshot(toSnapshot, tablesToLookUp);
-      diffFiles = Stream.concat(
-          fromSnapshotFiles.entrySet().stream()
-              .filter(e -> !toSnapshotFiles.containsKey(e.getKey())),
-          toSnapshotFiles.entrySet().stream()
-              .filter(e -> !fromSnapshotFiles.containsKey(e.getKey())))
-              .map(Map.Entry::getValue)
-          .collect(Collectors.toSet());
+      diffFiles = new HashSet<>();
+      Map<Object, SstFileInfo> fromSnapshotFiles = 
filterRelevantSstFiles(getSSTFileMapForSnapshot(fromSnapshot,
+          tablesToLookUp), tablesToLookUp, tablePrefixInfo);
+      Map<Object, SstFileInfo> toSnapshotFiles = 
filterRelevantSstFiles(getSSTFileMapForSnapshot(toSnapshot,
+          tablesToLookUp), tablesToLookUp, tablePrefixInfo);
+      for (Map.Entry<Object, SstFileInfo> entry : 
fromSnapshotFiles.entrySet()) {
+        if (!toSnapshotFiles.containsKey(entry.getKey())) {
+          
diffFiles.add(entry.getValue().getFilePath(fromSnapshotPath).toAbsolutePath().toString());
+        }
+      }
+      for (Map.Entry<Object, SstFileInfo> entry : toSnapshotFiles.entrySet()) {
+        if (!fromSnapshotFiles.containsKey(entry.getKey())) {
+          
diffFiles.add(entry.getValue().getFilePath(toSnapshotPath).toAbsolutePath().toString());
+        }
+      }
     } catch (IOException e) {
       // In case of exception during inode read use all files
       LOG.error("Exception occurred while populating delta files for 
snapDiff", e);
       LOG.warn("Falling back to full file list comparison, inode-based 
optimization skipped.");
+      Set<SstFileInfo> fromSnapshotFiles = 
filterRelevantSstFiles(getSSTFileSetForSnapshot(fromSnapshot,
+          tablesToLookUp), tablesToLookUp, tablePrefixInfo);
+      Set<SstFileInfo> toSnapshotFiles = 
filterRelevantSstFiles(getSSTFileSetForSnapshot(toSnapshot,
+          tablesToLookUp), tablesToLookUp, tablePrefixInfo);
       diffFiles = new HashSet<>();
-      diffFiles.addAll(getSSTFileListForSnapshot(fromSnapshot, 
tablesToLookUp));
-      diffFiles.addAll(getSSTFileListForSnapshot(toSnapshot, tablesToLookUp));
+      for (SstFileInfo sstFileInfo : fromSnapshotFiles) {
+        
diffFiles.add(sstFileInfo.getFilePath(fromSnapshotPath).toAbsolutePath().toString());
+      }
+      for (SstFileInfo sstFileInfo : toSnapshotFiles) {
+        
diffFiles.add(sstFileInfo.getFilePath(toSnapshotPath).toAbsolutePath().toString());
+      }
     }
     return diffFiles;
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
index 840ef6eaeb8..b484ad628c7 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
@@ -49,6 +49,7 @@
 import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN_PROGRESS;
 import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.QUEUED;
 import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.REJECTED;
+import static 
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.SST_FILE_EXTENSION;
 import static org.apache.ratis.util.JavaUtils.attempt;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -84,6 +85,7 @@
 import jakarta.annotation.Nonnull;
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -103,6 +105,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
@@ -126,6 +129,7 @@
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotLocalData;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -136,6 +140,7 @@
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
 import org.apache.hadoop.ozone.om.lock.OmReadOnlyLock;
+import 
org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider;
 import 
org.apache.hadoop.ozone.om.snapshot.SnapshotTestUtils.StubbedPersistentMap;
 import org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse;
 import 
org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse.CancelMessage;
@@ -147,6 +152,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.ozone.rocksdb.util.RdbUtil;
+import org.apache.ozone.rocksdb.util.SstFileInfo;
 import org.apache.ozone.rocksdb.util.SstFileSetReader;
 import org.apache.ozone.rocksdiff.DifferSnapshotInfo;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
@@ -228,6 +234,9 @@ public class TestSnapshotDiffManager {
   @Mock
   private RocksIterator jobTableIterator;
 
+  @Mock
+  private OmSnapshotLocalDataManager localDataManager;
+
   @Mock
   private OmSnapshotManager omSnapshotManager;
 
@@ -378,7 +387,7 @@ public void init() throws RocksDBException, IOException, 
ExecutionException {
           return snapshotCache.get(snapInfo.getSnapshotId());
         });
     when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
-    snapshotDiffManager = new SnapshotDiffManager(db, differ, ozoneManager,
+    snapshotDiffManager = new SnapshotDiffManager(db, differ, ozoneManager, 
localDataManager,
         snapDiffJobTable, snapDiffReportTable, columnFamilyOptions, 
codecRegistry);
     when(omSnapshotManager.getDiffCleanupServiceInterval()).thenReturn(0L);
   }
@@ -434,10 +443,12 @@ public void testGetDeltaFilesWithDag(int numberOfFiles) 
throws IOException {
     when(differ.getSSTDiffListWithFullPath(
         any(DifferSnapshotInfo.class),
         any(DifferSnapshotInfo.class),
+        anyMap(),
+        any(TablePrefixInfo.class),
         anySet(),
         eq(diffDir))
     ).thenReturn(Optional.of(Lists.newArrayList(randomStrings)));
-
+    mockSnapshotLocalData();
     UncheckedAutoCloseableSupplier<OmSnapshot> rcFromSnapshot =
         omSnapshotManager.getActiveSnapshot(VOLUME_NAME, BUCKET_NAME, 
snap1.toString());
     UncheckedAutoCloseableSupplier<OmSnapshot> rcToSnapshot =
@@ -448,28 +459,33 @@ public void testGetDeltaFilesWithDag(int numberOfFiles) 
throws IOException {
     SnapshotInfo fromSnapshotInfo = getMockedSnapshotInfo(snap1);
     SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap2);
     when(jobTableIterator.isValid()).thenReturn(false);
-    try (MockedStatic<RdbUtil> mockedRdbUtil = 
Mockito.mockStatic(RdbUtil.class, Mockito.CALLS_REAL_METHODS);
-         MockedStatic<RocksDiffUtils> mockedRocksDiffUtils = 
Mockito.mockStatic(RocksDiffUtils.class,
-             Mockito.CALLS_REAL_METHODS)) {
-      mockedRdbUtil.when(() -> RdbUtil.getSSTFilesForComparison(any(), any()))
-          
.thenReturn(Collections.singleton(RandomStringUtils.secure().nextAlphabetic(10)));
-      mockedRocksDiffUtils.when(() -> 
RocksDiffUtils.filterRelevantSstFiles(any(), any(), anySet()))
-          .thenAnswer(i -> null);
-      SnapshotDiffManager spy = spy(snapshotDiffManager);
-      doNothing().when(spy).recordActivity(any(), any());
-      doNothing().when(spy).updateProgress(anyString(), anyDouble());
-      Set<String> deltaFiles = spy.getDeltaFiles(
-          fromSnapshot,
-          toSnapshot,
-          Sets.newHashSet("cf1", "cf2"), fromSnapshotInfo,
-          toSnapshotInfo, false,
-          new TablePrefixInfo(Collections.emptyMap()), diffDir, diffJobKey);
-      assertEquals(randomStrings, deltaFiles);
-    }
+
+    SnapshotDiffManager spy = spy(snapshotDiffManager);
+    doNothing().when(spy).recordActivity(any(), any());
+    doNothing().when(spy).updateProgress(anyString(), anyDouble());
+    Set<String> deltaFiles = spy.getDeltaFiles(
+        fromSnapshot,
+        toSnapshot,
+        Sets.newHashSet("cf1", "cf2"), fromSnapshotInfo,
+        toSnapshotInfo, false,
+        new TablePrefixInfo(Collections.emptyMap()), diffDir, diffJobKey);
+    assertEquals(randomStrings, deltaFiles);
+
     rcFromSnapshot.close();
     rcToSnapshot.close();
   }
 
+  private void mockSnapshotLocalData() throws IOException {
+    OmSnapshotLocalData localData = mock(OmSnapshotLocalData.class);
+    ReadableOmSnapshotLocalDataProvider snapProvider = 
mock(ReadableOmSnapshotLocalDataProvider.class);
+    when(snapProvider.getPreviousSnapshotLocalData()).thenReturn(localData);
+    when(snapProvider.getSnapshotLocalData()).thenReturn(localData);
+    OmSnapshotLocalData.VersionMeta versionMeta = 
mock(OmSnapshotLocalData.VersionMeta.class);
+    when(versionMeta.getSstFiles()).thenReturn(Collections.emptyList());
+    when(localData.getVersionSstFileInfos()).thenReturn(ImmutableMap.of(0, 
versionMeta));
+    when(localDataManager.getOmSnapshotLocalData(any(UUID.class), 
any(UUID.class))).thenReturn(snapProvider);
+  }
+
   @ParameterizedTest
   @CsvSource({"0,true", "1,true", "2,true", "5,true", "10,true", "100,true",
       "1000,true", "10000,true", "0,false", "1,false", "2,false", "5,false",
@@ -483,26 +499,27 @@ public void testGetDeltaFilesWithFullDiff(int 
numberOfFiles,
       Set<String> deltaStrings = new HashSet<>();
 
       mockedRdbUtil.when(
-              () -> RdbUtil.getSSTFilesForComparison(any(), anySet()))
-          .thenAnswer((Answer<Set<String>>) invocation -> {
-            Set<String> retVal = IntStream.range(0, numberOfFiles)
+              () -> RdbUtil.getSSTFilesWithInodesForComparison(any(), 
anySet()))
+          .thenAnswer(invocation -> {
+            Map<Object, SstFileInfo> retVal = IntStream.range(0, numberOfFiles)
                 .mapToObj(i -> RandomStringUtils.secure().nextAlphabetic(10))
-                .collect(Collectors.toSet());
-            deltaStrings.addAll(retVal);
+                .collect(Collectors.toMap(Function.identity(),
+                    i -> new SstFileInfo(i, null, null, null)));
+            
deltaStrings.addAll(retVal.keySet().stream().map(Object::toString).collect(Collectors.toSet()));
             return retVal;
           });
 
       mockedRocksDiffUtils.when(() ->
-              RocksDiffUtils.filterRelevantSstFiles(anySet(), any(), anyMap(), 
anySet(), any(ManagedRocksDB.class),
-                  any(ManagedRocksDB.class)))
-          .thenAnswer((Answer<Void>) invocationOnMock -> {
-            invocationOnMock.getArgument(0, Set.class).stream()
+              RocksDiffUtils.filterRelevantSstFiles(anyMap(), anySet(), any()))
+          .thenAnswer(invocationOnMock -> {
+            invocationOnMock.getArgument(0, Map.class).entrySet().stream()
                 .findAny().ifPresent(val -> {
-                  assertTrue(deltaStrings.contains(val));
-                  invocationOnMock.getArgument(0, Set.class).remove(val);
-                  deltaStrings.remove(val);
+                  Map.Entry entry = (Map.Entry) val;
+                  assertTrue(deltaStrings.contains(entry.getKey()));
+                  invocationOnMock.getArgument(0, 
Map.class).remove(entry.getKey());
+                  deltaStrings.remove(entry.getKey());
                 });
-            return null;
+            return invocationOnMock.getArgument(0, Map.class);
           });
       UUID snap1 = UUID.randomUUID();
       UUID snap2 = UUID.randomUUID();
@@ -515,11 +532,13 @@ public void testGetDeltaFilesWithFullDiff(int 
numberOfFiles,
         when(differ.getSSTDiffListWithFullPath(
             any(DifferSnapshotInfo.class),
             any(DifferSnapshotInfo.class),
+            anyMap(),
+            any(TablePrefixInfo.class),
             anySet(),
             anyString()))
-            .thenReturn(Optional.ofNullable(Collections.emptyList()));
+            .thenReturn(Optional.empty());
       }
-
+      mockSnapshotLocalData();
       UncheckedAutoCloseableSupplier<OmSnapshot> rcFromSnapshot =
           omSnapshotManager.getActiveSnapshot(VOLUME_NAME, BUCKET_NAME, 
snap1.toString());
       UncheckedAutoCloseableSupplier<OmSnapshot> rcToSnapshot =
@@ -542,7 +561,12 @@ public void testGetDeltaFilesWithFullDiff(int 
numberOfFiles,
           false,
           new TablePrefixInfo(Collections.emptyMap()),
           snapDiffDir.getAbsolutePath(), diffJobKey);
-      assertEquals(deltaStrings, deltaFiles);
+      assertEquals(deltaStrings.stream()
+          .map(i -> dbStore.getDbLocation().toPath().resolve(i + 
SST_FILE_EXTENSION).toAbsolutePath().toString())
+          .collect(Collectors.toSet()), deltaFiles);
+      if (useFullDiff && numberOfFiles > 1) {
+        assertThat(deltaFiles).isNotEmpty();
+      }
     }
   }
 
@@ -566,8 +590,7 @@ public void testGetDeltaFilesWithDifferThrowException(int 
numberOfFiles)
           });
 
       mockedRocksDiffUtils.when(() ->
-              RocksDiffUtils.filterRelevantSstFiles(anySet(), any(), anyMap(), 
anySet(), any(ManagedRocksDB.class),
-                  any(ManagedRocksDB.class)))
+              RocksDiffUtils.filterRelevantSstFiles(anySet(), anySet(), any()))
           .thenAnswer((Answer<Void>) invocationOnMock -> {
             invocationOnMock.getArgument(0, Set.class).stream()
                 .findAny().ifPresent(val -> {
@@ -589,6 +612,8 @@ public void testGetDeltaFilesWithDifferThrowException(int 
numberOfFiles)
           .getSSTDiffListWithFullPath(
               any(DifferSnapshotInfo.class),
               any(DifferSnapshotInfo.class),
+              anyMap(),
+              any(TablePrefixInfo.class),
               anySet(),
               anyString());
 
@@ -606,6 +631,7 @@ public void testGetDeltaFilesWithDifferThrowException(int 
numberOfFiles)
       SnapshotDiffManager spy = spy(snapshotDiffManager);
       doNothing().when(spy).recordActivity(any(), any());
       doNothing().when(spy).updateProgress(anyString(), anyDouble());
+      mockSnapshotLocalData();
       Set<String> deltaFiles = spy.getDeltaFiles(
           fromSnapshot,
           toSnapshot,
@@ -1541,33 +1567,36 @@ public void testGetDeltaFilesWithFullDiff() throws  
IOException {
     SnapshotDiffManager spy = spy(snapshotDiffManager);
     UUID snap1 = UUID.randomUUID();
     OmSnapshot fromSnapshot = getMockedOmSnapshot(snap1);
+    Path fromSnapshotPath = 
fromSnapshot.getMetadataManager().getStore().getDbLocation().toPath();
     UUID snap2 = UUID.randomUUID();
     OmSnapshot toSnapshot = getMockedOmSnapshot(snap2);
+    Path toSnapshotPath = 
toSnapshot.getMetadataManager().getStore().getDbLocation().toPath();
     Mockito.doAnswer(invocation -> {
       OmSnapshot snapshot = invocation.getArgument(0);
       if (snapshot == fromSnapshot) {
-        Map<Integer, String> inodeToFileMap = new HashMap<>();
-        inodeToFileMap.put(1, "1.sst");
-        inodeToFileMap.put(2, "2.sst");
-        inodeToFileMap.put(3, "3.sst");
+        Map<Integer, SstFileInfo> inodeToFileMap = new HashMap<>();
+        inodeToFileMap.put(1, new SstFileInfo("1", null, null, null));
+        inodeToFileMap.put(2, new SstFileInfo("2", null, null, null));
+        inodeToFileMap.put(3, new SstFileInfo("3", null, null, null));
         return inodeToFileMap;
       }
       if (snapshot == toSnapshot) {
-        Map<Integer, String> inodeToFileMap = new HashMap<>();
-        inodeToFileMap.put(1, "10.sst");
-        inodeToFileMap.put(2, "20.sst");
-        inodeToFileMap.put(4, "4.sst");
+        Map<Integer, SstFileInfo> inodeToFileMap = new HashMap<>();
+        inodeToFileMap.put(1, new SstFileInfo("10", null, null, null));
+        inodeToFileMap.put(2, new SstFileInfo("20", null, null, null));
+        inodeToFileMap.put(4, new SstFileInfo("4", null, null, null));
         return inodeToFileMap;
       }
       return null;
-    }).when(spy).getSSTFileMapForSnapshot(Mockito.any(OmSnapshot.class),
-        Mockito.anySet());
+    }).when(spy).getSSTFileMapForSnapshot(Mockito.any(OmSnapshot.class), 
Mockito.anySet());
     doNothing().when(spy).recordActivity(any(), any());
     doNothing().when(spy).updateProgress(anyString(), anyDouble());
     String diffJobKey = snap1 + DELIMITER + snap2;
+
     Set<String> deltaFiles = spy.getDeltaFiles(fromSnapshot, toSnapshot, 
Collections.emptySet(), snapshotInfo,
         snapshotInfo, true, new TablePrefixInfo(Collections.emptyMap()), null, 
diffJobKey);
-    Assertions.assertEquals(Sets.newHashSet("3.sst", "4.sst"), deltaFiles);
+    
Assertions.assertEquals(Sets.newHashSet(fromSnapshotPath.resolve("3.sst").toAbsolutePath().toString(),
+            toSnapshotPath.resolve("4.sst").toAbsolutePath().toString()), 
deltaFiles);
   }
 
   @Test
@@ -1585,7 +1614,7 @@ public void testGetSnapshotDiffReportHappyCase() throws 
Exception {
         anyString());
 
     doReturn(testDeltaFiles).when(spy)
-        .getSSTFileListForSnapshot(any(OmSnapshot.class), anySet());
+        .getSSTFileSetForSnapshot(any(OmSnapshot.class), anySet());
 
     doNothing().when(spy).addToObjectIdMap(eq(keyInfoTable), eq(keyInfoTable),
         any(), anyBoolean(), any(), any(), any(), any(), any(), any(), 
anyString());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to