This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-13177
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-13177 by this push:
     new 3389a4d03a HDDS-13178. Include block size information in DeletedBlock and pass it to SCM (#8566)
3389a4d03a is described below

commit 3389a4d03a1345db26ccfe38dfb5447db92ef8dd
Author: Priyesh Karatha <[email protected]>
AuthorDate: Thu Jul 17 15:09:30 2025 +0530

    HDDS-13178. Include block size information in DeletedBlock and pass it to SCM (#8566)
---
 .../java/org/apache/hadoop/hdds/scm/ScmInfo.java   |  16 +-
 .../hadoop/hdds/upgrade/HDDSLayoutFeature.java     |   3 +-
 .../apache/hadoop/ozone/common/DeletedBlock.java   |  58 +++++
 ...lockLocationProtocolClientSideTranslatorPB.java |   5 +-
 ...inerLocationProtocolClientSideTranslatorPB.java |   5 +-
 .../org/apache/hadoop/ozone/common/BlockGroup.java |  67 +++++-
 ...lockLocationProtocolClientSideTranslatorPB.java | 134 +++++++++++
 .../interface-client/src/main/proto/hdds.proto     |   1 +
 .../src/main/proto/ScmServerProtocol.proto         |   7 +
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    |  32 ++-
 .../placement/metrics/SCMPerformanceMetrics.java   |   8 +
 ...lockLocationProtocolServerSideTranslatorPB.java |  14 ++
 ...inerLocationProtocolServerSideTranslatorPB.java |   1 +
 .../hdds/scm/server/SCMBlockProtocolServer.java    |  19 +-
 .../hdds/scm/server/SCMClientProtocolServer.java   |   3 +-
 .../hdds/scm/TestStorageContainerManager.java      |   1 +
 .../ozone/om/service/TestKeyDeletionService.java   | 255 +++++++++++++++++++++
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |   8 -
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  25 +-
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  28 ---
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  20 +-
 .../ozone/om/ScmBlockLocationTestingClient.java    |  60 +++--
 .../ozone/om/service/TestKeyDeletingService.java   |   9 +-
 23 files changed, 677 insertions(+), 102 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmInfo.java
index 93e1f8c1e0..24bd67759d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmInfo.java
@@ -29,6 +29,7 @@ public final class ScmInfo {
   private final String clusterId;
   private final String scmId;
   private final List<String> peerRoles;
+  private final int metadataLayoutVersion;
 
   /**
    * Builder for ScmInfo.
@@ -37,6 +38,7 @@ public static class Builder {
     private String clusterId;
     private String scmId;
     private final List<String> peerRoles;
+    private int metadataLayoutVersion;
 
     public Builder() {
       peerRoles = new ArrayList<>();
@@ -72,15 +74,21 @@ public Builder setPeerRoles(List<String> roles) {
       return this;
     }
 
+    public Builder setMetaDataLayoutVersion(int version) {
+      this.metadataLayoutVersion = version;
+      return this;
+    }
+
     public ScmInfo build() {
-      return new ScmInfo(clusterId, scmId, peerRoles);
+      return new ScmInfo(clusterId, scmId, peerRoles, metadataLayoutVersion);
     }
   }
 
-  private ScmInfo(String clusterId, String scmId, List<String> peerRoles) {
+  private ScmInfo(String clusterId, String scmId, List<String> peerRoles, int metadataLayoutVersion) {
     this.clusterId = clusterId;
     this.scmId = scmId;
     this.peerRoles = Collections.unmodifiableList(peerRoles);
+    this.metadataLayoutVersion = metadataLayoutVersion;
   }
 
   /**
@@ -107,4 +115,8 @@ public List<String> getPeerRoles() {
     return peerRoles;
   }
 
+  public int getMetaDataLayoutVersion() {
+    return metadataLayoutVersion;
+  }
+
 }
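
For orientation, a minimal sketch (not part of the patch) of how a caller
might populate and read the new field; the literal values are made up:

    import org.apache.hadoop.hdds.scm.ScmInfo;

    public class ScmInfoExample {
      public static void main(String[] args) {
        ScmInfo info = new ScmInfo.Builder()
            .setClusterId("test-cluster")
            .setScmId("test-scm")
            .setMetaDataLayoutVersion(9) // e.g. DATA_DISTRIBUTION is 9 in this patch
            .build();
        System.out.println(info.getMetaDataLayoutVersion()); // prints 9
      }
    }
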
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeature.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeature.java
index 02e68515f3..3b12011405 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeature.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeature.java
@@ -41,7 +41,8 @@ public enum HDDSLayoutFeature implements LayoutFeature {
   HADOOP_PRC_PORTS_IN_DATANODEDETAILS(7, "Adding Hadoop RPC ports " +
                                      "to DatanodeDetails."),
   HBASE_SUPPORT(8, "Datanode RocksDB Schema Version 3 has an extra table " +
-          "for the last chunk of blocks to support HBase.)");
+          "for the last chunk of blocks to support HBase.)"),
+  DATA_DISTRIBUTION(9, "Enhanced block deletion for the data distribution feature.");
 
   //////////////////////////////  //////////////////////////////
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/DeletedBlock.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/DeletedBlock.java
new file mode 100644
index 0000000000..b611541578
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/DeletedBlock.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.common;
+
+import org.apache.hadoop.hdds.client.BlockID;
+
+/**
+ * DeletedBlock of Ozone (BlockID + size + replicatedSize).
+ */
+public class DeletedBlock {
+
+  private BlockID blockID;
+  private long size;
+  private long replicatedSize;
+
+  public DeletedBlock(BlockID blockID, long size, long replicatedSize) {
+    this.blockID = blockID;
+    this.size = size;
+    this.replicatedSize = replicatedSize;
+  }
+
+  public BlockID getBlockID() {
+    return this.blockID;
+  }
+
+  public long getSize() {
+    return this.size;
+  }
+
+  public long getReplicatedSize() {
+    return this.replicatedSize;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(64);
+    sb.append(" localID: ").append(blockID.getContainerBlockID().getLocalID());
+    sb.append(" containerID: 
").append(blockID.getContainerBlockID().getContainerID());
+    sb.append(" size: ").append(size);
+    sb.append(" replicatedSize: ").append(replicatedSize);
+    return sb.toString();
+  }
+}
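
A quick usage sketch of the new value class (the numbers are illustrative):

    import org.apache.hadoop.hdds.client.BlockID;
    import org.apache.hadoop.ozone.common.DeletedBlock;

    public class DeletedBlockExample {
      public static void main(String[] args) {
        // A 1 MB block stored as three replicas occupies 3 MB on disk.
        DeletedBlock block =
            new DeletedBlock(new BlockID(1L, 100L), 1048576L, 3145728L);
        System.out.println(block.getReplicatedSize()); // 3145728
      }
    }
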
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index a7324d481c..1fecbe58d3 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -62,6 +62,7 @@
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
@@ -278,7 +279,9 @@ public ScmInfo getScmInfo() throws IOException {
     resp = wrappedResponse.getGetScmInfoResponse();
     ScmInfo.Builder builder = new ScmInfo.Builder()
         .setClusterId(resp.getClusterId())
-        .setScmId(resp.getScmId());
+        .setScmId(resp.getScmId())
+        .setMetaDataLayoutVersion(resp.hasMetaDataLayoutVersion() ?
+            resp.getMetaDataLayoutVersion() : HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
     return builder.build();
   }
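
The ternary above defaults responses from pre-upgrade SCMs, which omit the
optional field, to the initial layout version. A standalone sketch of that
defaulting rule (effectiveLayoutVersion is a hypothetical helper, not patch
code):

    import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;

    public class LayoutVersionFallback {
      static int effectiveLayoutVersion(boolean hasField, int fieldValue) {
        return hasField ? fieldValue
            : HDDSLayoutFeature.INITIAL_VERSION.layoutVersion();
      }

      public static void main(String[] args) {
        System.out.println(effectiveLayoutVersion(true, 9));  // 9
        System.out.println(effectiveLayoutVersion(false, 9)); // 0 (INITIAL_VERSION)
      }
    }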
 
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 7072d6090b..725fc784f5 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -133,6 +133,7 @@
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
@@ -775,7 +776,9 @@ public ScmInfo getScmInfo() throws IOException {
     ScmInfo.Builder builder = new ScmInfo.Builder()
         .setClusterId(resp.getClusterId())
         .setScmId(resp.getScmId())
-        .setPeerRoles(resp.getPeerRolesList());
+        .setPeerRoles(resp.getPeerRolesList())
+        .setMetaDataLayoutVersion(resp.hasMetaDataLayoutVersion() ?
+            resp.getMetaDataLayoutVersion() : HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
     return builder.build();
   }
 
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
index 5e8b0e1724..73ce40127e 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
@@ -21,6 +21,7 @@
 import java.util.List;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.KeyBlocks;
 
 /**
@@ -29,14 +30,21 @@
 public final class BlockGroup {
 
   private String groupID;
+  @Deprecated
   private List<BlockID> blockIDs;
+  private List<DeletedBlock> deletedBlocks;
 
-  private BlockGroup(String groupID, List<BlockID> blockIDs) {
+  private BlockGroup(String groupID, List<BlockID> blockIDs, List<DeletedBlock> deletedBlocks) {
     this.groupID = groupID;
-    this.blockIDs = blockIDs;
+    this.blockIDs = blockIDs == null ? new ArrayList<>() : blockIDs;
+    this.deletedBlocks = deletedBlocks == null ? new ArrayList<>() : deletedBlocks;
   }
 
-  public List<BlockID> getBlockIDList() {
+  public List<DeletedBlock> getAllDeletedBlocks() {
+    return deletedBlocks;
+  }
+
+  public List<BlockID> getBlockIDs() {
     return blockIDs;
   }
 
@@ -45,6 +53,24 @@ public String getGroupID() {
   }
 
   public KeyBlocks getProto() {
+    return deletedBlocks.isEmpty() ? getProtoForBlockID() : getProtoForDeletedBlock();
+  }
+
+  public KeyBlocks getProtoForDeletedBlock() {
+    KeyBlocks.Builder kbb = KeyBlocks.newBuilder();
+    for (DeletedBlock block : deletedBlocks) {
+      ScmBlockLocationProtocolProtos.DeletedBlock deletedBlock = ScmBlockLocationProtocolProtos.DeletedBlock
+          .newBuilder()
+          .setBlockId(block.getBlockID().getProtobuf())
+          .setSize(block.getSize())
+          .setReplicatedSize(block.getReplicatedSize())
+          .build();
+      kbb.addDeletedBlocks(deletedBlock);
+    }
+    return kbb.setKey(groupID).build();
+  }
+
+  public KeyBlocks getProtoForBlockID() {
     KeyBlocks.Builder kbb = KeyBlocks.newBuilder();
     for (BlockID block : blockIDs) {
       kbb.addBlocks(block.getProtobuf());
@@ -58,6 +84,10 @@ public KeyBlocks getProto() {
    * @return a group of blocks.
    */
   public static BlockGroup getFromProto(KeyBlocks proto) {
+    return proto.getDeletedBlocksList().isEmpty() ? getFromBlockIDProto(proto) : getFromDeletedBlockProto(proto);
+  }
+
+  public static BlockGroup getFromBlockIDProto(KeyBlocks proto) {
     List<BlockID> blockIDs = new ArrayList<>();
     for (HddsProtos.BlockID block : proto.getBlocksList()) {
       blockIDs.add(new BlockID(block.getContainerBlockID().getContainerID(),
@@ -67,6 +97,20 @@ public static BlockGroup getFromProto(KeyBlocks proto) {
         .addAllBlockIDs(blockIDs).build();
   }
 
+  public static BlockGroup getFromDeletedBlockProto(KeyBlocks proto) {
+    List<DeletedBlock> blocks = new ArrayList<>();
+    for (ScmBlockLocationProtocolProtos.DeletedBlock block : proto.getDeletedBlocksList()) {
+      HddsProtos.ContainerBlockID containerBlockId = block.getBlockId().getContainerBlockID();
+
+      blocks.add(new DeletedBlock(new BlockID(containerBlockId.getContainerID(),
+          containerBlockId.getLocalID()),
+          block.getSize(),
+          block.getReplicatedSize()));
+    }
+    return BlockGroup.newBuilder().setKeyName(proto.getKey())
+        .addAllDeletedBlocks(blocks).build();
+  }
+
   public static Builder newBuilder() {
     return new Builder();
   }
@@ -75,7 +119,8 @@ public static Builder newBuilder() {
   public String toString() {
     return "BlockGroup[" +
         "groupID='" + groupID + '\'' +
-        ", blockIDs=" + blockIDs +
+        ", blockIDs=" + blockIDs + '\'' +
+        ", deletedBlocks=" + deletedBlocks +
         ']';
   }
 
@@ -85,20 +130,26 @@ public String toString() {
   public static class Builder {
 
     private String groupID;
     private List<BlockID> blockIDs;
+    private List<DeletedBlock> deletedBlocks;
 
     public Builder setKeyName(String blockGroupID) {
       this.groupID = blockGroupID;
       return this;
     }
 
     public Builder addAllBlockIDs(List<BlockID> keyBlocks) {
       this.blockIDs = keyBlocks;
       return this;
     }
+
+    public Builder addAllDeletedBlocks(List<DeletedBlock> deletedBlocks) {
+      this.deletedBlocks = deletedBlocks;
+      return this;
+    }
 
     public BlockGroup build() {
-      return new BlockGroup(groupID, blockIDs);
+      return new BlockGroup(groupID, blockIDs, deletedBlocks);
     }
   }
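
A minimal round-trip sketch for the two encodings (block values are made up):

    import java.util.Arrays;
    import org.apache.hadoop.hdds.client.BlockID;
    import org.apache.hadoop.ozone.common.BlockGroup;
    import org.apache.hadoop.ozone.common.DeletedBlock;

    public class BlockGroupExample {
      public static void main(String[] args) {
        BlockGroup group = BlockGroup.newBuilder()
            .setKeyName("/vol1/bucket1/key1")
            .addAllDeletedBlocks(Arrays.asList(
                new DeletedBlock(new BlockID(1L, 100L), 4096L, 12288L)))
            .build();
        // getProto() emits the DeletedBlock form because deletedBlocks is
        // non-empty; getFromProto() takes the matching branch when decoding.
        BlockGroup decoded = BlockGroup.getFromProto(group.getProto());
        System.out.println(decoded.getAllDeletedBlocks().size()); // 1
      }
    }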
 
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/scm/protocolPB/TestScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/scm/protocolPB/TestScmBlockLocationProtocolClientSideTranslatorPB.java
new file mode 100644
index 0000000000..551a17627f
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/scm/protocolPB/TestScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.protocolPB;
+
+import static org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Status.INTERNAL_ERROR;
+import static org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Status.OK;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse;
+import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for ScmBlockLocationProtocolClientSideTranslatorPB.
+ */
+public class TestScmBlockLocationProtocolClientSideTranslatorPB {
+
+  private SCMBlockLocationFailoverProxyProvider mockProxyProvider;
+  private ScmBlockLocationProtocolPB mockRpcProxy;
+  private ScmBlockLocationProtocolClientSideTranslatorPB translator;
+
+  @BeforeEach
+  public void setUp() {
+    mockProxyProvider = mock(SCMBlockLocationFailoverProxyProvider.class);
+    mockRpcProxy = mock(ScmBlockLocationProtocolPB.class);
+    when(mockProxyProvider.getInterface()).thenReturn(ScmBlockLocationProtocolPB.class);
+    when(mockProxyProvider.getProxy()).thenReturn(new FailoverProxyProvider.ProxyInfo<>(mockRpcProxy, null));
+    translator = new ScmBlockLocationProtocolClientSideTranslatorPB(mockProxyProvider);
+  }
+
+  @Test
+  public void testGetScmInfo() throws IOException, ServiceException {
+    // Setup
+    HddsProtos.GetScmInfoResponseProto mockResponse = HddsProtos.GetScmInfoResponseProto.newBuilder()
+        .setClusterId("test-cluster-id")
+        .setScmId("test-scm-id")
+        .setMetaDataLayoutVersion(1)
+        .build();
+
+    SCMBlockLocationResponse responseWrapper = SCMBlockLocationResponse.newBuilder()
+        .setStatus(OK)
+        .setGetScmInfoResponse(mockResponse)
+        .setCmdType(ScmBlockLocationProtocolProtos.Type.GetScmInfo)
+        .build();
+
+    when(mockRpcProxy.send(any(), any(SCMBlockLocationRequest.class)))
+        .thenReturn(responseWrapper);
+
+    // Execute
+    ScmInfo result = translator.getScmInfo();
+
+    // Verify
+    assertEquals("test-cluster-id", result.getClusterId());
+    assertEquals("test-scm-id", result.getScmId());
+    assertEquals(1, result.getMetaDataLayoutVersion());
+    verify(mockRpcProxy).send(any(), any(SCMBlockLocationRequest.class));
+  }
+
+  @Test
+  public void testGetScmInfoWithoutMetaDataLayoutVersion() throws IOException, ServiceException {
+    // Setup
+    HddsProtos.GetScmInfoResponseProto mockResponse = HddsProtos.GetScmInfoResponseProto.newBuilder()
+        .setClusterId("test-cluster-id")
+        .setScmId("test-scm-id")
+        .build();
+
+    SCMBlockLocationResponse responseWrapper = SCMBlockLocationResponse.newBuilder()
+        .setStatus(OK)
+        .setGetScmInfoResponse(mockResponse)
+        .setCmdType(ScmBlockLocationProtocolProtos.Type.GetScmInfo)
+        .build();
+
+    when(mockRpcProxy.send(any(), any(SCMBlockLocationRequest.class)))
+        .thenReturn(responseWrapper);
+
+    // Execute
+    ScmInfo result = translator.getScmInfo();
+
+    // Verify
+    assertEquals("test-cluster-id", result.getClusterId());
+    assertEquals("test-scm-id", result.getScmId());
+    assertEquals(0, result.getMetaDataLayoutVersion());
+    verify(mockRpcProxy).send(any(), any(SCMBlockLocationRequest.class));
+  }
+
+  @Test
+  public void testGetScmInfoWithError() throws ServiceException {
+    // Setup
+    SCMBlockLocationResponse errorResponse = SCMBlockLocationResponse.newBuilder()
+        .setStatus(INTERNAL_ERROR)
+        .setMessage("Internal error occurred")
+        .setCmdType(ScmBlockLocationProtocolProtos.Type.GetScmInfo)
+        .build();
+
+    when(mockRpcProxy.send(any(), any(SCMBlockLocationRequest.class)))
+        .thenReturn(errorResponse);
+
+    // Execute & Verify
+    SCMException exception = assertThrows(SCMException.class, () -> {
+      translator.getScmInfo();
+    });
+    assertEquals("Internal error occurred", exception.getMessage());
+    verify(mockRpcProxy).send(any(), any(SCMBlockLocationRequest.class));
+  }
+}
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index ef76205d91..8bc9697482 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -280,6 +280,7 @@ message GetScmInfoResponseProto {
     required string clusterId = 1;
     required string scmId = 2;
     repeated string peerRoles = 3;
+    optional int32 metaDataLayoutVersion = 4;
 }
 
 message AddScmRequestProto {
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
index fc24d2562f..ca4c50d4c7 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
@@ -181,6 +181,13 @@ message DeleteScmKeyBlocksRequestProto {
 message KeyBlocks {
   required string key = 1;
   repeated BlockID blocks = 2;
+  repeated DeletedBlock deletedBlocks = 3;
+}
+
+message DeletedBlock {
+  required BlockID blockId = 1;
+  optional uint64 size = 2 [default = 0];
+  optional uint64 replicatedSize = 3 [default = 0];
 }
 
 /**
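
From Java, the generated builder for the new message is used roughly as
below (a sketch; the IDs and sizes are illustrative):

    import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
    import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;

    public class DeletedBlockProtoExample {
      public static void main(String[] args) {
        ScmBlockLocationProtocolProtos.DeletedBlock proto =
            ScmBlockLocationProtocolProtos.DeletedBlock.newBuilder()
                .setBlockId(HddsProtos.BlockID.newBuilder()
                    .setContainerBlockID(HddsProtos.ContainerBlockID.newBuilder()
                        .setContainerID(1L).setLocalID(100L).build())
                    .build())
                .setSize(4096L)            // unreplicated bytes
                .setReplicatedSize(12288L) // bytes across all replicas
                .build();
        System.out.println(proto.getSize()); // 4096
      }
    }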
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 9b46968424..7a887828f9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeletedBlock;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -225,16 +226,29 @@ public void deleteBlocks(List<BlockGroup> keyBlocksInfoList)
     for (BlockGroup bg : keyBlocksInfoList) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Deleting blocks {}",
-            StringUtils.join(",", bg.getBlockIDList()));
+            StringUtils.join(",", (!bg.getBlockIDs().isEmpty()) ? bg.getBlockIDs() : bg.getAllDeletedBlocks()));
       }
-      for (BlockID block : bg.getBlockIDList()) {
-        long containerID = block.getContainerID();
-        if (containerBlocks.containsKey(containerID)) {
-          containerBlocks.get(containerID).add(block.getLocalID());
-        } else {
-          List<Long> item = new ArrayList<>();
-          item.add(block.getLocalID());
-          containerBlocks.put(containerID, item);
+      if (!bg.getBlockIDs().isEmpty()) {
+        for (BlockID block : bg.getBlockIDs()) {
+          long containerID = block.getContainerID();
+          if (containerBlocks.containsKey(containerID)) {
+            containerBlocks.get(containerID).add(block.getLocalID());
+          } else {
+            List<Long> item = new ArrayList<>();
+            item.add(block.getLocalID());
+            containerBlocks.put(containerID, item);
+          }
+        }
+      } else {
+        for (DeletedBlock block : bg.getAllDeletedBlocks()) {
+          long containerID = block.getBlockID().getContainerID();
+          if (containerBlocks.containsKey(containerID)) {
+            containerBlocks.get(containerID).add(block.getBlockID().getLocalID());
+          } else {
+            List<Long> item = new ArrayList<>();
+            item.add(block.getBlockID().getLocalID());
+            containerBlocks.put(containerID, item);
+          }
         }
       }
     }
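
Both branches above perform the same group-by of local IDs per container; a
behavior-equivalent standalone sketch using computeIfAbsent (not patch code):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class GroupByContainerExample {
      static void add(Map<Long, List<Long>> containerBlocks,
          long containerID, long localID) {
        containerBlocks
            .computeIfAbsent(containerID, k -> new ArrayList<>())
            .add(localID);
      }

      public static void main(String[] args) {
        Map<Long, List<Long>> containerBlocks = new HashMap<>();
        add(containerBlocks, 1L, 100L);
        add(containerBlocks, 1L, 101L);
        System.out.println(containerBlocks); // {1=[100, 101]}
      }
    }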
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMPerformanceMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMPerformanceMetrics.java
index a01effa3a2..5bb6e01b28 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMPerformanceMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMPerformanceMetrics.java
@@ -117,5 +117,13 @@ public void updateDeleteKeySuccessBlocks(long keys) {
   public void updateDeleteKeyFailedBlocks(long keys) {
     deleteKeyBlocksFailure.incr(keys);
   }
+
+  public long getDeleteKeySuccessBlocks() {
+    return deleteKeyBlocksSuccess.value();
+  }
+
+  public long getDeleteKeyFailedBlocks() {
+    return deleteKeyBlocksFailure.value();
+  }
 }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
index 94fd0f5dd3..7ac65e4f30 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -145,6 +145,19 @@ private SCMBlockLocationResponse processMessage(
             request.getAllocateScmBlockRequest(), request.getVersion()));
         break;
       case DeleteScmKeyBlocks:
+        if (scm.getLayoutVersionManager().needsFinalization() &&
+            !scm.getLayoutVersionManager().isAllowed(HDDSLayoutFeature.DATA_DISTRIBUTION)) {
+          boolean requestHasNewData = request.getDeleteScmKeyBlocksRequest().getKeyBlocksList().stream()
+              .anyMatch(keyBlocks -> keyBlocks.getDeletedBlocksCount() > 0);
+
+          if (requestHasNewData) {
+            throw new SCMException("Cluster is not finalized yet, so it"
+                + " cannot handle the data distribution feature",
+                SCMException.ResultCodes.INTERNAL_ERROR);
+          }
+        }
         response.setDeleteScmKeyBlocksResponse(
             deleteScmKeyBlocks(request.getDeleteScmKeyBlocksRequest()));
         break;
@@ -251,6 +264,7 @@ public HddsProtos.GetScmInfoResponseProto getScmInfo(
     return HddsProtos.GetScmInfoResponseProto.newBuilder()
         .setClusterId(scmInfo.getClusterId())
         .setScmId(scmInfo.getScmId())
+        .setMetaDataLayoutVersion(scmInfo.getMetaDataLayoutVersion())
         .build();
   }
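
The DeleteScmKeyBlocks gate above can be read as a single predicate; a
standalone sketch (shouldReject is a hypothetical helper, not patch code):

    public class FinalizationGateExample {
      // Reject requests that carry DeletedBlock entries while the cluster
      // still needs finalization for DATA_DISTRIBUTION.
      static boolean shouldReject(boolean needsFinalization,
          boolean featureAllowed, boolean requestHasDeletedBlocks) {
        return needsFinalization && !featureAllowed && requestHasDeletedBlocks;
      }

      public static void main(String[] args) {
        System.out.println(shouldReject(true, false, true));  // true -> SCMException
        System.out.println(shouldReject(false, true, true));  // false -> proceeds
      }
    }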
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 8ce5ae44ab..bbba37fdc2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -1012,6 +1012,7 @@ public HddsProtos.GetScmInfoResponseProto getScmInfo(
         .setClusterId(scmInfo.getClusterId())
         .setScmId(scmInfo.getScmId())
         .addAllPeerRoles(scmInfo.getPeerRoles())
+        .setMetaDataLayoutVersion(scmInfo.getMetaDataLayoutVersion())
         .build();
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 60c6384ba8..2330332e43 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -31,6 +31,7 @@
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.getRemoteUser;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.ProtocolMessageEnum;
@@ -80,6 +81,7 @@
 import org.apache.hadoop.ozone.audit.SCMAction;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.common.DeletedBlock;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -268,13 +270,13 @@ public List<DeleteBlockGroupResult> deleteKeyBlocks(
       List<BlockGroup> keyBlocksInfoList) throws IOException {
     long totalBlocks = 0;
     for (BlockGroup bg : keyBlocksInfoList) {
-      totalBlocks += bg.getBlockIDList().size();
+      totalBlocks += (!bg.getBlockIDs().isEmpty()) ? bg.getBlockIDs().size() : bg.getAllDeletedBlocks().size();
     }
+    List<DeleteBlockGroupResult> results = new ArrayList<>();
     if (LOG.isDebugEnabled()) {
       LOG.debug("SCM is informed by OM to delete {} keys. Total blocks to 
deleted {}.",
           keyBlocksInfoList.size(), totalBlocks);
     }
-    List<DeleteBlockGroupResult> results = new ArrayList<>();
     Map<String, String> auditMap = Maps.newHashMap();
     ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result resultCode;
     Exception e = null;
@@ -312,7 +314,10 @@ public List<DeleteBlockGroupResult> deleteKeyBlocks(
     }
     for (BlockGroup bg : keyBlocksInfoList) {
       List<DeleteBlockResult> blockResult = new ArrayList<>();
-      for (BlockID b : bg.getBlockIDList()) {
+      for (DeletedBlock b : bg.getAllDeletedBlocks()) {
+        blockResult.add(new DeleteBlockResult(b.getBlockID(), resultCode));
+      }
+      for (BlockID b : bg.getBlockIDs()) {
         blockResult.add(new DeleteBlockResult(b, resultCode));
       }
       results.add(new DeleteBlockGroupResult(bg.getGroupID(), blockResult));
@@ -335,7 +340,8 @@ public ScmInfo getScmInfo() throws IOException {
       ScmInfo.Builder builder =
           new ScmInfo.Builder()
               .setClusterId(scm.getScmStorageConfig().getClusterID())
-              .setScmId(scm.getScmStorageConfig().getScmId());
+              .setScmId(scm.getScmStorageConfig().getScmId())
+              .setMetaDataLayoutVersion(scm.getLayoutVersionManager().getMetadataLayoutVersion());
       return builder.build();
     } catch (Exception ex) {
       auditSuccess = false;
@@ -478,4 +484,9 @@ public AuditMessage buildAuditMessageForFailure(AuditAction op, Map<String,
   public void close() throws IOException {
     stop();
   }
+
+  @VisibleForTesting
+  public SCMPerformanceMetrics getMetrics() {
+    return perfMetrics;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 03774c4869..4b577ec4ed 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -907,7 +907,8 @@ public ScmInfo getScmInfo() {
           new ScmInfo.Builder()
               .setClusterId(scm.getScmStorageConfig().getClusterID())
               .setScmId(scm.getScmStorageConfig().getScmId())
-              .setPeerRoles(scm.getScmHAManager().getRatisServer().getRatisRoles());
+              .setPeerRoles(scm.getScmHAManager().getRatisServer().getRatisRoles())
+              .setMetaDataLayoutVersion(scm.getLayoutVersionManager().getMetadataLayoutVersion());
       ScmInfo info = builder.build();
       AUDIT.logReadSuccess(buildAuditMessageForSuccess(
             SCMAction.GET_SCM_INFO, null));
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java
index 1c220ef8d3..c0750352a1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java
@@ -617,6 +617,7 @@ public void testScmInfo(@TempDir Path tempDir) throws Exception {
       ScmInfo scmInfo = scm.getClientProtocolServer().getScmInfo();
       assertEquals(clusterId, scmInfo.getClusterId());
       assertEquals(scmId, scmInfo.getScmId());
+      assertTrue(scmInfo.getMetaDataLayoutVersion() >= 0);
 
       String expectedVersion = HddsVersionInfo.HDDS_VERSION_INFO.getVersion();
       String actualVersion = scm.getSoftwareVersion();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletionService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletionService.java
new file mode 100644
index 0000000000..062378cbc3
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletionService.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import static org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.DATA_DISTRIBUTION;
+import static org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.HBASE_SUPPORT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConsts.MB;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.block.BlockManager;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMPerformanceMetrics;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.upgrade.SCMUpgradeFinalizationContext;
+import org.apache.hadoop.hdds.upgrade.TestHddsUpgradeUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.UniformDatanodesFactory;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeletedBlock;
+import org.apache.hadoop.ozone.om.helpers.QuotaUtil;
+import org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.ArgumentCaptor;
+
+/**
+ * Key deletion service test verifying that block usage is passed from OM to SCM.
+ */
+public class TestKeyDeletionService {
+  private static final String CLIENT_ID = UUID.randomUUID().toString();
+  private static final String VOLUME_NAME = "vol1";
+  private static final String BUCKET_NAME = "bucket1";
+  private static final int KEY_SIZE = 5 * 1024; // 5 KB
+  private static MiniOzoneCluster cluster;
+  private static StorageContainerLocationProtocol scmClient;
+  private static OzoneBucket bucket = null;
+
+  @BeforeAll
+  public static void init() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 500, TimeUnit.MILLISECONDS);
+    conf.setInt(SCMStorageConfig.TESTING_INIT_LAYOUT_VERSION_KEY, HBASE_SUPPORT.layoutVersion());
+
+    InjectedUpgradeFinalizationExecutor<SCMUpgradeFinalizationContext> scmFinalizationExecutor =
+        new InjectedUpgradeFinalizationExecutor<>();
+    SCMConfigurator configurator = new SCMConfigurator();
+    configurator.setUpgradeFinalizationExecutor(scmFinalizationExecutor);
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(9)
+        .setSCMConfigurator(configurator)
+        .setDatanodeFactory(UniformDatanodesFactory.newBuilder()
+            .setLayoutVersion(HBASE_SUPPORT.layoutVersion()).build())
+        .build();
+    cluster.waitForClusterToBeReady();
+    scmClient = cluster.getStorageContainerLocationClient();
+    assertEquals(HBASE_SUPPORT.ordinal(), scmClient.getScmInfo().getMetaDataLayoutVersion());
+    OzoneClient ozoneClient = cluster.newClient();
+    // create a volume and a bucket to be used by OzoneFileSystem
+    ozoneClient.getObjectStore().createVolume(VOLUME_NAME);
+    ozoneClient.getObjectStore().getVolume(VOLUME_NAME).createBucket(BUCKET_NAME);
+    bucket = ozoneClient.getObjectStore().getVolume(VOLUME_NAME).getBucket(BUCKET_NAME);
+  }
+
+  @AfterAll
+  public static void teardown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  public static Stream<Arguments> replicaType() {
+    return Stream.of(
+        arguments("RATIS", "ONE"),
+        arguments("RATIS", "THREE")
+    );
+  }
+
+  public static Stream<Arguments> ecType() {
+    return Stream.of(
+        arguments(ECReplicationConfig.EcCodec.RS, 3, 2, 2 * MB),
+        arguments(ECReplicationConfig.EcCodec.RS, 6, 3, 2 * MB)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("replicaType")
+  public void testDeletedKeyBytesPropagatedToSCM(String type, String factor) throws Exception {
+    String keyName = UUID.randomUUID().toString();
+    ReplicationConfig replicationConfig = RatisReplicationConfig
+        .getInstance(ReplicationFactor.valueOf(factor).toProto());
+    SCMPerformanceMetrics metrics = cluster.getStorageContainerManager().getBlockProtocolServer().getMetrics();
+    long initialSuccessBlocks = metrics.getDeleteKeySuccessBlocks();
+    long initialFailedBlocks = metrics.getDeleteKeyFailedBlocks();
+    // Step 1: write a key
+    createKey(keyName, replicationConfig);
+    // Step 2: Spy on BlockManager and inject it into SCM
+    BlockManager spyManager = injectSpyBlockManager(cluster);
+    // Step 3: Delete the key (which triggers deleteBlocks call)
+    bucket.deleteKey(keyName);
+    // Step 4: Verify deleteBlocks call and capture argument
+    verifyAndAssertQuota(spyManager, replicationConfig);
+    GenericTestUtils.waitFor(() -> metrics.getDeleteKeySuccessBlocks() - initialSuccessBlocks == 1, 50, 1000);
+    GenericTestUtils.waitFor(() -> metrics.getDeleteKeyFailedBlocks() - initialFailedBlocks == 0, 50, 1000);
+
+    // Launch finalization from the client. In the current implementation,
+    // this call will block until finalization completes.
+    Future<?> finalizationFuture = Executors.newSingleThreadExecutor().submit(
+        () -> {
+          try {
+            scmClient.finalizeScmUpgrade(CLIENT_ID);
+          } catch (IOException ex) {
+            fail("finalization client failed", ex);
+          }
+        });
+    finalizationFuture.get();
+    TestHddsUpgradeUtils.waitForFinalizationFromClient(scmClient, CLIENT_ID);
+    assertEquals(DATA_DISTRIBUTION.ordinal(), scmClient.getScmInfo().getMetaDataLayoutVersion());
+    // create and delete another key to verify the process after feature is finalized
+    keyName = UUID.randomUUID().toString();
+    createKey(keyName, replicationConfig);
+    bucket.deleteKey(keyName);
+    verifyAndAssertQuota(spyManager, replicationConfig);
+    GenericTestUtils.waitFor(() -> metrics.getDeleteKeySuccessBlocks() - initialSuccessBlocks == 2, 50, 1000);
+    GenericTestUtils.waitFor(() -> metrics.getDeleteKeyFailedBlocks() - initialFailedBlocks == 0, 50, 1000);
+  }
+
+  @ParameterizedTest
+  @MethodSource("ecType")
+  void testDeletedKeyBytesPropagatedToSCMWithEC(
+      ECReplicationConfig.EcCodec codec, int data, int parity, long chunkSize) throws Exception {
+    String keyName = UUID.randomUUID().toString();
+    ReplicationConfig replicationConfig = new ECReplicationConfig(data, parity, codec, (int) chunkSize);
+    SCMPerformanceMetrics metrics = cluster.getStorageContainerManager().getBlockProtocolServer().getMetrics();
+    long initialSuccessBlocks = metrics.getDeleteKeySuccessBlocks();
+    long initialFailedBlocks = metrics.getDeleteKeyFailedBlocks();
+    // Step 1: write a key
+    createKey(keyName, replicationConfig);
+    // Step 2: Spy on BlockManager and inject it into SCM
+    BlockManager spyManager = injectSpyBlockManager(cluster);
+    // Step 3: Delete the key (which triggers deleteBlocks call)
+    bucket.deleteKey(keyName);
+    // Step 4: Verify deleteBlocks call and capture argument
+    verifyAndAssertQuota(spyManager, replicationConfig);
+    GenericTestUtils.waitFor(() -> metrics.getDeleteKeySuccessBlocks() - initialSuccessBlocks == 1, 50, 1000);
+    GenericTestUtils.waitFor(() -> metrics.getDeleteKeyFailedBlocks() - initialFailedBlocks == 0, 50, 1000);
+
+    // Launch finalization from the client. In the current implementation,
+    // this call will block until finalization completes.
+    Future<?> finalizationFuture = Executors.newSingleThreadExecutor().submit(
+        () -> {
+          try {
+            scmClient.finalizeScmUpgrade(CLIENT_ID);
+          } catch (IOException ex) {
+            fail("finalization client failed", ex);
+          }
+        });
+    finalizationFuture.get();
+    TestHddsUpgradeUtils.waitForFinalizationFromClient(scmClient, CLIENT_ID);
+    assertEquals(DATA_DISTRIBUTION.ordinal(), scmClient.getScmInfo().getMetaDataLayoutVersion());
+    // create and delete another key to verify the process after feature is finalized
+    keyName = UUID.randomUUID().toString();
+    createKey(keyName, replicationConfig);
+    bucket.deleteKey(keyName);
+    verifyAndAssertQuota(spyManager, replicationConfig);
+    GenericTestUtils.waitFor(() -> metrics.getDeleteKeySuccessBlocks() - initialSuccessBlocks == 2, 50, 1000);
+    GenericTestUtils.waitFor(() -> metrics.getDeleteKeyFailedBlocks() - initialFailedBlocks == 0, 50, 1000);
+  }
+
+  private void createKey(String keyName, ReplicationConfig replicationConfig) throws IOException {
+    byte[] data = new byte[KEY_SIZE];
+    try (OzoneOutputStream out = bucket.createKey(keyName, KEY_SIZE,
+        replicationConfig, new HashMap<>())) {
+      out.write(data);
+    }
+  }
+
+  private BlockManager injectSpyBlockManager(MiniOzoneCluster miniOzoneCluster) throws Exception {
+    StorageContainerManager scm = miniOzoneCluster.getStorageContainerManager();
+    BlockManager realManager = scm.getScmBlockManager();
+    BlockManager spyManager = spy(realManager);
+
+    Field field = scm.getClass().getDeclaredField("scmBlockManager");
+    field.setAccessible(true);
+    field.set(scm, spyManager);
+    return spyManager;
+  }
+
+  private void verifyAndAssertQuota(BlockManager spyManager, ReplicationConfig replicationConfig) throws IOException {
+    ArgumentCaptor<List<BlockGroup>> captor = ArgumentCaptor.forClass(List.class);
+    verify(spyManager, timeout(50000).atLeastOnce()).deleteBlocks(captor.capture());
+
+    if (captor.getAllValues().stream().anyMatch(blockGroups -> blockGroups.stream().anyMatch(
+        group -> group.getAllDeletedBlocks().isEmpty()))) {
+      assertEquals(1, captor.getAllValues().get(0).get(0).getBlockIDs().size());
+      assertEquals(0, captor.getAllValues().get(0).get(0).getAllDeletedBlocks().size());
+      return;
+    }
+
+    long totalUsedBytes = captor.getAllValues().get(0).stream()
+        .flatMap(group -> group.getAllDeletedBlocks().stream())
+        .mapToLong(DeletedBlock::getReplicatedSize).sum();
+
+    long totalUnreplicatedBytes = captor.getAllValues().get(0).stream()
+        .flatMap(group -> group.getAllDeletedBlocks().stream())
+        .mapToLong(DeletedBlock::getSize).sum();
+
+    assertEquals(0, captor.getAllValues().get(0).get(0).getBlockIDs().size());
+    assertEquals(1, captor.getAllValues().get(0).get(0).getAllDeletedBlocks().size());
+    assertEquals(QuotaUtil.getReplicatedSize(KEY_SIZE, replicationConfig), totalUsedBytes);
+    assertEquals(KEY_SIZE, totalUnreplicatedBytes);
+  }
+}
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index ec9e34cec7..50af367923 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -34,7 +34,6 @@
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
-import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.ListKeysResult;
 import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
@@ -659,13 +658,6 @@ String getMultipartKey(long volumeId, long bucketId,
    */
   long getBucketId(String volume, String bucket) throws IOException;
 
-  /**
-   * Returns {@code List<BlockGroup>} for a key in the deletedTable.
-   * @param deletedKey - key to be purged from the deletedTable
-   * @return {@link BlockGroup}
-   */
-  List<BlockGroup> getBlocksForKeyDelete(String deletedKey) throws IOException;
-
   /**
    * Given a volume/bucket, check whether it contains incomplete MPUs.
    *
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index bd186df168..a069817daa 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -124,6 +124,7 @@
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
+import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.db.StringCodec;
 import org.apache.hadoop.hdds.utils.db.Table;
@@ -137,6 +138,7 @@
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeletedBlock;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
@@ -156,6 +158,7 @@
 import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.om.helpers.QuotaUtil;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
@@ -736,6 +739,8 @@ public PendingKeysDeletion getPendingDeletionKeys(
       }
       int currentCount = 0;
       boolean maxReqSizeExceeded = false;
+      boolean isDataDistributionEnabled = ozoneManager.getScmInfo().getMetaDataLayoutVersion() >=
+          HDDSLayoutFeature.DATA_DISTRIBUTION.layoutVersion();
       while (delKeyIter.hasNext() && currentCount < count) {
         RepeatedOmKeyInfo notReclaimableKeyInfo = new RepeatedOmKeyInfo();
         KeyValue<String, RepeatedOmKeyInfo> kv = delKeyIter.next();
@@ -747,11 +752,21 @@ public PendingKeysDeletion getPendingDeletionKeys(
 
             // Skip the key if the filter doesn't allow the file to be deleted.
             if (filter == null || filter.apply(Table.newKeyValue(kv.getKey(), info))) {
-              List<BlockID> blockIDS = info.getKeyLocationVersions().stream()
-                  .flatMap(versionLocations -> versionLocations.getLocationList().stream()
-                      .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))).collect(Collectors.toList());
-              BlockGroup keyBlocks = BlockGroup.newBuilder().setKeyName(kv.getKey())
-                  .addAllBlockIDs(blockIDS).build();
+              BlockGroup.Builder keyBlocksBuilder = BlockGroup.newBuilder().setKeyName(kv.getKey());
+              if (isDataDistributionEnabled) {
+                List<DeletedBlock> deletedBlocks = info.getKeyLocationVersions().stream()
+                    .flatMap(versionLocations -> versionLocations.getLocationList().stream()
+                        .map(b -> new DeletedBlock(new BlockID(b.getContainerID(), b.getLocalID()),
+                            b.getLength(), QuotaUtil.getReplicatedSize(b.getLength(), info.getReplicationConfig()))))
+                    .collect(Collectors.toList());
+                keyBlocksBuilder.addAllDeletedBlocks(deletedBlocks);
+              } else {
+                List<BlockID> blockIDs = info.getKeyLocationVersions().stream()
+                    .flatMap(versionLocations -> versionLocations.getLocationList().stream()
+                        .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))).collect(Collectors.toList());
+                keyBlocksBuilder.addAllBlockIDs(blockIDs);
+              }
+              BlockGroup keyBlocks = keyBlocksBuilder.build();
               int keyBlockSerializedSize = keyBlocks.getProto().getSerializedSize();
               serializedSize += keyBlockSerializedSize;
               if (serializedSize > ratisByteLimit) {
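
The per-block sizes sent to SCM come from QuotaUtil; a sketch of the
arithmetic for the replication schemes the new test exercises (the
two-argument ECReplicationConfig constructor is an assumption here):

    import org.apache.hadoop.hdds.client.ECReplicationConfig;
    import org.apache.hadoop.hdds.client.RatisReplicationConfig;
    import org.apache.hadoop.hdds.client.ReplicationFactor;
    import org.apache.hadoop.ozone.om.helpers.QuotaUtil;

    public class ReplicatedSizeExample {
      public static void main(String[] args) {
        long dataSize = 5 * 1024; // matches KEY_SIZE in the test above
        // RATIS/THREE keeps three full copies of the data.
        System.out.println(QuotaUtil.getReplicatedSize(dataSize,
            RatisReplicationConfig.getInstance(
                ReplicationFactor.THREE.toProto())));
        // EC rs-3-2 stores data plus parity, so the overhead is fractional.
        System.out.println(QuotaUtil.getReplicatedSize(dataSize,
            new ECReplicationConfig(3, 2)));
      }
    }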
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 04e0998219..955603b044 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -61,7 +61,6 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.TableCacheMetrics;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
@@ -81,7 +80,6 @@
 import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
@@ -1770,32 +1768,6 @@ public long getBucketId(String volume, String bucket) throws IOException {
     return omBucketInfo.getObjectID();
   }
 
-  @Override
-  public List<BlockGroup> getBlocksForKeyDelete(String deletedKey)
-      throws IOException {
-    RepeatedOmKeyInfo omKeyInfo = getDeletedTable().get(deletedKey);
-    if (omKeyInfo == null) {
-      return null;
-    }
-
-    List<BlockGroup> result = new ArrayList<>();
-    // Add all blocks from all versions of the key to the deletion list
-    for (OmKeyInfo info : omKeyInfo.cloneOmKeyInfoList()) {
-      for (OmKeyLocationInfoGroup keyLocations :
-          info.getKeyLocationVersions()) {
-        List<BlockID> item = keyLocations.getLocationList().stream()
-            .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))
-            .collect(Collectors.toList());
-        BlockGroup keyBlocks = BlockGroup.newBuilder()
-            .setKeyName(deletedKey)
-            .addAllBlockIDs(item)
-            .build();
-        result.add(keyBlocks);
-      }
-    }
-    return result;
-  }
-
   @Override
   public boolean containsIncompleteMPUs(String volume, String bucket)
       throws IOException {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index a5df1337dc..0a2c70984a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -24,7 +24,6 @@
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
-import static org.apache.hadoop.hdds.utils.HAUtils.getScmInfo;
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.getRemoteUser;
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClientWithMaxRetry;
 import static org.apache.hadoop.ozone.OmUtils.MAX_TRXN_ID;
@@ -500,6 +499,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private UncheckedAutoCloseableSupplier<IOmMetadataReader> rcOmMetadataReader;
   private OmSnapshotManager omSnapshotManager;
   private volatile DirectoryDeletingService dirDeletingService;
+  private ScmInfo scmInfo;
 
   @SuppressWarnings("methodlength")
   private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
@@ -617,9 +617,8 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
 
     // For testing purpose only, not hit scm from om as Hadoop UGI can't login
     // two principals in the same JVM.
-    ScmInfo scmInfo;
     if (!testSecureOmFlag) {
-      scmInfo = getScmInfo(configuration);
+      scmInfo = HAUtils.getScmInfo(configuration);
       if (!scmInfo.getClusterId().equals(omStorage.getClusterID())) {
         logVersionMismatch(conf, scmInfo);
         throw new OMException("SCM version info mismatch.",
@@ -893,7 +892,7 @@ public static OzoneManager createOm(OzoneConfiguration conf,
     return new OzoneManager(conf, startupOption);
   }
 
-  private void logVersionMismatch(OzoneConfiguration conf, ScmInfo scmInfo) {
+  private void logVersionMismatch(OzoneConfiguration conf, ScmInfo info) {
     List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
     StringBuilder scmBlockAddressBuilder = new StringBuilder();
     for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
@@ -905,9 +904,9 @@ private void logVersionMismatch(OzoneConfiguration conf, ScmInfo scmInfo) {
       scmBlockAddress = scmBlockAddress.substring(0,
           scmBlockAddress.lastIndexOf(","));
     }
-    if (!scmInfo.getClusterId().equals(omStorage.getClusterID())) {
+    if (!info.getClusterId().equals(omStorage.getClusterID())) {
       LOG.error("clusterId from {} is {}, but is {} in {}",
-          scmBlockAddress, scmInfo.getClusterId(),
+          scmBlockAddress, info.getClusterId(),
           omStorage.getClusterID(), omStorage.getVersionFile());
     }
   }
@@ -1040,6 +1039,13 @@ public ScmClient getScmClient() {
     return scmClient;
   }
 
+  /**
+   * Returns the ScmInfo cached during OM startup.
+   */
+  public ScmInfo getScmInfo() {
+    return this.scmInfo;
+  }
+
   /**
    * Return SecretManager for OM.
    */
@@ -1533,7 +1539,7 @@ public static boolean omInit(OzoneConfiguration conf) throws IOException,
     StorageState state = omStorage.getState();
     String scmId;
     try {
-      ScmInfo scmInfo = getScmInfo(conf);
+      ScmInfo scmInfo = HAUtils.getScmInfo(conf);
       scmId = scmInfo.getScmId();
       if (scmId == null || scmId.isEmpty()) {
         throw new IOException("Invalid SCM ID");
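
Because OzoneManager now retains the ScmInfo fetched at construction time, other components can read cluster metadata without an extra SCM round trip. A hypothetical consumer, sketched only (the om handle and null guard are assumptions; the accessors are the ones shown in this patch):

    // Sketch: reading the cached ScmInfo rather than re-querying SCM.
    ScmInfo cached = om.getScmInfo();
    if (cached != null) {
      LOG.info("Cluster {} is managed by SCM {}",
          cached.getClusterId(), cached.getScmId());
    }
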
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
index 5edd683a43..40075ec897 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.common.DeletedBlock;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -145,38 +146,49 @@ private Pipeline createPipeline(DatanodeDetails datanode) {
   }
 
   @Override
-  public List<DeleteBlockGroupResult> deleteKeyBlocks(
-      List<BlockGroup> keyBlocksInfoList) throws IOException {
+  public List<DeleteBlockGroupResult> deleteKeyBlocks(List<BlockGroup> keyBlocksInfoList)
+      throws IOException {
     List<DeleteBlockGroupResult> results = new ArrayList<>();
-    List<DeleteBlockResult> blockResultList = new ArrayList<>();
-    Result result;
     for (BlockGroup keyBlocks : keyBlocksInfoList) {
-      for (BlockID blockKey : keyBlocks.getBlockIDList()) {
-        currentCall++;
-        switch (this.failCallsFrequency) {
-        case 0:
-          result = success;
-          numBlocksDeleted++;
-          break;
-        case 1:
-          result = unknownFailure;
-          break;
-        default:
-          if (currentCall % this.failCallsFrequency == 0) {
-            result = unknownFailure;
-          } else {
-            result = success;
-            numBlocksDeleted++;
-          }
+      List<DeleteBlockResult> blockResultList = new ArrayList<>();
+      // Process BlockIDs directly if present
+      if (keyBlocks.getBlockIDs() != null && !keyBlocks.getBlockIDs().isEmpty()) {
+        for (BlockID blockID : keyBlocks.getBlockIDs()) {
+          blockResultList.add(processBlock(blockID));
+        }
+      } else {
+        // Otherwise, use DeletedBlock's BlockID
+        for (DeletedBlock deletedBlock : keyBlocks.getAllDeletedBlocks()) {
+          blockResultList.add(processBlock(deletedBlock.getBlockID()));
         }
-        blockResultList.add(new DeleteBlockResult(blockKey, result));
       }
-      results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(),
-          blockResultList));
+      results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(), blockResultList));
     }
     return results;
   }
 
+  private DeleteBlockResult processBlock(BlockID blockID) {
+    currentCall++;
+    Result result;
+    switch (failCallsFrequency) {
+    case 0:
+      result = success;
+      numBlocksDeleted++;
+      break;
+    case 1:
+      result = unknownFailure;
+      break;
+    default:
+      if (currentCall % failCallsFrequency == 0) {
+        result = unknownFailure;
+      } else {
+        result = success;
+        numBlocksDeleted++;
+      }
+    }
+    return new DeleteBlockResult(blockID, result);
+  }
+
   @Override
   public ScmInfo getScmInfo() throws IOException {
     ScmInfo.Builder builder =
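
The testing client above now accepts both shapes of BlockGroup: the legacy BlockID list and the new DeletedBlock entries, falling back to the latter when the former is empty. A minimal usage sketch (illustrative only; the DeletedBlock constructor arguments mirror the test hunk below and their exact semantics are assumptions):

    // Sketch: exercising both code paths of deleteKeyBlocks.
    BlockGroup legacyGroup = BlockGroup.newBuilder()
        .setKeyName("legacyKey")
        .addAllBlockIDs(Collections.singletonList(new BlockID(1, 100)))
        .build();
    BlockGroup sizedGroup = BlockGroup.newBuilder()
        .setKeyName("sizedKey")
        .addAllDeletedBlocks(Collections.singletonList(
            new DeletedBlock(new BlockID(1, 101), 4096, 1)))
        .build();
    List<DeleteBlockGroupResult> results =
        client.deleteKeyBlocks(Arrays.asList(legacyGroup, sizedGroup));
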
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index ba6f644c49..3d1b63e2f7 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -72,6 +72,7 @@
 import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeletedBlock;
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.KeyManagerImpl;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
@@ -799,8 +800,10 @@ public void testFailingModifiedKeyPurge() throws IOException {
              return OzoneManagerProtocolProtos.OMResponse.newBuilder().setCmdType(purgeRequest.get().getCmdType())
                  .setStatus(OzoneManagerProtocolProtos.Status.TIMEOUT).build();
             });
-        List<BlockGroup> blockGroups = Collections.singletonList(BlockGroup.newBuilder().setKeyName("key1")
-            .addAllBlockIDs(Collections.singletonList(new BlockID(1, 1))).build());
+        List<BlockGroup> blockGroups = Collections.singletonList(BlockGroup
+            .newBuilder().setKeyName("key1")
+            .addAllDeletedBlocks(Collections.singletonList(new DeletedBlock(new BlockID(1, 1), 3, 1)))
+            .build());
        List<String> renameEntriesToBeDeleted = Collections.singletonList("key2");
         OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
             .setBucketName("buck")
@@ -1074,7 +1077,7 @@ private long countBlocksPendingDeletion() {
      return keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE, ratisLimit)
           .getKeyBlocksList()
           .stream()
-          .map(BlockGroup::getBlockIDList)
+          .map(b -> b.getBlockIDs().isEmpty() ? b.getAllDeletedBlocks() : b.getBlockIDs())
           .mapToLong(Collection::size)
           .sum();
     } catch (IOException e) {
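
With a size now attached to each DeletedBlock, the same stream pattern used in countBlocksPendingDeletion could also aggregate the bytes pending deletion. A sketch under the assumption that DeletedBlock exposes a getSize()-style accessor, which is not confirmed by the hunks shown here:

    // Sketch: summing the bytes awaiting deletion across all block groups.
    long bytesPendingDeletion = keyBlocksList.stream()
        .flatMap(group -> group.getAllDeletedBlocks().stream())
        .mapToLong(DeletedBlock::getSize) // assumed accessor
        .sum();
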

