This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 322ca93b4a HDDS-13025. Refactor KeyDeletingService to use 
ReclaimableKeyFilter (#8450)
322ca93b4a is described below

commit 322ca93b4a6358b5dda6ed2f62abb64274ad3975
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Fri May 23 22:18:14 2025 -0400

    HDDS-13025. Refactor KeyDeletingService to use ReclaimableKeyFilter (#8450)
---
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   5 +
 .../hadoop/ozone/TestOzoneConfigurationFields.java |   1 +
 .../org/apache/hadoop/ozone/om/TestKeyPurging.java |   2 +-
 ...TestSnapshotDeletingServiceIntegrationTest.java |   8 +-
 .../TestSnapshotDirectoryCleaningService.java      |   1 -
 .../org/apache/hadoop/ozone/om/KeyManager.java     |  40 +-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  79 +++-
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     | 156 ------
 .../hadoop/ozone/om/PendingKeysDeletion.java       |  18 +-
 .../om/service/AbstractKeyDeletingService.java     |  51 +-
 .../ozone/om/service/KeyDeletingService.java       | 523 +++++++--------------
 .../hadoop/ozone/om/snapshot/SnapshotUtils.java    |   5 +
 .../ozone/om/service/TestKeyDeletingService.java   |  55 ++-
 13 files changed, 378 insertions(+), 566 deletions(-)

diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index d9b08494f0..43a0895142 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -399,8 +399,13 @@ public final class OMConfigKeys {
   public static final String OZONE_THREAD_NUMBER_DIR_DELETION =
       "ozone.thread.number.dir.deletion";
 
+  public static final String OZONE_THREAD_NUMBER_KEY_DELETION =
+      "ozone.thread.number.key.deletion";
+
   public static final int OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT = 10;
 
+  public static final int OZONE_THREAD_NUMBER_KEY_DELETION_DEFAULT = 10;
+
   public static final String SNAPSHOT_SST_DELETING_LIMIT_PER_TASK =
       "ozone.snapshot.filtering.limit.per.task";
   public static final int SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT = 2;
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 57caaa45b2..c699a6f6fa 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -123,6 +123,7 @@ private void addPropertiesNotInXml() {
         OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER,
         OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD,
         OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION,
+        OMConfigKeys.OZONE_THREAD_NUMBER_KEY_DELETION,
         ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
         ScmConfigKeys.OZONE_SCM_HA_PREFIX,
         S3GatewayConfigKeys.OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED,
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
index 17d4d40a09..fa59754b67 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
@@ -126,7 +126,7 @@ public void testKeysPurgingByKeyDeletingService() throws 
Exception {
     GenericTestUtils.waitFor(
         () -> {
           try {
-            return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
+            return keyManager.getPendingDeletionKeys((kv) -> true, 
Integer.MAX_VALUE)
                 .getKeyBlocksList().isEmpty();
           } catch (IOException e) {
             return false;
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
index 7e8befa6a1..1662904853 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
@@ -25,6 +25,7 @@
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -127,6 +128,7 @@ public void setup() throws Exception {
     conf.setBoolean(OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED, true);
     conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT,
         10000, TimeUnit.MILLISECONDS);
+    conf.setInt(OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL, 500);
     conf.setInt(OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL, 500);
     conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 500,
         TimeUnit.MILLISECONDS);
@@ -493,12 +495,12 @@ private KeyDeletingService 
getMockedKeyDeletingService(AtomicBoolean keyDeletion
     KeyManager keyManager = Mockito.spy(om.getKeyManager());
     when(ozoneManager.getKeyManager()).thenReturn(keyManager);
     KeyDeletingService keyDeletingService = Mockito.spy(new 
KeyDeletingService(ozoneManager,
-        ozoneManager.getScmClient().getBlockClient(), keyManager, 10000,
-        100000, cluster.getConf(), false));
+        ozoneManager.getScmClient().getBlockClient(), 10000,
+        100000, cluster.getConf(), 10, false));
     keyDeletingService.shutdown();
     GenericTestUtils.waitFor(() -> keyDeletingService.getThreadCount() == 0, 
1000,
         100000);
-    when(keyManager.getPendingDeletionKeys(anyInt())).thenAnswer(i -> {
+    when(keyManager.getPendingDeletionKeys(any(), anyInt())).thenAnswer(i -> {
       // wait for SDS to reach the KDS wait block before processing any key.
       GenericTestUtils.waitFor(keyDeletionWaitStarted::get, 1000, 100000);
       keyDeletionStarted.set(true);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java
index 84307c5549..8591c6d1e8 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java
@@ -240,7 +240,6 @@ public void testExclusiveSizeWithDirectoryDeepClean() 
throws Exception {
         Table.KeyValue<String, SnapshotInfo> snapshotEntry = iterator.next();
         String snapshotName = snapshotEntry.getValue().getName();
         SnapshotInfo snapshotInfo = 
snapshotInfoTable.get(snapshotEntry.getKey());
-        System.out.println(snapshotInfo.getName() + " " + 
snapshotInfo.getDeepCleanedDeletedDir());
         assertEquals(expectedSize.get(snapshotName),
             snapshotInfo.getExclusiveSize() + 
snapshotInfo.getExclusiveSizeDeltaFromDirDeepCleaning());
         // Since for the test we are using RATIS/THREE
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index fa3e622313..61f46634ec 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -26,7 +26,6 @@
 import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.fs.OzoneManagerFS;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -113,17 +112,38 @@ ListKeysResult listKeys(String volumeName, String 
bucketName, String startKey,
       throws IOException;
 
   /**
-   * Returns a PendingKeysDeletion. It has a list of pending deletion key info
-   * that ups to the given count.Each entry is a {@link BlockGroup}, which
-   * contains the info about the key name and all its associated block IDs.
-   * Second is a Mapping of Key-Value pair which is updated in the 
deletedTable.
+   * Retrieves pending deletion keys that match a given filter function.
    *
-   * @param count max number of keys to return.
-   * @return a Pair of list of {@link BlockGroup} representing keys and blocks,
-   * and a hashmap for key-value pair to be updated in the deletedTable.
-   * @throws IOException
+   * @param filter a functional interface specifying the filter condition to 
apply
+   *               to the keys. It takes a KeyValue pair containing a string 
key and
+   *               an OmKeyInfo object, and returns a boolean value indicating 
whether
+   *               the key meets the filter criteria.
+   * @param count  the maximum number of keys to retrieve.
+   * @return a PendingKeysDeletion object containing the keys that satisfy the 
filter
+   *         criteria, up to the specified count.
+   * @throws IOException if an I/O error occurs while fetching the keys.
    */
-  PendingKeysDeletion getPendingDeletionKeys(int count) throws IOException;
+  PendingKeysDeletion getPendingDeletionKeys(
+      CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException> 
filter, int count)
+      throws IOException;
+
+  /**
+   * Retrieves the keys that are pending deletion in a specified bucket and 
volume.
+   *
+   * @param volume the name of the volume that contains the bucket.
+   * @param bucket the name of the bucket within the volume where keys are 
located.
+   * @param startKey the key from which to start retrieving pending deletions.
+   * @param filter a filter function to determine which keys should be included
+   *               in the pending deletion list.
+   * @param count the maximum number of keys to retrieve that are pending 
deletion.
+   * @return a PendingKeysDeletion object containing the list of keys
+   *         pending deletion based on the specified parameters.
+   * @throws IOException if an I/O error occurs during the operation.
+   */
+  PendingKeysDeletion getPendingDeletionKeys(
+      String volume, String bucket, String startKey,
+      CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException> 
filter, int count)
+      throws IOException;
 
   /**
    * Returns a list rename entries from the snapshotRenamedTable.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 6a493c1f31..a29e8fdfad 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -66,6 +66,8 @@
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_KEY_DELETION;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_KEY_DELETION_DEFAULT;
 import static org.apache.hadoop.ozone.om.OzoneManagerUtils.getBucketLayout;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
@@ -113,6 +115,7 @@
 import 
org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -136,6 +139,7 @@
 import org.apache.hadoop.net.TableMapping;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
@@ -172,6 +176,7 @@
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.RequestContext;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.Lists;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.util.function.CheckedFunction;
@@ -254,9 +259,15 @@ public void start(OzoneConfiguration configuration) {
           OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
           OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
           TimeUnit.MILLISECONDS);
+      int keyDeletingServiceCorePoolSize =
+          configuration.getInt(OZONE_THREAD_NUMBER_KEY_DELETION,
+              OZONE_THREAD_NUMBER_KEY_DELETION_DEFAULT);
+      if (keyDeletingServiceCorePoolSize <= 0) {
+        keyDeletingServiceCorePoolSize = 1;
+      }
       keyDeletingService = new KeyDeletingService(ozoneManager,
-          scmClient.getBlockClient(), this, blockDeleteInterval,
-          serviceTimeout, configuration, isSnapshotDeepCleaningEnabled);
+          scmClient.getBlockClient(), blockDeleteInterval,
+          serviceTimeout, configuration, keyDeletingServiceCorePoolSize, 
isSnapshotDeepCleaningEnabled);
       keyDeletingService.start();
     }
 
@@ -722,12 +733,66 @@ public ListKeysResult listKeys(String volumeName, String 
bucketName,
   }
 
   @Override
-  public PendingKeysDeletion getPendingDeletionKeys(final int count)
+  public PendingKeysDeletion getPendingDeletionKeys(
+      final CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, 
IOException> filter, final int count)
       throws IOException {
-    OmMetadataManagerImpl omMetadataManager =
-        (OmMetadataManagerImpl) metadataManager;
-    return omMetadataManager
-        .getPendingDeletionKeys(count, ozoneManager.getOmSnapshotManager());
+    return getPendingDeletionKeys(null, null, null, filter, count);
+  }
+
+  @Override
+  public PendingKeysDeletion getPendingDeletionKeys(
+      String volume, String bucket, String startKey,
+      CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException> 
filter,
+      int count) throws IOException {
+    List<BlockGroup> keyBlocksList = Lists.newArrayList();
+    Map<String, RepeatedOmKeyInfo> keysToModify = new HashMap<>();
+    // Bucket prefix would be empty if volume is empty i.e. either null or "".
+    Optional<String> bucketPrefix = getBucketPrefix(volume, bucket, false);
+    try (TableIterator<String, ? extends Table.KeyValue<String, 
RepeatedOmKeyInfo>>
+             delKeyIter = 
metadataManager.getDeletedTable().iterator(bucketPrefix.orElse(""))) {
+
+      /* Seeking to the start key if it not null. The next key picked up would 
be ensured to start with the bucket
+         prefix, {@link 
org.apache.hadoop.hdds.utils.db.Table#iterator(bucketPrefix)} would ensure this.
+       */
+      if (startKey != null) {
+        delKeyIter.seek(startKey);
+      }
+      int currentCount = 0;
+      while (delKeyIter.hasNext() && currentCount < count) {
+        RepeatedOmKeyInfo notReclaimableKeyInfo = new RepeatedOmKeyInfo();
+        Table.KeyValue<String, RepeatedOmKeyInfo> kv = delKeyIter.next();
+        if (kv != null) {
+          List<BlockGroup> blockGroupList = Lists.newArrayList();
+          // Multiple keys with the same path can be queued in one DB entry
+          RepeatedOmKeyInfo infoList = kv.getValue();
+          for (OmKeyInfo info : infoList.getOmKeyInfoList()) {
+
+            // Skip the key if the filter doesn't allow the file to be deleted.
+            if (filter == null || filter.apply(Table.newKeyValue(kv.getKey(), 
info))) {
+              List<BlockID> blockIDS = info.getKeyLocationVersions().stream()
+                  .flatMap(versionLocations -> 
versionLocations.getLocationList().stream()
+                      .map(b -> new BlockID(b.getContainerID(), 
b.getLocalID()))).collect(Collectors.toList());
+              BlockGroup keyBlocks = 
BlockGroup.newBuilder().setKeyName(kv.getKey())
+                  .addAllBlockIDs(blockIDS).build();
+              blockGroupList.add(keyBlocks);
+              currentCount++;
+            } else {
+              notReclaimableKeyInfo.addOmKeyInfo(info);
+            }
+          }
+
+          List<OmKeyInfo> notReclaimableKeyInfoList = 
notReclaimableKeyInfo.getOmKeyInfoList();
+
+          // If all the versions are not reclaimable, then modify key by just 
purging the key that can be purged.
+          if (!notReclaimableKeyInfoList.isEmpty() &&
+              notReclaimableKeyInfoList.size() != 
infoList.getOmKeyInfoList().size()) {
+            keysToModify.put(kv.getKey(), notReclaimableKeyInfo);
+          }
+          keyBlocksList.addAll(blockGroupList);
+        }
+      }
+    }
+    return new PendingKeysDeletion(keyBlocksList, keysToModify);
   }
 
   private <V, R> List<Table.KeyValue<String, R>> getTableEntries(String 
startKey,
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 173f18313c..e1f50b1922 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -34,7 +34,6 @@
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.checkSnapshotDirExist;
-import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.isBlockLocationInfoSame;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
@@ -55,7 +54,6 @@
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -123,7 +121,6 @@
 import org.apache.hadoop.util.Time;
 import org.apache.ozone.compaction.log.CompactionLogEntry;
 import org.apache.ratis.util.ExitUtils;
-import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
 import org.eclipse.jetty.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1311,159 +1308,6 @@ private PersistedUserVolumeInfo getVolumesByUser(String 
userNameKey)
     }
   }
 
-  /**
-   * Returns a list of pending deletion key info up to the limit.
-   * Each entry is a {@link BlockGroup}, which contains the info about the key
-   * name and all its associated block IDs.
-   *
-   * @param keyCount max number of keys to return.
-   * @param omSnapshotManager SnapshotManager
-   * @return a list of {@link BlockGroup} represent keys and blocks.
-   * @throws IOException
-   */
-  public PendingKeysDeletion getPendingDeletionKeys(final int keyCount,
-                             OmSnapshotManager omSnapshotManager)
-      throws IOException {
-    List<BlockGroup> keyBlocksList = Lists.newArrayList();
-    HashMap<String, RepeatedOmKeyInfo> keysToModify = new HashMap<>();
-    try (TableIterator<String, ? extends KeyValue<String, RepeatedOmKeyInfo>>
-             keyIter = getDeletedTable().iterator()) {
-      int currentCount = 0;
-      while (keyIter.hasNext() && currentCount < keyCount) {
-        RepeatedOmKeyInfo notReclaimableKeyInfo = new RepeatedOmKeyInfo();
-        KeyValue<String, RepeatedOmKeyInfo> kv = keyIter.next();
-        if (kv != null) {
-          List<BlockGroup> blockGroupList = Lists.newArrayList();
-          // Get volume name and bucket name
-          String[] keySplit = kv.getKey().split(OM_KEY_PREFIX);
-          String bucketKey = getBucketKey(keySplit[1], keySplit[2]);
-          OmBucketInfo bucketInfo = getBucketTable().get(bucketKey);
-          // If Bucket deleted bucketInfo would be null, thus making previous 
snapshot also null.
-          SnapshotInfo previousSnapshotInfo = bucketInfo == null ? null :
-              SnapshotUtils.getLatestSnapshotInfo(bucketInfo.getVolumeName(),
-              bucketInfo.getBucketName(), ozoneManager, snapshotChainManager);
-          // previous snapshot is not active or it has not been flushed to 
disk then don't process the key in this
-          // iteration.
-          if (previousSnapshotInfo != null &&
-              (previousSnapshotInfo.getSnapshotStatus() != 
SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE ||
-              
!OmSnapshotManager.areSnapshotChangesFlushedToDB(ozoneManager.getMetadataManager(),
-                  previousSnapshotInfo))) {
-            continue;
-          }
-          // Get the latest snapshot in snapshot path.
-          try (UncheckedAutoCloseableSupplier<OmSnapshot> rcLatestSnapshot = 
previousSnapshotInfo == null ? null :
-              
omSnapshotManager.getSnapshot(previousSnapshotInfo.getVolumeName(),
-                  previousSnapshotInfo.getBucketName(), 
previousSnapshotInfo.getName())) {
-
-            // Multiple keys with the same path can be queued in one DB entry
-            RepeatedOmKeyInfo infoList = kv.getValue();
-            for (OmKeyInfo info : infoList.cloneOmKeyInfoList()) {
-              // Skip the key if it exists in the previous snapshot (of the 
same
-              // scope) as in this case its blocks should not be reclaimed
-
-              // If the last snapshot is deleted and the keys renamed in 
between
-              // the snapshots will be cleaned up by KDS. So we need to check
-              // in the renamedTable as well.
-              String dbRenameKey = getRenameKey(info.getVolumeName(),
-                  info.getBucketName(), info.getObjectID());
-
-              if (rcLatestSnapshot != null) {
-                Table<String, OmKeyInfo> prevKeyTable =
-                    rcLatestSnapshot.get()
-                        .getMetadataManager()
-                        .getKeyTable(bucketInfo.getBucketLayout());
-
-                Table<String, RepeatedOmKeyInfo> prevDeletedTable =
-                    
rcLatestSnapshot.get().getMetadataManager().getDeletedTable();
-                String prevKeyTableDBKey = getSnapshotRenamedTable()
-                    .get(dbRenameKey);
-                String prevDelTableDBKey = getOzoneKey(info.getVolumeName(),
-                    info.getBucketName(), info.getKeyName());
-                // format: /volName/bucketName/keyName/objId
-                prevDelTableDBKey = getOzoneDeletePathKey(info.getObjectID(),
-                    prevDelTableDBKey);
-
-                if (prevKeyTableDBKey == null &&
-                    bucketInfo.getBucketLayout().isFileSystemOptimized()) {
-                  long volumeId = getVolumeId(info.getVolumeName());
-                  prevKeyTableDBKey = getOzonePathKey(volumeId,
-                      bucketInfo.getObjectID(),
-                      info.getParentObjectID(),
-                      info.getFileName());
-                } else if (prevKeyTableDBKey == null) {
-                  prevKeyTableDBKey = getOzoneKey(info.getVolumeName(),
-                      info.getBucketName(),
-                      info.getKeyName());
-                }
-
-                OmKeyInfo omKeyInfo = prevKeyTable.get(prevKeyTableDBKey);
-                // When key is deleted it is no longer in keyTable, we also
-                // have to check deletedTable of previous snapshot
-                RepeatedOmKeyInfo delOmKeyInfo =
-                    prevDeletedTable.get(prevDelTableDBKey);
-                if (versionExistsInPreviousSnapshot(omKeyInfo,
-                    info, delOmKeyInfo)) {
-                  // If the infoList size is 1, there is nothing to split.
-                  // We either delete it or skip it.
-                  if (!(infoList.getOmKeyInfoList().size() == 1)) {
-                    notReclaimableKeyInfo.addOmKeyInfo(info);
-                  }
-                  continue;
-                }
-              }
-
-              // Add all blocks from all versions of the key to the deletion
-              // list
-              for (OmKeyLocationInfoGroup keyLocations :
-                  info.getKeyLocationVersions()) {
-                List<BlockID> item = keyLocations.getLocationList().stream()
-                    .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))
-                    .collect(Collectors.toList());
-                BlockGroup keyBlocks = BlockGroup.newBuilder()
-                    .setKeyName(kv.getKey())
-                    .addAllBlockIDs(item)
-                    .build();
-                blockGroupList.add(keyBlocks);
-              }
-              currentCount++;
-            }
-
-            List<OmKeyInfo> notReclaimableKeyInfoList =
-                notReclaimableKeyInfo.getOmKeyInfoList();
-            // If Bucket deleted bucketInfo would be null, thus making 
previous snapshot also null.
-            SnapshotInfo newPreviousSnapshotInfo = bucketInfo == null ? null :
-                SnapshotUtils.getLatestSnapshotInfo(bucketInfo.getVolumeName(),
-                bucketInfo.getBucketName(), ozoneManager, 
snapshotChainManager);
-            // Check if the previous snapshot in the chain hasn't changed.
-            if 
(Objects.equals(Optional.ofNullable(newPreviousSnapshotInfo).map(SnapshotInfo::getSnapshotId),
-                
Optional.ofNullable(previousSnapshotInfo).map(SnapshotInfo::getSnapshotId))) {
-              // If all the versions are not reclaimable, then do nothing.
-              if (!notReclaimableKeyInfoList.isEmpty() &&
-                  notReclaimableKeyInfoList.size() !=
-                      infoList.getOmKeyInfoList().size()) {
-                keysToModify.put(kv.getKey(), notReclaimableKeyInfo);
-              }
-
-              if (notReclaimableKeyInfoList.size() !=
-                  infoList.getOmKeyInfoList().size()) {
-                keyBlocksList.addAll(blockGroupList);
-              }
-            }
-          }
-        }
-      }
-    }
-    return new PendingKeysDeletion(keyBlocksList, keysToModify);
-  }
-
-  private boolean versionExistsInPreviousSnapshot(OmKeyInfo omKeyInfo,
-      OmKeyInfo info, RepeatedOmKeyInfo delOmKeyInfo) {
-    return (omKeyInfo != null &&
-        info.getObjectID() == omKeyInfo.getObjectID() &&
-        isBlockLocationInfoSame(omKeyInfo, info)) ||
-        delOmKeyInfo != null;
-  }
-
   /**
    * Decide whether the open key is a multipart upload related key.
    * @param openKeyInfo open key related to multipart upload
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java
index 7af213f8f1..e1fbdfb107 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java
@@ -17,26 +17,34 @@
 
 package org.apache.hadoop.ozone.om;
 
-import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 
 /**
- * Return class for OMMetadataManager#getPendingDeletionKeys.
+ * Tracks metadata for keys pending deletion and their associated blocks.
+ *
+ * This class maintains:
+ * <ul>
+ *   <li>A list of {@link BlockGroup} entries, where each entry contains
+ *       a key name and its associated block IDs</li>
+ *   <li>A key-value mapping that requires updating after the remaining
+ *       blocks are purged</li>
+ * </ul>
  */
 public class PendingKeysDeletion {
 
-  private HashMap<String, RepeatedOmKeyInfo> keysToModify;
+  private Map<String, RepeatedOmKeyInfo> keysToModify;
   private List<BlockGroup> keyBlocksList;
 
   public PendingKeysDeletion(List<BlockGroup> keyBlocksList,
-       HashMap<String, RepeatedOmKeyInfo> keysToModify) {
+       Map<String, RepeatedOmKeyInfo> keysToModify) {
     this.keysToModify = keysToModify;
     this.keyBlocksList = keyBlocksList;
   }
 
-  public HashMap<String, RepeatedOmKeyInfo> getKeysToModify() {
+  public Map<String, RepeatedOmKeyInfo> getKeysToModify() {
     return keysToModify;
   }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
index 8b9455b49c..155ea9a37a 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
@@ -101,12 +101,12 @@ public AbstractKeyDeletingService(String serviceName, 
long interval,
     this.callId = new AtomicLong(0);
   }
 
-  protected int processKeyDeletes(List<BlockGroup> keyBlocksList,
+  protected Pair<Integer, Boolean> processKeyDeletes(List<BlockGroup> 
keyBlocksList,
       Map<String, RepeatedOmKeyInfo> keysToModify,
-      String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException 
{
+      String snapTableKey, UUID expectedPreviousSnapshotId) throws 
IOException, InterruptedException {
 
     long startTime = Time.monotonicNow();
-    int delCount = 0;
+    Pair<Integer, Boolean> purgeResult = Pair.of(0, false);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Send {} key(s) to SCM: {}",
           keyBlocksList.size(), keyBlocksList);
@@ -124,15 +124,15 @@ protected int processKeyDeletes(List<BlockGroup> 
keyBlocksList,
         keyBlocksList.size(), Time.monotonicNow() - startTime);
     if (blockDeletionResults != null) {
       long purgeStartTime = Time.monotonicNow();
-      delCount = submitPurgeKeysRequest(blockDeletionResults,
+      purgeResult = submitPurgeKeysRequest(blockDeletionResults,
           keysToModify, snapTableKey, expectedPreviousSnapshotId);
       int limit = 
ozoneManager.getConfiguration().getInt(OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK,
           OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
       LOG.info("Blocks for {} (out of {}) keys are deleted from DB in {} ms. 
Limit per task is {}.",
-          delCount, blockDeletionResults.size(), Time.monotonicNow() - 
purgeStartTime, limit);
+          purgeResult, blockDeletionResults.size(), Time.monotonicNow() - 
purgeStartTime, limit);
     }
     perfMetrics.setKeyDeletingServiceLatencyMs(Time.monotonicNow() - 
startTime);
-    return delCount;
+    return purgeResult;
   }
 
   /**
@@ -141,13 +141,15 @@ protected int processKeyDeletes(List<BlockGroup> 
keyBlocksList,
    * @param results DeleteBlockGroups returned by SCM.
    * @param keysToModify Updated list of RepeatedOmKeyInfo
    */
-  private int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
-      Map<String, RepeatedOmKeyInfo> keysToModify, String snapTableKey, UUID 
expectedPreviousSnapshotId) {
+  private Pair<Integer, Boolean> 
submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
+      Map<String, RepeatedOmKeyInfo> keysToModify, String snapTableKey, UUID 
expectedPreviousSnapshotId)
+      throws InterruptedException {
     List<String> purgeKeys = new ArrayList<>();
 
     // Put all keys to be purged in a list
     int deletedCount = 0;
     Set<String> failedDeletedKeys = new HashSet<>();
+    boolean purgeSuccess = true;
     for (DeleteBlockGroupResult result : results) {
       String deletedKey = result.getObjectKey();
       if (result.isSuccess()) {
@@ -169,6 +171,7 @@ private int 
submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
       } else {
         // If the block deletion failed, then the deleted keys should also not 
be modified.
         failedDeletedKeys.add(deletedKey);
+        purgeSuccess = false;
       }
     }
 
@@ -219,14 +222,17 @@ private int 
submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
         .build();
 
     // Submit PurgeKeys request to OM
-    try {
-      submitRequest(omRequest);
+    try (BootstrapStateHandler.Lock lock = snapTableKey != null ? 
getBootstrapStateLock().lock() : null) {
+      OzoneManagerProtocolProtos.OMResponse omResponse = 
submitRequest(omRequest);
+      if (omResponse != null) {
+        purgeSuccess = purgeSuccess && omResponse.getSuccess();
+      }
     } catch (ServiceException e) {
       LOG.error("PurgeKey request failed. Will retry at next run.", e);
-      return 0;
+      return Pair.of(0, false);
     }
 
-    return deletedCount;
+    return Pair.of(deletedCount, purgeSuccess);
   }
 
   protected OzoneManagerProtocolProtos.OMResponse submitRequest(OMRequest 
omRequest) throws ServiceException {
@@ -637,4 +643,25 @@ public long getMovedFilesCount() {
   public BootstrapStateHandler.Lock getBootstrapStateLock() {
     return lock;
   }
+
+  /**
+   * Submits SetSnapshotPropertyRequest to OM.
+   * @param setSnapshotPropertyRequests request to be sent to OM
+   */
+  protected void submitSetSnapshotRequests(
+      List<OzoneManagerProtocolProtos.SetSnapshotPropertyRequest> 
setSnapshotPropertyRequests) {
+    if (setSnapshotPropertyRequests.isEmpty()) {
+      return;
+    }
+    OzoneManagerProtocolProtos.OMRequest omRequest = 
OzoneManagerProtocolProtos.OMRequest.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.SetSnapshotProperty)
+        .addAllSetSnapshotPropertyRequests(setSnapshotPropertyRequests)
+        .setClientId(clientId.toString())
+        .build();
+    try {
+      submitRequest(omRequest);
+    } catch (ServiceException e) {
+      LOG.error("Failed to submit set snapshot property request", e);
+    }
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
index 7ec8ed71e6..faf320ab85 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
@@ -19,34 +19,29 @@
 
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
-import static 
org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.protobuf.ServiceException;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
 import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
-import org.apache.hadoop.hdds.utils.IOUtils;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.DeletingServiceMetrics;
 import org.apache.hadoop.ozone.om.KeyManager;
@@ -56,17 +51,12 @@
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.PendingKeysDeletion;
 import org.apache.hadoop.ozone.om.SnapshotChainManager;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
-import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
-import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableKeyFilter;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest;
-import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
-import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,46 +71,30 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
   private static final Logger LOG =
       LoggerFactory.getLogger(KeyDeletingService.class);
 
-  // Use only a single thread for KeyDeletion. Multiple threads would read
-  // from the same table and can send deletion requests for same key multiple
-  // times.
-  private static final int KEY_DELETING_CORE_POOL_SIZE = 1;
-
-  private final KeyManager manager;
   private int keyLimitPerTask;
   private final AtomicLong deletedKeyCount;
   private final AtomicBoolean suspended;
-  private final Map<String, Long> exclusiveSizeMap;
-  private final Map<String, Long> exclusiveReplicatedSizeMap;
-  private final Set<String> completedExclusiveSizeSet;
-  private final Map<String, String> snapshotSeekMap;
-  private AtomicBoolean isRunningOnAOS;
+  private final AtomicBoolean isRunningOnAOS;
   private final boolean deepCleanSnapshots;
   private final SnapshotChainManager snapshotChainManager;
   private DeletingServiceMetrics metrics;
 
   public KeyDeletingService(OzoneManager ozoneManager,
-      ScmBlockLocationProtocol scmClient,
-      KeyManager manager, long serviceInterval,
-      long serviceTimeout, ConfigurationSource conf,
+      ScmBlockLocationProtocol scmClient, long serviceInterval,
+      long serviceTimeout, ConfigurationSource conf, int 
keyDeletionCorePoolSize,
       boolean deepCleanSnapshots) {
     super(KeyDeletingService.class.getSimpleName(), serviceInterval,
-        TimeUnit.MILLISECONDS, KEY_DELETING_CORE_POOL_SIZE,
+        TimeUnit.MILLISECONDS, keyDeletionCorePoolSize,
         serviceTimeout, ozoneManager, scmClient);
-    this.manager = manager;
     this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK,
         OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
     Preconditions.checkArgument(keyLimitPerTask >= 0,
         OZONE_KEY_DELETING_LIMIT_PER_TASK + " cannot be negative.");
     this.deletedKeyCount = new AtomicLong(0);
     this.suspended = new AtomicBoolean(false);
-    this.exclusiveSizeMap = new HashMap<>();
-    this.exclusiveReplicatedSizeMap = new HashMap<>();
-    this.completedExclusiveSizeSet = new HashSet<>();
-    this.snapshotSeekMap = new HashMap<>();
     this.isRunningOnAOS = new AtomicBoolean(false);
     this.deepCleanSnapshots = deepCleanSnapshots;
-    this.snapshotChainManager = 
((OmMetadataManagerImpl)manager.getMetadataManager()).getSnapshotChainManager();
+    this.snapshotChainManager = 
((OmMetadataManagerImpl)ozoneManager.getMetadataManager()).getSnapshotChainManager();
     this.metrics = ozoneManager.getDeletionMetrics();
   }
 
@@ -141,7 +115,20 @@ public boolean isRunningOnAOS() {
   @Override
   public BackgroundTaskQueue getTasks() {
     BackgroundTaskQueue queue = new BackgroundTaskQueue();
-    queue.add(new KeyDeletingTask(this));
+    queue.add(new KeyDeletingTask(this, null));
+    if (deepCleanSnapshots) {
+      Iterator<UUID> iterator = null;
+      try {
+        iterator = snapshotChainManager.iterator(true);
+      } catch (IOException e) {
+        LOG.error("Error while initializing snapshot chain iterator.");
+        return queue;
+      }
+      while (iterator.hasNext()) {
+        UUID snapshotId = iterator.next();
+        queue.add(new KeyDeletingTask(this, snapshotId));
+      }
+    }
     return queue;
   }
 
@@ -186,9 +173,114 @@ public void setKeyLimitPerTask(int keyLimitPerTask) {
    */
   private final class KeyDeletingTask implements BackgroundTask {
     private final KeyDeletingService deletingService;
+    private final UUID snapshotId;
 
-    private KeyDeletingTask(KeyDeletingService service) {
+    private KeyDeletingTask(KeyDeletingService service, UUID snapshotId) {
       this.deletingService = service;
+      this.snapshotId = snapshotId;
+    }
+
+    private OzoneManagerProtocolProtos.SetSnapshotPropertyRequest 
getSetSnapshotRequestUpdatingExclusiveSize(
+        Map<UUID, Long> exclusiveSizeMap, Map<UUID, Long> 
exclusiveReplicatedSizeMap, UUID snapshotID) {
+      OzoneManagerProtocolProtos.SnapshotSize snapshotSize = 
OzoneManagerProtocolProtos.SnapshotSize.newBuilder()
+          .setExclusiveSize(
+              exclusiveSizeMap.getOrDefault(snapshotID, 0L))
+          .setExclusiveReplicatedSize(
+              exclusiveReplicatedSizeMap.getOrDefault(
+                  snapshotID, 0L))
+          .build();
+
+      return OzoneManagerProtocolProtos.SetSnapshotPropertyRequest.newBuilder()
+          .setSnapshotKey(snapshotChainManager.getTableKey(snapshotID))
+          .setSnapshotSize(snapshotSize)
+          .build();
+    }
+
+    /**
+     *
+     * @param currentSnapshotInfo if null, deleted keys in AOS should 
be processed.
+     * @param keyManager KeyManager of the underlying store.
+     */
+    private void processDeletedKeysForStore(SnapshotInfo currentSnapshotInfo, 
KeyManager keyManager,
+        int remainNum) throws IOException, InterruptedException {
+      String volume = null, bucket = null, snapshotTableKey = null;
+      if (currentSnapshotInfo != null) {
+        volume = currentSnapshotInfo.getVolumeName();
+        bucket = currentSnapshotInfo.getBucketName();
+        snapshotTableKey = currentSnapshotInfo.getTableKey();
+      }
+
+      boolean successStatus = true;
+      try {
+        // TODO: [SNAPSHOT] HDDS-7968. Reclaim eligible key blocks in
+        //  snapshot's deletedTable when active DB's deletedTable
+        //  doesn't have enough entries left.
+        //  OM would have to keep track of which snapshot the key is coming
+        //  from if the above would be done inside getPendingDeletionKeys().
+        OmSnapshotManager omSnapshotManager = 
getOzoneManager().getOmSnapshotManager();
+        // This is to avoid race condition b/w purge request and snapshot 
chain update. For AOS taking the global
+        // snapshotId since AOS could process multiple buckets in one 
iteration. While using path
+        // previous snapshotId for a snapshot since it would process only one 
bucket.
+        UUID expectedPreviousSnapshotId = currentSnapshotInfo == null ?
+            snapshotChainManager.getLatestGlobalSnapshotId() :
+            SnapshotUtils.getPreviousSnapshotId(currentSnapshotInfo, 
snapshotChainManager);
+
+        IOzoneManagerLock lock = 
getOzoneManager().getMetadataManager().getLock();
+
+        // Purge deleted Keys in the deletedTable && rename entries in the 
snapshotRenamedTable which doesn't have a
+        // reference in the previous snapshot.
+        try (ReclaimableKeyFilter reclaimableKeyFilter = new 
ReclaimableKeyFilter(getOzoneManager(),
+            omSnapshotManager, snapshotChainManager, currentSnapshotInfo, 
keyManager, lock)) {
+          // Get pending keys that can be deleted
+          PendingKeysDeletion pendingKeysDeletion = currentSnapshotInfo == null
+              ? keyManager.getPendingDeletionKeys(reclaimableKeyFilter, 
remainNum)
+              : keyManager.getPendingDeletionKeys(volume, bucket, null, 
reclaimableKeyFilter, remainNum);
+          List<BlockGroup> keyBlocksList = 
pendingKeysDeletion.getKeyBlocksList();
+          //submit purge requests if there are renamed entries to be purged or 
keys to be purged.
+          if (keyBlocksList != null && !keyBlocksList.isEmpty()) {
+            // Validating if the previous snapshot is still the same before 
purging the blocks.
+            SnapshotUtils.validatePreviousSnapshotId(currentSnapshotInfo, 
snapshotChainManager,
+                expectedPreviousSnapshotId);
+            Pair<Integer, Boolean> purgeResult = 
processKeyDeletes(keyBlocksList,
+                 pendingKeysDeletion.getKeysToModify(), snapshotTableKey,
+                 expectedPreviousSnapshotId);
+            remainNum -= purgeResult.getKey();
+            successStatus = purgeResult.getValue();
+            metrics.incrNumKeysProcessed(keyBlocksList.size());
+            metrics.incrNumKeysSentForPurge(purgeResult.getKey());
+            if (successStatus) {
+              deletedKeyCount.addAndGet(purgeResult.getKey());
+            }
+          }
+
+          // Checking remainNum is greater than zero and not equal to the 
initial value if there were some keys to
+          // reclaim. This is to check if all keys have been iterated over and 
all the keys necessary have been
+          // reclaimed.
+          if (remainNum > 0 && successStatus) {
+            List<SetSnapshotPropertyRequest> setSnapshotPropertyRequests = new 
ArrayList<>();
+            Map<UUID, Long> exclusiveReplicatedSizeMap = 
reclaimableKeyFilter.getExclusiveReplicatedSizeMap();
+            Map<UUID, Long> exclusiveSizeMap = 
reclaimableKeyFilter.getExclusiveSizeMap();
+            List<UUID> previousPathSnapshotsInChain =
+                Stream.of(exclusiveSizeMap.keySet(), 
exclusiveReplicatedSizeMap.keySet())
+                
.flatMap(Collection::stream).distinct().collect(Collectors.toList());
+            for (UUID snapshot : previousPathSnapshotsInChain) {
+              
setSnapshotPropertyRequests.add(getSetSnapshotRequestUpdatingExclusiveSize(exclusiveSizeMap,
+                  exclusiveReplicatedSizeMap, snapshot));
+            }
+
+            // Updating directory deep clean flag of snapshot.
+            if (currentSnapshotInfo != null) {
+              
setSnapshotPropertyRequests.add(OzoneManagerProtocolProtos.SetSnapshotPropertyRequest.newBuilder()
+                  .setSnapshotKey(snapshotTableKey)
+                  .setDeepCleanedDeletedKey(true)
+                  .build());
+            }
+            submitSetSnapshotRequests(setSnapshotPropertyRequests);
+          }
+        }
+      } catch (UncheckedIOException e) {
+        throw e.getCause();
+      }
     }
 
     @Override
@@ -202,326 +294,51 @@ public BackgroundTaskResult call() {
       // task.
       if (shouldRun()) {
         final long run = getRunCount().incrementAndGet();
-        LOG.debug("Running KeyDeletingService {}", run);
-        isRunningOnAOS.set(true);
-        int delCount = 0;
-        try {
-          // TODO: [SNAPSHOT] HDDS-7968. Reclaim eligible key blocks in
-          //  snapshot's deletedTable when active DB's deletedTable
-          //  doesn't have enough entries left.
-          //  OM would have to keep track of which snapshot the key is coming
-          //  from if the above would be done inside getPendingDeletionKeys().
-          // This is to avoid race condition b/w purge request and snapshot 
chain update. For AOS taking the global
-          // snapshotId since AOS could process multiple buckets in one 
iteration.
-          UUID expectedPreviousSnapshotId = 
snapshotChainManager.getLatestGlobalSnapshotId();
-          PendingKeysDeletion pendingKeysDeletion = manager
-              .getPendingDeletionKeys(getKeyLimitPerTask());
-          List<BlockGroup> keyBlocksList = pendingKeysDeletion
-              .getKeyBlocksList();
-          if (keyBlocksList != null && !keyBlocksList.isEmpty()) {
-            delCount = processKeyDeletes(keyBlocksList,
-                pendingKeysDeletion.getKeysToModify(), null, 
expectedPreviousSnapshotId);
-            deletedKeyCount.addAndGet(delCount);
-            metrics.incrNumKeysProcessed(keyBlocksList.size());
-            metrics.incrNumKeysSentForPurge(delCount);
-          }
-        } catch (IOException e) {
-          LOG.error("Error while running delete keys background task. Will " +
-              "retry at next run.", e);
+        if (snapshotId == null) {
+          LOG.debug("Running KeyDeletingService for active object store, {}", 
run);
+          isRunningOnAOS.set(true);
+        } else {
+          LOG.debug("Running KeyDeletingService for snapshot : {}, {}", 
snapshotId, run);
         }
-
+        int remainNum = keyLimitPerTask;
+        OmSnapshotManager omSnapshotManager = 
getOzoneManager().getOmSnapshotManager();
+        SnapshotInfo snapInfo = null;
         try {
-          if (deepCleanSnapshots && delCount < keyLimitPerTask) {
-            processSnapshotDeepClean(delCount);
-          }
-        } catch (Exception e) {
-          LOG.error("Error while running deep clean on snapshots. Will " +
-              "retry at next run.", e);
-        }
-
-      }
-      isRunningOnAOS.set(false);
-      synchronized (deletingService) {
-        this.deletingService.notify();
-      }
-
-      // By design, no one cares about the results of this call back.
-      return EmptyTaskResult.newResult();
-    }
-
-    @SuppressWarnings("checkstyle:MethodLength")
-    private void processSnapshotDeepClean(int delCount)
-        throws IOException {
-      OmSnapshotManager omSnapshotManager =
-          getOzoneManager().getOmSnapshotManager();
-      OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
-          getOzoneManager().getMetadataManager();
-      SnapshotChainManager snapChainManager = metadataManager
-          .getSnapshotChainManager();
-      Table<String, SnapshotInfo> snapshotInfoTable =
-          getOzoneManager().getMetadataManager().getSnapshotInfoTable();
-      List<String> deepCleanedSnapshots = new ArrayList<>();
-      try (TableIterator<String, ? extends Table.KeyValue
-          <String, SnapshotInfo>> iterator = snapshotInfoTable.iterator()) {
-
-        while (delCount < keyLimitPerTask && iterator.hasNext()) {
-          List<BlockGroup> keysToPurge = new ArrayList<>();
-          HashMap<String, RepeatedOmKeyInfo> keysToModify = new HashMap<>();
-          SnapshotInfo currSnapInfo = 
snapshotInfoTable.get(iterator.next().getKey());
-          // Deep clean only on active snapshot. Deleted Snapshots will be
-          // cleaned up by SnapshotDeletingService.
-          if (currSnapInfo == null || currSnapInfo.getSnapshotStatus() != 
SNAPSHOT_ACTIVE ||
-              currSnapInfo.getDeepClean()) {
-            continue;
-          }
-
-          SnapshotInfo prevSnapInfo = 
SnapshotUtils.getPreviousSnapshot(getOzoneManager(), snapChainManager,
-              currSnapInfo);
-          if (prevSnapInfo != null &&
-              (prevSnapInfo.getSnapshotStatus() != 
SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE ||
-                  
!OmSnapshotManager.areSnapshotChangesFlushedToDB(getOzoneManager().getMetadataManager(),
-                      prevSnapInfo))) {
-            continue;
-          }
-
-          try (UncheckedAutoCloseableSupplier<OmSnapshot>
-              rcCurrOmSnapshot = omSnapshotManager.getSnapshot(
-                  currSnapInfo.getVolumeName(),
-                  currSnapInfo.getBucketName(),
-                  currSnapInfo.getName())) {
-            OmSnapshot currOmSnapshot = rcCurrOmSnapshot.get();
-
-            Table<String, RepeatedOmKeyInfo> snapDeletedTable =
-                currOmSnapshot.getMetadataManager().getDeletedTable();
-            Table<String, String> snapRenamedTable =
-                currOmSnapshot.getMetadataManager().getSnapshotRenamedTable();
-
-            long volumeId = metadataManager.getVolumeId(
-                currSnapInfo.getVolumeName());
-            // Get bucketInfo for the snapshot bucket to get bucket layout.
-            String dbBucketKey = metadataManager.getBucketKey(
-                currSnapInfo.getVolumeName(), currSnapInfo.getBucketName());
-            OmBucketInfo bucketInfo = metadataManager.getBucketTable()
-                .get(dbBucketKey);
-
-            if (bucketInfo == null) {
-              throw new IllegalStateException("Bucket " + "/" + currSnapInfo
-                  .getVolumeName() + "/" + currSnapInfo.getBucketName() +
-                  " is not found. BucketInfo should not be null for" +
-                  " snapshotted bucket. The OM is in unexpected state.");
+          snapInfo = snapshotId == null ? null :
+              SnapshotUtils.getSnapshotInfo(getOzoneManager(), 
snapshotChainManager, snapshotId);
+          if (snapInfo != null) {
+            if 
(!OmSnapshotManager.areSnapshotChangesFlushedToDB(getOzoneManager().getMetadataManager(),
 snapInfo)) {
+              LOG.info("Skipping snapshot processing since changes to snapshot 
{} have not been flushed to disk",
+                  snapInfo);
+              return EmptyTaskResult.newResult();
             }
-
-            String snapshotBucketKey = dbBucketKey + OzoneConsts.OM_KEY_PREFIX;
-            SnapshotInfo previousSnapshot = 
SnapshotUtils.getPreviousSnapshot(getOzoneManager(), snapChainManager,
-                currSnapInfo);
-            SnapshotInfo previousToPrevSnapshot = null;
-
-            if (previousSnapshot != null) {
-              previousToPrevSnapshot = 
SnapshotUtils.getPreviousSnapshot(getOzoneManager(), snapChainManager,
-                  previousSnapshot);
+            if (!snapInfo.getDeepCleanedDeletedDir()) {
+              LOG.debug("Snapshot {} hasn't done deleted directory deep 
cleaning yet. Skipping the snapshot in this" +
+                  " iteration.", snapInfo);
+              return EmptyTaskResult.newResult();
             }
-
-            Table<String, OmKeyInfo> previousKeyTable = null;
-            Table<String, String> prevRenamedTable = null;
-            UncheckedAutoCloseableSupplier<OmSnapshot> rcPrevOmSnapshot = null;
-
-            // Split RepeatedOmKeyInfo and update current snapshot
-            // deletedKeyTable and next snapshot deletedKeyTable.
-            if (previousSnapshot != null) {
-              rcPrevOmSnapshot = omSnapshotManager.getSnapshot(
-                  previousSnapshot.getVolumeName(),
-                  previousSnapshot.getBucketName(),
-                  previousSnapshot.getName());
-              OmSnapshot omPreviousSnapshot = rcPrevOmSnapshot.get();
-
-              previousKeyTable = omPreviousSnapshot.getMetadataManager()
-                  .getKeyTable(bucketInfo.getBucketLayout());
-              prevRenamedTable = omPreviousSnapshot
-                  .getMetadataManager().getSnapshotRenamedTable();
-            }
-
-            Table<String, OmKeyInfo> previousToPrevKeyTable = null;
-            UncheckedAutoCloseableSupplier<OmSnapshot> rcPrevToPrevOmSnapshot 
= null;
-            if (previousToPrevSnapshot != null) {
-              rcPrevToPrevOmSnapshot = omSnapshotManager.getSnapshot(
-                  previousToPrevSnapshot.getVolumeName(),
-                  previousToPrevSnapshot.getBucketName(),
-                  previousToPrevSnapshot.getName());
-              OmSnapshot omPreviousToPrevSnapshot = 
rcPrevToPrevOmSnapshot.get();
-
-              previousToPrevKeyTable = omPreviousToPrevSnapshot
-                  .getMetadataManager()
-                  .getKeyTable(bucketInfo.getBucketLayout());
-            }
-
-            try (TableIterator<String, ? extends Table.KeyValue<String,
-                RepeatedOmKeyInfo>> deletedIterator = snapDeletedTable
-                .iterator()) {
-
-              String lastKeyInCurrentRun = null;
-              String deletedTableSeek = snapshotSeekMap.getOrDefault(
-                  currSnapInfo.getTableKey(), snapshotBucketKey);
-              deletedIterator.seek(deletedTableSeek);
-              // To avoid processing the last key from the previous
-              // run again.
-              if (!deletedTableSeek.equals(snapshotBucketKey) &&
-                  deletedIterator.hasNext()) {
-                deletedIterator.next();
-              }
-
-              while (deletedIterator.hasNext() && delCount < keyLimitPerTask) {
-                Table.KeyValue<String, RepeatedOmKeyInfo>
-                    deletedKeyValue = deletedIterator.next();
-                String deletedKey = deletedKeyValue.getKey();
-                lastKeyInCurrentRun = deletedKey;
-
-                // Exit if it is out of the bucket scope.
-                if (!deletedKey.startsWith(snapshotBucketKey)) {
-                  break;
-                }
-
-                RepeatedOmKeyInfo repeatedOmKeyInfo =
-                    deletedKeyValue.getValue();
-
-                List<BlockGroup> blockGroupList = new ArrayList<>();
-                RepeatedOmKeyInfo newRepeatedOmKeyInfo =
-                    new RepeatedOmKeyInfo();
-                for (OmKeyInfo keyInfo : repeatedOmKeyInfo.getOmKeyInfoList()) 
{
-                  if (previousSnapshot != null) {
-                    // Calculates the exclusive size for the previous
-                    // snapshot. See Java Doc for more info.
-                    calculateExclusiveSize(previousSnapshot,
-                        previousToPrevSnapshot, keyInfo, bucketInfo, volumeId,
-                        snapRenamedTable, previousKeyTable, prevRenamedTable,
-                        previousToPrevKeyTable, exclusiveSizeMap,
-                        exclusiveReplicatedSizeMap);
-                  }
-
-                  if (isKeyReclaimable(previousKeyTable, snapRenamedTable,
-                      keyInfo, bucketInfo, volumeId, null)) {
-                    List<BlockGroup> blocksForKeyDelete = currOmSnapshot
-                        .getMetadataManager()
-                        .getBlocksForKeyDelete(deletedKey);
-                    if (blocksForKeyDelete != null) {
-                      blockGroupList.addAll(blocksForKeyDelete);
-                    }
-                    delCount++;
-                  } else {
-                    newRepeatedOmKeyInfo.addOmKeyInfo(keyInfo);
-                  }
-                }
-
-                if (!newRepeatedOmKeyInfo.getOmKeyInfoList().isEmpty() &&
-                    newRepeatedOmKeyInfo.getOmKeyInfoList().size() !=
-                        repeatedOmKeyInfo.getOmKeyInfoList().size()) {
-                  keysToModify.put(deletedKey, newRepeatedOmKeyInfo);
-                }
-
-                if (newRepeatedOmKeyInfo.getOmKeyInfoList().size() !=
-                    repeatedOmKeyInfo.getOmKeyInfoList().size()) {
-                  keysToPurge.addAll(blockGroupList);
-                }
-              }
-
-              if (delCount < keyLimitPerTask) {
-                // Deep clean is completed, we can update the SnapInfo.
-                deepCleanedSnapshots.add(currSnapInfo.getTableKey());
-                // exclusiveSizeList contains check is used to prevent
-                // case where there is no entry in deletedTable, this
-                // will throw NPE when we submit request.
-                if (previousSnapshot != null && exclusiveSizeMap
-                    .containsKey(previousSnapshot.getTableKey())) {
-                  completedExclusiveSizeSet.add(
-                      previousSnapshot.getTableKey());
-                }
-
-                snapshotSeekMap.remove(currSnapInfo.getTableKey());
-              } else {
-                // There are keys that still needs processing
-                // we can continue from it in the next iteration
-                if (lastKeyInCurrentRun != null) {
-                  snapshotSeekMap.put(currSnapInfo.getTableKey(),
-                      lastKeyInCurrentRun);
-                }
-              }
-
-              if (!keysToPurge.isEmpty()) {
-                processKeyDeletes(keysToPurge,
-                    keysToModify, currSnapInfo.getTableKey(),
-                    
Optional.ofNullable(previousSnapshot).map(SnapshotInfo::getSnapshotId).orElse(null));
-              }
-            } finally {
-              IOUtils.closeQuietly(rcPrevOmSnapshot, rcPrevToPrevOmSnapshot);
+          }
+          try (UncheckedAutoCloseableSupplier<OmSnapshot> omSnapshot = 
snapInfo == null ? null :
+              omSnapshotManager.getActiveSnapshot(snapInfo.getVolumeName(), 
snapInfo.getBucketName(),
+                  snapInfo.getName())) {
+            KeyManager keyManager = snapInfo == null ? 
getOzoneManager().getKeyManager()
+                : omSnapshot.get().getKeyManager();
+            processDeletedKeysForStore(snapInfo, keyManager, remainNum);
+          }
+        } catch (IOException | InterruptedException e) {
+          LOG.error("Error while running delete files background task for 
store {}. Will retry at next run.",
+              snapInfo, e);
+        } finally {
+          if (snapshotId == null) {
+            isRunningOnAOS.set(false);
+            synchronized (deletingService) {
+              this.deletingService.notify();
             }
           }
-
         }
       }
-
-      updateDeepCleanedSnapshots(deepCleanedSnapshots);
-      updateSnapshotExclusiveSize();
-    }
-
-    private void updateSnapshotExclusiveSize() {
-
-      if (completedExclusiveSizeSet.isEmpty()) {
-        return;
-      }
-
-      Iterator<String> completedSnapshotIterator =
-          completedExclusiveSizeSet.iterator();
-      while (completedSnapshotIterator.hasNext()) {
-        ClientId clientId = ClientId.randomId();
-        String dbKey = completedSnapshotIterator.next();
-        SnapshotSize snapshotSize = SnapshotSize.newBuilder()
-                .setExclusiveSize(exclusiveSizeMap.getOrDefault(dbKey, 0L))
-                .setExclusiveReplicatedSize(
-                    exclusiveReplicatedSizeMap.getOrDefault(dbKey, 0L))
-                .build();
-        SetSnapshotPropertyRequest setSnapshotPropertyRequest =
-            SetSnapshotPropertyRequest.newBuilder()
-                .setSnapshotKey(dbKey)
-                .setSnapshotSize(snapshotSize)
-                .build();
-
-        OMRequest omRequest = OMRequest.newBuilder()
-            .setCmdType(Type.SetSnapshotProperty)
-            .setSetSnapshotPropertyRequest(setSnapshotPropertyRequest)
-            .setClientId(clientId.toString())
-            .build();
-        submitRequest(omRequest, clientId);
-        exclusiveSizeMap.remove(dbKey);
-        exclusiveReplicatedSizeMap.remove(dbKey);
-        completedSnapshotIterator.remove();
-      }
-    }
-
-    private void updateDeepCleanedSnapshots(List<String> deepCleanedSnapshots) 
{
-      for (String deepCleanedSnapshot: deepCleanedSnapshots) {
-        ClientId clientId = ClientId.randomId();
-        SetSnapshotPropertyRequest setSnapshotPropertyRequest =
-            SetSnapshotPropertyRequest.newBuilder()
-                .setSnapshotKey(deepCleanedSnapshot)
-                .setDeepCleanedDeletedKey(true)
-                .build();
-
-        OMRequest omRequest = OMRequest.newBuilder()
-            .setCmdType(Type.SetSnapshotProperty)
-            .setSetSnapshotPropertyRequest(setSnapshotPropertyRequest)
-            .setClientId(clientId.toString())
-            .build();
-
-        submitRequest(omRequest, clientId);
-      }
-    }
-
-    public void submitRequest(OMRequest omRequest, ClientId clientId) {
-      try {
-        OzoneManagerRatisUtils.submitRequest(getOzoneManager(), omRequest, 
clientId, getRunCount().get());
-      } catch (ServiceException e) {
-        LOG.error("Snapshot deep cleaning request failed. " +
-            "Will retry at next run.", e);
-      }
+      // By design, no one cares about the results of this call back.
+      return EmptyTaskResult.newResult();
     }
   }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
index 27416bc95f..9339e844f2 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
@@ -93,6 +93,11 @@ public static SnapshotInfo getSnapshotInfo(OzoneManager 
ozoneManager,
                                              SnapshotChainManager chainManager,
                                              UUID snapshotId) throws 
IOException {
     String tableKey = chainManager.getTableKey(snapshotId);
+    if (tableKey == null) {
+      LOG.error("Snapshot not found with UUID '{}'", snapshotId);
+      throw new OMException("Snapshot not found with UUID '" + snapshotId + 
"'",
+          FILE_NOT_FOUND);
+    }
     return SnapshotUtils.getSnapshotInfo(ozoneManager, tableKey);
   }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index a306ae1cf1..3b55255ee7 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -22,6 +22,7 @@
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -67,6 +68,7 @@
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.KeyManagerImpl;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshot;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OmTestManagers;
@@ -87,6 +89,7 @@
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableKeyFilter;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.OzoneTestBase;
@@ -128,6 +131,7 @@ class TestKeyDeletingService extends OzoneTestBase {
   private KeyManager keyManager;
   private OMMetadataManager metadataManager;
   private KeyDeletingService keyDeletingService;
+  private SnapshotDirectoryCleaningService snapshotDirectoryCleaningService;
   private ScmBlockLocationTestingClient scmBlockTestingClient;
 
   @BeforeAll
@@ -143,6 +147,8 @@ private void createConfig(File testDir) {
         100, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL,
         100, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL,
+        100, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
         1, TimeUnit.SECONDS);
     conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL,
@@ -155,6 +161,7 @@ private void createSubject() throws Exception {
     OmTestManagers omTestManagers = new OmTestManagers(conf, 
scmBlockTestingClient, null);
     keyManager = omTestManagers.getKeyManager();
     keyDeletingService = keyManager.getDeletingService();
+    snapshotDirectoryCleaningService = 
keyManager.getSnapshotDirectoryService();
     writeClient = omTestManagers.getWriteClient();
     om = omTestManagers.getOzoneManager();
     metadataManager = omTestManagers.getMetadataManager();
@@ -207,7 +214,9 @@ void checkIfDeleteServiceIsDeletingKeys()
           () -> getDeletedKeyCount() >= initialDeletedCount + keyCount,
           100, 10000);
       assertThat(getRunCount()).isGreaterThan(initialRunCount);
-      
assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList())
+      assertThat(keyManager.getPendingDeletionKeys(new 
ReclaimableKeyFilter(om, om.getOmSnapshotManager(),
+              
((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(), 
null,
+              keyManager, om.getMetadataManager().getLock()), 
Integer.MAX_VALUE).getKeyBlocksList())
           .isEmpty();
     }
 
@@ -236,7 +245,7 @@ void checkDeletionForKeysWithMultipleVersions() throws 
Exception {
           1000, 10000);
       assertThat(getRunCount())
           .isGreaterThan(initialRunCount);
-      
assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList())
+      assertThat(keyManager.getPendingDeletionKeys((kv) -> true, 
Integer.MAX_VALUE).getKeyBlocksList())
           .isEmpty();
 
       // The 1st version of the key has 1 block and the 2nd version has 2
@@ -278,7 +287,10 @@ void checkDeletedTableCleanUpForSnapshot() throws 
Exception {
           1000, 10000);
       assertThat(getRunCount())
           .isGreaterThan(initialRunCount);
-      
assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList())
+      assertThat(keyManager.getPendingDeletionKeys(new 
ReclaimableKeyFilter(om, om.getOmSnapshotManager(),
+              
((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(), 
null,
+              keyManager, om.getMetadataManager().getLock()),
+          Integer.MAX_VALUE).getKeyBlocksList())
           .isEmpty();
 
       // deletedTable should have deleted key of the snapshot bucket
@@ -334,8 +346,9 @@ public void 
testAOSKeyDeletingWithSnapshotCreateParallelExecution()
       when(ozoneManager.getOmSnapshotManager()).thenAnswer(i -> {
         return omSnapshotManager;
       });
-      KeyDeletingService service = new KeyDeletingService(ozoneManager, 
scmBlockTestingClient, km, 10000,
-          100000, conf, false);
+      when(ozoneManager.getKeyManager()).thenReturn(km);
+      KeyDeletingService service = new KeyDeletingService(ozoneManager, 
scmBlockTestingClient, 10000,
+          100000, conf, 10, false);
       service.shutdown();
       final long initialSnapshotCount = 
metadataManager.countRowsInTable(snapshotInfoTable);
       final long initialDeletedCount = 
metadataManager.countRowsInTable(deletedTable);
@@ -376,7 +389,7 @@ public void 
testAOSKeyDeletingWithSnapshotCreateParallelExecution()
           }
         }, 1000, 10000);
         return i.callRealMethod();
-      }).when(omSnapshotManager).getSnapshot(ArgumentMatchers.eq(volumeName), 
ArgumentMatchers.eq(bucketName),
+      
}).when(omSnapshotManager).getActiveSnapshot(ArgumentMatchers.eq(volumeName), 
ArgumentMatchers.eq(bucketName),
           ArgumentMatchers.eq(snap1));
       assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 1, 
metadataManager);
       doAnswer(i -> {
@@ -385,7 +398,7 @@ public void 
testAOSKeyDeletingWithSnapshotCreateParallelExecution()
           Assertions.assertNotEquals(deletePathKey[0], group.getGroupID());
         }
         return pendingKeysDeletion;
-      }).when(km).getPendingDeletionKeys(anyInt());
+      }).when(km).getPendingDeletionKeys(any(), anyInt());
       service.runPeriodicalTaskNow();
       service.runPeriodicalTaskNow();
       assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, 
metadataManager);
@@ -582,9 +595,15 @@ void testSnapshotExclusiveSize() throws Exception {
       // Create Snapshot4
       String snap4 = uniqueObjectName("snap");
       writeClient.createSnapshot(testVolumeName, testBucketName, snap4);
+      assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 4, 
metadataManager);
       createAndCommitKey(testVolumeName, testBucketName, 
uniqueObjectName("key"), 3);
 
       long prevKdsRunCount = getRunCount();
+      long prevSnapshotDirectorServiceCnt = 
snapshotDirectoryCleaningService.getRunCount().get();
+      // Let SnapshotDirectoryCleaningService to run for some iterations
+      GenericTestUtils.waitFor(
+          () -> (snapshotDirectoryCleaningService.getRunCount().get() > 
prevSnapshotDirectorServiceCnt + 20),
+          100, 100000);
       keyDeletingService.resume();
 
       Map<String, Long> expectedSize = new ImmutableMap.Builder<String, Long>()
@@ -597,22 +616,23 @@ void testSnapshotExclusiveSize() throws Exception {
 
       // Let KeyDeletingService to run for some iterations
       GenericTestUtils.waitFor(
-          () -> (getRunCount() > prevKdsRunCount + 5),
-          100, 10000);
-
+          () -> (getRunCount() > prevKdsRunCount + 20),
+          100, 100000);
       // Check if the exclusive size is set.
+      om.awaitDoubleBufferFlush();
       try (TableIterator<String, ? extends Table.KeyValue<String, 
SnapshotInfo>>
                iterator = snapshotInfoTable.iterator()) {
         while (iterator.hasNext()) {
           Table.KeyValue<String, SnapshotInfo> snapshotEntry = iterator.next();
+          SnapshotInfo snapshotInfo = 
om.getMetadataManager().getSnapshotInfoTable().get(snapshotEntry.getKey());
           String snapshotName = snapshotEntry.getValue().getName();
 
-          Long expected = expectedSize.getOrDefault(snapshotName, 0L);
+          Long expected = expectedSize.getOrDefault(snapshotName, 
snapshotInfo.getExclusiveSize());
           assertNotNull(expected);
           System.out.println(snapshotName);
-          assertEquals(expected, snapshotEntry.getValue().getExclusiveSize());
+          assertEquals(expected, snapshotInfo.getExclusiveSize());
           // Since for the test we are using RATIS/THREE
-          assertEquals(expected * 3, 
snapshotEntry.getValue().getExclusiveReplicatedSize());
+          assertEquals(expected * 3, 
snapshotInfo.getExclusiveReplicatedSize());
         }
       }
     }
@@ -647,7 +667,7 @@ void cleanup() {
 
     @Test
     @DisplayName("Should not update keys when purge request times out during 
key deletion")
-    public void testFailingModifiedKeyPurge() throws IOException {
+    public void testFailingModifiedKeyPurge() throws IOException, 
InterruptedException {
 
       try (MockedStatic<OzoneManagerRatisUtils> mocked =  
mockStatic(OzoneManagerRatisUtils.class,
           CALLS_REAL_METHODS)) {
@@ -781,8 +801,7 @@ private static void checkSnapDeepCleanStatus(Table<String, 
SnapshotInfo> table,
   private static void assertTableRowCount(Table<String, ?> table,
         long count, OMMetadataManager metadataManager)
       throws TimeoutException, InterruptedException {
-    GenericTestUtils.waitFor(() -> assertTableRowCount(count, table,
-            metadataManager), 1000, 120000); // 2 minutes
+    GenericTestUtils.waitFor(() -> assertTableRowCount(count, table, 
metadataManager), 1000, 120000); // 2 minutes
   }
 
   private static boolean assertTableRowCount(long expectedCount,
@@ -918,7 +937,7 @@ private long getRunCount() {
 
   private int countKeysPendingDeletion() {
     try {
-      final int count = keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
+      final int count = keyManager.getPendingDeletionKeys((kv) -> true, 
Integer.MAX_VALUE)
           .getKeyBlocksList().size();
       LOG.debug("KeyManager keys pending deletion: {}", count);
       return count;
@@ -929,7 +948,7 @@ private int countKeysPendingDeletion() {
 
   private long countBlocksPendingDeletion() {
     try {
-      return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
+      return keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE)
           .getKeyBlocksList()
           .stream()
           .map(BlockGroup::getBlockIDList)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to