This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d4f6bbb2b0 HDDS-14027. Use CodecBuffer instead of byte array in 
defrag to reduce GC (#9393)
0d4f6bbb2b0 is described below

commit 0d4f6bbb2b01a441110c680d58f979d3284866f9
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Fri Dec 5 19:43:04 2025 -0500

    HDDS-14027. Use CodecBuffer instead of byte array in defrag to reduce GC 
(#9393)
---
 .../hadoop/hdds/utils/db/CodecBufferCodec.java     |  87 +++
 .../hadoop/hdds/utils/db/RDBSstFileWriter.java     |  23 +
 .../org/apache/hadoop/hdds/utils/db/RDBTable.java  |  18 +-
 .../hadoop/hdds/utils/MapBackedTableIterator.java  |   2 +-
 .../hadoop/hdds/utils/db/InMemoryTestTable.java    |   4 +-
 .../hdds/utils/db/managed/ManagedDirectSlice.java  |  51 ++
 .../om/snapshot/defrag/SnapshotDefragService.java  |  76 ++-
 .../snapshot/defrag/TestSnapshotDefragService.java | 737 ++++++++++++++++++++-
 8 files changed, 955 insertions(+), 43 deletions(-)

diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java
new file mode 100644
index 00000000000..847e96adc81
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import jakarta.annotation.Nonnull;
+import java.nio.ByteBuffer;
+
+/**
+ * A concrete implementation of the Codec interface for the CodecBuffer type.
+ * This class provides methods to serialize and deserialize CodecBuffer
+ * objects to and from byte arrays and to interact with CodecBuffer instances
+ * using different allocation strategies (direct or heap).
+ *
+ * The CodecBufferCodec distinguishes between direct and non-direct
+ * (heap-based) buffers, ensuring compatibility between the provided allocator
+ * and buffer type during serialization and deserialization.
+ *
+ * This codec supports CodecBuffer-based methods.
+ * NOTE: This codec does not create copies of CodecBuffer objects; it returns the same CodecBuffer instance,
+ * so the caller must not close the buffer returned by {@link #copyObject(CodecBuffer)} or
+ * {@link #toCodecBuffer(CodecBuffer, CodecBuffer.Allocator)}. This is done to avoid unnecessary memory allocations.
+ * However, the caller still has to handle the lifecycle of the CodecBuffer returned by
+ * {@link #fromPersistedFormat(byte[])}.
+ */
+public final class CodecBufferCodec implements Codec<CodecBuffer> {
+
+  private static final Codec<CodecBuffer> DIRECT_INSTANCE = new 
CodecBufferCodec(true);
+  private static final Codec<CodecBuffer> NON_DIRECT_INSTANCE = new 
CodecBufferCodec(false);
+
+  private final CodecBuffer.Allocator allocator;
+
+  public static Codec<CodecBuffer> get(boolean direct) {
+    return direct ? DIRECT_INSTANCE : NON_DIRECT_INSTANCE;
+  }
+
+  private CodecBufferCodec(boolean direct) {
+    this.allocator = direct ? CodecBuffer.Allocator.getDirect() : 
CodecBuffer.Allocator.getHeap();
+  }
+
+  @Override
+  public CodecBuffer toCodecBuffer(@Nonnull CodecBuffer object, 
CodecBuffer.Allocator customAllocator) {
+    if (customAllocator.isDirect() != object.isDirect()) {
+      throw new IllegalArgumentException("Custom allocator must be of the same 
type as the object");
+    }
+    return object;
+  }
+
+  @Override
+  public boolean supportCodecBuffer() {
+    return true;
+  }
+
+  @Override
+  public Class<CodecBuffer> getTypeClass() {
+    return CodecBuffer.class;
+  }
+
+  @Override
+  public byte[] toPersistedFormat(CodecBuffer buffer) {
+    return buffer.getArray();
+  }
+
+  @Override
+  public CodecBuffer fromPersistedFormat(byte[] bytes) {
+    return this.allocator.apply(bytes.length).put(ByteBuffer.wrap(bytes));
+  }
+
+  @Override
+  public CodecBuffer copyObject(CodecBuffer buffer) {
+    return buffer;
+  }
+}
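
For illustration, a minimal usage sketch of the new codec (not part of this commit; the example class name is made up). The buffer returned by fromPersistedFormat() is owned by the caller and must be closed, while copyObject() returns the same instance without copying and must not be closed separately:

    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.hdds.utils.db.Codec;
    import org.apache.hadoop.hdds.utils.db.CodecBuffer;
    import org.apache.hadoop.hdds.utils.db.CodecBufferCodec;

    public final class CodecBufferCodecExample {
      public static void main(String[] args) throws Exception {
        Codec<CodecBuffer> codec = CodecBufferCodec.get(true); // direct allocator
        byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);
        // fromPersistedFormat allocates a new CodecBuffer; the caller owns and closes it.
        try (CodecBuffer buffer = codec.fromPersistedFormat(raw)) {
          // copyObject hands back the same instance (no copy), so it is not closed separately.
          CodecBuffer same = codec.copyObject(buffer);
          assert same == buffer;
        }
      }
    }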
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
index 5aa561ba948..a689e9fdea1 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
@@ -20,6 +20,7 @@
 import java.io.Closeable;
 import java.io.File;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedDirectSlice;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedEnvOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileWriter;
@@ -60,6 +61,17 @@ public void put(byte[] key, byte[] value) throws 
RocksDatabaseException {
     }
   }
 
+  public void put(CodecBuffer key, CodecBuffer value) throws 
RocksDatabaseException {
+    try {
+      sstFileWriter.put(key.asReadOnlyByteBuffer(), 
value.asReadOnlyByteBuffer());
+      keyCounter.incrementAndGet();
+    } catch (RocksDBException e) {
+      closeOnFailure();
+      throw new RocksDatabaseException("Failed to put key (length=" + 
key.readableBytes()
+          + ") and value (length=" + value.readableBytes() + "), sstFile=" + 
sstFile.getAbsolutePath(), e);
+    }
+  }
+
   public void delete(byte[] key) throws RocksDatabaseException {
     try {
       sstFileWriter.delete(key);
@@ -71,6 +83,17 @@ public void delete(byte[] key) throws RocksDatabaseException 
{
     }
   }
 
+  public void delete(CodecBuffer key) throws RocksDatabaseException {
+    try (ManagedDirectSlice slice = new 
ManagedDirectSlice(key.asReadOnlyByteBuffer())) {
+      sstFileWriter.delete(slice);
+      keyCounter.incrementAndGet();
+    } catch (RocksDBException e) {
+      closeOnFailure();
+      throw new RocksDatabaseException("Failed to delete key (length=" + 
key.readableBytes()
+          + "), sstFile=" + sstFile.getAbsolutePath(), e);
+    }
+  }
+
   @Override
   public void close() throws RocksDatabaseException {
     if (sstFileWriter != null) {
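
As a rough sketch (assumed usage, not taken from the patch; the output path is hypothetical), the new CodecBuffer overloads let callers stream direct buffers into an SST file without materializing byte arrays:

    import java.io.File;
    import org.apache.hadoop.hdds.utils.db.CodecBuffer;
    import org.apache.hadoop.hdds.utils.db.RDBSstFileWriter;
    import org.apache.hadoop.hdds.utils.db.StringCodec;

    public final class SstCodecBufferWriteExample {
      public static void main(String[] args) throws Exception {
        File sstFile = new File("/tmp/example.sst"); // hypothetical output path
        try (RDBSstFileWriter writer = new RDBSstFileWriter(sstFile)) {
          try (CodecBuffer key = StringCodec.get().toDirectCodecBuffer("key1");
               CodecBuffer value = StringCodec.get().toDirectCodecBuffer("value1")) {
            // put(CodecBuffer, CodecBuffer) passes the buffers to the writer via
            // asReadOnlyByteBuffer(), avoiding the byte[] copies of the old path.
            writer.put(key, value);
          }
        }
      }
    }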
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index 3e784bec10f..a35c4a259f1 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -248,12 +248,24 @@ public void deleteBatchWithPrefix(BatchOperation batch, 
byte[] prefix)
   @Override
   public void dumpToFileWithPrefix(File externalFile, byte[] prefix)
       throws RocksDatabaseException, CodecException {
-    try (KeyValueIterator<byte[], byte[]> iter = iterator(prefix);
-         RDBSstFileWriter fileWriter = new RDBSstFileWriter(externalFile)) {
+    CodecBuffer prefixBuffer = prefix == null || prefix.length == 0 ? null :
+        CodecBufferCodec.get(true).fromPersistedFormat(prefix);
+    KeyValueIterator<CodecBuffer, CodecBuffer> iter;
+    try {
+      iter = iterator(prefixBuffer, KeyValueIterator.Type.KEY_AND_VALUE);
+    } catch (RocksDatabaseException e) {
+      if (prefixBuffer != null) {
+        prefixBuffer.close();
+      }
+      throw e;
+    }
+    try (RDBSstFileWriter fileWriter = new RDBSstFileWriter(externalFile)) {
       while (iter.hasNext()) {
-        final KeyValue<byte[], byte[]> entry = iter.next();
+        final KeyValue<CodecBuffer, CodecBuffer> entry = iter.next();
         fileWriter.put(entry.getKey(), entry.getValue());
       }
+    } finally {
+      iter.close();
     }
   }
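
A caller-side sketch of the dump-then-ingest pattern this method serves (illustrative only; the helper and its parameters are hypothetical):

    import java.io.File;
    import org.apache.hadoop.hdds.utils.db.Table;

    public final class DumpAndLoadExample {
      // Copies every entry under 'prefix' from source to target via a temporary SST file.
      static void copyPrefix(Table<byte[], byte[]> source, Table<byte[], byte[]> target,
          byte[] prefix, File tmpSstFile) throws Exception {
        source.dumpToFileWithPrefix(tmpSstFile, prefix); // now iterates with CodecBuffers internally
        target.loadFromFile(tmpSstFile);                 // ingests the produced SST file
      }
    }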
 
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/MapBackedTableIterator.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/MapBackedTableIterator.java
index 5ce574509da..4c34dda5278 100644
--- 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/MapBackedTableIterator.java
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/MapBackedTableIterator.java
@@ -46,7 +46,7 @@ public void seekToFirst() {
 
   @Override
   public void seekToLast() {
-
+    this.seek(this.values.lastKey());
   }
 
   @Override
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java
index ab5821b4b36..23bb463d45f 100644
--- 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java
@@ -97,7 +97,7 @@ public void deleteRangeWithBatch(BatchOperation batch, KEY 
beginKey, KEY endKey)
 
   @Override
   public void deleteRange(KEY beginKey, KEY endKey) {
-    throw new UnsupportedOperationException();
+    map.subMap(beginKey, endKey).clear();
   }
 
   @Override
@@ -136,7 +136,7 @@ public void loadFromFile(File externalFile) {
     throw new UnsupportedOperationException();
   }
 
-  NavigableMap<KEY, VALUE> getMap() {
+  public NavigableMap<KEY, VALUE> getMap() {
     return map;
   }
 }
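
The new deleteRange implementation relies on NavigableMap.subMap, which is half-open like RocksDB's deleteRange: beginKey is inclusive, endKey exclusive. A standalone illustration with plain JDK types:

    import java.util.NavigableMap;
    import java.util.TreeMap;

    public final class DeleteRangeSemantics {
      public static void main(String[] args) {
        NavigableMap<String, String> map = new TreeMap<>();
        map.put("a", "1");
        map.put("b", "2");
        map.put("c", "3");
        // subMap(begin, end) is half-open, so "c" survives, matching RocksDB deleteRange.
        map.subMap("a", "c").clear();
        System.out.println(map); // {c=3}
      }
    }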
diff --git 
a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java
 
b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java
new file mode 100644
index 00000000000..b68a250cb8b
--- /dev/null
+++ 
b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db.managed;
+
+import static 
org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.track;
+
+import java.nio.ByteBuffer;
+import org.apache.ratis.util.UncheckedAutoCloseable;
+import org.rocksdb.DirectSlice;
+
+/**
+ * Managed Direct Slice.
+ */
+public class ManagedDirectSlice extends DirectSlice {
+  private final UncheckedAutoCloseable leakTracker = track(this);
+
+  public ManagedDirectSlice(ByteBuffer data) {
+    super(data);
+  }
+
+  @Override
+  public synchronized long getNativeHandle() {
+    return super.getNativeHandle();
+  }
+
+  @Override
+  protected void disposeInternal() {
+    // RocksMutableObject.close is final thus can't be decorated.
+    // So, we decorate disposeInternal instead to track closure.
+    try {
+      super.disposeInternal();
+    } finally {
+      leakTracker.close();
+    }
+  }
+}
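
A minimal sketch of how the leak-tracked slice is meant to be used (assumed usage, not from the patch): wrap a direct ByteBuffer, hand the slice to a RocksDB call, and let try-with-resources release the native handle:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hdds.utils.db.managed.ManagedDirectSlice;

    public final class DirectSliceExample {
      public static void main(String[] args) {
        ByteBuffer key = ByteBuffer.allocateDirect(4);
        key.put(new byte[] {1, 2, 3, 4}).flip();
        try (ManagedDirectSlice slice = new ManagedDirectSlice(key)) {
          // The slice wraps the direct buffer without copying; pass it to RocksDB
          // APIs that accept a Slice, e.g. SstFileWriter#delete as in RDBSstFileWriter.
        }
      }
    }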
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java
index 54ee0a1f9a4..259639bb87a 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java
@@ -33,7 +33,6 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -53,8 +52,8 @@
 import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
-import org.apache.hadoop.hdds.utils.db.ByteArrayCodec;
 import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecBufferCodec;
 import org.apache.hadoop.hdds.utils.db.CodecException;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.hdds.utils.db.DBStore;
@@ -159,10 +158,9 @@ public SnapshotDefragService(long interval, TimeUnit unit, 
long serviceTimeout,
     if (tmpDefragDirPath.toFile().exists()) {
       deleteDirectory(tmpDefragDirPath);
     }
-    createDirectories(tmpDefragDirPath);
     this.tmpDefragDir = tmpDefragDirPath.toString();
     this.differTmpDir = tmpDefragDirPath.resolve("differSstFiles");
-
+    createDirectories(differTmpDir);
     this.deltaDiffComputer = new CompositeDeltaDiffComputer(omSnapshotManager,
         ozoneManager.getMetadataManager(), differTmpDir, (status) -> {
       LOG.debug("Snapshot defragmentation diff status: {}", status);
@@ -186,6 +184,10 @@ public void resume() {
     running.set(true);
   }
 
+  boolean isRunning() {
+    return running.get();
+  }
+
   /**
    * Checks if rocks-tools native library is available.
    */
@@ -263,8 +265,8 @@ private Pair<String, String> getTableBounds(Table<String, 
?> table) throws Rocks
   void performFullDefragmentation(DBStore checkpointDBStore, TablePrefixInfo 
prefixInfo,
       Set<String> incrementalTables) throws IOException {
     for (String table : incrementalTables) {
-      Table<String, byte[]> checkpointTable = 
checkpointDBStore.getTable(table, StringCodec.get(),
-          ByteArrayCodec.get());
+      Table<String, CodecBuffer> checkpointTable = 
checkpointDBStore.getTable(table, StringCodec.get(),
+          CodecBufferCodec.get(true));
       String tableBucketPrefix = prefixInfo.getTablePrefix(table);
       String prefixUpperBound = 
getLexicographicallyHigherString(tableBucketPrefix);
 
@@ -287,7 +289,8 @@ void performFullDefragmentation(DBStore checkpointDBStore, 
TablePrefixInfo prefi
       // Compact the table completely with kForce to get rid of tombstones.
       try (ManagedCompactRangeOptions compactRangeOptions = new 
ManagedCompactRangeOptions()) {
         
compactRangeOptions.setBottommostLevelCompaction(ManagedCompactRangeOptions.BottommostLevelCompaction.kForce);
-        compactRangeOptions.setExclusiveManualCompaction(true);
+        // Need to allow the range tombstones to change levels so they get propagated to the bottommost level.
+        compactRangeOptions.setChangeLevel(true);
         checkpointDBStore.compactTable(table, compactRangeOptions);
       }
     }
@@ -308,9 +311,9 @@ void performFullDefragmentation(DBStore checkpointDBStore, 
TablePrefixInfo prefi
    *         indicating whether any delta entries were written (true if there 
are differences, false otherwise)
    * @throws IOException if an I/O error occurs during processing
    */
-  @VisibleForTesting
-  Pair<Path, Boolean> spillTableDiffIntoSstFile(List<Path> deltaFilePaths, 
Table<String, byte[]> snapshotTable,
-      Table<String, byte[]> previousSnapshotTable, String tableKeyPrefix) 
throws IOException {
+  private Pair<Path, Boolean> spillTableDiffIntoSstFile(List<Path> 
deltaFilePaths,
+      Table<String, CodecBuffer> snapshotTable, Table<String, CodecBuffer> 
previousSnapshotTable,
+      String tableKeyPrefix) throws IOException {
     String sstFileReaderUpperBound = null;
     if (Strings.isNotEmpty(tableKeyPrefix)) {
       sstFileReaderUpperBound = 
getLexicographicallyHigherString(tableKeyPrefix);
@@ -321,22 +324,28 @@ Pair<Path, Boolean> spillTableDiffIntoSstFile(List<Path> 
deltaFilePaths, Table<S
     int deltaEntriesCount = 0;
     try (ClosableIterator<String> keysToCheck =
              sstFileSetReader.getKeyStreamWithTombstone(tableKeyPrefix, 
sstFileReaderUpperBound);
-         TableMergeIterator<String, byte[]> tableMergeIterator = new 
TableMergeIterator<>(keysToCheck,
+         TableMergeIterator<String, CodecBuffer> tableMergeIterator = new 
TableMergeIterator<>(keysToCheck,
              tableKeyPrefix, snapshotTable, previousSnapshotTable);
          RDBSstFileWriter rdbSstFileWriter = new 
RDBSstFileWriter(fileToBeIngested.toFile())) {
       while (tableMergeIterator.hasNext()) {
-        Table.KeyValue<String, List<byte[]>> kvs = tableMergeIterator.next();
+        Table.KeyValue<String, List<CodecBuffer>> kvs = 
tableMergeIterator.next();
         // Check if the values are equal or if they are not equal then the 
value should be written to the
         // delta sstFile.
-        if (!Arrays.equals(kvs.getValue().get(0), kvs.getValue().get(1))) {
-          try (CodecBuffer key = 
StringCodec.get().toHeapCodecBuffer(kvs.getKey())) {
-            byte[] keyArray = key.asReadOnlyByteBuffer().array();
-            byte[] val = kvs.getValue().get(0);
+        CodecBuffer snapValue = kvs.getValue().get(0);
+        CodecBuffer prevValue = kvs.getValue().get(1);
+        boolean valuesEqual = false;
+        if (snapValue == null && prevValue == null) {
+          valuesEqual = true;
+        } else if (snapValue != null && prevValue != null) {
+          valuesEqual = snapValue.readableBytes() == prevValue.readableBytes() 
&& snapValue.startsWith(prevValue);
+        }
+        if (!valuesEqual) {
+          try (CodecBuffer key = 
StringCodec.get().toDirectCodecBuffer(kvs.getKey())) {
             // If the value is null then add a tombstone to the delta sstFile.
-            if (val == null) {
-              rdbSstFileWriter.delete(keyArray);
+            if (snapValue == null) {
+              rdbSstFileWriter.delete(key);
             } else {
-              rdbSstFileWriter.put(keyArray, val);
+              rdbSstFileWriter.put(key, snapValue);
             }
           }
           deltaEntriesCount++;
@@ -397,10 +406,10 @@ void performIncrementalDefragmentation(SnapshotInfo 
previousSnapshotInfo, Snapsh
           // file can reingested into the checkpointStore.
           fileToBeIngested = deltaFiles.get(0);
         } else {
-          Table<String, byte[]> snapshotTable = 
snapshot.get().getMetadataManager().getStore()
-              .getTable(table, StringCodec.get(), ByteArrayCodec.get());
-          Table<String, byte[]> previousSnapshotTable = 
previousSnapshot.get().getMetadataManager().getStore()
-              .getTable(table, StringCodec.get(), ByteArrayCodec.get());
+          Table<String, CodecBuffer> snapshotTable = 
snapshot.get().getMetadataManager().getStore()
+              .getTable(table, StringCodec.get(), CodecBufferCodec.get(true));
+          Table<String, CodecBuffer> previousSnapshotTable = 
previousSnapshot.get().getMetadataManager().getStore()
+              .getTable(table, StringCodec.get(), CodecBufferCodec.get(true));
           String tableBucketPrefix = bucketPrefixInfo.getTablePrefix(table);
           Pair<Path, Boolean> spillResult = 
spillTableDiffIntoSstFile(deltaFiles, snapshotTable,
               previousSnapshotTable, tableBucketPrefix);
@@ -439,8 +448,8 @@ void performIncrementalDefragmentation(SnapshotInfo 
previousSnapshotInfo, Snapsh
    * @throws IOException if an I/O error occurs during table ingestion or file 
operations.
    */
   @VisibleForTesting
-  void ingestNonIncrementalTables(DBStore checkpointDBStore,
-      SnapshotInfo snapshotInfo, TablePrefixInfo bucketPrefixInfo, Set<String> 
incrementalTables) throws IOException {
+  void ingestNonIncrementalTables(DBStore checkpointDBStore, SnapshotInfo 
snapshotInfo,
+      TablePrefixInfo bucketPrefixInfo, Set<String> incrementalTables) throws 
IOException {
     String volumeName = snapshotInfo.getVolumeName();
     String bucketName = snapshotInfo.getBucketName();
     String snapshotName = snapshotInfo.getName();
@@ -455,12 +464,14 @@ void ingestNonIncrementalTables(DBStore checkpointDBStore,
               + SST_FILE_EXTENSION);
           filesToBeDeleted.add(tmpSstFile);
           String prefix = bucketPrefixInfo.getTablePrefix(snapshotTableName);
-          byte[] prefixBytes = Strings.isBlank(prefix) ? null : 
StringCodec.get().toPersistedFormat(prefix);
-          Table<byte[], byte[]> snapshotTableBytes = 
snapshotDBStore.getTable(snapshotTableName, ByteArrayCodec.get(),
-              ByteArrayCodec.get());
-          snapshotTableBytes.dumpToFileWithPrefix(tmpSstFile.toFile(), 
prefixBytes);
-          Table<byte[], byte[]> checkpointTable = 
checkpointDBStore.getTable(snapshotTableName, ByteArrayCodec.get(),
-              ByteArrayCodec.get());
+          try (CodecBuffer prefixBytes = Strings.isBlank(prefix) ? null :
+              StringCodec.get().toDirectCodecBuffer(prefix)) {
+            Table<CodecBuffer, CodecBuffer> snapshotTableBytes = 
snapshotDBStore.getTable(snapshotTableName,
+                CodecBufferCodec.get(true), CodecBufferCodec.get(true));
+            snapshotTableBytes.dumpToFileWithPrefix(tmpSstFile.toFile(), 
prefixBytes);
+          }
+          Table<CodecBuffer, CodecBuffer> checkpointTable = 
checkpointDBStore.getTable(snapshotTableName,
+              CodecBufferCodec.get(true), CodecBufferCodec.get(true));
           checkpointTable.loadFromFile(tmpSstFile.toFile());
         }
       }
@@ -572,7 +583,8 @@ private void acquireContentLock(UUID snapshotID) throws 
IOException {
     LOG.debug("Acquired MultiSnapshotLocks on snapshot: {}", snapshotID);
   }
 
-  private boolean checkAndDefragSnapshot(SnapshotChainManager chainManager, 
UUID snapshotId) throws IOException {
+  @VisibleForTesting
+  boolean checkAndDefragSnapshot(SnapshotChainManager chainManager, UUID 
snapshotId) throws IOException {
     SnapshotInfo snapshotInfo = SnapshotUtils.getSnapshotInfo(ozoneManager, 
chainManager, snapshotId);
 
     if (snapshotInfo.getSnapshotStatus() != 
SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE) {
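
The null-aware comparison that replaces Arrays.equals in spillTableDiffIntoSstFile boils down to the following helper (extracted here for clarity, not part of the patch):

    import org.apache.hadoop.hdds.utils.db.CodecBuffer;

    public final class CodecBufferEquality {
      // Two values are equal when both are null, or both are non-null with the same
      // readable length and identical leading bytes.
      static boolean valuesEqual(CodecBuffer a, CodecBuffer b) {
        if (a == null || b == null) {
          return a == b;
        }
        return a.readableBytes() == b.readableBytes() && a.startsWith(b);
      }
    }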
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java
index 46fa02d21f3..d00732551f8 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java
@@ -17,45 +17,111 @@
 
 package org.apache.hadoop.ozone.om.snapshot.defrag;
 
+import static 
org.apache.hadoop.ozone.om.OmSnapshotManager.COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.BUCKET_TABLE;
+import static 
org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELEGATION_TOKEN_TABLE;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DIRECTORY_TABLE;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.VOLUME_TABLE;
+import static 
org.apache.hadoop.ozone.om.lock.DAGLeveledResource.SNAPSHOT_DB_CONTENT_LOCK;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockConstruction;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.io.File;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecBufferCodec;
+import org.apache.hadoop.hdds.utils.db.CodecException;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.InMemoryTestTable;
+import org.apache.hadoop.hdds.utils.db.RDBSstFileWriter;
+import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.apache.hadoop.hdds.utils.db.StringCodec;
+import org.apache.hadoop.hdds.utils.db.StringInMemoryTestTable;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmSnapshot;
 import org.apache.hadoop.ozone.om.OmSnapshotLocalData;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.SnapshotChainManager;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
 import 
org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.WritableOmSnapshotLocalDataProvider;
+import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
 import 
org.apache.hadoop.ozone.om.snapshot.diff.delta.CompositeDeltaDiffComputer;
+import org.apache.hadoop.ozone.om.snapshot.diff.delta.DeltaFileComputer;
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
 import org.apache.hadoop.ozone.upgrade.LayoutFeature;
+import org.apache.hadoop.ozone.util.ClosableIterator;
+import org.apache.ozone.rocksdb.util.SstFileInfo;
+import org.apache.ozone.rocksdb.util.SstFileSetReader;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
 /**
@@ -81,15 +147,20 @@ public class TestSnapshotDefragService {
   @Mock
   private OMLayoutVersionManager versionManager;
 
+  private DeltaFileComputer deltaFileComputer;
+
   @TempDir
   private Path tempDir;
+  private OzoneConfiguration configuration;
   private SnapshotDefragService defragService;
   private AutoCloseable mocks;
+  private Map<String, CodecBuffer> dummyTableValues;
+  private Set<CodecBuffer> closeSet = new HashSet<>();
 
   @BeforeEach
   public void setup() throws IOException {
     mocks = MockitoAnnotations.openMocks(this);
-    OzoneConfiguration configuration = new OzoneConfiguration();
+    configuration = new OzoneConfiguration();
 
     // Setup basic mocks
     when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
@@ -114,8 +185,28 @@ public void setup() throws IOException {
           configuration
       );
       assertEquals(1, compositeDeltaDiffComputer.constructed().size());
+      this.deltaFileComputer = compositeDeltaDiffComputer.constructed().get(0);
+    }
+    this.dummyTableValues = new HashMap<>();
+    for (char c = 'a'; c <= 'z'; c++) {
+      for (char d = 'a'; d <= 'z'; d++) {
+        for (int i = 0; i < 10; i++) {
+          String key = String.valueOf(c) + d + i;
+          CodecBuffer value = getCodecBuffer(key);
+          dummyTableValues.put(key, value);
+        }
+      }
     }
+  }
+
+  private CodecBuffer getCodecBuffer(String value) throws CodecException {
+    CodecBuffer buffer = StringCodec.get().toDirectCodecBuffer(value);
+    closeSet.add(buffer);
+    return buffer;
+  }
 
+  private String getFromCodecBuffer(CodecBuffer buffer) {
+    return StringCodec.get().fromCodecBuffer(buffer);
   }
 
   @AfterEach
@@ -126,6 +217,7 @@ public void tearDown() throws Exception {
     if (mocks != null) {
       mocks.close();
     }
+    closeSet.forEach(CodecBuffer::close);
   }
 
   @Test
@@ -134,10 +226,10 @@ public void testServiceStartAndPause() {
     assertTrue(defragService.getSnapshotsDefraggedCount().get() >= 0);
 
     defragService.pause();
-    assertNotNull(defragService);
+    assertFalse(defragService.isRunning());
 
     defragService.resume();
-    assertNotNull(defragService);
+    assertTrue(defragService.isRunning());
   }
 
   @Test
@@ -195,11 +287,646 @@ public void testNeedsDefragmentationRequiresDefrag() 
throws IOException {
     verify(provider).close();
   }
 
+  private Map<String, InMemoryTestTable<String, CodecBuffer>> 
createTables(String... tableNames) {
+    return createTables(dummyTableValues, tableNames);
+  }
+
+  private Map<String, InMemoryTestTable<String, CodecBuffer>> createTables(
+      Map<String, CodecBuffer> tableValues, String... tableNames) {
+    Map<String, InMemoryTestTable<String, CodecBuffer>> tables = new 
HashMap<>();
+    for (String tableName : tableNames) {
+      tables.put(tableName, new StringInMemoryTestTable<>(tableValues, 
tableName));
+    }
+    return tables;
+  }
+
+  @Test
+  public void testPerformFullDefragmentation() throws Exception {
+    DBStore checkpointDBStore = mock(DBStore.class);
+    Map<String, InMemoryTestTable<String, CodecBuffer>> tableMap = 
createTables("cf1", "cf2", "cf3");
+    TablePrefixInfo prefixInfo = new TablePrefixInfo(ImmutableMap.of("cf1", 
"ab", "cf2", "cd",
+        "cf3", "ef"));
+    Map<String, Map<String, CodecBuffer>> tablesCompacted = new HashMap<>();
+    Set<String> incrementalTables = Stream.of("cf1", 
"cf2").collect(Collectors.toSet());
+    when(checkpointDBStore.getTable(anyString(), eq(StringCodec.get()), 
eq(CodecBufferCodec.get(true))))
+        .thenAnswer(i -> {
+          String tableName = i.getArgument(0, String.class);
+          return tableMap.getOrDefault(tableName, null);
+        });
+    doAnswer(i -> {
+      String tableName = i.getArgument(0, String.class);
+      Map<String, CodecBuffer> table = new 
HashMap<>(tableMap.get(tableName).getMap());
+      tablesCompacted.putIfAbsent(tableName, table);
+      return null;
+
+    }).when(checkpointDBStore).compactTable(anyString(), any());
+
+    defragService.performFullDefragmentation(checkpointDBStore, prefixInfo, 
incrementalTables);
+    assertEquals(2, tablesCompacted.size());
+    for (Map.Entry<String, Map<String, CodecBuffer>> compactedTable : 
tablesCompacted.entrySet()) {
+      String prefix = prefixInfo.getTablePrefix(compactedTable.getKey());
+      Map<String, String> compactedStringTable = 
compactedTable.getValue().entrySet().stream()
+          .collect(Collectors.toMap(Map.Entry::getKey, e -> 
getFromCodecBuffer(e.getValue())));
+      Map<String, String> expectedValue = dummyTableValues.entrySet().stream()
+          .filter(e -> e.getKey().startsWith(prefix))
+          .collect(Collectors.toMap(Map.Entry::getKey, e -> 
getFromCodecBuffer(e.getValue())));
+      assertEquals(expectedValue, compactedStringTable);
+      assertEquals(expectedValue, 
tableMap.get(compactedTable.getKey()).getMap().entrySet().stream()
+          .collect(Collectors.toMap(Map.Entry::getKey, e -> 
getFromCodecBuffer(e.getValue()))));
+    }
+    for (Map.Entry<String, InMemoryTestTable<String, CodecBuffer>> 
nonCompactedTable : tableMap.entrySet()) {
+      if (!tablesCompacted.containsKey(nonCompactedTable.getKey())) {
+        assertEquals(dummyTableValues, nonCompactedTable.getValue().getMap());
+      }
+    }
+  }
+
+  @Test
+  public void testIngestNonIncrementalTables() throws Exception {
+    DBStore checkpointDBStore = mock(DBStore.class);
+    DBStore snapshotDBStore = mock(DBStore.class);
+    SnapshotInfo snapshotInfo = createMockSnapshotInfo(UUID.randomUUID(), 
"vol1", "bucket1", "snap1");
+    TablePrefixInfo prefixInfo = new TablePrefixInfo(ImmutableMap.of("cf1", 
"ab", "cf2", "cd",
+        "cf3", "ef", "cf4", ""));
+    Set<String> incrementalTables = Stream.of("cf1", 
"cf2").collect(Collectors.toSet());
+    Map<String, String> dumpedFileName = new HashMap<>();
+    Map<String, Table<CodecBuffer, CodecBuffer>> snapshotTables = 
Stream.of("cf1", "cf2", "cf3", "cf4", "cf5")
+        .map(name -> {
+          Table<CodecBuffer, CodecBuffer> table = mock(Table.class);
+          when(table.getName()).thenReturn(name);
+          try {
+            doAnswer(i -> {
+              CodecBuffer prefixBytes = i.getArgument(1) == null ? null : 
i.getArgument(1, CodecBuffer.class);
+              String prefix = prefixBytes == null ? "" : 
StringCodec.get().fromCodecBuffer(prefixBytes);
+              assertEquals(prefixInfo.getTablePrefix(name), prefix);
+              dumpedFileName.put(name, i.getArgument(0, 
File.class).toPath().toAbsolutePath().toString());
+              return null;
+            }).when(table).dumpToFileWithPrefix(any(File.class), any());
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          return table;
+        }).collect(Collectors.toMap(Table::getName, Function.identity()));
+    Map<String, String> ingestedFiles = new HashMap<>();
+    Map<String, Table> checkpointTables = Stream.of("cf3", "cf4", "cf5")
+        .map(name -> {
+          Table<CodecBuffer, CodecBuffer> table = mock(Table.class);
+          when(table.getName()).thenReturn(name);
+          try {
+            doAnswer(i -> {
+              File file = i.getArgument(0, File.class);
+              ingestedFiles.put(name, 
file.toPath().toAbsolutePath().toString());
+              return null;
+            }).when(table).loadFromFile(any(File.class));
+          } catch (RocksDatabaseException e) {
+            throw new RuntimeException(e);
+          }
+          return table;
+        }).collect(Collectors.toMap(Table::getName, Function.identity()));
+
+    OmSnapshot snapshot = mock(OmSnapshot.class);
+    OmMetadataManagerImpl snapshotMetadataManager = 
mock(OmMetadataManagerImpl.class);
+    UncheckedAutoCloseableSupplier<OmSnapshot> snapshotSupplier = new 
UncheckedAutoCloseableSupplier<OmSnapshot>() {
+      @Override
+      public OmSnapshot get() {
+        return snapshot;
+      }
+
+      @Override
+      public void close() {
+
+      }
+    };
+
+    when(omSnapshotManager.getActiveSnapshot(anyString(), anyString(), 
anyString())).thenReturn(snapshotSupplier);
+    when(snapshot.getMetadataManager()).thenReturn(snapshotMetadataManager);
+    when(snapshotMetadataManager.getStore()).thenReturn(snapshotDBStore);
+    List<Table<?, ?>> snapshotTableList = new 
ArrayList<>(snapshotTables.values());
+    when(snapshotDBStore.listTables()).thenReturn(snapshotTableList);
+
+    doAnswer(i -> {
+      String tableName = i.getArgument(0, String.class);
+      return snapshotTables.get(tableName);
+    }).when(snapshotDBStore).getTable(anyString(), 
eq(CodecBufferCodec.get(true)), eq(CodecBufferCodec.get(true)));
+    doAnswer(i -> {
+      String tableName = i.getArgument(0, String.class);
+      return checkpointTables.get(tableName);
+    }).when(checkpointDBStore).getTable(anyString(), 
eq(CodecBufferCodec.get(true)), eq(CodecBufferCodec.get(true)));
+
+    defragService.ingestNonIncrementalTables(checkpointDBStore, snapshotInfo, 
prefixInfo, incrementalTables);
+    assertEquals(checkpointTables.keySet(), dumpedFileName.keySet());
+    assertEquals(dumpedFileName, ingestedFiles);
+  }
+
+  private void assertContents(Map<String, Map<String, String>> tableContents, 
DBStore dbStore)
+      throws RocksDatabaseException, CodecException {
+    for (String tableName : dbStore.getTableNames().values()) {
+      Table<String, String> table = dbStore.getTable(tableName, 
StringCodec.get(), StringCodec.get());
+      try (Table.KeyValueIterator<String, String> iterator = table.iterator()) 
{
+        Map<String, String> expectedContents = tableContents.get(tableName);
+        Map<String, String> actualContents = new HashMap<>();
+        while (iterator.hasNext()) {
+          Table.KeyValue<String, String> kv = iterator.next();
+          actualContents.put(kv.getKey(), kv.getValue());
+        }
+        assertNotNull(expectedContents, "Expected contents for table " + 
tableName + " is null");
+        assertEquals(expectedContents, actualContents, "Table contents 
mismatch for table " + tableName);
+      }
+    }
+  }
+
+  private static Stream<Arguments> testCreateCheckpointCases() {
+    // Have random tables to be incremental to ensure content gets preserved.
+    return Stream.of(
+        Arguments.of(ImmutableSet.of(KEY_TABLE, BUCKET_TABLE, 
DIRECTORY_TABLE)),
+        Arguments.of(ImmutableSet.of(FILE_TABLE, DIRECTORY_TABLE, KEY_TABLE)),
+        Arguments.of(ImmutableSet.of(VOLUME_TABLE, BUCKET_TABLE, 
DELEGATION_TOKEN_TABLE))
+    );
+  }
+
+  private Map<String, Map<String, String>> createTableContents(Path path, 
String keyPrefix) throws IOException {
+    DBCheckpoint snapshotCheckpointLocation = new RocksDBCheckpoint(path);
+    Map<String, Map<String, String>> tableContents = new HashMap<>();
+    try (OmMetadataManagerImpl metadataManager = 
OmMetadataManagerImpl.createCheckpointMetadataManager(configuration,
+        snapshotCheckpointLocation, false)) {
+      Set<String> metadataManagerTables = new 
HashSet<>(metadataManager.listTableNames());
+      for (String tableName : 
metadataManager.getStore().getTableNames().values()) {
+        if (metadataManagerTables.contains(tableName)) {
+          Table<String, String> table = 
metadataManager.getStore().getTable(tableName,
+              StringCodec.get(), StringCodec.get());
+          for (int i = 0; i < 10; i++) {
+            String key = tableName + keyPrefix + i;
+            String value = "value_" + i;
+            table.put(key, value);
+            tableContents.computeIfAbsent(tableName, k -> new 
HashMap<>()).put(key, value);
+          }
+        } else {
+          tableContents.put(tableName, Collections.emptyMap());
+        }
+      }
+    }
+    return tableContents;
+  }
+
+  @ParameterizedTest
+  @MethodSource("testCreateCheckpointCases")
+  public void testCreateCheckpoint(Set<String> incrementalTables) throws 
Exception {
+    SnapshotInfo snapshotInfo = createMockSnapshotInfo(UUID.randomUUID(), 
"vol1", "bucket1", "snap1");
+    DBCheckpoint snapshotCheckpointLocation =
+        new 
RocksDBCheckpoint(tempDir.resolve(snapshotInfo.getSnapshotId().toString()));
+    Map<String, Map<String, String>> tableContents =
+        
createTableContents(snapshotCheckpointLocation.getCheckpointLocation(), 
"_key_");
+    UncheckedAutoCloseableSupplier<OmSnapshot> snapshotSupplier = new 
UncheckedAutoCloseableSupplier<OmSnapshot>() {
+      private final OmMetadataManagerImpl snapshotMetadataManager =
+          OmMetadataManagerImpl.createCheckpointMetadataManager(configuration, 
snapshotCheckpointLocation, false);
+
+      @Override
+      public OmSnapshot get() {
+        OmSnapshot snapshot = mock(OmSnapshot.class);
+        
when(snapshot.getMetadataManager()).thenReturn(snapshotMetadataManager);
+        return snapshot;
+      }
+
+      @Override
+      public void close() {
+        try {
+          snapshotMetadataManager.close();
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+    };
+
+    when(omSnapshotManager.getActiveSnapshot(eq(snapshotInfo.getVolumeName()), 
eq(snapshotInfo.getBucketName()),
+        eq(snapshotInfo.getName()))).thenReturn(snapshotSupplier);
+    try (OmMetadataManagerImpl result = 
defragService.createCheckpoint(snapshotInfo, incrementalTables)) {
+      try (OmMetadataManagerImpl originalSnapshotStore =
+               
OmMetadataManagerImpl.createCheckpointMetadataManager(configuration, 
snapshotCheckpointLocation)) {
+        assertContents(tableContents, originalSnapshotStore.getStore());
+      } catch (Throwable e) {
+        throw new RuntimeException("Failed to load original snapshot store", 
e);
+      }
+      // Ensure non-incremental tables are cleared.
+      tableContents.entrySet().stream()
+          .filter(e -> !incrementalTables.contains(e.getKey()))
+          .forEach(e -> e.getValue().clear());
+      assertContents(tableContents, result.getStore());
+    }
+  }
+
+  private void assertContents(Map<String, Map<String, String>> contents, Path 
path) throws IOException {
+    DBCheckpoint dbCheckpoint = new RocksDBCheckpoint(path);
+    try (OmMetadataManagerImpl metadataManager = 
OmMetadataManagerImpl.createCheckpointMetadataManager(configuration,
+        dbCheckpoint, true)) {
+      assertContents(contents, metadataManager.getStore());
+    }
+  }
+
+  @Test
+  public void testAtomicSwitchSnapshotDB() throws Exception {
+    UUID snapshotId = UUID.randomUUID();
+    Path checkpointPath = tempDir.resolve("checkpoint");
+    Map<String, Map<String, String>> checkpointContent = 
createTableContents(checkpointPath, "_cp1_");
+    WritableOmSnapshotLocalDataProvider provider = 
mock(WritableOmSnapshotLocalDataProvider.class);
+    OmSnapshotLocalData localData = mock(OmSnapshotLocalData.class);
+
+    
when(snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotId)).thenReturn(provider);
+    when(provider.getSnapshotLocalData()).thenReturn(localData);
+    AtomicInteger version = new AtomicInteger(1);
+    when(localData.getVersion()).thenAnswer(i -> version.get());
+    doAnswer(i -> {
+      version.incrementAndGet();
+      return null;
+    }).when(provider).addSnapshotVersion(any());
+
+    Path nextVersionPath = tempDir.resolve(snapshotId + "v2").toAbsolutePath();
+    try (MockedStatic<OmSnapshotManager> mockedStatic = 
Mockito.mockStatic(OmSnapshotManager.class,
+        CALLS_REAL_METHODS)) {
+      mockedStatic.when(() -> 
OmSnapshotManager.getSnapshotPath(eq(metadataManager), eq(snapshotId), eq(2)))
+          .thenReturn(nextVersionPath);
+      Map<String, Map<String, String>> existingVersionContents = 
createTableContents(nextVersionPath, "_cp2_");
+      assertNotEquals(existingVersionContents, checkpointContent);
+      assertContents(existingVersionContents, nextVersionPath);
+      int result = defragService.atomicSwitchSnapshotDB(snapshotId, 
checkpointPath);
+      assertContents(checkpointContent, nextVersionPath);
+      assertEquals(1, result);
+      assertEquals(2, version.get());
+    }
+    assertNull(verify(provider).getSnapshotLocalData());
+  }
+
+  private void createMockSnapshot(SnapshotInfo snapshotInfo, Map<String, 
CodecBuffer> tableContents,
+      String... tables) throws IOException {
+    OmSnapshot snapshot = mock(OmSnapshot.class);
+    UncheckedAutoCloseableSupplier<OmSnapshot> snapshotSupplier = new 
UncheckedAutoCloseableSupplier<OmSnapshot>() {
+
+      @Override
+      public void close() {
+
+      }
+
+      @Override
+      public OmSnapshot get() {
+        return snapshot;
+      }
+    };
+    Map<String, InMemoryTestTable<String, CodecBuffer>> tableMap = 
createTables(tableContents, tables);
+    OMMetadataManager snapshotMetadataManager = mock(OMMetadataManager.class);
+    when(snapshot.getMetadataManager()).thenReturn(snapshotMetadataManager);
+    DBStore snapshotDBStore = mock(DBStore.class);
+    when(snapshotDBStore.getTable(anyString(), eq(StringCodec.get()), 
eq(CodecBufferCodec.get(true))))
+        .thenAnswer(i -> tableMap.getOrDefault(i.getArgument(0, String.class), 
null));
+    when(snapshotMetadataManager.getStore()).thenReturn(snapshotDBStore);
+    when(omSnapshotManager.getActiveSnapshot(eq(snapshotInfo.getVolumeName()), 
eq(snapshotInfo.getBucketName()),
+        eq(snapshotInfo.getName()))).thenReturn(snapshotSupplier);
+  }
+
+  /**
+   * Tests the incremental defragmentation process between two snapshots.
+   *
+   * <p>This parameterized test validates the {@code 
performIncrementalDefragmentation} method
+   * across different version scenarios (0, 1, 2, 10) to ensure proper 
handling of snapshot
+   * delta files and version-specific optimizations.</p>
+   *
+   * <h3>Test Data Generation:</h3>
+   * Creates 67,600 synthetic key-value pairs (26×26×100) distributed across 
two snapshots
+   * with the following patterns based on {@code i % 6}:
+   * <ul>
+   *   <li><b>i % 6 == 0:</b> Different values in snap1 and snap2 
(updates)</li>
+   *   <li><b>i % 6 == 1:</b> Value exists only in snap2 (insertions)</li>
+   *   <li><b>i % 6 == 2:</b> Value exists only in snap1 (deletions)</li>
+   *   <li><b>i % 6 == 3:</b> Identical values in both snapshots (no 
change)</li>
+   *   <li><b>i % 6 == 4:</b> Absent in both snapshots</li>
+   *   <li><b>i % 6 == 5:</b> Different values, but excluded from delta 
files</li>
+   * </ul>
+   *
+   * <h3>Mock Setup:</h3>
+   * <ul>
+   *   <li>Two column families (cf1, cf2) configured for incremental 
tracking</li>
+   *   <li>Three delta SST files: one for cf1, two for cf2</li>
+   *   <li>Table prefix mappings: cf1→"ab", cf2→"cd", cf3→"ef"</li>
+   *   <li>RDBSstFileWriter mock captures put/delete operations</li>
+   *   <li>SstFileSetReader mock returns keys with indices 0-4 (i % 6 < 5)</li>
+   * </ul>
+   *
+   * <h3>Version-Specific Behavior:</h3>
+   * <ul>
+   *   <li><b>currentVersion == 0 (initial version):</b>
+   *     <ul>
+   *       <li>All incremental tables are dumped to new SST files</li>
+   *       <li>All dumped files are ingested into the checkpoint database</li>
+   *     </ul>
+   *   </li>
+   *   <li><b>currentVersion > 0 (subsequent versions):</b>
+   *     <ul>
+   *       <li>Single delta file tables (cf1) are ingested directly without 
merging</li>
+   *       <li>Multiple delta file tables (cf2) are merged and dumped before 
ingestion</li>
+   *       <li>Optimization: avoids unnecessary file I/O for single delta 
files</li>
+   *     </ul>
+   *   </li>
+   * </ul>
+   *
+   * <h3>Assertions:</h3>
+   * <ul>
+   *   <li>Verifies correct tables are dumped and ingested based on 
version</li>
+   *   <li>Validates that only modified keys (i % 6 < 3) appear in delta 
files</li>
+   *   <li>Confirms written values match snap2's values or null for 
deletions</li>
+   *   <li>Ensures all incremental tables are ultimately ingested</li>
+   * </ul>
+   *
+   * @param currentVersion the snapshot version being defragmented (0 for 
initial, >0 for subsequent)
+   * @throws Exception if any error occurs during the test execution
+   */
+  @SuppressWarnings("checkstyle:MethodLength")
+  @ParameterizedTest
+  @ValueSource(ints = {0, 1, 2, 10})
+  public void testPerformIncrementalDefragmentation(int currentVersion) throws 
Exception {
+    DBStore checkpointDBStore = mock(DBStore.class);
+    String samePrefix = "samePrefix";
+    String snap1Prefix = "snap1Prefix";
+    String snap2Prefix = "snap2Prefix";
+    Set<String> incrementalTables = Stream.of("cf1", 
"cf2").collect(Collectors.toSet());
+    Map<String, CodecBuffer> snap1TableValues = new HashMap<>();
+    Map<String, CodecBuffer> snap2TableValues = new HashMap<>();
+    for (char c = 'a'; c <= 'z'; c++) {
+      for (char d = 'a'; d <= 'z'; d++) {
+        for (int i = 0; i < 100; i++) {
+          String key = String.format("%c%c%03d", c, d, i);
+          String snap1Value, snap2Value;
+          if (i % 6 == 0) {
+            // Value is different.
+            snap1Value = String.valueOf(c) + d + snap1Prefix + i;
+            snap2Value = String.valueOf(c) + d + snap2Prefix + i;
+          } else if (i % 6 == 1) {
+            // Value is present in snap2 and not present in snap1.
+            snap1Value = null;
+            snap2Value = String.valueOf(c) + d + snap2Prefix + i;
+          } else if (i % 6 == 2) {
+            // Value is present in snap1 and not present in snap2.
+            snap1Value = String.valueOf(c) + d + snap1Prefix + i;
+            snap2Value = null;
+          } else if (i % 6 == 3) {
+            // Value is same.
+            snap1Value = String.valueOf(c) + d + samePrefix + i;
+            snap2Value = snap1Value;
+          } else if (i % 6 == 4) {
+            // both values are absent in snap1 and snap2.
+            snap1Value = null;
+            snap2Value = null;
+          } else {
+            // Value is different, but this key is not present in the delta file keys.
+            snap1Value = String.valueOf(c) + d + snap1Prefix + i;
+            snap2Value = String.valueOf(c) + d + snap2Prefix + i;
+          }
+          if (snap1Value != null) {
+            snap1TableValues.put(key, getCodecBuffer(snap1Value));
+          }
+          if (snap2Value != null) {
+            snap2TableValues.put(key, getCodecBuffer(snap2Value));
+          }
+        }
+      }
+    }
+    SnapshotInfo snap1Info = createMockSnapshotInfo(UUID.randomUUID(), "vol1", 
"bucket1", "snap1");
+    SnapshotInfo snap2Info = createMockSnapshotInfo(UUID.randomUUID(), "vol1", 
"bucket1", "snap2");
+    createMockSnapshot(snap1Info, snap1TableValues, "cf1", "cf2", "cf3");
+    createMockSnapshot(snap2Info, snap2TableValues, "cf1", "cf2", "cf3");
+    TablePrefixInfo prefixInfo = new TablePrefixInfo(ImmutableMap.of("cf1", 
"ab", "cf2", "cd",
+        "cf3", "ef"));
+
+    List<Pair<Path, SstFileInfo>> deltaFiles = ImmutableList.of(
+        Pair.of(tempDir.resolve("1.sst").toAbsolutePath(), new 
SstFileInfo("1", "", "", "cf1")),
+        Pair.of(tempDir.resolve("2.sst").toAbsolutePath(), new 
SstFileInfo("2", "", "", "cf2")),
+        Pair.of(tempDir.resolve("3.sst").toAbsolutePath(), new 
SstFileInfo("3", "", "", "cf2")));
+    Map<String, String> deltaFileNamesToTableMap = deltaFiles.stream()
+        .collect(Collectors.groupingBy(pair -> 
pair.getValue().getColumnFamily())).entrySet()
+        .stream().collect(Collectors.toMap(
+            e -> 
e.getValue().stream().map(Pair::getKey).map(Path::toString).sorted().collect(Collectors.joining(",")),
+            Map.Entry::getKey));
+    for (Pair<Path, SstFileInfo> deltaFile : deltaFiles) {
+      assertTrue(deltaFile.getKey().toFile().createNewFile());
+    }
+    AtomicReference<String> currentTable = new AtomicReference<>();
+    Map<String, String> dumpedFileName = new HashMap<>();
+    Map<Path, List<Pair<String, String>>> deltaFileContents = new HashMap<>();
+    when(deltaFileComputer.getDeltaFiles(eq(snap1Info), eq(snap2Info), 
eq(incrementalTables))).thenReturn(deltaFiles);
+    try (MockedConstruction<RDBSstFileWriter> rdbSstFileWriter =
+             Mockito.mockConstruction(RDBSstFileWriter.class, (mock, context) 
-> {
+               File file = (File) context.arguments().get(0);
+               assertTrue(file.createNewFile() || file.exists());
+               Path filePath = file.toPath().toAbsolutePath();
+               dumpedFileName.put(filePath.toString(), currentTable.get());
+               doAnswer(i -> {
+                 String key = 
StringCodec.get().fromCodecBuffer(i.getArgument(0));
+                 String value = 
StringCodec.get().fromCodecBuffer(i.getArgument(1));
+                 deltaFileContents.computeIfAbsent(filePath, k -> new 
ArrayList<>())
+                     .add(Pair.of(key, value));
+                 return null;
+               }).when(mock).put(any(CodecBuffer.class), 
any(CodecBuffer.class));
+               doAnswer(i -> {
+                 String key = 
StringCodec.get().fromCodecBuffer(i.getArgument(0));
+                 deltaFileContents.computeIfAbsent(filePath, k -> new 
ArrayList<>()).add(Pair.of(key, null));
+                 return null;
+               }).when(mock).delete(any(CodecBuffer.class));
+             });
+         MockedConstruction<SstFileSetReader> sstFileSetReader =
+             Mockito.mockConstruction(SstFileSetReader.class, (mock, context) 
-> {
+               String deltaKey = ((Collection<Path>) 
context.arguments().get(0))
+                   
.stream().map(Path::toAbsolutePath).map(Path::toString).sorted().collect(Collectors.joining(","));
+               String tableName = deltaFileNamesToTableMap.get(deltaKey);
+               currentTable.set(tableName);
+               doAnswer(i -> {
+                 String lowerBound = i.getArgument(0);
+                 String upperBound = i.getArgument(1);
+                 assertEquals(prefixInfo.getTablePrefix(tableName), 
lowerBound);
+                 
assertEquals(StringUtils.getLexicographicallyHigherString(lowerBound), 
upperBound);
+                 Iterator<String> itr = IntStream.range(0, 100).filter(idx -> 
idx % 6 < 5).boxed()
+                     .map(idx -> String.format("%s%03d", lowerBound, 
idx)).iterator();
+                 return new ClosableIterator<String>() {
+                   @Override
+                   public void close() {
+
+                   }
+
+                   @Override
+                   public boolean hasNext() {
+                     return itr.hasNext();
+                   }
+
+                   @Override
+                   public String next() {
+                     return itr.next();
+                   }
+                 };
+               }).when(mock).getKeyStreamWithTombstone(anyString(), 
anyString());
+             })
+    ) {
+      Map<String, String> ingestedFiles = new HashMap<>();
+      Map<String, Table> checkpointTables = incrementalTables.stream()
+          .map(name -> {
+            Table<CodecBuffer, CodecBuffer> table = mock(Table.class);
+            when(table.getName()).thenReturn(name);
+            try {
+              doAnswer(i -> {
+                File file = i.getArgument(0, File.class);
+                ingestedFiles.put(file.toPath().toAbsolutePath().toString(), 
name);
+                return null;
+              }).when(table).loadFromFile(any(File.class));
+            } catch (RocksDatabaseException e) {
+              throw new RuntimeException(e);
+            }
+            return table;
+          }).collect(Collectors.toMap(Table::getName, Function.identity()));
+      doAnswer(i -> {
+        String tableName = i.getArgument(0, String.class);
+        return checkpointTables.get(tableName);
+      }).when(checkpointDBStore).getTable(anyString());
+      defragService.performIncrementalDefragmentation(snap1Info, snap2Info, currentVersion, checkpointDBStore,
+          prefixInfo, incrementalTables);
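+      // Version 0 expects every incremental table to be dumped and re-ingested; otherwise only cf2 is dumped
+      // and cf1 is ingested from its pre-existing delta file.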
+      if (currentVersion == 0) {
+        assertEquals(incrementalTables, new HashSet<>(dumpedFileName.values()));
+        assertEquals(ingestedFiles, dumpedFileName);
+      } else {
+        assertEquals(ImmutableSet.of("cf2"), new HashSet<>(dumpedFileName.values()));
+        assertEquals("cf1", ingestedFiles.get(deltaFiles.get(0).getLeft().toAbsolutePath().toString()));
+        assertEquals(ingestedFiles.entrySet().stream().filter(e -> e.getValue().equals(
+            "cf2")).collect(Collectors.toSet()), dumpedFileName.entrySet().stream().filter(e -> e.getValue().equals(
+            "cf2")).collect(Collectors.toSet()));
+      }
+      assertEquals(incrementalTables, new HashSet<>(ingestedFiles.values()));
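+      // Each dumped delta file should contain only the indices where i % 6 < 3,
+      // with a null value wherever snap2 has no entry for the key (a tombstone).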
+      for (Map.Entry<Path, List<Pair<String, String>>> deltaFileContent : deltaFileContents.entrySet()) {
+        int idx = 0;
+        for (int i = 0; i < 100; i++) {
+          String tableName = dumpedFileName.get(deltaFileContent.getKey().toString());
+          if (i % 6 < 3) {
+            String key = String.format("%s%03d", prefixInfo.getTablePrefix(tableName), i);
+            Pair<String, String> actualKey = deltaFileContent.getValue().get(idx);
+            CodecBuffer value = snap2TableValues.get(key);
+            Pair<String, String> expectedKey = Pair.of(key, value == null ? null :
+                StringCodec.get().fromCodecBuffer(value));
+            assertEquals(expectedKey, actualKey, "Key mismatch for table " + tableName + " at index " + i);
+            idx++;
+          }
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testCheckAndDefragDeletedSnapshot() throws IOException {
+    SnapshotInfo snapshotInfo = createMockSnapshotInfo(UUID.randomUUID(), "vol1", "bucket1", "snap1");
+    snapshotInfo.setSnapshotStatus(SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED);
+    SnapshotChainManager chainManager = mock(SnapshotChainManager.class);
+    try (MockedStatic<SnapshotUtils> mockedStatic = Mockito.mockStatic(SnapshotUtils.class)) {
+      mockedStatic.when(() -> SnapshotUtils.getSnapshotInfo(eq(ozoneManager), eq(chainManager),
+          eq(snapshotInfo.getSnapshotId()))).thenReturn(snapshotInfo);
+      assertFalse(defragService.checkAndDefragSnapshot(chainManager, snapshotInfo.getSnapshotId()));
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testCheckAndDefragActiveSnapshot(boolean previousSnapshotExists) throws IOException {
+    SnapshotInfo snapshotInfo = createMockSnapshotInfo(UUID.randomUUID(), "vol1", "bucket1", "snap2");
+    SnapshotInfo previousSnapshotInfo;
+    if (previousSnapshotExists) {
+      previousSnapshotInfo = createMockSnapshotInfo(UUID.randomUUID(), "vol1", "bucket1", "snap1");
+      snapshotInfo.setPathPreviousSnapshotId(previousSnapshotInfo.getSnapshotId());
+    } else {
+      previousSnapshotInfo = null;
+    }
+
+    SnapshotChainManager chainManager = mock(SnapshotChainManager.class);
+    try (MockedStatic<SnapshotUtils> mockedStatic = Mockito.mockStatic(SnapshotUtils.class)) {
+      mockedStatic.when(() -> SnapshotUtils.getSnapshotInfo(eq(ozoneManager), eq(chainManager),
+          eq(snapshotInfo.getSnapshotId()))).thenReturn(snapshotInfo);
+      if (previousSnapshotExists) {
+        mockedStatic.when(() -> SnapshotUtils.getSnapshotInfo(eq(ozoneManager), eq(chainManager),
+            eq(previousSnapshotInfo.getSnapshotId()))).thenReturn(previousSnapshotInfo);
+      }
+      SnapshotDefragService spyDefragService = Mockito.spy(defragService);
+      doReturn(Pair.of(true, 10)).when(spyDefragService).needsDefragmentation(eq(snapshotInfo));
+      OmMetadataManagerImpl checkpointMetadataManager = mock(OmMetadataManagerImpl.class);
+      File checkpointPath = tempDir.resolve("checkpoint").toAbsolutePath().toFile();
+      DBStore checkpointDBStore = mock(DBStore.class);
+      SnapshotInfo checkpointSnapshotInfo = previousSnapshotExists ? previousSnapshotInfo : snapshotInfo;
+      when(checkpointMetadataManager.getStore()).thenReturn(checkpointDBStore);
+      when(checkpointDBStore.getDbLocation()).thenReturn(checkpointPath);
+      doReturn(checkpointMetadataManager).when(spyDefragService).createCheckpoint(eq(checkpointSnapshotInfo),
+          eq(COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT));
+      TablePrefixInfo prefixInfo = new TablePrefixInfo(Collections.emptyMap());
+      when(metadataManager.getTableBucketPrefix(eq(snapshotInfo.getVolumeName()), eq(snapshotInfo.getBucketName())))
+          .thenReturn(prefixInfo);
+      doNothing().when(spyDefragService).performFullDefragmentation(eq(checkpointDBStore), eq(prefixInfo),
+          eq(COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT));
+      doNothing().when(spyDefragService).performIncrementalDefragmentation(eq(previousSnapshotInfo),
+          eq(snapshotInfo), eq(10), eq(checkpointDBStore), eq(prefixInfo),
+          eq(COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT));
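+      // Count write-lock acquire/release calls on the snapshot DB content lock
+      // so the checks below can assert the lock stays held throughout the defrag.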
+      AtomicInteger lockAcquired = new AtomicInteger(0);
+      AtomicInteger lockReleased = new AtomicInteger(0);
+      when(omLock.acquireWriteLocks(eq(SNAPSHOT_DB_CONTENT_LOCK), anyCollection())).thenAnswer(i -> {
+        if (i.getArgument(1) != null && i.getArgument(1, Collection.class).size() == 1) {
+          Collection<String[]> keys = i.getArgument(1, Collection.class);
+          assertEquals(snapshotInfo.getSnapshotId().toString(), keys.stream().findFirst().get()[0]);
+          lockAcquired.incrementAndGet();
+        }
+        return OMLockDetails.EMPTY_DETAILS_LOCK_ACQUIRED;
+      });
+      when(omLock.releaseWriteLocks(eq(SNAPSHOT_DB_CONTENT_LOCK), anyCollection())).thenAnswer(i -> {
+        if (i.getArgument(1) != null && i.getArgument(1, Collection.class).size() == 1) {
+          Collection<String[]> keys = i.getArgument(1, Collection.class);
+          assertEquals(snapshotInfo.getSnapshotId().toString(), keys.stream().findFirst().get()[0]);
+          lockReleased.incrementAndGet();
+        }
+        return OMLockDetails.EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
+      });
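+      // The remaining stubs assert that ingest, checkpoint close and the atomic DB switch
+      // all happen while the write lock is still held.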
+      AtomicBoolean checkpointClosed = new AtomicBoolean(false);
+      doAnswer(i -> {
+        assertTrue(lockAcquired.get() == 1 && lockReleased.get() == 0);
+        return null;
+      }).when(spyDefragService).ingestNonIncrementalTables(eq(checkpointDBStore),
+          eq(snapshotInfo), eq(prefixInfo), eq(COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT));
+      doAnswer(i -> {
+        assertTrue(lockAcquired.get() == 1 && lockReleased.get() == 0);
+        checkpointClosed.set(true);
+        return null;
+      }).when(checkpointMetadataManager).close();
+      doAnswer(i -> {
+        assertTrue(lockAcquired.get() == 1 && lockReleased.get() == 0);
+        assertTrue(checkpointClosed.get());
+        return 20;
+      }).when(spyDefragService).atomicSwitchSnapshotDB(eq(snapshotInfo.getSnapshotId()), eq(checkpointPath.toPath()));
+      doAnswer(i -> {
+        assertTrue(lockAcquired.get() == 1 && lockReleased.get() == 0);
+        assertTrue(checkpointClosed.get());
+        return null;
+      }).when(omSnapshotManager).deleteSnapshotCheckpointDirectories(eq(snapshotInfo.getSnapshotId()), eq(20));
+      InOrder verifier = inOrder(spyDefragService, omSnapshotManager);
+      assertTrue(spyDefragService.checkAndDefragSnapshot(chainManager, snapshotInfo.getSnapshotId()));
+      assertTrue(lockAcquired.get() == 1 && lockReleased.get() == 1);
+      verifier.verify(spyDefragService).needsDefragmentation(eq(snapshotInfo));
+      verifier.verify(spyDefragService).createCheckpoint(eq(checkpointSnapshotInfo),
+          eq(COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT));
+      if (previousSnapshotExists) {
+        verifier.verify(spyDefragService).performIncrementalDefragmentation(eq(previousSnapshotInfo),
+            eq(snapshotInfo), eq(10), eq(checkpointDBStore), eq(prefixInfo),
+            eq(COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT));
+      } else {
+        verifier.verify(spyDefragService).performFullDefragmentation(eq(checkpointDBStore), eq(prefixInfo),
+            eq(COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT));
+      }
+      verifier.verify(spyDefragService).ingestNonIncrementalTables(eq(checkpointDBStore), eq(snapshotInfo),
+          eq(prefixInfo), eq(COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT));
+      verifier.verify(spyDefragService).atomicSwitchSnapshotDB(eq(snapshotInfo.getSnapshotId()),
+          eq(checkpointPath.toPath()));
+      verifier.verify(omSnapshotManager).deleteSnapshotCheckpointDirectories(eq(snapshotInfo.getSnapshotId()), eq(20));
+    }
+  }
+
   /**
    * Helper method to create a mock SnapshotInfo.
    */
-  private SnapshotInfo createMockSnapshotInfo(UUID snapshotId, String volume,
-                                                String bucket, String name) {
+  private SnapshotInfo createMockSnapshotInfo(UUID snapshotId, String volume, String bucket, String name) {
     SnapshotInfo.Builder builder = SnapshotInfo.newBuilder();
     builder.setSnapshotId(snapshotId);
     builder.setVolumeName(volume);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
