This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-13177
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-13177 by this push:
new 3389a4d03a HDDS-13178. Include block size information in DeletedBlock
and pass it to SCM (#8566)
3389a4d03a is described below
commit 3389a4d03a1345db26ccfe38dfb5447db92ef8dd
Author: Priyesh Karatha <[email protected]>
AuthorDate: Thu Jul 17 15:09:30 2025 +0530
HDDS-13178. Include block size information in DeletedBlock and pass it to
SCM (#8566)
---
.../java/org/apache/hadoop/hdds/scm/ScmInfo.java | 16 +-
.../hadoop/hdds/upgrade/HDDSLayoutFeature.java | 3 +-
.../apache/hadoop/ozone/common/DeletedBlock.java | 58 +++++
...lockLocationProtocolClientSideTranslatorPB.java | 5 +-
...inerLocationProtocolClientSideTranslatorPB.java | 5 +-
.../org/apache/hadoop/ozone/common/BlockGroup.java | 67 +++++-
...lockLocationProtocolClientSideTranslatorPB.java | 134 +++++++++++
.../interface-client/src/main/proto/hdds.proto | 1 +
.../src/main/proto/ScmServerProtocol.proto | 7 +
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 32 ++-
.../placement/metrics/SCMPerformanceMetrics.java | 8 +
...lockLocationProtocolServerSideTranslatorPB.java | 14 ++
...inerLocationProtocolServerSideTranslatorPB.java | 1 +
.../hdds/scm/server/SCMBlockProtocolServer.java | 19 +-
.../hdds/scm/server/SCMClientProtocolServer.java | 3 +-
.../hdds/scm/TestStorageContainerManager.java | 1 +
.../ozone/om/service/TestKeyDeletionService.java | 255 +++++++++++++++++++++
.../apache/hadoop/ozone/om/OMMetadataManager.java | 8 -
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 25 +-
.../hadoop/ozone/om/OmMetadataManagerImpl.java | 28 ---
.../org/apache/hadoop/ozone/om/OzoneManager.java | 20 +-
.../ozone/om/ScmBlockLocationTestingClient.java | 60 +++--
.../ozone/om/service/TestKeyDeletingService.java | 9 +-
23 files changed, 677 insertions(+), 102 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmInfo.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmInfo.java
index 93e1f8c1e0..24bd67759d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmInfo.java
@@ -29,6 +29,7 @@ public final class ScmInfo {
private final String clusterId;
private final String scmId;
private final List<String> peerRoles;
+ private final int metadataLayoutVersion;
/**
* Builder for ScmInfo.
@@ -37,6 +38,7 @@ public static class Builder {
private String clusterId;
private String scmId;
private final List<String> peerRoles;
+ private int metadataLayoutVersion;
public Builder() {
peerRoles = new ArrayList<>();
@@ -72,15 +74,21 @@ public Builder setPeerRoles(List<String> roles) {
return this;
}
+ public Builder setMetaDataLayoutVersion(int version) {
+ this.metadataLayoutVersion = version;
+ return this;
+ }
+
public ScmInfo build() {
- return new ScmInfo(clusterId, scmId, peerRoles);
+ return new ScmInfo(clusterId, scmId, peerRoles, metadataLayoutVersion);
}
}
- private ScmInfo(String clusterId, String scmId, List<String> peerRoles) {
+ private ScmInfo(String clusterId, String scmId, List<String> peerRoles, int
metadataLayoutVersion) {
this.clusterId = clusterId;
this.scmId = scmId;
this.peerRoles = Collections.unmodifiableList(peerRoles);
+ this.metadataLayoutVersion = metadataLayoutVersion;
}
/**
@@ -107,4 +115,8 @@ public List<String> getPeerRoles() {
return peerRoles;
}
+ public int getMetaDataLayoutVersion() {
+ return metadataLayoutVersion;
+ }
+
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeature.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeature.java
index 02e68515f3..3b12011405 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeature.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeature.java
@@ -41,7 +41,8 @@ public enum HDDSLayoutFeature implements LayoutFeature {
HADOOP_PRC_PORTS_IN_DATANODEDETAILS(7, "Adding Hadoop RPC ports " +
"to DatanodeDetails."),
HBASE_SUPPORT(8, "Datanode RocksDB Schema Version 3 has an extra table " +
- "for the last chunk of blocks to support HBase.)");
+ "for the last chunk of blocks to support HBase.)"),
+ DATA_DISTRIBUTION(9, "Enhanced block deletion function for data distribution feature.");
////////////////////////////// //////////////////////////////
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/DeletedBlock.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/DeletedBlock.java
new file mode 100644
index 0000000000..b611541578
--- /dev/null
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/DeletedBlock.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.common;
+
+import org.apache.hadoop.hdds.client.BlockID;
+
+/**
+ * DeletedBlock of Ozone (BlockID + size + replicatedSize).
+ */
+public class DeletedBlock {
+
+ private BlockID blockID;
+ private long size;
+ private long replicatedSize;
+
+ public DeletedBlock(BlockID blockID, long size, long replicatedSize) {
+ this.blockID = blockID;
+ this.size = size;
+ this.replicatedSize = replicatedSize;
+ }
+
+ public BlockID getBlockID() {
+ return this.blockID;
+ }
+
+ public long getSize() {
+ return this.size;
+ }
+
+ public long getReplicatedSize() {
+ return this.replicatedSize;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(64);
+ sb.append(" localID: ").append(blockID.getContainerBlockID().getLocalID());
+ sb.append(" containerID: ").append(blockID.getContainerBlockID().getContainerID());
+ sb.append(" size: ").append(size);
+ sb.append(" replicatedSize: ").append(replicatedSize);
+ return sb.toString();
+ }
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index a7324d481c..1fecbe58d3 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -62,6 +62,7 @@
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
@@ -278,7 +279,9 @@ public ScmInfo getScmInfo() throws IOException {
resp = wrappedResponse.getGetScmInfoResponse();
ScmInfo.Builder builder = new ScmInfo.Builder()
.setClusterId(resp.getClusterId())
- .setScmId(resp.getScmId());
+ .setScmId(resp.getScmId())
+ .setMetaDataLayoutVersion(resp.hasMetaDataLayoutVersion() ?
+ resp.getMetaDataLayoutVersion() :
HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
return builder.build();
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 7072d6090b..725fc784f5 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -133,6 +133,7 @@
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import
org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
@@ -775,7 +776,9 @@ public ScmInfo getScmInfo() throws IOException {
ScmInfo.Builder builder = new ScmInfo.Builder()
.setClusterId(resp.getClusterId())
.setScmId(resp.getScmId())
- .setPeerRoles(resp.getPeerRolesList());
+ .setPeerRoles(resp.getPeerRolesList())
+ .setMetaDataLayoutVersion(resp.hasMetaDataLayoutVersion() ?
+ resp.getMetaDataLayoutVersion() :
HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
return builder.build();
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
index 5e8b0e1724..73ce40127e 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
@@ -21,6 +21,7 @@
import java.util.List;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
import
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.KeyBlocks;
/**
@@ -29,14 +30,21 @@
public final class BlockGroup {
private String groupID;
+ @Deprecated
private List<BlockID> blockIDs;
+ private List<DeletedBlock> deletedBlocks;
- private BlockGroup(String groupID, List<BlockID> blockIDs) {
+ private BlockGroup(String groupID, List<BlockID> blockIDs,
List<DeletedBlock> deletedBlocks) {
this.groupID = groupID;
- this.blockIDs = blockIDs;
+ this.blockIDs = blockIDs == null ? new ArrayList<>() : blockIDs;
+ this.deletedBlocks = deletedBlocks == null ? new ArrayList<>() :
deletedBlocks;
}
- public List<BlockID> getBlockIDList() {
+ public List<DeletedBlock> getAllDeletedBlocks() {
+ return deletedBlocks;
+ }
+
+ public List<BlockID> getBlockIDs() {
return blockIDs;
}
@@ -45,6 +53,24 @@ public String getGroupID() {
}
public KeyBlocks getProto() {
+ return deletedBlocks.isEmpty() ? getProtoForBlockID() :
getProtoForDeletedBlock();
+ }
+
+ public KeyBlocks getProtoForDeletedBlock() {
+ KeyBlocks.Builder kbb = KeyBlocks.newBuilder();
+ for (DeletedBlock block : deletedBlocks) {
+ ScmBlockLocationProtocolProtos.DeletedBlock deletedBlock =
ScmBlockLocationProtocolProtos.DeletedBlock
+ .newBuilder()
+ .setBlockId(block.getBlockID().getProtobuf())
+ .setSize(block.getSize())
+ .setReplicatedSize(block.getReplicatedSize())
+ .build();
+ kbb.addDeletedBlocks(deletedBlock);
+ }
+ return kbb.setKey(groupID).build();
+ }
+
+ public KeyBlocks getProtoForBlockID() {
KeyBlocks.Builder kbb = KeyBlocks.newBuilder();
for (BlockID block : blockIDs) {
kbb.addBlocks(block.getProtobuf());
@@ -58,6 +84,10 @@ public KeyBlocks getProto() {
* @return a group of blocks.
*/
public static BlockGroup getFromProto(KeyBlocks proto) {
+ return proto.getDeletedBlocksList().isEmpty() ? getFromBlockIDProto(proto)
: getFromDeletedBlockProto(proto);
+ }
+
+ public static BlockGroup getFromBlockIDProto(KeyBlocks proto) {
List<BlockID> blockIDs = new ArrayList<>();
for (HddsProtos.BlockID block : proto.getBlocksList()) {
blockIDs.add(new BlockID(block.getContainerBlockID().getContainerID(),
@@ -67,6 +97,20 @@ public static BlockGroup getFromProto(KeyBlocks proto) {
.addAllBlockIDs(blockIDs).build();
}
+ public static BlockGroup getFromDeletedBlockProto(KeyBlocks proto) {
+ List<DeletedBlock> blocks = new ArrayList<>();
+ for (ScmBlockLocationProtocolProtos.DeletedBlock block :
proto.getDeletedBlocksList()) {
+ HddsProtos.ContainerBlockID containerBlockId =
block.getBlockId().getContainerBlockID();
+
+ blocks.add(new DeletedBlock(new
BlockID(containerBlockId.getContainerID(),
+ containerBlockId.getLocalID()),
+ block.getSize(),
+ block.getReplicatedSize()));
+ }
+ return BlockGroup.newBuilder().setKeyName(proto.getKey())
+ .addAllDeletedBlocks(blocks).build();
+ }
+
public static Builder newBuilder() {
return new Builder();
}
@@ -75,7 +119,8 @@ public static Builder newBuilder() {
public String toString() {
return "BlockGroup[" +
"groupID='" + groupID + '\'' +
- ", blockIDs=" + blockIDs +
+ ", blockIDs=" + blockIDs + '\'' +
+ ", deletedBlocks=" + deletedBlocks +
']';
}
@@ -85,20 +130,26 @@ public String toString() {
public static class Builder {
private String groupID;
- private List<BlockID> blockIDs;
+ private List<BlockID> blocksIDs;
+ private List<DeletedBlock> blocks;
public Builder setKeyName(String blockGroupID) {
this.groupID = blockGroupID;
return this;
}
- public Builder addAllBlockIDs(List<BlockID> keyBlocks) {
- this.blockIDs = keyBlocks;
+ public Builder addAllBlockIDs(List<BlockID> blockIDs) {
+ this.blocksIDs = blockIDs;
+ return this;
+ }
+
+ public Builder addAllDeletedBlocks(List<DeletedBlock> keyBlocks) {
+ this.blocks = keyBlocks;
return this;
}
public BlockGroup build() {
- return new BlockGroup(groupID, blockIDs);
+ return new BlockGroup(groupID, blocksIDs, blocks);
}
}
diff --git
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/scm/protocolPB/TestScmBlockLocationProtocolClientSideTranslatorPB.java
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/scm/protocolPB/TestScmBlockLocationProtocolClientSideTranslatorPB.java
new file mode 100644
index 0000000000..551a17627f
--- /dev/null
+++
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/scm/protocolPB/TestScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.protocolPB;
+
+import static
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Status.INTERNAL_ERROR;
+import static
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Status.OK;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
+import
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest;
+import
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse;
+import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for ScmBlockLocationProtocolClientSideTranslatorPB.
+ */
+public class TestScmBlockLocationProtocolClientSideTranslatorPB {
+
+ private SCMBlockLocationFailoverProxyProvider mockProxyProvider;
+ private ScmBlockLocationProtocolPB mockRpcProxy;
+ private ScmBlockLocationProtocolClientSideTranslatorPB translator;
+
+ @BeforeEach
+ public void setUp() {
+ mockProxyProvider = mock(SCMBlockLocationFailoverProxyProvider.class);
+ mockRpcProxy = mock(ScmBlockLocationProtocolPB.class);
+
when(mockProxyProvider.getInterface()).thenReturn(ScmBlockLocationProtocolPB.class);
+ when(mockProxyProvider.getProxy()).thenReturn(new
FailoverProxyProvider.ProxyInfo<>(mockRpcProxy, null));
+ translator = new
ScmBlockLocationProtocolClientSideTranslatorPB(mockProxyProvider);
+ }
+
+ @Test
+ public void testGetScmInfo() throws IOException, ServiceException {
+ // Setup
+ HddsProtos.GetScmInfoResponseProto mockResponse =
HddsProtos.GetScmInfoResponseProto.newBuilder()
+ .setClusterId("test-cluster-id")
+ .setScmId("test-scm-id")
+ .setMetaDataLayoutVersion(1)
+ .build();
+
+ SCMBlockLocationResponse responseWrapper =
SCMBlockLocationResponse.newBuilder()
+ .setStatus(OK)
+ .setGetScmInfoResponse(mockResponse)
+ .setCmdType(ScmBlockLocationProtocolProtos.Type.GetScmInfo)
+ .build();
+
+ when(mockRpcProxy.send(any(), any(SCMBlockLocationRequest.class)))
+ .thenReturn(responseWrapper);
+
+ // Execute
+ ScmInfo result = translator.getScmInfo();
+
+ // Verify
+ assertEquals("test-cluster-id", result.getClusterId());
+ assertEquals("test-scm-id", result.getScmId());
+ assertEquals(1, result.getMetaDataLayoutVersion());
+ verify(mockRpcProxy).send(any(), any(SCMBlockLocationRequest.class));
+ }
+
+ @Test
+ public void testGetScmInfoWithoutMetaDataLayoutVersion() throws IOException,
ServiceException {
+ // Setup
+ HddsProtos.GetScmInfoResponseProto mockResponse =
HddsProtos.GetScmInfoResponseProto.newBuilder()
+ .setClusterId("test-cluster-id")
+ .setScmId("test-scm-id")
+ .build();
+
+ SCMBlockLocationResponse responseWrapper =
SCMBlockLocationResponse.newBuilder()
+ .setStatus(OK)
+ .setGetScmInfoResponse(mockResponse)
+ .setCmdType(ScmBlockLocationProtocolProtos.Type.GetScmInfo)
+ .build();
+
+ when(mockRpcProxy.send(any(), any(SCMBlockLocationRequest.class)))
+ .thenReturn(responseWrapper);
+
+ // Execute
+ ScmInfo result = translator.getScmInfo();
+
+ // Verify
+ assertEquals("test-cluster-id", result.getClusterId());
+ assertEquals("test-scm-id", result.getScmId());
+ assertEquals(0, result.getMetaDataLayoutVersion());
+ verify(mockRpcProxy).send(any(), any(SCMBlockLocationRequest.class));
+ }
+
+ @Test
+ public void testGetScmInfoWithError() throws ServiceException {
+ // Setup
+ SCMBlockLocationResponse errorResponse =
SCMBlockLocationResponse.newBuilder()
+ .setStatus(INTERNAL_ERROR)
+ .setMessage("Internal error occurred")
+ .setCmdType(ScmBlockLocationProtocolProtos.Type.GetScmInfo)
+ .build();
+
+ when(mockRpcProxy.send(any(), any(SCMBlockLocationRequest.class)))
+ .thenReturn(errorResponse);
+
+ // Execute & Verify
+ SCMException exception = assertThrows(SCMException.class, () -> {
+ translator.getScmInfo();
+ });
+ assertEquals("Internal error occurred", exception.getMessage());
+ verify(mockRpcProxy).send(any(), any(SCMBlockLocationRequest.class));
+ }
+}
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index ef76205d91..8bc9697482 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -280,6 +280,7 @@ message GetScmInfoResponseProto {
required string clusterId = 1;
required string scmId = 2;
repeated string peerRoles = 3;
+ optional int32 metaDataLayoutVersion = 4;
}
message AddScmRequestProto {
diff --git
a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
index fc24d2562f..ca4c50d4c7 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
@@ -181,6 +181,13 @@ message DeleteScmKeyBlocksRequestProto {
message KeyBlocks {
required string key = 1;
repeated BlockID blocks = 2;
+ repeated DeletedBlock deletedBlocks = 3;
+}
+
+message DeletedBlock {
+ required BlockID blockId = 1;
+ optional uint64 size = 2 [default = 0];
+ optional uint64 replicatedSize = 3 [default = 0];
}
/**
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 9b46968424..7a887828f9 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -46,6 +46,7 @@
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeletedBlock;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -225,16 +226,29 @@ public void deleteBlocks(List<BlockGroup>
keyBlocksInfoList)
for (BlockGroup bg : keyBlocksInfoList) {
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting blocks {}",
- StringUtils.join(",", bg.getBlockIDList()));
+ StringUtils.join(",", (!bg.getBlockIDs().isEmpty()) ?
bg.getBlockIDs() : bg.getAllDeletedBlocks()));
}
- for (BlockID block : bg.getBlockIDList()) {
- long containerID = block.getContainerID();
- if (containerBlocks.containsKey(containerID)) {
- containerBlocks.get(containerID).add(block.getLocalID());
- } else {
- List<Long> item = new ArrayList<>();
- item.add(block.getLocalID());
- containerBlocks.put(containerID, item);
+ if (!bg.getBlockIDs().isEmpty()) {
+ for (BlockID block : bg.getBlockIDs()) {
+ long containerID = block.getContainerID();
+ if (containerBlocks.containsKey(containerID)) {
+ containerBlocks.get(containerID).add(block.getLocalID());
+ } else {
+ List<Long> item = new ArrayList<>();
+ item.add(block.getLocalID());
+ containerBlocks.put(containerID, item);
+ }
+ }
+ } else {
+ for (DeletedBlock block : bg.getAllDeletedBlocks()) {
+ long containerID = block.getBlockID().getContainerID();
+ if (containerBlocks.containsKey(containerID)) {
+
containerBlocks.get(containerID).add(block.getBlockID().getLocalID());
+ } else {
+ List<Long> item = new ArrayList<>();
+ item.add(block.getBlockID().getLocalID());
+ containerBlocks.put(containerID, item);
+ }
}
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMPerformanceMetrics.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMPerformanceMetrics.java
index a01effa3a2..5bb6e01b28 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMPerformanceMetrics.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMPerformanceMetrics.java
@@ -117,5 +117,13 @@ public void updateDeleteKeySuccessBlocks(long keys) {
public void updateDeleteKeyFailedBlocks(long keys) {
deleteKeyBlocksFailure.incr(keys);
}
+
+ public long getDeleteKeySuccessBlocks() {
+ return deleteKeyBlocksSuccess.value();
+ }
+
+ public long getDeleteKeyFailedBlocks() {
+ return deleteKeyBlocksFailure.value();
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
index 94fd0f5dd3..7ac65e4f30 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -145,6 +145,19 @@ private SCMBlockLocationResponse processMessage(
request.getAllocateScmBlockRequest(), request.getVersion()));
break;
case DeleteScmKeyBlocks:
+ if (scm.getLayoutVersionManager().needsFinalization() &&
+ !scm.getLayoutVersionManager()
+ .isAllowed(HDDSLayoutFeature.DATA_DISTRIBUTION)
+ ) {
+ boolean isRequestHasNewData =
request.getDeleteScmKeyBlocksRequest().getKeyBlocksList().stream()
+ .anyMatch(keyBlocks -> keyBlocks.getDeletedBlocksCount() > 0);
+
+ if (isRequestHasNewData) {
+ throw new SCMException("Cluster is not finalized yet, it is"
+ + " not enabled to to handle data distribution feature",
+ SCMException.ResultCodes.INTERNAL_ERROR);
+ }
+ }
response.setDeleteScmKeyBlocksResponse(
deleteScmKeyBlocks(request.getDeleteScmKeyBlocksRequest()));
break;
@@ -251,6 +264,7 @@ public HddsProtos.GetScmInfoResponseProto getScmInfo(
return HddsProtos.GetScmInfoResponseProto.newBuilder()
.setClusterId(scmInfo.getClusterId())
.setScmId(scmInfo.getScmId())
+ .setMetaDataLayoutVersion(scmInfo.getMetaDataLayoutVersion())
.build();
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 8ce5ae44ab..bbba37fdc2 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -1012,6 +1012,7 @@ public HddsProtos.GetScmInfoResponseProto getScmInfo(
.setClusterId(scmInfo.getClusterId())
.setScmId(scmInfo.getScmId())
.addAllPeerRoles(scmInfo.getPeerRoles())
+ .setMetaDataLayoutVersion(scmInfo.getMetaDataLayoutVersion())
.build();
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 60c6384ba8..2330332e43 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -31,6 +31,7 @@
import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getRemoteUser;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.protobuf.BlockingService;
import com.google.protobuf.ProtocolMessageEnum;
@@ -80,6 +81,7 @@
import org.apache.hadoop.ozone.audit.SCMAction;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.common.DeletedBlock;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -268,13 +270,13 @@ public List<DeleteBlockGroupResult> deleteKeyBlocks(
List<BlockGroup> keyBlocksInfoList) throws IOException {
long totalBlocks = 0;
for (BlockGroup bg : keyBlocksInfoList) {
- totalBlocks += bg.getBlockIDList().size();
+ totalBlocks += (!bg.getBlockIDs().isEmpty()) ? bg.getBlockIDs().size() :
bg.getAllDeletedBlocks().size();
}
+ List<DeleteBlockGroupResult> results = new ArrayList<>();
if (LOG.isDebugEnabled()) {
LOG.debug("SCM is informed by OM to delete {} keys. Total blocks to deleted {}.",
keyBlocksInfoList.size(), totalBlocks);
}
- List<DeleteBlockGroupResult> results = new ArrayList<>();
Map<String, String> auditMap = Maps.newHashMap();
ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result resultCode;
Exception e = null;
@@ -312,7 +314,10 @@ public List<DeleteBlockGroupResult> deleteKeyBlocks(
}
for (BlockGroup bg : keyBlocksInfoList) {
List<DeleteBlockResult> blockResult = new ArrayList<>();
- for (BlockID b : bg.getBlockIDList()) {
+ for (DeletedBlock b : bg.getAllDeletedBlocks()) {
+ blockResult.add(new DeleteBlockResult(b.getBlockID(), resultCode));
+ }
+ for (BlockID b : bg.getBlockIDs()) {
blockResult.add(new DeleteBlockResult(b, resultCode));
}
results.add(new DeleteBlockGroupResult(bg.getGroupID(), blockResult));
@@ -335,7 +340,8 @@ public ScmInfo getScmInfo() throws IOException {
ScmInfo.Builder builder =
new ScmInfo.Builder()
.setClusterId(scm.getScmStorageConfig().getClusterID())
- .setScmId(scm.getScmStorageConfig().getScmId());
+ .setScmId(scm.getScmStorageConfig().getScmId())
+
.setMetaDataLayoutVersion(scm.getLayoutVersionManager().getMetadataLayoutVersion());
return builder.build();
} catch (Exception ex) {
auditSuccess = false;
@@ -478,4 +484,9 @@ public AuditMessage buildAuditMessageForFailure(AuditAction
op, Map<String,
public void close() throws IOException {
stop();
}
+
+ @VisibleForTesting
+ public SCMPerformanceMetrics getMetrics() {
+ return perfMetrics;
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 03774c4869..4b577ec4ed 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -907,7 +907,8 @@ public ScmInfo getScmInfo() {
new ScmInfo.Builder()
.setClusterId(scm.getScmStorageConfig().getClusterID())
.setScmId(scm.getScmStorageConfig().getScmId())
-
.setPeerRoles(scm.getScmHAManager().getRatisServer().getRatisRoles());
+
.setPeerRoles(scm.getScmHAManager().getRatisServer().getRatisRoles())
+
.setMetaDataLayoutVersion(scm.getLayoutVersionManager().getMetadataLayoutVersion());
ScmInfo info = builder.build();
AUDIT.logReadSuccess(buildAuditMessageForSuccess(
SCMAction.GET_SCM_INFO, null));
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java
index 1c220ef8d3..c0750352a1 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java
@@ -617,6 +617,7 @@ public void testScmInfo(@TempDir Path tempDir) throws
Exception {
ScmInfo scmInfo = scm.getClientProtocolServer().getScmInfo();
assertEquals(clusterId, scmInfo.getClusterId());
assertEquals(scmId, scmInfo.getScmId());
+ assertTrue(scmInfo.getMetaDataLayoutVersion() >= 0);
String expectedVersion = HddsVersionInfo.HDDS_VERSION_INFO.getVersion();
String actualVersion = scm.getSoftwareVersion();
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletionService.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletionService.java
new file mode 100644
index 0000000000..062378cbc3
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletionService.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import static
org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.DATA_DISTRIBUTION;
+import static org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.HBASE_SUPPORT;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConsts.MB;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.block.BlockManager;
+import
org.apache.hadoop.hdds.scm.container.placement.metrics.SCMPerformanceMetrics;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.upgrade.SCMUpgradeFinalizationContext;
+import org.apache.hadoop.hdds.upgrade.TestHddsUpgradeUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.UniformDatanodesFactory;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeletedBlock;
+import org.apache.hadoop.ozone.om.helpers.QuotaUtil;
+import org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.ArgumentCaptor;
+
+/**
+ * DeletionService test to Pass Usage from OM to SCM.
+ */
+public class TestKeyDeletionService {
+ private static final String CLIENT_ID = UUID.randomUUID().toString();
+ private static final String VOLUME_NAME = "vol1";
+ private static final String BUCKET_NAME = "bucket1";
+ private static final int KEY_SIZE = 5 * 1024; // 5 KB
+ private static MiniOzoneCluster cluster;
+ private static StorageContainerLocationProtocol scmClient;
+ private static OzoneBucket bucket = null;
+
+ @BeforeAll
+ public static void init() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 500,
TimeUnit.MILLISECONDS);
+ conf.setInt(SCMStorageConfig.TESTING_INIT_LAYOUT_VERSION_KEY,
HBASE_SUPPORT.layoutVersion());
+
+ InjectedUpgradeFinalizationExecutor<SCMUpgradeFinalizationContext>
scmFinalizationExecutor =
+ new InjectedUpgradeFinalizationExecutor<>();
+ SCMConfigurator configurator = new SCMConfigurator();
+ configurator.setUpgradeFinalizationExecutor(scmFinalizationExecutor);
+ cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(9)
+ .setSCMConfigurator(configurator)
+ .setDatanodeFactory(UniformDatanodesFactory.newBuilder()
+ .setLayoutVersion(HBASE_SUPPORT.layoutVersion()).build())
+ .build();
+ cluster.waitForClusterToBeReady();
+ scmClient = cluster.getStorageContainerLocationClient();
+ assertEquals(HBASE_SUPPORT.ordinal(),
scmClient.getScmInfo().getMetaDataLayoutVersion());
+ OzoneClient ozoneClient = cluster.newClient();
+ // create a volume and a bucket to be used by OzoneFileSystem
+ ozoneClient.getObjectStore().createVolume(VOLUME_NAME);
+
ozoneClient.getObjectStore().getVolume(VOLUME_NAME).createBucket(BUCKET_NAME);
+ bucket =
ozoneClient.getObjectStore().getVolume(VOLUME_NAME).getBucket(BUCKET_NAME);
+ }
+
+ @AfterAll
+ public static void teardown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ public static Stream<Arguments> replicaType() {
+ return Stream.of(
+ arguments("RATIS", "ONE"),
+ arguments("RATIS", "THREE")
+ );
+ }
+
+ public static Stream<Arguments> ecType() {
+ return Stream.of(
+ arguments(ECReplicationConfig.EcCodec.RS, 3, 2, 2 * MB),
+ arguments(ECReplicationConfig.EcCodec.RS, 6, 3, 2 * MB)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("replicaType")
+ public void testDeletedKeyBytesPropagatedToSCM(String type, String factor)
throws Exception {
+ String keyName = UUID.randomUUID().toString();
+ ReplicationConfig replicationConfig = RatisReplicationConfig
+ .getInstance(ReplicationFactor.valueOf(factor).toProto());
+ SCMPerformanceMetrics metrics =
cluster.getStorageContainerManager().getBlockProtocolServer().getMetrics();
+ long initialSuccessBlocks = metrics.getDeleteKeySuccessBlocks();
+ long initialFailedBlocks = metrics.getDeleteKeyFailedBlocks();
+ // Step 1: write a key
+ createKey(keyName, replicationConfig);
+ // Step 2: Spy on BlockManager and inject it into SCM
+ BlockManager spyManager = injectSpyBlockManager(cluster);
+ // Step 3: Delete the key (which triggers deleteBlocks call)
+ bucket.deleteKey(keyName);
+ // Step 4: Verify deleteBlocks call and capture argument
+ verifyAndAssertQuota(spyManager, replicationConfig);
+ GenericTestUtils.waitFor(() -> metrics.getDeleteKeySuccessBlocks() -
initialSuccessBlocks == 1, 50, 1000);
+ GenericTestUtils.waitFor(() -> metrics.getDeleteKeyFailedBlocks() -
initialFailedBlocks == 0, 50, 1000);
+
+ // Launch finalization from the client. In the current implementation,
+ // this call will block until finalization completes.
+ Future<?> finalizationFuture = Executors.newSingleThreadExecutor().submit(
+ () -> {
+ try {
+ scmClient.finalizeScmUpgrade(CLIENT_ID);
+ } catch (IOException ex) {
+ fail("finalization client failed", ex);
+ }
+ });
+ finalizationFuture.get();
+ TestHddsUpgradeUtils.waitForFinalizationFromClient(scmClient, CLIENT_ID);
+ assertEquals(DATA_DISTRIBUTION.ordinal(),
scmClient.getScmInfo().getMetaDataLayoutVersion());
+ // create and delete another key to verify the process after feature is
finalized
+ keyName = UUID.randomUUID().toString();
+ createKey(keyName, replicationConfig);
+ bucket.deleteKey(keyName);
+ verifyAndAssertQuota(spyManager, replicationConfig);
+ GenericTestUtils.waitFor(() -> metrics.getDeleteKeySuccessBlocks() -
initialSuccessBlocks == 2, 50, 1000);
+ GenericTestUtils.waitFor(() -> metrics.getDeleteKeyFailedBlocks() -
initialFailedBlocks == 0, 50, 1000);
+ }
+
+ @ParameterizedTest
+ @MethodSource("ecType")
+ void testGetDefaultShouldCreateECReplicationConfFromConfValues(
+ ECReplicationConfig.EcCodec codec, int data, int parity, long chunkSize)
throws Exception {
+ String keyName = UUID.randomUUID().toString();
+ ReplicationConfig replicationConfig = new ECReplicationConfig(data,
parity, codec, (int) chunkSize);
+ SCMPerformanceMetrics metrics =
cluster.getStorageContainerManager().getBlockProtocolServer().getMetrics();
+ long initialSuccessBlocks = metrics.getDeleteKeySuccessBlocks();
+ long initialFailedBlocks = metrics.getDeleteKeyFailedBlocks();
+ // Step 1: write a key
+ createKey(keyName, replicationConfig);
+ // Step 2: Spy on BlockManager and inject it into SCM
+ BlockManager spyManager = injectSpyBlockManager(cluster);
+ // Step 3: Delete the key (which triggers deleteBlocks call)
+ bucket.deleteKey(keyName);
+ // Step 4: Verify deleteBlocks call and capture argument
+ verifyAndAssertQuota(spyManager, replicationConfig);
+ GenericTestUtils.waitFor(() -> metrics.getDeleteKeySuccessBlocks() -
initialSuccessBlocks == 1, 50, 1000);
+ GenericTestUtils.waitFor(() -> metrics.getDeleteKeyFailedBlocks() -
initialFailedBlocks == 0, 50, 1000);
+
+ // Launch finalization from the client. In the current implementation,
+ // this call will block until finalization completes.
+ Future<?> finalizationFuture = Executors.newSingleThreadExecutor().submit(
+ () -> {
+ try {
+ scmClient.finalizeScmUpgrade(CLIENT_ID);
+ } catch (IOException ex) {
+ fail("finalization client failed", ex);
+ }
+ });
+ finalizationFuture.get();
+ TestHddsUpgradeUtils.waitForFinalizationFromClient(scmClient, CLIENT_ID);
+ assertEquals(DATA_DISTRIBUTION.ordinal(),
scmClient.getScmInfo().getMetaDataLayoutVersion());
+ // create and delete another key to verify the process after feature is
finalized
+ keyName = UUID.randomUUID().toString();
+ createKey(keyName, replicationConfig);
+ bucket.deleteKey(keyName);
+ verifyAndAssertQuota(spyManager, replicationConfig);
+ GenericTestUtils.waitFor(() -> metrics.getDeleteKeySuccessBlocks() -
initialSuccessBlocks == 2, 50, 1000);
+ GenericTestUtils.waitFor(() -> metrics.getDeleteKeyFailedBlocks() -
initialFailedBlocks == 0, 50, 1000);
+ }
+
+ private void createKey(String keyName, ReplicationConfig replicationConfig)
throws IOException {
+ byte[] data = new byte[KEY_SIZE];
+ try (OzoneOutputStream out = bucket.createKey(keyName, KEY_SIZE,
+ replicationConfig, new HashMap<>())) {
+ out.write(data);
+ }
+ }
+
+ private BlockManager injectSpyBlockManager(MiniOzoneCluster
miniOzoneCluster) throws Exception {
+ StorageContainerManager scm =
miniOzoneCluster.getStorageContainerManager();
+ BlockManager realManager = scm.getScmBlockManager();
+ BlockManager spyManager = spy(realManager);
+
+ Field field = scm.getClass().getDeclaredField("scmBlockManager");
+ field.setAccessible(true);
+ field.set(scm, spyManager);
+ return spyManager;
+ }
+
+ private void verifyAndAssertQuota(BlockManager spyManager, ReplicationConfig
replicationConfig) throws IOException {
+ ArgumentCaptor<List<BlockGroup>> captor =
ArgumentCaptor.forClass(List.class);
+ verify(spyManager,
timeout(50000).atLeastOnce()).deleteBlocks(captor.capture());
+
+ if (captor.getAllValues().stream().anyMatch(blockGroups ->
blockGroups.stream().anyMatch(
+ group -> group.getAllDeletedBlocks().isEmpty()))) {
+ assertEquals(1,
captor.getAllValues().get(0).get(0).getBlockIDs().size());
+ assertEquals(0,
captor.getAllValues().get(0).get(0).getAllDeletedBlocks().size());
+ return;
+ }
+
+ long totalUsedBytes = captor.getAllValues().get(0).stream()
+ .flatMap(group -> group.getAllDeletedBlocks().stream())
+ .mapToLong(DeletedBlock::getReplicatedSize).sum();
+
+ long totalUnreplicatedBytes = captor.getAllValues().get(0).stream()
+ .flatMap(group -> group.getAllDeletedBlocks().stream())
+ .mapToLong(DeletedBlock::getSize).sum();
+
+ assertEquals(0, captor.getAllValues().get(0).get(0).getBlockIDs().size());
+ assertEquals(1,
captor.getAllValues().get(0).get(0).getAllDeletedBlocks().size());
+ assertEquals(QuotaUtil.getReplicatedSize(KEY_SIZE, replicationConfig),
totalUsedBytes);
+ assertEquals(KEY_SIZE, totalUnreplicatedBytes);
+ }
+}
diff --git
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index ec9e34cec7..50af367923 100644
---
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -34,7 +34,6 @@
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
-import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.ListKeysResult;
import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
@@ -659,13 +658,6 @@ String getMultipartKey(long volumeId, long bucketId,
*/
long getBucketId(String volume, String bucket) throws IOException;
- /**
- * Returns {@code List<BlockGroup>} for a key in the deletedTable.
- * @param deletedKey - key to be purged from the deletedTable
- * @return {@link BlockGroup}
- */
- List<BlockGroup> getBlocksForKeyDelete(String deletedKey) throws IOException;
-
/**
* Given a volume/bucket, check whether it contains incomplete MPUs.
*
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index bd186df168..a069817daa 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -124,6 +124,7 @@
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
+import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.db.StringCodec;
import org.apache.hadoop.hdds.utils.db.Table;
@@ -137,6 +138,7 @@
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeletedBlock;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
@@ -156,6 +158,7 @@
import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.om.helpers.QuotaUtil;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
@@ -736,6 +739,8 @@ public PendingKeysDeletion getPendingDeletionKeys(
}
int currentCount = 0;
boolean maxReqSizeExceeded = false;
+ boolean isDataDistributionEnabled =
ozoneManager.getScmInfo().getMetaDataLayoutVersion() >=
+ HDDSLayoutFeature.DATA_DISTRIBUTION.layoutVersion();
while (delKeyIter.hasNext() && currentCount < count) {
RepeatedOmKeyInfo notReclaimableKeyInfo = new RepeatedOmKeyInfo();
KeyValue<String, RepeatedOmKeyInfo> kv = delKeyIter.next();
@@ -747,11 +752,21 @@ public PendingKeysDeletion getPendingDeletionKeys(
// Skip the key if the filter doesn't allow the file to be deleted.
if (filter == null || filter.apply(Table.newKeyValue(kv.getKey(),
info))) {
- List<BlockID> blockIDS = info.getKeyLocationVersions().stream()
- .flatMap(versionLocations ->
versionLocations.getLocationList().stream()
- .map(b -> new BlockID(b.getContainerID(),
b.getLocalID()))).collect(Collectors.toList());
- BlockGroup keyBlocks =
BlockGroup.newBuilder().setKeyName(kv.getKey())
- .addAllBlockIDs(blockIDS).build();
+ BlockGroup.Builder keyBlocksBuilder =
BlockGroup.newBuilder().setKeyName(kv.getKey());
+ if (isDataDistributionEnabled) {
+ List<DeletedBlock> deletedBlocks =
info.getKeyLocationVersions().stream()
+ .flatMap(versionLocations ->
versionLocations.getLocationList().stream()
+ .map(b -> new DeletedBlock(new
BlockID(b.getContainerID(), b.getLocalID()),
+ b.getLength(),
QuotaUtil.getReplicatedSize(b.getLength(), info.getReplicationConfig()))))
+ .collect(Collectors.toList());
+ keyBlocksBuilder.addAllDeletedBlocks(deletedBlocks);
+ } else {
+ List<BlockID> blockIDS = info.getKeyLocationVersions().stream()
+ .flatMap(versionLocations ->
versionLocations.getLocationList().stream()
+ .map(b -> new BlockID(b.getContainerID(),
b.getLocalID()))).collect(Collectors.toList());
+ keyBlocksBuilder.addAllBlockIDs(blockIDS);
+ }
+ BlockGroup keyBlocks = keyBlocksBuilder.build();
int keyBlockSerializedSize =
keyBlocks.getProto().getSerializedSize();
serializedSize += keyBlockSerializedSize;
if (serializedSize > ratisByteLimit) {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 04e0998219..955603b044 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -61,7 +61,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.TableCacheMetrics;
import org.apache.hadoop.hdds.utils.TransactionInfo;
@@ -81,7 +80,6 @@
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
@@ -1770,32 +1768,6 @@ public long getBucketId(String volume, String bucket)
throws IOException {
return omBucketInfo.getObjectID();
}
- @Override
- public List<BlockGroup> getBlocksForKeyDelete(String deletedKey)
- throws IOException {
- RepeatedOmKeyInfo omKeyInfo = getDeletedTable().get(deletedKey);
- if (omKeyInfo == null) {
- return null;
- }
-
- List<BlockGroup> result = new ArrayList<>();
- // Add all blocks from all versions of the key to the deletion list
- for (OmKeyInfo info : omKeyInfo.cloneOmKeyInfoList()) {
- for (OmKeyLocationInfoGroup keyLocations :
- info.getKeyLocationVersions()) {
- List<BlockID> item = keyLocations.getLocationList().stream()
- .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))
- .collect(Collectors.toList());
- BlockGroup keyBlocks = BlockGroup.newBuilder()
- .setKeyName(deletedKey)
- .addAllBlockIDs(item)
- .build();
- result.add(keyBlocks);
- }
- }
- return result;
- }
-
@Override
public boolean containsIncompleteMPUs(String volume, String bucket)
throws IOException {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index a5df1337dc..0a2c70984a 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -24,7 +24,6 @@
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
-import static org.apache.hadoop.hdds.utils.HAUtils.getScmInfo;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getRemoteUser;
import static
org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClientWithMaxRetry;
import static org.apache.hadoop.ozone.OmUtils.MAX_TRXN_ID;
@@ -500,6 +499,7 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
private UncheckedAutoCloseableSupplier<IOmMetadataReader> rcOmMetadataReader;
private OmSnapshotManager omSnapshotManager;
private volatile DirectoryDeletingService dirDeletingService;
+ private ScmInfo scmInfo;
@SuppressWarnings("methodlength")
private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
@@ -617,9 +617,8 @@ private OzoneManager(OzoneConfiguration conf, StartupOption
startupOption)
// For testing purpose only, not hit scm from om as Hadoop UGI can't login
// two principals in the same JVM.
- ScmInfo scmInfo;
if (!testSecureOmFlag) {
- scmInfo = getScmInfo(configuration);
+ scmInfo = HAUtils.getScmInfo(configuration);
if (!scmInfo.getClusterId().equals(omStorage.getClusterID())) {
logVersionMismatch(conf, scmInfo);
throw new OMException("SCM version info mismatch.",
@@ -893,7 +892,7 @@ public static OzoneManager createOm(OzoneConfiguration conf,
return new OzoneManager(conf, startupOption);
}
- private void logVersionMismatch(OzoneConfiguration conf, ScmInfo scmInfo) {
+ private void logVersionMismatch(OzoneConfiguration conf, ScmInfo info) {
List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
StringBuilder scmBlockAddressBuilder = new StringBuilder();
for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
@@ -905,9 +904,9 @@ private void logVersionMismatch(OzoneConfiguration conf,
ScmInfo scmInfo) {
scmBlockAddress = scmBlockAddress.substring(0,
scmBlockAddress.lastIndexOf(","));
}
- if (!scmInfo.getClusterId().equals(omStorage.getClusterID())) {
+ if (!info.getClusterId().equals(omStorage.getClusterID())) {
LOG.error("clusterId from {} is {}, but is {} in {}",
- scmBlockAddress, scmInfo.getClusterId(),
+ scmBlockAddress, info.getClusterId(),
omStorage.getClusterID(), omStorage.getVersionFile());
}
}
@@ -1040,6 +1039,13 @@ public ScmClient getScmClient() {
return scmClient;
}
+ /**
+ * Returns the {@link ScmInfo} stored in this OM's {@code scmInfo} field.
+ * NOTE(review): the constructor assigns the field only when
+ * {@code testSecureOmFlag} is false and nothing visible in this change
+ * refreshes it afterwards, so the result is presumably {@code null} in that
+ * test mode and may predate a later SCM layout finalization -- confirm
+ * before relying on it for layout-version checks.
+ *
+ * @return the stored SCM info
+ */
+ public ScmInfo getScmInfo() {
+ return this.scmInfo;
+ }
+
/**
* Return SecretManager for OM.
*/
@@ -1533,7 +1539,7 @@ public static boolean omInit(OzoneConfiguration conf)
throws IOException,
StorageState state = omStorage.getState();
String scmId;
try {
- ScmInfo scmInfo = getScmInfo(conf);
+ ScmInfo scmInfo = HAUtils.getScmInfo(conf);
scmId = scmInfo.getScmId();
if (scmId == null || scmId.isEmpty()) {
throw new IOException("Invalid SCM ID");
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
index 5edd683a43..40075ec897 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.common.DeletedBlock;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -145,38 +146,49 @@ private Pipeline createPipeline(DatanodeDetails datanode)
{
}
+ /**
+ * Test implementation of block deletion: produces one
+ * {@link DeleteBlockGroupResult} per incoming {@link BlockGroup}, holding
+ * one {@link DeleteBlockResult} per block as decided by processBlock.
+ *
+ * @param keyBlocksInfoList groups whose blocks should be "deleted"
+ * @return one result per group, in input order
+ */
@Override
- public List<DeleteBlockGroupResult> deleteKeyBlocks(
- List<BlockGroup> keyBlocksInfoList) throws IOException {
+ public List<DeleteBlockGroupResult> deleteKeyBlocks(List<BlockGroup>
keyBlocksInfoList)
+ throws IOException {
List<DeleteBlockGroupResult> results = new ArrayList<>();
- List<DeleteBlockResult> blockResultList = new ArrayList<>();
- Result result;
for (BlockGroup keyBlocks : keyBlocksInfoList) {
- for (BlockID blockKey : keyBlocks.getBlockIDList()) {
- currentCall++;
- switch (this.failCallsFrequency) {
- case 0:
- result = success;
- numBlocksDeleted++;
- break;
- case 1:
- result = unknownFailure;
- break;
- default:
- if (currentCall % this.failCallsFrequency == 0) {
- result = unknownFailure;
- } else {
- result = success;
- numBlocksDeleted++;
- }
+ // A fresh list per group, so each group's result carries only its own blocks.
+ List<DeleteBlockResult> blockResultList = new ArrayList<>();
+ // Process BlockIDs directly if present
+ if (keyBlocks.getBlockIDs() != null &&
!keyBlocks.getBlockIDs().isEmpty()) {
+ for (BlockID blockID : keyBlocks.getBlockIDs()) {
+ blockResultList.add(processBlock(blockID));
+ }
+ } else {
+ // Otherwise, use DeletedBlock's BlockID
+ for (DeletedBlock deletedBlock : keyBlocks.getAllDeletedBlocks()) {
+ blockResultList.add(processBlock(deletedBlock.getBlockID()));
}
- blockResultList.add(new DeleteBlockResult(blockKey, result));
}
- results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(),
- blockResultList));
+ results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(),
blockResultList));
}
return results;
}
+  /**
+   * Simulates deleting a single block and reports the outcome.
+   * Every call advances the call counter. A failure frequency of 0 never
+   * fails; otherwise a call fails when its running number is a multiple of
+   * the frequency (so a frequency of 1 fails every call). Successful
+   * deletions are counted in numBlocksDeleted.
+   *
+   * @param blockID the block being "deleted"
+   * @return the result recorded for that block
+   */
+  private DeleteBlockResult processBlock(BlockID blockID) {
+    currentCall++;
+    boolean injectFailure = failCallsFrequency != 0 && currentCall % failCallsFrequency == 0;
+    if (injectFailure) {
+      return new DeleteBlockResult(blockID, unknownFailure);
+    }
+    numBlocksDeleted++;
+    return new DeleteBlockResult(blockID, success);
+  }
+
@Override
public ScmInfo getScmInfo() throws IOException {
ScmInfo.Builder builder =
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index ba6f644c49..3d1b63e2f7 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -72,6 +72,7 @@
import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeletedBlock;
import org.apache.hadoop.ozone.om.KeyManager;
import org.apache.hadoop.ozone.om.KeyManagerImpl;
import org.apache.hadoop.ozone.om.OMConfigKeys;
@@ -799,8 +800,10 @@ public void testFailingModifiedKeyPurge() throws
IOException {
return
OzoneManagerProtocolProtos.OMResponse.newBuilder().setCmdType(purgeRequest.get().getCmdType())
.setStatus(OzoneManagerProtocolProtos.Status.TIMEOUT).build();
});
- List<BlockGroup> blockGroups =
Collections.singletonList(BlockGroup.newBuilder().setKeyName("key1")
- .addAllBlockIDs(Collections.singletonList(new BlockID(1,
1))).build());
+ List<BlockGroup> blockGroups = Collections.singletonList(BlockGroup
+ .newBuilder().setKeyName("key1")
+ .addAllDeletedBlocks(Collections.singletonList(new
DeletedBlock(new BlockID(1, 1), 3, 1)))
+ .build());
List<String> renameEntriesToBeDeleted =
Collections.singletonList("key2");
OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
.setBucketName("buck")
@@ -1074,7 +1077,7 @@ private long countBlocksPendingDeletion() {
return keyManager.getPendingDeletionKeys((kv) -> true,
Integer.MAX_VALUE, ratisLimit)
.getKeyBlocksList()
.stream()
- .map(BlockGroup::getBlockIDList)
+ .map(b -> b.getBlockIDs().isEmpty() ? b.getAllDeletedBlocks() :
b.getBlockIDs())
.mapToLong(Collection::size)
.sum();
} catch (IOException e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]