This is an automated email from the ASF dual-hosted git repository.
swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new ad5a507dfa HDDS-13035. SnapshotDeletingService should hold write locks
while purging deleted snapshots (#8554)
ad5a507dfa is described below
commit ad5a507dfa86caae16b872867d99778a639ffd4c
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Fri Jun 13 13:46:53 2025 -0400
HDDS-13035. SnapshotDeletingService should hold write locks while purging
deleted snapshots (#8554)
---
.../TestDirectoryDeletingServiceWithFSO.java | 1 -
...TestSnapshotDeletingServiceIntegrationTest.java | 494 ++++++++-------------
.../ozone/om/service/DirectoryDeletingService.java | 23 +-
.../ozone/om/service/KeyDeletingService.java | 23 +-
.../ozone/om/service/SnapshotDeletingService.java | 49 +-
.../ozone/om/service/TestKeyDeletingService.java | 4 +-
6 files changed, 209 insertions(+), 385 deletions(-)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
index 1933925384..3364939f8b 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
@@ -574,7 +574,6 @@ public void
testAOSKeyDeletingWithSnapshotCreateParallelExecution()
DirectoryDeletingService dirDeletingService =
cluster.getOzoneManager().getKeyManager().getDirDeletingService();
// Suspend KeyDeletingService
dirDeletingService.suspend();
- GenericTestUtils.waitFor(() -> !dirDeletingService.isRunningOnAOS(), 1000,
10000);
Random random = new Random();
final String testVolumeName = "volume" + random.nextInt();
final String testBucketName = "bucket" + random.nextInt();
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingServiceIntegrationTest.java
similarity index 60%
rename from
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
rename to
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingServiceIntegrationTest.java
index 48d1c2f978..080844d1f5 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingServiceIntegrationTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.ozone.om.snapshot;
+package org.apache.hadoop.ozone.om.service;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
@@ -23,35 +23,34 @@
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED;
+import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
-import java.util.Random;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.IOUtils;
@@ -63,12 +62,11 @@
import org.apache.hadoop.ozone.client.BucketArgs;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshot;
-import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.SnapshotChainManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -76,22 +74,25 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
-import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
-import org.apache.hadoop.ozone.om.service.KeyDeletingService;
-import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
+import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks;
+import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
+import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableKeyFilter;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.tag.Flaky;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestMethodOrder;
-import org.mockito.Mockito;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.mockito.MockedConstruction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -161,6 +162,10 @@ public void closeAllSnapshots() {
while (!rcSnaps.isEmpty()) {
rcSnaps.pop().close();
}
+ // Resume services
+ om.getKeyManager().getDirDeletingService().resume();
+ om.getKeyManager().getDeletingService().resume();
+ om.getKeyManager().getSnapshotDeletingService().resume();
}
private UncheckedAutoCloseableSupplier<OmSnapshot> getOmSnapshot(String
volume, String bucket, String snapshotName)
@@ -532,233 +537,6 @@ public void testSnapshotWithFSO() throws Exception {
snap1 = null;
}
- private DirectoryDeletingService
getMockedDirectoryDeletingService(AtomicBoolean dirDeletionWaitStarted,
-
AtomicBoolean dirDeletionStarted)
- throws InterruptedException, TimeoutException, IOException {
- OzoneManager ozoneManager = Mockito.spy(om);
- om.getKeyManager().getDirDeletingService().shutdown();
- KeyManager keyManager = Mockito.spy(om.getKeyManager());
- when(ozoneManager.getKeyManager()).thenReturn(keyManager);
- GenericTestUtils.waitFor(() ->
om.getKeyManager().getDirDeletingService().getThreadCount() == 0, 1000,
- 100000);
- DirectoryDeletingService directoryDeletingService = Mockito.spy(new
DirectoryDeletingService(10000,
- TimeUnit.MILLISECONDS, 100000, ozoneManager, cluster.getConf(), 1,
false));
- directoryDeletingService.shutdown();
- GenericTestUtils.waitFor(() -> directoryDeletingService.getThreadCount()
== 0, 1000,
- 100000);
- doAnswer(i -> {
- // Wait for SDS to reach DDS wait block before processing any deleted
directories.
- GenericTestUtils.waitFor(dirDeletionWaitStarted::get, 1000, 100000);
- dirDeletionStarted.set(true);
- return i.callRealMethod();
- }).when(keyManager).getDeletedDirEntries();
- return directoryDeletingService;
- }
-
- private KeyDeletingService getMockedKeyDeletingService(AtomicBoolean
keyDeletionWaitStarted,
- AtomicBoolean
keyDeletionStarted)
- throws InterruptedException, TimeoutException, IOException {
- OzoneManager ozoneManager = Mockito.spy(om);
- om.getKeyManager().getDeletingService().shutdown();
- GenericTestUtils.waitFor(() ->
om.getKeyManager().getDeletingService().getThreadCount() == 0, 1000,
- 100000);
- KeyManager keyManager = Mockito.spy(om.getKeyManager());
- when(ozoneManager.getKeyManager()).thenReturn(keyManager);
- KeyDeletingService keyDeletingService = Mockito.spy(new
KeyDeletingService(ozoneManager,
- ozoneManager.getScmClient().getBlockClient(), 10000,
- 100000, cluster.getConf(), 10, false));
- keyDeletingService.shutdown();
- GenericTestUtils.waitFor(() -> keyDeletingService.getThreadCount() == 0,
1000,
- 100000);
- when(keyManager.getPendingDeletionKeys(any(), anyInt())).thenAnswer(i -> {
- // wait for SDS to reach the KDS wait block before processing any key.
- GenericTestUtils.waitFor(keyDeletionWaitStarted::get, 1000, 100000);
- keyDeletionStarted.set(true);
- return i.callRealMethod();
- });
- return keyDeletingService;
- }
-
- @SuppressWarnings("checkstyle:parameternumber")
- private SnapshotDeletingService
getMockedSnapshotDeletingService(KeyDeletingService keyDeletingService,
-
DirectoryDeletingService directoryDeletingService,
-
AtomicBoolean snapshotDeletionStarted,
-
AtomicBoolean keyDeletionWaitStarted,
-
AtomicBoolean dirDeletionWaitStarted,
-
AtomicBoolean keyDeletionStarted,
-
AtomicBoolean dirDeletionStarted,
- OzoneBucket
testBucket)
- throws InterruptedException, TimeoutException, IOException {
- OzoneManager ozoneManager = Mockito.spy(om);
- om.getKeyManager().getSnapshotDeletingService().shutdown();
- GenericTestUtils.waitFor(() ->
om.getKeyManager().getSnapshotDeletingService().getThreadCount() == 0, 1000,
- 100000);
- KeyManager keyManager = Mockito.spy(om.getKeyManager());
- OmMetadataManagerImpl omMetadataManager =
Mockito.spy((OmMetadataManagerImpl)om.getMetadataManager());
- SnapshotChainManager unMockedSnapshotChainManager =
-
((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager();
- SnapshotChainManager snapshotChainManager =
Mockito.spy(unMockedSnapshotChainManager);
- OmSnapshotManager omSnapshotManager =
Mockito.spy(om.getOmSnapshotManager());
- when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
- when(ozoneManager.getKeyManager()).thenReturn(keyManager);
- when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
-
when(omMetadataManager.getSnapshotChainManager()).thenReturn(snapshotChainManager);
- when(keyManager.getDeletingService()).thenReturn(keyDeletingService);
-
when(keyManager.getDirDeletingService()).thenReturn(directoryDeletingService);
- SnapshotDeletingService snapshotDeletingService = Mockito.spy(new
SnapshotDeletingService(10000,
- 100000, ozoneManager));
- snapshotDeletingService.shutdown();
- GenericTestUtils.waitFor(() -> snapshotDeletingService.getThreadCount() ==
0, 1000,
- 100000);
- when(snapshotChainManager.iterator(anyBoolean())).thenAnswer(i -> {
- Iterator<UUID> itr = (Iterator<UUID>) i.callRealMethod();
- return Lists.newArrayList(itr).stream().filter(uuid -> {
- try {
- SnapshotInfo snapshotInfo = SnapshotUtils.getSnapshotInfo(om,
snapshotChainManager, uuid);
- return snapshotInfo.getBucketName().equals(testBucket.getName()) &&
- snapshotInfo.getVolumeName().equals(testBucket.getVolumeName());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }).iterator();
- });
- when(snapshotChainManager.getLatestGlobalSnapshotId())
- .thenAnswer(i ->
unMockedSnapshotChainManager.getLatestGlobalSnapshotId());
- when(snapshotChainManager.getOldestGlobalSnapshotId())
- .thenAnswer(i ->
unMockedSnapshotChainManager.getOldestGlobalSnapshotId());
- doAnswer(i -> {
- // KDS wait block reached in SDS.
- GenericTestUtils.waitFor(() -> {
- return keyDeletingService.isRunningOnAOS();
- }, 1000, 100000);
- keyDeletionWaitStarted.set(true);
- return i.callRealMethod();
- }).when(snapshotDeletingService).waitForKeyDeletingService();
- doAnswer(i -> {
- // DDS wait block reached in SDS.
- GenericTestUtils.waitFor(directoryDeletingService::isRunningOnAOS, 1000,
100000);
- dirDeletionWaitStarted.set(true);
- return i.callRealMethod();
- }).when(snapshotDeletingService).waitForDirDeletingService();
- doAnswer(i -> {
- // Assert KDS & DDS is not running when SDS starts moving entries &
assert all wait block, KDS processing
- // AOS block & DDS AOS block have been executed.
- Assertions.assertTrue(keyDeletionWaitStarted.get());
- Assertions.assertTrue(dirDeletionWaitStarted.get());
- Assertions.assertTrue(keyDeletionStarted.get());
- Assertions.assertTrue(dirDeletionStarted.get());
- Assertions.assertFalse(keyDeletingService.isRunningOnAOS());
- Assertions.assertFalse(directoryDeletingService.isRunningOnAOS());
- snapshotDeletionStarted.set(true);
- return i.callRealMethod();
- }).when(omSnapshotManager).getSnapshot(anyString(), anyString(),
anyString());
- return snapshotDeletingService;
- }
-
- @Test
- @Order(4)
- @Flaky("HDDS-11847")
- public void testParallelExcecutionOfKeyDeletionAndSnapshotDeletion() throws
Exception {
- AtomicBoolean keyDeletionWaitStarted = new AtomicBoolean(false);
- AtomicBoolean dirDeletionWaitStarted = new AtomicBoolean(false);
- AtomicBoolean keyDeletionStarted = new AtomicBoolean(false);
- AtomicBoolean dirDeletionStarted = new AtomicBoolean(false);
- AtomicBoolean snapshotDeletionStarted = new AtomicBoolean(false);
- Random random = new Random();
- String bucketName = "bucket" + random.nextInt();
- BucketArgs bucketArgs = new BucketArgs.Builder()
- .setBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED)
- .build();
- OzoneBucket testBucket = TestDataUtil.createBucket(
- client, VOLUME_NAME, bucketArgs, bucketName);
- // mock keyDeletingService
- KeyDeletingService keyDeletingService =
getMockedKeyDeletingService(keyDeletionWaitStarted, keyDeletionStarted);
-
- // mock dirDeletingService
- DirectoryDeletingService directoryDeletingService =
getMockedDirectoryDeletingService(dirDeletionWaitStarted,
- dirDeletionStarted);
-
- // mock snapshotDeletingService.
- SnapshotDeletingService snapshotDeletingService =
getMockedSnapshotDeletingService(keyDeletingService,
- directoryDeletingService, snapshotDeletionStarted,
keyDeletionWaitStarted, dirDeletionWaitStarted,
- keyDeletionStarted, dirDeletionStarted, testBucket);
- createSnapshotFSODataForBucket(testBucket);
- List<Table.KeyValue<String, String>> renamesKeyEntries;
- List<Table.KeyValue<String, List<OmKeyInfo>>> deletedKeyEntries;
- List<Table.KeyValue<String, OmKeyInfo>> deletedDirEntries;
- try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshot =
- om.getOmSnapshotManager().getSnapshot(testBucket.getVolumeName(),
testBucket.getName(),
- testBucket.getName() + "snap2")) {
- renamesKeyEntries =
snapshot.get().getKeyManager().getRenamesKeyEntries(testBucket.getVolumeName(),
- testBucket.getName(), "", (kv) -> true, 1000);
- deletedKeyEntries =
snapshot.get().getKeyManager().getDeletedKeyEntries(testBucket.getVolumeName(),
- testBucket.getName(), "", (kv) -> true, 1000);
- deletedDirEntries =
snapshot.get().getKeyManager().getDeletedDirEntries(testBucket.getVolumeName(),
- testBucket.getName(), 1000);
- }
- Thread keyDeletingThread = new Thread(() -> {
- try {
- keyDeletingService.runPeriodicalTaskNow();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- Thread directoryDeletingThread = new Thread(() -> {
- try {
- directoryDeletingService.runPeriodicalTaskNow();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- ExecutorService snapshotDeletingThread = Executors.newFixedThreadPool(1);
- Runnable snapshotDeletionRunnable = () -> {
- try {
- snapshotDeletingService.runPeriodicalTaskNow();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- };
- keyDeletingThread.start();
- directoryDeletingThread.start();
- Future<?> future = snapshotDeletingThread.submit(snapshotDeletionRunnable);
- GenericTestUtils.waitFor(snapshotDeletionStarted::get, 1000, 30000);
- future.get();
- try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshot =
- om.getOmSnapshotManager().getSnapshot(testBucket.getVolumeName(),
testBucket.getName(),
- testBucket.getName() + "snap2")) {
- Assertions.assertEquals(Collections.emptyList(),
-
snapshot.get().getKeyManager().getRenamesKeyEntries(testBucket.getVolumeName(),
- testBucket.getName(), "", (kv) -> true, 1000));
- Assertions.assertEquals(Collections.emptyList(),
-
snapshot.get().getKeyManager().getDeletedKeyEntries(testBucket.getVolumeName(),
- testBucket.getName(), "", (kv) -> true, 1000));
- Assertions.assertEquals(Collections.emptyList(),
-
snapshot.get().getKeyManager().getDeletedDirEntries(testBucket.getVolumeName(),
- testBucket.getName(), 1000));
- }
- List<Table.KeyValue<String, String>> aosRenamesKeyEntries =
- om.getKeyManager().getRenamesKeyEntries(testBucket.getVolumeName(),
- testBucket.getName(), "", (kv) -> true, 1000);
- List<Table.KeyValue<String, List<OmKeyInfo>>> aosDeletedKeyEntries =
- om.getKeyManager().getDeletedKeyEntries(testBucket.getVolumeName(),
- testBucket.getName(), "", (kv) -> true, 1000);
- List<Table.KeyValue<String, OmKeyInfo>> aosDeletedDirEntries =
- om.getKeyManager().getDeletedDirEntries(testBucket.getVolumeName(),
- testBucket.getName(), 1000);
- renamesKeyEntries.forEach(entry ->
Assertions.assertTrue(aosRenamesKeyEntries.contains(entry)));
- deletedKeyEntries.forEach(entry ->
Assertions.assertTrue(aosDeletedKeyEntries.contains(entry)));
- deletedDirEntries.forEach(entry ->
Assertions.assertTrue(aosDeletedDirEntries.contains(entry)));
- Mockito.reset(snapshotDeletingService);
- SnapshotInfo snap2 = SnapshotUtils.getSnapshotInfo(om,
testBucket.getVolumeName(),
- testBucket.getName(), testBucket.getName() + "snap2");
- Assertions.assertEquals(snap2.getSnapshotStatus(),
SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED);
- future = snapshotDeletingThread.submit(snapshotDeletionRunnable);
- future.get();
- Assertions.assertThrows(IOException.class, () ->
SnapshotUtils.getSnapshotInfo(om, testBucket.getVolumeName(),
- testBucket.getName(), testBucket.getName() + "snap2"));
- cluster.restartOzoneManager();
- }
-
/*
Flow
----
@@ -846,74 +624,166 @@ private synchronized void
createSnapshotDataForBucket(OzoneBucket bucket) throws
bucket.getName()));
}
- /*
- Flow
- ----
- create dir0/key0
- create dir1/key1
- overwrite dir0/key0
- create dir2/key2
- create snap1
- rename dir1/key1 -> dir1/key10
- delete dir1/key10
- delete dir2
- create snap2
- delete snap2
- */
- private synchronized void createSnapshotFSODataForBucket(OzoneBucket bucket)
throws Exception {
- Table<String, SnapshotInfo> snapshotInfoTable =
- om.getMetadataManager().getSnapshotInfoTable();
- Table<String, RepeatedOmKeyInfo> deletedTable =
- om.getMetadataManager().getDeletedTable();
- Table<String, OmKeyInfo> deletedDirTable =
- om.getMetadataManager().getDeletedDirTable();
- Table<String, OmKeyInfo> keyTable =
-
om.getMetadataManager().getKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED);
- Table<String, OmDirectoryInfo> dirTable =
- om.getMetadataManager().getDirectoryTable();
- Table<String, String> renameTable =
om.getMetadataManager().getSnapshotRenamedTable();
- OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
- om.getMetadataManager();
- Map<String, Integer> countMap =
- metadataManager.listTables().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> {
- try {
- return (int)metadataManager.countRowsInTable(e.getValue());
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- }));
- TestDataUtil.createKey(bucket, "dir0/" + bucket.getName() + "key0",
CONTENT.array());
- TestDataUtil.createKey(bucket, "dir1/" + bucket.getName() + "key1",
CONTENT.array());
- assertTableRowCount(keyTable, countMap.get(keyTable.getName()) + 2);
- assertTableRowCount(dirTable, countMap.get(dirTable.getName()) + 2);
+ private MockedConstruction<ReclaimableKeyFilter>
getMockedReclaimableKeyFilter(String volume, String bucket,
+ AtomicBoolean kdsWaitStarted, AtomicBoolean sdsLockWaitStarted,
+ AtomicBoolean sdsLockAcquired, AtomicBoolean kdsFinished,
ReclaimableKeyFilter keyFilter) throws IOException {
+
+ return mockConstruction(ReclaimableKeyFilter.class,
+ (mocked, context) -> {
+ when(mocked.apply(any())).thenAnswer(i -> {
+ Table.KeyValue<String, OmKeyInfo> keyInfo = i.getArgument(0);
+ if (!keyInfo.getValue().getVolumeName().equals(volume) ||
+ !keyInfo.getValue().getBucketName().equals(bucket)) {
+ return keyFilter.apply(i.getArgument(0));
+ }
+ keyFilter.apply(i.getArgument(0));
+ // Notify SDS that KDS has started for the bucket.
+ kdsWaitStarted.set(true);
+ GenericTestUtils.waitFor(sdsLockWaitStarted::get, 1000, 10000);
+ // Wait for 1 more second so that the command moves to lock wait.
+ Thread.sleep(1000);
+ return keyFilter.apply(i.getArgument(0));
+ });
+ doAnswer(i -> {
+ assertTrue(sdsLockWaitStarted.get());
+ assertFalse(sdsLockAcquired.get());
+ kdsFinished.set(true);
+ keyFilter.close();
+ return null;
+ }).when(mocked).close();
+ when(mocked.getExclusiveReplicatedSizeMap()).thenAnswer(i ->
keyFilter.getExclusiveReplicatedSizeMap());
+ when(mocked.getExclusiveSizeMap()).thenAnswer(i ->
keyFilter.getExclusiveSizeMap());
+ });
+ }
- // Overwrite bucket1key0, This is a newer version of the key which should
- // reclaimed as this is a different version of the key.
- TestDataUtil.createKey(bucket, "dir0/" + bucket.getName() + "key0",
CONTENT.array());
- TestDataUtil.createKey(bucket, "dir2/" + bucket.getName() + "key2",
CONTENT.array());
- assertTableRowCount(keyTable, countMap.get(keyTable.getName()) + 3);
- assertTableRowCount(dirTable, countMap.get(dirTable.getName()) + 3);
- assertTableRowCount(deletedTable, countMap.get(deletedTable.getName()) +
1);
- // create snap1
- client.getProxy().createSnapshot(bucket.getVolumeName(), bucket.getName(),
- bucket.getName() + "snap1");
- bucket.renameKey("dir1/" + bucket.getName() + "key1", "dir1/" +
bucket.getName() + "key10");
- bucket.renameKey("dir1/", "dir10/");
- assertTableRowCount(renameTable, countMap.get(renameTable.getName()) + 2);
- client.getProxy().deleteKey(bucket.getVolumeName(), bucket.getName(),
- "dir10/" + bucket.getName() + "key10", false);
- assertTableRowCount(deletedTable, countMap.get(deletedTable.getName()) +
1);
- // Key 2 is deleted here, which will be reclaimed here as
- // it is not being referenced by previous snapshot.
- client.getProxy().deleteKey(bucket.getVolumeName(), bucket.getName(),
"dir2", true);
- assertTableRowCount(deletedDirTable,
countMap.get(deletedDirTable.getName()) + 1);
- client.getProxy().createSnapshot(bucket.getVolumeName(), bucket.getName(),
- bucket.getName() + "snap2");
- // Delete Snapshot 2.
- client.getProxy().deleteSnapshot(bucket.getVolumeName(), bucket.getName(),
- bucket.getName() + "snap2");
- assertTableRowCount(snapshotInfoTable,
countMap.get(snapshotInfoTable.getName()) + 2);
+ @ParameterizedTest
+ @CsvSource({"true, 0", "true, 1", "false, 0", "false, 1", "false, 2"})
+ @DisplayName("Tests Snapshot Deleting Service while KeyDeletingService is
already running.")
+ @Order(4)
+ public void testSnapshotDeletingServiceWaitsForKeyDeletingService(boolean
kdsRunningOnAOS,
+ int snasphotDeleteIndex) throws Exception {
+ SnapshotChainManager snapshotChainManager =
+
((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager();
+ GenericTestUtils.waitFor(() -> {
+ try {
+ Iterator<UUID> itr = snapshotChainManager.iterator(false);
+ while (itr.hasNext()) {
+ SnapshotInfo snapshotInfo = SnapshotUtils.getSnapshotInfo(om,
snapshotChainManager, itr.next());
+ assertEquals(SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE,
snapshotInfo.getSnapshotStatus());
+ }
+ return true;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, 1000, 30000);
+ om.awaitDoubleBufferFlush();
+ // Suspend the services first
+ om.getKeyManager().getDirDeletingService().suspend();
+ om.getKeyManager().getDeletingService().suspend();
+ om.getKeyManager().getSnapshotDeletingService().suspend();
+ String volume = "vol" + RandomStringUtils.secure().nextNumeric(3),
+ bucket = "bucket" + RandomStringUtils.secure().nextNumeric(3);
+ client.getObjectStore().createVolume(volume);
+ OzoneVolume ozoneVolume = client.getObjectStore().getVolume(volume);
+ ozoneVolume.createBucket(bucket);
+ OzoneBucket ozoneBucket = ozoneVolume.getBucket(bucket);
+
+ // Create snap0
+ client.getObjectStore().createSnapshot(volume, bucket, "snap0");
+ client.getObjectStore().getSnapshotInfo(volume, bucket, "snap0");
+ UUID snap1Id = client.getObjectStore().getSnapshotInfo(volume, bucket,
"snap0").getSnapshotId();
+
+ // Create snap1
+ TestDataUtil.createKey(ozoneBucket, "key", CONTENT.array());
+ client.getObjectStore().createSnapshot(volume, bucket, "snap1");
+ UUID snap2Id = client.getObjectStore().getSnapshotInfo(volume, bucket,
"snap1").getSnapshotId();
+
+ ozoneBucket.renameKey("key", "renamedKey");
+ ozoneBucket.deleteKey("renamedKey");
+ om.awaitDoubleBufferFlush();
+ UUID snap3Id;
+ ReclaimableKeyFilter keyFilter;
+ SnapshotInfo snapInfo;
+ // Create snap3 to test deep cleaning on snapshot 3; otherwise just run on AOS.
+ if (kdsRunningOnAOS) {
+ snap3Id = null;
+ snapInfo = null;
+ keyFilter = new ReclaimableKeyFilter(om, om.getOmSnapshotManager(),
+
((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(),
+ snapInfo, om.getKeyManager(), om.getMetadataManager().getLock());
+ } else {
+
+ client.getObjectStore().createSnapshot(volume, bucket, "snap2");
+ snap3Id = client.getObjectStore().getSnapshotInfo(volume, bucket,
"snap2").getSnapshotId();
+ om.awaitDoubleBufferFlush();
+ SnapshotInfo snap = om.getMetadataManager().getSnapshotInfo(volume,
bucket, "snap2");
+ snap.setDeepCleanedDeletedDir(true);
+ om.getMetadataManager().getSnapshotInfoTable().put(snap.getTableKey(),
snap);
+ assertTrue(om.getMetadataManager().getSnapshotInfo(volume, bucket,
"snap2")
+ .isDeepCleanedDeletedDir());
+ snapInfo = SnapshotUtils.getSnapshotInfo(om, volume, bucket, "snap2");
+ keyFilter = new ReclaimableKeyFilter(om, om.getOmSnapshotManager(),
+
((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(),
+ snapInfo, getOmSnapshot(volume, bucket,
"snap2").get().getKeyManager(),
+ om.getMetadataManager().getLock());
+ }
+
+
+ MultiSnapshotLocks sdsMultiLocks = new
MultiSnapshotLocks(cluster.getOzoneManager().getMetadataManager().getLock(),
+ SNAPSHOT_GC_LOCK, true);
+ AtomicBoolean kdsWaitStarted = new AtomicBoolean(false);
+ AtomicBoolean kdsFinished = new AtomicBoolean(false);
+ AtomicBoolean sdsLockWaitStarted = new AtomicBoolean(false);
+ AtomicBoolean sdsLockAcquired = new AtomicBoolean(false);
+
+ try (MockedConstruction<MultiSnapshotLocks> mockedMultiSnapshotLock =
mockConstruction(MultiSnapshotLocks.class,
+ (mocked, context) -> {
+ when(mocked.acquireLock(anyList())).thenAnswer(i -> {
+ List<UUID> ids = i.getArgument(0);
+ List<UUID> expectedIds = Arrays.asList(snap1Id, snap2Id,
snap3Id).subList(snasphotDeleteIndex, Math.min(3,
+ snasphotDeleteIndex +
2)).stream().filter(Objects::nonNull).collect(Collectors.toList());
+ if (expectedIds.equals(ids) && !sdsLockWaitStarted.get() &&
!sdsLockAcquired.get()) {
+ sdsLockWaitStarted.set(true);
+ OMLockDetails lockDetails = sdsMultiLocks.acquireLock(ids);
+ assertTrue(kdsFinished::get);
+ sdsLockAcquired.set(true);
+ return lockDetails;
+ }
+ return sdsMultiLocks.acquireLock(ids);
+ });
+ doAnswer(i -> {
+ sdsMultiLocks.releaseLock();
+ return null;
+ }).when(mocked).releaseLock();
+ })) {
+ KeyDeletingService kds = new KeyDeletingService(om,
om.getScmClient().getBlockClient(), 500, 10000,
+ om.getConfiguration(), 1, true);
+ kds.shutdown();
+ KeyDeletingService.KeyDeletingTask task = kds.new
KeyDeletingTask(snap3Id);
+
+ CompletableFuture.supplyAsync(() -> {
+ try (MockedConstruction<ReclaimableKeyFilter> mockedReclaimableFilter
= getMockedReclaimableKeyFilter(
+ volume, bucket, kdsWaitStarted, sdsLockWaitStarted,
sdsLockAcquired, kdsFinished, keyFilter)) {
+ return task.call();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ SnapshotDeletingService sds = new SnapshotDeletingService(500, 10000,
om);
+ sds.shutdown();
+ GenericTestUtils.waitFor(kdsWaitStarted::get, 1000, 30000);
+ client.getObjectStore().deleteSnapshot(volume, bucket, "snap" +
snasphotDeleteIndex);
+ sds.runPeriodicalTaskNow();
+ om.awaitDoubleBufferFlush();
+ if (snasphotDeleteIndex == 2) {
+ sds.runPeriodicalTaskNow();
+ }
+ assertTrue(sdsLockWaitStarted.get());
+ assertTrue(sdsLockAcquired.get());
+ assertThrows(IOException.class, () -> SnapshotUtils.getSnapshotInfo(om,
volume, bucket,
+ "snap" + snasphotDeleteIndex));
+ }
}
private void verifySnapshotChain(SnapshotInfo deletedSnapshot,
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index f96099323d..28668eb686 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -39,7 +39,6 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -149,7 +148,6 @@ public class DirectoryDeletingService extends
AbstractKeyDeletingService {
// from parent directory info from deleted directory table concurrently
// and send deletion requests.
private int ratisByteLimit;
- private final AtomicBoolean isRunningOnAOS;
private final SnapshotChainManager snapshotChainManager;
private final boolean deepCleanSnapshots;
private final ExecutorService deletionThreadPool;
@@ -173,7 +171,6 @@ public DirectoryDeletingService(long interval, TimeUnit
unit,
// always go to 90% of max limit for request as other header will be added
this.ratisByteLimit = (int) (limit * 0.9);
- this.isRunningOnAOS = new AtomicBoolean(false);
registerReconfigCallbacks(ozoneManager.getReconfigurationHandler(),
configuration);
this.snapshotChainManager =
((OmMetadataManagerImpl)ozoneManager.getMetadataManager()).getSnapshotChainManager();
this.deepCleanSnapshots = deepCleanSnapshots;
@@ -200,14 +197,10 @@ private synchronized void
updateAndRestart(OzoneConfiguration conf) {
start();
}
- public boolean isRunningOnAOS() {
- return isRunningOnAOS.get();
- }
-
@Override
public BackgroundTaskQueue getTasks() {
BackgroundTaskQueue queue = new BackgroundTaskQueue();
- queue.add(new DirDeletingTask(this, null));
+ queue.add(new DirDeletingTask(null));
if (deepCleanSnapshots) {
Iterator<UUID> iterator = null;
try {
@@ -218,7 +211,7 @@ public BackgroundTaskQueue getTasks() {
}
while (iterator.hasNext()) {
UUID snapshotId = iterator.next();
- queue.add(new DirDeletingTask(this, snapshotId));
+ queue.add(new DirDeletingTask(snapshotId));
}
}
return queue;
@@ -468,11 +461,9 @@ private OzoneManagerProtocolProtos.OMResponse
submitPurgePaths(List<PurgePathReq
@VisibleForTesting
final class DirDeletingTask implements BackgroundTask {
- private final DirectoryDeletingService directoryDeletingService;
private final UUID snapshotId;
- private DirDeletingTask(DirectoryDeletingService service, UUID snapshotId)
{
- this.directoryDeletingService = service;
+ DirDeletingTask(UUID snapshotId) {
this.snapshotId = snapshotId;
}
@@ -650,7 +641,6 @@ public BackgroundTaskResult call() {
final long run = getRunCount().incrementAndGet();
if (snapshotId == null) {
LOG.debug("Running DirectoryDeletingService for active object store,
{}", run);
- isRunningOnAOS.set(true);
} else {
LOG.debug("Running DirectoryDeletingService for snapshot : {}, {}",
snapshotId, run);
}
@@ -681,13 +671,6 @@ public BackgroundTaskResult call() {
} catch (IOException | ExecutionException | InterruptedException e) {
LOG.error("Error while running delete files background task for
store {}. Will retry at next run.",
snapInfo, e);
- } finally {
- if (snapshotId == null) {
- isRunningOnAOS.set(false);
- synchronized (directoryDeletingService) {
- this.directoryDeletingService.notify();
- }
- }
}
}
// By design, no one cares about the results of this call back.
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
index d875e95e85..781184b862 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
@@ -34,7 +34,6 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -87,7 +86,6 @@ public class KeyDeletingService extends
AbstractKeyDeletingService {
private int keyLimitPerTask;
private final AtomicLong deletedKeyCount;
- private final AtomicBoolean isRunningOnAOS;
private final boolean deepCleanSnapshots;
private final SnapshotChainManager snapshotChainManager;
@@ -103,7 +101,6 @@ public KeyDeletingService(OzoneManager ozoneManager,
Preconditions.checkArgument(keyLimitPerTask >= 0,
OZONE_KEY_DELETING_LIMIT_PER_TASK + " cannot be negative.");
this.deletedKeyCount = new AtomicLong(0);
- this.isRunningOnAOS = new AtomicBoolean(false);
this.deepCleanSnapshots = deepCleanSnapshots;
this.snapshotChainManager =
((OmMetadataManagerImpl)ozoneManager.getMetadataManager()).getSnapshotChainManager();
this.scmClient = scmClient;
@@ -119,10 +116,6 @@ public AtomicLong getDeletedKeyCount() {
return deletedKeyCount;
}
- public boolean isRunningOnAOS() {
- return isRunningOnAOS.get();
- }
-
Pair<Integer, Boolean> processKeyDeletes(List<BlockGroup> keyBlocksList,
Map<String, RepeatedOmKeyInfo> keysToModify, List<String> renameEntries,
String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException
{
@@ -261,7 +254,7 @@ private Pair<Integer, Boolean>
submitPurgeKeysRequest(List<DeleteBlockGroupResul
@Override
public BackgroundTaskQueue getTasks() {
BackgroundTaskQueue queue = new BackgroundTaskQueue();
- queue.add(new KeyDeletingTask(this, null));
+ queue.add(new KeyDeletingTask(null));
if (deepCleanSnapshots) {
Iterator<UUID> iterator = null;
try {
@@ -272,7 +265,7 @@ public BackgroundTaskQueue getTasks() {
}
while (iterator.hasNext()) {
UUID snapshotId = iterator.next();
- queue.add(new KeyDeletingTask(this, snapshotId));
+ queue.add(new KeyDeletingTask(snapshotId));
}
}
return queue;
@@ -295,11 +288,9 @@ public void setKeyLimitPerTask(int keyLimitPerTask) {
*/
@VisibleForTesting
final class KeyDeletingTask implements BackgroundTask {
- private final KeyDeletingService deletingService;
private final UUID snapshotId;
- KeyDeletingTask(KeyDeletingService service, UUID snapshotId) {
- this.deletingService = service;
+ KeyDeletingTask(UUID snapshotId) {
this.snapshotId = snapshotId;
}
@@ -427,7 +418,6 @@ public BackgroundTaskResult call() {
final long run = getRunCount().incrementAndGet();
if (snapshotId == null) {
LOG.debug("Running KeyDeletingService for active object store, {}",
run);
- isRunningOnAOS.set(true);
} else {
LOG.debug("Running KeyDeletingService for snapshot : {}, {}",
snapshotId, run);
}
@@ -464,13 +454,6 @@ public BackgroundTaskResult call() {
} catch (IOException e) {
LOG.error("Error while running delete files background task for
store {}. Will retry at next run.",
snapInfo, e);
- } finally {
- if (snapshotId == null) {
- isRunningOnAOS.set(false);
- synchronized (deletingService) {
- this.deletingService.notify();
- }
- }
}
}
// By design, no one cares about the results of this call back.
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
index 96ae98a19b..75e9a20cdf 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
@@ -22,6 +22,7 @@
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DELETING_LIMIT_PER_TASK;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DELETING_LIMIT_PER_TASK_DEFAULT;
+import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
@@ -52,6 +53,8 @@
import org.apache.hadoop.ozone.om.SnapshotChainManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks;
import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
@@ -86,7 +89,8 @@ public class SnapshotDeletingService extends
AbstractKeyDeletingService {
private final int keyLimitPerTask;
private final int snapshotDeletionPerTask;
private final int ratisByteLimit;
- private final long serviceTimeout;
+ private final MultiSnapshotLocks snapshotIdLocks;
+ private final List<UUID> lockIds;
public SnapshotDeletingService(long interval, long serviceTimeout,
OzoneManager ozoneManager)
@@ -112,32 +116,9 @@ public SnapshotDeletingService(long interval, long
serviceTimeout,
this.keyLimitPerTask = conf.getInt(
OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK,
OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
- this.serviceTimeout = serviceTimeout;
- }
-
- // Wait for a notification from KeyDeletingService if the key deletion is
running. This is to ensure, merging of
- // entries do not start while the AOS is still processing the deleted keys.
- @VisibleForTesting
- public void waitForKeyDeletingService() throws InterruptedException {
- KeyDeletingService keyDeletingService =
getOzoneManager().getKeyManager().getDeletingService();
- synchronized (keyDeletingService) {
- while (keyDeletingService.isRunningOnAOS()) {
- keyDeletingService.wait(serviceTimeout);
- }
- }
- }
-
- // Wait for a notification from DirectoryDeletingService if the directory
deletion is running. This is to ensure,
- // merging of entries do not start while the AOS is still processing the
deleted keys.
- @VisibleForTesting
- public void waitForDirDeletingService() throws InterruptedException {
- DirectoryDeletingService directoryDeletingService =
getOzoneManager().getKeyManager()
- .getDirDeletingService();
- synchronized (directoryDeletingService) {
- while (directoryDeletingService.isRunningOnAOS()) {
- directoryDeletingService.wait(serviceTimeout);
- }
- }
+ IOzoneManagerLock lock = getOzoneManager().getMetadataManager().getLock();
+ this.snapshotIdLocks = new MultiSnapshotLocks(lock, SNAPSHOT_GC_LOCK,
true);
+ this.lockIds = new ArrayList<>(2);
}
private class SnapshotDeletingTask implements BackgroundTask {
@@ -173,16 +154,22 @@ public BackgroundTaskResult call() throws
InterruptedException {
snapInfo.getTableKey());
continue;
}
-
// nextSnapshot = null means entries would be moved to AOS.
if (nextSnapshot == null) {
LOG.info("Snapshot: {} entries will be moved to AOS.",
snapInfo.getTableKey());
- waitForKeyDeletingService();
- waitForDirDeletingService();
} else {
LOG.info("Snapshot: {} entries will be moved to next active
snapshot: {}",
snapInfo.getTableKey(), nextSnapshot.getTableKey());
}
+ lockIds.clear();
+ lockIds.add(snapInfo.getSnapshotId());
+ if (nextSnapshot != null) {
+ lockIds.add(nextSnapshot.getSnapshotId());
+ }
+ // Acquire write lock on current snapshot and next snapshot in chain.
+ if (!snapshotIdLocks.acquireLock(lockIds).isLockAcquired()) {
+ continue;
+ }
try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshot =
omSnapshotManager.getSnapshot(
snapInfo.getVolumeName(), snapInfo.getBucketName(),
snapInfo.getName())) {
KeyManager snapshotKeyManager = snapshot.get().getKeyManager();
@@ -229,6 +216,8 @@ public BackgroundTaskResult call() throws
InterruptedException {
} else {
snapshotsToBePurged.add(snapInfo.getTableKey());
}
+ } finally {
+ snapshotIdLocks.releaseLock();
}
successRunCount.incrementAndGet();
snapshotLimit--;
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index b3178580b5..674c78d16a 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -347,7 +347,7 @@ public void
testAOSKeyDeletingWithSnapshotCreateParallelExecution()
keyDeletingService.suspend();
SnapshotDeletingService snapshotDeletingService =
om.getKeyManager().getSnapshotDeletingService();
snapshotDeletingService.suspend();
- GenericTestUtils.waitFor(() -> !keyDeletingService.isRunningOnAOS(),
1000, 10000);
+
final String volumeName = getTestName();
final String bucketName = uniqueObjectName("bucket");
OzoneManager ozoneManager = Mockito.spy(om);
@@ -618,7 +618,7 @@ public void
testKeyDeletingServiceWithDeepCleanedSnapshots() throws Exception {
when(kds.getTasks()).thenAnswer(i -> {
BackgroundTaskQueue queue = new BackgroundTaskQueue();
for (UUID id : snapshotIds) {
- queue.add(kds.new KeyDeletingTask(kds, id));
+ queue.add(kds.new KeyDeletingTask(id));
}
return queue;
});
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]