This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new ff41ce6092a HDDS-13849. Refactor getTablePrefix function in 
SnapshotDiff flow (#9235)
ff41ce6092a is described below

commit ff41ce6092ac244cb30b75e2d5bc10835c5789e5
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Mon Nov 10 19:08:51 2025 -0500

    HDDS-13849. Refactor getTablePrefix function in SnapshotDiff flow (#9235)
---
 .../java/org/apache/hadoop/hdds/StringUtils.java   |   7 +
 .../apache/hadoop/hdds/utils/db/RocksDatabase.java |  10 +-
 .../hadoop/hdds/utils/db/TablePrefixInfo.java      |  52 +++++
 .../apache/hadoop/hdds/utils/db/package-info.java  |  21 ++
 .../org/apache/ozone/rocksdb/util/RdbUtil.java     |  10 +-
 .../apache/ozone/rocksdiff/DifferSnapshotInfo.java |  10 +-
 .../ozone/rocksdiff/RocksDBCheckpointDiffer.java   |  26 ++-
 .../org/apache/ozone/rocksdiff/RocksDiffUtils.java |  55 ++---
 .../rocksdiff/TestRocksDBCheckpointDiffer.java     | 232 ++++++++++++---------
 .../apache/ozone/rocksdiff/TestRocksDiffUtils.java | 114 +++++-----
 .../hadoop/ozone/freon/TestOMSnapshotDAG.java      |  26 ++-
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |  11 +
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  25 ++-
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  67 ++++++
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  |  72 ++-----
 .../hadoop/ozone/om/SstFilteringService.java       |  13 +-
 .../snapshot/OMSnapshotMoveTableKeysRequest.java   |  24 ++-
 .../hadoop/ozone/om/snapshot/OmSnapshotUtils.java  |   2 +-
 .../ozone/om/snapshot/SnapshotDiffManager.java     |  71 +++----
 .../hadoop/ozone/om/snapshot/SnapshotUtils.java    |  25 ---
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java |  22 +-
 .../ozone/om/service/TestKeyDeletingService.java   |   8 +-
 .../ozone/om/snapshot/TestSnapshotDiffManager.java |  55 ++---
 .../hadoop/ozone/repair/ldb/TestLdbRepair.java     |   2 +-
 24 files changed, 540 insertions(+), 420 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java
index cfcac1a7712..a3bd1e62ffc 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java
@@ -123,4 +123,11 @@ public static String 
getLexicographicallyHigherString(String val) {
     charVal[lastIdx] += 1;
     return String.valueOf(charVal);
   }
+
+  public static String getFirstNChars(String str, int n) {
+    if (str == null || str.length() < n) {
+      return str;
+    }
+    return str.substring(0, n);
+  }
 }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index b93626060c8..64bbb371101 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -854,18 +854,14 @@ private int getLastLevel() throws RocksDatabaseException {
   /**
    * Deletes sst files which do not correspond to prefix
    * for given table.
-   * @param prefixPairs a map of TableName to prefixUsed.
+   * @param prefixInfo a map of TableName to prefixUsed.
    */
-  public void deleteFilesNotMatchingPrefix(Map<String, String> prefixPairs) 
throws RocksDatabaseException {
+  public void deleteFilesNotMatchingPrefix(TablePrefixInfo prefixInfo) throws 
RocksDatabaseException {
     try (UncheckedAutoCloseable ignored = acquire()) {
       for (LiveFileMetaData liveFileMetaData : getSstFileList()) {
         String sstFileColumnFamily = 
StringUtils.bytes2String(liveFileMetaData.columnFamilyName());
         int lastLevel = getLastLevel();
 
-        if (!prefixPairs.containsKey(sstFileColumnFamily)) {
-          continue;
-        }
-
         // RocksDB #deleteFile API allows only to delete the last level of
         // SST Files. Any level < last level won't get deleted and
         // only last file of level 0 can be deleted
@@ -876,7 +872,7 @@ public void deleteFilesNotMatchingPrefix(Map<String, 
String> prefixPairs) throws
           continue;
         }
 
-        String prefixForColumnFamily = prefixPairs.get(sstFileColumnFamily);
+        String prefixForColumnFamily = 
prefixInfo.getTablePrefix(sstFileColumnFamily);
         String firstDbKey = 
StringUtils.bytes2String(liveFileMetaData.smallestKey());
         String lastDbKey = 
StringUtils.bytes2String(liveFileMetaData.largestKey());
         boolean isKeyWithPrefixPresent = RocksDiffUtils.isKeyWithPrefixPresent(
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/TablePrefixInfo.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/TablePrefixInfo.java
new file mode 100644
index 00000000000..65d88962362
--- /dev/null
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/TablePrefixInfo.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Encapsulates a store's prefix info corresponding to tables in a db.
+ */
+public class TablePrefixInfo {
+  private final Map<String, String> tablePrefixes;
+
+  public TablePrefixInfo(Map<String, String> tablePrefixes) {
+    this.tablePrefixes = Collections.unmodifiableMap(tablePrefixes);
+  }
+
+  public String getTablePrefix(String tableName) {
+    return tablePrefixes.getOrDefault(tableName, "");
+  }
+
+  public int size() {
+    return tablePrefixes.size();
+  }
+
+  public Set<String> getTableNames() {
+    return tablePrefixes.keySet();
+  }
+
+  @Override
+  public String toString() {
+    return "TablePrefixInfo{" +
+        "tablePrefixes=" + tablePrefixes +
+        '}';
+  }
+}
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/package-info.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/package-info.java
new file mode 100644
index 00000000000..48b831ecb8f
--- /dev/null
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *  Util package for rocksdb.
+ */
+package org.apache.hadoop.hdds.utils.db;
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
index 97eaa945fdc..95c4a4aa2bb 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
@@ -17,7 +17,6 @@
 
 package org.apache.ozone.rocksdb.util;
 
-import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -43,21 +42,20 @@ public final class RdbUtil {
   private RdbUtil() { }
 
   public static List<LiveFileMetaData> getLiveSSTFilesForCFs(
-      final ManagedRocksDB rocksDB, List<String> cfs) {
-    final Set<String> cfSet = Sets.newHashSet(cfs);
+      final ManagedRocksDB rocksDB, Set<String> cfs) {
     return rocksDB.get().getLiveFilesMetaData().stream()
-        .filter(lfm -> 
cfSet.contains(StringUtils.bytes2String(lfm.columnFamilyName())))
+        .filter(lfm -> 
cfs.contains(StringUtils.bytes2String(lfm.columnFamilyName())))
         .collect(Collectors.toList());
   }
 
   public static Set<String> getSSTFilesForComparison(
-      final ManagedRocksDB rocksDB, List<String> cfs) {
+      final ManagedRocksDB rocksDB, Set<String> cfs) {
     return getLiveSSTFilesForCFs(rocksDB, cfs).stream()
         .map(lfm -> new File(lfm.path(), lfm.fileName()).getPath())
         .collect(Collectors.toCollection(HashSet::new));
   }
 
-  public static Map<Object, String> getSSTFilesWithInodesForComparison(final 
ManagedRocksDB rocksDB, List<String> cfs)
+  public static Map<Object, String> getSSTFilesWithInodesForComparison(final 
ManagedRocksDB rocksDB, Set<String> cfs)
       throws IOException {
     List<LiveFileMetaData> liveSSTFilesForCFs = getLiveSSTFilesForCFs(rocksDB, 
cfs);
     Map<Object, String> inodeToSstMap = new HashMap<>();
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java
index 501725ca7c2..c72f56d5f11 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java
@@ -17,8 +17,8 @@
 
 package org.apache.ozone.rocksdiff;
 
-import java.util.Map;
 import java.util.UUID;
+import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
 
 /**
@@ -29,17 +29,17 @@ public class DifferSnapshotInfo {
   private final UUID snapshotId;
   private final long snapshotGeneration;
 
-  private final Map<String, String> tablePrefixes;
+  private final TablePrefixInfo tablePrefixes;
 
   private final ManagedRocksDB rocksDB;
 
   public DifferSnapshotInfo(String db, UUID id, long gen,
-                            Map<String, String> prefixes,
+                            TablePrefixInfo tablePrefixInfo,
                             ManagedRocksDB rocksDB) {
     dbPath = db;
     snapshotId = id;
     snapshotGeneration = gen;
-    tablePrefixes = prefixes;
+    tablePrefixes = tablePrefixInfo;
     this.rocksDB = rocksDB;
   }
 
@@ -55,7 +55,7 @@ public long getSnapshotGeneration() {
     return snapshotGeneration;
   }
 
-  public Map<String, String> getTablePrefixes() {
+  public TablePrefixInfo getTablePrefixes() {
     return tablePrefixes;
   }
 
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index da8af3691e0..44cbc45ad6b 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -32,6 +32,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import com.google.common.graph.MutableGraph;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.BufferedWriter;
@@ -620,12 +621,13 @@ private String trimSSTFilename(String filename) {
    * Read the current Live manifest for a given RocksDB instance (Active or
    * Checkpoint).
    * @param rocksDB open rocksDB instance.
+   * @param tableFilter set of column-family/table names to include when 
collecting live SSTs.
    * @return a list of SST files (without extension) in the DB.
    */
-  public Set<String> readRocksDBLiveFiles(ManagedRocksDB rocksDB) {
+  public Set<String> readRocksDBLiveFiles(ManagedRocksDB rocksDB, Set<String> 
tableFilter) {
     HashSet<String> liveFiles = new HashSet<>();
 
-    final List<String> cfs = Arrays.asList(
+    final Set<String> cfs = Sets.newHashSet(
         org.apache.hadoop.hdds.StringUtils.bytes2String(
             RocksDB.DEFAULT_COLUMN_FAMILY), "keyTable", "directoryTable",
         "fileTable");
@@ -635,6 +637,9 @@ public Set<String> readRocksDBLiveFiles(ManagedRocksDB 
rocksDB) {
         RdbUtil.getLiveSSTFilesForCFs(rocksDB, cfs);
     LOG.debug("SST File Metadata for DB: " + rocksDB.get().getName());
     for (LiveFileMetaData m : liveFileMetaDataList) {
+      if 
(!tableFilter.contains(StringUtils.bytes2String(m.columnFamilyName()))) {
+        continue;
+      }
       LOG.debug("File: {}, Level: {}", m.fileName(), m.level());
       final String trimmedFilename = trimSSTFilename(m.fileName());
       liveFiles.add(trimmedFilename);
@@ -818,6 +823,7 @@ private String getSSTFullPath(String 
sstFilenameWithoutExtension,
    *
    * @param src source snapshot
    * @param dest destination snapshot
+   * @param tablesToLookup tablesToLookup set of table (column family) names 
used to restrict which SST files to return.
    * @param sstFilesDirForSnapDiffJob dir to create hardlinks for SST files
    *                                 for snapDiff job.
    * @return A list of SST files without extension.
@@ -825,10 +831,10 @@ private String getSSTFullPath(String 
sstFilenameWithoutExtension,
    *               "/path/to/sstBackupDir/000060.sst"]
    */
   public synchronized Optional<List<String>> 
getSSTDiffListWithFullPath(DifferSnapshotInfo src,
-      DifferSnapshotInfo dest,
+      DifferSnapshotInfo dest, Set<String> tablesToLookup,
       String sstFilesDirForSnapDiffJob) {
 
-    Optional<List<String>> sstDiffList = getSSTDiffList(src, dest);
+    Optional<List<String>> sstDiffList = getSSTDiffList(src, dest, 
tablesToLookup);
 
     return sstDiffList.map(diffList -> diffList.stream()
         .map(
@@ -852,15 +858,17 @@ public synchronized Optional<List<String>> 
getSSTDiffListWithFullPath(DifferSnap
    *
    * @param src source snapshot
    * @param dest destination snapshot
+   * @param tablesToLookup tablesToLookup Set of column-family (table) names 
to include when reading SST files;
+   *                       must be non-null.
    * @return A list of SST files without extension. e.g. ["000050", "000060"]
    */
   public synchronized Optional<List<String>> getSSTDiffList(DifferSnapshotInfo 
src,
-      DifferSnapshotInfo dest) {
+      DifferSnapshotInfo dest, Set<String> tablesToLookup) {
 
     // TODO: Reject or swap if dest is taken after src, once snapshot chain
     //  integration is done.
-    Set<String> srcSnapFiles = readRocksDBLiveFiles(src.getRocksDB());
-    Set<String> destSnapFiles = readRocksDBLiveFiles(dest.getRocksDB());
+    Set<String> srcSnapFiles = readRocksDBLiveFiles(src.getRocksDB(), 
tablesToLookup);
+    Set<String> destSnapFiles = readRocksDBLiveFiles(dest.getRocksDB(), 
tablesToLookup);
 
     Set<String> fwdDAGSameFiles = new HashSet<>();
     Set<String> fwdDAGDifferentFiles = new HashSet<>();
@@ -896,9 +904,9 @@ public synchronized Optional<List<String>> 
getSSTDiffList(DifferSnapshotInfo src
       }
     }
 
-    if (src.getTablePrefixes() != null && !src.getTablePrefixes().isEmpty()) {
+    if (src.getTablePrefixes() != null && src.getTablePrefixes().size() != 0) {
       RocksDiffUtils.filterRelevantSstFiles(fwdDAGDifferentFiles, 
src.getTablePrefixes(),
-          compactionDag.getCompactionMap(), src.getRocksDB(), 
dest.getRocksDB());
+          compactionDag.getCompactionMap(), tablesToLookup, src.getRocksDB(), 
dest.getRocksDB());
     }
     return Optional.of(new ArrayList<>(fwdDAGDifferentFiles));
   }
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
index 86577147b62..7d9512768bc 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
@@ -17,7 +17,7 @@
 
 package org.apache.ozone.rocksdiff;
 
-import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.hdds.StringUtils.getFirstNChars;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Collections;
@@ -25,9 +25,8 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
 import org.apache.ozone.compaction.log.CompactionFileInfo;
 import org.apache.ozone.rocksdb.util.SstFileInfo;
@@ -49,41 +48,26 @@ private RocksDiffUtils() {
   public static boolean isKeyWithPrefixPresent(String prefixForColumnFamily,
                                                String firstDbKey,
                                                String lastDbKey) {
-    String firstKeyPrefix = constructBucketKey(firstDbKey);
-    String endKeyPrefix = constructBucketKey(lastDbKey);
+    String firstKeyPrefix = getFirstNChars(firstDbKey, 
prefixForColumnFamily.length());
+    String endKeyPrefix = getFirstNChars(lastDbKey, 
prefixForColumnFamily.length());
     return firstKeyPrefix.compareTo(prefixForColumnFamily) <= 0
         && prefixForColumnFamily.compareTo(endKeyPrefix) <= 0;
   }
 
-  public static String constructBucketKey(String keyName) {
-    if (!keyName.startsWith(OM_KEY_PREFIX)) {
-      keyName = OM_KEY_PREFIX.concat(keyName);
-    }
-    String[] elements = keyName.split(OM_KEY_PREFIX);
-    String volume = elements[1];
-    String bucket = elements[2];
-    StringBuilder builder =
-        new StringBuilder().append(OM_KEY_PREFIX).append(volume);
-
-    if (StringUtils.isNotBlank(bucket)) {
-      builder.append(OM_KEY_PREFIX).append(bucket);
-    }
-    builder.append(OM_KEY_PREFIX);
-    return builder.toString();
-  }
-
   public static void filterRelevantSstFiles(Set<String> inputFiles,
-                                            Map<String, String> 
tableToPrefixMap,
+                                            TablePrefixInfo tablePrefixInfo,
+                                            Set<String> columnFamiliesToLookup,
                                             ManagedRocksDB... dbs) {
-    filterRelevantSstFiles(inputFiles, tableToPrefixMap, 
Collections.emptyMap(), dbs);
+    filterRelevantSstFiles(inputFiles, tablePrefixInfo, 
Collections.emptyMap(), columnFamiliesToLookup, dbs);
   }
 
   /**
    * Filter sst files based on prefixes.
    */
   public static void filterRelevantSstFiles(Set<String> inputFiles,
-                                            Map<String, String> 
tableToPrefixMap,
+                                            TablePrefixInfo tablePrefixInfo,
                                             Map<String, CompactionNode> 
preExistingCompactionNodes,
+                                            Set<String> columnFamiliesToLookup,
                                             ManagedRocksDB... dbs) {
     Map<String, LiveFileMetaData> liveFileMetaDataMap = new HashMap<>();
     int dbIdx = 0;
@@ -100,41 +84,38 @@ public static void filterRelevantSstFiles(Set<String> 
inputFiles,
         compactionNode = new CompactionNode(new 
CompactionFileInfo.Builder(filename)
             .setValues(liveFileMetaDataMap.get(filename)).build());
       }
-      if (shouldSkipNode(compactionNode, tableToPrefixMap)) {
+      if (shouldSkipNode(compactionNode, tablePrefixInfo, 
columnFamiliesToLookup)) {
         fileIterator.remove();
       }
     }
   }
 
   @VisibleForTesting
-  static boolean shouldSkipNode(SstFileInfo node,
-                                Map<String, String> columnFamilyToPrefixMap) {
+  static boolean shouldSkipNode(SstFileInfo node, TablePrefixInfo 
tablePrefixInfo, Set<String> columnFamiliesToLookup) {
     // This is for backward compatibility. Before the compaction log table
     // migration, startKey, endKey and columnFamily information is not 
persisted
     // in compaction log files.
     // Also for the scenario when there is an exception in reading SST files
     // for the file node.
-    if (node.getStartKey() == null || node.getEndKey() == null ||
-        node.getColumnFamily() == null) {
+    if (node.getStartKey() == null || node.getEndKey() == null || 
node.getColumnFamily() == null) {
       LOG.debug("Compaction node with fileName: {} doesn't have startKey, " +
           "endKey and columnFamily details.", node.getFileName());
       return false;
     }
 
-    if (MapUtils.isEmpty(columnFamilyToPrefixMap)) {
-      LOG.debug("Provided columnFamilyToPrefixMap is null or empty.");
+    if (tablePrefixInfo.size() == 0) {
+      LOG.debug("Provided tablePrefixInfo is null or empty.");
       return false;
     }
 
-    if (!columnFamilyToPrefixMap.containsKey(node.getColumnFamily())) {
+    if (!columnFamiliesToLookup.contains(node.getColumnFamily())) {
       LOG.debug("SstFile node: {} is for columnFamily: {} while filter map " +
               "contains columnFamilies: {}.", node.getFileName(),
-          node.getColumnFamily(), columnFamilyToPrefixMap.keySet());
+          node.getColumnFamily(), tablePrefixInfo);
       return true;
     }
 
-    String keyPrefix = columnFamilyToPrefixMap.get(node.getColumnFamily());
-    return !isKeyWithPrefixPresent(keyPrefix, node.getStartKey(),
-        node.getEndKey());
+    String keyPrefix = tablePrefixInfo.getTablePrefix(node.getColumnFamily());
+    return !isKeyWithPrefixPresent(keyPrefix, node.getStartKey(), 
node.getEndKey());
   }
 }
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index fbdb9ea2198..c59f6aeb491 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -86,6 +86,7 @@
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedCheckpoint;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
@@ -237,35 +238,35 @@ public class TestRocksDBCheckpointDiffer {
       )
   );
 
-  private static Map<String, String> columnFamilyToPrefixMap1 =
-      new HashMap<String, String>() {
+  private static TablePrefixInfo columnFamilyToPrefixMap1 =
+      new TablePrefixInfo(new HashMap<String, String>() {
         {
           put("keyTable", "/volume/bucket1/");
           // Simply using bucketName instead of ID for the test.
           put("directoryTable", "/volume/bucket1/");
           put("fileTable", "/volume/bucket1/");
         }
-      };
+      });
 
-  private static Map<String, String> columnFamilyToPrefixMap2 =
-      new HashMap<String, String>() {
+  private static TablePrefixInfo columnFamilyToPrefixMap2 =
+      new TablePrefixInfo(new HashMap<String, String>() {
         {
           put("keyTable", "/volume/bucket2/");
           // Simply using bucketName instead of ID for the test.
           put("directoryTable", "/volume/bucket2/");
           put("fileTable", "/volume/bucket2/");
         }
-      };
+      });
 
-  private static Map<String, String> columnFamilyToPrefixMap3 =
-      new HashMap<String, String>() {
+  private static TablePrefixInfo columnFamilyToPrefixMap3 =
+      new TablePrefixInfo(new HashMap<String, String>() {
         {
           put("keyTable", "/volume/bucket3/");
           // Simply using bucketName instead of ID for the test.
           put("directoryTable", "/volume/bucket3/");
           put("fileTable", "/volume/bucket3/");
         }
-      };
+      });
 
   private static final int NUM_ROW = 250000;
   private static final int SNAPSHOT_EVERY_SO_MANY_KEYS = 49999;
@@ -513,7 +514,7 @@ private static Stream<Arguments> 
casesGetSSTDiffListWithoutDB() {
     DifferSnapshotInfo snapshotInfo4 = new DifferSnapshotInfo(
         "/path/to/dbcp4", UUID.randomUUID(), 18000L, null, 
Mockito.mock(ManagedRocksDB.class));
 
-    Map<String, String> prefixMap = ImmutableMap.of("col1", "c", "col2", "d");
+    TablePrefixInfo prefixMap = new TablePrefixInfo(ImmutableMap.of("col1", 
"c", "col2", "d"));
     DifferSnapshotInfo snapshotInfo5 = new DifferSnapshotInfo(
         "/path/to/dbcp2", UUID.randomUUID(), 0L, prefixMap, 
Mockito.mock(ManagedRocksDB.class));
     DifferSnapshotInfo snapshotInfo6 = new DifferSnapshotInfo(
@@ -740,92 +741,102 @@ public void testGetSSTDiffListWithoutDB(String 
description,
       Set<String> expectedSSTDiffFiles,
       boolean expectingException,
       Map<String, String[]> metaDataMap) {
-    try (MockedStatic<RocksDiffUtils> mockedRocksdiffUtil = 
Mockito.mockStatic(RocksDiffUtils.class,
-        Mockito.CALLS_REAL_METHODS)) {
-      mockedRocksdiffUtil.when(() -> 
RocksDiffUtils.constructBucketKey(anyString())).thenAnswer(i -> 
i.getArgument(0));
-      boolean exceptionThrown = false;
-      if (compactionLog != null) {
-        // Construct DAG from compaction log input
-        Arrays.stream(compactionLog.split("\n")).forEach(
-            rocksDBCheckpointDiffer::processCompactionLogLine);
-      } else if (compactionLogEntries != null) {
-        compactionLogEntries.forEach(entry ->
-            rocksDBCheckpointDiffer.addToCompactionLogTable(entry));
+
+    boolean exceptionThrown = false;
+    if (compactionLog != null) {
+      // Construct DAG from compaction log input
+      Arrays.stream(compactionLog.split("\n")).forEach(
+          rocksDBCheckpointDiffer::processCompactionLogLine);
+    } else if (compactionLogEntries != null) {
+      compactionLogEntries.forEach(entry ->
+          rocksDBCheckpointDiffer.addToCompactionLogTable(entry));
+    } else {
+      throw new IllegalArgumentException("One of compactionLog and " +
+          "compactionLogEntries should be non-null.");
+    }
+    rocksDBCheckpointDiffer.loadAllCompactionLogs();
+
+    Set<String> actualSameSstFiles = new HashSet<>();
+    Set<String> actualDiffSstFiles = new HashSet<>();
+
+    try {
+      rocksDBCheckpointDiffer.internalGetSSTDiffList(
+          srcSnapshot,
+          destSnapshot,
+          srcSnapshotSstFiles,
+          destSnapshotSstFiles,
+          actualSameSstFiles,
+          actualDiffSstFiles);
+    } catch (RuntimeException rtEx) {
+      if (!expectingException) {
+        fail("Unexpected exception thrown in test.");
       } else {
-        throw new IllegalArgumentException("One of compactionLog and " +
-            "compactionLogEntries should be non-null.");
+        exceptionThrown = true;
       }
-      rocksDBCheckpointDiffer.loadAllCompactionLogs();
+    }
 
-      Set<String> actualSameSstFiles = new HashSet<>();
-      Set<String> actualDiffSstFiles = new HashSet<>();
+    if (expectingException && !exceptionThrown) {
+      fail("Expecting exception but none thrown.");
+    }
 
+    // Check same and different SST files result
+    assertEquals(expectedSameSstFiles, actualSameSstFiles);
+    assertEquals(expectedDiffSstFiles, actualDiffSstFiles);
+    try (MockedStatic<RdbUtil> mockedHandler = 
Mockito.mockStatic(RdbUtil.class, Mockito.CALLS_REAL_METHODS)) {
+      RocksDB rocksDB = Mockito.mock(RocksDB.class);
+      Mockito.when(rocksDB.getName()).thenReturn("dummy");
+      Mockito.when(srcSnapshot.getRocksDB().get()).thenReturn(rocksDB);
+      Mockito.when(destSnapshot.getRocksDB().get()).thenReturn(rocksDB);
+      Mockito.when(srcSnapshot.getRocksDB().getLiveMetadataForSSTFiles())
+          .thenAnswer(invocation -> 
srcSnapshotSstFiles.stream().filter(metaDataMap::containsKey).map(file -> {
+            LiveFileMetaData liveFileMetaData = 
Mockito.mock(LiveFileMetaData.class);
+            String[] metaData = metaDataMap.get(file);
+            Mockito.when(liveFileMetaData.fileName()).thenReturn("/" + file + 
SST_FILE_EXTENSION);
+            
Mockito.when(liveFileMetaData.smallestKey()).thenReturn(metaData[0].getBytes(UTF_8));
+            
Mockito.when(liveFileMetaData.largestKey()).thenReturn(metaData[1].getBytes(UTF_8));
+            
Mockito.when(liveFileMetaData.columnFamilyName()).thenReturn(metaData[2].getBytes(UTF_8));
+            return liveFileMetaData;
+          }).collect(Collectors.toMap(liveFileMetaData -> 
FilenameUtils.getBaseName(liveFileMetaData.fileName()),
+              Function.identity())));
+      Set<String> tablesToLookup;
+      String dummyTable;
+      if (srcSnapshot.getTablePrefixes() != null) {
+        tablesToLookup = srcSnapshot.getTablePrefixes().getTableNames();
+        dummyTable = tablesToLookup.stream().findAny().get();
+      } else {
+        tablesToLookup = mock(Set.class);
+        when(tablesToLookup.contains(anyString())).thenReturn(true);
+        dummyTable = "dummy";
+      }
+      mockedHandler.when(() -> RdbUtil.getLiveSSTFilesForCFs(any(), any()))
+          .thenAnswer(i -> {
+            Set<String> sstFiles = 
i.getArgument(0).equals(srcSnapshot.getRocksDB()) ? srcSnapshotSstFiles
+                : destSnapshotSstFiles;
+            return sstFiles.stream().map(fileName -> {
+              LiveFileMetaData liveFileMetaData = mock(LiveFileMetaData.class);
+              when(liveFileMetaData.fileName()).thenReturn("/" + fileName + 
SST_FILE_EXTENSION);
+              
when(liveFileMetaData.columnFamilyName()).thenReturn(dummyTable.getBytes(UTF_8));
+              return liveFileMetaData;
+            }).collect(Collectors.toList());
+          });
       try {
-        rocksDBCheckpointDiffer.internalGetSSTDiffList(
-            srcSnapshot,
-            destSnapshot,
-            srcSnapshotSstFiles,
-            destSnapshotSstFiles,
-            actualSameSstFiles,
-            actualDiffSstFiles);
+        Assertions.assertEquals(Optional.ofNullable(expectedSSTDiffFiles)
+                .map(files -> 
files.stream().sorted().collect(Collectors.toList())).orElse(null),
+            rocksDBCheckpointDiffer.getSSTDiffList(srcSnapshot, destSnapshot, 
tablesToLookup)
+                .map(i -> 
i.stream().sorted().collect(Collectors.toList())).orElse(null));
       } catch (RuntimeException rtEx) {
         if (!expectingException) {
+          rtEx.printStackTrace();
           fail("Unexpected exception thrown in test.");
         } else {
           exceptionThrown = true;
         }
       }
-
-      if (expectingException && !exceptionThrown) {
-        fail("Expecting exception but none thrown.");
-      }
-
-      // Check same and different SST files result
-      assertEquals(expectedSameSstFiles, actualSameSstFiles);
-      assertEquals(expectedDiffSstFiles, actualDiffSstFiles);
-      try (MockedStatic<RdbUtil> mockedHandler = 
Mockito.mockStatic(RdbUtil.class, Mockito.CALLS_REAL_METHODS)) {
-        RocksDB rocksDB = Mockito.mock(RocksDB.class);
-        Mockito.when(rocksDB.getName()).thenReturn("dummy");
-        Mockito.when(srcSnapshot.getRocksDB().get()).thenReturn(rocksDB);
-        Mockito.when(destSnapshot.getRocksDB().get()).thenReturn(rocksDB);
-        Mockito.when(srcSnapshot.getRocksDB().getLiveMetadataForSSTFiles())
-            .thenAnswer(invocation -> 
srcSnapshotSstFiles.stream().filter(metaDataMap::containsKey).map(file -> {
-              LiveFileMetaData liveFileMetaData = 
Mockito.mock(LiveFileMetaData.class);
-              String[] metaData = metaDataMap.get(file);
-              Mockito.when(liveFileMetaData.fileName()).thenReturn("/" + file 
+ SST_FILE_EXTENSION);
-              
Mockito.when(liveFileMetaData.smallestKey()).thenReturn(metaData[0].getBytes(UTF_8));
-              
Mockito.when(liveFileMetaData.largestKey()).thenReturn(metaData[1].getBytes(UTF_8));
-              
Mockito.when(liveFileMetaData.columnFamilyName()).thenReturn(metaData[2].getBytes(UTF_8));
-              return liveFileMetaData;
-            }).collect(Collectors.toMap(liveFileMetaData -> 
FilenameUtils.getBaseName(liveFileMetaData.fileName()),
-                Function.identity())));
-        mockedHandler.when(() -> RdbUtil.getLiveSSTFilesForCFs(any(), any()))
-            .thenAnswer(i -> {
-              Set<String> sstFiles = 
i.getArgument(0).equals(srcSnapshot.getRocksDB()) ? srcSnapshotSstFiles
-                  : destSnapshotSstFiles;
-              return sstFiles.stream().map(fileName -> {
-                LiveFileMetaData liveFileMetaData = 
Mockito.mock(LiveFileMetaData.class);
-                Mockito.when(liveFileMetaData.fileName()).thenReturn("/" + 
fileName + SST_FILE_EXTENSION);
-                return liveFileMetaData;
-              }).collect(Collectors.toList());
-            });
-        try {
-          Assertions.assertEquals(Optional.ofNullable(expectedSSTDiffFiles)
-                  .map(files -> 
files.stream().sorted().collect(Collectors.toList())).orElse(null),
-              rocksDBCheckpointDiffer.getSSTDiffList(srcSnapshot, destSnapshot)
-                  .map(i -> 
i.stream().sorted().collect(Collectors.toList())).orElse(null));
-        } catch (RuntimeException rtEx) {
-          if (!expectingException) {
-            fail("Unexpected exception thrown in test.");
-          } else {
-            exceptionThrown = true;
-          }
-        }
-      }
-      if (expectingException && !exceptionThrown) {
-        fail("Expecting exception but none thrown.");
-      }
     }
+    if (expectingException && !exceptionThrown) {
+      fail("Expecting exception but none thrown.");
+    }
+
   }
 
   /**
@@ -909,13 +920,41 @@ void diffAllSnapshots(RocksDBCheckpointDiffer differ)
     assertEquals(snapshots.size(), expectedDifferResult.size());
 
     int index = 0;
+    List<String> expectedDiffFiles = new ArrayList<>();
     for (DifferSnapshotInfo snap : snapshots) {
-      // Returns a list of SST files to be fed into RocksDiff
-      List<String> sstDiffList = differ.getSSTDiffList(src, 
snap).orElse(Collections.emptyList());
-      LOG.info("SST diff list from '{}' to '{}': {}",
-          src.getDbPath(), snap.getDbPath(), sstDiffList);
+      // Returns a list of SST files to be fed into RocksCheckpointDiffer Dag.
+      List<String> tablesToTrack = new 
ArrayList<>(COLUMN_FAMILIES_TO_TRACK_IN_DAG);
+      // Add some invalid index.
+      tablesToTrack.add("compactionLogTable");
+      Set<String> tableToLookUp = new HashSet<>();
+      for (int i = 0; i < Math.pow(2, tablesToTrack.size()); i++) {
+        tableToLookUp.clear();
+        expectedDiffFiles.clear();
+        int mask = i;
+        while (mask != 0) {
+          int firstSetBitIndex = Integer.numberOfTrailingZeros(mask);
+          tableToLookUp.add(tablesToTrack.get(firstSetBitIndex));
+          mask &= mask - 1;
+        }
+        for (String diffFile : expectedDifferResult.get(index)) {
+          String columnFamily;
+          if 
(rocksDBCheckpointDiffer.getCompactionNodeMap().containsKey(diffFile)) {
+            columnFamily = 
rocksDBCheckpointDiffer.getCompactionNodeMap().get(diffFile).getColumnFamily();
+          } else {
+            columnFamily = 
bytes2String(src.getRocksDB().getLiveMetadataForSSTFiles().get(diffFile).columnFamilyName());
+          }
+          if (columnFamily == null || tableToLookUp.contains(columnFamily)) {
+            expectedDiffFiles.add(diffFile);
+          }
+        }
+        List<String> sstDiffList = differ.getSSTDiffList(src, snap, 
tableToLookUp).orElse(Collections.emptyList());
+        LOG.info("SST diff list from '{}' to '{}': {} tables: {}",
+            src.getDbPath(), snap.getDbPath(), sstDiffList, tableToLookUp);
+
+        assertEquals(expectedDiffFiles, sstDiffList);
+
+      }
 
-      assertEquals(expectedDifferResult.get(index), sstDiffList);
       ++index;
     }
   }
@@ -1573,7 +1612,7 @@ public void testGetSSTDiffListWithoutDB2(
       Set<String> destSnapshotSstFiles,
       Set<String> expectedSameSstFiles,
       Set<String> expectedDiffSstFiles,
-      Map<String, String> columnFamilyToPrefixMap
+      TablePrefixInfo columnFamilyPrefixInfo
   ) {
     compactionLogEntryList.forEach(entry ->
         rocksDBCheckpointDiffer.addToCompactionLogTable(entry));
@@ -1583,9 +1622,9 @@ public void testGetSSTDiffListWithoutDB2(
     // Snapshot is used for logging purpose and short-circuiting traversal.
     // Using gen 0 for this test.
     DifferSnapshotInfo mockedSourceSnapshot = new DifferSnapshotInfo(
-        "/path/to/dbcp1", UUID.randomUUID(), 0L, columnFamilyToPrefixMap, 
null);
+        "/path/to/dbcp1", UUID.randomUUID(), 0L, columnFamilyPrefixInfo, null);
     DifferSnapshotInfo mockedDestinationSnapshot = new DifferSnapshotInfo(
-        "/path/to/dbcp2", UUID.randomUUID(), 0L, columnFamilyToPrefixMap, 
null);
+        "/path/to/dbcp2", UUID.randomUUID(), 0L, columnFamilyPrefixInfo, null);
 
     Set<String> actualSameSstFiles = new HashSet<>();
     Set<String> actualDiffSstFiles = new HashSet<>();
@@ -1621,7 +1660,7 @@ private static Stream<Arguments> shouldSkipNodeCases() {
 
   @ParameterizedTest()
   @MethodSource("shouldSkipNodeCases")
-  public void testShouldSkipNode(Map<String, String> columnFamilyToPrefixMap,
+  public void testShouldSkipNode(TablePrefixInfo tablePrefixInfo,
                                  List<Boolean> expectedResponse) {
     compactionLogEntryList.forEach(entry ->
         rocksDBCheckpointDiffer.addToCompactionLogTable(entry));
@@ -1632,8 +1671,7 @@ public void testShouldSkipNode(Map<String, String> 
columnFamilyToPrefixMap,
         .getCompactionNodeMap().values().stream()
         .sorted(Comparator.comparing(CompactionNode::getFileName))
         .map(node ->
-            RocksDiffUtils.shouldSkipNode(node,
-                columnFamilyToPrefixMap))
+            RocksDiffUtils.shouldSkipNode(node, tablePrefixInfo, 
tablePrefixInfo.getTableNames()))
         .collect(Collectors.toList());
 
     assertEquals(expectedResponse, actualResponse);
@@ -1646,7 +1684,7 @@ private static Stream<Arguments> 
shouldSkipNodeEdgeCases() {
     CompactionNode nullEndKeyNode = new CompactionNode("fileName", 100, 
"startKey", null, "columnFamily");
 
     return Stream.of(
-        Arguments.of(node, Collections.emptyMap(), false),
+        Arguments.of(node, new TablePrefixInfo(Collections.emptyMap()), false),
         Arguments.of(node, columnFamilyToPrefixMap1, true),
         Arguments.of(nullColumnFamilyNode, columnFamilyToPrefixMap1, false),
         Arguments.of(nullStartKeyNode, columnFamilyToPrefixMap1, false),
@@ -1657,7 +1695,7 @@ private static Stream<Arguments> 
shouldSkipNodeEdgeCases() {
   @MethodSource("shouldSkipNodeEdgeCases")
   public void testShouldSkipNodeEdgeCase(
       CompactionNode node,
-      Map<String, String> columnFamilyToPrefixMap,
+      TablePrefixInfo columnFamilyPrefixInfo,
       boolean expectedResponse
   ) {
     compactionLogEntryList.forEach(entry ->
@@ -1666,7 +1704,7 @@ public void testShouldSkipNodeEdgeCase(
     rocksDBCheckpointDiffer.loadAllCompactionLogs();
 
     assertEquals(expectedResponse, RocksDiffUtils.shouldSkipNode(node,
-        columnFamilyToPrefixMap));
+        columnFamilyPrefixInfo, columnFamilyPrefixInfo.getTableNames()));
   }
 
   private void createKeys(ColumnFamilyHandle cfh,
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java
index 324c29015e1..a44baf1905f 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java
@@ -17,20 +17,24 @@
 
 package org.apache.ozone.rocksdiff;
 
+import static 
org.apache.hadoop.hdds.StringUtils.getLexicographicallyHigherString;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
-import static org.mockito.ArgumentMatchers.anyString;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
+import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
 import org.assertj.core.util.Sets;
 import org.junit.jupiter.api.Assertions;
@@ -38,7 +42,6 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
-import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.rocksdb.LiveFileMetaData;
 import org.rocksdb.RocksDB;
@@ -95,23 +98,35 @@ public void 
testFilterRelevantSstFilesWithPreExistingCompactionInfo(String valid
                                                                       String 
validSSTFileEndRange,
                                                                       String 
invalidSSTFileStartRange,
                                                                       String 
invalidSSTFileEndRange) {
-    try (MockedStatic<RocksDiffUtils> mockedHandler = 
Mockito.mockStatic(RocksDiffUtils.class,
-        Mockito.CALLS_REAL_METHODS)) {
-      mockedHandler.when(() -> 
RocksDiffUtils.constructBucketKey(anyString())).thenAnswer(i -> 
i.getArgument(0));
-      String validSstFile = "filePath/validSSTFile.sst";
-      String invalidSstFile = "filePath/invalidSSTFile.sst";
-      String untrackedSstFile = "filePath/untrackedSSTFile.sst";
-      String expectedPrefix = 
String.valueOf((char)(((int)validSSTFileEndRange.charAt(0) +
-          validSSTFileStartRange.charAt(0)) / 2));
-      Set<String> sstFile = Sets.newTreeSet(validSstFile, invalidSstFile, 
untrackedSstFile);
-      RocksDiffUtils.filterRelevantSstFiles(sstFile, 
ImmutableMap.of(validSSTColumnFamilyName, expectedPrefix),
-          ImmutableMap.of("validSSTFile", new CompactionNode(validSstFile, 0, 
validSSTFileStartRange,
+    String validSstFile = "filePath/validSSTFile.sst";
+    String invalidSstFile = "filePath/invalidSSTFile.sst";
+    String untrackedSstFile = "filePath/untrackedSSTFile.sst";
+    String expectedPrefix = 
String.valueOf((char)(((int)validSSTFileEndRange.charAt(0) +
+        validSSTFileStartRange.charAt(0)) / 2));
+    Set<String> sstFile = Sets.newTreeSet(validSstFile, invalidSstFile, 
untrackedSstFile);
+    Set<String> inputSstFiles = new HashSet<>();
+    List<Set<String>> tablesToLookupSet = 
Arrays.asList(ImmutableSet.of(validSSTColumnFamilyName),
+        ImmutableSet.of(invalidColumnFamilyName), 
ImmutableSet.of(validSSTColumnFamilyName, invalidColumnFamilyName),
+        Collections.emptySet());
+    for (Set<String> tablesToLookup : tablesToLookupSet) {
+      inputSstFiles.clear();
+      inputSstFiles.addAll(sstFile);
+      RocksDiffUtils.filterRelevantSstFiles(inputSstFiles,
+          new TablePrefixInfo(
+              new HashMap<String, String>() {{
+                put(invalidColumnFamilyName, 
getLexicographicallyHigherString(invalidSSTFileEndRange));
+                put(validSSTColumnFamilyName, expectedPrefix);
+              }}), ImmutableMap.of("validSSTFile", new 
CompactionNode(validSstFile, 0, validSSTFileStartRange,
                   validSSTFileEndRange, validSSTColumnFamilyName), 
"invalidSSTFile",
-              new CompactionNode(invalidSstFile, 0, invalidSSTFileStartRange,
-                  invalidSSTFileEndRange, invalidColumnFamilyName)));
-      Assertions.assertEquals(Sets.newTreeSet(validSstFile, untrackedSstFile), 
sstFile);
+                new CompactionNode(invalidSstFile, 0, invalidSSTFileStartRange,
+                  invalidSSTFileEndRange, invalidColumnFamilyName)), 
tablesToLookup);
+      if (tablesToLookup.contains(validSSTColumnFamilyName)) {
+        Assertions.assertEquals(Sets.newTreeSet(validSstFile, 
untrackedSstFile), inputSstFiles,
+            "Failed for " + tablesToLookup);
+      } else {
+        Assertions.assertEquals(Sets.newTreeSet(untrackedSstFile), 
inputSstFiles, "Failed for " + tablesToLookup);
+      }
     }
-
   }
 
   private LiveFileMetaData getMockedLiveFileMetadata(String columnFamilyName, 
String startRange,
@@ -133,44 +148,39 @@ public void testFilterRelevantSstFilesFromDB(String 
validSSTColumnFamilyName,
                                                String validSSTFileEndRange,
                                                String invalidSSTFileStartRange,
                                                String invalidSSTFileEndRange) {
-    try (MockedStatic<RocksDiffUtils> mockedHandler = 
Mockito.mockStatic(RocksDiffUtils.class,
-        Mockito.CALLS_REAL_METHODS)) {
-      mockedHandler.when(() -> 
RocksDiffUtils.constructBucketKey(anyString())).thenAnswer(i -> 
i.getArgument(0));
-      for (int numberOfDBs = 1; numberOfDBs < 10; numberOfDBs++) {
-        String validSstFile = "filePath/validSSTFile.sst";
-        String invalidSstFile = "filePath/invalidSSTFile.sst";
-        String untrackedSstFile = "filePath/untrackedSSTFile.sst";
-        int expectedDBKeyIndex = numberOfDBs / 2;
-        ManagedRocksDB[] rocksDBs =
-            IntStream.range(0, numberOfDBs).mapToObj(i -> 
Mockito.mock(ManagedRocksDB.class))
-                .collect(Collectors.toList()).toArray(new 
ManagedRocksDB[numberOfDBs]);
-        for (int i = 0; i < numberOfDBs; i++) {
-          ManagedRocksDB managedRocksDB = rocksDBs[i];
-          RocksDB mockedRocksDB = Mockito.mock(RocksDB.class);
-          Mockito.when(managedRocksDB.get()).thenReturn(mockedRocksDB);
-          if (i == expectedDBKeyIndex) {
-            LiveFileMetaData validLiveFileMetaData = 
getMockedLiveFileMetadata(validSSTColumnFamilyName,
-                validSSTFileStartRange, validSSTFileEndRange, "validSSTFile");
-            LiveFileMetaData invalidLiveFileMetaData = 
getMockedLiveFileMetadata(invalidColumnFamilyName,
-                invalidSSTFileStartRange, invalidSSTFileEndRange, 
"invalidSSTFile");
-            List<LiveFileMetaData> liveFileMetaDatas = 
Arrays.asList(validLiveFileMetaData, invalidLiveFileMetaData);
-            
Mockito.when(mockedRocksDB.getLiveFilesMetaData()).thenReturn(liveFileMetaDatas);
-          } else {
-            
Mockito.when(mockedRocksDB.getLiveFilesMetaData()).thenReturn(Collections.emptyList());
-          }
-          Mockito.when(managedRocksDB.getLiveMetadataForSSTFiles())
-              .thenAnswer(invocation -> 
ManagedRocksDB.getLiveMetadataForSSTFiles(mockedRocksDB));
+    for (int numberOfDBs = 1; numberOfDBs < 10; numberOfDBs++) {
+      String validSstFile = "filePath/validSSTFile.sst";
+      String invalidSstFile = "filePath/invalidSSTFile.sst";
+      String untrackedSstFile = "filePath/untrackedSSTFile.sst";
+      int expectedDBKeyIndex = numberOfDBs / 2;
+      ManagedRocksDB[] rocksDBs =
+          IntStream.range(0, numberOfDBs).mapToObj(i -> 
Mockito.mock(ManagedRocksDB.class))
+              .collect(Collectors.toList()).toArray(new 
ManagedRocksDB[numberOfDBs]);
+      for (int i = 0; i < numberOfDBs; i++) {
+        ManagedRocksDB managedRocksDB = rocksDBs[i];
+        RocksDB mockedRocksDB = Mockito.mock(RocksDB.class);
+        Mockito.when(managedRocksDB.get()).thenReturn(mockedRocksDB);
+        if (i == expectedDBKeyIndex) {
+          LiveFileMetaData validLiveFileMetaData = 
getMockedLiveFileMetadata(validSSTColumnFamilyName,
+              validSSTFileStartRange, validSSTFileEndRange, "validSSTFile");
+          LiveFileMetaData invalidLiveFileMetaData = 
getMockedLiveFileMetadata(invalidColumnFamilyName,
+              invalidSSTFileStartRange, invalidSSTFileEndRange, 
"invalidSSTFile");
+          List<LiveFileMetaData> liveFileMetaDatas = 
Arrays.asList(validLiveFileMetaData, invalidLiveFileMetaData);
+          
Mockito.when(mockedRocksDB.getLiveFilesMetaData()).thenReturn(liveFileMetaDatas);
+        } else {
+          
Mockito.when(mockedRocksDB.getLiveFilesMetaData()).thenReturn(Collections.emptyList());
         }
-
-        String expectedPrefix = 
String.valueOf((char)(((int)validSSTFileEndRange.charAt(0) +
-            validSSTFileStartRange.charAt(0)) / 2));
-        Set<String> sstFile = Sets.newTreeSet(validSstFile, invalidSstFile, 
untrackedSstFile);
-        RocksDiffUtils.filterRelevantSstFiles(sstFile, 
ImmutableMap.of(validSSTColumnFamilyName, expectedPrefix),
-            Collections.emptyMap(), rocksDBs);
-        Assertions.assertEquals(Sets.newTreeSet(validSstFile, 
untrackedSstFile), sstFile);
+        Mockito.when(managedRocksDB.getLiveMetadataForSSTFiles())
+            .thenAnswer(invocation -> 
ManagedRocksDB.getLiveMetadataForSSTFiles(mockedRocksDB));
       }
 
+      String expectedPrefix = 
String.valueOf((char)(((int)validSSTFileEndRange.charAt(0) +
+          validSSTFileStartRange.charAt(0)) / 2));
+      Set<String> sstFile = Sets.newTreeSet(validSstFile, invalidSstFile, 
untrackedSstFile);
+      RocksDiffUtils.filterRelevantSstFiles(sstFile, new 
TablePrefixInfo(ImmutableMap.of(validSSTColumnFamilyName,
+              expectedPrefix)), Collections.emptyMap(),
+          ImmutableSet.of(validSSTColumnFamilyName), rocksDBs);
+      Assertions.assertEquals(Sets.newTreeSet(validSstFile, untrackedSstFile), 
sstFile);
     }
-
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
index 3f10edae586..069ecccdd60 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
@@ -22,7 +22,7 @@
 import static org.apache.hadoop.ozone.OzoneConsts.DB_COMPACTION_SST_BACKUP_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIFF_DIR;
-import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getColumnFamilyToKeyPrefixMap;
+import static 
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COLUMN_FAMILIES_TO_TRACK_IN_DAG;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -146,8 +146,7 @@ private DifferSnapshotInfo getDifferSnapshotInfo(
     // persisted at the time of snapshot creation, as the snapshot generation
     return new DifferSnapshotInfo(checkpointPath, snapshotInfo.getSnapshotId(),
         snapshotInfo.getDbTxSequenceNumber(),
-        getColumnFamilyToKeyPrefixMap(omMetadataManager, volumeName,
-            bucketName),
+        omMetadataManager.getTableBucketPrefix(volumeName, bucketName),
         snapshotDB);
   }
 
@@ -221,7 +220,8 @@ public void testDAGReconstruction()
     final File checkpointSnap2 = new File(snap2.getDbPath());
     GenericTestUtils.waitFor(checkpointSnap2::exists, 2000, 20000);
 
-    List<String> sstDiffList21 = differ.getSSTDiffList(snap2, 
snap1).orElse(Collections.emptyList());
+    List<String> sstDiffList21 = differ.getSSTDiffList(snap2, snap1, 
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
+        .orElse(Collections.emptyList());
     LOG.debug("Got diff list: {}", sstDiffList21);
 
     // Delete 1000 keys, take a 3rd snapshot, and do another diff
@@ -240,13 +240,16 @@ public void testDAGReconstruction()
     final File checkpointSnap3 = new File(snap3.getDbPath());
     GenericTestUtils.waitFor(checkpointSnap3::exists, 2000, 20000);
 
-    List<String> sstDiffList32 = differ.getSSTDiffList(snap3, 
snap2).orElse(Collections.emptyList());
+    List<String> sstDiffList32 = differ.getSSTDiffList(snap3, snap2, 
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
+        .orElse(Collections.emptyList());
 
     // snap3-snap1 diff result is a combination of snap3-snap2 and snap2-snap1
-    List<String> sstDiffList31 = differ.getSSTDiffList(snap3, 
snap1).orElse(Collections.emptyList());
+    List<String> sstDiffList31 = differ.getSSTDiffList(snap3, snap1, 
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
+        .orElse(Collections.emptyList());
 
     // Same snapshot. Result should be empty list
-    List<String> sstDiffList22 = differ.getSSTDiffList(snap2, 
snap2).orElse(Collections.emptyList());
+    List<String> sstDiffList22 = differ.getSSTDiffList(snap2, snap2, 
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
+        .orElse(Collections.emptyList());
     assertThat(sstDiffList22).isEmpty();
     snapDB1.close();
     snapDB2.close();
@@ -272,13 +275,16 @@ public void testDAGReconstruction()
         volumeName, bucketName, "snap3",
         ((RDBStore) snapDB3.get()
             .getMetadataManager().getStore()).getDb().getManagedRocksDb());
-    List<String> sstDiffList21Run2 = differ.getSSTDiffList(snap2, 
snap1).orElse(Collections.emptyList());
+    List<String> sstDiffList21Run2 = differ.getSSTDiffList(snap2, snap1, 
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
+        .orElse(Collections.emptyList());
     assertEquals(sstDiffList21, sstDiffList21Run2);
 
-    List<String> sstDiffList32Run2 = differ.getSSTDiffList(snap3, 
snap2).orElse(Collections.emptyList());
+    List<String> sstDiffList32Run2 = differ.getSSTDiffList(snap3, snap2, 
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
+        .orElse(Collections.emptyList());
     assertEquals(sstDiffList32, sstDiffList32Run2);
 
-    List<String> sstDiffList31Run2 = differ.getSSTDiffList(snap3, 
snap1).orElse(Collections.emptyList());
+    List<String> sstDiffList31Run2 = differ.getSSTDiffList(snap3, snap1, 
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
+        .orElse(Collections.emptyList());
     assertEquals(sstDiffList31, sstDiffList31Run2);
     snapDB1.close();
     snapDB2.close();
diff --git 
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
 
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 7a087227734..baac362da74 100644
--- 
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ 
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.common.BlockGroup;
@@ -686,6 +687,15 @@ String getMultipartKey(long volumeId, long bucketId,
   boolean containsIncompleteMPUs(String volume, String bucket)
       throws IOException;
 
+  TablePrefixInfo getTableBucketPrefix(String volume, String bucket) throws 
IOException;
+
+  /**
+   * Computes the bucket prefix for a table.
+   * @return would return "" if the table doesn't have bucket prefixed based 
key.
+   * @throws IOException
+   */
+  String getTableBucketPrefix(String tableName, String volume, String bucket) 
throws IOException;
+
   /**
    * Represents a unique identifier for a specific bucket within a volume.
    *
@@ -724,4 +734,5 @@ public int hashCode() {
       return Objects.hash(volumeId, bucketId);
     }
   }
+
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index cf6694480c0..f4a900435e7 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -809,9 +809,10 @@ public PendingKeysDeletion getPendingDeletionKeys(
     int notReclaimableKeyCount = 0;
 
     // Bucket prefix would be empty if volume is empty i.e. either null or "".
-    Optional<String> bucketPrefix = getBucketPrefix(volume, bucket, false);
+    Table<String, RepeatedOmKeyInfo> deletedTable = 
metadataManager.getDeletedTable();
+    Optional<String> bucketPrefix = getBucketPrefix(volume, bucket, 
deletedTable);
     try (TableIterator<String, ? extends KeyValue<String, RepeatedOmKeyInfo>>
-             delKeyIter = 
metadataManager.getDeletedTable().iterator(bucketPrefix.orElse(""))) {
+             delKeyIter = deletedTable.iterator(bucketPrefix.orElse(""))) {
 
       /* Seeking to the start key if it not null. The next key picked up would 
be ensured to start with the bucket
          prefix, {@link 
org.apache.hadoop.hdds.utils.db.Table#iterator(bucketPrefix)} would ensure this.
@@ -891,7 +892,7 @@ private <V, R> List<KeyValue<String, R>> 
getTableEntries(String startKey,
     return entries;
   }
 
-  private Optional<String> getBucketPrefix(String volumeName, String 
bucketName, boolean isFSO) throws IOException {
+  private Optional<String> getBucketPrefix(String volumeName, String 
bucketName, Table table) throws IOException {
     // Bucket prefix would be empty if both volume & bucket is empty i.e. 
either null or "".
     if (StringUtils.isEmpty(volumeName) && StringUtils.isEmpty(bucketName)) {
       return Optional.empty();
@@ -899,17 +900,17 @@ private Optional<String> getBucketPrefix(String 
volumeName, String bucketName, b
       throw new IOException("One of volume : " + volumeName + ", bucket: " + 
bucketName + " is empty." +
           " Either both should be empty or none of the arguments should be 
empty");
     }
-    return isFSO ? 
Optional.of(metadataManager.getBucketKeyPrefixFSO(volumeName, bucketName)) :
-        Optional.of(metadataManager.getBucketKeyPrefix(volumeName, 
bucketName));
+    return Optional.of(metadataManager.getTableBucketPrefix(table.getName(), 
volumeName, bucketName));
   }
 
   @Override
   public List<KeyValue<String, String>> getRenamesKeyEntries(
       String volume, String bucket, String startKey,
       CheckedFunction<KeyValue<String, String>, Boolean, IOException> filter, 
int size) throws IOException {
-    Optional<String> bucketPrefix = getBucketPrefix(volume, bucket, false);
+    Table<String, String> snapshotRenamedTable = 
metadataManager.getSnapshotRenamedTable();
+    Optional<String> bucketPrefix = getBucketPrefix(volume, bucket, 
snapshotRenamedTable);
     try (TableIterator<String, ? extends KeyValue<String, String>>
-             renamedKeyIter = 
metadataManager.getSnapshotRenamedTable().iterator(bucketPrefix.orElse(""))) {
+             renamedKeyIter = 
snapshotRenamedTable.iterator(bucketPrefix.orElse(""))) {
       return getTableEntries(startKey, renamedKeyIter, Function.identity(), 
filter, size);
     }
   }
@@ -957,9 +958,10 @@ public List<KeyValue<String, List<OmKeyInfo>>> 
getDeletedKeyEntries(
       String volume, String bucket, String startKey,
       CheckedFunction<KeyValue<String, RepeatedOmKeyInfo>, Boolean, 
IOException> filter,
       int size) throws IOException {
-    Optional<String> bucketPrefix = getBucketPrefix(volume, bucket, false);
+    Table<String, RepeatedOmKeyInfo> deletedTable = 
metadataManager.getDeletedTable();
+    Optional<String> bucketPrefix = getBucketPrefix(volume, bucket, 
deletedTable);
     try (TableIterator<String, ? extends KeyValue<String, RepeatedOmKeyInfo>>
-             delKeyIter = 
metadataManager.getDeletedTable().iterator(bucketPrefix.orElse(""))) {
+             delKeyIter = deletedTable.iterator(bucketPrefix.orElse(""))) {
       return getTableEntries(startKey, delKeyIter, 
RepeatedOmKeyInfo::cloneOmKeyInfoList, filter, size);
     }
   }
@@ -2263,8 +2265,9 @@ private void slimLocationVersion(OmKeyInfo... keyInfos) {
   @Override
   public TableIterator<String, ? extends KeyValue<String, OmKeyInfo>> 
getDeletedDirEntries(
       String volume, String bucket) throws IOException {
-    Optional<String> bucketPrefix = getBucketPrefix(volume, bucket, true);
-    return 
metadataManager.getDeletedDirTable().iterator(bucketPrefix.orElse(""));
+    Table<String, OmKeyInfo> deletedDirTable = 
metadataManager.getDeletedDirTable();
+    Optional<String> bucketPrefix = getBucketPrefix(volume, bucket, 
deletedDirTable);
+    return deletedDirTable.iterator(bucketPrefix.orElse(""));
   }
 
   @Override
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 4ca647e4ea6..e7826708b89 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -28,6 +28,18 @@
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_ROCKSDB_METRICS_ENABLED;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_ROCKSDB_METRICS_ENABLED_DEFAULT;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.BUCKET_TABLE;
+import static 
org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_DIR_TABLE;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_TABLE;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DIRECTORY_TABLE;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;
+import static 
org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_INFO_TABLE;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_FILE_TABLE;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_KEY_TABLE;
+import static 
org.apache.hadoop.ozone.om.codec.OMDBDefinition.SNAPSHOT_INFO_TABLE;
+import static 
org.apache.hadoop.ozone.om.codec.OMDBDefinition.SNAPSHOT_RENAMED_TABLE;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.VOLUME_TABLE;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR;
@@ -72,6 +84,7 @@
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
 import org.apache.hadoop.hdds.utils.db.TypedTable;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
@@ -1848,6 +1861,60 @@ public boolean containsIncompleteMPUs(String volume, 
String bucket)
     return false;
   }
 
+  // NOTE: Update both getTableBucketPrefix(volume, bucket) & 
getTableBucketPrefix(tableName, volume, bucket)
+  // simultaneously. Implemented duplicate functions to avoid computing 
bucketKeyPrefix redundantly for each and
+  // every table over and over again.
+  @Override
+  public TablePrefixInfo getTableBucketPrefix(String volume, String bucket) 
throws IOException {
+    String keyPrefix = getBucketKeyPrefix(volume, bucket);
+    String keyPrefixFso = getBucketKeyPrefixFSO(volume, bucket);
+    // Set value to 12 to avoid creating too big a HashTable unnecessarily.
+    Map<String, String> tablePrefixMap = new HashMap<>(12, 1.0f);
+
+    tablePrefixMap.put(VOLUME_TABLE, getVolumeKey(volume));
+    tablePrefixMap.put(BUCKET_TABLE, getBucketKey(volume, bucket));
+
+    tablePrefixMap.put(KEY_TABLE, keyPrefix);
+    tablePrefixMap.put(DELETED_TABLE, keyPrefix);
+    tablePrefixMap.put(SNAPSHOT_RENAMED_TABLE, keyPrefix);
+    tablePrefixMap.put(OPEN_KEY_TABLE, keyPrefix);
+    tablePrefixMap.put(MULTIPART_INFO_TABLE, keyPrefix);
+    tablePrefixMap.put(SNAPSHOT_INFO_TABLE, keyPrefix);
+
+    tablePrefixMap.put(FILE_TABLE, keyPrefixFso);
+    tablePrefixMap.put(DIRECTORY_TABLE, keyPrefixFso);
+    tablePrefixMap.put(DELETED_DIR_TABLE, keyPrefixFso);
+    tablePrefixMap.put(OPEN_FILE_TABLE, keyPrefixFso);
+
+    return new TablePrefixInfo(tablePrefixMap);
+  }
+
+  @Override
+  public String getTableBucketPrefix(String tableName, String volume, String 
bucket) throws IOException {
+    switch (tableName) {
+    case VOLUME_TABLE:
+      return getVolumeKey(volume);
+    case BUCKET_TABLE:
+      return getBucketKey(volume, bucket);
+    case KEY_TABLE:
+    case DELETED_TABLE:
+    case SNAPSHOT_RENAMED_TABLE:
+    case OPEN_KEY_TABLE:
+    case MULTIPART_INFO_TABLE:
+    case SNAPSHOT_INFO_TABLE:
+      return getBucketKeyPrefix(volume, bucket);
+    case FILE_TABLE:
+    case DIRECTORY_TABLE:
+    case DELETED_DIR_TABLE:
+    case OPEN_FILE_TABLE:
+      return getBucketKeyPrefixFSO(volume, bucket);
+    default:
+      LOG.warn("Unknown table name '{}' passed to getTableBucketPrefix 
(volume='{}', bucket='{}'). " +
+          "Returning empty string.", tableName, volume, bucket);
+      return "";
+    }
+  }
+
   @Override
   public void close() throws IOException {
     stop();
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 06215c8df76..453eb3b3b07 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -61,6 +61,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.RemovalListener;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import jakarta.annotation.Nonnull;
 import java.io.File;
@@ -549,14 +550,11 @@ public static DBCheckpoint createOmSnapshotCheckpoint(
     // Clean up active DB's deletedTable right after checkpoint is taken,
     // Snapshot create is processed as a single transaction and
     // transactions are flushed sequentially so, no need to take any lock as 
of now.
-    deleteKeysFromDelKeyTableInSnapshotScope(omMetadataManager,
-        snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(), 
batchOperation);
-    // Clean up deletedDirectoryTable as well
-    deleteKeysFromDelDirTableInSnapshotScope(omMetadataManager,
-        snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(), 
batchOperation);
-    // Remove entries from snapshotRenamedTable
-    deleteKeysFromSnapRenamedTableInSnapshotScope(omMetadataManager,
-        snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(), 
batchOperation);
+    for (Table<String, ?> table : 
ImmutableList.of(omMetadataManager.getDeletedTable(),
+        omMetadataManager.getDeletedDirTable(), 
omMetadataManager.getSnapshotRenamedTable())) {
+      deleteKeysFromTableWithBucketPrefix(omMetadataManager, table,
+          snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(), 
batchOperation);
+    }
 
     if (snapshotDirExist) {
       LOG.info("Checkpoint: {} for snapshot {} already exists.",
@@ -573,51 +571,21 @@ public static DBCheckpoint createOmSnapshotCheckpoint(
 
   /**
    * Helper method to perform batch delete range operation on a given key 
prefix.
-   * @param prefix prefix of keys to be deleted
+   * @param metadataManager metadatManager instance
    * @param table table from which keys are to be deleted
+   * @param volume volume corresponding to the bucket
+   * @param bucket bucket corresponding to which keys need to be deleted from 
the table
    * @param batchOperation batch operation
    */
-  private static void deleteKeysFromTableWithPrefix(
-      String prefix, Table<String, ?> table, BatchOperation batchOperation) 
throws IOException {
+  private static void deleteKeysFromTableWithBucketPrefix(OMMetadataManager 
metadataManager,
+      Table<String, ?> table, String volume, String bucket, BatchOperation 
batchOperation) throws IOException {
+    String prefix = metadataManager.getTableBucketPrefix(table.getName(), 
volume, bucket);
     String endKey = getLexicographicallyHigherString(prefix);
     LOG.debug("Deleting key range from {} - startKey: {}, endKey: {}",
         table.getName(), prefix, endKey);
     table.deleteRangeWithBatch(batchOperation, prefix, endKey);
   }
 
-  /**
-   * Helper method to delete DB keys in the snapshot scope (bucket)
-   * from active DB's deletedDirectoryTable.
-   * @param omMetadataManager OMMetadataManager instance
-   * @param volumeName volume name
-   * @param bucketName bucket name
-   * @param batchOperation batch operation
-   */
-  private static void deleteKeysFromSnapRenamedTableInSnapshotScope(
-      OMMetadataManager omMetadataManager, String volumeName,
-      String bucketName, BatchOperation batchOperation) throws IOException {
-
-    final String keyPrefix = omMetadataManager.getBucketKeyPrefix(volumeName, 
bucketName);
-    deleteKeysFromTableWithPrefix(keyPrefix, 
omMetadataManager.getSnapshotRenamedTable(), batchOperation);
-  }
-
-  /**
-   * Helper method to delete DB keys in the snapshot scope (bucket)
-   * from active DB's deletedDirectoryTable.
-   * @param omMetadataManager OMMetadataManager instance
-   * @param volumeName volume name
-   * @param bucketName bucket name
-   * @param batchOperation batch operation
-   */
-  private static void deleteKeysFromDelDirTableInSnapshotScope(
-      OMMetadataManager omMetadataManager, String volumeName,
-      String bucketName, BatchOperation batchOperation) throws IOException {
-
-    // Range delete start key (inclusive)
-    final String keyPrefix = 
omMetadataManager.getBucketKeyPrefixFSO(volumeName, bucketName);
-    deleteKeysFromTableWithPrefix(keyPrefix, 
omMetadataManager.getDeletedDirTable(), batchOperation);
-  }
-
   @VisibleForTesting
   public SnapshotDiffManager getSnapshotDiffManager() {
     return snapshotDiffManager;
@@ -628,22 +596,6 @@ public SnapshotDiffCleanupService 
getSnapshotDiffCleanupService() {
     return snapshotDiffCleanupService;
   }
 
-  /**
-   * Helper method to delete DB keys in the snapshot scope (bucket)
-   * from active DB's deletedTable.
-   * @param omMetadataManager OMMetadataManager instance
-   * @param volumeName volume name
-   * @param bucketName bucket name
-   * @param batchOperation batch operation
-   */
-  private static void deleteKeysFromDelKeyTableInSnapshotScope(
-      OMMetadataManager omMetadataManager, String volumeName,
-      String bucketName, BatchOperation batchOperation) throws IOException {
-    // Range delete prefix (inclusive)
-    final String keyPrefix = omMetadataManager.getBucketKeyPrefix(volumeName, 
bucketName);
-    deleteKeysFromTableWithPrefix(keyPrefix, 
omMetadataManager.getDeletedTable(), batchOperation);
-  }
-
   /**
    * Captures the list of SST files for keyTable, fileTable and directoryTable 
in the DB.
    * @param store AOS or snapshot DB for not defragged or defragged snapshot 
respectively.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
index 4b5002eb6c4..8ff0a71d68d 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
@@ -20,14 +20,12 @@
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT;
 import static org.apache.hadoop.ozone.om.lock.FlatResource.SNAPSHOT_DB_LOCK;
-import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getColumnFamilyToKeyPrefixMap;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,6 +40,7 @@
 import org.apache.hadoop.hdds.utils.db.RocksDatabase;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
 import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
@@ -182,11 +181,9 @@ public BackgroundTaskResult call() throws Exception {
 
             LOG.debug("Processing snapshot {} to filter relevant SST Files",
                 snapShotTableKey);
-
-            Map<String, String> columnFamilyNameToPrefixMap =
-                
getColumnFamilyToKeyPrefixMap(ozoneManager.getMetadataManager(),
-                    snapshotInfo.getVolumeName(),
-                    snapshotInfo.getBucketName());
+            TablePrefixInfo bucketPrefixInfo =
+                
ozoneManager.getMetadataManager().getTableBucketPrefix(snapshotInfo.getVolumeName(),
+                snapshotInfo.getBucketName());
 
             try (
                 UncheckedAutoCloseableSupplier<OmSnapshot> 
snapshotMetadataReader =
@@ -200,7 +197,7 @@ public BackgroundTaskResult call() throws Exception {
               RocksDatabase db = rdbStore.getDb();
               try (BootstrapStateHandler.Lock lock = getBootstrapStateLock()
                   .lock()) {
-                db.deleteFilesNotMatchingPrefix(columnFamilyNameToPrefixMap);
+                db.deleteFilesNotMatchingPrefix(bucketPrefixInfo);
               }
               markSSTFilteredFlagForSnapshot(snapshotInfo);
               snapshotLimit--;
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveTableKeysRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveTableKeysRequest.java
index 63cc010b790..fef5dc76c4d 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveTableKeysRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveTableKeysRequest.java
@@ -80,22 +80,21 @@ public OMRequest preExecute(OzoneManager ozoneManager) 
throws IOException {
     UUID fromSnapshotID = 
fromProtobuf(moveTableKeysRequest.getFromSnapshotID());
     SnapshotInfo fromSnapshot = SnapshotUtils.getSnapshotInfo(ozoneManager,
         snapshotChainManager, fromSnapshotID);
-    String bucketKeyPrefix = 
omMetadataManager.getBucketKeyPrefix(fromSnapshot.getVolumeName(),
-        fromSnapshot.getBucketName());
-    String bucketKeyPrefixFSO = 
omMetadataManager.getBucketKeyPrefixFSO(fromSnapshot.getVolumeName(),
-        fromSnapshot.getBucketName());
+
 
     Set<String> keys = new HashSet<>();
     List<SnapshotMoveKeyInfos> deletedKeys = new 
ArrayList<>(moveTableKeysRequest.getDeletedKeysList().size());
 
     //validate deleted key starts with bucket prefix.[/<volName>/<bucketName>/]
+    String deletedTablePrefix = 
omMetadataManager.getTableBucketPrefix(omMetadataManager.getDeletedTable().getName(),
+        fromSnapshot.getVolumeName(), fromSnapshot.getBucketName());
     for (SnapshotMoveKeyInfos deletedKey : 
moveTableKeysRequest.getDeletedKeysList()) {
       // Filter only deleted keys with at least one keyInfo per key.
       if (!deletedKey.getKeyInfosList().isEmpty()) {
         deletedKeys.add(deletedKey);
-        if (!deletedKey.getKey().startsWith(bucketKeyPrefix)) {
+        if (!deletedKey.getKey().startsWith(deletedTablePrefix)) {
           OMException ex = new OMException("Deleted Key: " + deletedKey + " 
doesn't start with prefix "
-              + bucketKeyPrefix, OMException.ResultCodes.INVALID_KEY_NAME);
+              + deletedTablePrefix, OMException.ResultCodes.INVALID_KEY_NAME);
           if (LOG.isDebugEnabled()) {
             
AUDIT.logWriteFailure(ozoneManager.buildAuditMessageForFailure(OMSystemAction.SNAPSHOT_MOVE_TABLE_KEYS,
                 null, ex));
@@ -117,14 +116,17 @@ public OMRequest preExecute(OzoneManager ozoneManager) 
throws IOException {
     }
 
     keys.clear();
+    String renamedTablePrefix = omMetadataManager.getTableBucketPrefix(
+        omMetadataManager.getSnapshotRenamedTable().getName(), 
fromSnapshot.getVolumeName(),
+        fromSnapshot.getBucketName());
     List<HddsProtos.KeyValue> renamedKeysList = new 
ArrayList<>(moveTableKeysRequest.getRenamedKeysList().size());
     //validate rename key starts with bucket prefix.[/<volName>/<bucketName>/]
     for (HddsProtos.KeyValue renamedKey : 
moveTableKeysRequest.getRenamedKeysList()) {
       if (renamedKey.hasKey() && renamedKey.hasValue()) {
         renamedKeysList.add(renamedKey);
-        if (!renamedKey.getKey().startsWith(bucketKeyPrefix)) {
+        if (!renamedKey.getKey().startsWith(renamedTablePrefix)) {
           OMException ex = new OMException("Rename Key: " + renamedKey + " 
doesn't start with prefix "
-              + bucketKeyPrefix, OMException.ResultCodes.INVALID_KEY_NAME);
+              + renamedTablePrefix, OMException.ResultCodes.INVALID_KEY_NAME);
           if (LOG.isDebugEnabled()) {
             
AUDIT.logWriteFailure(ozoneManager.buildAuditMessageForFailure(OMSystemAction.SNAPSHOT_MOVE_TABLE_KEYS,
                 null, ex));
@@ -147,15 +149,17 @@ public OMRequest preExecute(OzoneManager ozoneManager) 
throws IOException {
     keys.clear();
 
     // Filter only deleted dirs with only one keyInfo per key.
+    String deletedDirTablePrefix = omMetadataManager.getTableBucketPrefix(
+        omMetadataManager.getDeletedDirTable().getName(), 
fromSnapshot.getVolumeName(), fromSnapshot.getBucketName());
     List<SnapshotMoveKeyInfos> deletedDirs = new 
ArrayList<>(moveTableKeysRequest.getDeletedDirsList().size());
     //validate deleted key starts with bucket FSO path 
prefix.[/<volId>/<bucketId>/]
     for (SnapshotMoveKeyInfos deletedDir : 
moveTableKeysRequest.getDeletedDirsList()) {
       // Filter deleted directories with exactly one keyInfo per key.
       if (deletedDir.getKeyInfosList().size() == 1) {
         deletedDirs.add(deletedDir);
-        if (!deletedDir.getKey().startsWith(bucketKeyPrefixFSO)) {
+        if (!deletedDir.getKey().startsWith(deletedDirTablePrefix)) {
           OMException ex = new OMException("Deleted dir: " + deletedDir + " 
doesn't start with prefix " +
-              bucketKeyPrefixFSO, OMException.ResultCodes.INVALID_KEY_NAME);
+              deletedDirTablePrefix, OMException.ResultCodes.INVALID_KEY_NAME);
           if (LOG.isDebugEnabled()) {
             
AUDIT.logWriteFailure(ozoneManager.buildAuditMessageForFailure(OMSystemAction.SNAPSHOT_MOVE_TABLE_KEYS,
                 null, ex));
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
index 497c7a064b8..55132f71d5c 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
@@ -86,7 +86,7 @@ public static Object getINode(Path file) throws IOException {
    * @throws IOException if an I/O error occurs
    */
   public static String getFileInodeAndLastModifiedTimeString(Path file) throws 
IOException {
-    Object inode = Files.readAttributes(file, 
BasicFileAttributes.class).fileKey();
+    Object inode = getINode(file);
     FileTime mTime = Files.getLastModifiedTime(file);
     return String.format("%s-%s", inode, mTime.toMillis());
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
index 21c2b5979a7..e5bc8dcfa91 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.om.snapshot;
 
 import static org.apache.commons.lang3.StringUtils.leftPad;
+import static 
org.apache.hadoop.hdds.StringUtils.getLexicographicallyHigherString;
 import static 
org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.CREATE;
 import static 
org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.DELETE;
 import static 
org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.MODIFY;
@@ -38,11 +39,8 @@
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF_DEFAULT;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.DELIMITER;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DIRECTORY_TABLE;
-import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE;
-import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;
 import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.checkSnapshotActive;
 import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.dropColumnFamilyHandle;
-import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getColumnFamilyToKeyPrefixMap;
 import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getSnapshotInfo;
 import static 
org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse.CancelMessage.CANCEL_ALREADY_CANCELLED_JOB;
 import static 
org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse.CancelMessage.CANCEL_ALREADY_DONE_JOB;
@@ -102,6 +100,7 @@
 import org.apache.hadoop.hdds.utils.db.CodecRegistry;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
@@ -360,18 +359,17 @@ private DifferSnapshotInfo getDSIFromSI(SnapshotInfo 
snapshotInfo,
         snapshotOMMM.getStore().getDbLocation().getPath();
     final UUID snapshotId = snapshotInfo.getSnapshotId();
     final long dbTxSequenceNumber = snapshotInfo.getDbTxSequenceNumber();
-
     return new DifferSnapshotInfo(
         checkpointPath,
         snapshotId,
         dbTxSequenceNumber,
-        getColumnFamilyToKeyPrefixMap(snapshotOMMM, volumeName, bucketName),
+        snapshotOMMM.getTableBucketPrefix(volumeName, bucketName),
         ((RDBStore)snapshotOMMM.getStore()).getDb().getManagedRocksDb());
   }
 
   @VisibleForTesting
   protected Set<String> getSSTFileListForSnapshot(OmSnapshot snapshot,
-                                                  List<String> tablesToLookUp) 
{
+                                                  Set<String> tablesToLookUp) {
     return RdbUtil.getSSTFilesForComparison(((RDBStore)snapshot
         .getMetadataManager().getStore()).getDb().getManagedRocksDb(),
         tablesToLookUp);
@@ -379,7 +377,7 @@ protected Set<String> getSSTFileListForSnapshot(OmSnapshot 
snapshot,
 
   @VisibleForTesting
   protected Map<Object, String> getSSTFileMapForSnapshot(OmSnapshot snapshot,
-      List<String> tablesToLookUp) throws IOException {
+      Set<String> tablesToLookUp) throws IOException {
     return RdbUtil.getSSTFilesWithInodesForComparison(((RDBStore)snapshot
             .getMetadataManager().getStore()).getDb().getManagedRocksDb(),
         tablesToLookUp);
@@ -893,9 +891,7 @@ void generateSnapshotDiffReport(final String jobKey,
 
       final BucketLayout bucketLayout = getBucketLayout(volumeName, bucketName,
           fromSnapshot.getMetadataManager());
-      Map<String, String> tablePrefixes =
-          getColumnFamilyToKeyPrefixMap(toSnapshot.getMetadataManager(),
-              volumeName, bucketName);
+      TablePrefixInfo tablePrefixes = 
toSnapshot.getMetadataManager().getTableBucketPrefix(volumeName, bucketName);
 
       boolean useFullDiff = snapshotForceFullDiff || forceFullDiff;
       boolean performNonNativeDiff = diffDisableNativeLibs || 
disableNativeDiff;
@@ -964,9 +960,8 @@ void generateSnapshotDiffReport(final String jobKey,
             if (bucketLayout.isFileSystemOptimized()) {
               long bucketId = toSnapshot.getMetadataManager()
                   .getBucketId(volumeName, bucketName);
-              String tablePrefix = getTablePrefix(tablePrefixes,
-                  fromSnapshot.getMetadataManager()
-                      .getDirectoryTable().getName());
+              String tablePrefix = 
tablePrefixes.getTablePrefix(fromSnapshot.getMetadataManager()
+                  .getDirectoryTable().getName());
               oldParentIdPathMap.get().putAll(new FSODirectoryPathResolver(
                   tablePrefix, bucketId,
                   fromSnapshot.getMetadataManager().getDirectoryTable())
@@ -1050,7 +1045,7 @@ private void getDeltaFilesAndDiffKeysToObjectIdToKeyMap(
       final OmSnapshot fromSnapshot, final OmSnapshot toSnapshot,
       final SnapshotInfo fsInfo, final SnapshotInfo tsInfo,
       final boolean useFullDiff, final boolean skipNativeDiff,
-      final Map<String, String> tablePrefixes,
+      final TablePrefixInfo tablePrefixes,
       final PersistentMap<byte[], byte[]> oldObjIdToKeyMap,
       final PersistentMap<byte[], byte[]> newObjIdToKeyMap,
       final PersistentMap<byte[], Boolean> objectIdToIsDirMap,
@@ -1058,7 +1053,7 @@ private void getDeltaFilesAndDiffKeysToObjectIdToKeyMap(
       final Optional<Set<Long>> newParentIds,
       final String diffDir, final String jobKey) throws IOException, 
RocksDBException {
 
-    List<String> tablesToLookUp = Collections.singletonList(fsTable.getName());
+    Set<String> tablesToLookUp = Collections.singleton(fsTable.getName());
     Set<String> deltaFiles = getDeltaFiles(fromSnapshot, toSnapshot,
         tablesToLookUp, fsInfo, tsInfo, useFullDiff, tablePrefixes, diffDir, 
jobKey);
 
@@ -1068,7 +1063,7 @@ private void getDeltaFilesAndDiffKeysToObjectIdToKeyMap(
     if (skipNativeDiff || !isNativeLibsLoaded) {
       Set<String> inputFiles = getSSTFileListForSnapshot(fromSnapshot, 
tablesToLookUp);
       ManagedRocksDB fromDB = 
((RDBStore)fromSnapshot.getMetadataManager().getStore()).getDb().getManagedRocksDb();
-      RocksDiffUtils.filterRelevantSstFiles(inputFiles, tablePrefixes, fromDB);
+      RocksDiffUtils.filterRelevantSstFiles(inputFiles, tablePrefixes, 
tablesToLookUp, fromDB);
       deltaFiles.addAll(inputFiles);
     }
     if (LOG.isDebugEnabled()) {
@@ -1090,13 +1085,12 @@ void addToObjectIdMap(Table<String, ? extends 
WithParentObjectId> fsTable,
       PersistentMap<byte[], Boolean> objectIdToIsDirMap,
       Optional<Set<Long>> oldParentIds,
       Optional<Set<Long>> newParentIds,
-      Map<String, String> tablePrefixes, String jobKey) throws IOException, 
RocksDBException {
+      TablePrefixInfo tablePrefixes, String jobKey) throws IOException, 
RocksDBException {
     if (deltaFiles.isEmpty()) {
       return;
     }
-    String tablePrefix = getTablePrefix(tablePrefixes, fsTable.getName());
-    boolean isDirectoryTable =
-        fsTable.getName().equals(DIRECTORY_TABLE);
+    String tablePrefix = tablePrefixes.getTablePrefix(fsTable.getName());
+    boolean isDirectoryTable = fsTable.getName().equals(DIRECTORY_TABLE);
     SstFileSetReader sstFileReader = new SstFileSetReader(deltaFiles);
     validateEstimatedKeyChangesAreInLimits(sstFileReader);
     long totalEstimatedKeysToProcess = sstFileReader.getEstimatedTotalKeys();
@@ -1106,9 +1100,7 @@ void addToObjectIdMap(Table<String, ? extends 
WithParentObjectId> fsTable,
     double[] checkpoint = new double[1];
     checkpoint[0] = stepIncreasePct;
     if (Strings.isNotEmpty(tablePrefix)) {
-      char[] upperBoundCharArray = tablePrefix.toCharArray();
-      upperBoundCharArray[upperBoundCharArray.length - 1] += 1;
-      sstFileReaderUpperBound = String.valueOf(upperBoundCharArray);
+      sstFileReaderUpperBound = getLexicographicallyHigherString(tablePrefix);
     }
     try (Stream<String> keysToCheck = nativeRocksToolsLoaded ?
         sstFileReader.getKeyStreamWithTombstone(sstFileReaderLowerBound, 
sstFileReaderUpperBound)
@@ -1170,11 +1162,11 @@ void addToObjectIdMap(Table<String, ? extends 
WithParentObjectId> fsTable,
   @SuppressWarnings("checkstyle:ParameterNumber")
   Set<String> getDeltaFiles(OmSnapshot fromSnapshot,
                             OmSnapshot toSnapshot,
-                            List<String> tablesToLookUp,
+                            Set<String> tablesToLookUp,
                             SnapshotInfo fsInfo,
                             SnapshotInfo tsInfo,
                             boolean useFullDiff,
-                            Map<String, String> tablePrefixes,
+                            TablePrefixInfo tablePrefixInfo,
                             String diffDir, String jobKey)
       throws IOException {
     // TODO: [SNAPSHOT] Refactor the parameter list
@@ -1193,7 +1185,7 @@ Set<String> getDeltaFiles(OmSnapshot fromSnapshot,
       recordActivity(jobKey, SST_FILE_DELTA_DAG_WALK);
       LOG.debug("Calling RocksDBCheckpointDiffer");
       try {
-        deltaFiles = differ.getSSTDiffListWithFullPath(toDSI, fromDSI, 
diffDir).map(HashSet::new);
+        deltaFiles = differ.getSSTDiffListWithFullPath(toDSI, fromDSI, 
tablesToLookUp, diffDir).map(HashSet::new);
       } catch (Exception exception) {
         recordActivity(jobKey, SST_FILE_DELTA_FULL_DIFF);
         LOG.warn("Failed to get SST diff file using RocksDBCheckpointDiffer. " 
+
@@ -1214,7 +1206,7 @@ Set<String> getDeltaFiles(OmSnapshot fromSnapshot,
       ManagedRocksDB toDB = 
((RDBStore)toSnapshot.getMetadataManager().getStore())
           .getDb().getManagedRocksDb();
       Set<String> diffFiles = getDiffFiles(fromSnapshot, toSnapshot, 
tablesToLookUp);
-      RocksDiffUtils.filterRelevantSstFiles(diffFiles, tablePrefixes, fromDB, 
toDB);
+      RocksDiffUtils.filterRelevantSstFiles(diffFiles, tablePrefixInfo, 
tablesToLookUp, fromDB, toDB);
       deltaFiles = Optional.of(diffFiles);
     }
 
@@ -1223,7 +1215,7 @@ Set<String> getDeltaFiles(OmSnapshot fromSnapshot,
             toSnapshot.getSnapshotTableKey()));
   }
 
-  private Set<String> getDiffFiles(OmSnapshot fromSnapshot, OmSnapshot 
toSnapshot, List<String> tablesToLookUp) {
+  private Set<String> getDiffFiles(OmSnapshot fromSnapshot, OmSnapshot 
toSnapshot, Set<String> tablesToLookUp) {
     Set<String> diffFiles;
     try {
       Map<Object, String> fromSnapshotFiles = 
getSSTFileMapForSnapshot(fromSnapshot, tablesToLookUp);
@@ -1303,7 +1295,7 @@ long generateDiffReport(
       final boolean isFSOBucket,
       final Optional<Map<Long, Path>> oldParentIdPathMap,
       final Optional<Map<Long, Path>> newParentIdPathMap,
-      final Map<String, String> tablePrefix) {
+      final TablePrefixInfo tablePrefix) {
     LOG.info("Starting diff report generation for jobId: {}.", jobId);
     ColumnFamilyHandle deleteDiffColumnFamily = null;
     ColumnFamilyHandle renameDiffColumnFamily = null;
@@ -1394,8 +1386,7 @@ long generateDiffReport(
               modifyDiffs.add(codecRegistry.asRawData(entry));
             }
           } else {
-            String keyPrefix = getTablePrefix(tablePrefix,
-                (isDirectoryObject ? fsDirTable : fsTable).getName());
+            String keyPrefix = tablePrefix.getTablePrefix((isDirectoryObject ? 
fsDirTable : fsTable).getName());
             String oldKey = resolveBucketRelativePath(isFSOBucket,
                 oldParentIdPathMap, oldKeyName, true);
             String newKey = resolveBucketRelativePath(isFSOBucket,
@@ -1658,26 +1649,12 @@ private boolean areKeysEqual(WithObjectID oldKey, 
WithObjectID newKey) {
     return false;
   }
 
-  /**
-   * Get table prefix given a tableName.
-   */
-  private String getTablePrefix(Map<String, String> tablePrefixes,
-                                String tableName) {
-    // In case of FSO - either File/Directory table
-    // the key Prefix would be volumeId/bucketId and
-    // in case of non-fso - volumeName/bucketName
-    if (tableName.equals(DIRECTORY_TABLE) || tableName.equals(FILE_TABLE)) {
-      return tablePrefixes.get(DIRECTORY_TABLE);
-    }
-    return tablePrefixes.get(KEY_TABLE);
-  }
-
   /**
    * check if the given key is in the bucket specified by tablePrefix map.
    */
-  boolean isKeyInBucket(String key, Map<String, String> tablePrefixes,
+  boolean isKeyInBucket(String key, TablePrefixInfo tablePrefixInfo,
                         String tableName) {
-    return key.startsWith(getTablePrefix(tablePrefixes, tableName));
+    return key.startsWith(tablePrefixInfo.getTablePrefix(tableName));
   }
 
   /**
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
index 63e7e38d518..5897f4ae891 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
@@ -18,18 +18,13 @@
 package org.apache.hadoop.ozone.om.snapshot;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
-import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DIRECTORY_TABLE;
-import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE;
-import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TIMEOUT;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Optional;
@@ -216,26 +211,6 @@ public static UUID getPreviousSnapshotId(SnapshotInfo 
snapInfo, SnapshotChainMan
     return null;
   }
 
-  /**
-   * Return a map column family to prefix for the keys in the table for
-   * the given volume and bucket.
-   * Column families, map is returned for, are keyTable, dirTable and 
fileTable.
-   */
-  public static Map<String, String> getColumnFamilyToKeyPrefixMap(
-      OMMetadataManager omMetadataManager,
-      String volumeName,
-      String bucketName
-  ) throws IOException {
-    String keyPrefix = omMetadataManager.getBucketKeyPrefix(volumeName, 
bucketName);
-    String keyPrefixFso = omMetadataManager.getBucketKeyPrefixFSO(volumeName, 
bucketName);
-
-    Map<String, String> columnFamilyToPrefixMap = new HashMap<>();
-    columnFamilyToPrefixMap.put(KEY_TABLE, keyPrefix);
-    columnFamilyToPrefixMap.put(DIRECTORY_TABLE, keyPrefixFso);
-    columnFamilyToPrefixMap.put(FILE_TABLE, keyPrefixFso);
-    return columnFamilyToPrefixMap;
-  }
-
   /**
    * Returns merged repeatedKeyInfo entry with the existing deleted entry in 
the table.
    * @param snapshotMoveKeyInfos keyInfos to be added.
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index d021cc75250..52cd9fb15ca 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -17,9 +17,14 @@
 
 package org.apache.hadoop.ozone.om;
 
+import static 
org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_DIR_TABLE;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_TABLE;
+import static 
org.apache.hadoop.ozone.om.codec.OMDBDefinition.SNAPSHOT_RENAMED_TABLE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
@@ -87,7 +92,7 @@ private <V> List<Table.KeyValue<String, V>> mockTableIterator(
         for (int k = 0; k < numberOfKeysPerBucket; k++) {
           String key = String.format("/%s%010d/%s%010d/%s%010d", 
volumeNamePrefix, i, bucketNamePrefix, j,
               keyPrefix, k);
-          V value = valueClass == String.class ? (V) key : 
Mockito.mock(valueClass);
+          V value = valueClass == String.class ? (V) key : mock(valueClass);
           values.put(key, value);
 
           if ((volumeNumberFilter == null || i == volumeNumberFilter) &&
@@ -122,11 +127,12 @@ public void testGetDeletedKeyEntries(int numberOfVolumes, 
int numberOfBucketsPer
     String keyPrefix = "key";
     OzoneConfiguration configuration = new OzoneConfiguration();
     OMMetadataManager metadataManager = Mockito.mock(OMMetadataManager.class);
-    when(metadataManager.getBucketKeyPrefix(anyString(), 
anyString())).thenAnswer(i ->
-        "/" + i.getArguments()[0] + "/" + i.getArguments()[1] + "/");
     KeyManagerImpl km = new KeyManagerImpl(null, null, metadataManager, 
configuration, null, null, null);
     Table<String, RepeatedOmKeyInfo> mockedDeletedTable = 
Mockito.mock(Table.class);
+    when(mockedDeletedTable.getName()).thenReturn(DELETED_TABLE);
     when(metadataManager.getDeletedTable()).thenReturn(mockedDeletedTable);
+    when(metadataManager.getTableBucketPrefix(eq(DELETED_TABLE), anyString(), 
anyString()))
+        .thenAnswer(i -> "/" + i.getArguments()[1] + "/" + i.getArguments()[2] 
+ "/");
     CheckedFunction<Table.KeyValue<String, RepeatedOmKeyInfo>, Boolean, 
IOException> filter =
         (kv) -> Long.parseLong(kv.getKey().split(keyPrefix)[1]) % 2 == 0;
     List<Table.KeyValue<String, List<OmKeyInfo>>> expectedEntries = 
mockTableIterator(
@@ -166,11 +172,12 @@ public void testGetRenameKeyEntries(int numberOfVolumes, 
int numberOfBucketsPerV
     String keyPrefix = "";
     OzoneConfiguration configuration = new OzoneConfiguration();
     OMMetadataManager metadataManager = Mockito.mock(OMMetadataManager.class);
-    when(metadataManager.getBucketKeyPrefix(anyString(), 
anyString())).thenAnswer(i ->
-        "/" + i.getArguments()[0] + "/" + i.getArguments()[1] + "/");
     KeyManagerImpl km = new KeyManagerImpl(null, null, metadataManager, 
configuration, null, null, null);
     Table<String, String> mockedRenameTable = Mockito.mock(Table.class);
+    when(mockedRenameTable.getName()).thenReturn(SNAPSHOT_RENAMED_TABLE);
     
when(metadataManager.getSnapshotRenamedTable()).thenReturn(mockedRenameTable);
+    when(metadataManager.getTableBucketPrefix(eq(SNAPSHOT_RENAMED_TABLE), 
anyString(), anyString()))
+        .thenAnswer(i -> "/" + i.getArguments()[1] + "/" + i.getArguments()[2] 
+ "/");
     CheckedFunction<Table.KeyValue<String, String>, Boolean, IOException> 
filter =
         (kv) -> Long.parseLong(kv.getKey().split("/")[3]) % 2 == 0;
     List<Table.KeyValue<String, String>> expectedEntries = mockTableIterator(
@@ -204,11 +211,12 @@ public void testGetDeletedDirEntries(int numberOfVolumes, 
int numberOfBucketsPer
     startVolumeNumber = null;
     OzoneConfiguration configuration = new OzoneConfiguration();
     OMMetadataManager metadataManager = Mockito.mock(OMMetadataManager.class);
-    when(metadataManager.getBucketKeyPrefixFSO(anyString(), 
anyString())).thenAnswer(i ->
-        "/" + i.getArguments()[0] + "/" + i.getArguments()[1] + "/");
     KeyManagerImpl km = new KeyManagerImpl(null, null, metadataManager, 
configuration, null, null, null);
     Table<String, OmKeyInfo> mockedDeletedDirTable = Mockito.mock(Table.class);
+    when(mockedDeletedDirTable.getName()).thenReturn(DELETED_DIR_TABLE);
     
when(metadataManager.getDeletedDirTable()).thenReturn(mockedDeletedDirTable);
+    when(metadataManager.getTableBucketPrefix(eq(DELETED_DIR_TABLE), 
anyString(), anyString()))
+        .thenAnswer(i -> "/" + i.getArguments()[1] + "/" + i.getArguments()[2] 
+ "/");
     List<Table.KeyValue<String, OmKeyInfo>> expectedEntries = 
mockTableIterator(
         OmKeyInfo.class, mockedDeletedDirTable, numberOfVolumes, 
numberOfBucketsPerVolume, numberOfKeysPerBucket,
         volumeNamePrefix, bucketNamePrefix, keyPrefix, volumeNumber, 
bucketNumber, startVolumeNumber, startBucketNumber,
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index 4ab921752ce..3dcddaeeafa 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -90,6 +90,7 @@
 import org.apache.hadoop.ozone.om.PendingKeysDeletion.PurgedKey;
 import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient;
 import org.apache.hadoop.ozone.om.SnapshotChainManager;
+import org.apache.hadoop.ozone.om.SstFilteringService;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -151,6 +152,7 @@ class TestKeyDeletingService extends OzoneTestBase {
   private OMMetadataManager metadataManager;
   private KeyDeletingService keyDeletingService;
   private DirectoryDeletingService directoryDeletingService;
+  private SstFilteringService sstFilteringService;
   private ScmBlockLocationTestingClient scmBlockTestingClient;
   private DeletingServiceMetrics metrics;
 
@@ -184,7 +186,7 @@ private void createConfig(File testDir, int delintervalMs) {
   private void createSubject() throws Exception {
     OmTestManagers omTestManagers = new OmTestManagers(conf, 
scmBlockTestingClient, null);
     keyManager = omTestManagers.getKeyManager();
-
+    sstFilteringService = keyManager.getSnapshotSstFilteringService();
     keyDeletingService = keyManager.getDeletingService();
     directoryDeletingService = keyManager.getDirDeletingService();
     writeClient = omTestManagers.getWriteClient();
@@ -559,6 +561,7 @@ void testSnapshotDeepClean() throws Exception {
           om.getMetadataManager().getKeyTable(BucketLayout.DEFAULT);
 
       // Suspend KeyDeletingService
+      sstFilteringService.pause();
       keyDeletingService.suspend();
       directoryDeletingService.suspend();
 
@@ -627,6 +630,7 @@ void testSnapshotDeepClean() throws Exception {
         assertTableRowCount(deletedTable, initialDeletedCount, 
metadataManager);
         checkSnapDeepCleanStatus(snapshotInfoTable, volumeName, true);
       }
+      sstFilteringService.resume();
     }
 
     @Test
@@ -804,7 +808,9 @@ void setup(@TempDir File testDir) throws Exception {
 
     @AfterEach
     void resume() {
+      directoryDeletingService.resume();
       keyDeletingService.resume();
+      sstFilteringService.resume();
     }
 
     @AfterAll
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
index ec896cb3dda..840ef6eaeb8 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
@@ -61,7 +61,6 @@
 import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.anyDouble;
 import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.anyList;
 import static org.mockito.Mockito.anyMap;
 import static org.mockito.Mockito.anySet;
 import static org.mockito.Mockito.anyString;
@@ -117,6 +116,7 @@
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.hdds.utils.db.RocksDatabase;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
@@ -434,6 +434,7 @@ public void testGetDeltaFilesWithDag(int numberOfFiles) 
throws IOException {
     when(differ.getSSTDiffListWithFullPath(
         any(DifferSnapshotInfo.class),
         any(DifferSnapshotInfo.class),
+        anySet(),
         eq(diffDir))
     ).thenReturn(Optional.of(Lists.newArrayList(randomStrings)));
 
@@ -452,16 +453,17 @@ public void testGetDeltaFilesWithDag(int numberOfFiles) 
throws IOException {
              Mockito.CALLS_REAL_METHODS)) {
       mockedRdbUtil.when(() -> RdbUtil.getSSTFilesForComparison(any(), any()))
           
.thenReturn(Collections.singleton(RandomStringUtils.secure().nextAlphabetic(10)));
-      mockedRocksDiffUtils.when(() -> 
RocksDiffUtils.filterRelevantSstFiles(any(), any())).thenAnswer(i -> null);
+      mockedRocksDiffUtils.when(() -> 
RocksDiffUtils.filterRelevantSstFiles(any(), any(), anySet()))
+          .thenAnswer(i -> null);
       SnapshotDiffManager spy = spy(snapshotDiffManager);
       doNothing().when(spy).recordActivity(any(), any());
       doNothing().when(spy).updateProgress(anyString(), anyDouble());
       Set<String> deltaFiles = spy.getDeltaFiles(
           fromSnapshot,
           toSnapshot,
-          Arrays.asList("cf1", "cf2"), fromSnapshotInfo,
+          Sets.newHashSet("cf1", "cf2"), fromSnapshotInfo,
           toSnapshotInfo, false,
-          Collections.emptyMap(), diffDir, diffJobKey);
+          new TablePrefixInfo(Collections.emptyMap()), diffDir, diffJobKey);
       assertEquals(randomStrings, deltaFiles);
     }
     rcFromSnapshot.close();
@@ -481,7 +483,7 @@ public void testGetDeltaFilesWithFullDiff(int numberOfFiles,
       Set<String> deltaStrings = new HashSet<>();
 
       mockedRdbUtil.when(
-              () -> RdbUtil.getSSTFilesForComparison(any(), anyList()))
+              () -> RdbUtil.getSSTFilesForComparison(any(), anySet()))
           .thenAnswer((Answer<Set<String>>) invocation -> {
             Set<String> retVal = IntStream.range(0, numberOfFiles)
                 .mapToObj(i -> RandomStringUtils.secure().nextAlphabetic(10))
@@ -491,7 +493,7 @@ public void testGetDeltaFilesWithFullDiff(int numberOfFiles,
           });
 
       mockedRocksDiffUtils.when(() ->
-              RocksDiffUtils.filterRelevantSstFiles(anySet(), anyMap(), 
anyMap(), any(ManagedRocksDB.class),
+              RocksDiffUtils.filterRelevantSstFiles(anySet(), any(), anyMap(), 
anySet(), any(ManagedRocksDB.class),
                   any(ManagedRocksDB.class)))
           .thenAnswer((Answer<Void>) invocationOnMock -> {
             invocationOnMock.getArgument(0, Set.class).stream()
@@ -513,6 +515,7 @@ public void testGetDeltaFilesWithFullDiff(int numberOfFiles,
         when(differ.getSSTDiffListWithFullPath(
             any(DifferSnapshotInfo.class),
             any(DifferSnapshotInfo.class),
+            anySet(),
             anyString()))
             .thenReturn(Optional.ofNullable(Collections.emptyList()));
       }
@@ -533,11 +536,11 @@ public void testGetDeltaFilesWithFullDiff(int 
numberOfFiles,
       Set<String> deltaFiles = spy.getDeltaFiles(
           fromSnapshot,
           toSnapshot,
-          Arrays.asList("cf1", "cf2"),
+          Sets.newHashSet("cf1", "cf2"),
           fromSnapshotInfo,
           toSnapshotInfo,
           false,
-          Collections.emptyMap(),
+          new TablePrefixInfo(Collections.emptyMap()),
           snapDiffDir.getAbsolutePath(), diffJobKey);
       assertEquals(deltaStrings, deltaFiles);
     }
@@ -553,7 +556,7 @@ public void testGetDeltaFilesWithDifferThrowException(int 
numberOfFiles)
       Set<String> deltaStrings = new HashSet<>();
 
       mockedRdbUtil.when(
-              () -> RdbUtil.getSSTFilesForComparison(any(), anyList()))
+              () -> RdbUtil.getSSTFilesForComparison(any(), anySet()))
           .thenAnswer((Answer<Set<String>>) invocation -> {
             Set<String> retVal = IntStream.range(0, numberOfFiles)
                 .mapToObj(i -> RandomStringUtils.secure().nextAlphabetic(10))
@@ -563,7 +566,7 @@ public void testGetDeltaFilesWithDifferThrowException(int 
numberOfFiles)
           });
 
       mockedRocksDiffUtils.when(() ->
-              RocksDiffUtils.filterRelevantSstFiles(anySet(), anyMap(), 
anyMap(), any(ManagedRocksDB.class),
+              RocksDiffUtils.filterRelevantSstFiles(anySet(), any(), anyMap(), 
anySet(), any(ManagedRocksDB.class),
                   any(ManagedRocksDB.class)))
           .thenAnswer((Answer<Void>) invocationOnMock -> {
             invocationOnMock.getArgument(0, Set.class).stream()
@@ -586,6 +589,7 @@ public void testGetDeltaFilesWithDifferThrowException(int 
numberOfFiles)
           .getSSTDiffListWithFullPath(
               any(DifferSnapshotInfo.class),
               any(DifferSnapshotInfo.class),
+              anySet(),
               anyString());
 
       UncheckedAutoCloseableSupplier<OmSnapshot> rcFromSnapshot =
@@ -605,11 +609,11 @@ public void testGetDeltaFilesWithDifferThrowException(int 
numberOfFiles)
       Set<String> deltaFiles = spy.getDeltaFiles(
           fromSnapshot,
           toSnapshot,
-          Arrays.asList("cf1", "cf2"),
+          Sets.newHashSet("cf1", "cf2"),
           fromSnapshotInfo,
           toSnapshotInfo,
           false,
-          Collections.emptyMap(),
+          new TablePrefixInfo(Collections.emptyMap()),
           snapDiffDir.getAbsolutePath(), diffJobKey);
       assertEquals(deltaStrings, deltaFiles);
 
@@ -702,7 +706,7 @@ public void testObjectIdMapWithTombstoneEntries(boolean 
nativeLibraryLoaded,
             String keyName = split[split.length - 1];
             return Integer.parseInt(keyName.substring(3)) % 2 == 0;
           }
-      ).when(spy).isKeyInBucket(anyString(), anyMap(), anyString());
+      ).when(spy).isKeyInBucket(anyString(), any(), anyString());
       assertFalse(isKeyInBucket);
 
       PersistentMap<byte[], byte[]> oldObjectIdKeyMap =
@@ -720,7 +724,7 @@ public void testObjectIdMapWithTombstoneEntries(boolean 
nativeLibraryLoaded,
           nativeLibraryLoaded, oldObjectIdKeyMap, newObjectIdKeyMap,
           objectIdsToCheck, Optional.of(oldParentIds),
           Optional.of(newParentIds),
-          ImmutableMap.of(DIRECTORY_TABLE, "", KEY_TABLE, "", FILE_TABLE, ""), 
"");
+          new TablePrefixInfo(ImmutableMap.of(DIRECTORY_TABLE, "", KEY_TABLE, 
"", FILE_TABLE, "")), "");
 
       try (ClosableIterator<Map.Entry<byte[], byte[]>> oldObjectIdIter =
                oldObjectIdKeyMap.iterator()) {
@@ -854,8 +858,7 @@ public void testGenerateDiffReport() throws IOException {
         return keyInfo;
       });
       when(fromSnapTable.getName()).thenReturn("table");
-      Map<String, String> tablePrefixes = mock(Map.class);
-      when(tablePrefixes.get(anyString())).thenReturn("");
+      TablePrefixInfo tablePrefixes = new 
TablePrefixInfo(Collections.emptyMap());
       SnapshotDiffManager spy = spy(snapshotDiffManager);
       doReturn(true).when(spy)
           .areDiffJobAndSnapshotsActive(volumeName, bucketName, fromSnapName,
@@ -1248,7 +1251,7 @@ public void testGenerateDiffReportWhenThereInEntry() {
         false,
         Optional.empty(),
         Optional.empty(),
-        Collections.emptyMap());
+        new TablePrefixInfo(Collections.emptyMap()));
 
     assertEquals(0, totalDiffEntries);
   }
@@ -1290,7 +1293,7 @@ public void testGenerateDiffReportFailure() throws 
IOException {
             false,
             Optional.empty(),
             Optional.empty(),
-            Collections.emptyMap())
+            new TablePrefixInfo(Collections.emptyMap()))
     );
     assertEquals("Old and new key name both are null",
         exception.getMessage());
@@ -1558,12 +1561,12 @@ public void testGetDeltaFilesWithFullDiff() throws  
IOException {
       }
       return null;
     }).when(spy).getSSTFileMapForSnapshot(Mockito.any(OmSnapshot.class),
-        Mockito.anyList());
+        Mockito.anySet());
     doNothing().when(spy).recordActivity(any(), any());
     doNothing().when(spy).updateProgress(anyString(), anyDouble());
     String diffJobKey = snap1 + DELIMITER + snap2;
-    Set<String> deltaFiles = spy.getDeltaFiles(fromSnapshot, toSnapshot, 
Collections.emptyList(), snapshotInfo,
-        snapshotInfo, true, Collections.emptyMap(), null, diffJobKey);
+    Set<String> deltaFiles = spy.getDeltaFiles(fromSnapshot, toSnapshot, 
Collections.emptySet(), snapshotInfo,
+        snapshotInfo, true, new TablePrefixInfo(Collections.emptyMap()), null, 
diffJobKey);
     Assertions.assertEquals(Sets.newHashSet("3.sst", "4.sst"), deltaFiles);
   }
 
@@ -1577,21 +1580,21 @@ public void testGetSnapshotDiffReportHappyCase() throws 
Exception {
     SnapshotDiffManager spy = spy(snapshotDiffManager);
 
     doReturn(testDeltaFiles).when(spy).getDeltaFiles(any(OmSnapshot.class),
-        any(OmSnapshot.class), anyList(), eq(fromSnapInfo), eq(toSnapInfo),
-        eq(false), anyMap(), anyString(),
+        any(OmSnapshot.class), anySet(), eq(fromSnapInfo), eq(toSnapInfo),
+        eq(false), any(), anyString(),
         anyString());
 
     doReturn(testDeltaFiles).when(spy)
-        .getSSTFileListForSnapshot(any(OmSnapshot.class), anyList());
+        .getSSTFileListForSnapshot(any(OmSnapshot.class), anySet());
 
     doNothing().when(spy).addToObjectIdMap(eq(keyInfoTable), eq(keyInfoTable),
-        any(), anyBoolean(), any(), any(), any(), any(), any(), anyMap(), 
anyString());
+        any(), anyBoolean(), any(), any(), any(), any(), any(), any(), 
anyString());
     doNothing().when(spy).checkReportsIntegrity(any(), anyInt(), anyInt());
 
     doReturn(10L).when(spy).generateDiffReport(anyString(),
         any(), any(), any(), any(), any(), any(), any(),
         anyString(), anyString(), anyString(), anyString(), anyBoolean(),
-        any(), any(), anyMap());
+        any(), any(), any());
     doReturn(LEGACY).when(spy).getBucketLayout(VOLUME_NAME, BUCKET_NAME,
         omMetadataManager);
 
diff --git 
a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/repair/ldb/TestLdbRepair.java
 
b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/repair/ldb/TestLdbRepair.java
index ca0cad1200d..2553af6d974 100644
--- 
a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/repair/ldb/TestLdbRepair.java
+++ 
b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/repair/ldb/TestLdbRepair.java
@@ -149,7 +149,7 @@ public void testRocksDBManualCompaction() throws Exception {
     List<ColumnFamilyDescriptor> cfDescList = 
RocksDBUtils.getColumnFamilyDescriptors(dbPath.toString());
     try (ManagedRocksDB db = ManagedRocksDB.openReadOnly(dbPath.toString(), 
cfDescList, cfHandleList)) {
       List<LiveFileMetaData> liveFileMetaDataList = RdbUtil
-          .getLiveSSTFilesForCFs(db, Collections.singletonList(TEST_CF_NAME));
+          .getLiveSSTFilesForCFs(db, Collections.singleton(TEST_CF_NAME));
       for (LiveFileMetaData liveMetadata : liveFileMetaDataList) {
         assertEquals(0, liveMetadata.numDeletions(),
             "Tombstones found in file: " + liveMetadata.fileName());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to