This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new c351de99147 HDDS-13756. Introduce Bucket Snapshot Used Bytes and 
SnapshotUsedNamespace in BucketInfo (#9115)
c351de99147 is described below

commit c351de991478b75d1560e48aa04e0cc6ebfb40ae
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Tue Oct 7 18:24:37 2025 -0400

    HDDS-13756. Introduce Bucket Snapshot Used Bytes and SnapshotUsedNamespace 
in BucketInfo (#9115)
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |  2 +
 .../hadoop/ozone/om/helpers/OmBucketInfo.java      | 85 +++++++++++++++++++++-
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  | 36 +++++++++
 .../src/main/proto/OmClientProtocol.proto          |  2 +
 .../hadoop/ozone/om/BucketUtilizationMetrics.java  |  6 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  2 +
 .../key/OMDirectoriesPurgeRequestWithFSO.java      | 10 +--
 .../ozone/om/request/key/OMKeyCommitRequest.java   | 32 ++++++--
 .../om/request/key/OMKeyCommitRequestWithFSO.java  | 31 ++++++--
 .../ozone/om/request/key/OMKeyDeleteRequest.java   |  6 +-
 .../om/request/key/OMKeyDeleteRequestWithFSO.java  |  6 +-
 .../ozone/om/request/key/OMKeyPurgeRequest.java    |  4 +-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  | 40 +++++++---
 .../ozone/om/request/key/OMKeysDeleteRequest.java  | 21 ++++--
 .../om/request/key/OmKeysDeleteRequestWithFSO.java | 16 ++--
 .../response/key/AbstractOMKeyDeleteResponse.java  | 33 +++------
 .../key/OMDirectoriesPurgeResponseWithFSO.java     |  1 +
 .../ozone/om/response/key/OMKeyDeleteResponse.java |  2 +-
 .../response/key/OMKeyDeleteResponseWithFSO.java   |  2 +-
 .../om/response/key/OMKeysDeleteResponse.java      |  2 +-
 .../response/key/OMKeysDeleteResponseWithFSO.java  |  2 +-
 .../om/response/key/OMOpenKeysDeleteResponse.java  |  2 +-
 .../hadoop/ozone/om/service/QuotaRepairTask.java   |  4 +-
 .../hadoop/ozone/om/TestBucketManagerImpl.java     |  6 ++
 .../ozone/om/TestBucketUtilizationMetrics.java     | 12 ++-
 .../request/key/TestOMOpenKeysDeleteRequest.java   |  6 ++
 .../om/response/key/TestOMKeyDeleteResponse.java   |  4 +-
 .../response/key/TestOMOpenKeysDeleteResponse.java |  1 +
 .../ozone/om/service/TestQuotaRepairTask.java      |  2 +-
 .../ozone/recon/api/types/BucketObjectDBInfo.java  |  8 ++
 30 files changed, 294 insertions(+), 92 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 0b18d8aef67..bb6eef205e4 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -262,6 +262,8 @@ public final class OzoneConsts {
   public static final String DST_KEY = "dstKey";
   public static final String USED_BYTES = "usedBytes";
   public static final String USED_NAMESPACE = "usedNamespace";
+  public static final String SNAPSHOT_USED_BYTES = "snapshotUsedBytes";
+  public static final String SNAPSHOT_USED_NAMESPACE = "snapshotUsedNamespace";
   public static final String QUOTA_IN_BYTES = "quotaInBytes";
   public static final String QUOTA_IN_NAMESPACE = "quotaInNamespace";
   public static final String OBJECT_ID = "objectID";
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
index 85ac9290c60..b938a1fff7b 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
@@ -96,6 +96,12 @@ public final class OmBucketInfo extends WithObjectID 
implements Auditable, CopyO
   private long usedNamespace;
   private final long quotaInBytes;
   private final long quotaInNamespace;
+  // Total size of data pending deletion, either because it is still held by 
snapshots or because the
+  // background key deleting service is yet to run.
+  // This also indicates the size exclusively held by all snapshots of this 
bucket.
+  // i.e. when all snapshots of this bucket are deleted and purged, this much 
space would be released.
+  private long snapshotUsedBytes;
+  private long snapshotUsedNamespace;
 
   /**
    * Bucket Layout.
@@ -118,6 +124,8 @@ private OmBucketInfo(Builder b) {
     this.sourceBucket = b.sourceBucket;
     this.usedBytes = b.usedBytes;
     this.usedNamespace = b.usedNamespace;
+    this.snapshotUsedBytes = b.snapshotUsedBytes;
+    this.snapshotUsedNamespace = b.snapshotUsedNamespace;
     this.quotaInBytes = b.quotaInBytes;
     this.quotaInNamespace = b.quotaInNamespace;
     this.bucketLayout = b.bucketLayout;
@@ -249,6 +257,22 @@ public String getSourceBucket() {
     return sourceBucket;
   }
 
+  public long getTotalBucketSpace() {
+    return usedBytes + snapshotUsedBytes;
+  }
+
+  public long getTotalBucketNamespace() {
+    return usedNamespace + snapshotUsedNamespace;
+  }
+
+  public long getSnapshotUsedBytes() {
+    return snapshotUsedBytes;
+  }
+
+  public long getSnapshotUsedNamespace() {
+    return snapshotUsedNamespace;
+  }
+
   public long getUsedBytes() {
     return usedBytes;
   }
@@ -261,10 +285,40 @@ public void incrUsedBytes(long bytes) {
     this.usedBytes += bytes;
   }
 
+  public void decrUsedBytes(long bytes, boolean increasePendingDeleteBytes) {
+    this.usedBytes -= bytes;
+    if (increasePendingDeleteBytes) {
+      incrSnapshotUsedBytes(bytes);
+    }
+  }
+
+  private void incrSnapshotUsedBytes(long bytes) {
+    this.snapshotUsedBytes += bytes;
+  }
+
   public void incrUsedNamespace(long namespaceToUse) {
     this.usedNamespace += namespaceToUse;
   }
 
+  public void decrUsedNamespace(long namespaceToUse, boolean 
increasePendingDeleteNamespace) {
+    this.usedNamespace -= namespaceToUse;
+    if (increasePendingDeleteNamespace) {
+      incrSnapshotUsedNamespace(namespaceToUse);
+    }
+  }
+
+  private void incrSnapshotUsedNamespace(long namespaceToUse) {
+    this.snapshotUsedNamespace += namespaceToUse;
+  }
+
+  public void purgeSnapshotUsedBytes(long bytes) {
+    this.snapshotUsedBytes -= bytes;
+  }
+
+  public void purgeSnapshotUsedNamespace(long namespaceToUse) {
+    this.snapshotUsedNamespace -= namespaceToUse;
+  }
+
   public long getQuotaInBytes() {
     return quotaInBytes;
   }
@@ -324,6 +378,8 @@ public Map<String, String> toAuditMap() {
     auditMap.put(OzoneConsts.USED_BYTES, String.valueOf(this.usedBytes));
     auditMap.put(OzoneConsts.USED_NAMESPACE,
         String.valueOf(this.usedNamespace));
+    auditMap.put(OzoneConsts.SNAPSHOT_USED_BYTES, 
String.valueOf(this.snapshotUsedBytes));
+    auditMap.put(OzoneConsts.SNAPSHOT_USED_NAMESPACE, 
String.valueOf(this.snapshotUsedNamespace));
     auditMap.put(OzoneConsts.OWNER, this.owner);
     auditMap.put(OzoneConsts.REPLICATION_TYPE,
         (this.defaultReplicationConfig != null) ?
@@ -369,6 +425,8 @@ public Builder toBuilder() {
         .setUsedNamespace(usedNamespace)
         .setQuotaInBytes(quotaInBytes)
         .setQuotaInNamespace(quotaInNamespace)
+        .setSnapshotUsedBytes(snapshotUsedBytes)
+        .setSnapshotUsedNamespace(snapshotUsedNamespace)
         .setBucketLayout(bucketLayout)
         .setOwner(owner)
         .setDefaultReplicationConfig(defaultReplicationConfig);
@@ -395,6 +453,8 @@ public static class Builder extends WithObjectID.Builder {
     private BucketLayout bucketLayout = BucketLayout.DEFAULT;
     private String owner;
     private DefaultReplicationConfig defaultReplicationConfig;
+    private long snapshotUsedBytes;
+    private long snapshotUsedNamespace;
 
     public Builder() {
     }
@@ -505,6 +565,18 @@ public Builder setUsedNamespace(long quotaUsage) {
       return this;
     }
 
+    /** @param snapshotUsedBytes - Bucket Quota Snapshot Usage in bytes. */
+    public Builder setSnapshotUsedBytes(long snapshotUsedBytes) {
+      this.snapshotUsedBytes = snapshotUsedBytes;
+      return this;
+    }
+
+    /** @param snapshotUsedNamespace - Bucket Quota Snapshot Usage in counts. 
*/
+    public Builder setSnapshotUsedNamespace(long snapshotUsedNamespace) {
+      this.snapshotUsedNamespace = snapshotUsedNamespace;
+      return this;
+    }
+
     /** @param quota Bucket quota in bytes. */
     public Builder setQuotaInBytes(long quota) {
       this.quotaInBytes = quota;
@@ -564,7 +636,9 @@ public BucketInfo getProtobuf() {
         .setUsedNamespace(usedNamespace)
         .addAllMetadata(KeyValueUtil.toProtobuf(getMetadata()))
         .setQuotaInBytes(quotaInBytes)
-        .setQuotaInNamespace(quotaInNamespace);
+        .setQuotaInNamespace(quotaInNamespace)
+        .setSnapshotUsedBytes(snapshotUsedBytes)
+        .setSnapshotUsedNamespace(snapshotUsedNamespace);
     if (bucketLayout != null) {
       bib.setBucketLayout(bucketLayout.toProto());
     }
@@ -614,7 +688,10 @@ public static OmBucketInfo getFromProtobuf(BucketInfo 
bucketInfo,
         .setModificationTime(bucketInfo.getModificationTime())
         .setQuotaInBytes(bucketInfo.getQuotaInBytes())
         .setUsedNamespace(bucketInfo.getUsedNamespace())
-        .setQuotaInNamespace(bucketInfo.getQuotaInNamespace());
+        .setQuotaInNamespace(bucketInfo.getQuotaInNamespace())
+        .setSnapshotUsedBytes(bucketInfo.getSnapshotUsedBytes())
+        .setSnapshotUsedNamespace(bucketInfo.getSnapshotUsedNamespace());
+
     if (buckLayout != null) {
       obib.setBucketLayout(buckLayout);
     } else if (bucketInfo.getBucketLayout() != null) {
@@ -693,6 +770,8 @@ public boolean equals(Object o) {
         getUpdateID() == that.getUpdateID() &&
         usedBytes == that.usedBytes &&
         usedNamespace == that.usedNamespace &&
+        snapshotUsedBytes == that.snapshotUsedBytes &&
+        snapshotUsedNamespace == that.snapshotUsedNamespace &&
         Objects.equals(sourceVolume, that.sourceVolume) &&
         Objects.equals(sourceBucket, that.sourceBucket) &&
         Objects.equals(getMetadata(), that.getMetadata()) &&
@@ -723,6 +802,8 @@ public String toString() {
         ", metadata=" + getMetadata() +
         ", usedBytes=" + usedBytes +
         ", usedNamespace=" + usedNamespace +
+        ", snapshotUsedBytes=" + snapshotUsedBytes +
+        ", snapshotUsedNamespace=" + snapshotUsedNamespace +
         ", quotaInBytes=" + quotaInBytes +
         ", quotaInNamespace=" + quotaInNamespace +
         ", bucketLayout=" + bucketLayout +
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index accdbea6b51..29c25778ed7 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.om.helpers;
 
 import com.google.common.collect.ImmutableList;
+import jakarta.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -57,6 +58,12 @@ public final class OmKeyInfo extends WithParentObjectId
 
   private static final Codec<OmKeyInfo> CODEC_TRUE = newCodec(true);
   private static final Codec<OmKeyInfo> CODEC_FALSE = newCodec(false);
+  /**
+   * Metadata key flag to indicate whether a deleted key was a committed key.
+   * The flag is set when a committed key is deleted from AOS but still held in
+   * a snapshot to help with accurate bucket quota usage calculation.
+   */
+  private static final String COMMITTED_KEY_DELETED_FLAG = "CKDEL";
 
   private final String volumeName;
   private final String bucketName;
@@ -69,6 +76,7 @@ public final class OmKeyInfo extends WithParentObjectId
   private ReplicationConfig replicationConfig;
   private FileEncryptionInfo encInfo;
   private final FileChecksum fileChecksum;
+
   /**
    * Support OFS use-case to identify if the key is a file or a directory.
    */
@@ -185,6 +193,18 @@ public String getOwnerName() {
     return ownerName;
   }
 
+  public void setCommittedKeyDeletedFlag(boolean val) {
+    if (val) {
+      this.getMetadata().put(COMMITTED_KEY_DELETED_FLAG, "true");
+    } else {
+      this.getMetadata().remove(COMMITTED_KEY_DELETED_FLAG);
+    }
+  }
+
+  public boolean isDeletedKeyCommitted() {
+    return Boolean.parseBoolean(getMetadata().get(COMMITTED_KEY_DELETED_FLAG));
+  }
+
   /**
    * Returns the generation of the object. Note this is currently the same as 
updateID for a key.
    * @return long
@@ -927,4 +947,20 @@ public String getPath() {
     }
     return getParentObjectID() + OzoneConsts.OM_KEY_PREFIX + getFileName();
   }
+
+  public boolean hasBlocks() {
+    for (OmKeyLocationInfoGroup keyLocationList : getKeyLocationVersions()) {
+      if (keyLocationList.getLocationListCount() != 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public static boolean isKeyEmpty(@Nullable OmKeyInfo keyInfo) {
+    if (keyInfo == null) {
+      return true;
+    }
+    return !keyInfo.hasBlocks();
+  }
 }
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 6f936ce4767..2329da73c64 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -772,6 +772,8 @@ message BucketInfo {
     optional BucketLayoutProto bucketLayout = 18;
     optional string owner = 19;
     optional hadoop.hdds.DefaultReplicationConfig defaultReplicationConfig = 
20;
+    optional uint64 snapshotUsedBytes = 21;
+    optional uint64 snapshotUsedNamespace = 22;
 }
 
 enum BucketLayoutProto {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketUtilizationMetrics.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketUtilizationMetrics.java
index 6ac3e604b90..f45052de441 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketUtilizationMetrics.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketUtilizationMetrics.java
@@ -84,6 +84,7 @@ public void getMetrics(MetricsCollector collector, boolean 
all) {
           .tag(BucketMetricsInfo.VolumeName, bucketInfo.getVolumeName())
           .tag(BucketMetricsInfo.BucketName, bucketInfo.getBucketName())
           .addGauge(BucketMetricsInfo.BucketUsedBytes, 
bucketInfo.getUsedBytes())
+          .addGauge(BucketMetricsInfo.BucketSnapshotUsedBytes, 
bucketInfo.getSnapshotUsedBytes())
           .addGauge(BucketMetricsInfo.BucketQuotaBytes, 
bucketInfo.getQuotaInBytes())
           .addGauge(BucketMetricsInfo.BucketQuotaNamespace, 
bucketInfo.getQuotaInNamespace())
           .addGauge(BucketMetricsInfo.BucketAvailableBytes, availableSpace);
@@ -98,8 +99,9 @@ public void unRegister() {
   enum BucketMetricsInfo implements MetricsInfo {
     VolumeName("Volume Metrics."),
     BucketName("Bucket Metrics."),
-    BucketUsedBytes("Bytes used by bucket."),
-    BucketQuotaBytes("Bucket quote in bytes."),
+    BucketUsedBytes("Bytes used by bucket in AOS."),
+    BucketQuotaBytes("Bucket quota in bytes"),
+    BucketSnapshotUsedBytes("Bucket quota bytes held in snapshots"),
     BucketQuotaNamespace("Bucket quota in namespace."),
     BucketAvailableBytes("Bucket available space.");
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index c8bc0954d3e..75dd29d67bf 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -3011,6 +3011,8 @@ public OmBucketInfo getBucketInfo(String volume, String 
bucket)
             .setQuotaInBytes(realBucket.getQuotaInBytes())
             .setQuotaInNamespace(realBucket.getQuotaInNamespace())
             .setUsedBytes(realBucket.getUsedBytes())
+            .setSnapshotUsedBytes(realBucket.getSnapshotUsedBytes())
+            .setSnapshotUsedNamespace(realBucket.getSnapshotUsedNamespace())
             .setUsedNamespace(realBucket.getUsedNamespace())
             .addAllMetadata(realBucket.getMetadata())
             .setBucketLayout(realBucket.getBucketLayout())
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
index fb0af869884..4724410803e 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
@@ -141,9 +141,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager 
ozoneManager, Execut
               processed.volumeName, processed.bucketName);
           // bucketInfo can be null in case of delete volume or bucket
           // or key does not belong to bucket as bucket is recreated
-          if (null != omBucketInfo
-              && omBucketInfo.getObjectID() == path.getBucketId()) {
-            omBucketInfo.incrUsedNamespace(-1L);
+          if (null != omBucketInfo && omBucketInfo.getObjectID() == 
path.getBucketId()) {
+            omBucketInfo.decrUsedNamespace(1L, true);
             String ozoneDbKey = 
omMetadataManager.getOzonePathKey(path.getVolumeId(),
                 path.getBucketId(), processed.keyInfo.getParentObjectID(),
                 processed.keyInfo.getFileName());
@@ -180,8 +179,9 @@ public OMClientResponse validateAndUpdateCache(OzoneManager 
ozoneManager, Execut
           // or key does not belong to bucket as bucket is recreated
           if (null != omBucketInfo
               && omBucketInfo.getObjectID() == path.getBucketId()) {
-            omBucketInfo.incrUsedBytes(-sumBlockLengths(processed.keyInfo));
-            omBucketInfo.incrUsedNamespace(-1L);
+            long totalSize = sumBlockLengths(processed.keyInfo);
+            omBucketInfo.decrUsedBytes(totalSize, true);
+            omBucketInfo.decrUsedNamespace(1L, true);
             String ozoneDbKey = 
omMetadataManager.getOzonePathKey(path.getVolumeId(),
                 path.getBucketId(), processed.keyInfo.getParentObjectID(),
                 processed.keyInfo.getFileName());
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index 2a1f4fe822c..31f1d9d7180 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -33,6 +33,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.OzoneManagerVersion;
@@ -49,6 +50,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.helpers.QuotaUtil;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.WithMetadata;
 import org.apache.hadoop.ozone.om.request.util.OmKeyHSyncUtil;
@@ -317,12 +319,8 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager, Execut
         checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
             correctedSpace);
       } else if (keyToDelete != null && !omBucketInfo.getIsVersionEnabled()) {
-        // Subtract the size of blocks to be overwritten.
-        correctedSpace -= keyToDelete.getReplicatedSize();
         RepeatedOmKeyInfo oldVerKeyInfo = getOldVersionsToCleanUp(
             keyToDelete, omBucketInfo.getObjectID(), trxnLogIndex);
-        checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
-            correctedSpace);
         // using pseudoObjId as objectId can be same in case of overwrite key
         long pseudoObjId = ozoneManager.getObjectIdFromTxId(trxnLogIndex);
         String delKeyName = omMetadataManager.getOzoneDeletePathKey(
@@ -335,18 +333,36 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager, Execut
         // and local ID with omKeyInfo blocks'.
         // Otherwise, it causes data loss once those shared blocks are added
         // to deletedTable and processed by KeyDeletingService for deletion.
-        filterOutBlocksStillInUse(omKeyInfo, oldVerKeyInfo);
-
+        Pair<Map<OmKeyInfo, List<OmKeyLocationInfo>>, Integer> 
filteredUsedBlockCnt =
+            filterOutBlocksStillInUse(omKeyInfo, oldVerKeyInfo);
+        Map<OmKeyInfo, List<OmKeyLocationInfo>> blocks = 
filteredUsedBlockCnt.getLeft();
+        correctedSpace -= 
blocks.entrySet().stream().mapToLong(filteredKeyBlocks ->
+            filteredKeyBlocks.getValue().stream().mapToLong(block -> 
QuotaUtil.getReplicatedSize(
+                block.getLength(), 
filteredKeyBlocks.getKey().getReplicationConfig())).sum()).sum();
+        long totalSize = 0;
+        long totalNamespace = 0;
         if (!oldVerKeyInfo.getOmKeyInfoList().isEmpty()) {
           oldKeyVersionsToDeleteMap.put(delKeyName, oldVerKeyInfo);
+          for (OmKeyInfo olderKeyVersions : oldVerKeyInfo.getOmKeyInfoList()) {
+            olderKeyVersions.setCommittedKeyDeletedFlag(true);
+            totalSize += sumBlockLengths(olderKeyVersions);
+            totalNamespace += 1;
+          }
         }
+        checkBucketQuotaInNamespace(omBucketInfo, 1L);
+        checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
+            correctedSpace);
+        // Subtract the namespace and size of the overwritten key versions.
+        omBucketInfo.decrUsedNamespace(totalNamespace, true);
+        // Subtract the used namespace of empty overwritten keys.
+        omBucketInfo.decrUsedNamespace(filteredUsedBlockCnt.getRight(), false);
+        omBucketInfo.decrUsedBytes(totalSize, true);
       } else {
         checkBucketQuotaInNamespace(omBucketInfo, 1L);
         checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
             correctedSpace);
-        omBucketInfo.incrUsedNamespace(1L);
       }
-
+      omBucketInfo.incrUsedNamespace(1L);
       // let the uncommitted blocks pretend as key's old version blocks
       // which will be deleted as RepeatedOmKeyInfo
       final OmKeyInfo pseudoKeyInfo = isHSync ? null
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index 74d0d998215..a23716d40d1 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@ -28,6 +28,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.OMAction;
@@ -42,6 +43,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmFSOFile;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.QuotaUtil;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.WithMetadata;
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
@@ -247,12 +249,8 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager, Execut
         checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
             correctedSpace);
       } else if (keyToDelete != null && !omBucketInfo.getIsVersionEnabled()) {
-        // Subtract the size of blocks to be overwritten.
-        correctedSpace -= keyToDelete.getReplicatedSize();
         RepeatedOmKeyInfo oldVerKeyInfo = getOldVersionsToCleanUp(
             keyToDelete, omBucketInfo.getObjectID(), trxnLogIndex);
-        checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
-            correctedSpace);
         String delKeyName = omMetadataManager
             .getOzoneKey(volumeName, bucketName, fileName);
         // using pseudoObjId as objectId can be same in case of overwrite key
@@ -267,17 +265,36 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager, Execut
         // and local ID with omKeyInfo blocks'.
         // Otherwise, it causes data loss once those shared blocks are added
         // to deletedTable and processed by KeyDeletingService for deletion.
-        filterOutBlocksStillInUse(omKeyInfo, oldVerKeyInfo);
-
+        Pair<Map<OmKeyInfo, List<OmKeyLocationInfo>>, Integer> 
filteredUsedBlockCnt =
+            filterOutBlocksStillInUse(omKeyInfo, oldVerKeyInfo);
+        Map<OmKeyInfo, List<OmKeyLocationInfo>> blocks = 
filteredUsedBlockCnt.getLeft();
+        correctedSpace -= 
blocks.entrySet().stream().mapToLong(filteredKeyBlocks ->
+            filteredKeyBlocks.getValue().stream().mapToLong(block -> 
QuotaUtil.getReplicatedSize(
+                block.getLength(), 
filteredKeyBlocks.getKey().getReplicationConfig())).sum()).sum();
+        long totalSize = 0;
+        long totalNamespace = 0;
         if (!oldVerKeyInfo.getOmKeyInfoList().isEmpty()) {
           oldKeyVersionsToDeleteMap.put(delKeyName, oldVerKeyInfo);
+          for (OmKeyInfo olderKeyVersions : oldVerKeyInfo.getOmKeyInfoList()) {
+            olderKeyVersions.setCommittedKeyDeletedFlag(true);
+            totalSize += sumBlockLengths(olderKeyVersions);
+            totalNamespace += 1;
+          }
         }
+        // Validate the bucket quota before adjusting the usage counters.
+        checkBucketQuotaInNamespace(omBucketInfo, 1L);
+        checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
+            correctedSpace);
+        // Subtract the namespace and size of the overwritten key versions.
+        omBucketInfo.decrUsedNamespace(totalNamespace, true);
+        omBucketInfo.decrUsedNamespace(filteredUsedBlockCnt.getRight(), false);
+        omBucketInfo.decrUsedBytes(totalSize, true);
       } else {
         checkBucketQuotaInNamespace(omBucketInfo, 1L);
         checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
             correctedSpace);
-        omBucketInfo.incrUsedNamespace(1L);
       }
+      omBucketInfo.incrUsedNamespace(1L);
 
       // let the uncommitted blocks pretend as key's old version blocks
       // which will be deleted as RepeatedOmKeyInfo
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
index f9290a52632..5c2065356c0 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
@@ -159,8 +159,10 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager, Execut
           getBucketInfo(omMetadataManager, volumeName, bucketName);
 
       long quotaReleased = sumBlockLengths(omKeyInfo);
-      omBucketInfo.incrUsedBytes(-quotaReleased);
-      omBucketInfo.incrUsedNamespace(-1L);
+      // Empty entries won't be added to deleted table so this key shouldn't 
get added to snapshotUsed space.
+      boolean isKeyNonEmpty = !OmKeyInfo.isKeyEmpty(omKeyInfo);
+      omBucketInfo.decrUsedBytes(quotaReleased, isKeyNonEmpty);
+      omBucketInfo.decrUsedNamespace(1L, isKeyNonEmpty);
       OmKeyInfo deletedOpenKeyInfo = null;
 
       // If omKeyInfo has hsync metadata, delete its corresponding open key as 
well
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestWithFSO.java
index 1fc3ec615f4..75b5966e005 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestWithFSO.java
@@ -158,8 +158,10 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager, Execut
 
       // TODO: HDDS-4565: consider all the sub-paths if the path is a dir.
       long quotaReleased = sumBlockLengths(omKeyInfo);
-      omBucketInfo.incrUsedBytes(-quotaReleased);
-      omBucketInfo.incrUsedNamespace(-1L);
+      // Empty entries won't be added to deleted table so this key shouldn't 
get added to snapshotUsed space.
+      boolean isKeyNonEmpty = !OmKeyInfo.isKeyEmpty(omKeyInfo);
+      omBucketInfo.decrUsedBytes(quotaReleased, isKeyNonEmpty);
+      omBucketInfo.decrUsedNamespace(1L, isKeyNonEmpty);
 
       // If omKeyInfo has hsync metadata, delete its corresponding open key as 
well
       String dbOpenKey = null;
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
index 64e3fa31244..66973b294c1 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
@@ -117,8 +117,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager 
ozoneManager, Execut
     
deletingServiceMetrics.incrNumRenameEntriesPurged(renamedKeysToBePurged.size());
 
     if (keysToBePurgedList.isEmpty() && renamedKeysToBePurged.isEmpty()) {
-      OMException oe = new OMException("None of the keys can be purged be 
purged since a new snapshot was created " +
-          "for all the buckets, making this request invalid", 
OMException.ResultCodes.KEY_DELETION_ERROR);
+      OMException oe = new OMException("No keys found to be purged or renamed 
in the request.",
+          OMException.ResultCodes.KEY_DELETION_ERROR);
       
AUDIT.logWriteFailure(ozoneManager.buildAuditMessageForFailure(OMSystemAction.KEY_DELETION,
 null, oe));
       return new OMKeyPurgeResponse(createErrorOMResponse(omResponse, oe));
     }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 3d33361b9c4..c267041cc16 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -41,11 +41,11 @@
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
@@ -91,6 +91,7 @@
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.OMClientRequestUtils;
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserInfo;
@@ -891,7 +892,7 @@ protected boolean checkDirectoryAlreadyExists(String 
volumeName,
   /**
    * @return the number of bytes used by blocks pointed to by {@code 
omKeyInfo}.
    */
-  protected static long sumBlockLengths(OmKeyInfo omKeyInfo) {
+  public static long sumBlockLengths(OmKeyInfo omKeyInfo) {
     long bytesUsed = 0;
     for (OmKeyLocationInfoGroup group: omKeyInfo.getKeyLocationVersions()) {
       for (OmKeyLocationInfo locationInfo : group.getLocationList()) {
@@ -903,6 +904,22 @@ protected static long sumBlockLengths(OmKeyInfo omKeyInfo) 
{
     return bytesUsed;
   }
 
+  /**
+   * @return the number of bytes used by blocks pointed to by {@code 
keyInfo}.
+   */
+  public static long sumBlockLengths(OzoneManagerProtocolProtos.KeyInfo 
keyInfo) {
+    long bytesUsed = 0;
+    ReplicationConfig replicationConfig = 
ReplicationConfig.fromProto(keyInfo.getType(), keyInfo.getFactor(),
+            keyInfo.getEcReplicationConfig());
+    for (OzoneManagerProtocolProtos.KeyLocationList group: 
keyInfo.getKeyLocationListList()) {
+      for (OzoneManagerProtocolProtos.KeyLocation locationInfo : 
group.getKeyLocationsList()) {
+        bytesUsed += QuotaUtil.getReplicatedSize(locationInfo.getLength(), 
replicationConfig);
+      }
+    }
+
+    return bytesUsed;
+  }
+
   /**
    * Return bucket info for the specified bucket.
    */
@@ -1200,20 +1217,20 @@ protected static Map<String, RepeatedOmKeyInfo> 
addKeyInfoToDeleteMap(OzoneManag
    * @param referenceKey OmKeyInfo
    * @param keysToBeFiltered RepeatedOmKeyInfo
    */
-  protected void filterOutBlocksStillInUse(OmKeyInfo referenceKey,
-                                           RepeatedOmKeyInfo keysToBeFiltered) 
{
+  protected Pair<Map<OmKeyInfo, List<OmKeyLocationInfo>>, Integer> 
filterOutBlocksStillInUse(OmKeyInfo referenceKey,
+      RepeatedOmKeyInfo keysToBeFiltered) {
 
     LOG.debug("Before block filtering, keysToBeFiltered = {}",
         keysToBeFiltered);
 
     // A HashSet for fast lookup. Gathers all ContainerBlockID entries inside
     // the referenceKey.
-    HashSet<ContainerBlockID> cbIdSet = referenceKey.getKeyLocationVersions()
+    Map<ContainerBlockID, OmKeyLocationInfo> cbIdSet = 
referenceKey.getKeyLocationVersions()
         .stream()
         .flatMap(e -> e.getLocationList().stream())
-        .map(omKeyLocationInfo ->
-            omKeyLocationInfo.getBlockID().getContainerBlockID())
-        .collect(Collectors.toCollection(HashSet::new));
+        .collect(Collectors.toMap(omKeyLocationInfo -> 
omKeyLocationInfo.getBlockID().getContainerBlockID(),
+            Function.identity()));
+    Map<OmKeyInfo, List<OmKeyLocationInfo>> filteredOutBlocks = new 
HashMap<>();
 
     // Pardon the nested loops. ContainerBlockID is 9-layer deep from:
     // keysToBeFiltered               // Layer 0. RepeatedOmKeyInfo
@@ -1232,7 +1249,7 @@ protected void filterOutBlocksStillInUse(OmKeyInfo 
referenceKey,
     // Layer 1: List<OmKeyInfo>
     Iterator<OmKeyInfo> iterOmKeyInfo = keysToBeFiltered
         .getOmKeyInfoList().iterator();
-
+    int emptyKeyRemovedCount = 0;
     while (iterOmKeyInfo.hasNext()) {
       // Note with HDDS-8462, each RepeatedOmKeyInfo should have only one 
entry,
       // so this outer most loop should never be entered twice in each call.
@@ -1266,8 +1283,9 @@ protected void filterOutBlocksStillInUse(OmKeyInfo 
referenceKey,
             ContainerBlockID cbId = keyLocationInfo
                 .getBlockID().getContainerBlockID();
 
-            if (cbIdSet.contains(cbId)) {
+            if (cbIdSet.containsKey(cbId)) {
               // Remove this block from oldVerKeyInfo because it is referenced.
+              filteredOutBlocks.computeIfAbsent(oldOmKeyInfo, (k) -> new 
ArrayList<>()).add(keyLocationInfo);
               iterKeyLocInfo.remove();
               LOG.debug("Filtered out block: {}", cbId);
             }
@@ -1287,6 +1305,7 @@ protected void filterOutBlocksStillInUse(OmKeyInfo 
referenceKey,
 
       // Cleanup when Layer 3 is an empty list
       if (oldOmKeyInfo.getKeyLocationVersions().isEmpty()) {
+        emptyKeyRemovedCount++;
         iterOmKeyInfo.remove();
       }
     }
@@ -1294,6 +1313,7 @@ protected void filterOutBlocksStillInUse(OmKeyInfo 
referenceKey,
     // Intentional extra space for alignment
     LOG.debug("After block filtering,  keysToBeFiltered = {}",
         keysToBeFiltered);
+    return Pair.of(filteredOutBlocks, emptyKeyRemovedCount);
   }
 
   protected void validateEncryptionKeyInfo(OmBucketInfo bucketInfo, KeyArgs 
keyArgs) throws OMException {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
index 3d79df51bb8..427b2978f9c 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
@@ -187,17 +187,19 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager, Execut
         }
       }
 
-      long quotaReleased = 0;
       OmBucketInfo omBucketInfo =
           getBucketInfo(omMetadataManager, volumeName, bucketName);
 
       Map<String, OmKeyInfo> openKeyInfoMap = new HashMap<>();
       // Mark all keys which can be deleted, in cache as deleted.
-      quotaReleased =
+      Pair<Long, Integer> quotaReleasedEmptyKeys =
           markKeysAsDeletedInCache(ozoneManager, trxnLogIndex, omKeyInfoList,
-              dirList, omMetadataManager, quotaReleased, openKeyInfoMap);
-      omBucketInfo.incrUsedBytes(-quotaReleased);
-      omBucketInfo.incrUsedNamespace(-1L * omKeyInfoList.size());
+              dirList, omMetadataManager, openKeyInfoMap);
+      omBucketInfo.decrUsedBytes(quotaReleasedEmptyKeys.getKey(), true);
+      // For empty keyInfos the quota should be released and not added to 
namespace.
+      omBucketInfo.decrUsedNamespace(omKeyInfoList.size() + dirList.size() -
+              quotaReleasedEmptyKeys.getValue(), true);
+      omBucketInfo.decrUsedNamespace(quotaReleasedEmptyKeys.getValue(), false);
 
       final long volumeId = omMetadataManager.getVolumeId(volumeName);
       omClientResponse =
@@ -300,10 +302,12 @@ protected OMClientResponse 
getOmClientResponse(OzoneManager ozoneManager,
     return omClientResponse;
   }
 
-  protected long markKeysAsDeletedInCache(OzoneManager ozoneManager,
+  protected Pair<Long, Integer> markKeysAsDeletedInCache(OzoneManager 
ozoneManager,
       long trxnLogIndex, List<OmKeyInfo> omKeyInfoList, List<OmKeyInfo> 
dirList,
-      OMMetadataManager omMetadataManager, long quotaReleased, Map<String, 
OmKeyInfo> openKeyInfoMap)
+      OMMetadataManager omMetadataManager, Map<String, OmKeyInfo> 
openKeyInfoMap)
           throws IOException {
+    int emptyKeys = 0;
+    long quotaReleased = 0;
     for (OmKeyInfo omKeyInfo : omKeyInfoList) {
       String volumeName = omKeyInfo.getVolumeName();
       String bucketName = omKeyInfo.getBucketName();
@@ -314,6 +318,7 @@ protected long markKeysAsDeletedInCache(OzoneManager 
ozoneManager,
 
       omKeyInfo.setUpdateID(trxnLogIndex);
       quotaReleased += sumBlockLengths(omKeyInfo);
+      emptyKeys += OmKeyInfo.isKeyEmpty(omKeyInfo) ? 1 : 0;
 
       // If omKeyInfo has hsync metadata, delete its corresponding open key as 
well
       String hsyncClientId = 
omKeyInfo.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID);
@@ -331,7 +336,7 @@ protected long markKeysAsDeletedInCache(OzoneManager 
ozoneManager,
         }
       }
     }
-    return quotaReleased;
+    return Pair.of(quotaReleased, emptyKeys);
   }
 
   protected void addKeyToAppropriateList(List<OmKeyInfo> omKeyInfoList,
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OmKeysDeleteRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OmKeysDeleteRequestWithFSO.java
index 1da12d1e561..0dbaa3c9d7b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OmKeysDeleteRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OmKeysDeleteRequestWithFSO.java
@@ -26,6 +26,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
@@ -87,12 +88,11 @@ protected OzoneFileStatus getOzoneKeyStatus(
   }
 
   @Override
-  protected long markKeysAsDeletedInCache(
-          OzoneManager ozoneManager, long trxnLogIndex,
-          List<OmKeyInfo> omKeyInfoList,
-          List<OmKeyInfo> dirList, OMMetadataManager omMetadataManager,
-          long quotaReleased, Map<String, OmKeyInfo> openKeyInfoMap) throws 
IOException {
-
+  protected Pair<Long, Integer> markKeysAsDeletedInCache(
+      OzoneManager ozoneManager, long trxnLogIndex, List<OmKeyInfo> 
omKeyInfoList, List<OmKeyInfo> dirList,
+      OMMetadataManager omMetadataManager, Map<String, OmKeyInfo> 
openKeyInfoMap) throws IOException {
+    long quotaReleased = 0;
+    int emptyKeys = 0;
     // Mark all keys which can be deleted, in cache as deleted.
     for (OmKeyInfo omKeyInfo : omKeyInfoList) {
       final long volumeId = omMetadataManager.getVolumeId(
@@ -105,7 +105,7 @@ protected long markKeysAsDeletedInCache(
           new CacheKey<>(omMetadataManager
               .getOzonePathKey(volumeId, bucketId, parentId, fileName)),
           CacheValue.get(trxnLogIndex));
-
+      emptyKeys += OmKeyInfo.isKeyEmpty(omKeyInfo) ? 1 : 0;
       omKeyInfo.setUpdateID(trxnLogIndex);
       quotaReleased += sumBlockLengths(omKeyInfo);
 
@@ -141,7 +141,7 @@ protected long markKeysAsDeletedInCache(
       omKeyInfo.setUpdateID(trxnLogIndex);
       quotaReleased += sumBlockLengths(omKeyInfo);
     }
-    return quotaReleased;
+    return Pair.of(quotaReleased, emptyKeys);
   }
 
   @Nonnull @Override
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/AbstractOMKeyDeleteResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/AbstractOMKeyDeleteResponse.java
index 0a2bdc234f8..e91bb403945 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/AbstractOMKeyDeleteResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/AbstractOMKeyDeleteResponse.java
@@ -17,8 +17,9 @@
 
 package org.apache.hadoop.ozone.om.response.key;
 
+import static org.apache.hadoop.ozone.om.helpers.OmKeyInfo.isKeyEmpty;
+
 import jakarta.annotation.Nonnull;
-import jakarta.annotation.Nullable;
 import java.io.IOException;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.Table;
@@ -26,7 +27,6 @@
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 
@@ -59,7 +59,8 @@ protected void addDeletionToBatch(
       Table<String, ?> fromTable,
       String keyName,
       OmKeyInfo omKeyInfo,
-      long bucketId) throws IOException {
+      long bucketId,
+      boolean isCommittedKey) throws IOException {
 
     // For OmResponse with failure, this should do nothing. This method is
     // not called in failure scenario in OM code.
@@ -76,6 +77,7 @@ protected void addDeletionToBatch(
       // if RepeatedOMKeyInfo structure is null, we create a new instance,
       // if it is not null, then we simply add to the list and store this
       // instance in deletedTable.
+      omKeyInfo.setCommittedKeyDeletedFlag(isCommittedKey);
       RepeatedOmKeyInfo repeatedOmKeyInfo = 
OmUtils.prepareKeyForDelete(bucketId,
           omKeyInfo, omKeyInfo.getUpdateID()
       );
@@ -97,13 +99,15 @@ protected void addDeletionToBatch(
    * @param omKeyInfo
    * @throws IOException
    */
+  @SuppressWarnings("checkstyle:ParameterNumber")
   protected void addDeletionToBatch(
       OMMetadataManager omMetadataManager,
       BatchOperation batchOperation,
       Table<String, ?> fromTable,
       String keyName, String deleteKeyName,
       OmKeyInfo omKeyInfo,
-      long bucketId) throws IOException {
+      long bucketId,
+      boolean isCommittedKey) throws IOException {
 
     // For OmResponse with failure, this should do nothing. This method is
     // not called in failure scenario in OM code.
@@ -120,6 +124,7 @@ protected void addDeletionToBatch(
       // if RepeatedOMKeyInfo structure is null, we create a new instance,
       // if it is not null, then we simply add to the list and store this
       // instance in deletedTable.
+      omKeyInfo.setCommittedKeyDeletedFlag(isCommittedKey);
       RepeatedOmKeyInfo repeatedOmKeyInfo = 
OmUtils.prepareKeyForDelete(bucketId,
           omKeyInfo, omKeyInfo.getUpdateID()
       );
@@ -131,24 +136,4 @@ protected void addDeletionToBatch(
   @Override
   public abstract void addToDBBatch(OMMetadataManager omMetadataManager,
         BatchOperation batchOperation) throws IOException;
-
-  /**
-   * Check if the key is empty or not. Key will be empty if it does not have
-   * blocks.
-   *
-   * @param keyInfo
-   * @return if empty true, else false.
-   */
-  private boolean isKeyEmpty(@Nullable OmKeyInfo keyInfo) {
-    if (keyInfo == null) {
-      return true;
-    }
-    for (OmKeyLocationInfoGroup keyLocationList : keyInfo
-            .getKeyLocationVersions()) {
-      if (keyLocationList.getLocationListCount() != 0) {
-        return false;
-      }
-    }
-    return true;
-  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
index 9ca4885c170..2ca04c4aae1 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
@@ -144,6 +144,7 @@ public void processPaths(
 
       for (OzoneManagerProtocolProtos.KeyInfo key : deletedSubFilesList) {
         OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(key);
+        keyInfo.setCommittedKeyDeletedFlag(true);
         String ozoneDbKey = keySpaceOmMetadataManager.getOzonePathKey(volumeId,
             bucketId, keyInfo.getParentObjectID(), keyInfo.getFileName());
         keySpaceOmMetadataManager.getKeyTable(getBucketLayout())
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java
index 8840e1c0951..be646c4ee28 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java
@@ -75,7 +75,7 @@ public void addToDBBatch(OMMetadataManager omMetadataManager,
     Table<String, OmKeyInfo> keyTable =
         omMetadataManager.getKeyTable(getBucketLayout());
     addDeletionToBatch(omMetadataManager, batchOperation, keyTable, ozoneKey,
-        omKeyInfo, omBucketInfo.getObjectID());
+        omKeyInfo, omBucketInfo.getObjectID(), true);
 
     // update bucket usedBytes.
     omMetadataManager.getBucketTable().putWithBatch(batchOperation,
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseWithFSO.java
index cfcfe61c943..1b84bf51cb8 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseWithFSO.java
@@ -101,7 +101,7 @@ public void addToDBBatch(OMMetadataManager 
omMetadataManager,
       deletedKey = omMetadataManager.getOzoneDeletePathKey(
           omKeyInfo.getObjectID(), deletedKey);
       addDeletionToBatch(omMetadataManager, batchOperation, keyTable,
-          ozoneDbKey, deletedKey, omKeyInfo, getOmBucketInfo().getObjectID());
+          ozoneDbKey, deletedKey, omKeyInfo, getOmBucketInfo().getObjectID(), 
true);
     }
 
     // update bucket usedBytes.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponse.java
index 15004453df9..3cb1220b83c 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponse.java
@@ -93,7 +93,7 @@ public void addToDBBatch(OMMetadataManager omMetadataManager,
           keyName);
 
       addDeletionToBatch(omMetadataManager, batchOperation, keyTable,
-          deleteKey, omKeyInfo, getOmBucketInfo().getObjectID());
+          deleteKey, omKeyInfo, getOmBucketInfo().getObjectID(), true);
     }
 
     // update bucket usedBytes.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponseWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponseWithFSO.java
index 178f0468e92..0b283509354 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponseWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponseWithFSO.java
@@ -87,7 +87,7 @@ public void addToDBBatch(OMMetadataManager omMetadataManager,
       deletedKey = omMetadataManager.getOzoneDeletePathKey(
           omKeyInfo.getObjectID(), deletedKey);
       addDeletionToBatch(omMetadataManager, batchOperation, keyTable,
-          ozoneDbKey, deletedKey, omKeyInfo, bucketId);
+          ozoneDbKey, deletedKey, omKeyInfo, bucketId, true);
     }
 
     // update bucket usedBytes.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMOpenKeysDeleteResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMOpenKeysDeleteResponse.java
index aaa27ab31d7..e46bd7db64f 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMOpenKeysDeleteResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMOpenKeysDeleteResponse.java
@@ -74,7 +74,7 @@ public void addToDBBatch(OMMetadataManager omMetadataManager,
 
     for (Map.Entry<String, Pair<Long, OmKeyInfo>> keyInfoPair : 
keysToDelete.entrySet()) {
       addDeletionToBatch(omMetadataManager, batchOperation, openKeyTable,
-          keyInfoPair.getKey(), keyInfoPair.getValue().getValue(), 
keyInfoPair.getValue().getKey());
+          keyInfoPair.getKey(), keyInfoPair.getValue().getValue(), 
keyInfoPair.getValue().getKey(), false);
     }
   }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
index 81cf9b36203..9a60c6ee4c3 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
@@ -255,8 +255,8 @@ private static void populateBucket(
     String bucketNameKey = buildNamePath(bucketInfo.getVolumeName(),
         bucketInfo.getBucketName());
     oriBucketInfoMap.put(bucketNameKey, bucketInfo.copyObject());
-    bucketInfo.incrUsedNamespace(-bucketInfo.getUsedNamespace());
-    bucketInfo.incrUsedBytes(-bucketInfo.getUsedBytes());
+    bucketInfo.decrUsedBytes(bucketInfo.getUsedBytes(), false);
+    bucketInfo.decrUsedNamespace(bucketInfo.getUsedNamespace(), false);
     nameBucketInfoMap.put(bucketNameKey, bucketInfo);
     
idBucketInfoMap.put(buildIdPath(metadataManager.getVolumeId(bucketInfo.getVolumeName()),
             bucketInfo.getObjectID()), bucketInfo);
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java
index 0ec33ca1411..5b742c4b22e 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java
@@ -441,6 +441,12 @@ public void testLinkedBucketResolution() throws Exception {
     assertEquals(
         bucketInfo.getUsedNamespace(),
         storedLinkBucket.getUsedNamespace());
+    assertEquals(
+        bucketInfo.getSnapshotUsedBytes(),
+        storedLinkBucket.getSnapshotUsedBytes());
+    assertEquals(
+        bucketInfo.getSnapshotUsedNamespace(),
+        storedLinkBucket.getSnapshotUsedNamespace());
     assertEquals(
         bucketInfo.getDefaultReplicationConfig(),
         storedLinkBucket.getDefaultReplicationConfig());
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketUtilizationMetrics.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketUtilizationMetrics.java
index a50176f46e0..f0e644b80a2 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketUtilizationMetrics.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketUtilizationMetrics.java
@@ -49,6 +49,8 @@ public class TestBucketUtilizationMetrics {
   private static final String BUCKET_NAME_2 = "bucket2";
   private static final long USED_BYTES_1 = 100;
   private static final long USED_BYTES_2 = 200;
+  private static final long SNAPSHOT_USED_BYTES_1 = 400;
+  private static final long SNAPSHOT_USED_BYTES_2 = 800;
   private static final long QUOTA_IN_BYTES_1 = 200;
   private static final long QUOTA_IN_BYTES_2 = QUOTA_RESET;
   private static final long QUOTA_IN_NAMESPACE_1 = 1;
@@ -59,9 +61,9 @@ void testBucketUtilizationMetrics() {
     OMMetadataManager omMetadataManager = mock(OMMetadataManager.class);
 
     Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>> entry1 = 
createMockEntry(VOLUME_NAME_1, BUCKET_NAME_1,
-        USED_BYTES_1, QUOTA_IN_BYTES_1, QUOTA_IN_NAMESPACE_1);
+        USED_BYTES_1, SNAPSHOT_USED_BYTES_1, QUOTA_IN_BYTES_1, 
QUOTA_IN_NAMESPACE_1);
     Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>> entry2 = 
createMockEntry(VOLUME_NAME_2, BUCKET_NAME_2,
-        USED_BYTES_2, QUOTA_IN_BYTES_2, QUOTA_IN_NAMESPACE_2);
+        USED_BYTES_2, SNAPSHOT_USED_BYTES_2, QUOTA_IN_BYTES_2, 
QUOTA_IN_NAMESPACE_2);
 
     Iterator<Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>>> 
bucketIterator = mock(Iterator.class);
     when(bucketIterator.hasNext())
@@ -91,6 +93,7 @@ void testBucketUtilizationMetrics() {
     verify(mb, times(1)).tag(BucketMetricsInfo.VolumeName, VOLUME_NAME_1);
     verify(mb, times(1)).tag(BucketMetricsInfo.BucketName, BUCKET_NAME_1);
     verify(mb, times(1)).addGauge(BucketMetricsInfo.BucketUsedBytes, 
USED_BYTES_1);
+    verify(mb, times(1)).addGauge(BucketMetricsInfo.BucketSnapshotUsedBytes, 
SNAPSHOT_USED_BYTES_1);
     verify(mb, times(1)).addGauge(BucketMetricsInfo.BucketQuotaBytes, 
QUOTA_IN_BYTES_1);
     verify(mb, times(1)).addGauge(BucketMetricsInfo.BucketQuotaNamespace, 
QUOTA_IN_NAMESPACE_1);
     verify(mb, times(1)).addGauge(BucketMetricsInfo.BucketAvailableBytes,
@@ -99,13 +102,14 @@ void testBucketUtilizationMetrics() {
     verify(mb, times(1)).tag(BucketMetricsInfo.VolumeName, VOLUME_NAME_2);
     verify(mb, times(1)).tag(BucketMetricsInfo.BucketName, BUCKET_NAME_2);
     verify(mb, times(1)).addGauge(BucketMetricsInfo.BucketUsedBytes, 
USED_BYTES_2);
+    verify(mb, times(1)).addGauge(BucketMetricsInfo.BucketSnapshotUsedBytes, 
SNAPSHOT_USED_BYTES_2);
     verify(mb, times(1)).addGauge(BucketMetricsInfo.BucketQuotaBytes, 
QUOTA_IN_BYTES_2);
     verify(mb, times(1)).addGauge(BucketMetricsInfo.BucketQuotaNamespace, 
QUOTA_IN_NAMESPACE_2);
     verify(mb, times(1)).addGauge(BucketMetricsInfo.BucketAvailableBytes, 
QUOTA_RESET);
   }
 
   private static Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>> 
createMockEntry(String volumeName,
-      String bucketName, long usedBytes, long quotaInBytes, long 
quotaInNamespace) {
+      String bucketName, long usedBytes, long snapshotUsedBytes, long 
quotaInBytes, long quotaInNamespace) {
     Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>> entry = 
mock(Map.Entry.class);
     CacheValue<OmBucketInfo> cacheValue = mock(CacheValue.class);
     OmBucketInfo bucketInfo = mock(OmBucketInfo.class);
@@ -113,8 +117,10 @@ private static Map.Entry<CacheKey<String>, 
CacheValue<OmBucketInfo>> createMockE
     when(bucketInfo.getVolumeName()).thenReturn(volumeName);
     when(bucketInfo.getBucketName()).thenReturn(bucketName);
     when(bucketInfo.getUsedBytes()).thenReturn(usedBytes);
+    when(bucketInfo.getSnapshotUsedBytes()).thenReturn(snapshotUsedBytes);
     when(bucketInfo.getQuotaInBytes()).thenReturn(quotaInBytes);
     when(bucketInfo.getQuotaInNamespace()).thenReturn(quotaInNamespace);
+    when(bucketInfo.getTotalBucketSpace()).thenReturn(usedBytes + 
snapshotUsedBytes);
 
     when(cacheValue.getCacheValue()).thenReturn(bucketInfo);
 
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMOpenKeysDeleteRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMOpenKeysDeleteRequest.java
index e0e88580602..424c89828ea 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMOpenKeysDeleteRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMOpenKeysDeleteRequest.java
@@ -326,6 +326,12 @@ private void deleteOpenKeysFromCache(List<Pair<Long, 
OmKeyInfo>> openKeys)
 
     assertEquals(Status.OK,
         omClientResponse.getOMResponse().getStatus());
+    for (OmKeyInfo openKey : 
openKeys.stream().map(Pair::getRight).collect(Collectors.toList())) {
+      assertEquals(0, omMetadataManager.getBucketTable().get(
+          omMetadataManager.getBucketKey(openKey.getVolumeName(), 
openKey.getBucketName())).getSnapshotUsedBytes());
+      assertEquals(0, omMetadataManager.getBucketTable().get(
+          omMetadataManager.getBucketKey(openKey.getVolumeName(), 
openKey.getBucketName())).getSnapshotUsedNamespace());
+    }
   }
 
   /**
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java
index 4cbfed0789a..33fcd137e66 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java
@@ -115,7 +115,6 @@ public void testAddToDBBatchWithNonEmptyBlocks() throws 
Exception {
 
     // Do manual commit and see whether addToBatch is successful or not.
     omMetadataManager.getStore().commitBatchOperation(batchOperation);
-
     
assertFalse(omMetadataManager.getKeyTable(getBucketLayout()).isExist(ozoneKey));
     
     String deletedKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
@@ -126,6 +125,9 @@ public void testAddToDBBatchWithNonEmptyBlocks() throws 
Exception {
 
     // Key has blocks, it should not be in deletedKeyTable.
     assertThat(rangeKVs.size()).isGreaterThan(0);
+    for (Table.KeyValue<String, RepeatedOmKeyInfo> kv : rangeKVs) {
+      
assertTrue(kv.getValue().getOmKeyInfoList().get(0).isDeletedKeyCommitted());
+    }
   }
 
   @Test
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMOpenKeysDeleteResponse.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMOpenKeysDeleteResponse.java
index ed76236b80c..b5c6b686c1c 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMOpenKeysDeleteResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMOpenKeysDeleteResponse.java
@@ -114,6 +114,7 @@ public void testAddToDBBatchWithNonEmptyBlocks(
       String deleteKey = omMetadataManager.getOzoneDeletePathKey(
           entry.getValue().getValue().getObjectID(), entry.getKey());
       assertTrue(omMetadataManager.getDeletedTable().isExist(deleteKey));
+      
assertFalse(omMetadataManager.getDeletedTable().get(deleteKey).getOmKeyInfoList().get(0).isDeletedKeyCommitted());
     }
 
     for (Map.Entry<String, Pair<Long, OmKeyInfo>> entry: 
keysToKeep.entrySet()) {
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java
index 41762d6c925..de950e8a5a1 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java
@@ -192,7 +192,7 @@ private void zeroOutBucketUsedBytes(String volumeName, 
String bucketName,
       throws IOException {
     String dbKey = omMetadataManager.getBucketKey(volumeName, bucketName);
     OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(dbKey);
-    bucketInfo.incrUsedBytes(-bucketInfo.getUsedBytes());
+    bucketInfo.decrUsedBytes(bucketInfo.getUsedBytes(), false);
     omMetadataManager.getBucketTable()
         .addCacheEntry(new CacheKey<>(dbKey),
             CacheValue.get(trxnLogIndex, bucketInfo));
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/BucketObjectDBInfo.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/BucketObjectDBInfo.java
index 381863daf95..6130b2b7fb4 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/BucketObjectDBInfo.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/BucketObjectDBInfo.java
@@ -40,6 +40,9 @@ public class BucketObjectDBInfo extends ObjectDBInfo {
   @JsonProperty("usedBytes")
   private long usedBytes;
 
+  @JsonProperty("snapshotUsedBytes")
+  private long snapshotUsedBytes;
+
   @JsonProperty("encryptionInfo")
   private BucketEncryptionKeyInfo bekInfo;
 
@@ -81,6 +84,7 @@ public BucketObjectDBInfo(OmBucketInfo omBucketInfo) {
     this.owner = omBucketInfo.getOwner();
     this.bekInfo = omBucketInfo.getEncryptionKeyInfo();
     this.usedBytes = omBucketInfo.getUsedBytes();
+    this.snapshotUsedBytes = omBucketInfo.getSnapshotUsedBytes();
   }
 
   public String getVolumeName() {
@@ -103,6 +107,10 @@ public long getUsedBytes() {
     return usedBytes;
   }
 
+  public long getSnapshotUsedBytes() {
+    return snapshotUsedBytes;
+  }
+
   public void setUsedBytes(long usedBytes) {
     this.usedBytes = usedBytes;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to