This is an automated email from the ASF dual-hosted git repository.
swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new e936e4deb10 HDDS-12134. Implement Snapshot Cache lock for OM Bootstrap (#8474)
e936e4deb10 is described below
commit e936e4deb10c800bd53eef4a39d7634f9ab5ec9b
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Sun Jun 8 18:06:27 2025 -0400
HDDS-12134. Implement Snapshot Cache lock for OM Bootstrap (#8474)
---
.../hadoop/ozone/om/lock/IOzoneManagerLock.java | 4 +
.../hadoop/ozone/om/lock/OmReadOnlyLock.java | 10 ++
.../hadoop/ozone/om/lock/OzoneManagerLock.java | 53 ++++++++---
.../hadoop/ozone/om/lock/TestOzoneManagerLock.java | 104 +++++++++++++++++++--
.../apache/hadoop/ozone/om/OmSnapshotManager.java | 2 +-
.../hadoop/ozone/om/SstFilteringService.java | 8 +-
.../response/snapshot/OMSnapshotPurgeResponse.java | 8 +-
.../hadoop/ozone/om/snapshot/SnapshotCache.java | 70 ++++++++++++--
.../ozone/om/snapshot/TestSnapshotCache.java | 61 +++++++++++-
.../ozone/om/snapshot/TestSnapshotDiffManager.java | 4 +-
10 files changed, 278 insertions(+), 46 deletions(-)
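
For readers skimming the patch: it introduces a SNAPSHOT_DB_LOCK flat resource that is keyed by snapshot id and
held for as long as a cached snapshot handle is in use. A minimal sketch of the reader-side flow, assuming the
patched classes below are on the classpath (the sketch's class and method names are illustrative, not part of
the patch):

    import java.io.IOException;
    import java.util.UUID;
    import org.apache.hadoop.ozone.om.OmSnapshot;
    import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
    import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;

    final class SnapshotReadSketch {
      // get() acquires a SNAPSHOT_DB_LOCK read lock keyed by the snapshot id and
      // keeps it until the returned supplier is closed, so the snapshot's RocksDB
      // handle cannot be swapped out underneath the reader.
      static void readSnapshot(SnapshotCache cache, UUID snapshotId) throws IOException {
        try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshot = cache.get(snapshotId)) {
          OmSnapshot omSnapshot = snapshot.get();
          // ... read from omSnapshot here ...
        } // close() decrements the reference count and releases the read lock
      }
    }
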
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java
index 808b9a4321a..7e8ed7c7817 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java
@@ -36,6 +36,8 @@ OMLockDetails acquireWriteLock(Resource resource,
OMLockDetails acquireWriteLocks(Resource resource,
Collection<String[]> resources);
+ OMLockDetails acquireResourceWriteLock(Resource resource);
+
boolean acquireMultiUserLock(String firstUser, String secondUser);
void releaseMultiUserLock(String firstUser, String secondUser);
@@ -46,6 +48,8 @@ OMLockDetails releaseWriteLock(Resource resource,
OMLockDetails releaseWriteLocks(Resource resource,
Collection<String[]> resources);
+ OMLockDetails releaseResourceWriteLock(Resource resource);
+
OMLockDetails releaseReadLock(Resource resource,
String... resources);
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java
index 0c289cb1889..faf5ca99b8c 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java
@@ -50,6 +50,11 @@ public OMLockDetails acquireWriteLocks(Resource resource, Collection<String[]> r
return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
}
+ @Override
+ public OMLockDetails acquireResourceWriteLock(Resource resource) {
+ return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
+ }
+
@Override
public boolean acquireMultiUserLock(String firstUser, String secondUser) {
return false;
@@ -71,6 +76,11 @@ public OMLockDetails releaseWriteLocks(Resource resource, Collection<String[]> r
return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
}
+ @Override
+ public OMLockDetails releaseResourceWriteLock(Resource resource) {
+ return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
+ }
+
@Override
public OMLockDetails releaseReadLock(Resource resource, String... resources) {
return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
index 2a62836bed9..6cd96f73238 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
@@ -37,7 +37,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.tuple.Pair;
@@ -140,9 +142,11 @@ private Striped<ReadWriteLock> createStripeLock(Resource r,
return SimpleStriped.readWriteLock(size, fair);
}
- private Iterable<ReadWriteLock> bulkGetLock(Map<Resource, Striped<ReadWriteLock>> lockMap, Resource resource,
- Collection<String[]> keys) {
- Striped<ReadWriteLock> striped = lockMap.get(resource);
+ private Iterable<ReadWriteLock> getAllLocks(Striped<ReadWriteLock> striped) {
+ return IntStream.range(0, striped.size()).mapToObj(striped::getAt).collect(Collectors.toList());
+ }
+
+ private Iterable<ReadWriteLock> bulkGetLock(Striped<ReadWriteLock> striped, Collection<String[]> keys) {
List<Object> lockKeys = new ArrayList<>(keys.size());
for (String[] key : keys) {
if (Objects.nonNull(key)) {
@@ -200,7 +204,7 @@ public OMLockDetails acquireReadLock(Resource resource, String... keys) {
*/
@Override
public OMLockDetails acquireReadLocks(Resource resource, Collection<String[]> keys) {
- return acquireLocks(resource, true, keys);
+ return acquireLocks(resource, true, striped -> bulkGetLock(striped, keys));
}
/**
@@ -244,7 +248,17 @@ public OMLockDetails acquireWriteLock(Resource resource, String... keys) {
*/
@Override
public OMLockDetails acquireWriteLocks(Resource resource, Collection<String[]> keys) {
- return acquireLocks(resource, false, keys);
+ return acquireLocks(resource, false, striped -> bulkGetLock(striped, keys));
+ }
+
+ /**
+ * Acquires all write locks for a specified resource.
+ *
+ * @param resource The resource for which the write lock is to be acquired.
+ */
+ @Override
+ public OMLockDetails acquireResourceWriteLock(Resource resource) {
+ return acquireLocks(resource, false, this::getAllLocks);
}
private void acquireLock(Resource resource, boolean isReadLock, ReadWriteLock lock,
@@ -258,7 +272,8 @@ private void acquireLock(Resource resource, boolean isReadLock, ReadWriteLock lo
}
}
- private OMLockDetails acquireLocks(Resource resource, boolean isReadLock, Collection<String[]> keys) {
+ private OMLockDetails acquireLocks(Resource resource, boolean isReadLock,
+ Function<Striped<ReadWriteLock>, Iterable<ReadWriteLock>> lockListProvider) {
Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager> resourceLockPair =
resourcelockMap.get(resource.getClass());
ResourceLockManager<Resource> resourceLockManager = resourceLockPair.getRight();
@@ -271,7 +286,7 @@ private OMLockDetails acquireLocks(Resource resource, boolean isReadLock, Collec
long startWaitingTimeNanos = Time.monotonicNowNanos();
- for (ReadWriteLock lock : bulkGetLock(resourceLockPair.getKey(), resource, keys)) {
+ for (ReadWriteLock lock : lockListProvider.apply(resourceLockPair.getKey().get(resource))) {
acquireLock(resource, isReadLock, lock, startWaitingTimeNanos);
}
return resourceLockManager.lockResource(resource);
@@ -342,7 +357,6 @@ private String getErrorMessage(Resource resource) {
return "Thread '" + Thread.currentThread().getName() + "' cannot " +
"acquire " + resource.getName() + " lock while holding " +
getCurrentLocks().toString() + " lock(s).";
-
}
@VisibleForTesting
@@ -397,7 +411,17 @@ public OMLockDetails releaseWriteLock(Resource resource, String... keys) {
*/
@Override
public OMLockDetails releaseWriteLocks(Resource resource, Collection<String[]> keys) {
- return releaseLocks(resource, false, keys);
+ return releaseLocks(resource, false, striped -> bulkGetLock(striped, keys));
+ }
+
+ /**
+ * Releases a write lock acquired on the entire Stripe for a specified resource.
+ *
+ * @param resource The resource for which the write lock is to be released.
+ */
+ @Override
+ public OMLockDetails releaseResourceWriteLock(Resource resource) {
+ return releaseLocks(resource, false, this::getAllLocks);
}
/**
@@ -423,7 +447,7 @@ public OMLockDetails releaseReadLock(Resource resource, String... keys) {
*/
@Override
public OMLockDetails releaseReadLocks(Resource resource, Collection<String[]> keys) {
- return releaseLocks(resource, true, keys);
+ return releaseLocks(resource, true, striped -> bulkGetLock(striped, keys));
}
private OMLockDetails releaseLock(Resource resource, boolean isReadLock,
@@ -445,12 +469,12 @@ private OMLockDetails releaseLock(Resource resource, boolean isReadLock,
}
private OMLockDetails releaseLocks(Resource resource, boolean isReadLock,
- Collection<String[]> keys) {
+ Function<Striped<ReadWriteLock>, Iterable<ReadWriteLock>> lockListProvider) {
Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager> resourceLockPair =
resourcelockMap.get(resource.getClass());
ResourceLockManager<Resource> resourceLockManager = resourceLockPair.getRight();
resourceLockManager.clearLockDetails();
- List<ReadWriteLock> locks = StreamSupport.stream(bulkGetLock(resourceLockPair.getKey(), resource, keys)
+ List<ReadWriteLock> locks = StreamSupport.stream(lockListProvider.apply(resourceLockPair.getKey().get(resource))
.spliterator(), false).collect(Collectors.toList());
// Release locks in reverse order.
Collections.reverse(locks);
@@ -558,7 +582,10 @@ public OMLockMetrics getOMLockMetrics() {
* Flat Resource defined in Ozone. Locks can be acquired on a resource independent of one another.
*/
public enum FlatResource implements Resource {
- SNAPSHOT_GC_LOCK("SNAPSHOT_GC_LOCK");
+ // Background services lock on a Snapshot.
+ SNAPSHOT_GC_LOCK("SNAPSHOT_GC_LOCK"),
+ // Lock acquired on a Snapshot's RocksDB Handle.
+ SNAPSHOT_DB_LOCK("SNAPSHOT_DB_LOCK");
private String name;
private ResourceManager resourceManager;
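
The core of the OzoneManagerLock change above is the acquireResourceWriteLock/releaseResourceWriteLock pair,
which takes the write lock on every stripe of the resource's Striped lock (getAllLocks), while the existing
per-key paths keep locking only the stripes for the given keys (bulkGetLock). A self-contained sketch of that
pattern using Guava's Striped directly; it is illustrative only and leaves out the metrics, fairness settings
and lock-ordering checks that OzoneManagerLock layers on top:

    import com.google.common.util.concurrent.Striped;
    import java.util.List;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class StripedResourceLockSketch {
      private final Striped<ReadWriteLock> striped = Striped.readWriteLock(16);

      // Per-key read lock: contends only with writers that hash to the same stripe.
      public void withKeyReadLock(String key, Runnable action) {
        Lock readLock = striped.get(key).readLock();
        readLock.lock();
        try {
          action.run();
        } finally {
          readLock.unlock();
        }
      }

      // Whole-resource write lock: takes every stripe's write lock in index order,
      // which excludes all per-key readers and writers at once.
      public void withResourceWriteLock(Runnable action) {
        List<Lock> locks = IntStream.range(0, striped.size())
            .mapToObj(i -> striped.getAt(i).writeLock())
            .collect(Collectors.toList());
        locks.forEach(Lock::lock);
        try {
          action.run();
        } finally {
          // Release in reverse order, mirroring OzoneManagerLock#releaseLocks.
          for (int i = locks.size() - 1; i >= 0; i--) {
            locks.get(i).unlock();
          }
        }
      }
    }
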
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java
index 500a96e29a4..a1d853eb6b3 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java
@@ -31,6 +31,7 @@
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
@@ -39,7 +40,9 @@
import org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
/**
* Class tests OzoneManagerLock.
@@ -278,34 +281,119 @@ void acquireUserLockAfterMultiUserLock() {
lock.releaseMultiUserLock("user1", "user2");
}
- @Test
- void testLockResourceParallel() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testLockResourceParallel(boolean fullResourceLock) throws Exception {
OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
- for (LeveledResource resource :
- LeveledResource.values()) {
+ for (Resource resource : Stream.of(LeveledResource.values(), FlatResource.values())
+ .flatMap(Arrays::stream).collect(Collectors.toList())) {
final String[] resourceName = generateResourceName(resource);
- lock.acquireWriteLock(resource, resourceName);
+ if (fullResourceLock) {
+ lock.acquireResourceWriteLock(resource);
+ } else {
+ lock.acquireWriteLock(resource, resourceName);
+ }
AtomicBoolean gotLock = new AtomicBoolean(false);
new Thread(() -> {
- lock.acquireWriteLock(resource, resourceName);
+ if (fullResourceLock) {
+ lock.acquireResourceWriteLock(resource);
+ } else {
+ lock.acquireWriteLock(resource, resourceName);
+ }
gotLock.set(true);
- lock.releaseWriteLock(resource, resourceName);
+ if (fullResourceLock) {
+ lock.releaseResourceWriteLock(resource);
+ } else {
+ lock.releaseWriteLock(resource, resourceName);
+ }
+
}).start();
// Let's give some time for the new thread to run
Thread.sleep(100);
// Since the new thread is trying to get lock on same resource,
// it will wait.
assertFalse(gotLock.get());
- lock.releaseWriteLock(resource, resourceName);
+ if (fullResourceLock) {
+ lock.releaseResourceWriteLock(resource);
+ } else {
+ lock.releaseWriteLock(resource, resourceName);
+ }
// Since we have released the lock, the new thread should have the lock
// now.
// Let's give some time for the new thread to run
Thread.sleep(100);
assertTrue(gotLock.get());
}
+ }
+
+ @ParameterizedTest
+ @CsvSource(value = {
+ "true, true",
+ "true, false",
+ "false, true",
+ "false, false"
+ })
+ void testResourceLockFullResourceLockParallel(boolean mainThreadAcquireResourceLock, boolean acquireWriteLock)
+ throws Exception {
+ OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
+
+ for (Resource resource : Stream.of(LeveledResource.values(), FlatResource.values())
+ .flatMap(Arrays::stream).collect(Collectors.toList())) {
+ final String[] resourceName = generateResourceName(resource);
+ if (mainThreadAcquireResourceLock) {
+ lock.acquireResourceWriteLock(resource);
+ } else {
+ if (acquireWriteLock) {
+ lock.acquireWriteLock(resource, resourceName);
+ } else {
+ lock.acquireReadLock(resource, resourceName);
+ }
+ }
+ AtomicBoolean gotLock = new AtomicBoolean(false);
+ new Thread(() -> {
+ if (!mainThreadAcquireResourceLock) {
+ lock.acquireResourceWriteLock(resource);
+ } else {
+ if (acquireWriteLock) {
+ lock.acquireWriteLock(resource, resourceName);
+ } else {
+ lock.acquireReadLock(resource, resourceName);
+ }
+ }
+ gotLock.set(true);
+ if (!mainThreadAcquireResourceLock) {
+ lock.releaseResourceWriteLock(resource);
+ } else {
+ if (acquireWriteLock) {
+ lock.releaseWriteLock(resource, resourceName);
+ } else {
+ lock.releaseReadLock(resource, resourceName);
+ }
+ }
+ }).start();
+ // Let's give some time for the new thread to run
+ Thread.sleep(100);
+ // Since the new thread is trying to get lock on same resource,
+ // it will wait.
+ assertFalse(gotLock.get());
+ if (mainThreadAcquireResourceLock) {
+ lock.releaseResourceWriteLock(resource);
+ } else {
+ if (acquireWriteLock) {
+ lock.releaseWriteLock(resource, resourceName);
+ } else {
+ lock.releaseReadLock(resource, resourceName);
+ }
+ }
+ // Since we have released the lock, the new thread should have the lock
+ // now.
+ // Let's give some time for the new thread to run
+ Thread.sleep(100);
+ assertTrue(gotLock.get());
+ }
}
@Test
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 262750e5c2f..1de509e9939 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -295,7 +295,7 @@ public OmSnapshotManager(OzoneManager ozoneManager) {
.getBoolean(OZONE_OM_SNAPSHOT_COMPACT_NON_SNAPSHOT_DIFF_TABLES,
OZONE_OM_SNAPSHOT_COMPACT_NON_SNAPSHOT_DIFF_TABLES_DEFAULT);
this.snapshotCache = new SnapshotCache(loader, softCacheSize, ozoneManager.getMetrics(),
- cacheCleanupServiceInterval, compactNonSnapshotDiffTables);
+ cacheCleanupServiceInterval, compactNonSnapshotDiffTables, ozoneManager.getMetadataManager().getLock());
this.snapshotDiffManager = new SnapshotDiffManager(snapshotDiffDb, differ,
ozoneManager, snapDiffJobCf, snapDiffReportCf,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
index ea46366d918..b94fd45bf7f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
@@ -19,7 +19,7 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK;
import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT;
-import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource.SNAPSHOT_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK;
import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getColumnFamilyToKeyPrefixMap;
import com.google.common.annotations.VisibleForTesting;
@@ -135,8 +135,7 @@ private void markSSTFilteredFlagForSnapshot(SnapshotInfo
snapshotInfo) throws IO
// in OmSnapshotPurgeResponse. Any operation apart from delete can run in parallel along with this operation.
//TODO. Revisit other SNAPSHOT_LOCK and see if we can change write locks to read locks to further optimize it.
OMLockDetails omLockDetails = ozoneManager.getMetadataManager().getLock()
- .acquireReadLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(),
- snapshotInfo.getName());
+ .acquireReadLock(SNAPSHOT_DB_LOCK, snapshotInfo.getSnapshotId().toString());
boolean acquiredSnapshotLock = omLockDetails.isLockAcquired();
if (acquiredSnapshotLock) {
String snapshotDir = OmSnapshotManager.getSnapshotPath(ozoneManager.getConfiguration(), snapshotInfo);
@@ -147,8 +146,7 @@ private void markSSTFilteredFlagForSnapshot(SnapshotInfo
snapshotInfo) throws IO
}
} finally {
ozoneManager.getMetadataManager().getLock()
- .releaseReadLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(),
- snapshotInfo.getBucketName(), snapshotInfo.getName());
+ .releaseReadLock(SNAPSHOT_DB_LOCK, snapshotInfo.getSnapshotId().toString());
}
}
}
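
This file and OMSnapshotPurgeResponse below now key the snapshot DB lock by SnapshotInfo#getSnapshotId instead
of the volume/bucket/snapshot-name triple. Condensed, both call sites follow the shape below; the helper class
and method here are illustrative, not part of the patch:

    import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK;

    import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
    import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
    import org.apache.hadoop.ozone.om.lock.OMLockDetails;

    final class SnapshotDbLockPatternSketch {
      // Acquire by snapshot id, check the result, and always release in finally.
      static void runUnderSnapshotDbReadLock(IOzoneManagerLock lock, SnapshotInfo snapshotInfo, Runnable work) {
        String key = snapshotInfo.getSnapshotId().toString();
        OMLockDetails details = lock.acquireReadLock(SNAPSHOT_DB_LOCK, key);
        if (!details.isLockAcquired()) {
          return; // skip this snapshot; a background run can retry later
        }
        try {
          work.run();
        } finally {
          lock.releaseReadLock(SNAPSHOT_DB_LOCK, key);
        }
      }
    }
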
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
index 5a530ee1188..2503b291c00 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.ozone.om.response.snapshot;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.SNAPSHOT_INFO_TABLE;
-import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource.SNAPSHOT_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
@@ -121,8 +121,7 @@ private void deleteCheckpointDirectory(OMMetadataManager
omMetadataManager,
// inside the snapshot directory. Any operation apart which doesn't create/delete files under this snapshot
// directory can run in parallel along with this operation.
OMLockDetails omLockDetails = omMetadataManager.getLock()
- .acquireWriteLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(),
- snapshotInfo.getName());
+ .acquireWriteLock(SNAPSHOT_DB_LOCK, snapshotInfo.getSnapshotId().toString());
boolean acquiredSnapshotLock = omLockDetails.isLockAcquired();
if (acquiredSnapshotLock) {
Path snapshotDirPath = OmSnapshotManager.getSnapshotPath(omMetadataManager, snapshotInfo);
@@ -132,8 +131,7 @@ private void deleteCheckpointDirectory(OMMetadataManager
omMetadataManager,
LOG.error("Failed to delete snapshot directory {} for snapshot {}",
snapshotDirPath, snapshotInfo.getTableKey(), ex);
} finally {
- omMetadataManager.getLock().releaseWriteLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(),
- snapshotInfo.getBucketName(), snapshotInfo.getName());
+ omMetadataManager.getLock().releaseWriteLock(SNAPSHOT_DB_LOCK, snapshotInfo.getSnapshotId().toString());
}
}
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
index 4bdca5d04ff..b465956f35e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.om.snapshot;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK;
import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COLUMN_FAMILIES_TO_TRACK_IN_DAG;
import com.google.common.annotations.VisibleForTesting;
@@ -28,12 +29,15 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import org.apache.hadoop.hdds.utils.Scheduler;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +62,7 @@ public class SnapshotCache implements ReferenceCountedCallback, AutoCloseable {
private final int cacheSizeLimit;
private final Set<UUID> pendingEvictionQueue;
private final Scheduler scheduler;
+ private final IOzoneManagerLock lock;
private static final String SNAPSHOT_CACHE_CLEANUP_SERVICE =
"SnapshotCacheCleanupService";
private final boolean compactNonSnapshotDiffTables;
@@ -86,7 +91,7 @@ private void compactSnapshotDB(OmSnapshot snapshot) throws IOException {
try {
metadataManager.getStore().compactTable(table.getName());
} catch (IOException e) {
- LOG.warn("Failed to compact table {} in snapshot {}: {}",
+ LOG.warn("Failed to compact table {} in snapshot {}: {}",
table.getName(), snapshot.getSnapshotID(), e.getMessage());
}
}
@@ -94,17 +99,18 @@ private void compactSnapshotDB(OmSnapshot snapshot) throws IOException {
}
public SnapshotCache(CacheLoader<UUID, OmSnapshot> cacheLoader, int cacheSizeLimit, OMMetrics omMetrics,
- long cleanupInterval, boolean compactNonSnapshotDiffTables) {
+ long cleanupInterval, boolean compactNonSnapshotDiffTables, IOzoneManagerLock lock) {
this.dbMap = new ConcurrentHashMap<>();
this.cacheLoader = cacheLoader;
this.cacheSizeLimit = cacheSizeLimit;
this.omMetrics = omMetrics;
+ this.lock = lock;
this.pendingEvictionQueue = ConcurrentHashMap.newKeySet();
this.compactNonSnapshotDiffTables = compactNonSnapshotDiffTables;
if (cleanupInterval > 0) {
this.scheduler = new Scheduler(SNAPSHOT_CACHE_CLEANUP_SERVICE,
true, 1);
- this.scheduler.scheduleWithFixedDelay(this::cleanup, cleanupInterval,
+ this.scheduler.scheduleWithFixedDelay(() -> this.cleanup(false), cleanupInterval,
cleanupInterval, TimeUnit.MILLISECONDS);
} else {
this.scheduler = null;
@@ -172,7 +178,8 @@ public enum Reason {
}
/**
- * Get or load OmSnapshot. Shall be close()d after use.
+ * Get or load OmSnapshot. Shall be close()d after use. This would acquire a read lock on the Snapshot Database
+ * during the entire lifecycle of the returned OmSnapshot instance.
* TODO: [SNAPSHOT] Can add reason enum to param list later.
* @param key SnapshotId
* @return an OmSnapshot instance, or null on error
@@ -183,6 +190,11 @@ public UncheckedAutoCloseableSupplier<OmSnapshot> get(UUID key) throws IOExcepti
LOG.warn("Snapshot cache size ({}) exceeds configured soft-limit ({}).",
size(), cacheSizeLimit);
}
+ OMLockDetails lockDetails = lock.acquireReadLock(SNAPSHOT_DB_LOCK, key.toString());
+ if (!lockDetails.isLockAcquired()) {
+ throw new OMException("Unable to acquire readlock on snapshot db with key " + key,
+ OMException.ResultCodes.INTERNAL_ERROR);
+ }
// Atomic operation to initialize the OmSnapshot instance (once) if the key
// does not exist, and increment the reference count on the instance.
ReferenceCounted<OmSnapshot> rcOmSnapshot =
@@ -214,11 +226,12 @@ public UncheckedAutoCloseableSupplier<OmSnapshot> get(UUID key) throws IOExcepti
if (rcOmSnapshot == null) {
// The only exception that would fall through the loader logic above
// is OMException with FILE_NOT_FOUND.
+ lock.releaseReadLock(SNAPSHOT_DB_LOCK, key.toString());
throw new OMException("SnapshotId: '" + key + "' not found, or the
snapshot is no longer active.",
OMException.ResultCodes.FILE_NOT_FOUND);
}
return new UncheckedAutoCloseableSupplier<OmSnapshot>() {
- private AtomicReference<Boolean> closed = new AtomicReference<>(false);
+ private final AtomicReference<Boolean> closed = new AtomicReference<>(false);
@Override
public OmSnapshot get() {
return rcOmSnapshot.get();
@@ -229,6 +242,7 @@ public void close() {
closed.updateAndGet(alreadyClosed -> {
if (!alreadyClosed) {
rcOmSnapshot.decrementRefCount();
+ lock.releaseReadLock(SNAPSHOT_DB_LOCK, key.toString());
}
return true;
});
@@ -249,21 +263,59 @@ public void release(UUID key) {
val.decrementRefCount();
}
+ /**
+ * Acquires a write lock on the snapshot database and returns an auto-closeable supplier
+ * for lock details. The lock ensures that the operations accessing the snapshot database
+ * are performed in a thread-safe manner. The returned supplier automatically releases the
+ * lock when closed, preventing potential resource contention or deadlocks.
+ */
+ public UncheckedAutoCloseableSupplier<OMLockDetails> lock() {
+ return lock(() -> lock.acquireResourceWriteLock(SNAPSHOT_DB_LOCK),
+ () -> lock.releaseResourceWriteLock(SNAPSHOT_DB_LOCK));
+ }
+
+ private UncheckedAutoCloseableSupplier<OMLockDetails> lock(
+ Supplier<OMLockDetails> lockFunction, Supplier<OMLockDetails> unlockFunction) {
+ AtomicReference<OMLockDetails> lockDetails = new AtomicReference<>(lockFunction.get());
+ if (lockDetails.get().isLockAcquired()) {
+ cleanup(true);
+ if (!dbMap.isEmpty()) {
+ lockDetails.set(unlockFunction.get());
+ }
+ }
+
+ return new UncheckedAutoCloseableSupplier<OMLockDetails>() {
+
+ @Override
+ public void close() {
+ lockDetails.updateAndGet((prevLock) -> {
+ if (prevLock != null && prevLock.isLockAcquired()) {
+ return unlockFunction.get();
+ }
+ return prevLock;
+ });
+ }
+
+ @Override
+ public OMLockDetails get() {
+ return lockDetails.get();
+ }
+ };
+ }
/**
* If cache size exceeds soft limit, attempt to clean up and close the instances that has zero reference count.
*/
- @VisibleForTesting
- void cleanup() {
- if (dbMap.size() > cacheSizeLimit) {
+ private synchronized void cleanup(boolean force) {
+ if (force || dbMap.size() > cacheSizeLimit) {
for (UUID evictionKey : pendingEvictionQueue) {
ReferenceCounted<OmSnapshot> snapshot = dbMap.get(evictionKey);
if (snapshot != null && snapshot.getTotalRefCount() == 0) {
try {
compactSnapshotDB(snapshot.get());
} catch (IOException e) {
- LOG.warn("Failed to compact snapshot DB for snapshotId {}: {}",
+ LOG.warn("Failed to compact snapshot DB for snapshotId {}: {}",
evictionKey, e.getMessage());
}
}
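
On the writer side, SnapshotCache#lock() above acquires the full-stripe SNAPSHOT_DB_LOCK write lock, forces a
cleanup, and hands the lock back when the cache still holds referenced snapshot handles, so a bootstrap-style
caller is expected to check the returned OMLockDetails and try again. A hedged sketch of such a caller; the
retry loop, method name and parameters are assumptions, not part of the patch:

    import org.apache.hadoop.ozone.om.lock.OMLockDetails;
    import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
    import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;

    final class SnapshotCacheLockRetrySketch {
      static boolean runWithExclusiveSnapshotDbAccess(SnapshotCache cache, Runnable work,
          int maxAttempts, long backoffMillis) throws InterruptedException {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
          try (UncheckedAutoCloseableSupplier<OMLockDetails> writeLock = cache.lock()) {
            if (writeLock.get().isLockAcquired()) {
              // No cached snapshot DB handle is open; safe to do the exclusive work.
              work.run();
              return true;
            }
          }
          // Some snapshot is still referenced; wait for readers to close and retry.
          Thread.sleep(backoffMillis);
        }
        return false;
      }
    }
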
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java
index 947d14e1b8b..3018f396d74 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java
@@ -17,15 +17,20 @@
package org.apache.hadoop.ozone.om.snapshot;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -41,6 +46,10 @@
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
+import org.apache.hadoop.ozone.om.lock.OmReadOnlyLock;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.junit.jupiter.api.AfterEach;
@@ -50,6 +59,8 @@
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.stubbing.Answer;
import org.slf4j.event.Level;
@@ -61,6 +72,7 @@ class TestSnapshotCache {
private static final int CACHE_SIZE_LIMIT = 3;
private static CacheLoader<UUID, OmSnapshot> cacheLoader;
+ private static IOzoneManagerLock lock;
private SnapshotCache snapshotCache;
private OMMetrics omMetrics;
@@ -93,20 +105,21 @@ static void beforeAll() throws Exception {
tables.add(table2);
tables.add(keyTable);
when(store.listTables()).thenReturn(tables);
-
+
return omSnapshot;
}
);
// Set SnapshotCache log level. Set to DEBUG for verbose output
GenericTestUtils.setLogLevel(SnapshotCache.class, Level.DEBUG);
+ lock = spy(new OmReadOnlyLock());
}
@BeforeEach
void setUp() {
// Reset cache for each test case
omMetrics = OMMetrics.create();
- snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 50, true);
+ snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 50, true, lock);
}
@AfterEach
@@ -128,6 +141,45 @@ void testGet() throws IOException {
assertEquals(1, omMetrics.getNumSnapshotCacheSize());
}
+ @Test
+ @DisplayName("Tests get() fails on read lock failure")
+ public void testGetFailsOnReadLock() throws IOException {
+ final UUID dbKey1 = UUID.randomUUID();
+ final UUID dbKey2 = UUID.randomUUID();
+ when(lock.acquireReadLock(eq(SNAPSHOT_DB_LOCK), eq(dbKey1.toString())))
+ .thenReturn(OMLockDetails.EMPTY_DETAILS_LOCK_NOT_ACQUIRED);
+ assertThrows(OMException.class, () -> snapshotCache.get(dbKey1));
+ snapshotCache.get(dbKey2);
+ assertEquals(1, snapshotCache.size());
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {0, 1, 5, 10})
+ @DisplayName("Tests get() holds a read lock")
+ public void testGetHoldsReadLock(int numberOfLocks) throws IOException {
+ clearInvocations(lock);
+ final UUID dbKey1 = UUID.randomUUID();
+ final UUID dbKey2 = UUID.randomUUID();
+ for (int i = 0; i < numberOfLocks; i++) {
+ snapshotCache.get(dbKey1);
+ snapshotCache.get(dbKey2);
+ }
+ assertEquals(numberOfLocks > 0 ? 2 : 0, snapshotCache.size());
+ verify(lock, times(numberOfLocks)).acquireReadLock(eq(SNAPSHOT_DB_LOCK), eq(dbKey1.toString()));
+ verify(lock, times(numberOfLocks)).acquireReadLock(eq(SNAPSHOT_DB_LOCK), eq(dbKey2.toString()));
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {0, 1, 5, 10})
+ @DisplayName("Tests lock() holds a write lock")
+ public void testGetHoldsWriteLock(int numberOfLocks) {
+ clearInvocations(lock);
+ for (int i = 0; i < numberOfLocks; i++) {
+ snapshotCache.lock();
+ }
+ verify(lock, times(numberOfLocks)).acquireResourceWriteLock(eq(SNAPSHOT_DB_LOCK));
+ }
+
@Test
@DisplayName("get() same entry twice yields one cache entry only")
void testGetTwice() throws IOException {
@@ -266,7 +318,7 @@ void testEviction1() throws IOException, InterruptedException, TimeoutException
assertEquals(1, snapshotCache.size());
assertEquals(1, omMetrics.getNumSnapshotCacheSize());
assertEntryExistence(dbKey1, false);
-
+
// Verify compaction was called on the tables
org.apache.hadoop.hdds.utils.db.DBStore store1 = snapshot1.get().getMetadataManager().getStore();
verify(store1, times(1)).compactTable("table1");
@@ -371,7 +423,8 @@ void testEviction3WithClose() throws IOException, InterruptedException, TimeoutE
@DisplayName("Snapshot operations not blocked during compaction")
void testSnapshotOperationsNotBlockedDuringCompaction() throws IOException, InterruptedException, TimeoutException {
omMetrics = OMMetrics.create();
- snapshotCache = new SnapshotCache(cacheLoader, 1, omMetrics, 50, true);
+ snapshotCache = new SnapshotCache(cacheLoader, 1, omMetrics, 50, true,
+ lock);
final UUID dbKey1 = UUID.randomUUID();
UncheckedAutoCloseableSupplier<OmSnapshot> snapshot1 = snapshotCache.get(dbKey1);
assertEquals(1, snapshotCache.size());
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
index 57666d3665e..2ffbaa44d82 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
@@ -135,6 +135,7 @@
import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
+import org.apache.hadoop.ozone.om.lock.OmReadOnlyLock;
import org.apache.hadoop.ozone.om.snapshot.SnapshotTestUtils.StubbedPersistentMap;
import org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse;
import org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse.CancelMessage;
@@ -370,7 +371,8 @@ public void init() throws RocksDBException, IOException, ExecutionException {
omSnapshotManager = mock(OmSnapshotManager.class);
when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
- SnapshotCache snapshotCache = new SnapshotCache(mockCacheLoader(), 10, omMetrics, 0, true);
+ SnapshotCache snapshotCache = new SnapshotCache(mockCacheLoader(), 10, omMetrics, 0, true,
+ new OmReadOnlyLock());
when(omSnapshotManager.getActiveSnapshot(anyString(), anyString(), anyString()))
.thenAnswer(invocationOnMock -> {