This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new b39bac05eca HDDS-13783. Implement locks for OmSnapshotLocalDataManager 
(#9140)
b39bac05eca is described below

commit b39bac05eca3f78a77b7efe8cbb5487fc00d48c2
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Wed Oct 29 02:19:30 2025 -0400

    HDDS-13783. Implement locks for OmSnapshotLocalDataManager (#9140)
---
 .../apache/hadoop/ozone/om/lock/FlatResource.java  |   4 +-
 .../hadoop/ozone/om/OmSnapshotLocalData.java       |  17 +-
 .../hadoop/ozone/om/SnapshotDefragService.java     |   8 +-
 .../om/snapshot/OmSnapshotLocalDataManager.java    | 573 ++++++++++++++++++++-
 .../ozone/om/TestOmSnapshotLocalDataYaml.java      |   8 +-
 .../hadoop/ozone/om/TestOmSnapshotManager.java     |   6 +-
 .../snapshot/TestOmSnapshotLocalDataManager.java   | 462 ++++++++++++++++-
 7 files changed, 1018 insertions(+), 60 deletions(-)

diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/FlatResource.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/FlatResource.java
index 73f8357252f..f4d7e72ece3 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/FlatResource.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/FlatResource.java
@@ -26,7 +26,9 @@ public enum FlatResource implements Resource {
   // Background services lock on a Snapshot.
   SNAPSHOT_GC_LOCK("SNAPSHOT_GC_LOCK"),
   // Lock acquired on a Snapshot's RocksDB Handle.
-  SNAPSHOT_DB_LOCK("SNAPSHOT_DB_LOCK");
+  SNAPSHOT_DB_LOCK("SNAPSHOT_DB_LOCK"),
+  // Lock acquired on a Snapshot's Local Data.
+  SNAPSHOT_LOCAL_DATA_LOCK("SNAPSHOT_LOCAL_DATA_LOCK");
 
   private String name;
   private IOzoneManagerLock.ResourceManager resourceManager;
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalData.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalData.java
index 83ad02fb14b..1c840a1cd2e 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalData.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalData.java
@@ -163,7 +163,7 @@ public Map<Integer, VersionMeta> getVersionSstFileInfos() {
    * Sets the defragged SST file list.
    * @param versionSstFileInfos Map of version to defragged SST file list
    */
-  public void setVersionSstFileInfos(Map<Integer, VersionMeta> 
versionSstFileInfos) {
+  void setVersionSstFileInfos(Map<Integer, VersionMeta> versionSstFileInfos) {
     this.versionSstFileInfos.clear();
     this.versionSstFileInfos.putAll(versionSstFileInfos);
   }
@@ -184,9 +184,14 @@ public void setPreviousSnapshotId(UUID previousSnapshotId) 
{
    * Adds an entry to the defragged SST file list.
    * @param sstFiles SST file name
    */
-  public void addVersionSSTFileInfos(List<SstFileInfo> sstFiles, int 
previousSnapshotVersion) {
+  public void addVersionSSTFileInfos(List<LiveFileMetaData> sstFiles, int 
previousSnapshotVersion) {
     version++;
-    this.versionSstFileInfos.put(version, new 
VersionMeta(previousSnapshotVersion, sstFiles));
+    this.versionSstFileInfos.put(version, new 
VersionMeta(previousSnapshotVersion, sstFiles.stream()
+        .map(SstFileInfo::new).collect(Collectors.toList())));
+  }
+
+  public void removeVersionSSTFileInfos(int snapshotVersion) {
+    this.versionSstFileInfos.remove(snapshotVersion);
   }
 
   /**
@@ -274,7 +279,7 @@ public OmSnapshotLocalData copyObject() {
    * maintain immutability.
    */
   public static class VersionMeta implements CopyObject<VersionMeta> {
-    private final int previousSnapshotVersion;
+    private int previousSnapshotVersion;
     private final List<SstFileInfo> sstFiles;
 
     public VersionMeta(int previousSnapshotVersion, List<SstFileInfo> 
sstFiles) {
@@ -286,6 +291,10 @@ public int getPreviousSnapshotVersion() {
       return previousSnapshotVersion;
     }
 
+    public void setPreviousSnapshotVersion(int previousSnapshotVersion) {
+      this.previousSnapshotVersion = previousSnapshotVersion;
+    }
+
     public List<SstFileInfo> getSstFiles() {
       return sstFiles;
     }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
index 9747bb7c894..87f6ff55bb7 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
 import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
 import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,11 +131,10 @@ private boolean needsDefragmentation(SnapshotInfo 
snapshotInfo) {
     String snapshotPath = OmSnapshotManager.getSnapshotPath(
         ozoneManager.getConfiguration(), snapshotInfo);
 
-    try {
+    try (OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider 
readableOmSnapshotLocalDataProvider =
+             
ozoneManager.getOmSnapshotManager().getSnapshotLocalDataManager().getOmSnapshotLocalData(snapshotInfo))
 {
       // Read snapshot local metadata from YAML
-      OmSnapshotLocalData snapshotLocalData = 
ozoneManager.getOmSnapshotManager()
-          .getSnapshotLocalDataManager()
-          .getOmSnapshotLocalData(snapshotInfo);
+      OmSnapshotLocalData snapshotLocalData = 
readableOmSnapshotLocalDataProvider.getSnapshotLocalData();
 
       // Check if snapshot needs compaction (defragmentation)
       boolean needsDefrag = snapshotLocalData.getNeedsDefrag();
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java
index 3c529abaf3c..3e92eb6748c 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java
@@ -27,15 +27,23 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.Stack;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -44,8 +52,15 @@
 import org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.FlatResource;
+import org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager;
+import 
org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager.HierarchicalResourceLock;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
 import org.apache.hadoop.ozone.util.ObjectSerializer;
 import org.apache.hadoop.ozone.util.YamlSerializer;
+import org.apache.ratis.util.function.CheckedSupplier;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+import org.rocksdb.LiveFileMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
@@ -59,9 +74,32 @@ public class OmSnapshotLocalDataManager implements 
AutoCloseable {
   private static final Logger LOG = 
LoggerFactory.getLogger(OmSnapshotLocalDataManager.class);
 
   private final ObjectSerializer<OmSnapshotLocalData> 
snapshotLocalDataSerializer;
+  // In-memory DAG of snapshot-version dependencies. Each node represents a
+  // specific (snapshotId, version) pair, and a directed edge points to the
+  // corresponding (previousSnapshotId, previousSnapshotVersion) it depends on.
+  // The durable state is stored in each snapshot's YAML (previousSnapshotId 
and
+  // VersionMeta.previousSnapshotVersion). This graph mirrors that persisted
+  // structure to validate adds/removes and to resolve versions across chains.
+  // This graph is maintained only in memory and is not persisted to disk.
+  // Example (linear chain, arrows point to previous):
+  //   (S0, v1)  <-  (S1, v4)  <-  (S2, v5)  <-  (S3, v7)
+  // where each node is (snapshotId, version) and each arrow points to its
+  // corresponding (previousSnapshotId, previousSnapshotVersion) dependency.
+  //
+  // Example (multiple versions for a single snapshotId S2):
+  //   (S1, v4)  <-  (S2, v6)  <-  (S3, v8)
+  //   (S1, v3)  <-  (S2, v5)
+  // Here S2 has two distinct versions (v6 and v5), each represented as its own
+  // node, and each version can depend on a different previousSnapshotVersion 
on S1.
   private final MutableGraph<LocalDataVersionNode> localDataGraph;
   private final Map<UUID, SnapshotVersionsMeta> versionNodeMap;
   private final OMMetadataManager omMetadataManager;
+  // Used for acquiring locks on the entire data structure.
+  private final ReadWriteLock fullLock;
+  // Used for taking a lock on internal data structure Map and Graph to ensure 
thread safety.
+  private final ReadWriteLock internalLock;
+  // Locks should be always acquired by iterating through the snapshot chain 
to avoid deadlocks.
+  private HierarchicalResourceLockManager locks;
 
   public OmSnapshotLocalDataManager(OMMetadataManager omMetadataManager) 
throws IOException {
     this.localDataGraph = GraphBuilder.directed().build();
@@ -74,7 +112,9 @@ public void computeAndSetChecksum(Yaml yaml, 
OmSnapshotLocalData data) throws IO
         data.computeAndSetChecksum(yaml);
       }
     };
-    this.versionNodeMap = new HashMap<>();
+    this.versionNodeMap = new ConcurrentHashMap<>();
+    this.fullLock = new ReentrantReadWriteLock();
+    this.internalLock = new ReentrantReadWriteLock();
     init();
   }
 
@@ -114,26 +154,41 @@ public String getSnapshotLocalPropertyYamlPath(UUID 
snapshotId) {
    * @param snapshotInfo snapshot info instance corresponding to snapshot.
    */
   public void createNewOmSnapshotLocalDataFile(RDBStore snapshotStore, 
SnapshotInfo snapshotInfo) throws IOException {
-    Path snapshotLocalDataPath = Paths.get(
-        
getSnapshotLocalPropertyYamlPath(snapshotStore.getDbLocation().toPath()));
-    Files.deleteIfExists(snapshotLocalDataPath);
-    OmSnapshotLocalData snapshotLocalDataYaml = new 
OmSnapshotLocalData(snapshotInfo.getSnapshotId(),
-        OmSnapshotManager.getSnapshotSSTFileList(snapshotStore), 
snapshotInfo.getPathPreviousSnapshotId());
-    snapshotLocalDataSerializer.save(snapshotLocalDataPath.toFile(), 
snapshotLocalDataYaml);
+    try (WritableOmSnapshotLocalDataProvider snapshotLocalData =
+        new WritableOmSnapshotLocalDataProvider(snapshotInfo.getSnapshotId(),
+            () -> Pair.of(new OmSnapshotLocalData(snapshotInfo.getSnapshotId(),
+                OmSnapshotManager.getSnapshotSSTFileList(snapshotStore), 
snapshotInfo.getPathPreviousSnapshotId()),
+                null))) {
+      snapshotLocalData.commit();
+    }
   }
 
-  public OmSnapshotLocalData getOmSnapshotLocalData(SnapshotInfo snapshotInfo) 
throws IOException {
+  public ReadableOmSnapshotLocalDataProvider 
getOmSnapshotLocalData(SnapshotInfo snapshotInfo) throws IOException {
     return getOmSnapshotLocalData(snapshotInfo.getSnapshotId());
   }
 
-  public OmSnapshotLocalData getOmSnapshotLocalData(UUID snapshotId) throws 
IOException {
-    Path snapshotLocalDataPath = 
Paths.get(getSnapshotLocalPropertyYamlPath(snapshotId));
-    OmSnapshotLocalData snapshotLocalData = 
snapshotLocalDataSerializer.load(snapshotLocalDataPath.toFile());
-    if (!Objects.equals(snapshotLocalData.getSnapshotId(), snapshotId)) {
-      throw new IOException("SnapshotId in path : " + snapshotLocalDataPath + 
" contains snapshotLocalData " +
-          "corresponding to snapshotId " + snapshotLocalData.getSnapshotId() + 
". Expected snapshotId " + snapshotId);
-    }
-    return snapshotLocalData;
+  public ReadableOmSnapshotLocalDataProvider getOmSnapshotLocalData(UUID 
snapshotId) throws IOException {
+    return new ReadableOmSnapshotLocalDataProvider(snapshotId);
+  }
+
+  public ReadableOmSnapshotLocalDataProvider getOmSnapshotLocalData(UUID 
snapshotId, UUID previousSnapshotID)
+      throws IOException {
+    return new ReadableOmSnapshotLocalDataProvider(snapshotId, 
previousSnapshotID);
+  }
+
+  public WritableOmSnapshotLocalDataProvider 
getWritableOmSnapshotLocalData(SnapshotInfo snapshotInfo)
+      throws IOException {
+    return getWritableOmSnapshotLocalData(snapshotInfo.getSnapshotId(), 
snapshotInfo.getPathPreviousSnapshotId());
+  }
+
+  public WritableOmSnapshotLocalDataProvider 
getWritableOmSnapshotLocalData(UUID snapshotId, UUID previousSnapshotId)
+      throws IOException {
+    return new WritableOmSnapshotLocalDataProvider(snapshotId, 
previousSnapshotId);
+  }
+
+  public WritableOmSnapshotLocalDataProvider 
getWritableOmSnapshotLocalData(UUID snapshotId)
+      throws IOException {
+    return new WritableOmSnapshotLocalDataProvider(snapshotId);
   }
 
   public OmSnapshotLocalData getOmSnapshotLocalData(File snapshotDataPath) 
throws IOException {
@@ -141,7 +196,7 @@ public OmSnapshotLocalData getOmSnapshotLocalData(File 
snapshotDataPath) throws
   }
 
   private LocalDataVersionNode getVersionNode(UUID snapshotId, int version) {
-    if (!versionNodeMap.containsKey(snapshotId)) {
+    if (snapshotId == null || !versionNodeMap.containsKey(snapshotId)) {
       return null;
     }
     return versionNodeMap.get(snapshotId).getVersionNode(version);
@@ -151,15 +206,9 @@ private void addSnapshotVersionMeta(UUID snapshotId, 
SnapshotVersionsMeta snapsh
       throws IOException {
     if (!versionNodeMap.containsKey(snapshotId)) {
       for (LocalDataVersionNode versionNode : 
snapshotVersionsMeta.getSnapshotVersions().values()) {
-        if (getVersionNode(versionNode.snapshotId, versionNode.version) != 
null) {
-          throw new IOException("Unable to add " + versionNode + " since it 
already exists");
-        }
-        LocalDataVersionNode previousVersionNode = 
versionNode.previousSnapshotId == null ? null :
+        validateVersionAddition(versionNode);
+        LocalDataVersionNode previousVersionNode =
             getVersionNode(versionNode.previousSnapshotId, 
versionNode.previousSnapshotVersion);
-        if (versionNode.previousSnapshotId != null && previousVersionNode == 
null) {
-          throw new IOException("Unable to add " + versionNode + " since 
previous snapshot with version hasn't been " +
-              "loaded");
-        }
         localDataGraph.addNode(versionNode);
         if (previousVersionNode != null) {
           localDataGraph.putEdge(versionNode, previousVersionNode);
@@ -186,7 +235,13 @@ void addVersionNodeWithDependents(OmSnapshotLocalData 
snapshotLocalData) throws
       } else {
         UUID prevSnapId = snapshotVersionsMeta.getPreviousSnapshotId();
         if (prevSnapId != null && !versionNodeMap.containsKey(prevSnapId)) {
-          OmSnapshotLocalData prevSnapshotLocalData = 
getOmSnapshotLocalData(prevSnapId);
+          File previousSnapshotLocalDataFile = new 
File(getSnapshotLocalPropertyYamlPath(prevSnapId));
+          OmSnapshotLocalData prevSnapshotLocalData = 
snapshotLocalDataSerializer.load(previousSnapshotLocalDataFile);
+          if (!prevSnapId.equals(prevSnapshotLocalData.getSnapshotId())) {
+            throw new IOException("SnapshotId mismatch: expected " + 
prevSnapId +
+                " but found " + prevSnapshotLocalData.getSnapshotId() +
+                " in file " + previousSnapshotLocalDataFile.getAbsolutePath());
+          }
           stack.push(Pair.of(prevSnapshotLocalData.getSnapshotId(), new 
SnapshotVersionsMeta(prevSnapshotLocalData)));
         }
         visitedSnapshotIds.add(snapId);
@@ -195,6 +250,7 @@ void addVersionNodeWithDependents(OmSnapshotLocalData 
snapshotLocalData) throws
   }
 
   private void init() throws IOException {
+    this.locks = omMetadataManager.getHierarchicalLockManager();
     RDBStore store = (RDBStore) omMetadataManager.getStore();
     String checkpointPrefix = store.getDbLocation().getName();
     File snapshotDir = new File(store.getSnapshotsParentDir());
@@ -217,6 +273,46 @@ private void init() throws IOException {
     }
   }
 
+  /**
+   * Acquires a write lock and provides an auto-closeable supplier for 
specifying details
+   * of the lock acquisition. The lock is released when the returned supplier 
is closed.
+   *
+   * @return an instance of {@code 
UncheckedAutoCloseableSupplier<OMLockDetails>} representing
+   *         the acquired lock details, where the lock will automatically be 
released on close.
+   */
+  public UncheckedAutoCloseableSupplier<OMLockDetails> lock() {
+    this.fullLock.writeLock().lock();
+    return new UncheckedAutoCloseableSupplier<OMLockDetails>() {
+      @Override
+      public OMLockDetails get() {
+        return OMLockDetails.EMPTY_DETAILS_LOCK_ACQUIRED;
+      }
+
+      @Override
+      public void close() {
+        fullLock.writeLock().unlock();
+      }
+    };
+  }
+
+  private void validateVersionRemoval(UUID snapshotId, int version) throws 
IOException {
+    LocalDataVersionNode versionNode = getVersionNode(snapshotId, version);
+    if (versionNode != null && localDataGraph.inDegree(versionNode) != 0) {
+      Set<LocalDataVersionNode> versionNodes = 
localDataGraph.predecessors(versionNode);
+      throw new IOException(String.format("Cannot remove Snapshot %s with 
version : %d since it still has " +
+          "predecessors : %s", snapshotId, version, versionNodes));
+    }
+  }
+
+  private void validateVersionAddition(LocalDataVersionNode versionNode) 
throws IOException {
+    LocalDataVersionNode previousVersionNode = 
getVersionNode(versionNode.previousSnapshotId,
+        versionNode.previousSnapshotVersion);
+    if (versionNode.previousSnapshotId != null && previousVersionNode == null) 
{
+      throw new IOException("Unable to add " + versionNode + " since previous 
snapshot with version hasn't been " +
+          "loaded");
+    }
+  }
+
   @Override
   public void close() {
     if (snapshotLocalDataSerializer != null) {
@@ -228,6 +324,413 @@ public void close() {
     }
   }
 
+  private static final class LockDataProviderInitResult {
+    private final OmSnapshotLocalData snapshotLocalData;
+    private final HierarchicalResourceLock lock;
+    private final HierarchicalResourceLock previousLock;
+    private final UUID previousSnapshotId;
+
+    private LockDataProviderInitResult(HierarchicalResourceLock lock, 
OmSnapshotLocalData snapshotLocalData,
+        HierarchicalResourceLock previousLock, UUID previousSnapshotId) {
+      this.lock = lock;
+      this.snapshotLocalData = snapshotLocalData;
+      this.previousLock = previousLock;
+      this.previousSnapshotId = previousSnapshotId;
+    }
+
+    private HierarchicalResourceLock getLock() {
+      return lock;
+    }
+
+    private HierarchicalResourceLock getPreviousLock() {
+      return previousLock;
+    }
+
+    private UUID getPreviousSnapshotId() {
+      return previousSnapshotId;
+    }
+
+    private OmSnapshotLocalData getSnapshotLocalData() {
+      return snapshotLocalData;
+    }
+  }
+
+  /**
+   * The ReadableOmSnapshotLocalDataProvider class is responsible for managing 
the
+   * access and initialization of local snapshot data in a thread-safe manner.
+   * It provides mechanisms to handle snapshot data, retrieve associated 
previous
+   * snapshot data, and manage lock synchronization for safe concurrent 
operations.
+   *
+   * This class works with snapshot identifiers and ensures that the 
appropriate
+   * local data for a given snapshot is loaded and accessible. Additionally, it
+   * maintains locking mechanisms to ensure thread-safe initialization and 
access
+   * to both the current and previous snapshot local data. The implementation 
also
+   * supports handling errors in the snapshot data initialization process.
+   *
+   * Key Functionalities:
+   * - Initializes and provides access to snapshot local data associated with a
+   *   given snapshot identifier.
+   * - Resolves and retrieves data for the previous snapshot if applicable.
+   * - Ensures safe concurrent read operations using locking mechanisms.
+   * - Validates the integrity and consistency of snapshot data during 
initialization.
+   * - Ensures that appropriate locks are released upon closing.
+   *
+   * Thread-Safety:
+   * This class utilizes locks to guarantee thread-safe operations when 
accessing
+   * or modifying snapshot data. State variables relating to snapshot data are
+   * properly synchronized to ensure consistency during concurrent operations.
+   *
+   * Usage Considerations:
+   * - Ensure proper handling of exceptions while interacting with this class,
+   *   particularly during initialization and cleanup.
+   * - Always invoke the {@code close()} method after usage to release 
acquired locks
+   *   and avoid potential deadlocks.
+   */
+  public class ReadableOmSnapshotLocalDataProvider implements AutoCloseable {
+
+    private final UUID snapshotId;
+    private final HierarchicalResourceLock lock;
+    private final HierarchicalResourceLock previousLock;
+    private final OmSnapshotLocalData snapshotLocalData;
+    private OmSnapshotLocalData previousSnapshotLocalData;
+    private volatile boolean isPreviousSnapshotLoaded = false;
+    private final UUID resolvedPreviousSnapshotId;
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId) throws 
IOException {
+      this(snapshotId, true);
+    }
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, UUID 
snapIdToResolve) throws IOException {
+      this(snapshotId, true, null, snapIdToResolve, true);
+    }
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean 
readLock) throws IOException {
+      this(snapshotId, readLock, null, null, false);
+    }
+
+    protected ReadableOmSnapshotLocalDataProvider(UUID snapshotId, boolean 
readLock,
+        CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException> 
snapshotLocalDataSupplier,
+        UUID snapshotIdToBeResolved, boolean isSnapshotToBeResolvedNullable) 
throws IOException {
+      this.snapshotId = snapshotId;
+      LockDataProviderInitResult result = initialize(readLock, snapshotId, 
snapshotIdToBeResolved,
+          isSnapshotToBeResolvedNullable, snapshotLocalDataSupplier);
+      this.snapshotLocalData = result.getSnapshotLocalData();
+      this.lock = result.getLock();
+      this.previousLock = result.getPreviousLock();
+      this.resolvedPreviousSnapshotId = result.getPreviousSnapshotId();
+      this.previousSnapshotLocalData = null;
+      this.isPreviousSnapshotLoaded = false;
+    }
+
+    public OmSnapshotLocalData getSnapshotLocalData() {
+      return snapshotLocalData;
+    }
+
+    public synchronized OmSnapshotLocalData getPreviousSnapshotLocalData() 
throws IOException {
+      if (!isPreviousSnapshotLoaded) {
+        if (resolvedPreviousSnapshotId != null) {
+          File previousSnapshotLocalDataFile = new 
File(getSnapshotLocalPropertyYamlPath(resolvedPreviousSnapshotId));
+          this.previousSnapshotLocalData = 
snapshotLocalDataSerializer.load(previousSnapshotLocalDataFile);
+        }
+        this.isPreviousSnapshotLoaded = true;
+      }
+      return previousSnapshotLocalData;
+    }
+
+    private HierarchicalResourceLock acquireLock(UUID snapId, boolean 
readLock) throws IOException {
+      HierarchicalResourceLock acquiredLock = readLock ? 
locks.acquireReadLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK,
+          snapId.toString()) : 
locks.acquireWriteLock(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK, 
snapId.toString());
+      if (!acquiredLock.isLockAcquired()) {
+        throw new IOException("Unable to acquire lock for snapshotId: " + 
snapId);
+      }
+      return acquiredLock;
+    }
+
+    /**
+     * Initializes the snapshot local data by acquiring the lock on the 
snapshot and also acquires a read lock on the
+     * snapshotId to be resolved by iterating through the chain of previous 
snapshot ids.
+     */
+    private LockDataProviderInitResult initialize(
+        boolean readLock, UUID snapId, UUID toResolveSnapshotId, boolean 
isSnapshotToBeResolvedNullable,
+        CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException> 
snapshotLocalDataSupplier) throws IOException {
+      HierarchicalResourceLock snapIdLock = null;
+      HierarchicalResourceLock previousReadLockAcquired = null;
+      try {
+        snapIdLock = acquireLock(snapId, readLock);
+        snapshotLocalDataSupplier = snapshotLocalDataSupplier == null ? () -> {
+          File snapshotLocalDataFile = new 
File(getSnapshotLocalPropertyYamlPath(snapId));
+          return 
Pair.of(snapshotLocalDataSerializer.load(snapshotLocalDataFile), 
snapshotLocalDataFile);
+        } : snapshotLocalDataSupplier;
+        Pair<OmSnapshotLocalData, File> pair = snapshotLocalDataSupplier.get();
+        OmSnapshotLocalData ssLocalData = pair.getKey();
+        if (!Objects.equals(ssLocalData.getSnapshotId(), snapId)) {
+          String loadPath = pair.getValue() == null ? null : 
pair.getValue().getAbsolutePath();
+          throw new IOException("SnapshotId in path : " + loadPath + " 
contains snapshotLocalData corresponding " +
+              "to snapshotId " + ssLocalData.getSnapshotId() + ". Expected 
snapshotId " + snapId);
+        }
+        // Get previous snapshotId and acquire read lock on the id. We need to 
do this outside the loop instead of a
+        // do while loop since the nodes that need be added may not be present 
in the graph so it may not be possible
+        // to iterate through the chain.
+        UUID previousSnapshotId = ssLocalData.getPreviousSnapshotId();
+        // Use the caller-supplied toResolveSnapshotId when it is non-null or
+        // when isSnapshotToBeResolvedNullable is set (a null id then makes this
+        // snapshot first in its chain on commit); else use its own previous id.
+        toResolveSnapshotId = (isSnapshotToBeResolvedNullable || 
toResolveSnapshotId != null) ? toResolveSnapshotId :
+            ssLocalData.getPreviousSnapshotId();
+        if (toResolveSnapshotId != null && previousSnapshotId != null) {
+          previousReadLockAcquired = acquireLock(previousSnapshotId, true);
+          if (!versionNodeMap.containsKey(previousSnapshotId)) {
+            throw new IOException(String.format("Operating on snapshot id : %s 
with previousSnapshotId: %s invalid " +
+                "since previousSnapshotId is not loaded.", snapId, 
previousSnapshotId));
+          }
+          // Create a copy of the previous versionMap to get the previous 
versions corresponding to the previous
+          // snapshot. This map would mutated to resolve the previous 
snapshot's version corresponding to the
+          // toResolveSnapshotId by iterating through the chain of previous 
snapshot ids.
+          Map<Integer, LocalDataVersionNode> previousVersionNodeMap =
+              new 
HashMap<>(versionNodeMap.get(previousSnapshotId).getSnapshotVersions());
+          UUID currentIteratedSnapshotId = previousSnapshotId;
+          // Iterate through the chain of previous snapshot ids until the 
snapshot id to be resolved is found.
+          while (!Objects.equals(currentIteratedSnapshotId, 
toResolveSnapshotId)) {
+            // All versions for the snapshot should point to the same previous 
snapshot id. Otherwise this is a sign
+            // of corruption.
+            Set<UUID> previousIds =
+                
previousVersionNodeMap.values().stream().map(LocalDataVersionNode::getPreviousSnapshotId)
+                .collect(Collectors.toSet());
+            if (previousIds.size() > 1) {
+              throw new IOException(String.format("Snapshot %s versions has 
multiple previous snapshotIds %s",
+                  currentIteratedSnapshotId, previousIds));
+            }
+            if (previousIds.isEmpty()) {
+              throw new IOException(String.format("Snapshot %s versions 
doesn't have previous Id thus snapshot " +
+                      "%s cannot be resolved against id %s",
+                  currentIteratedSnapshotId, snapId, toResolveSnapshotId));
+            }
+            UUID previousId = previousIds.iterator().next();
+            HierarchicalResourceLock previousToPreviousReadLockAcquired = 
acquireLock(previousId, true);
+            try {
+              // Get the version node for the snapshot and update the version 
node to the successor to point to the
+              // previous node.
+              for (Map.Entry<Integer, LocalDataVersionNode> entry : 
previousVersionNodeMap.entrySet()) {
+                internalLock.readLock().lock();
+                try {
+                  Set<LocalDataVersionNode> versionNode = 
localDataGraph.successors(entry.getValue());
+                  if (versionNode.size() > 1) {
+                    throw new IOException(String.format("Snapshot %s version 
%d has multiple successors %s",
+                        currentIteratedSnapshotId, 
entry.getValue().getVersion(), versionNode));
+                  }
+                  if (versionNode.isEmpty()) {
+                    throw new IOException(String.format("Snapshot %s version 
%d doesn't have successor",
+                        currentIteratedSnapshotId, 
entry.getValue().getVersion()));
+                  }
+                  // Set the version node for iterated version to the 
successor corresponding to the previous snapshot
+                  // id.
+                  entry.setValue(versionNode.iterator().next());
+                } finally {
+                  internalLock.readLock().unlock();
+                }
+              }
+            } finally {
+              // Release the read lock acquired on the previous snapshot id 
acquired. Now that the instance
+              // is no longer needed we can release the read lock for the 
snapshot iterated in the previous snapshot.
+              // Make previousToPrevious previous for next iteration.
+              previousReadLockAcquired.close();
+              previousReadLockAcquired = previousToPreviousReadLockAcquired;
+              currentIteratedSnapshotId = previousId;
+            }
+          }
+          ssLocalData.setPreviousSnapshotId(toResolveSnapshotId);
+          Map<Integer, OmSnapshotLocalData.VersionMeta> versionMetaMap = 
ssLocalData.getVersionSstFileInfos();
+          for (Map.Entry<Integer, OmSnapshotLocalData.VersionMeta> entry : 
versionMetaMap.entrySet()) {
+            OmSnapshotLocalData.VersionMeta versionMeta = entry.getValue();
+            // Get the relative version node which corresponds to the 
toResolveSnapshotId corresponding to the
+            // versionMeta which points to a particular version in the 
previous snapshot
+            LocalDataVersionNode relativePreviousVersionNode =
+                
previousVersionNodeMap.get(versionMeta.getPreviousSnapshotVersion());
+            if (relativePreviousVersionNode == null) {
+              throw new IOException(String.format("Unable to resolve previous 
version node for snapshot: %s" +
+                  " with version : %d against previous snapshot %s previous 
version : %d",
+                  snapId, entry.getKey(), toResolveSnapshotId, 
versionMeta.getPreviousSnapshotVersion()));
+            }
+            // Set the previous snapshot version to the 
relativePreviousVersionNode which was captured.
+            
versionMeta.setPreviousSnapshotVersion(relativePreviousVersionNode.getVersion());
+          }
+        } else {
+          toResolveSnapshotId = null;
+          ssLocalData.setPreviousSnapshotId(null);
+        }
+        return new LockDataProviderInitResult(snapIdLock, ssLocalData, 
previousReadLockAcquired, toResolveSnapshotId);
+      } catch (IOException e) {
+        // Release all the locks in case of an exception and rethrow the 
exception.
+        if (previousReadLockAcquired != null) {
+          previousReadLockAcquired.close();
+        }
+        if (snapIdLock != null) {
+          snapIdLock.close();
+        }
+        throw e;
+      }
+    }
+
+    /**
+     * Releases the locks held by this provider: first the lock on the previous snapshot (if one is held), then the
+     * lock on this snapshot (if one is held).
+     *
+     * @throws IOException if releasing either lock fails
+     */
+    @Override
+    public void close() throws IOException {
+      try {
+        if (previousLock != null) {
+          previousLock.close();
+        }
+      } finally {
+        // Release the snapshot's own lock even when releasing the previous snapshot's lock failed; otherwise the
+        // snapshot would stay locked forever.
+        if (lock != null) {
+          lock.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * This class represents a writable provider for managing local data of
+   * OmSnapshot. It extends the functionality of {@code 
ReadableOmSnapshotLocalDataProvider}
+   * and provides support for write operations, such as committing changes.
+   *
+   * The writable snapshot data provider interacts with version nodes and
+   * facilitates atomic updates to snapshot properties and files.
+   *
+   * This class is designed to ensure thread-safe operations and uses locks to
+   * guarantee consistent state across concurrent activities.
+   *
+   * The default usage includes creating an instance of this provider with
+   * specific snapshot identifiers and optionally handling additional 
parameters
+   * such as data resolution or a supplier for snapshot data.
+   */
+  public final class WritableOmSnapshotLocalDataProvider extends 
ReadableOmSnapshotLocalDataProvider {
+
+    private boolean dirty;
+
+    /**
+     * Creates a writable provider for the given snapshot.
+     * Once the superclass constructor has returned, the read side of {@code fullLock} is acquired; it is released
+     * again by {@code close()}.
+     * NOTE(review): the {@code false} passed to the superclass presumably requests a write (non-read) lock on the
+     * snapshot - confirm against the superclass constructor.
+     *
+     * @param snapshotId id of the snapshot whose local data is opened for modification
+     * @throws IOException if the superclass constructor fails
+     */
+    private WritableOmSnapshotLocalDataProvider(UUID snapshotId) throws 
IOException {
+      super(snapshotId, false);
+      fullLock.readLock().lock();
+    }
+
+    /**
+     * Creates a writable provider for the given snapshot that is resolved against {@code snapshotIdToBeResolved}.
+     * Once the superclass constructor has returned, the read side of {@code fullLock} is acquired; it is released
+     * again by {@code close()}.
+     * NOTE(review): the {@code false}, {@code null} and {@code true} arguments presumably mean "write lock",
+     * "no custom local data supplier" and "resolve against the given snapshot id" - confirm against the superclass
+     * constructor.
+     *
+     * @param snapshotId id of the snapshot whose local data is opened for modification
+     * @param snapshotIdToBeResolved id of the snapshot passed to the superclass as the one to resolve against
+     * @throws IOException if the superclass constructor fails
+     */
+    private WritableOmSnapshotLocalDataProvider(UUID snapshotId, UUID 
snapshotIdToBeResolved) throws IOException {
+      super(snapshotId, false, null, snapshotIdToBeResolved, true);
+      fullLock.readLock().lock();
+    }
+
+    /**
+     * Creates a writable provider for the given snapshot whose local data comes from the supplied callback.
+     * Once the superclass constructor has returned, the read side of {@code fullLock} is acquired; it is released
+     * again by {@code close()}.
+     * NOTE(review): the {@code false}, {@code null} and {@code false} arguments presumably mean "write lock",
+     * "no explicit snapshot id to resolve" and "do not resolve" - confirm against the superclass constructor.
+     *
+     * @param snapshotId id of the snapshot whose local data is opened for modification
+     * @param snapshotLocalDataSupplier supplier of the snapshot local data and a file, handed to the superclass
+     * @throws IOException if the superclass constructor fails
+     */
+    private WritableOmSnapshotLocalDataProvider(UUID snapshotId,
+        CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException> 
snapshotLocalDataSupplier) throws IOException {
+      super(snapshotId, false, snapshotLocalDataSupplier, null, false);
+      fullLock.readLock().lock();
+    }
+
+    /**
+     * Validates that the given snapshot local data may replace the versions currently tracked for its snapshot.
+     *
+     * While holding the read side of {@code internalLock} this method:
+     * - validates every version present in {@code snapshotLocalData} via {@code validateVersionAddition};
+     * - validates, via {@code validateVersionRemoval}, every currently tracked version that is absent from
+     *   {@code snapshotLocalData};
+     * - marks this provider dirty when the snapshot is not tracked yet or its previous snapshot id has changed.
+     *
+     * @param snapshotLocalData the local data that is about to be committed
+     * @return the version metadata built from {@code snapshotLocalData}
+     * @throws IOException if any of the validations fails
+     */
+    private SnapshotVersionsMeta validateModification(OmSnapshotLocalData snapshotLocalData)
+        throws IOException {
+      internalLock.readLock().lock();
+      try {
+        UUID snapshotId = snapshotLocalData.getSnapshotId();
+        SnapshotVersionsMeta versionsToBeAdded = new SnapshotVersionsMeta(snapshotLocalData);
+        // Look the tracked entry up once; the same instance drives both removal validation and dirty detection.
+        SnapshotVersionsMeta existingVersionsMeta = getVersionNodeMap().get(snapshotId);
+        for (LocalDataVersionNode node : versionsToBeAdded.getSnapshotVersions().values()) {
+          validateVersionAddition(node);
+        }
+        Map<Integer, LocalDataVersionNode> existingVersions = existingVersionsMeta == null ?
+            Collections.emptyMap() : existingVersionsMeta.getSnapshotVersions();
+        for (Integer existingVersion : existingVersions.keySet()) {
+          // A tracked version that is missing from the new data is being removed.
+          if (!versionsToBeAdded.getSnapshotVersions().containsKey(existingVersion)) {
+            validateVersionRemoval(snapshotId, existingVersion);
+          }
+        }
+        // Set Dirty if the snapshot doesn't exist or previousSnapshotId has changed.
+        if (existingVersionsMeta == null || !Objects.equals(versionsToBeAdded.getPreviousSnapshotId(),
+            existingVersionsMeta.getPreviousSnapshotId())) {
+          setDirty();
+        }
+        return versionsToBeAdded;
+      } finally {
+        internalLock.readLock().unlock();
+      }
+    }
+
+    /**
+     * Adds a new version to this snapshot's local data from the SST file list of the given snapshot store and
+     * marks the provider dirty.
+     * The new version is recorded against the current version of the previous snapshot's local data, or against
+     * {@code 0} when there is no previous snapshot local data.
+     *
+     * @param snapshotStore store whose SST file list describes the new version
+     * @throws IOException if the SST file list cannot be obtained
+     */
+    public void addSnapshotVersion(RDBStore snapshotStore) throws IOException {
+      List<LiveFileMetaData> liveSstFiles = OmSnapshotManager.getSnapshotSSTFileList(snapshotStore);
+      OmSnapshotLocalData previous = getPreviousSnapshotLocalData();
+      int previousSnapshotVersion = 0;
+      if (previous != null) {
+        previousSnapshotVersion = previous.getVersion();
+      }
+      getSnapshotLocalData().addVersionSSTFileInfos(liveSstFiles, previousSnapshotVersion);
+      // A version was added, so the state held here differs from what was last committed.
+      setDirty();
+    }
+
+    /**
+     * Removes the SST file information of the given version from this provider's snapshot local data and marks
+     * the provider dirty, so that a subsequent {@code commit()} rewrites the persisted state.
+     *
+     * @param version the version to remove
+     */
+    public void removeVersion(int version) {
+      this.getSnapshotLocalData().removeVersionSSTFileInfos(version);
+      // Set Dirty if a version is removed.
+      setDirty();
+    }
+
+    /**
+     * Validates the pending changes and, if the provider is dirty, persists them.
+     * Persisting serializes the local data to a {@code .tmp} sibling of the snapshot's YAML file, moves that file
+     * over the YAML file, updates the in-memory version nodes and finally clears the dirty flag.
+     *
+     * @throws IOException if validation fails, a pre-existing tmp file cannot be deleted, or writing/moving the
+     *                     file fails
+     */
+    public synchronized void commit() throws IOException {
+      // Validation always runs; it may itself mark the provider dirty.
+      SnapshotVersionsMeta validatedVersions = validateModification(super.snapshotLocalData);
+      if (!isDirty()) {
+        // Need to update the disk state if and only if the dirty bit is set.
+        return;
+      }
+      String yamlPath = getSnapshotLocalPropertyYamlPath(super.snapshotId);
+      String tmpYamlPath = yamlPath + ".tmp";
+      File tmpYaml = new File(tmpYamlPath);
+      // Remove a pre-existing tmp file, if any, before writing a fresh one.
+      if (tmpYaml.exists() && !tmpYaml.delete()) {
+        throw new IOException("Unable to delete tmp file " + tmpYamlPath);
+      }
+      snapshotLocalDataSerializer.save(tmpYaml, super.snapshotLocalData);
+      Files.move(tmpYaml.toPath(), Paths.get(yamlPath), StandardCopyOption.ATOMIC_MOVE,
+          StandardCopyOption.REPLACE_EXISTING);
+      upsertNode(super.snapshotId, validatedVersions);
+      // Disk and in-memory state are in sync again.
+      resetDirty();
+    }
+
+    /**
+     * Replaces the version nodes tracked for {@code snapshotId} with {@code snapshotVersions} while holding the
+     * write side of {@code internalLock}.
+     * The existing entry is removed from the version node map and each of its nodes is removed from
+     * {@code localDataGraph}, after remembering that node's predecessors keyed by version number. The new
+     * versions are then added through {@code addSnapshotVersionMeta}, and the remembered predecessors are
+     * re-linked to the new node that carries the same version number.
+     *
+     * @param snapshotId id of the snapshot whose version nodes are replaced
+     * @param snapshotVersions the new version metadata for the snapshot
+     * @throws IOException declared by this method; presumably propagated from {@code addSnapshotVersionMeta}
+     */
+    private void upsertNode(UUID snapshotId, SnapshotVersionsMeta 
snapshotVersions) throws IOException {
+      internalLock.writeLock().lock();
+      try {
+        SnapshotVersionsMeta existingSnapVersions = 
getVersionNodeMap().remove(snapshotId);
+        Map<Integer, LocalDataVersionNode> existingVersions = 
existingSnapVersions == null ? Collections.emptyMap() :
+            existingSnapVersions.getSnapshotVersions();
+        Map<Integer, List<LocalDataVersionNode>> predecessors = new 
HashMap<>();
+        // Track all predecessors of the existing versions and remove the node 
from the graph.
+        for (Map.Entry<Integer, LocalDataVersionNode> existingVersion : 
existingVersions.entrySet()) {
+          LocalDataVersionNode existingVersionNode = 
existingVersion.getValue();
+          // Create a copy of predecessors since the list of nodes returned 
would be a mutable set and it changes as the
+          // nodes in the graph would change.
+          predecessors.put(existingVersion.getKey(), new 
ArrayList<>(localDataGraph.predecessors(existingVersionNode)));
+          localDataGraph.removeNode(existingVersionNode);
+        }
+        // Add the nodes to be added in the graph and map.
+        addSnapshotVersionMeta(snapshotId, snapshotVersions);
+        // Reconnect all the predecessors for existing nodes.
+        for (Map.Entry<Integer, LocalDataVersionNode> entry : 
snapshotVersions.getSnapshotVersions().entrySet()) {
+          for (LocalDataVersionNode predecessor : 
predecessors.getOrDefault(entry.getKey(), Collections.emptyList())) {
+            localDataGraph.putEdge(predecessor, entry.getValue());
+          }
+        }
+      } finally {
+        internalLock.writeLock().unlock();
+      }
+    }
+
+    /** Marks this provider as having changes that {@code commit()} still has to persist. */
+    private void setDirty() {
+      dirty = true;
+    }
+
+    /** Clears the dirty flag; called by {@code commit()} after the changes have been persisted. */
+    private void resetDirty() {
+      dirty = false;
+    }
+
+    /** Returns whether the dirty flag is currently set. */
+    private boolean isDirty() {
+      return dirty;
+    }
+
+    /**
+     * Closes the underlying readable provider and then releases the read side of {@code fullLock} that the
+     * constructor acquired.
+     *
+     * @throws IOException if closing the underlying provider fails
+     */
+    @Override
+    public void close() throws IOException {
+      try {
+        super.close();
+      } finally {
+        // The read lock was taken in the constructor; release it even when super.close() throws, otherwise a
+        // later writer on fullLock would block forever.
+        fullLock.readLock().unlock();
+      }
+    }
+  }
+
   static final class LocalDataVersionNode {
     private final UUID snapshotId;
     private final int version;
@@ -241,6 +744,14 @@ private LocalDataVersionNode(UUID snapshotId, int version, 
UUID previousSnapshot
       this.version = version;
     }
 
+    /** Returns the {@code previousSnapshotId} recorded on this node. */
+    private UUID getPreviousSnapshotId() {
+      return previousSnapshotId;
+    }
+
+    /** Returns the {@code version} recorded on this node. */
+    private int getVersion() {
+      return version;
+    }
+
     @Override
     public boolean equals(Object o) {
       if (!(o instanceof LocalDataVersionNode)) {
@@ -255,6 +766,16 @@ public boolean equals(Object o) {
     public int hashCode() {
       return Objects.hash(snapshotId, version, previousSnapshotId, 
previousSnapshotVersion);
     }
+
+    /** Renders all four identifying fields of this node, mainly for log and exception messages. */
+    @Override
+    public String toString() {
+      StringBuilder text = new StringBuilder("LocalDataVersionNode{");
+      text.append("snapshotId=").append(snapshotId);
+      text.append(", version=").append(version);
+      text.append(", previousSnapshotId=").append(previousSnapshotId);
+      text.append(", previousSnapshotVersion=").append(previousSnapshotVersion);
+      return text.append('}').toString();
+    }
   }
 
   static final class SnapshotVersionsMeta {
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotLocalDataYaml.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotLocalDataYaml.java
index 23d332ae75b..b234014ebbc 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotLocalDataYaml.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotLocalDataYaml.java
@@ -130,11 +130,11 @@ private Pair<File, UUID> writeToYaml(UUID snapshotId, 
String snapshotName) throw
 
     // Add some defragged SST files
     dataYaml.addVersionSSTFileInfos(ImmutableList.of(
-        new SstFileInfo("defragged-sst1", "k1", "k2", "table1"),
-        new SstFileInfo("defragged-sst2", "k3", "k4", "table2")),
+        createLiveFileMetaData("defragged-sst1", "table1", "k1", "k2"),
+        createLiveFileMetaData("defragged-sst2", "table2", "k3", "k4")),
         1);
     dataYaml.addVersionSSTFileInfos(Collections.singletonList(
-        new SstFileInfo("defragged-sst3", "k4", "k5", "table1")), 3);
+        createLiveFileMetaData("defragged-sst3", "table1", "k4", "k5")), 3);
 
     File yamlFile = new File(testRoot, yamlFilePath);
 
@@ -202,7 +202,7 @@ public void testUpdateSnapshotDataFile() throws IOException 
{
     dataYaml.setSstFiltered(false);
     dataYaml.setNeedsDefrag(false);
     dataYaml.addVersionSSTFileInfos(
-        singletonList(new SstFileInfo("defragged-sst4", "k5", "k6", 
"table3")), 5);
+        singletonList(createLiveFileMetaData("defragged-sst4", "table3", "k5", 
"k6")), 5);
 
     // Write updated data back to file
     omSnapshotLocalDataSerializer.save(yamlFile, dataYaml);
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
index 7f808df3f97..6ec49935b35 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
@@ -774,6 +774,7 @@ public void testCreateSnapshotIdempotent() throws Exception 
{
     when(bucketTable.get(dbBucketKey)).thenReturn(omBucketInfo);
 
     SnapshotInfo first = createSnapshotInfo(volumeName, bucketName);
+    first.setPathPreviousSnapshotId(null);
     when(snapshotInfoTable.get(first.getTableKey())).thenReturn(first);
 
     // Create first checkpoint for the snapshot checkpoint
@@ -797,10 +798,13 @@ public void testCreateSnapshotIdempotent() throws 
Exception {
 
   private SnapshotInfo createSnapshotInfo(String volumeName,
                                           String bucketName) {
-    return SnapshotInfo.newInstance(volumeName,
+    SnapshotInfo snapshotInfo = SnapshotInfo.newInstance(volumeName,
         bucketName,
         UUID.randomUUID().toString(),
         UUID.randomUUID(),
         Time.now());
+    snapshotInfo.setPathPreviousSnapshotId(null);
+    snapshotInfo.setGlobalPreviousSnapshotId(null);
+    return snapshotInfo;
   }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java
index 34bde4814a6..947c1a4b7f4 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java
@@ -24,41 +24,63 @@
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
+import org.apache.commons.compress.utils.Sets;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.hdds.utils.db.RocksDatabase;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmSnapshotLocalData;
 import org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.FlatResource;
+import org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager;
+import 
org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager.HierarchicalResourceLock;
+import 
org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider;
+import 
org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.WritableOmSnapshotLocalDataProvider;
 import org.apache.hadoop.ozone.util.YamlSerializer;
 import org.apache.ozone.compaction.log.SstFileInfo;
+import org.assertj.core.util.Lists;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.rocksdb.LiveFileMetaData;
@@ -67,13 +89,18 @@
 /**
  * Test class for OmSnapshotLocalDataManager.
  */
+@Timeout(value = 30, unit = TimeUnit.SECONDS)
 public class TestOmSnapshotLocalDataManager {
 
   private static YamlSerializer<OmSnapshotLocalData> 
snapshotLocalDataYamlSerializer;
+  private static List<String> lockCapturor;
 
   @Mock
   private OMMetadataManager omMetadataManager;
 
+  @Mock
+  private HierarchicalResourceLockManager lockManager;
+
   @Mock
   private RDBStore rdbStore;
 
@@ -88,6 +115,11 @@ public class TestOmSnapshotLocalDataManager {
 
   private File snapshotsDir;
 
+  private static final String READ_LOCK_MESSAGE_ACQUIRE = "readLock acquire";
+  private static final String READ_LOCK_MESSAGE_UNLOCK = "readLock unlock";
+  private static final String WRITE_LOCK_MESSAGE_ACQUIRE = "writeLock acquire";
+  private static final String WRITE_LOCK_MESSAGE_UNLOCK = "writeLock unlock";
+
   @BeforeAll
   public static void setupClass() {
     snapshotLocalDataYamlSerializer = new YamlSerializer<OmSnapshotLocalData>(
@@ -98,10 +130,11 @@ public void computeAndSetChecksum(Yaml yaml, 
OmSnapshotLocalData data) throws IO
         data.computeAndSetChecksum(yaml);
       }
     };
+    lockCapturor = new ArrayList<>();
   }
 
   @AfterAll
-  public static void teardownClass() throws IOException {
+  public static void teardownClass() {
     snapshotLocalDataYamlSerializer.close();
     snapshotLocalDataYamlSerializer = null;
   }
@@ -112,15 +145,15 @@ public void setUp() throws IOException {
     
     // Setup mock behavior
     when(omMetadataManager.getStore()).thenReturn(rdbStore);
-
+    
when(omMetadataManager.getHierarchicalLockManager()).thenReturn(lockManager);
     this.snapshotsDir = tempDir.resolve("snapshots").toFile();
     FileUtils.deleteDirectory(snapshotsDir);
     assertTrue(snapshotsDir.exists() || snapshotsDir.mkdirs());
     File dbLocation = tempDir.resolve("db").toFile();
     FileUtils.deleteDirectory(dbLocation);
     assertTrue(dbLocation.exists() || dbLocation.mkdirs());
+    mockLockManager();
 
-    
     
when(rdbStore.getSnapshotsParentDir()).thenReturn(snapshotsDir.getAbsolutePath());
     when(rdbStore.getDbLocation()).thenReturn(dbLocation);
   }
@@ -135,6 +168,392 @@ public void tearDown() throws Exception {
     }
   }
 
+  /** Builds the entry expected in {@code lockCapturor} when a read lock on the snapshot's local data is acquired. */
+  private String getReadLockMessageAcquire(UUID snapshotId) {
+    return String.join(" ", READ_LOCK_MESSAGE_ACQUIRE, String.valueOf(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK),
+        String.valueOf(snapshotId));
+  }
+
+  /** Builds the entry expected in {@code lockCapturor} when a read lock on the snapshot's local data is released. */
+  private String getReadLockMessageRelease(UUID snapshotId) {
+    return String.join(" ", READ_LOCK_MESSAGE_UNLOCK, String.valueOf(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK),
+        String.valueOf(snapshotId));
+  }
+
+  /** Builds the entry expected in {@code lockCapturor} when a write lock on the snapshot's local data is acquired. */
+  private String getWriteLockMessageAcquire(UUID snapshotId) {
+    return String.join(" ", WRITE_LOCK_MESSAGE_ACQUIRE, String.valueOf(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK),
+        String.valueOf(snapshotId));
+  }
+
+  /** Builds the entry expected in {@code lockCapturor} when a write lock on the snapshot's local data is released. */
+  private String getWriteLockMessageRelease(UUID snapshotId) {
+    return String.join(" ", WRITE_LOCK_MESSAGE_UNLOCK, String.valueOf(FlatResource.SNAPSHOT_LOCAL_DATA_LOCK),
+        String.valueOf(snapshotId));
+  }
+
+  /**
+   * Creates a stub lock that always reports itself as acquired and, when closed, appends the matching
+   * read/write unlock entry for {@code resource} and {@code key} to {@code lockCapturor}.
+   *
+   * @param resource the resource the lock was requested for
+   * @param key the key the lock was requested for
+   * @param isWriteLock {@code true} to record write-unlock entries, {@code false} to record read-unlock entries
+   * @return the stub lock
+   */
+  private HierarchicalResourceLock getHierarchicalResourceLock(FlatResource resource, String key, boolean isWriteLock) {
+    // The kind of lock is fixed at creation time, so the message prefix can be chosen up front.
+    final String unlockMessage = isWriteLock ? WRITE_LOCK_MESSAGE_UNLOCK : READ_LOCK_MESSAGE_UNLOCK;
+    return new HierarchicalResourceLock() {
+      @Override
+      public boolean isLockAcquired() {
+        return true;
+      }
+
+      @Override
+      public void close() {
+        lockCapturor.add(unlockMessage + " " + resource + " " + key);
+      }
+    };
+  }
+
+  /**
+   * Resets the mocked lock manager and the captured lock log, then stubs read and write lock acquisition so that
+   * every acquisition is appended to {@code lockCapturor} and returns a stub lock that records its release.
+   *
+   * @throws IOException declared because the stubbed lock manager methods declare it
+   */
+  private void mockLockManager() throws IOException {
+    lockCapturor.clear();
+    reset(lockManager);
+    when(lockManager.acquireReadLock(any(FlatResource.class), anyString()))
+        .thenAnswer(invocation -> {
+          FlatResource resource = invocation.getArgument(0);
+          String key = invocation.getArgument(1);
+          lockCapturor.add(READ_LOCK_MESSAGE_ACQUIRE + " " + resource + " " + key);
+          return getHierarchicalResourceLock(resource, key, false);
+        });
+    when(lockManager.acquireWriteLock(any(FlatResource.class), anyString()))
+        .thenAnswer(invocation -> {
+          FlatResource resource = invocation.getArgument(0);
+          String key = invocation.getArgument(1);
+          lockCapturor.add(WRITE_LOCK_MESSAGE_ACQUIRE + " " + resource + " " + key);
+          return getHierarchicalResourceLock(resource, key, true);
+        });
+  }
+
+  /**
+   * Creates a chain of snapshots, each one having the previously created snapshot as its previous snapshot, and
+   * writes a local data file for each through {@code createNewOmSnapshotLocalDataFile}.
+   * The mocked snapshot store of every snapshot reports the current content of all per-table file lists; after a
+   * snapshot has been created, one more file is appended to each table's list, so later snapshots see more files.
+   *
+   * @param snapshotLocalDataManager the manager used to create the local data files
+   * @param numberOfSnapshots number of snapshots to create
+   * @return the ids of the created snapshots, in creation order
+   * @throws IOException if creating a local data file fails
+   */
+  private List<UUID> createSnapshotLocalData(OmSnapshotLocalDataManager 
snapshotLocalDataManager,
+      int numberOfSnapshots) throws IOException {
+    SnapshotInfo previousSnapshotInfo = null;
+    int counter = 0;
+    // Initial SST files, one list per table name.
+    Map<String, List<LiveFileMetaData>> liveFileMetaDataMap = new HashMap<>();
+    liveFileMetaDataMap.put(KEY_TABLE,
+        Lists.newArrayList(createMockLiveFileMetaData("file1.sst", KEY_TABLE, 
"key1", "key2")));
+    liveFileMetaDataMap.put(FILE_TABLE, 
Lists.newArrayList(createMockLiveFileMetaData("file2.sst", FILE_TABLE, "key1",
+        "key2")));
+    liveFileMetaDataMap.put(DIRECTORY_TABLE, 
Lists.newArrayList(createMockLiveFileMetaData("file2.sst",
+        DIRECTORY_TABLE, "key1", "key2")));
+    liveFileMetaDataMap.put("col1", 
Lists.newArrayList(createMockLiveFileMetaData("file2.sst", "col1", "key1",
+        "key2")));
+    List<UUID> snapshotIds = new ArrayList<>();
+    for (int i = 0; i < numberOfSnapshots; i++) {
+      UUID snapshotId = UUID.randomUUID();
+      SnapshotInfo snapshotInfo = createMockSnapshotInfo(snapshotId, 
previousSnapshotInfo == null ? null
+          : previousSnapshotInfo.getSnapshotId());
+      // The store gets a flattened copy of the lists, so files appended below only affect later snapshots.
+      mockSnapshotStore(snapshotId, liveFileMetaDataMap.values().stream()
+          .flatMap(Collection::stream).collect(Collectors.toList()));
+      snapshotLocalDataManager.createNewOmSnapshotLocalDataFile(snapshotStore, 
snapshotInfo);
+      previousSnapshotInfo = snapshotInfo;
+      for (Map.Entry<String, List<LiveFileMetaData>> tableEntry : 
liveFileMetaDataMap.entrySet()) {
+        String table = tableEntry.getKey();
+        tableEntry.getValue().add(createMockLiveFileMetaData("file" + 
counter++ + ".sst", table, "key1", "key4"));
+      }
+      snapshotIds.add(snapshotId);
+    }
+    return snapshotIds;
+  }
+
+  /**
+   * Points the shared {@code snapshotStore} mock at the given snapshot: its DB location becomes the snapshot's
+   * path (created on disk if missing) and its RocksDB mock reports {@code sstFiles} as the live files.
+   *
+   * @param snapshotId id of the snapshot the store mock should represent
+   * @param sstFiles live file metadata the mocked RocksDB should return
+   * @throws RocksDatabaseException declared because the stubbed {@code getLiveFilesMetaData()} call is made here
+   */
+  private void mockSnapshotStore(UUID snapshotId, List<LiveFileMetaData> 
sstFiles) throws RocksDatabaseException {
+    // Setup snapshot store mock
+    File snapshotDbLocation = 
OmSnapshotManager.getSnapshotPath(omMetadataManager, snapshotId).toFile();
+    assertTrue(snapshotDbLocation.exists() || snapshotDbLocation.mkdirs());
+
+    when(snapshotStore.getDbLocation()).thenReturn(snapshotDbLocation);
+    RocksDatabase rocksDatabase = mock(RocksDatabase.class);
+    when(snapshotStore.getDb()).thenReturn(rocksDatabase);
+    when(rocksDatabase.getLiveFilesMetaData()).thenReturn(sstFiles);
+  }
+
+  /**
+   * Checks the order in which locks are acquired and released when a snapshot's local data is resolved against
+   * an earlier snapshot of its chain.
+   *
+   * Expected sequence: a read or write lock (depending on {@code read}) is acquired on the snapshot itself, then
+   * a read lock on its immediate predecessor. While walking back towards the snapshot to resolve against, the
+   * read lock on the next older snapshot is acquired before the read lock on the current one is released. When
+   * the provider is closed, the read lock on the resolved snapshot is released first, followed by the lock on
+   * the snapshot itself.
+   *
+   * @param read {@code true} to open a readable provider, {@code false} to open a writable provider
+   * @throws IOException if creating or opening the snapshot local data fails
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testLockOrderingAgainstAnotherSnapshot(boolean read) throws IOException {
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    List<UUID> snapshotIds = new ArrayList<>();
+    // Index 0 is null, i.e. "resolve against no previous snapshot".
+    snapshotIds.add(null);
+    snapshotIds.addAll(createSnapshotLocalData(localDataManager, 20));
+    for (int start = 0; start < snapshotIds.size(); start++) {
+      for (int end = start + 1; end < snapshotIds.size(); end++) {
+        UUID startSnapshotId = snapshotIds.get(start);
+        UUID endSnapshotId = snapshotIds.get(end);
+        lockCapturor.clear();
+        int logCaptorIdx = 0;
+        try (ReadableOmSnapshotLocalDataProvider omSnapshotLocalDataProvider =
+                 read ? localDataManager.getOmSnapshotLocalData(endSnapshotId, startSnapshotId) :
+                     localDataManager.getWritableOmSnapshotLocalData(endSnapshotId, startSnapshotId)) {
+          OmSnapshotLocalData snapshotLocalData = omSnapshotLocalDataProvider.getSnapshotLocalData();
+          OmSnapshotLocalData previousSnapshot = omSnapshotLocalDataProvider.getPreviousSnapshotLocalData();
+          assertEquals(endSnapshotId, snapshotLocalData.getSnapshotId());
+          if (startSnapshotId == null) {
+            // Resolving against "no snapshot": only the data is checked, the lock log is not.
+            assertNull(previousSnapshot);
+            assertNull(snapshotLocalData.getPreviousSnapshotId());
+            continue;
+          }
+          assertEquals(startSnapshotId, previousSnapshot.getSnapshotId());
+          assertEquals(startSnapshotId, snapshotLocalData.getPreviousSnapshotId());
+          if (read) {
+            assertEquals(getReadLockMessageAcquire(endSnapshotId), lockCapturor.get(logCaptorIdx++));
+          } else {
+            assertEquals(getWriteLockMessageAcquire(endSnapshotId), lockCapturor.get(logCaptorIdx++));
+          }
+          int idx = end - 1;
+          UUID previousSnapId = snapshotIds.get(idx--);
+          assertEquals(getReadLockMessageAcquire(previousSnapId), lockCapturor.get(logCaptorIdx++));
+          while (idx >= start) {
+            // The older snapshot is locked before the newer one is released.
+            UUID prevPrevSnapId = snapshotIds.get(idx--);
+            assertEquals(getReadLockMessageAcquire(prevPrevSnapId), lockCapturor.get(logCaptorIdx++));
+            assertEquals(getReadLockMessageRelease(previousSnapId), lockCapturor.get(logCaptorIdx++));
+            previousSnapId = prevPrevSnapId;
+          }
+        }
+        assertEquals(getReadLockMessageRelease(startSnapshotId), lockCapturor.get(logCaptorIdx++));
+        if (read) {
+          assertEquals(getReadLockMessageRelease(endSnapshotId), lockCapturor.get(logCaptorIdx++));
+        } else {
+          assertEquals(getWriteLockMessageRelease(endSnapshotId), lockCapturor.get(logCaptorIdx++));
+        }
+        // Every captured entry must have been accounted for: expected count first, actual size second.
+        assertEquals(logCaptorIdx, lockCapturor.size());
+      }
+    }
+  }
+
+  /**
+   * Checks the locks taken when a snapshot's local data is opened without an explicit snapshot to resolve
+   * against: a read or write lock (depending on {@code read}) on the snapshot itself followed by a read lock on
+   * its previous snapshot, if it has one; on close they are released in the order previous snapshot, snapshot.
+   *
+   * @param read {@code true} to open a readable provider, {@code false} to open a writable provider
+   * @throws IOException if creating or opening the snapshot local data fails
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testVersionLockResolution(boolean read) throws IOException {
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 5);
+    for (int snapIdx = 0; snapIdx < snapshotIds.size(); snapIdx++) {
+      UUID snapId = snapshotIds.get(snapIdx);
+      UUID expectedPreviousSnapId = snapIdx - 1 >= 0 ? snapshotIds.get(snapIdx - 1) : null;
+      lockCapturor.clear();
+      int logCaptorIdx = 0;
+      try (ReadableOmSnapshotLocalDataProvider omSnapshotLocalDataProvider =
+               read ? localDataManager.getOmSnapshotLocalData(snapId) :
+                   localDataManager.getWritableOmSnapshotLocalData(snapId)) {
+        OmSnapshotLocalData snapshotLocalData = omSnapshotLocalDataProvider.getSnapshotLocalData();
+        OmSnapshotLocalData previousSnapshot = omSnapshotLocalDataProvider.getPreviousSnapshotLocalData();
+        assertEquals(snapId, snapshotLocalData.getSnapshotId());
+        assertEquals(expectedPreviousSnapId, previousSnapshot == null ? null :
+            previousSnapshot.getSnapshotId());
+        if (read) {
+          assertEquals(getReadLockMessageAcquire(snapId), lockCapturor.get(logCaptorIdx++));
+        } else {
+          assertEquals(getWriteLockMessageAcquire(snapId), lockCapturor.get(logCaptorIdx++));
+        }
+        if (expectedPreviousSnapId != null) {
+          assertEquals(getReadLockMessageAcquire(expectedPreviousSnapId), lockCapturor.get(logCaptorIdx++));
+        }
+      }
+      if (expectedPreviousSnapId != null) {
+        assertEquals(getReadLockMessageRelease(expectedPreviousSnapId), lockCapturor.get(logCaptorIdx++));
+      }
+      if (read) {
+        assertEquals(getReadLockMessageRelease(snapId), lockCapturor.get(logCaptorIdx++));
+      } else {
+        assertEquals(getWriteLockMessageRelease(snapId), lockCapturor.get(logCaptorIdx++));
+      }
+      // Every captured entry must have been accounted for: expected count first, actual size second.
+      assertEquals(logCaptorIdx, lockCapturor.size());
+    }
+  }
+
+  /**
+   * Verifies that committing a newly added version fails when that version is recorded against previous-snapshot
+   * version {@code 3}: {@code commit()} must throw an {@link IOException} whose message mentions that the
+   * previous snapshot version has not been loaded.
+   * NOTE(review): presumably the previous snapshot only has version 0 at this point - confirm against
+   * {@code createNewOmSnapshotLocalDataFile}.
+   */
+  @Test
+  public void 
testWriteVersionAdditionValidationWithoutPreviousSnapshotVersionExisting() 
throws IOException {
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 2);
+    // Work on the second snapshot, the one that has a previous snapshot.
+    UUID snapId = snapshotIds.get(1);
+    try (WritableOmSnapshotLocalDataProvider omSnapshotLocalDataProvider =
+             localDataManager.getWritableOmSnapshotLocalData(snapId)) {
+      OmSnapshotLocalData snapshotLocalData = 
omSnapshotLocalDataProvider.getSnapshotLocalData();
+      
snapshotLocalData.addVersionSSTFileInfos(Lists.newArrayList(createMockLiveFileMetaData("file1.sst",
 KEY_TABLE,
+          "key1", "key2")), 3);
+
+      IOException ex = assertThrows(IOException.class, 
omSnapshotLocalDataProvider::commit);
+      assertTrue(ex.getMessage().contains("since previous snapshot with 
version hasn't been loaded"));
+    }
+  }
+
+  /**
+   * Verifies {@code addSnapshotVersion}: after extra versions have been added to the first snapshot, a version
+   * added to the second snapshot from a mocked store must become version {@code 1}, must reference
+   * previous-snapshot version {@code 6}, and must list only the first three of the four mocked SST files.
+   * NOTE(review): the fourth file belongs to column family "col1" and is presumably filtered out as untracked;
+   * the expected previous version 6 presumably is the first snapshot's current version after
+   * {@code addVersionsToLocalData} - confirm both against those helpers.
+   */
+  @Test
+  public void testAddVersionFromRDB() throws IOException {
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 2);
+    addVersionsToLocalData(localDataManager, snapshotIds.get(0), 
ImmutableMap.of(4, 5, 6, 8));
+    UUID snapId = snapshotIds.get(1);
+    List<LiveFileMetaData> newVersionSstFiles =
+        Lists.newArrayList(createMockLiveFileMetaData("file5.sst", KEY_TABLE, 
"key1", "key2"),
+        createMockLiveFileMetaData("file6.sst", FILE_TABLE, "key1", "key2"),
+        createMockLiveFileMetaData("file7.sst", KEY_TABLE, "key1", "key2"),
+        createMockLiveFileMetaData("file1.sst", "col1", "key1", "key2"));
+    try (WritableOmSnapshotLocalDataProvider snap =
+             localDataManager.getWritableOmSnapshotLocalData(snapId)) {
+      // Re-point the shared store mock so that it reports the new file list for this snapshot.
+      mockSnapshotStore(snapId, newVersionSstFiles);
+      snap.addSnapshotVersion(snapshotStore);
+      snap.commit();
+    }
+    validateVersions(localDataManager, snapId, 1, Sets.newHashSet(0, 1));
+    try (ReadableOmSnapshotLocalDataProvider snap = 
localDataManager.getOmSnapshotLocalData(snapId)) {
+      OmSnapshotLocalData snapshotLocalData = snap.getSnapshotLocalData();
+      OmSnapshotLocalData.VersionMeta versionMeta = 
snapshotLocalData.getVersionSstFileInfos().get(1);
+      assertEquals(6, versionMeta.getPreviousSnapshotVersion());
+      List<SstFileInfo> expectedLiveFileMetaData =
+          newVersionSstFiles.subList(0, 
3).stream().map(SstFileInfo::new).collect(Collectors.toList());
+      assertEquals(expectedLiveFileMetaData, versionMeta.getSstFiles());
+    }
+  }
+
+  /**
+   * Asserts the current version and the set of recorded version numbers of a snapshot's local data.
+   *
+   * @param snapshotLocalDataManager manager used to open the snapshot's local data for reading
+   * @param snapId id of the snapshot to check
+   * @param expectedVersion the version the local data must report
+   * @param expectedVersions the exact set of version numbers that must be recorded
+   * @throws IOException if opening or closing the local data fails
+   */
+  private void validateVersions(OmSnapshotLocalDataManager snapshotLocalDataManager, UUID snapId, int expectedVersion,
+      Set<Integer> expectedVersions) throws IOException {
+    try (ReadableOmSnapshotLocalDataProvider provider = snapshotLocalDataManager.getOmSnapshotLocalData(snapId)) {
+      OmSnapshotLocalData localData = provider.getSnapshotLocalData();
+      assertEquals(expectedVersion, localData.getVersion());
+      assertEquals(expectedVersions, localData.getVersionSstFileInfos().keySet());
+    }
+  }
+
+  /**
+   * Validates that a version of a snapshot can only be removed once the
+   * following snapshot no longer holds a version that refers to it.
+   *
+   * Test flow:
+   * 1) Create local data for three snapshots, referred to below as
+   *    {@code prevSnapId}, {@code snapId} and {@code nextSnapId}.
+   * 2) Populate them through {@code addVersionsToLocalData} with
+   *    "version to previous snapshot version" maps: 4 to 1 for
+   *    {@code prevSnapId}, 5 to 4 for {@code snapId} and 6 to 0 for
+   *    {@code nextSnapId}.
+   * 3) Check that {@code snapId} is at version {@code 5} with versions
+   *    {@code {0,5}} and {@code prevSnapId} is at version {@code 4} with
+   *    versions {@code {0,4}}.
+   * 4) If {@code nextVersionExisting} is {@code true}:
+   *    - Remove version {@code 4} from {@code prevSnapId}; the commit is
+   *      expected to fail with an {@link IOException} whose message says the
+   *      version cannot be removed (version {@code 5} of {@code snapId} was
+   *      added on top of it).
+   *    - Validate that versions and version maps remain unchanged.
+   *    Else ({@code false}):
+   *    - Remove version {@code 5} from {@code snapId} and commit, then remove
+   *      version {@code 4} from {@code prevSnapId} and commit.
+   *    - Validate that the version map of both snapshots only contains the
+   *      base version {@code 0}, while the reported current versions are still
+   *      {@code 5} and {@code 4}.
+   *    - Read {@code nextSnapId} resolved against {@code prevSnapId} and check
+   *      that its previous snapshot id is {@code prevSnapId} and that every
+   *      version of it points to previous snapshot version {@code 0}.
+   *
+   * @param nextVersionExisting whether the dependent version of {@code snapId}
+   *                            still exists ({@code true}) or is removed first
+   *                            ({@code false})
+   * @throws IOException if reading or committing snapshot local data fails
+   *                     outside of the asserted failure
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testWriteVersionValidation(boolean nextVersionExisting) throws 
IOException {
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 3);
+    UUID prevSnapId = snapshotIds.get(0);
+    UUID snapId = snapshotIds.get(1);
+    UUID nextSnapId = snapshotIds.get(2);
+    addVersionsToLocalData(localDataManager, prevSnapId, ImmutableMap.of(4, 
1));
+    addVersionsToLocalData(localDataManager, snapId, ImmutableMap.of(5, 4));
+    addVersionsToLocalData(localDataManager, nextSnapId, ImmutableMap.of(6, 
0));
+
+    validateVersions(localDataManager, snapId, 5, Sets.newHashSet(0, 5));
+    validateVersions(localDataManager, prevSnapId, 4, Sets.newHashSet(0, 4));
+
+    if (nextVersionExisting) {
+      try (WritableOmSnapshotLocalDataProvider prevSnap = 
localDataManager.getWritableOmSnapshotLocalData(prevSnapId)) {
+        prevSnap.removeVersion(4);
+        IOException ex = assertThrows(IOException.class, prevSnap::commit);
+        assertTrue(ex.getMessage().contains("Cannot remove Snapshot " + 
prevSnapId + " with version : 4 since it " +
+            "still has predecessors"));
+      }
+      validateVersions(localDataManager, snapId, 5, Sets.newHashSet(0, 5));
+      validateVersions(localDataManager, prevSnapId, 4, Sets.newHashSet(0, 4));
+    } else {
+      try (WritableOmSnapshotLocalDataProvider snap = 
localDataManager.getWritableOmSnapshotLocalData(snapId)) {
+        snap.removeVersion(5);
+        snap.commit();
+      }
+
+      try (WritableOmSnapshotLocalDataProvider prevSnap = 
localDataManager.getWritableOmSnapshotLocalData(prevSnapId)) {
+        prevSnap.removeVersion(4);
+        prevSnap.commit();
+      }
+      validateVersions(localDataManager, snapId, 5, Sets.newHashSet(0));
+      validateVersions(localDataManager, prevSnapId, 4, Sets.newHashSet(0));
+      // Check next snapshot is able to resolve to previous snapshot.
+      try (ReadableOmSnapshotLocalDataProvider nextSnap = 
localDataManager.getOmSnapshotLocalData(nextSnapId,
+          prevSnapId)) {
+        OmSnapshotLocalData snapshotLocalData = 
nextSnap.getSnapshotLocalData();
+        assertEquals(prevSnapId, snapshotLocalData.getPreviousSnapshotId());
+        snapshotLocalData.getVersionSstFileInfos()
+            .forEach((version, versionMeta) -> {
+              assertEquals(0, versionMeta.getPreviousSnapshotVersion());
+            });
+      }
+    }
+  }
+
+  /**
+   * Adds the given versions to a snapshot's local data, commits them and
+   * verifies that they are visible on a subsequent read.
+   *
+   * For every entry of {@code versionMap}, in ascending key order, the current
+   * version is set to {@code key - 1} and a mock SST file list is added with
+   * the entry's value passed as the second argument of
+   * {@code addVersionSSTFileInfos}. Afterwards one more version is added from
+   * the mocked snapshot store and removed again, the current version is
+   * decremented by one and the change is committed. Finally the local data is
+   * re-read and every key of {@code versionMap} is asserted to be present.
+   *
+   * @param snapshotLocalDataManager manager used to open the local data
+   * @param snapId snapshot whose local data is modified
+   * @param versionMap version to add, mapped to the value handed to
+   *                   {@code addVersionSSTFileInfos} for that version
+   * @throws IOException if opening, committing or re-reading the data fails
+   */
+  private void addVersionsToLocalData(OmSnapshotLocalDataManager snapshotLocalDataManager,
+      UUID snapId, Map<Integer, Integer> versionMap) throws IOException {
+    try (WritableOmSnapshotLocalDataProvider snap =
+        snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapId)) {
+      OmSnapshotLocalData snapshotLocalData = snap.getSnapshotLocalData();
+      for (Map.Entry<Integer, Integer> version : versionMap.entrySet().stream()
+          .sorted(Map.Entry.comparingByKey()).collect(Collectors.toList())) {
+        snapshotLocalData.setVersion(version.getKey() - 1);
+        snapshotLocalData.addVersionSSTFileInfos(ImmutableList.of(createMockLiveFileMetaData("file" + version +
+            ".sst", KEY_TABLE, "key1", "key2")), version.getValue());
+      }
+      // The addition is parenthesized so the mock file is named after the
+      // next version number; without the parentheses string concatenation
+      // would append the digit "1" to the current version instead.
+      mockSnapshotStore(snapId, ImmutableList.of(createMockLiveFileMetaData("file"
+          + (snapshotLocalData.getVersion() + 1) + ".sst", KEY_TABLE, "key1", "key2")));
+      snap.addSnapshotVersion(snapshotStore);
+      snap.removeVersion(snapshotLocalData.getVersion());
+      snapshotLocalData.setVersion(snapshotLocalData.getVersion() - 1);
+      snap.commit();
+    }
+    try (ReadableOmSnapshotLocalDataProvider snap =
+        snapshotLocalDataManager.getOmSnapshotLocalData(snapId)) {
+      OmSnapshotLocalData snapshotLocalData = snap.getSnapshotLocalData();
+      for (int version : versionMap.keySet()) {
+        assertTrue(snapshotLocalData.getVersionSstFileInfos().containsKey(version));
+      }
+    }
+  }
+  /**
+   * Verifies that the versions of a snapshot are resolved against an arbitrary
+   * earlier snapshot.
+   *
+   * Five snapshots are created and each one is populated through
+   * {@code addVersionsToLocalData} with a "version to previous snapshot
+   * version" map. For every pair {@code start < end} the expected mapping is
+   * computed by starting from the map of snapshot {@code end} (plus the base
+   * entry 0 to 0) and translating each value through the maps of the snapshots
+   * strictly between {@code start} and {@code end}, falling back to {@code 0}
+   * when a value has no entry. Snapshot {@code end} is then opened with
+   * snapshot {@code start} as the previous snapshot, and the previous snapshot
+   * id, the snapshot id and the previous snapshot version of every version are
+   * asserted.
+   *
+   * @param read {@code true} to open the data via
+   *             {@code getOmSnapshotLocalData}, {@code false} to open it via
+   *             {@code getWritableOmSnapshotLocalData}
+   * @throws IOException if creating, writing or reading local data fails
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testVersionResolution(boolean read) throws IOException {
+    localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+    List<UUID> snapshotIds = createSnapshotLocalData(localDataManager, 5);
+    List<Map<Integer, Integer>> versionMaps = Arrays.asList(
+        ImmutableMap.of(4, 1, 6, 3, 8, 9, 11, 15),
+        ImmutableMap.of(5, 4, 6, 8, 10, 11),
+        ImmutableMap.of(1, 5, 3, 5, 8, 10),
+        ImmutableMap.of(1, 1, 2, 3, 5, 8),
+        ImmutableMap.of(1, 1, 11, 2, 20, 5, 30, 2)
+    );
+    for (int i = 0; i < snapshotIds.size(); i++) {
+      addVersionsToLocalData(localDataManager, snapshotIds.get(i), 
versionMaps.get(i));
+    }
+    for (int start = 0; start < snapshotIds.size(); start++) {
+      for (int end = start + 1; end < snapshotIds.size(); end++) {
+        UUID prevSnapId = snapshotIds.get(start);
+        UUID snapId = snapshotIds.get(end);
+        Map<Integer, Integer> versionMap = new HashMap<>(versionMaps.get(end)); // mutable copy of the expected mapping
+        versionMap.put(0, 0); // base version 0 is expected to map to previous version 0
+        for (int idx = end - 1; idx > start; idx--) { // walk the snapshots strictly between start and end
+          for (Map.Entry<Integer, Integer> version : versionMap.entrySet()) {
+            
version.setValue(versionMaps.get(idx).getOrDefault(version.getValue(), 0));
+          }
+        }
+        try (ReadableOmSnapshotLocalDataProvider snap = read ?
+            localDataManager.getOmSnapshotLocalData(snapId, prevSnapId) :
+            localDataManager.getWritableOmSnapshotLocalData(snapId, 
prevSnapId)) {
+          OmSnapshotLocalData snapshotLocalData = snap.getSnapshotLocalData();
+          OmSnapshotLocalData prevSnapshotLocalData = 
snap.getPreviousSnapshotLocalData();
+          assertEquals(prevSnapshotLocalData.getSnapshotId(), 
snapshotLocalData.getPreviousSnapshotId());
+          assertEquals(prevSnapId, snapshotLocalData.getPreviousSnapshotId());
+          assertEquals(snapId, snapshotLocalData.getSnapshotId());
+          assertTrue(snapshotLocalData.getVersionSstFileInfos().size() > 1);
+          snapshotLocalData.getVersionSstFileInfos()
+              .forEach((version, versionMeta) -> {
+                assertEquals(versionMap.get(version), 
versionMeta.getPreviousSnapshotVersion());
+              });
+        }
+      }
+    }
+  }
+
   @Test
   public void testConstructor() throws IOException {
     localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
@@ -147,7 +566,7 @@ public void 
testGetSnapshotLocalPropertyYamlPathWithSnapshotInfo() throws IOExce
     SnapshotInfo snapshotInfo = createMockSnapshotInfo(snapshotId, null);
     
     localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
-    
+
     File yamlPath = new 
File(localDataManager.getSnapshotLocalPropertyYamlPath(snapshotInfo));
     assertNotNull(yamlPath);
     Path expectedYamlPath = Paths.get(snapshotsDir.getAbsolutePath(), "db" + 
OM_SNAPSHOT_SEPARATOR + snapshotId
@@ -178,15 +597,18 @@ public void testCreateNewOmSnapshotLocalDataFile() throws 
IOException {
     when(snapshotStore.getDb()).thenReturn(rocksDatabase);
     when(rocksDatabase.getLiveFilesMetaData()).thenReturn(sstFiles);
     localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
-    
+
     localDataManager.createNewOmSnapshotLocalDataFile(snapshotStore, 
snapshotInfo);
     
     // Verify file was created
-    OmSnapshotLocalData snapshotLocalData = 
localDataManager.getOmSnapshotLocalData(snapshotId);
-    assertEquals(1, snapshotLocalData.getVersionSstFileInfos().size());
-    OmSnapshotLocalData.VersionMeta versionMeta = 
snapshotLocalData.getVersionSstFileInfos().get(0);
-    OmSnapshotLocalData.VersionMeta expectedVersionMeta = new 
OmSnapshotLocalData.VersionMeta(0, sstFileInfos);
-    assertEquals(expectedVersionMeta, versionMeta);
+    OmSnapshotLocalData.VersionMeta versionMeta;
+    try (ReadableOmSnapshotLocalDataProvider snapshotLocalData = 
localDataManager.getOmSnapshotLocalData(snapshotId)) {
+      assertEquals(1, 
snapshotLocalData.getSnapshotLocalData().getVersionSstFileInfos().size());
+      versionMeta = 
snapshotLocalData.getSnapshotLocalData().getVersionSstFileInfos().get(0);
+      OmSnapshotLocalData.VersionMeta expectedVersionMeta =
+          new OmSnapshotLocalData.VersionMeta(0, sstFileInfos);
+      assertEquals(expectedVersionMeta, versionMeta);
+    }
   }
 
   @Test
@@ -198,16 +620,16 @@ public void testGetOmSnapshotLocalDataWithSnapshotInfo() 
throws IOException {
     OmSnapshotLocalData localData = createMockLocalData(snapshotId, null);
     
     localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
-    
+
     // Write the file manually for testing
     Path yamlPath = 
Paths.get(localDataManager.getSnapshotLocalPropertyYamlPath(snapshotInfo.getSnapshotId()));
     writeLocalDataToFile(localData, yamlPath);
     
     // Test retrieval
-    OmSnapshotLocalData retrieved = 
localDataManager.getOmSnapshotLocalData(snapshotInfo);
-    
-    assertNotNull(retrieved);
-    assertEquals(snapshotId, retrieved.getSnapshotId());
+    try (ReadableOmSnapshotLocalDataProvider retrieved = 
localDataManager.getOmSnapshotLocalData(snapshotInfo)) {
+      assertNotNull(retrieved.getSnapshotLocalData());
+      assertEquals(snapshotId, 
retrieved.getSnapshotLocalData().getSnapshotId());
+    }
   }
 
   @Test
@@ -219,7 +641,7 @@ public void 
testGetOmSnapshotLocalDataWithMismatchedSnapshotId() throws IOExcept
     OmSnapshotLocalData localData = createMockLocalData(wrongSnapshotId, null);
     
     localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
-    
+
     Path yamlPath = 
Paths.get(localDataManager.getSnapshotLocalPropertyYamlPath(snapshotId));
     writeLocalDataToFile(localData, yamlPath);
     // Should throw IOException due to mismatched IDs
@@ -235,7 +657,7 @@ public void testGetOmSnapshotLocalDataWithFile() throws 
IOException {
     OmSnapshotLocalData localData = createMockLocalData(snapshotId, null);
     
     localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
-    
+
     Path yamlPath = tempDir.resolve("test-snapshot.yaml");
     writeLocalDataToFile(localData, yamlPath);
     
@@ -269,7 +691,7 @@ public void testAddVersionNodeWithDependentsAlreadyExists() 
throws IOException {
     createSnapshotLocalDataFile(snapshotId, null);
     
     localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
-    
+
     OmSnapshotLocalData localData = createMockLocalData(snapshotId, null);
     
     // First addition
@@ -291,7 +713,7 @@ public void testInitWithExistingYamlFiles() throws 
IOException {
     
     // Initialize - should load existing files
     localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
-    
+
     assertNotNull(localDataManager);
     Map<UUID, OmSnapshotLocalDataManager.SnapshotVersionsMeta> versionMap =
         localDataManager.getVersionNodeMap();
@@ -317,7 +739,7 @@ public void testInitWithInvalidPathThrowsException() throws 
IOException {
   @Test
   public void testClose() throws IOException {
     localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
-    
+
     // Should not throw exception
     localDataManager.close();
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to