This is an automated email from the ASF dual-hosted git repository.
sumitagrawal pushed a commit to branch HDDS-13177
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-13177 by this push:
new bdceafa19a HDDS-13461. Refactor BlockGroup for Upgrade Compatibility (#8822)
bdceafa19a is described below
commit bdceafa19a225b45ee8487c8d537d5c28c973279
Author: Priyesh Karatha <[email protected]>
AuthorDate: Wed Jul 23 20:07:38 2025 +0530
HDDS-13461. Refactor BlockGroup for Upgrade Compatibility (#8822)
---
...lockLocationProtocolClientSideTranslatorPB.java | 18 ++-
.../org/apache/hadoop/ozone/common/BlockGroup.java | 42 ++----
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 32 ++---
.../hdds/scm/server/SCMBlockProtocolServer.java | 6 +-
...nService.java => TestBlockDeletionService.java} | 153 +++++++++------------
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 25 +---
.../ozone/om/ScmBlockLocationTestingClient.java | 11 +-
.../ozone/om/service/TestKeyDeletingService.java | 2 +-
8 files changed, 117 insertions(+), 172 deletions(-)
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index 1fecbe58d3..722e952b81 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -86,6 +86,7 @@ public final class
ScmBlockLocationProtocolClientSideTranslatorPB
private final ScmBlockLocationProtocolPB rpcProxy;
private SCMBlockLocationFailoverProxyProvider failoverProxyProvider;
+ private volatile ScmInfo scmInfo;
/**
* Creates a new StorageContainerLocationProtocolClientSideTranslatorPB.
@@ -231,8 +232,12 @@ public List<AllocatedBlock> allocateBlock(
@Override
public List<DeleteBlockGroupResult> deleteKeyBlocks(
List<BlockGroup> keyBlocksInfoList) throws IOException {
+
+ boolean useDataDistribution = getScmInfoSafe().getMetaDataLayoutVersion()
>=
+ HDDSLayoutFeature.DATA_DISTRIBUTION.layoutVersion();
List<KeyBlocks> keyBlocksProto = keyBlocksInfoList.stream()
- .map(BlockGroup::getProto).collect(Collectors.toList());
+ .map(blockGroup -> blockGroup.getProto(useDataDistribution))
+ .collect(Collectors.toList());
DeleteScmKeyBlocksRequestProto request = DeleteScmKeyBlocksRequestProto
.newBuilder()
.addAllKeyBlocks(keyBlocksProto)
@@ -258,6 +263,14 @@ public List<DeleteBlockGroupResult> deleteKeyBlocks(
return results;
}
+ private synchronized ScmInfo getScmInfoSafe() throws IOException {
+ if (scmInfo == null || scmInfo.getMetaDataLayoutVersion() <
+ HDDSLayoutFeature.DATA_DISTRIBUTION.layoutVersion()) {
+ getScmInfo(); // refresh cached scmInfo
+ }
+ return scmInfo;
+ }
+
/**
* Gets the cluster Id and Scm Id from SCM.
* @return ScmInfo
@@ -282,7 +295,8 @@ public ScmInfo getScmInfo() throws IOException {
.setScmId(resp.getScmId())
.setMetaDataLayoutVersion(resp.hasMetaDataLayoutVersion() ?
resp.getMetaDataLayoutVersion() :
HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
- return builder.build();
+ scmInfo = builder.build();
+ return scmInfo;
}
/**
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
index 73ce40127e..b8d8036d71 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
@@ -30,13 +30,10 @@
public final class BlockGroup {
private String groupID;
- @Deprecated
- private List<BlockID> blockIDs;
private List<DeletedBlock> deletedBlocks;
private BlockGroup(String groupID, List<BlockID> blockIDs,
List<DeletedBlock> deletedBlocks) {
this.groupID = groupID;
- this.blockIDs = blockIDs == null ? new ArrayList<>() : blockIDs;
this.deletedBlocks = deletedBlocks == null ? new ArrayList<>() :
deletedBlocks;
}
@@ -44,19 +41,19 @@ public List<DeletedBlock> getAllDeletedBlocks() {
return deletedBlocks;
}
- public List<BlockID> getBlockIDs() {
- return blockIDs;
- }
-
public String getGroupID() {
return groupID;
}
public KeyBlocks getProto() {
- return deletedBlocks.isEmpty() ? getProtoForBlockID() :
getProtoForDeletedBlock();
+ return getProtoForDeletedBlock();
}
- public KeyBlocks getProtoForDeletedBlock() {
+ public KeyBlocks getProto(boolean isIncludeBlockSize) {
+ return isIncludeBlockSize ? getProtoForDeletedBlock() :
getProtoForBlockID();
+ }
+
+ private KeyBlocks getProtoForDeletedBlock() {
KeyBlocks.Builder kbb = KeyBlocks.newBuilder();
for (DeletedBlock block : deletedBlocks) {
ScmBlockLocationProtocolProtos.DeletedBlock deletedBlock =
ScmBlockLocationProtocolProtos.DeletedBlock
@@ -70,10 +67,10 @@ public KeyBlocks getProtoForDeletedBlock() {
return kbb.setKey(groupID).build();
}
- public KeyBlocks getProtoForBlockID() {
+ private KeyBlocks getProtoForBlockID() {
KeyBlocks.Builder kbb = KeyBlocks.newBuilder();
- for (BlockID block : blockIDs) {
- kbb.addBlocks(block.getProtobuf());
+ for (DeletedBlock block : deletedBlocks) {
+ kbb.addBlocks(block.getBlockID().getProtobuf());
}
return kbb.setKey(groupID).build();
}
@@ -84,31 +81,21 @@ public KeyBlocks getProtoForBlockID() {
* @return a group of blocks.
*/
public static BlockGroup getFromProto(KeyBlocks proto) {
- return proto.getDeletedBlocksList().isEmpty() ? getFromBlockIDProto(proto)
: getFromDeletedBlockProto(proto);
- }
-
- public static BlockGroup getFromBlockIDProto(KeyBlocks proto) {
- List<BlockID> blockIDs = new ArrayList<>();
+ List<DeletedBlock> deletedBlocks = new ArrayList<>();
for (HddsProtos.BlockID block : proto.getBlocksList()) {
- blockIDs.add(new BlockID(block.getContainerBlockID().getContainerID(),
- block.getContainerBlockID().getLocalID()));
+ deletedBlocks.add(new DeletedBlock(new
BlockID(block.getContainerBlockID().getContainerID(),
+ block.getContainerBlockID().getLocalID()), 0, 0));
}
- return BlockGroup.newBuilder().setKeyName(proto.getKey())
- .addAllBlockIDs(blockIDs).build();
- }
- public static BlockGroup getFromDeletedBlockProto(KeyBlocks proto) {
- List<DeletedBlock> blocks = new ArrayList<>();
for (ScmBlockLocationProtocolProtos.DeletedBlock block :
proto.getDeletedBlocksList()) {
HddsProtos.ContainerBlockID containerBlockId =
block.getBlockId().getContainerBlockID();
-
- blocks.add(new DeletedBlock(new
BlockID(containerBlockId.getContainerID(),
+ deletedBlocks.add(new DeletedBlock(new
BlockID(containerBlockId.getContainerID(),
containerBlockId.getLocalID()),
block.getSize(),
block.getReplicatedSize()));
}
return BlockGroup.newBuilder().setKeyName(proto.getKey())
- .addAllDeletedBlocks(blocks).build();
+ .addAllDeletedBlocks(deletedBlocks).build();
}
public static Builder newBuilder() {
@@ -119,7 +106,6 @@ public static Builder newBuilder() {
public String toString() {
return "BlockGroup[" +
"groupID='" + groupID + '\'' +
- ", blockIDs=" + blockIDs + '\'' +
", deletedBlocks=" + deletedBlocks +
']';
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 7a887828f9..ffa9d5a204 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -27,7 +27,6 @@
import java.util.Map;
import java.util.Objects;
import javax.management.ObjectName;
-import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -226,29 +225,16 @@ public void deleteBlocks(List<BlockGroup>
keyBlocksInfoList)
for (BlockGroup bg : keyBlocksInfoList) {
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting blocks {}",
- StringUtils.join(",", (!bg.getBlockIDs().isEmpty()) ?
bg.getBlockIDs() : bg.getAllDeletedBlocks()));
+ StringUtils.join(",", bg.getAllDeletedBlocks()));
}
- if (!bg.getBlockIDs().isEmpty()) {
- for (BlockID block : bg.getBlockIDs()) {
- long containerID = block.getContainerID();
- if (containerBlocks.containsKey(containerID)) {
- containerBlocks.get(containerID).add(block.getLocalID());
- } else {
- List<Long> item = new ArrayList<>();
- item.add(block.getLocalID());
- containerBlocks.put(containerID, item);
- }
- }
- } else {
- for (DeletedBlock block : bg.getAllDeletedBlocks()) {
- long containerID = block.getBlockID().getContainerID();
- if (containerBlocks.containsKey(containerID)) {
-
containerBlocks.get(containerID).add(block.getBlockID().getLocalID());
- } else {
- List<Long> item = new ArrayList<>();
- item.add(block.getBlockID().getLocalID());
- containerBlocks.put(containerID, item);
- }
+ for (DeletedBlock block : bg.getAllDeletedBlocks()) {
+ long containerID = block.getBlockID().getContainerID();
+ if (containerBlocks.containsKey(containerID)) {
+
containerBlocks.get(containerID).add(block.getBlockID().getLocalID());
+ } else {
+ List<Long> item = new ArrayList<>();
+ item.add(block.getBlockID().getLocalID());
+ containerBlocks.put(containerID, item);
}
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 2330332e43..2dc862f208 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -46,7 +46,6 @@
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -270,7 +269,7 @@ public List<DeleteBlockGroupResult> deleteKeyBlocks(
List<BlockGroup> keyBlocksInfoList) throws IOException {
long totalBlocks = 0;
for (BlockGroup bg : keyBlocksInfoList) {
- totalBlocks += (!bg.getBlockIDs().isEmpty()) ? bg.getBlockIDs().size() :
bg.getAllDeletedBlocks().size();
+ totalBlocks += bg.getAllDeletedBlocks().size();
}
List<DeleteBlockGroupResult> results = new ArrayList<>();
if (LOG.isDebugEnabled()) {
@@ -317,9 +316,6 @@ public List<DeleteBlockGroupResult> deleteKeyBlocks(
for (DeletedBlock b : bg.getAllDeletedBlocks()) {
blockResult.add(new DeleteBlockResult(b.getBlockID(), resultCode));
}
- for (BlockID b : bg.getBlockIDs()) {
- blockResult.add(new DeleteBlockResult(b, resultCode));
- }
results.add(new DeleteBlockGroupResult(bg.getGroupID(), blockResult));
}
auditMap.put("KeyBlockToDelete", keyBlocksInfoList.toString());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletionService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestBlockDeletionService.java
similarity index 66%
rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletionService.java
rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestBlockDeletionService.java
index 062378cbc3..7151053c62 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletionService.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestBlockDeletionService.java
@@ -20,7 +20,6 @@
import static
org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.DATA_DISTRIBUTION;
import static org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.HBASE_SUPPORT;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConsts.MB;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.params.provider.Arguments.arguments;
@@ -41,6 +40,7 @@
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import
org.apache.hadoop.hdds.scm.container.placement.metrics.SCMPerformanceMetrics;
@@ -62,6 +62,7 @@
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -70,14 +71,27 @@
/**
* DeletionService test to Pass Usage from OM to SCM.
*/
-public class TestKeyDeletionService {
+public class TestBlockDeletionService {
private static final String CLIENT_ID = UUID.randomUUID().toString();
private static final String VOLUME_NAME = "vol1";
private static final String BUCKET_NAME = "bucket1";
private static final int KEY_SIZE = 5 * 1024; // 5 KB
private static MiniOzoneCluster cluster;
private static StorageContainerLocationProtocol scmClient;
- private static OzoneBucket bucket = null;
+ private static OzoneBucket bucket;
+ private static SCMPerformanceMetrics metrics;
+ private static
InjectedUpgradeFinalizationExecutor<SCMUpgradeFinalizationContext>
scmFinalizationExecutor;
+
+ public static Stream<Arguments> replicationConfigProvider() {
+ return Stream.of(
+
arguments(RatisReplicationConfig.getInstance(ReplicationFactor.ONE.toProto())),
+
arguments(RatisReplicationConfig.getInstance(ReplicationFactor.THREE.toProto())),
+ arguments(new ECReplicationConfig(3, 2,
ECReplicationConfig.EcCodec.RS, 2 * 1024 * 1024)),
+ arguments(new ECReplicationConfig(6, 3,
ECReplicationConfig.EcCodec.RS, 2 * 1024 * 1024)),
+
arguments(StandaloneReplicationConfig.getInstance(ReplicationFactor.ONE.toProto())),
+
arguments(StandaloneReplicationConfig.getInstance(ReplicationFactor.THREE.toProto()))
+ );
+ }
@BeforeAll
public static void init() throws Exception {
@@ -85,11 +99,12 @@ public static void init() throws Exception {
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 500,
TimeUnit.MILLISECONDS);
conf.setInt(SCMStorageConfig.TESTING_INIT_LAYOUT_VERSION_KEY,
HBASE_SUPPORT.layoutVersion());
- InjectedUpgradeFinalizationExecutor<SCMUpgradeFinalizationContext>
scmFinalizationExecutor =
- new InjectedUpgradeFinalizationExecutor<>();
+ scmFinalizationExecutor = new InjectedUpgradeFinalizationExecutor<>();
SCMConfigurator configurator = new SCMConfigurator();
configurator.setUpgradeFinalizationExecutor(scmFinalizationExecutor);
- cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(9)
+
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(9)
.setSCMConfigurator(configurator)
.setDatanodeFactory(UniformDatanodesFactory.newBuilder()
.setLayoutVersion(HBASE_SUPPORT.layoutVersion()).build())
@@ -97,6 +112,8 @@ public static void init() throws Exception {
cluster.waitForClusterToBeReady();
scmClient = cluster.getStorageContainerLocationClient();
assertEquals(HBASE_SUPPORT.ordinal(),
scmClient.getScmInfo().getMetaDataLayoutVersion());
+ metrics =
cluster.getStorageContainerManager().getBlockProtocolServer().getMetrics();
+
OzoneClient ozoneClient = cluster.newClient();
// create a volume and a bucket to be used by OzoneFileSystem
ozoneClient.getObjectStore().createVolume(VOLUME_NAME);
@@ -111,102 +128,71 @@ public static void teardown() {
}
}
- public static Stream<Arguments> replicaType() {
- return Stream.of(
- arguments("RATIS", "ONE"),
- arguments("RATIS", "THREE")
- );
- }
-
- public static Stream<Arguments> ecType() {
- return Stream.of(
- arguments(ECReplicationConfig.EcCodec.RS, 3, 2, 2 * MB),
- arguments(ECReplicationConfig.EcCodec.RS, 6, 3, 2 * MB)
- );
- }
-
- @ParameterizedTest
- @MethodSource("replicaType")
- public void testDeletedKeyBytesPropagatedToSCM(String type, String factor)
throws Exception {
- String keyName = UUID.randomUUID().toString();
- ReplicationConfig replicationConfig = RatisReplicationConfig
- .getInstance(ReplicationFactor.valueOf(factor).toProto());
- SCMPerformanceMetrics metrics =
cluster.getStorageContainerManager().getBlockProtocolServer().getMetrics();
+ @Test
+ public void testDeleteKeyQuotaWithUpgrade() throws Exception {
long initialSuccessBlocks = metrics.getDeleteKeySuccessBlocks();
long initialFailedBlocks = metrics.getDeleteKeyFailedBlocks();
+
+ ReplicationConfig replicationConfig =
RatisReplicationConfig.getInstance(ReplicationFactor.THREE.toProto());
+ // PRE-UPGRADE
// Step 1: write a key
+ String keyName = UUID.randomUUID().toString();
createKey(keyName, replicationConfig);
// Step 2: Spy on BlockManager and inject it into SCM
- BlockManager spyManager = injectSpyBlockManager(cluster);
+ BlockManager spyManagerBefore = injectSpyBlockManager(cluster);
+ ArgumentCaptor<List<BlockGroup>> captor =
ArgumentCaptor.forClass(List.class);
// Step 3: Delete the key (which triggers deleteBlocks call)
bucket.deleteKey(keyName);
// Step 4: Verify deleteBlocks call and capture argument
- verifyAndAssertQuota(spyManager, replicationConfig);
+ verify(spyManagerBefore,
timeout(50000).atLeastOnce()).deleteBlocks(captor.capture());
+ verifyAndAssertQuota(replicationConfig, captor, false);
GenericTestUtils.waitFor(() -> metrics.getDeleteKeySuccessBlocks() -
initialSuccessBlocks == 1, 50, 1000);
GenericTestUtils.waitFor(() -> metrics.getDeleteKeyFailedBlocks() -
initialFailedBlocks == 0, 50, 1000);
- // Launch finalization from the client. In the current implementation,
- // this call will block until finalization completes.
- Future<?> finalizationFuture = Executors.newSingleThreadExecutor().submit(
- () -> {
- try {
- scmClient.finalizeScmUpgrade(CLIENT_ID);
- } catch (IOException ex) {
- fail("finalization client failed", ex);
- }
- });
+ // UPGRADE SCM (if specified)
+ // Step 5: wait for finalizing upgrade
+ Future<?> finalizationFuture =
Executors.newSingleThreadExecutor().submit(() -> {
+ try {
+ scmClient.finalizeScmUpgrade(CLIENT_ID);
+ } catch (IOException ex) {
+ fail("finalization client failed", ex);
+ }
+ });
finalizationFuture.get();
TestHddsUpgradeUtils.waitForFinalizationFromClient(scmClient, CLIENT_ID);
assertEquals(DATA_DISTRIBUTION.ordinal(),
scmClient.getScmInfo().getMetaDataLayoutVersion());
- // create and delete another key to verify the process after feature is
finalized
+
+ // POST-UPGRADE
+ //Step 6: Repeat the same steps in pre-upgrade
keyName = UUID.randomUUID().toString();
createKey(keyName, replicationConfig);
+ BlockManager spyManagerAfter = injectSpyBlockManager(cluster);
bucket.deleteKey(keyName);
- verifyAndAssertQuota(spyManager, replicationConfig);
+ verify(spyManagerAfter,
timeout(50000).atLeastOnce()).deleteBlocks(captor.capture());
+ verifyAndAssertQuota(replicationConfig, captor, true);
GenericTestUtils.waitFor(() -> metrics.getDeleteKeySuccessBlocks() -
initialSuccessBlocks == 2, 50, 1000);
GenericTestUtils.waitFor(() -> metrics.getDeleteKeyFailedBlocks() -
initialFailedBlocks == 0, 50, 1000);
}
@ParameterizedTest
- @MethodSource("ecType")
- void testGetDefaultShouldCreateECReplicationConfFromConfValues(
- ECReplicationConfig.EcCodec codec, int data, int parity, long chunkSize)
throws Exception {
- String keyName = UUID.randomUUID().toString();
- ReplicationConfig replicationConfig = new ECReplicationConfig(data,
parity, codec, (int) chunkSize);
- SCMPerformanceMetrics metrics =
cluster.getStorageContainerManager().getBlockProtocolServer().getMetrics();
+ @MethodSource("replicationConfigProvider")
+ public void
testDeleteKeyQuotaWithDifferentReplicationTypes(ReplicationConfig
replicationConfig) throws Exception {
long initialSuccessBlocks = metrics.getDeleteKeySuccessBlocks();
long initialFailedBlocks = metrics.getDeleteKeyFailedBlocks();
+
// Step 1: write a key
+ String keyName = UUID.randomUUID().toString();
createKey(keyName, replicationConfig);
// Step 2: Spy on BlockManager and inject it into SCM
- BlockManager spyManager = injectSpyBlockManager(cluster);
+ BlockManager spyManagerBefore = injectSpyBlockManager(cluster);
+ ArgumentCaptor<List<BlockGroup>> captor =
ArgumentCaptor.forClass(List.class);
// Step 3: Delete the key (which triggers deleteBlocks call)
bucket.deleteKey(keyName);
// Step 4: Verify deleteBlocks call and capture argument
- verifyAndAssertQuota(spyManager, replicationConfig);
+ verify(spyManagerBefore,
timeout(50000).atLeastOnce()).deleteBlocks(captor.capture());
+ verifyAndAssertQuota(replicationConfig, captor, true);
GenericTestUtils.waitFor(() -> metrics.getDeleteKeySuccessBlocks() -
initialSuccessBlocks == 1, 50, 1000);
GenericTestUtils.waitFor(() -> metrics.getDeleteKeyFailedBlocks() -
initialFailedBlocks == 0, 50, 1000);
-
- // Launch finalization from the client. In the current implementation,
- // this call will block until finalization completes.
- Future<?> finalizationFuture = Executors.newSingleThreadExecutor().submit(
- () -> {
- try {
- scmClient.finalizeScmUpgrade(CLIENT_ID);
- } catch (IOException ex) {
- fail("finalization client failed", ex);
- }
- });
- finalizationFuture.get();
- TestHddsUpgradeUtils.waitForFinalizationFromClient(scmClient, CLIENT_ID);
- assertEquals(DATA_DISTRIBUTION.ordinal(),
scmClient.getScmInfo().getMetaDataLayoutVersion());
- // create and delete another key to verify the process after feature is
finalized
- keyName = UUID.randomUUID().toString();
- createKey(keyName, replicationConfig);
- bucket.deleteKey(keyName);
- verifyAndAssertQuota(spyManager, replicationConfig);
- GenericTestUtils.waitFor(() -> metrics.getDeleteKeySuccessBlocks() -
initialSuccessBlocks == 2, 50, 1000);
- GenericTestUtils.waitFor(() -> metrics.getDeleteKeyFailedBlocks() -
initialFailedBlocks == 0, 50, 1000);
}
private void createKey(String keyName, ReplicationConfig replicationConfig)
throws IOException {
@@ -228,28 +214,23 @@ private BlockManager
injectSpyBlockManager(MiniOzoneCluster miniOzoneCluster) th
return spyManager;
}
- private void verifyAndAssertQuota(BlockManager spyManager, ReplicationConfig
replicationConfig) throws IOException {
- ArgumentCaptor<List<BlockGroup>> captor =
ArgumentCaptor.forClass(List.class);
- verify(spyManager,
timeout(50000).atLeastOnce()).deleteBlocks(captor.capture());
-
- if (captor.getAllValues().stream().anyMatch(blockGroups ->
blockGroups.stream().anyMatch(
- group -> group.getAllDeletedBlocks().isEmpty()))) {
- assertEquals(1,
captor.getAllValues().get(0).get(0).getBlockIDs().size());
- assertEquals(0,
captor.getAllValues().get(0).get(0).getAllDeletedBlocks().size());
- return;
- }
+ private void verifyAndAssertQuota(ReplicationConfig replicationConfig,
+ ArgumentCaptor<List<BlockGroup>> captor,
+ boolean isIncludeBlockSize) throws
IOException {
+ int index = captor.getAllValues().size() - 1;
+ List<BlockGroup> blockGroups = captor.getAllValues().get(index);
- long totalUsedBytes = captor.getAllValues().get(0).stream()
+ long totalUsedBytes = blockGroups.stream()
.flatMap(group -> group.getAllDeletedBlocks().stream())
.mapToLong(DeletedBlock::getReplicatedSize).sum();
- long totalUnreplicatedBytes = captor.getAllValues().get(0).stream()
+ long totalUnreplicatedBytes = blockGroups.stream()
.flatMap(group -> group.getAllDeletedBlocks().stream())
.mapToLong(DeletedBlock::getSize).sum();
- assertEquals(0, captor.getAllValues().get(0).get(0).getBlockIDs().size());
- assertEquals(1,
captor.getAllValues().get(0).get(0).getAllDeletedBlocks().size());
- assertEquals(QuotaUtil.getReplicatedSize(KEY_SIZE, replicationConfig),
totalUsedBytes);
- assertEquals(KEY_SIZE, totalUnreplicatedBytes);
+ assertEquals(1, blockGroups.get(0).getAllDeletedBlocks().size());
+ assertEquals(isIncludeBlockSize ?
+ QuotaUtil.getReplicatedSize(KEY_SIZE, replicationConfig) : 0,
totalUsedBytes);
+ assertEquals(isIncludeBlockSize ? KEY_SIZE : 0, totalUnreplicatedBytes);
}
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index a069817daa..930026b924 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -124,7 +124,6 @@
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
-import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.db.StringCodec;
import org.apache.hadoop.hdds.utils.db.Table;
@@ -739,8 +738,6 @@ public PendingKeysDeletion getPendingDeletionKeys(
}
int currentCount = 0;
boolean maxReqSizeExceeded = false;
- boolean isDataDistributionEnabled =
ozoneManager.getScmInfo().getMetaDataLayoutVersion() >=
- HDDSLayoutFeature.DATA_DISTRIBUTION.layoutVersion();
while (delKeyIter.hasNext() && currentCount < count) {
RepeatedOmKeyInfo notReclaimableKeyInfo = new RepeatedOmKeyInfo();
KeyValue<String, RepeatedOmKeyInfo> kv = delKeyIter.next();
@@ -752,21 +749,13 @@ public PendingKeysDeletion getPendingDeletionKeys(
// Skip the key if the filter doesn't allow the file to be deleted.
if (filter == null || filter.apply(Table.newKeyValue(kv.getKey(),
info))) {
- BlockGroup.Builder keyBlocksBuilder =
BlockGroup.newBuilder().setKeyName(kv.getKey());
- if (isDataDistributionEnabled) {
- List<DeletedBlock> deletedBlocks =
info.getKeyLocationVersions().stream()
- .flatMap(versionLocations ->
versionLocations.getLocationList().stream()
- .map(b -> new DeletedBlock(new
BlockID(b.getContainerID(), b.getLocalID()),
- b.getLength(),
QuotaUtil.getReplicatedSize(b.getLength(), info.getReplicationConfig()))))
- .collect(Collectors.toList());
- keyBlocksBuilder.addAllDeletedBlocks(deletedBlocks);
- } else {
- List<BlockID> blockIDS = info.getKeyLocationVersions().stream()
- .flatMap(versionLocations ->
versionLocations.getLocationList().stream()
- .map(b -> new BlockID(b.getContainerID(),
b.getLocalID()))).collect(Collectors.toList());
- keyBlocksBuilder.addAllBlockIDs(blockIDS);
- }
- BlockGroup keyBlocks = keyBlocksBuilder.build();
+ List<DeletedBlock> deletedBlocks =
info.getKeyLocationVersions().stream()
+ .flatMap(versionLocations ->
versionLocations.getLocationList().stream()
+ .map(b -> new DeletedBlock(new
BlockID(b.getContainerID(), b.getLocalID()),
+ b.getLength(),
QuotaUtil.getReplicatedSize(b.getLength(), info.getReplicationConfig()))))
+ .collect(Collectors.toList());
+ BlockGroup keyBlocks =
BlockGroup.newBuilder().setKeyName(kv.getKey())
+ .addAllDeletedBlocks(deletedBlocks).build();
int keyBlockSerializedSize =
keyBlocks.getProto().getSerializedSize();
serializedSize += keyBlockSerializedSize;
if (serializedSize > ratisByteLimit) {
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
index 40075ec897..5b63672ad6 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
@@ -152,15 +152,8 @@ public List<DeleteBlockGroupResult>
deleteKeyBlocks(List<BlockGroup> keyBlocksIn
for (BlockGroup keyBlocks : keyBlocksInfoList) {
List<DeleteBlockResult> blockResultList = new ArrayList<>();
// Process BlockIDs directly if present
- if (keyBlocks.getBlockIDs() != null &&
!keyBlocks.getBlockIDs().isEmpty()) {
- for (BlockID blockID : keyBlocks.getBlockIDs()) {
- blockResultList.add(processBlock(blockID));
- }
- } else {
- // Otherwise, use DeletedBlock's BlockID
- for (DeletedBlock deletedBlock : keyBlocks.getAllDeletedBlocks()) {
- blockResultList.add(processBlock(deletedBlock.getBlockID()));
- }
+ for (DeletedBlock deletedBlock : keyBlocks.getAllDeletedBlocks()) {
+ blockResultList.add(processBlock(deletedBlock.getBlockID()));
}
results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(),
blockResultList));
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index 3d1b63e2f7..8170184c30 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -1077,7 +1077,7 @@ private long countBlocksPendingDeletion() {
return keyManager.getPendingDeletionKeys((kv) -> true,
Integer.MAX_VALUE, ratisLimit)
.getKeyBlocksList()
.stream()
- .map(b -> b.getBlockIDs().isEmpty() ? b.getAllDeletedBlocks() :
b.getBlockIDs())
+ .map(BlockGroup::getAllDeletedBlocks)
.mapToLong(Collection::size)
.sum();
} catch (IOException e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]