This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new d2e7b23e570 HDDS-13767. Refactor SnapshotLocalDataYaml related code into OmSnapshotLocalDataManager (#9124)
d2e7b23e570 is described below

commit d2e7b23e570b6fef853624889a919face36115e0
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Mon Oct 13 22:42:30 2025 -0400

    HDDS-13767. Refactor SnapshotLocalDataYaml related code into OmSnapshotLocalDataManager (#9124)
---
 hadoop-hdds/common/pom.xml                         |   4 +
 .../apache/hadoop/ozone/util/ObjectSerializer.java |  73 ++++++++++
 .../org/apache/hadoop/ozone/util/WithChecksum.java |  28 ++++
 .../apache/hadoop/ozone/util/YamlSerializer.java   | 159 +++++++++++++++++++++
 .../om/OMDBCheckpointServletInodeBasedXfer.java    |   3 +-
 .../hadoop/ozone/om/OmSnapshotLocalData.java       |  10 +-
 .../hadoop/ozone/om/OmSnapshotLocalDataYaml.java   | 141 +-----------------
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  |  75 ++--------
 .../response/snapshot/OMSnapshotPurgeResponse.java |  13 +-
 .../om/snapshot/OmSnapshotLocalDataManager.java    | 116 +++++++++++++++
 .../ozone/om/TestOmSnapshotLocalDataYaml.java      |  59 ++++----
 .../hadoop/ozone/om/TestOmSnapshotManager.java     |  14 +-
 .../TestOMSnapshotPurgeRequestAndResponse.java     |   6 +-
 .../snapshot/TestOMSnapshotCreateResponse.java     |  20 +--
 .../snapshot/TestOMSnapshotDeleteResponse.java     |  20 +--
 15 files changed, 462 insertions(+), 279 deletions(-)

diff --git a/hadoop-hdds/common/pom.xml b/hadoop-hdds/common/pom.xml
index e75996a62da..6fdf1dffa45 100644
--- a/hadoop-hdds/common/pom.xml
+++ b/hadoop-hdds/common/pom.xml
@@ -108,6 +108,10 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-pool2</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ObjectSerializer.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ObjectSerializer.java
new file mode 100644
index 00000000000..eaf42c37679
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ObjectSerializer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.util;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Represents a generic interface for serialization and deserialization
+ * operations of objects that extend the {@link WithChecksum} interface.
+ * This interface provides functionality for loading and saving objects
+ * from/to files or input streams, as well as verifying checksum integrity.
+ *
+ * @param <T> the type of the object handled by the serializer, must extend {@code WithChecksum}
+ */
+public interface ObjectSerializer<T extends WithChecksum> extends Closeable {
+
+  /**
+   * Loads an object of type T from the specified file.
+   *
+   * @param path the file from which the object will be loaded
+   * @return the object of type T that has been deserialized from the file
+   * @throws IOException if an I/O error occurs during reading from the file
+   */
+  T load(File path) throws IOException;
+
+  /**
+   * Loads an object of type T from the specified input stream.
+   *
+   * @param inputStream the input stream from which the object will be deserialized
+   * @return the deserialized object of type T
+   * @throws IOException if an I/O error occurs during reading from the input stream
+   */
+  T load(InputStream inputStream) throws IOException;
+
+  /**
+   * Serializes the given data object of type T and saves it to the specified file.
+   *
+   * @param path the file where the serialized object will be saved
+   * @param data the object of type T to be serialized and saved
+   * @throws IOException if an I/O error occurs during writing to the file
+   */
+  void save(File path, T data) throws IOException;
+
+  /**
+   * Verifies the checksum of the provided data object of type T.
+   *
+   * @param data the object of type T whose checksum is to be verified
+   * @return true if the checksum of the data is valid, false otherwise
+   * @throws IOException if an I/O error occurs during verification
+   */
+  boolean verifyChecksum(T data) throws IOException;
+
+  @Override
+  void close() throws IOException;
+}
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/WithChecksum.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/WithChecksum.java
new file mode 100644
index 00000000000..45f31dfba1a
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/WithChecksum.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.util;
+
+import org.apache.hadoop.hdds.utils.db.CopyObject;
+
+/**
+ * Represents a generic interface for objects capable of generating or providing
+ * a checksum value.
+ */
+public interface WithChecksum<T extends WithChecksum<T>> extends CopyObject<T> 
{
+  String getChecksum();
+}
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/YamlSerializer.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/YamlSerializer.java
new file mode 100644
index 00000000000..11e43383f8e
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/YamlSerializer.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.util;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import org.apache.commons.pool2.BasePooledObjectFactory;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.hadoop.hdds.server.YamlUtils;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * An abstract serializer for objects that extend the {@link WithChecksum} interface.
+ * This class provides mechanisms for serializing and deserializing objects
+ * in a YAML format.
+ */
+public abstract class YamlSerializer<T extends WithChecksum<T>> implements 
ObjectSerializer<T> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(YamlSerializer.class);
+
+  private final GenericObjectPool<Yaml> yamlPool;
+
+  public YamlSerializer(BasePooledObjectFactory<Yaml> yamlFactory) {
+    this.yamlPool = new GenericObjectPool<>(yamlFactory);
+  }
+
+  private UncheckedAutoCloseableSupplier<Yaml> getYaml() throws IOException {
+    try {
+      Yaml yaml = yamlPool.borrowObject();
+      return new UncheckedAutoCloseableSupplier<Yaml>() {
+
+        @Override
+        public void close() {
+          yamlPool.returnObject(yaml);
+        }
+
+        @Override
+        public Yaml get() {
+          return yaml;
+        }
+      };
+    } catch (Exception e) {
+      throw new IOException("Failed to get yaml object.", e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public T load(File yamlFile) throws IOException {
+    Preconditions.checkNotNull(yamlFile, "yamlFile cannot be null");
+    try (InputStream inputFileStream = 
Files.newInputStream(yamlFile.toPath())) {
+      return load(inputFileStream);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public T load(InputStream input) throws IOException {
+    T dataYaml;
+    try (UncheckedAutoCloseableSupplier<Yaml> yaml = getYaml()) {
+      dataYaml = yaml.get().load(input);
+    } catch (Exception e) {
+      throw new IOException("Failed to load file", e);
+    }
+
+    if (dataYaml == null) {
+      // If Yaml#load returned null, then the file is empty. This is valid yaml
+      // but considered an error in this case since we have lost data about
+      // the snapshot.
+      throw new IOException("Failed to load file. File is empty.");
+    }
+
+    return dataYaml;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean verifyChecksum(T data) throws IOException {
+    Preconditions.checkNotNull(data, "data cannot be null");
+
+    // Get the stored checksum
+    String storedChecksum = data.getChecksum();
+    if (storedChecksum == null) {
+      LOG.warn("No checksum found in snapshot data for verification");
+      return false;
+    }
+
+    // Create a copy of the snapshot data for computing checksum
+    T copy = data.copyObject();
+
+    // Get the YAML representation
+    try (UncheckedAutoCloseableSupplier<Yaml> yaml = getYaml()) {
+      // Compute new checksum
+      computeAndSetChecksum(yaml.get(), copy);
+
+      // Compare the stored and computed checksums
+      String computedChecksum = copy.getChecksum();
+      boolean isValid = storedChecksum.equals(computedChecksum);
+
+      if (!isValid) {
+        LOG.warn("Checksum verification failed for snapshot local data. " +
+            "Stored: {}, Computed: {}", storedChecksum, computedChecksum);
+      }
+      return isValid;
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void save(File yamlFile, T data) throws IOException {
+    // Create Yaml
+    try (UncheckedAutoCloseableSupplier<Yaml> yaml = getYaml()) {
+      // Compute Checksum and update SnapshotData
+      computeAndSetChecksum(yaml.get(), data);
+      // Write the object with checksum to Yaml file.
+      YamlUtils.dump(yaml.get(), data, yamlFile, LOG);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close() {
+    yamlPool.close();
+  }
+
+  public abstract void computeAndSetChecksum(Yaml yaml, T data) throws 
IOException;
+
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
index 1acd9593c82..8a58ed6aa76 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
@@ -67,6 +67,7 @@
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
 import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
@@ -298,7 +299,7 @@ private void transferSnapshotData(Set<String> 
sstFilesToExclude, Path tmpdir, Se
         writeDBToArchive(sstFilesToExclude, snapshotDir, maxTotalSstSize, 
archiveOutputStream, tmpdir,
             hardLinkFileMap, false);
         Path snapshotLocalPropertyYaml = Paths.get(
-            OmSnapshotManager.getSnapshotLocalPropertyYamlPath(snapshotDir));
+            
OmSnapshotLocalDataManager.getSnapshotLocalPropertyYamlPath(snapshotDir));
         if (Files.exists(snapshotLocalPropertyYaml)) {
           File yamlFile = snapshotLocalPropertyYaml.toFile();
           hardLinkFileMap.put(yamlFile.getAbsolutePath(), yamlFile.getName());
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalData.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalData.java
index 5f65fd4c0d0..83ad02fb14b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalData.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalData.java
@@ -31,6 +31,7 @@
 import java.util.stream.Collectors;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.hdds.utils.db.CopyObject;
+import org.apache.hadoop.ozone.util.WithChecksum;
 import org.apache.ozone.compaction.log.SstFileInfo;
 import org.rocksdb.LiveFileMetaData;
 import org.yaml.snakeyaml.Yaml;
@@ -39,8 +40,7 @@
  * OmSnapshotLocalData is the in-memory representation of snapshot local 
metadata.
  * Inspired by org.apache.hadoop.ozone.container.common.impl.ContainerData
  */
-public abstract class OmSnapshotLocalData {
-
+public class OmSnapshotLocalData implements WithChecksum<OmSnapshotLocalData> {
   // Unique identifier for the snapshot. This is used to identify the snapshot.
   private UUID snapshotId;
 
@@ -193,6 +193,7 @@ public void addVersionSSTFileInfos(List<SstFileInfo> 
sstFiles, int previousSnaps
    * Returns the checksum of the YAML representation.
    * @return checksum
    */
+  @Override
   public String getChecksum() {
     return checksum;
   }
@@ -258,6 +259,11 @@ public void setVersion(int version) {
     this.version = version;
   }
 
+  @Override
+  public OmSnapshotLocalData copyObject() {
+    return new OmSnapshotLocalData(this);
+  }
+
   /**
    * Represents metadata for a specific version in a snapshot.
    * This class maintains the version of the previous snapshot and a list of 
SST (Sorted String Table) files
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java
index 1d4fedfacaa..543c4c6397c 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java
@@ -17,11 +17,6 @@
 
 package org.apache.hadoop.ozone.om;
 
-import com.google.common.base.Preconditions;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -29,20 +24,15 @@
 import org.apache.commons.pool2.BasePooledObjectFactory;
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.apache.hadoop.hdds.server.YamlUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OmSnapshotLocalData.VersionMeta;
 import org.apache.ozone.compaction.log.SstFileInfo;
-import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
-import org.rocksdb.LiveFileMetaData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.DumperOptions;
 import org.yaml.snakeyaml.LoaderOptions;
 import org.yaml.snakeyaml.TypeDescription;
 import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.constructor.AbstractConstruct;
 import org.yaml.snakeyaml.constructor.SafeConstructor;
-import org.yaml.snakeyaml.error.YAMLException;
 import org.yaml.snakeyaml.introspector.BeanAccess;
 import org.yaml.snakeyaml.introspector.Property;
 import org.yaml.snakeyaml.introspector.PropertyUtils;
@@ -59,67 +49,14 @@
  * Checksum of the YAML fields are computed and stored in the YAML file 
transparently to callers.
  * Inspired by org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml
  */
-public final class OmSnapshotLocalDataYaml extends OmSnapshotLocalData {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(OmSnapshotLocalDataYaml.class);
+public final class OmSnapshotLocalDataYaml {
 
   public static final Tag SNAPSHOT_YAML_TAG = new Tag("OmSnapshotLocalData");
   public static final Tag SNAPSHOT_VERSION_META_TAG = new Tag("VersionMeta");
   public static final Tag SST_FILE_INFO_TAG = new Tag("SstFileInfo");
+  public static final String YAML_FILE_EXTENSION = ".yaml";
 
-  /**
-   * Creates a new OmSnapshotLocalDataYaml with default values.
-   */
-  public OmSnapshotLocalDataYaml(UUID snapshotId, List<LiveFileMetaData> 
liveFileMetaDatas, UUID previousSnapshotId) {
-    super(snapshotId, liveFileMetaDatas, previousSnapshotId);
-  }
-
-  /**
-   * Copy constructor to create a deep copy.
-   * @param source The source OmSnapshotLocalData to copy from
-   */
-  public OmSnapshotLocalDataYaml(OmSnapshotLocalData source) {
-    super(source);
-  }
-
-  /**
-   * Verifies the checksum of the snapshot data.
-   * @param snapshotData The snapshot data to verify
-   * @return true if the checksum is valid, false otherwise
-   * @throws IOException if there's an error computing the checksum
-   */
-  public static boolean verifyChecksum(OmSnapshotManager snapshotManager, 
OmSnapshotLocalData snapshotData)
-      throws IOException {
-    Preconditions.checkNotNull(snapshotData, "snapshotData cannot be null");
-
-    // Get the stored checksum
-    String storedChecksum = snapshotData.getChecksum();
-    if (storedChecksum == null) {
-      LOG.warn("No checksum found in snapshot data for verification");
-      return false;
-    }
-
-    // Create a copy of the snapshot data for computing checksum
-    OmSnapshotLocalDataYaml snapshotDataCopy = new 
OmSnapshotLocalDataYaml(snapshotData);
-
-    // Clear the existing checksum in the copy
-    snapshotDataCopy.setChecksum(null);
-
-    // Get the YAML representation
-    try (UncheckedAutoCloseableSupplier<Yaml> yaml = 
snapshotManager.getSnapshotLocalYaml()) {
-      // Compute new checksum
-      snapshotDataCopy.computeAndSetChecksum(yaml.get());
-
-      // Compare the stored and computed checksums
-      String computedChecksum = snapshotDataCopy.getChecksum();
-      boolean isValid = storedChecksum.equals(computedChecksum);
-
-      if (!isValid) {
-        LOG.warn("Checksum verification failed for snapshot local data. " +
-            "Stored: {}, Computed: {}", storedChecksum, computedChecksum);
-      }
-      return isValid;
-    }
+  private OmSnapshotLocalDataYaml() {
   }
 
   /**
@@ -129,7 +66,7 @@ private static class OmSnapshotLocalDataRepresenter extends 
Representer {
 
     OmSnapshotLocalDataRepresenter(DumperOptions options) {
       super(options);
-      this.addClassTag(OmSnapshotLocalDataYaml.class, SNAPSHOT_YAML_TAG);
+      this.addClassTag(OmSnapshotLocalData.class, SNAPSHOT_YAML_TAG);
       this.addClassTag(VersionMeta.class, SNAPSHOT_VERSION_META_TAG);
       this.addClassTag(SstFileInfo.class, SST_FILE_INFO_TAG);
       representers.put(SstFileInfo.class, new RepresentSstFileInfo());
@@ -190,7 +127,7 @@ private static class SnapshotLocalDataConstructor extends 
SafeConstructor {
       this.yamlConstructors.put(SNAPSHOT_YAML_TAG, new 
ConstructSnapshotLocalData());
       this.yamlConstructors.put(SNAPSHOT_VERSION_META_TAG, new 
ConstructVersionMeta());
       this.yamlConstructors.put(SST_FILE_INFO_TAG, new ConstructSstFileInfo());
-      TypeDescription omDesc = new 
TypeDescription(OmSnapshotLocalDataYaml.class);
+      TypeDescription omDesc = new TypeDescription(OmSnapshotLocalData.class);
       omDesc.putMapPropertyType(OzoneConsts.OM_SLD_VERSION_SST_FILE_INFO, 
Integer.class, VersionMeta.class);
       this.addTypeDescription(omDesc);
       TypeDescription versionMetaDesc = new TypeDescription(VersionMeta.class);
@@ -229,7 +166,7 @@ public Object construct(Node node) {
         Map<Object, Object> nodes = constructMapping(mnode);
         UUID snapId = UUID.fromString((String) 
nodes.get(OzoneConsts.OM_SLD_SNAP_ID));
         UUID prevSnapId = UUID.fromString((String) 
nodes.get(OzoneConsts.OM_SLD_PREV_SNAP_ID));
-        OmSnapshotLocalDataYaml snapshotLocalData = new 
OmSnapshotLocalDataYaml(snapId, Collections.emptyList(),
+        OmSnapshotLocalData snapshotLocalData = new 
OmSnapshotLocalData(snapId, Collections.emptyList(),
             prevSnapId);
 
         // Set version from YAML
@@ -267,70 +204,6 @@ public Object construct(Node node) {
     }
   }
 
-  /**
-   * Returns the YAML representation of this object as a String
-   * (without triggering checksum computation or persistence).
-   * @return YAML string representation
-   */
-  public String getYaml(OmSnapshotManager snapshotManager) throws IOException {
-    try (UncheckedAutoCloseableSupplier<Yaml> yaml = 
snapshotManager.getSnapshotLocalYaml()) {
-      return yaml.get().dump(this);
-    }
-  }
-
-  /**
-   * Computes checksum (stored in this object), and writes this object to a 
YAML file.
-   * @param yamlFile The file to write to
-   * @throws IOException If there's an error writing to the file
-   */
-  public void writeToYaml(OmSnapshotManager snapshotManager, File yamlFile) 
throws IOException {
-    // Create Yaml
-    try (UncheckedAutoCloseableSupplier<Yaml> yaml = 
snapshotManager.getSnapshotLocalYaml()) {
-      // Compute Checksum and update SnapshotData
-      computeAndSetChecksum(yaml.get());
-      // Write the SnapshotData with checksum to Yaml file.
-      YamlUtils.dump(yaml.get(), this, yamlFile, LOG);
-    }
-  }
-
-  /**
-   * Creates a OmSnapshotLocalDataYaml instance from a YAML file.
-   * @param yamlFile The YAML file to read from
-   * @return A new OmSnapshotLocalDataYaml instance
-   * @throws IOException If there's an error reading the file
-   */
-  public static OmSnapshotLocalDataYaml getFromYamlFile(OmSnapshotManager 
snapshotManager, File yamlFile)
-      throws IOException {
-    Preconditions.checkNotNull(yamlFile, "yamlFile cannot be null");
-    try (InputStream inputFileStream = 
Files.newInputStream(yamlFile.toPath())) {
-      return getFromYamlStream(snapshotManager, inputFileStream);
-    }
-  }
-
-  /**
-   * Read the YAML content InputStream, and return OmSnapshotLocalDataYaml 
instance.
-   * @throws IOException
-   */
-  public static OmSnapshotLocalDataYaml getFromYamlStream(OmSnapshotManager 
snapshotManager,
-      InputStream input) throws IOException {
-    OmSnapshotLocalDataYaml dataYaml;
-    try (UncheckedAutoCloseableSupplier<Yaml> yaml = 
snapshotManager.getSnapshotLocalYaml()) {
-      dataYaml = yaml.get().load(input);
-    } catch (YAMLException ex) {
-      // Unchecked exception. Convert to IOException
-      throw new IOException(ex);
-    }
-
-    if (dataYaml == null) {
-      // If Yaml#load returned null, then the file is empty. This is valid yaml
-      // but considered an error in this case since we have lost data about
-      // the snapshot.
-      throw new IOException("Failed to load snapshot file. File is empty.");
-    }
-
-    return dataYaml;
-  }
-
   /**
    * Factory class for constructing and pooling instances of the Yaml object.
    * This class extends BasePooledObjectFactory to support object pooling,
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index d531f95c46b..19fe367bb92 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -82,7 +82,6 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
@@ -102,6 +101,7 @@
 import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.service.SnapshotDiffCleanupService;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotDiffManager;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
@@ -117,7 +117,6 @@
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
 
 /**
  * This class is used to manage/create OM snapshots.
@@ -186,7 +185,7 @@ public final class OmSnapshotManager implements 
AutoCloseable {
   private final List<ColumnFamilyDescriptor> columnFamilyDescriptors;
   private final List<ColumnFamilyHandle> columnFamilyHandles;
   private final SnapshotDiffCleanupService snapshotDiffCleanupService;
-  private final GenericObjectPool<Yaml> yamlPool;
+  private final OmSnapshotLocalDataManager snapshotLocalDataManager;
 
   private final int maxPageSize;
 
@@ -197,7 +196,7 @@ public final class OmSnapshotManager implements 
AutoCloseable {
   private final AtomicInteger inFlightSnapshotCount = new AtomicInteger(0);
 
   public OmSnapshotManager(OzoneManager ozoneManager) {
-    this.yamlPool = new GenericObjectPool<>(new 
OmSnapshotLocalDataYaml.YamlFactory());
+    this.snapshotLocalDataManager = new 
OmSnapshotLocalDataManager(ozoneManager.getMetadataManager());
     boolean isFilesystemSnapshotEnabled =
         ozoneManager.isFilesystemSnapshotEnabled();
     LOG.info("Ozone filesystem snapshot feature is {}.",
@@ -517,11 +516,12 @@ public static DBCheckpoint createOmSnapshotCheckpoint(
     }
     OmSnapshotManager omSnapshotManager =
         ((OmMetadataManagerImpl) 
omMetadataManager).getOzoneManager().getOmSnapshotManager();
+    OmSnapshotLocalDataManager snapshotLocalDataManager = 
omSnapshotManager.getSnapshotLocalDataManager();
     OzoneConfiguration configuration = ((OmMetadataManagerImpl) 
omMetadataManager).getOzoneManager().getConfiguration();
     try (OmMetadataManagerImpl checkpointMetadataManager =
              
OmMetadataManagerImpl.createCheckpointMetadataManager(configuration, 
dbCheckpoint)) {
       // Create the snapshot local property file.
-      OmSnapshotManager.createNewOmSnapshotLocalDataFile(omSnapshotManager,
+      snapshotLocalDataManager.createNewOmSnapshotLocalDataFile(
           (RDBStore) checkpointMetadataManager.getStore(), snapshotInfo);
     }
 
@@ -628,28 +628,12 @@ private static void 
deleteKeysFromDelKeyTableInSnapshotScope(
    * @param store AOS or snapshot DB for not defragged or defragged snapshot 
respectively.
    * @return a Map of (table, set of SST files corresponding to the table)
    */
-  private static List<LiveFileMetaData> getSnapshotSSTFileList(RDBStore store)
-      throws IOException {
+  public static List<LiveFileMetaData> getSnapshotSSTFileList(RDBStore store) 
throws IOException {
     return store.getDb().getLiveFilesMetaData().stream()
         .filter(lfm -> 
COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT.contains(StringUtils.bytes2String(lfm.columnFamilyName())))
         .collect(Collectors.toList());
   }
 
-  /**
-   * Creates and writes snapshot local properties to a YAML file with not 
defragged SST file list.
-   * @param snapshotManager snapshot manager instance.
-   * @param snapshotStore snapshot metadata manager.
-   * @param snapshotInfo snapshot info instance corresponding to snapshot.
-   */
-  public static void createNewOmSnapshotLocalDataFile(OmSnapshotManager 
snapshotManager, RDBStore snapshotStore,
-      SnapshotInfo snapshotInfo) throws IOException {
-    Path snapshotLocalDataPath = 
Paths.get(getSnapshotLocalPropertyYamlPath(snapshotStore.getDbLocation().toPath()));
-    Files.deleteIfExists(snapshotLocalDataPath);
-    OmSnapshotLocalDataYaml snapshotLocalDataYaml = new 
OmSnapshotLocalDataYaml(snapshotInfo.getSnapshotId(),
-        getSnapshotSSTFileList(snapshotStore), 
snapshotInfo.getPathPreviousSnapshotId());
-    snapshotLocalDataYaml.writeToYaml(snapshotManager, 
snapshotLocalDataPath.toFile());
-  }
-
   // Get OmSnapshot if the keyName has ".snapshot" key indicator
   @SuppressWarnings("unchecked")
   public UncheckedAutoCloseableSupplier<IOmMetadataReader> 
getActiveFsMetadataOrSnapshot(
@@ -691,24 +675,8 @@ public UncheckedAutoCloseableSupplier<OmSnapshot> 
getSnapshot(
     return getSnapshot(volumeName, bucketName, snapshotName, true);
   }
 
-  public UncheckedAutoCloseableSupplier<Yaml> getSnapshotLocalYaml() throws 
IOException {
-    try {
-      Yaml yaml = yamlPool.borrowObject();
-      return new UncheckedAutoCloseableSupplier<Yaml>() {
-
-        @Override
-        public void close() {
-          yamlPool.returnObject(yaml);
-        }
-
-        @Override
-        public Yaml get() {
-          return yaml;
-        }
-      };
-    } catch (Exception e) {
-      throw new IOException("Failed to get snapshot local yaml", e);
-    }
+  public OmSnapshotLocalDataManager getSnapshotLocalDataManager() {
+    return snapshotLocalDataManager;
   }
 
   private UncheckedAutoCloseableSupplier<OmSnapshot> getSnapshot(
@@ -856,29 +824,6 @@ public static String 
extractSnapshotIDFromCheckpointDirName(String snapshotPath)
     return snapshotPath.substring(index + OM_DB_NAME.length() + 
OM_SNAPSHOT_SEPARATOR.length());
   }
 
-  /**
-   * Returns the path to the YAML file that stores local properties for the 
given snapshot.
-   *
-   * @param omMetadataManager metadata manager to get the base path
-   * @param snapshotInfo snapshot metadata
-   * @return the path to the snapshot's local property YAML file
-   */
-  public static String getSnapshotLocalPropertyYamlPath(OMMetadataManager 
omMetadataManager,
-      SnapshotInfo snapshotInfo) {
-    Path snapshotPath = getSnapshotPath(omMetadataManager, snapshotInfo);
-    return getSnapshotLocalPropertyYamlPath(snapshotPath);
-  }
-
-  /**
-   * Returns the path to the YAML file that stores local properties for the 
given snapshot.
-   *
-   * @param snapshotPath path to the snapshot checkpoint dir
-   * @return the path to the snapshot's local property YAML file
-   */
-  public static String getSnapshotLocalPropertyYamlPath(Path snapshotPath) {
-    return snapshotPath.toString() + ".yaml";
-  }
-
   public static boolean isSnapshotKey(String[] keyParts) {
     return (keyParts.length > 1) &&
         (keyParts[0].compareTo(OM_SNAPSHOT_INDICATOR) == 0);
@@ -1199,8 +1144,8 @@ public void close() {
     if (options != null) {
       options.close();
     }
-    if (yamlPool != null) {
-      yamlPool.close();
+    if (snapshotLocalDataManager != null) {
+      snapshotLocalDataManager.close();
     }
   }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
index ef3555f5435..75ba2a8f950 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.ozone.om.lock.OMLockDetails;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,7 +99,9 @@ protected void addToDBBatch(OMMetadataManager 
omMetadataManager,
       ((OmMetadataManagerImpl) omMetadataManager).getSnapshotChainManager()
           .removeFromSnapshotIdToTable(snapshotInfo.getSnapshotId());
       // Delete Snapshot checkpoint directory.
-      deleteCheckpointDirectory(omMetadataManager, snapshotInfo);
+      OmSnapshotLocalDataManager snapshotLocalDataManager = 
((OmMetadataManagerImpl) omMetadataManager)
+          
.getOzoneManager().getOmSnapshotManager().getSnapshotLocalDataManager();
+      deleteCheckpointDirectory(snapshotLocalDataManager, omMetadataManager, 
snapshotInfo);
       // Delete snapshotInfo from the table.
       omMetadataManager.getSnapshotInfoTable().deleteWithBatch(batchOperation, 
dbKey);
     }
@@ -117,8 +120,8 @@ private void updateSnapInfo(OmMetadataManagerImpl 
metadataManager,
   /**
    * Deletes the checkpoint directory for a snapshot.
    */
-  private void deleteCheckpointDirectory(OMMetadataManager omMetadataManager,
-                                         SnapshotInfo snapshotInfo) {
+  private void deleteCheckpointDirectory(OmSnapshotLocalDataManager 
snapshotLocalDataManager,
+      OMMetadataManager omMetadataManager, SnapshotInfo snapshotInfo) {
     // Acquiring write lock to avoid race condition with sst filtering service 
which creates a sst filtered file
     // inside the snapshot directory. Any operation apart which doesn't 
create/delete files under this snapshot
     // directory can run in parallel along with this operation.
@@ -127,8 +130,8 @@ private void deleteCheckpointDirectory(OMMetadataManager 
omMetadataManager,
     boolean acquiredSnapshotLock = omLockDetails.isLockAcquired();
     if (acquiredSnapshotLock) {
       Path snapshotDirPath = 
OmSnapshotManager.getSnapshotPath(omMetadataManager, snapshotInfo);
-      Path snapshotLocalDataPath = Paths.get(
-          
OmSnapshotManager.getSnapshotLocalPropertyYamlPath(omMetadataManager, 
snapshotInfo));
+      // TODO: Do not delete on snapshot purge. OmSnapshotLocalDataManager 
should delete orphan local data files.
+      Path snapshotLocalDataPath = 
Paths.get(snapshotLocalDataManager.getSnapshotLocalPropertyYamlPath(snapshotInfo));
       try {
         FileUtils.deleteDirectory(snapshotDirPath.toFile());
         Files.deleteIfExists(snapshotLocalDataPath);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java
new file mode 100644
index 00000000000..98536444a61
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml.YAML_FILE_EXTENSION;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmSnapshotLocalData;
+import org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.util.ObjectSerializer;
+import org.apache.hadoop.ozone.util.YamlSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * Manages local data and metadata associated with Ozone Manager (OM) snapshots,
+ * including the creation, storage, and representation of data as YAML files.
+ */
+public class OmSnapshotLocalDataManager implements AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OmSnapshotLocalDataManager.class);
+
+  private final ObjectSerializer<OmSnapshotLocalData> snapshotLocalDataSerializer;
+  private final OMMetadataManager omMetadataManager;
+
+  public OmSnapshotLocalDataManager(OMMetadataManager omMetadataManager) {
+    this.omMetadataManager = omMetadataManager;
+    this.snapshotLocalDataSerializer = new YamlSerializer<OmSnapshotLocalData>(
+        new OmSnapshotLocalDataYaml.YamlFactory()) {
+
+      @Override
+      public void computeAndSetChecksum(Yaml yaml, OmSnapshotLocalData data) throws IOException {
+        data.computeAndSetChecksum(yaml);
+      }
+    };
+  }
+
+  /**
+   * Returns the path to the YAML file that stores local properties for the given snapshot.
+   *
+   * @param snapshotPath path to the snapshot checkpoint dir
+   * @return the path to the snapshot's local property YAML file
+   */
+  public static String getSnapshotLocalPropertyYamlPath(Path snapshotPath) {
+    return snapshotPath.toString() + YAML_FILE_EXTENSION;
+  }
+
+  /**
+   * Returns the path to the YAML file that stores local properties for the given snapshot.
+   *
+   * @param snapshotInfo snapshot metadata
+   * @return the path to the snapshot's local property YAML file
+   */
+  public String getSnapshotLocalPropertyYamlPath(SnapshotInfo snapshotInfo) {
+    Path snapshotPath = OmSnapshotManager.getSnapshotPath(omMetadataManager, snapshotInfo);
+    return getSnapshotLocalPropertyYamlPath(snapshotPath);
+  }
+
+  /**
+   * Creates and writes snapshot local properties to a YAML file with not defragged SST file list.
+   * @param snapshotStore snapshot metadata manager.
+   * @param snapshotInfo snapshot info instance corresponding to snapshot.
+   */
+  public void createNewOmSnapshotLocalDataFile(RDBStore snapshotStore, SnapshotInfo snapshotInfo) throws IOException {
+    Path snapshotLocalDataPath = Paths.get(
+        getSnapshotLocalPropertyYamlPath(snapshotStore.getDbLocation().toPath()));
+    Files.deleteIfExists(snapshotLocalDataPath);
+    OmSnapshotLocalData snapshotLocalDataYaml = new OmSnapshotLocalData(snapshotInfo.getSnapshotId(),
+        OmSnapshotManager.getSnapshotSSTFileList(snapshotStore), snapshotInfo.getPathPreviousSnapshotId());
+    snapshotLocalDataSerializer.save(snapshotLocalDataPath.toFile(), snapshotLocalDataYaml);
+  }
+
+  public OmSnapshotLocalData getOmSnapshotLocalData(SnapshotInfo snapshotInfo) throws IOException {
+    Path snapshotLocalDataPath = Paths.get(getSnapshotLocalPropertyYamlPath(snapshotInfo));
+    return snapshotLocalDataSerializer.load(snapshotLocalDataPath.toFile());
+  }
+
+  public OmSnapshotLocalData getOmSnapshotLocalData(File snapshotDataPath) throws IOException {
+    return snapshotLocalDataSerializer.load(snapshotDataPath);
+  }
+
+  @Override
+  public void close() {
+    if (snapshotLocalDataSerializer != null) {
+      try {
+        snapshotLocalDataSerializer.close();
+      } catch (IOException e) {
+        LOG.error("Failed to close snapshot local data serializer", e);
+      }
+    }
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotLocalDataYaml.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotLocalDataYaml.java
index 8b41e507218..23d332ae75b 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotLocalDataYaml.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotLocalDataYaml.java
@@ -44,8 +44,10 @@
 import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.OmSnapshotLocalData.VersionMeta;
+import org.apache.hadoop.ozone.util.ObjectSerializer;
+import org.apache.hadoop.ozone.util.YamlSerializer;
 import org.apache.ozone.compaction.log.SstFileInfo;
-import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -59,26 +61,26 @@
 public class TestOmSnapshotLocalDataYaml {
 
   private static String testRoot = new FileSystemTestHelper().getTestRootDir();
-  private static OmSnapshotManager omSnapshotManager;
-  private static final Yaml YAML = new 
OmSnapshotLocalDataYaml.YamlFactory().create();
-  private static final UncheckedAutoCloseableSupplier<Yaml> YAML_SUPPLIER = 
new UncheckedAutoCloseableSupplier<Yaml>() {
-    @Override
-    public Yaml get() {
-      return YAML;
-    }
-
-    @Override
-    public void close() {
-
-    }
-  };
+  private static final OmSnapshotLocalDataYaml.YamlFactory YAML_FACTORY = new 
OmSnapshotLocalDataYaml.YamlFactory();
+  private static ObjectSerializer<OmSnapshotLocalData> 
omSnapshotLocalDataSerializer;
 
   private static final Instant NOW = Instant.now();
 
   @BeforeAll
-  public static void setupClassMocks() throws IOException {
-    omSnapshotManager = mock(OmSnapshotManager.class);
-    when(omSnapshotManager.getSnapshotLocalYaml()).thenReturn(YAML_SUPPLIER);
+  public static void setupSerializer() throws IOException {
+    omSnapshotLocalDataSerializer = new 
YamlSerializer<OmSnapshotLocalData>(YAML_FACTORY) {
+      @Override
+      public void computeAndSetChecksum(Yaml yaml, OmSnapshotLocalData data) 
throws IOException {
+        data.computeAndSetChecksum(yaml);
+      }
+    };
+  }
+
+  @AfterAll
+  public static void cleanupSerializer() throws IOException {
+    if (omSnapshotLocalDataSerializer != null) {
+      omSnapshotLocalDataSerializer.close();
+    }
   }
 
   @BeforeEach
@@ -112,7 +114,7 @@ private Pair<File, UUID> writeToYaml(UUID snapshotId, 
String snapshotName) throw
         createLiveFileMetaData("sst1", "table1", "k1", "k2"),
         createLiveFileMetaData("sst2", "table1", "k3", "k4"),
         createLiveFileMetaData("sst3", "table2", "k4", "k5"));
-    OmSnapshotLocalDataYaml dataYaml = new OmSnapshotLocalDataYaml(snapshotId, 
notDefraggedSSTFileList,
+    OmSnapshotLocalData dataYaml = new OmSnapshotLocalData(snapshotId, 
notDefraggedSSTFileList,
         previousSnapshotId);
 
     // Set version
@@ -137,7 +139,7 @@ private Pair<File, UUID> writeToYaml(UUID snapshotId, 
String snapshotName) throw
     File yamlFile = new File(testRoot, yamlFilePath);
 
     // Create YAML file with SnapshotData
-    dataYaml.writeToYaml(omSnapshotManager, yamlFile);
+    omSnapshotLocalDataSerializer.save(yamlFile, dataYaml);
 
     // Check YAML file exists
     assertTrue(yamlFile.exists());
@@ -153,7 +155,7 @@ public void testWriteToYaml() throws IOException {
     UUID prevSnapId = yamlFilePrevIdPair.getRight();
 
     // Read from YAML file
-    OmSnapshotLocalDataYaml snapshotData = 
OmSnapshotLocalDataYaml.getFromYamlFile(omSnapshotManager, yamlFile);
+    OmSnapshotLocalData snapshotData = 
omSnapshotLocalDataSerializer.load(yamlFile);
 
     // Verify fields
     assertEquals(44, snapshotData.getVersion());
@@ -193,8 +195,8 @@ public void testUpdateSnapshotDataFile() throws IOException 
{
     Pair<File, UUID> yamlFilePrevIdPair = writeToYaml(snapshotId, "snapshot2");
     File yamlFile = yamlFilePrevIdPair.getLeft();
     // Read from YAML file
-    OmSnapshotLocalDataYaml dataYaml =
-        OmSnapshotLocalDataYaml.getFromYamlFile(omSnapshotManager, yamlFile);
+    OmSnapshotLocalData dataYaml =
+        omSnapshotLocalDataSerializer.load(yamlFile);
 
     // Update snapshot data
     dataYaml.setSstFiltered(false);
@@ -203,10 +205,10 @@ public void testUpdateSnapshotDataFile() throws 
IOException {
         singletonList(new SstFileInfo("defragged-sst4", "k5", "k6", 
"table3")), 5);
 
     // Write updated data back to file
-    dataYaml.writeToYaml(omSnapshotManager, yamlFile);
+    omSnapshotLocalDataSerializer.save(yamlFile, dataYaml);
 
     // Read back the updated data
-    dataYaml = OmSnapshotLocalDataYaml.getFromYamlFile(omSnapshotManager, 
yamlFile);
+    dataYaml = omSnapshotLocalDataSerializer.load(yamlFile);
 
     // Verify updated data
     assertThat(dataYaml.getSstFiltered()).isFalse();
@@ -224,10 +226,9 @@ public void testEmptyFile() throws IOException {
     File emptyFile = new File(testRoot, "empty.yaml");
     assertTrue(emptyFile.createNewFile());
 
-    IOException ex = assertThrows(IOException.class, () ->
-        OmSnapshotLocalDataYaml.getFromYamlFile(omSnapshotManager, emptyFile));
+    IOException ex = assertThrows(IOException.class, () -> 
omSnapshotLocalDataSerializer.load(emptyFile));
 
-    assertThat(ex).hasMessageContaining("Failed to load snapshot file. File is 
empty.");
+    assertThat(ex).hasMessageContaining("Failed to load file. File is empty.");
   }
 
   @Test
@@ -236,7 +237,7 @@ public void testChecksum() throws IOException {
     Pair<File, UUID> yamlFilePrevIdPair = writeToYaml(snapshotId, "snapshot3");
     File yamlFile = yamlFilePrevIdPair.getLeft();
     // Read from YAML file
-    OmSnapshotLocalDataYaml snapshotData = 
OmSnapshotLocalDataYaml.getFromYamlFile(omSnapshotManager, yamlFile);
+    OmSnapshotLocalData snapshotData = 
omSnapshotLocalDataSerializer.load(yamlFile);
 
     // Get the original checksum
     String originalChecksum = snapshotData.getChecksum();
@@ -244,7 +245,7 @@ public void testChecksum() throws IOException {
     // Verify the checksum is not null or empty
     assertThat(originalChecksum).isNotNull().isNotEmpty();
 
-    assertTrue(OmSnapshotLocalDataYaml.verifyChecksum(omSnapshotManager, 
snapshotData));
+    assertTrue(omSnapshotLocalDataSerializer.verifyChecksum(snapshotData));
   }
 
   @Test
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
index df2b026bce4..7f808df3f97 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
@@ -82,6 +82,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
 import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.ozone.compaction.log.SstFileInfo;
@@ -107,6 +108,7 @@ class TestOmSnapshotManager {
   private SnapshotChainManager snapshotChainManager;
   private OmMetadataManagerImpl omMetadataManager;
   private OmSnapshotManager omSnapshotManager;
+  private OmSnapshotLocalDataManager snapshotLocalDataManager;
   private static final String CANDIDATE_DIR_NAME = OM_DB_NAME +
       SNAPSHOT_CANDIDATE_DIR;
   private File leaderDir;
@@ -139,6 +141,7 @@ void init(@TempDir File tempDir) throws Exception {
     om = omTestManagers.getOzoneManager();
     omMetadataManager = (OmMetadataManagerImpl) om.getMetadataManager();
     omSnapshotManager = om.getOmSnapshotManager();
+    snapshotLocalDataManager = 
om.getOmSnapshotManager().getSnapshotLocalDataManager();
     snapshotChainManager = omMetadataManager.getSnapshotChainManager();
   }
 
@@ -158,8 +161,8 @@ void cleanup() throws IOException {
       SnapshotInfo snapshotInfo = snapshotInfoTable.get(snapshotInfoKey);
       snapshotChainManager.deleteSnapshot(snapshotInfo);
       snapshotInfoTable.delete(snapshotInfoKey);
-      Path snapshotYaml = 
Paths.get(OmSnapshotManager.getSnapshotLocalPropertyYamlPath(
-          om.getMetadataManager(), snapshotInfo));
+
+      Path snapshotYaml = 
Paths.get(snapshotLocalDataManager.getSnapshotLocalPropertyYamlPath(snapshotInfo));
       Files.deleteIfExists(snapshotYaml);
     }
     omSnapshotManager.invalidateCache();
@@ -310,19 +313,18 @@ public void testCreateNewSnapshotLocalYaml() throws 
IOException {
     when(mockedStore.getDb()).thenReturn(mockedDb);
     when(mockedDb.getLiveFilesMetaData()).thenReturn(mockedLiveFiles);
 
-    Path snapshotYaml = 
Paths.get(OmSnapshotManager.getSnapshotLocalPropertyYamlPath(
-        omMetadataManager, snapshotInfo));
+    Path snapshotYaml = 
Paths.get(snapshotLocalDataManager.getSnapshotLocalPropertyYamlPath(snapshotInfo));
     
when(mockedStore.getDbLocation()).thenReturn(getSnapshotPath(omMetadataManager, 
snapshotInfo).toFile());
     // Create an existing YAML file for the snapshot
     assertTrue(snapshotYaml.toFile().createNewFile());
     assertEquals(0, Files.size(snapshotYaml));
     // Create a new YAML file for the snapshot
-    OmSnapshotManager.createNewOmSnapshotLocalDataFile(omSnapshotManager, 
mockedStore, snapshotInfo);
+    snapshotLocalDataManager.createNewOmSnapshotLocalDataFile(mockedStore, 
snapshotInfo);
     // Verify that previous file was overwritten
     assertTrue(Files.exists(snapshotYaml));
     assertTrue(Files.size(snapshotYaml) > 0);
     // Verify the contents of the YAML file
-    OmSnapshotLocalData localData = 
OmSnapshotLocalDataYaml.getFromYamlFile(omSnapshotManager, 
snapshotYaml.toFile());
+    OmSnapshotLocalData localData = 
snapshotLocalDataManager.getOmSnapshotLocalData(snapshotYaml.toFile());
     assertNotNull(localData);
     assertEquals(0, localData.getVersion());
     assertEquals(notDefraggedVersionMeta, 
localData.getVersionSstFileInfos().get(0));
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java
index d2ceb5a4478..0fb26a4cd99 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java
@@ -47,11 +47,11 @@
 import org.apache.hadoop.hdds.utils.db.CodecException;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
-import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.SnapshotChainManager;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotPurgeResponse;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
 import org.apache.hadoop.ozone.om.snapshot.TestSnapshotRequestAndResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotPurgeRequest;
@@ -164,7 +164,7 @@ public void testValidateAndUpdateCache() throws Exception {
     for (Path checkpoint : checkpointPaths) {
       assertTrue(Files.exists(checkpoint));
       assertTrue(Files.exists(Paths.get(
-          OmSnapshotManager.getSnapshotLocalPropertyYamlPath(checkpoint))));
+          
OmSnapshotLocalDataManager.getSnapshotLocalPropertyYamlPath(checkpoint))));
     }
 
     OMRequest snapshotPurgeRequest = createPurgeKeysRequest(
@@ -191,7 +191,7 @@ public void testValidateAndUpdateCache() throws Exception {
     for (Path checkpoint : checkpointPaths) {
       assertFalse(Files.exists(checkpoint));
       assertFalse(Files.exists(Paths.get(
-          OmSnapshotManager.getSnapshotLocalPropertyYamlPath(checkpoint))));
+          
OmSnapshotLocalDataManager.getSnapshotLocalPropertyYamlPath(checkpoint))));
     }
     assertEquals(initialSnapshotPurgeCount + 1, 
getOmSnapshotIntMetrics().getNumSnapshotPurges());
     assertEquals(initialSnapshotPurgeFailCount, 
getOmSnapshotIntMetrics().getNumSnapshotPurgeFails());
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java
index 1e78943c7b5..2cafae138fd 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java
@@ -40,23 +40,21 @@
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
-import org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateSnapshotResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.util.Time;
-import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
-import org.yaml.snakeyaml.Yaml;
 
 /**
  * This class tests OMSnapshotCreateResponse.
@@ -76,24 +74,12 @@ public void setup() throws Exception {
     String fsPath = folder.getAbsolutePath();
     ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
         fsPath);
-    OmSnapshotLocalDataYaml.YamlFactory yamlFactory = new 
OmSnapshotLocalDataYaml.YamlFactory();
-    Yaml yaml = yamlFactory.create();
-    UncheckedAutoCloseableSupplier<Yaml> yamlSupplier = new 
UncheckedAutoCloseableSupplier<Yaml>() {
-      @Override
-      public Yaml get() {
-        return yaml;
-      }
-
-      @Override
-      public void close() {
-
-      }
-    };
     OzoneManager ozoneManager = mock(OzoneManager.class);
     OmSnapshotManager omSnapshotManager = mock(OmSnapshotManager.class);
+    OmSnapshotLocalDataManager snapshotLocalDataManager = 
mock(OmSnapshotLocalDataManager.class);
     when(ozoneManager.getConfiguration()).thenReturn(ozoneConfiguration);
     when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
-    when(omSnapshotManager.getSnapshotLocalYaml()).thenReturn(yamlSupplier);
+    
when(omSnapshotManager.getSnapshotLocalDataManager()).thenReturn(snapshotLocalDataManager);
     omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration, 
ozoneManager);
     batchOperation = omMetadataManager.getStore().initBatchOperation();
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotDeleteResponse.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotDeleteResponse.java
index 24fdc138fd7..2d5d7b2870f 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotDeleteResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotDeleteResponse.java
@@ -33,22 +33,20 @@
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
-import org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateSnapshotResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteSnapshotResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.util.Time;
-import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
-import org.yaml.snakeyaml.Yaml;
 
 /**
  * This class tests OMSnapshotDeleteResponse.
@@ -68,24 +66,12 @@ public void setup() throws Exception {
     String fsPath = folder.toAbsolutePath().toString();
     ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
         fsPath);
-    OmSnapshotLocalDataYaml.YamlFactory yamlFactory = new 
OmSnapshotLocalDataYaml.YamlFactory();
-    Yaml yaml = yamlFactory.create();
-    UncheckedAutoCloseableSupplier<Yaml> yamlSupplier = new 
UncheckedAutoCloseableSupplier<Yaml>() {
-      @Override
-      public Yaml get() {
-        return yaml;
-      }
-
-      @Override
-      public void close() {
-
-      }
-    };
     OzoneManager ozoneManager = mock(OzoneManager.class);
     OmSnapshotManager omSnapshotManager = mock(OmSnapshotManager.class);
+    OmSnapshotLocalDataManager omSnapshotLocalDataManager = 
mock(OmSnapshotLocalDataManager.class);
     when(ozoneManager.getConfiguration()).thenReturn(ozoneConfiguration);
     when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
-    when(omSnapshotManager.getSnapshotLocalYaml()).thenReturn(yamlSupplier);
+    
when(omSnapshotManager.getSnapshotLocalDataManager()).thenReturn(omSnapshotLocalDataManager);
     omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration, 
ozoneManager);
     batchOperation = omMetadataManager.getStore().initBatchOperation();
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to