This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new ad5a507dfa HDDS-13035. SnapshotDeletingService should hold write locks while purging deleted snapshots (#8554)
ad5a507dfa is described below

commit ad5a507dfa86caae16b872867d99778a639ffd4c
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Fri Jun 13 13:46:53 2025 -0400

    HDDS-13035. SnapshotDeletingService should hold write locks while purging deleted snapshots (#8554)
---
 .../TestDirectoryDeletingServiceWithFSO.java       |   1 -
 ...TestSnapshotDeletingServiceIntegrationTest.java | 494 ++++++++-------------
 .../ozone/om/service/DirectoryDeletingService.java |  23 +-
 .../ozone/om/service/KeyDeletingService.java       |  23 +-
 .../ozone/om/service/SnapshotDeletingService.java  |  49 +-
 .../ozone/om/service/TestKeyDeletingService.java   |   4 +-
 6 files changed, 209 insertions(+), 385 deletions(-)

diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
index 1933925384..3364939f8b 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
@@ -574,7 +574,6 @@ public void 
testAOSKeyDeletingWithSnapshotCreateParallelExecution()
     DirectoryDeletingService dirDeletingService = 
cluster.getOzoneManager().getKeyManager().getDirDeletingService();
     // Suspend KeyDeletingService
     dirDeletingService.suspend();
-    GenericTestUtils.waitFor(() -> !dirDeletingService.isRunningOnAOS(), 1000, 
10000);
     Random random = new Random();
     final String testVolumeName = "volume" + random.nextInt();
     final String testBucketName = "bucket" + random.nextInt();
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingServiceIntegrationTest.java
similarity index 60%
rename from 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
rename to 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingServiceIntegrationTest.java
index 48d1c2f978..080844d1f5 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingServiceIntegrationTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.ozone.om.snapshot;
+package org.apache.hadoop.ozone.om.service;
 
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
@@ -23,35 +23,34 @@
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED;
+import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mockConstruction;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
-import java.util.Random;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
-import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.utils.IOUtils;
@@ -63,12 +62,11 @@
 import org.apache.hadoop.ozone.client.BucketArgs;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshot;
-import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.SnapshotChainManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -76,22 +74,25 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
-import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
-import org.apache.hadoop.ozone.om.service.KeyDeletingService;
-import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
+import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks;
+import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
+import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableKeyFilter;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.tag.Flaky;
 import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
 import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.TestMethodOrder;
-import org.mockito.Mockito;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.mockito.MockedConstruction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -161,6 +162,10 @@ public void closeAllSnapshots() {
     while (!rcSnaps.isEmpty()) {
       rcSnaps.pop().close();
     }
+    // Resume services
+    om.getKeyManager().getDirDeletingService().resume();
+    om.getKeyManager().getDeletingService().resume();
+    om.getKeyManager().getSnapshotDeletingService().resume();
   }
 
   private UncheckedAutoCloseableSupplier<OmSnapshot> getOmSnapshot(String 
volume, String bucket, String snapshotName)
@@ -532,233 +537,6 @@ public void testSnapshotWithFSO() throws Exception {
     snap1 = null;
   }
 
-  private DirectoryDeletingService 
getMockedDirectoryDeletingService(AtomicBoolean dirDeletionWaitStarted,
-                                                                     
AtomicBoolean dirDeletionStarted)
-      throws InterruptedException, TimeoutException, IOException {
-    OzoneManager ozoneManager = Mockito.spy(om);
-    om.getKeyManager().getDirDeletingService().shutdown();
-    KeyManager keyManager = Mockito.spy(om.getKeyManager());
-    when(ozoneManager.getKeyManager()).thenReturn(keyManager);
-    GenericTestUtils.waitFor(() -> 
om.getKeyManager().getDirDeletingService().getThreadCount() == 0, 1000,
-        100000);
-    DirectoryDeletingService directoryDeletingService = Mockito.spy(new 
DirectoryDeletingService(10000,
-        TimeUnit.MILLISECONDS, 100000, ozoneManager, cluster.getConf(), 1, 
false));
-    directoryDeletingService.shutdown();
-    GenericTestUtils.waitFor(() -> directoryDeletingService.getThreadCount() 
== 0, 1000,
-        100000);
-    doAnswer(i -> {
-      // Wait for SDS to reach DDS wait block before processing any deleted 
directories.
-      GenericTestUtils.waitFor(dirDeletionWaitStarted::get, 1000, 100000);
-      dirDeletionStarted.set(true);
-      return i.callRealMethod();
-    }).when(keyManager).getDeletedDirEntries();
-    return directoryDeletingService;
-  }
-
-  private KeyDeletingService getMockedKeyDeletingService(AtomicBoolean 
keyDeletionWaitStarted,
-                                                         AtomicBoolean 
keyDeletionStarted)
-      throws InterruptedException, TimeoutException, IOException {
-    OzoneManager ozoneManager = Mockito.spy(om);
-    om.getKeyManager().getDeletingService().shutdown();
-    GenericTestUtils.waitFor(() -> 
om.getKeyManager().getDeletingService().getThreadCount() == 0, 1000,
-        100000);
-    KeyManager keyManager = Mockito.spy(om.getKeyManager());
-    when(ozoneManager.getKeyManager()).thenReturn(keyManager);
-    KeyDeletingService keyDeletingService = Mockito.spy(new 
KeyDeletingService(ozoneManager,
-        ozoneManager.getScmClient().getBlockClient(), 10000,
-        100000, cluster.getConf(), 10, false));
-    keyDeletingService.shutdown();
-    GenericTestUtils.waitFor(() -> keyDeletingService.getThreadCount() == 0, 
1000,
-        100000);
-    when(keyManager.getPendingDeletionKeys(any(), anyInt())).thenAnswer(i -> {
-      // wait for SDS to reach the KDS wait block before processing any key.
-      GenericTestUtils.waitFor(keyDeletionWaitStarted::get, 1000, 100000);
-      keyDeletionStarted.set(true);
-      return i.callRealMethod();
-    });
-    return keyDeletingService;
-  }
-
-  @SuppressWarnings("checkstyle:parameternumber")
-  private SnapshotDeletingService 
getMockedSnapshotDeletingService(KeyDeletingService keyDeletingService,
-                                                                   
DirectoryDeletingService directoryDeletingService,
-                                                                   
AtomicBoolean snapshotDeletionStarted,
-                                                                   
AtomicBoolean keyDeletionWaitStarted,
-                                                                   
AtomicBoolean dirDeletionWaitStarted,
-                                                                   
AtomicBoolean keyDeletionStarted,
-                                                                   
AtomicBoolean dirDeletionStarted,
-                                                                   OzoneBucket 
testBucket)
-      throws InterruptedException, TimeoutException, IOException {
-    OzoneManager ozoneManager = Mockito.spy(om);
-    om.getKeyManager().getSnapshotDeletingService().shutdown();
-    GenericTestUtils.waitFor(() -> 
om.getKeyManager().getSnapshotDeletingService().getThreadCount() == 0, 1000,
-        100000);
-    KeyManager keyManager = Mockito.spy(om.getKeyManager());
-    OmMetadataManagerImpl omMetadataManager = 
Mockito.spy((OmMetadataManagerImpl)om.getMetadataManager());
-    SnapshotChainManager unMockedSnapshotChainManager =
-        
((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager();
-    SnapshotChainManager snapshotChainManager = 
Mockito.spy(unMockedSnapshotChainManager);
-    OmSnapshotManager omSnapshotManager = 
Mockito.spy(om.getOmSnapshotManager());
-    when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
-    when(ozoneManager.getKeyManager()).thenReturn(keyManager);
-    when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
-    
when(omMetadataManager.getSnapshotChainManager()).thenReturn(snapshotChainManager);
-    when(keyManager.getDeletingService()).thenReturn(keyDeletingService);
-    
when(keyManager.getDirDeletingService()).thenReturn(directoryDeletingService);
-    SnapshotDeletingService snapshotDeletingService = Mockito.spy(new 
SnapshotDeletingService(10000,
-        100000, ozoneManager));
-    snapshotDeletingService.shutdown();
-    GenericTestUtils.waitFor(() -> snapshotDeletingService.getThreadCount() == 
0, 1000,
-        100000);
-    when(snapshotChainManager.iterator(anyBoolean())).thenAnswer(i -> {
-      Iterator<UUID> itr = (Iterator<UUID>) i.callRealMethod();
-      return Lists.newArrayList(itr).stream().filter(uuid -> {
-        try {
-          SnapshotInfo snapshotInfo = SnapshotUtils.getSnapshotInfo(om, 
snapshotChainManager, uuid);
-          return snapshotInfo.getBucketName().equals(testBucket.getName()) &&
-              snapshotInfo.getVolumeName().equals(testBucket.getVolumeName());
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }).iterator();
-    });
-    when(snapshotChainManager.getLatestGlobalSnapshotId())
-        .thenAnswer(i -> 
unMockedSnapshotChainManager.getLatestGlobalSnapshotId());
-    when(snapshotChainManager.getOldestGlobalSnapshotId())
-        .thenAnswer(i -> 
unMockedSnapshotChainManager.getOldestGlobalSnapshotId());
-    doAnswer(i -> {
-      // KDS wait block reached in SDS.
-      GenericTestUtils.waitFor(() -> {
-        return keyDeletingService.isRunningOnAOS();
-      }, 1000, 100000);
-      keyDeletionWaitStarted.set(true);
-      return i.callRealMethod();
-    }).when(snapshotDeletingService).waitForKeyDeletingService();
-    doAnswer(i -> {
-      // DDS wait block reached in SDS.
-      GenericTestUtils.waitFor(directoryDeletingService::isRunningOnAOS, 1000, 
100000);
-      dirDeletionWaitStarted.set(true);
-      return i.callRealMethod();
-    }).when(snapshotDeletingService).waitForDirDeletingService();
-    doAnswer(i -> {
-      // Assert KDS & DDS is not running when SDS starts moving entries & 
assert all wait block, KDS processing
-      // AOS block & DDS AOS block have been executed.
-      Assertions.assertTrue(keyDeletionWaitStarted.get());
-      Assertions.assertTrue(dirDeletionWaitStarted.get());
-      Assertions.assertTrue(keyDeletionStarted.get());
-      Assertions.assertTrue(dirDeletionStarted.get());
-      Assertions.assertFalse(keyDeletingService.isRunningOnAOS());
-      Assertions.assertFalse(directoryDeletingService.isRunningOnAOS());
-      snapshotDeletionStarted.set(true);
-      return i.callRealMethod();
-    }).when(omSnapshotManager).getSnapshot(anyString(), anyString(), 
anyString());
-    return snapshotDeletingService;
-  }
-
-  @Test
-  @Order(4)
-  @Flaky("HDDS-11847")
-  public void testParallelExcecutionOfKeyDeletionAndSnapshotDeletion() throws 
Exception {
-    AtomicBoolean keyDeletionWaitStarted = new AtomicBoolean(false);
-    AtomicBoolean dirDeletionWaitStarted = new AtomicBoolean(false);
-    AtomicBoolean keyDeletionStarted = new AtomicBoolean(false);
-    AtomicBoolean dirDeletionStarted = new AtomicBoolean(false);
-    AtomicBoolean snapshotDeletionStarted = new AtomicBoolean(false);
-    Random random = new Random();
-    String bucketName = "bucket" + random.nextInt();
-    BucketArgs bucketArgs = new BucketArgs.Builder()
-        .setBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED)
-        .build();
-    OzoneBucket testBucket = TestDataUtil.createBucket(
-        client, VOLUME_NAME, bucketArgs, bucketName);
-    // mock keyDeletingService
-    KeyDeletingService keyDeletingService = 
getMockedKeyDeletingService(keyDeletionWaitStarted, keyDeletionStarted);
-
-    // mock dirDeletingService
-    DirectoryDeletingService directoryDeletingService = 
getMockedDirectoryDeletingService(dirDeletionWaitStarted,
-        dirDeletionStarted);
-
-    // mock snapshotDeletingService.
-    SnapshotDeletingService snapshotDeletingService = 
getMockedSnapshotDeletingService(keyDeletingService,
-        directoryDeletingService, snapshotDeletionStarted, 
keyDeletionWaitStarted, dirDeletionWaitStarted,
-        keyDeletionStarted, dirDeletionStarted, testBucket);
-    createSnapshotFSODataForBucket(testBucket);
-    List<Table.KeyValue<String, String>> renamesKeyEntries;
-    List<Table.KeyValue<String, List<OmKeyInfo>>> deletedKeyEntries;
-    List<Table.KeyValue<String, OmKeyInfo>> deletedDirEntries;
-    try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshot =
-             om.getOmSnapshotManager().getSnapshot(testBucket.getVolumeName(), 
testBucket.getName(),
-                 testBucket.getName() + "snap2")) {
-      renamesKeyEntries = 
snapshot.get().getKeyManager().getRenamesKeyEntries(testBucket.getVolumeName(),
-          testBucket.getName(), "", (kv) -> true, 1000);
-      deletedKeyEntries = 
snapshot.get().getKeyManager().getDeletedKeyEntries(testBucket.getVolumeName(),
-          testBucket.getName(), "", (kv) -> true, 1000);
-      deletedDirEntries = 
snapshot.get().getKeyManager().getDeletedDirEntries(testBucket.getVolumeName(),
-          testBucket.getName(), 1000);
-    }
-    Thread keyDeletingThread = new Thread(() -> {
-      try {
-        keyDeletingService.runPeriodicalTaskNow();
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    });
-    Thread directoryDeletingThread = new Thread(() -> {
-      try {
-        directoryDeletingService.runPeriodicalTaskNow();
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    });
-    ExecutorService snapshotDeletingThread = Executors.newFixedThreadPool(1);
-    Runnable snapshotDeletionRunnable = () -> {
-      try {
-        snapshotDeletingService.runPeriodicalTaskNow();
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    };
-    keyDeletingThread.start();
-    directoryDeletingThread.start();
-    Future<?> future = snapshotDeletingThread.submit(snapshotDeletionRunnable);
-    GenericTestUtils.waitFor(snapshotDeletionStarted::get, 1000, 30000);
-    future.get();
-    try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshot =
-             om.getOmSnapshotManager().getSnapshot(testBucket.getVolumeName(), 
testBucket.getName(),
-                 testBucket.getName() + "snap2")) {
-      Assertions.assertEquals(Collections.emptyList(),
-          
snapshot.get().getKeyManager().getRenamesKeyEntries(testBucket.getVolumeName(),
-          testBucket.getName(), "", (kv) -> true, 1000));
-      Assertions.assertEquals(Collections.emptyList(),
-          
snapshot.get().getKeyManager().getDeletedKeyEntries(testBucket.getVolumeName(),
-          testBucket.getName(), "", (kv) -> true, 1000));
-      Assertions.assertEquals(Collections.emptyList(),
-          
snapshot.get().getKeyManager().getDeletedDirEntries(testBucket.getVolumeName(),
-          testBucket.getName(), 1000));
-    }
-    List<Table.KeyValue<String, String>> aosRenamesKeyEntries =
-        om.getKeyManager().getRenamesKeyEntries(testBucket.getVolumeName(),
-            testBucket.getName(), "", (kv) -> true, 1000);
-    List<Table.KeyValue<String, List<OmKeyInfo>>> aosDeletedKeyEntries =
-        om.getKeyManager().getDeletedKeyEntries(testBucket.getVolumeName(),
-            testBucket.getName(), "", (kv) -> true, 1000);
-    List<Table.KeyValue<String, OmKeyInfo>> aosDeletedDirEntries =
-        om.getKeyManager().getDeletedDirEntries(testBucket.getVolumeName(),
-            testBucket.getName(), 1000);
-    renamesKeyEntries.forEach(entry -> 
Assertions.assertTrue(aosRenamesKeyEntries.contains(entry)));
-    deletedKeyEntries.forEach(entry -> 
Assertions.assertTrue(aosDeletedKeyEntries.contains(entry)));
-    deletedDirEntries.forEach(entry -> 
Assertions.assertTrue(aosDeletedDirEntries.contains(entry)));
-    Mockito.reset(snapshotDeletingService);
-    SnapshotInfo snap2 = SnapshotUtils.getSnapshotInfo(om, 
testBucket.getVolumeName(),
-        testBucket.getName(), testBucket.getName() + "snap2");
-    Assertions.assertEquals(snap2.getSnapshotStatus(), 
SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED);
-    future = snapshotDeletingThread.submit(snapshotDeletionRunnable);
-    future.get();
-    Assertions.assertThrows(IOException.class, () -> 
SnapshotUtils.getSnapshotInfo(om, testBucket.getVolumeName(),
-        testBucket.getName(), testBucket.getName() + "snap2"));
-    cluster.restartOzoneManager();
-  }
-
   /*
       Flow
       ----
@@ -846,74 +624,166 @@ private synchronized void 
createSnapshotDataForBucket(OzoneBucket bucket) throws
         bucket.getName()));
   }
 
-  /*
-      Flow
-      ----
-      create dir0/key0
-      create dir1/key1
-      overwrite dir0/key0
-      create dir2/key2
-      create snap1
-      rename dir1/key1 -> dir1/key10
-      delete dir1/key10
-      delete dir2
-      create snap2
-      delete snap2
-   */
-  private synchronized void createSnapshotFSODataForBucket(OzoneBucket bucket) 
throws Exception {
-    Table<String, SnapshotInfo> snapshotInfoTable =
-        om.getMetadataManager().getSnapshotInfoTable();
-    Table<String, RepeatedOmKeyInfo> deletedTable =
-        om.getMetadataManager().getDeletedTable();
-    Table<String, OmKeyInfo> deletedDirTable =
-        om.getMetadataManager().getDeletedDirTable();
-    Table<String, OmKeyInfo> keyTable =
-        
om.getMetadataManager().getKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED);
-    Table<String, OmDirectoryInfo> dirTable =
-        om.getMetadataManager().getDirectoryTable();
-    Table<String, String> renameTable = 
om.getMetadataManager().getSnapshotRenamedTable();
-    OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
-        om.getMetadataManager();
-    Map<String, Integer> countMap =
-        metadataManager.listTables().entrySet().stream()
-            .collect(Collectors.toMap(Map.Entry::getKey, e -> {
-              try {
-                return (int)metadataManager.countRowsInTable(e.getValue());
-              } catch (IOException ex) {
-                throw new RuntimeException(ex);
-              }
-            }));
-    TestDataUtil.createKey(bucket, "dir0/" + bucket.getName() + "key0", 
CONTENT.array());
-    TestDataUtil.createKey(bucket, "dir1/" + bucket.getName() + "key1", 
CONTENT.array());
-    assertTableRowCount(keyTable, countMap.get(keyTable.getName()) + 2);
-    assertTableRowCount(dirTable, countMap.get(dirTable.getName()) + 2);
+  private MockedConstruction<ReclaimableKeyFilter> 
getMockedReclaimableKeyFilter(String volume, String bucket,
+      AtomicBoolean kdsWaitStarted, AtomicBoolean sdsLockWaitStarted,
+      AtomicBoolean sdsLockAcquired, AtomicBoolean kdsFinished, 
ReclaimableKeyFilter keyFilter) throws IOException {
+
+    return mockConstruction(ReclaimableKeyFilter.class,
+        (mocked, context) -> {
+          when(mocked.apply(any())).thenAnswer(i -> {
+            Table.KeyValue<String, OmKeyInfo> keyInfo = i.getArgument(0);
+            if (!keyInfo.getValue().getVolumeName().equals(volume) ||
+                !keyInfo.getValue().getBucketName().equals(bucket)) {
+              return keyFilter.apply(i.getArgument(0));
+            }
+            keyFilter.apply(i.getArgument(0));
+            //Notify SDS that Kds has started for the bucket.
+            kdsWaitStarted.set(true);
+            GenericTestUtils.waitFor(sdsLockWaitStarted::get, 1000, 10000);
+            // Wait for 1 more second so that the command moves to lock wait.
+            Thread.sleep(1000);
+            return keyFilter.apply(i.getArgument(0));
+          });
+          doAnswer(i -> {
+            assertTrue(sdsLockWaitStarted.get());
+            assertFalse(sdsLockAcquired.get());
+            kdsFinished.set(true);
+            keyFilter.close();
+            return null;
+          }).when(mocked).close();
+          when(mocked.getExclusiveReplicatedSizeMap()).thenAnswer(i -> 
keyFilter.getExclusiveReplicatedSizeMap());
+          when(mocked.getExclusiveSizeMap()).thenAnswer(i -> 
keyFilter.getExclusiveSizeMap());
+        });
+  }
 
-    // Overwrite bucket1key0, This is a newer version of the key which should
-    // reclaimed as this is a different version of the key.
-    TestDataUtil.createKey(bucket, "dir0/" + bucket.getName() + "key0", 
CONTENT.array());
-    TestDataUtil.createKey(bucket, "dir2/" + bucket.getName() + "key2", 
CONTENT.array());
-    assertTableRowCount(keyTable, countMap.get(keyTable.getName()) + 3);
-    assertTableRowCount(dirTable, countMap.get(dirTable.getName()) + 3);
-    assertTableRowCount(deletedTable, countMap.get(deletedTable.getName()) + 
1);
-    // create snap1
-    client.getProxy().createSnapshot(bucket.getVolumeName(), bucket.getName(),
-        bucket.getName() + "snap1");
-    bucket.renameKey("dir1/" + bucket.getName() + "key1", "dir1/" + 
bucket.getName() + "key10");
-    bucket.renameKey("dir1/", "dir10/");
-    assertTableRowCount(renameTable, countMap.get(renameTable.getName()) + 2);
-    client.getProxy().deleteKey(bucket.getVolumeName(), bucket.getName(),
-        "dir10/" + bucket.getName() + "key10", false);
-    assertTableRowCount(deletedTable, countMap.get(deletedTable.getName()) + 
1);
-    // Key 2 is deleted here, which will be reclaimed here as
-    // it is not being referenced by previous snapshot.
-    client.getProxy().deleteKey(bucket.getVolumeName(), bucket.getName(), 
"dir2", true);
-    assertTableRowCount(deletedDirTable, 
countMap.get(deletedDirTable.getName()) + 1);
-    client.getProxy().createSnapshot(bucket.getVolumeName(), bucket.getName(),
-        bucket.getName() + "snap2");
-    // Delete Snapshot 2.
-    client.getProxy().deleteSnapshot(bucket.getVolumeName(), bucket.getName(),
-        bucket.getName() + "snap2");
-    assertTableRowCount(snapshotInfoTable, 
countMap.get(snapshotInfoTable.getName()) +  2);
+  @ParameterizedTest
+  @CsvSource({"true, 0", "true, 1", "false, 0", "false, 1", "false, 2"})
+  @DisplayName("Tests Snapshot Deleting Service while KeyDeletingService is 
already running.")
+  @Order(4)
+  public void testSnapshotDeletingServiceWaitsForKeyDeletingService(boolean 
kdsRunningOnAOS,
+      int snasphotDeleteIndex) throws Exception {
+    SnapshotChainManager snapshotChainManager =
+        
((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager();
+    GenericTestUtils.waitFor(() -> {
+      try {
+        Iterator<UUID> itr = snapshotChainManager.iterator(false);
+        while (itr.hasNext()) {
+          SnapshotInfo snapshotInfo = SnapshotUtils.getSnapshotInfo(om, 
snapshotChainManager, itr.next());
+          assertEquals(SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, 
snapshotInfo.getSnapshotStatus());
+        }
+        return true;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 1000, 30000);
+    om.awaitDoubleBufferFlush();
+    // Suspend the services first
+    om.getKeyManager().getDirDeletingService().suspend();
+    om.getKeyManager().getDeletingService().suspend();
+    om.getKeyManager().getSnapshotDeletingService().suspend();
+    String volume = "vol" + RandomStringUtils.secure().nextNumeric(3),
+        bucket = "bucket" + RandomStringUtils.secure().nextNumeric(3);
+    client.getObjectStore().createVolume(volume);
+    OzoneVolume ozoneVolume = client.getObjectStore().getVolume(volume);
+    ozoneVolume.createBucket(bucket);
+    OzoneBucket ozoneBucket = ozoneVolume.getBucket(bucket);
+
+    // Create snap0
+    client.getObjectStore().createSnapshot(volume, bucket, "snap0");
+    client.getObjectStore().getSnapshotInfo(volume, bucket, "snap0");
+    UUID snap1Id = client.getObjectStore().getSnapshotInfo(volume, bucket, 
"snap0").getSnapshotId();
+
+    // Create snap1
+    TestDataUtil.createKey(ozoneBucket, "key", CONTENT.array());
+    client.getObjectStore().createSnapshot(volume, bucket, "snap1");
+    UUID snap2Id = client.getObjectStore().getSnapshotInfo(volume, bucket, 
"snap1").getSnapshotId();
+
+    ozoneBucket.renameKey("key", "renamedKey");
+    ozoneBucket.deleteKey("renamedKey");
+    om.awaitDoubleBufferFlush();
+    UUID snap3Id;
+    ReclaimableKeyFilter keyFilter;
+    SnapshotInfo snapInfo;
+    // Create snap3 to test snapshot 3 deep cleaning otherwise just run on AOS.
+    if (kdsRunningOnAOS) {
+      snap3Id = null;
+      snapInfo = null;
+      keyFilter = new ReclaimableKeyFilter(om, om.getOmSnapshotManager(),
+          
((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(),
+          snapInfo, om.getKeyManager(), om.getMetadataManager().getLock());
+    } else {
+
+      client.getObjectStore().createSnapshot(volume, bucket, "snap2");
+      snap3Id = client.getObjectStore().getSnapshotInfo(volume, bucket, 
"snap2").getSnapshotId();
+      om.awaitDoubleBufferFlush();
+      SnapshotInfo snap = om.getMetadataManager().getSnapshotInfo(volume, 
bucket, "snap2");
+      snap.setDeepCleanedDeletedDir(true);
+      om.getMetadataManager().getSnapshotInfoTable().put(snap.getTableKey(), 
snap);
+      assertTrue(om.getMetadataManager().getSnapshotInfo(volume, bucket, 
"snap2")
+          .isDeepCleanedDeletedDir());
+      snapInfo = SnapshotUtils.getSnapshotInfo(om, volume, bucket, "snap2");
+      keyFilter = new ReclaimableKeyFilter(om, om.getOmSnapshotManager(),
+          
((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(),
+          snapInfo, getOmSnapshot(volume, bucket, 
"snap2").get().getKeyManager(),
+          om.getMetadataManager().getLock());
+    }
+
+
+    MultiSnapshotLocks sdsMultiLocks = new 
MultiSnapshotLocks(cluster.getOzoneManager().getMetadataManager().getLock(),
+        SNAPSHOT_GC_LOCK, true);
+    AtomicBoolean kdsWaitStarted = new AtomicBoolean(false);
+    AtomicBoolean kdsFinished = new AtomicBoolean(false);
+    AtomicBoolean sdsLockWaitStarted = new AtomicBoolean(false);
+    AtomicBoolean sdsLockAcquired = new AtomicBoolean(false);
+
+    try (MockedConstruction<MultiSnapshotLocks> mockedMultiSnapshotLock = 
mockConstruction(MultiSnapshotLocks.class,
+        (mocked, context) -> {
+          when(mocked.acquireLock(anyList())).thenAnswer(i -> {
+            List<UUID> ids = i.getArgument(0);
+            List<UUID> expectedIds = Arrays.asList(snap1Id, snap2Id, 
snap3Id).subList(snasphotDeleteIndex, Math.min(3,
+                snasphotDeleteIndex + 
2)).stream().filter(Objects::nonNull).collect(Collectors.toList());
+            if (expectedIds.equals(ids) && !sdsLockWaitStarted.get() && 
!sdsLockAcquired.get()) {
+              sdsLockWaitStarted.set(true);
+              OMLockDetails lockDetails = sdsMultiLocks.acquireLock(ids);
+              assertTrue(kdsFinished::get);
+              sdsLockAcquired.set(true);
+              return lockDetails;
+            }
+            return sdsMultiLocks.acquireLock(ids);
+          });
+          doAnswer(i -> {
+            sdsMultiLocks.releaseLock();
+            return null;
+          }).when(mocked).releaseLock();
+        })) {
+      KeyDeletingService kds = new KeyDeletingService(om, 
om.getScmClient().getBlockClient(), 500, 10000,
+          om.getConfiguration(), 1, true);
+      kds.shutdown();
+      KeyDeletingService.KeyDeletingTask task = kds.new 
KeyDeletingTask(snap3Id);
+
+      CompletableFuture.supplyAsync(() -> {
+        try (MockedConstruction<ReclaimableKeyFilter> mockedReclaimableFilter 
= getMockedReclaimableKeyFilter(
+            volume, bucket, kdsWaitStarted, sdsLockWaitStarted, 
sdsLockAcquired, kdsFinished, keyFilter)) {
+          return task.call();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      });
+
+      SnapshotDeletingService sds = new SnapshotDeletingService(500, 10000, 
om);
+      sds.shutdown();
+      GenericTestUtils.waitFor(kdsWaitStarted::get, 1000, 30000);
+      client.getObjectStore().deleteSnapshot(volume, bucket, "snap" + 
snasphotDeleteIndex);
+      sds.runPeriodicalTaskNow();
+      om.awaitDoubleBufferFlush();
+      if (snasphotDeleteIndex == 2) {
+        sds.runPeriodicalTaskNow();
+      }
+      assertTrue(sdsLockWaitStarted.get());
+      assertTrue(sdsLockAcquired.get());
+      assertThrows(IOException.class, () -> SnapshotUtils.getSnapshotInfo(om, 
volume, bucket,
+          "snap" + snasphotDeleteIndex));
+    }
   }
 
   private void verifySnapshotChain(SnapshotInfo deletedSnapshot,
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index f96099323d..28668eb686 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -39,7 +39,6 @@
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -149,7 +148,6 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
   // from parent directory info from deleted directory table concurrently
   // and send deletion requests.
   private int ratisByteLimit;
-  private final AtomicBoolean isRunningOnAOS;
   private final SnapshotChainManager snapshotChainManager;
   private final boolean deepCleanSnapshots;
   private final ExecutorService deletionThreadPool;
@@ -173,7 +171,6 @@ public DirectoryDeletingService(long interval, TimeUnit 
unit,
 
     // always go to 90% of max limit for request as other header will be added
     this.ratisByteLimit = (int) (limit * 0.9);
-    this.isRunningOnAOS = new AtomicBoolean(false);
     registerReconfigCallbacks(ozoneManager.getReconfigurationHandler(), 
configuration);
     this.snapshotChainManager = 
((OmMetadataManagerImpl)ozoneManager.getMetadataManager()).getSnapshotChainManager();
     this.deepCleanSnapshots = deepCleanSnapshots;
@@ -200,14 +197,10 @@ private synchronized void 
updateAndRestart(OzoneConfiguration conf) {
     start();
   }
 
-  public boolean isRunningOnAOS() {
-    return isRunningOnAOS.get();
-  }
-
   @Override
   public BackgroundTaskQueue getTasks() {
     BackgroundTaskQueue queue = new BackgroundTaskQueue();
-    queue.add(new DirDeletingTask(this, null));
+    queue.add(new DirDeletingTask(null));
     if (deepCleanSnapshots) {
       Iterator<UUID> iterator = null;
       try {
@@ -218,7 +211,7 @@ public BackgroundTaskQueue getTasks() {
       }
       while (iterator.hasNext()) {
         UUID snapshotId = iterator.next();
-        queue.add(new DirDeletingTask(this, snapshotId));
+        queue.add(new DirDeletingTask(snapshotId));
       }
     }
     return queue;
@@ -468,11 +461,9 @@ private OzoneManagerProtocolProtos.OMResponse 
submitPurgePaths(List<PurgePathReq
 
   @VisibleForTesting
   final class DirDeletingTask implements BackgroundTask {
-    private final DirectoryDeletingService directoryDeletingService;
     private final UUID snapshotId;
 
-    private DirDeletingTask(DirectoryDeletingService service, UUID snapshotId) 
{
-      this.directoryDeletingService = service;
+    DirDeletingTask(UUID snapshotId) {
       this.snapshotId = snapshotId;
     }
 
@@ -650,7 +641,6 @@ public BackgroundTaskResult call() {
         final long run = getRunCount().incrementAndGet();
         if (snapshotId == null) {
           LOG.debug("Running DirectoryDeletingService for active object store, 
{}", run);
-          isRunningOnAOS.set(true);
         } else {
           LOG.debug("Running DirectoryDeletingService for snapshot : {}, {}", 
snapshotId, run);
         }
@@ -681,13 +671,6 @@ public BackgroundTaskResult call() {
         } catch (IOException | ExecutionException | InterruptedException e) {
           LOG.error("Error while running delete files background task for 
store {}. Will retry at next run.",
               snapInfo, e);
-        } finally {
-          if (snapshotId == null) {
-            isRunningOnAOS.set(false);
-            synchronized (directoryDeletingService) {
-              this.directoryDeletingService.notify();
-            }
-          }
         }
       }
       // By design, no one cares about the results of this call back.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
index d875e95e85..781184b862 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
@@ -34,7 +34,6 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -87,7 +86,6 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
 
   private int keyLimitPerTask;
   private final AtomicLong deletedKeyCount;
-  private final AtomicBoolean isRunningOnAOS;
   private final boolean deepCleanSnapshots;
   private final SnapshotChainManager snapshotChainManager;
 
@@ -103,7 +101,6 @@ public KeyDeletingService(OzoneManager ozoneManager,
     Preconditions.checkArgument(keyLimitPerTask >= 0,
         OZONE_KEY_DELETING_LIMIT_PER_TASK + " cannot be negative.");
     this.deletedKeyCount = new AtomicLong(0);
-    this.isRunningOnAOS = new AtomicBoolean(false);
     this.deepCleanSnapshots = deepCleanSnapshots;
     this.snapshotChainManager = 
((OmMetadataManagerImpl)ozoneManager.getMetadataManager()).getSnapshotChainManager();
     this.scmClient = scmClient;
@@ -119,10 +116,6 @@ public AtomicLong getDeletedKeyCount() {
     return deletedKeyCount;
   }
 
-  public boolean isRunningOnAOS() {
-    return isRunningOnAOS.get();
-  }
-
   Pair<Integer, Boolean> processKeyDeletes(List<BlockGroup> keyBlocksList,
       Map<String, RepeatedOmKeyInfo> keysToModify, List<String> renameEntries,
       String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException 
{
@@ -261,7 +254,7 @@ private Pair<Integer, Boolean> 
submitPurgeKeysRequest(List<DeleteBlockGroupResul
   @Override
   public BackgroundTaskQueue getTasks() {
     BackgroundTaskQueue queue = new BackgroundTaskQueue();
-    queue.add(new KeyDeletingTask(this, null));
+    queue.add(new KeyDeletingTask(null));
     if (deepCleanSnapshots) {
       Iterator<UUID> iterator = null;
       try {
@@ -272,7 +265,7 @@ public BackgroundTaskQueue getTasks() {
       }
       while (iterator.hasNext()) {
         UUID snapshotId = iterator.next();
-        queue.add(new KeyDeletingTask(this, snapshotId));
+        queue.add(new KeyDeletingTask(snapshotId));
       }
     }
     return queue;
@@ -295,11 +288,9 @@ public void setKeyLimitPerTask(int keyLimitPerTask) {
    */
   @VisibleForTesting
   final class KeyDeletingTask implements BackgroundTask {
-    private final KeyDeletingService deletingService;
     private final UUID snapshotId;
 
-    KeyDeletingTask(KeyDeletingService service, UUID snapshotId) {
-      this.deletingService = service;
+    KeyDeletingTask(UUID snapshotId) {
       this.snapshotId = snapshotId;
     }
 
@@ -427,7 +418,6 @@ public BackgroundTaskResult call() {
         final long run = getRunCount().incrementAndGet();
         if (snapshotId == null) {
           LOG.debug("Running KeyDeletingService for active object store, {}", 
run);
-          isRunningOnAOS.set(true);
         } else {
           LOG.debug("Running KeyDeletingService for snapshot : {}, {}", 
snapshotId, run);
         }
@@ -464,13 +454,6 @@ public BackgroundTaskResult call() {
         } catch (IOException e) {
           LOG.error("Error while running delete files background task for 
store {}. Will retry at next run.",
               snapInfo, e);
-        } finally {
-          if (snapshotId == null) {
-            isRunningOnAOS.set(false);
-            synchronized (deletingService) {
-              this.deletingService.notify();
-            }
-          }
         }
       }
       // By design, no one cares about the results of this call back.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
index 96ae98a19b..75e9a20cdf 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
@@ -22,6 +22,7 @@
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DELETING_LIMIT_PER_TASK;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DELETING_LIMIT_PER_TASK_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ServiceException;
@@ -52,6 +53,8 @@
 import org.apache.hadoop.ozone.om.SnapshotChainManager;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
@@ -86,7 +89,8 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
   private final int keyLimitPerTask;
   private final int snapshotDeletionPerTask;
   private final int ratisByteLimit;
-  private final long serviceTimeout;
+  private final MultiSnapshotLocks snapshotIdLocks;
+  private final List<UUID> lockIds;
 
   public SnapshotDeletingService(long interval, long serviceTimeout,
                                  OzoneManager ozoneManager)
@@ -112,32 +116,9 @@ public SnapshotDeletingService(long interval, long 
serviceTimeout,
     this.keyLimitPerTask = conf.getInt(
         OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK,
         OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
-    this.serviceTimeout = serviceTimeout;
-  }
-
-  // Wait for a notification from KeyDeletingService if the key deletion is 
running. This is to ensure, merging of
-  // entries do not start while the AOS is still processing the deleted keys.
-  @VisibleForTesting
-  public void waitForKeyDeletingService() throws InterruptedException {
-    KeyDeletingService keyDeletingService = 
getOzoneManager().getKeyManager().getDeletingService();
-    synchronized (keyDeletingService) {
-      while (keyDeletingService.isRunningOnAOS()) {
-        keyDeletingService.wait(serviceTimeout);
-      }
-    }
-  }
-
-  // Wait for a notification from DirectoryDeletingService if the directory 
deletion is running. This is to ensure,
-  // merging of entries do not start while the AOS is still processing the 
deleted keys.
-  @VisibleForTesting
-  public void waitForDirDeletingService() throws InterruptedException {
-    DirectoryDeletingService directoryDeletingService = 
getOzoneManager().getKeyManager()
-        .getDirDeletingService();
-    synchronized (directoryDeletingService) {
-      while (directoryDeletingService.isRunningOnAOS()) {
-        directoryDeletingService.wait(serviceTimeout);
-      }
-    }
+    IOzoneManagerLock lock = getOzoneManager().getMetadataManager().getLock();
+    this.snapshotIdLocks = new MultiSnapshotLocks(lock, SNAPSHOT_GC_LOCK, 
true);
+    this.lockIds = new ArrayList<>(2);
   }
 
   private class SnapshotDeletingTask implements BackgroundTask {
@@ -173,16 +154,22 @@ public BackgroundTaskResult call() throws 
InterruptedException {
                 snapInfo.getTableKey());
             continue;
           }
-
           // nextSnapshot = null means entries would be moved to AOS.
           if (nextSnapshot == null) {
             LOG.info("Snapshot: {} entries will be moved to AOS.", 
snapInfo.getTableKey());
-            waitForKeyDeletingService();
-            waitForDirDeletingService();
           } else {
             LOG.info("Snapshot: {} entries will be moved to next active 
snapshot: {}",
                 snapInfo.getTableKey(), nextSnapshot.getTableKey());
           }
+          lockIds.clear();
+          lockIds.add(snapInfo.getSnapshotId());
+          if (nextSnapshot != null) {
+            lockIds.add(nextSnapshot.getSnapshotId());
+          }
+          // Acquire write lock on current snapshot and next snapshot in chain.
+          if (!snapshotIdLocks.acquireLock(lockIds).isLockAcquired()) {
+            continue;
+          }
           try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshot = 
omSnapshotManager.getSnapshot(
               snapInfo.getVolumeName(), snapInfo.getBucketName(), 
snapInfo.getName())) {
             KeyManager snapshotKeyManager = snapshot.get().getKeyManager();
@@ -229,6 +216,8 @@ public BackgroundTaskResult call() throws 
InterruptedException {
             } else {
               snapshotsToBePurged.add(snapInfo.getTableKey());
             }
+          } finally {
+            snapshotIdLocks.releaseLock();
           }
           successRunCount.incrementAndGet();
           snapshotLimit--;
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index b3178580b5..674c78d16a 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -347,7 +347,7 @@ public void 
testAOSKeyDeletingWithSnapshotCreateParallelExecution()
       keyDeletingService.suspend();
       SnapshotDeletingService snapshotDeletingService = 
om.getKeyManager().getSnapshotDeletingService();
       snapshotDeletingService.suspend();
-      GenericTestUtils.waitFor(() -> !keyDeletingService.isRunningOnAOS(), 
1000, 10000);
+
       final String volumeName = getTestName();
       final String bucketName = uniqueObjectName("bucket");
       OzoneManager ozoneManager = Mockito.spy(om);
@@ -618,7 +618,7 @@ public void 
testKeyDeletingServiceWithDeepCleanedSnapshots() throws Exception {
       when(kds.getTasks()).thenAnswer(i -> {
         BackgroundTaskQueue queue = new BackgroundTaskQueue();
         for (UUID id : snapshotIds) {
-          queue.add(kds.new KeyDeletingTask(kds, id));
+          queue.add(kds.new KeyDeletingTask(id));
         }
         return queue;
       });


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to