This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new e1c779abe7 HDDS-13051. Use DatanodeID in server-scm. (#8465)
e1c779abe7 is described below
commit e1c779abe71e1f5dd46bf927ffe3cb3ad753530f
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri May 16 20:30:21 2025 -0700
HDDS-13051. Use DatanodeID in server-scm. (#8465)
---
.../hadoop/hdds/scm/block/DeletedBlockLogImpl.java | 34 +++----
.../SCMDeletedBlockTransactionStatusManager.java | 103 +++++++++------------
.../balancer/AbstractFindTargetGreedy.java | 8 +-
.../scm/container/balancer/FindSourceGreedy.java | 8 +-
.../hadoop/hdds/scm/node/DatanodeUsageInfo.java | 5 +
.../hdds/scm/pipeline/PipelineManagerImpl.java | 27 +++---
.../hdds/scm/safemode/DataNodeSafeModeRule.java | 7 +-
.../scm/server/SCMDatanodeHeartbeatDispatcher.java | 8 +-
.../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 40 ++++----
.../TestSCMDeleteBlocksCommandStatusManager.java | 21 +++--
.../hdds/scm/pipeline/TestPipelineManagerImpl.java | 23 ++---
11 files changed, 123 insertions(+), 161 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 8c39de41d1..ed80cdbdb3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -30,7 +30,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -326,24 +325,22 @@ public void addTransactions(Map<Long, List<Long>> containerBlocksMap)
}
@Override
- public void close() throws IOException {
+ public void close() {
}
private void getTransaction(DeletedBlocksTransaction tx,
DatanodeDeletedBlockTransactions transactions,
- Set<DatanodeDetails> dnList, Set<ContainerReplica> replicas,
- Map<UUID, Map<Long, CmdStatus>> commandStatus) {
+ Set<ContainerReplica> replicas,
+ Map<DatanodeID, Map<Long, CmdStatus>> commandStatus) {
DeletedBlocksTransaction updatedTxn =
DeletedBlocksTransaction.newBuilder(tx)
- .setCount(transactionStatusManager.getOrDefaultRetryCount(
- tx.getTxID(), 0))
+ .setCount(transactionStatusManager.getRetryCount(tx.getTxID()))
.build();
-
for (ContainerReplica replica : replicas) {
- DatanodeDetails details = replica.getDatanodeDetails();
+ final DatanodeID datanodeID = replica.getDatanodeDetails().getID();
if (!transactionStatusManager.isDuplication(
- details, updatedTxn.getTxID(), commandStatus)) {
- transactions.addTransactionToDN(details.getID(), updatedTxn);
+ datanodeID, tx.getTxID(), commandStatus)) {
+ transactions.addTransactionToDN(datanodeID, updatedTxn);
metrics.incrProcessedTransaction();
}
}
@@ -373,7 +370,7 @@ private Boolean checkInadequateReplica(Set<ContainerReplica> replicas,
if (!dnList.contains(datanodeDetails)) {
DatanodeDetails dnDetail = replica.getDatanodeDetails();
LOG.debug("Skip Container = {}, because DN = {} is not in dnList.",
- containerId, dnDetail.getUuid());
+ containerId, dnDetail);
return true;
}
}
@@ -426,10 +423,10 @@ public DatanodeDeletedBlockTransactions getTransactions(
// Get the CmdStatus status of the aggregation, so that the current
// status of the specified transaction can be found faster
- Map<UUID, Map<Long, CmdStatus>> commandStatus =
+ final Map<DatanodeID, Map<Long, CmdStatus>> commandStatus =
getSCMDeletedBlockTransactionStatusManager()
.getCommandStatusByTxId(dnList.stream().
- map(DatanodeDetails::getUuid).collect(Collectors.toSet()));
+ map(DatanodeDetails::getID).collect(Collectors.toSet()));
ArrayList<Long> txIDs = new ArrayList<>();
metrics.setNumBlockDeletionTransactionDataNodes(dnList.size());
Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = null;
@@ -458,8 +455,7 @@ public DatanodeDeletedBlockTransactions getTransactions(
metrics.incrSkippedTransaction();
continue;
}
- getTransaction(
- txn, transactions, dnList, replicas, commandStatus);
+ getTransaction(txn, transactions, replicas, commandStatus);
} else if (txn.getCount() >= maxRetry ||
containerManager.getContainer(id).isOpen()) {
metrics.incrSkippedTransaction();
}
@@ -510,7 +506,7 @@ public void setScmCommandTimeoutMs(long scmCommandTimeoutMs) {
public void recordTransactionCreated(DatanodeID dnId, long scmCmdId,
Set<Long> dnTxSet) {
getSCMDeletedBlockTransactionStatusManager()
- .recordTransactionCreated(dnId.getUuid(), scmCmdId, dnTxSet);
+ .recordTransactionCreated(dnId, scmCmdId, dnTxSet);
}
@Override
@@ -520,7 +516,7 @@ public int getTransactionToDNsCommitMapSize() {
@Override
public void onDatanodeDead(DatanodeID dnId) {
- getSCMDeletedBlockTransactionStatusManager().onDatanodeDead(dnId.getUuid());
+ getSCMDeletedBlockTransactionStatusManager().onDatanodeDead(dnId);
}
@Override
@@ -546,7 +542,7 @@ public void onMessage(
ContainerBlocksDeletionACKProto ackProto =
commandStatus.getBlockDeletionAck();
getSCMDeletedBlockTransactionStatusManager()
- .commitTransactions(ackProto.getResultsList(), dnId.getUuid());
+ .commitTransactions(ackProto.getResultsList(), dnId);
metrics.incrBlockDeletionCommandSuccess();
metrics.incrDNCommandsSuccess(dnId, 1);
} else if (status == CommandStatus.Status.FAILED) {
@@ -558,7 +554,7 @@ public void onMessage(
}
getSCMDeletedBlockTransactionStatusManager()
- .commitSCMCommandStatus(deleteBlockStatus.getCmdStatus(), dnId.getUuid());
+ .commitSCMCommandStatus(deleteBlockStatus.getCmdStatus(), dnId);
} finally {
lock.unlock();
}
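For readers skimming the DeletedBlockLogImpl hunks above: the duplication check is now expressed entirely in terms of DatanodeID. A minimal, self-contained sketch of that lookup pattern (the types below are illustrative stand-ins, not the real Ozone classes):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;

    // Stand-in for org.apache.hadoop.hdds.protocol.DatanodeID: a value type
    // wrapping the datanode UUID, usable directly as a map key.
    record NodeId(UUID uuid) { }

    enum CmdStatus { TO_BE_SENT, SENT }

    class DuplicationCheckSketch {
      // txId -> datanodes that already committed that transaction
      private final Map<Long, Set<NodeId>> committed = new HashMap<>();

      boolean isDuplication(NodeId dn, long txId,
          Map<NodeId, Map<Long, CmdStatus>> commandStatus) {
        final Set<NodeId> done = committed.get(txId);
        if (done != null && done.contains(dn)) {
          return true; // already executed on this datanode
        }
        // otherwise, duplicated only if the tx is still in flight for this DN
        final Map<Long, CmdStatus> inFlight = commandStatus.get(dn);
        return inFlight != null && inFlight.get(txId) != null;
      }
    }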
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java
index 4be3a1a574..f3ca531cc2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java
@@ -27,7 +27,6 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -35,10 +34,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -58,7 +57,7 @@ public class SCMDeletedBlockTransactionStatusManager {
private static final Logger LOG =
LoggerFactory.getLogger(SCMDeletedBlockTransactionStatusManager.class);
// Maps txId to set of DNs which are successful in committing the transaction
- private final Map<Long, Set<UUID>> transactionToDNsCommitMap;
+ private final Map<Long, Set<DatanodeID>> transactionToDNsCommitMap;
// Maps txId to its retry counts;
private final Map<Long, Integer> transactionToRetryCountMap;
// The access to DeletedBlocksTXTable is protected by
@@ -100,11 +99,10 @@ public SCMDeletedBlockTransactionStatusManager(
protected static class SCMDeleteBlocksCommandStatusManager {
private static final Logger LOG =
LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
- private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
+ private final Map<DatanodeID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
- private static final Set<CmdStatus> STATUSES_REQUIRING_TIMEOUT =
- new HashSet<>(Arrays.asList(SENT));
+ private static final Set<CmdStatus> STATUSES_REQUIRING_TIMEOUT = Collections.singleton(SENT);
public SCMDeleteBlocksCommandStatusManager() {
this.scmCmdStatusRecord = new ConcurrentHashMap<>();
@@ -128,14 +126,13 @@ public enum CmdStatus {
}
protected static final class CmdStatusData {
- private final UUID dnId;
+ private final DatanodeID dnId;
private final long scmCmdId;
private final Set<Long> deletedBlocksTxIds;
private Instant updateTime;
private CmdStatus status;
- private CmdStatusData(
- UUID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
+ private CmdStatusData(DatanodeID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
this.dnId = dnId;
this.scmCmdId = scmTxID;
this.deletedBlocksTxIds = deletedBlocksTxIds;
@@ -146,7 +143,7 @@ public Set<Long> getDeletedBlocksTxIds() {
return Collections.unmodifiableSet(deletedBlocksTxIds);
}
- public UUID getDnId() {
+ DatanodeID getDnId() {
return dnId;
}
@@ -180,7 +177,7 @@ public String toString() {
}
protected static CmdStatusData createScmCmdStatusData(
- UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
+ DatanodeID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
}
@@ -190,22 +187,22 @@ protected void recordScmCommand(CmdStatusData statusData) {
new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData);
}
- protected void onSent(UUID dnId, long scmCmdId) {
+ void onSent(DatanodeID dnId, long scmCmdId) {
updateStatus(dnId, scmCmdId, CommandStatus.Status.PENDING);
}
- protected void onDatanodeDead(UUID dnId) {
+ void onDatanodeDead(DatanodeID dnId) {
LOG.info("Clean SCMCommand record for Datanode: {}", dnId);
scmCmdStatusRecord.remove(dnId);
}
- protected void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId,
+ void updateStatusByDNCommandStatus(DatanodeID dnId, long scmCmdId,
CommandStatus.Status newState) {
updateStatus(dnId, scmCmdId, newState);
}
protected void cleanAllTimeoutSCMCommand(long timeoutMs) {
- for (UUID dnId : scmCmdStatusRecord.keySet()) {
+ for (DatanodeID dnId : scmCmdStatusRecord.keySet()) {
for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
removeTimeoutScmCommand(
dnId, getScmCommandIds(dnId, status), timeoutMs);
@@ -213,14 +210,14 @@ protected void cleanAllTimeoutSCMCommand(long timeoutMs) {
}
}
- public void cleanTimeoutSCMCommand(UUID dnId, long timeoutMs) {
+ void cleanTimeoutSCMCommand(DatanodeID dnId, long timeoutMs) {
for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
removeTimeoutScmCommand(
dnId, getScmCommandIds(dnId, status), timeoutMs);
}
}
- private Set<Long> getScmCommandIds(UUID dnId, CmdStatus status) {
+ private Set<Long> getScmCommandIds(DatanodeID dnId, CmdStatus status) {
Set<Long> scmCmdIds = new HashSet<>();
Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
if (record == null) {
@@ -234,7 +231,7 @@ private Set<Long> getScmCommandIds(UUID dnId, CmdStatus status) {
return scmCmdIds;
}
- private Instant getUpdateTime(UUID dnId, long scmCmdId) {
+ private Instant getUpdateTime(DatanodeID dnId, long scmCmdId) {
Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
if (record == null || record.get(scmCmdId) == null) {
return null;
@@ -242,7 +239,7 @@ private Instant getUpdateTime(UUID dnId, long scmCmdId) {
return record.get(scmCmdId).getUpdateTime();
}
- private void updateStatus(UUID dnId, long scmCmdId,
+ private void updateStatus(DatanodeID dnId, long scmCmdId,
CommandStatus.Status newStatus) {
Map<Long, CmdStatusData> recordForDn = scmCmdStatusRecord.get(dnId);
if (recordForDn == null) {
@@ -309,7 +306,7 @@ private void updateStatus(UUID dnId, long scmCmdId,
}
}
- private void removeTimeoutScmCommand(UUID dnId,
+ private void removeTimeoutScmCommand(DatanodeID dnId,
Set<Long> scmCmdIds, long timeoutMs) {
Instant now = Instant.now();
for (Long scmCmdId : scmCmdIds) {
@@ -323,7 +320,7 @@ private void removeTimeoutScmCommand(UUID dnId,
}
}
- private CmdStatusData removeScmCommand(UUID dnId, long scmCmdId) {
+ private CmdStatusData removeScmCommand(DatanodeID dnId, long scmCmdId) {
Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
if (record == null || record.get(scmCmdId) == null) {
return null;
@@ -333,12 +330,10 @@ private CmdStatusData removeScmCommand(UUID dnId, long scmCmdId) {
return statusData;
}
- public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
- Set<UUID> dnIds) {
- Map<UUID, Map<Long, CmdStatus>> result =
- new HashMap<>(scmCmdStatusRecord.size());
+ Map<DatanodeID, Map<Long, CmdStatus>> getCommandStatusByTxId(Set<DatanodeID> dnIds) {
+ final Map<DatanodeID, Map<Long, CmdStatus>> result = new HashMap<>(scmCmdStatusRecord.size());
- for (UUID dnId : dnIds) {
+ for (DatanodeID dnId : dnIds) {
Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
if (record == null) {
continue;
@@ -361,7 +356,7 @@ private void clear() {
}
@VisibleForTesting
- Map<UUID, Map<Long, CmdStatusData>> getScmCmdStatusRecord() {
+ Map<DatanodeID, Map<Long, CmdStatusData>> getScmCmdStatusRecord() {
return scmCmdStatusRecord;
}
}
@@ -395,22 +390,19 @@ public void resetRetryCount(List<Long> txIDs) throws IOException {
}
}
- public int getOrDefaultRetryCount(long txID, int defaultValue) {
- return transactionToRetryCountMap.getOrDefault(txID, defaultValue);
+ int getRetryCount(long txID) {
+ return transactionToRetryCountMap.getOrDefault(txID, 0);
}
public void onSent(DatanodeDetails dnId, SCMCommand<?> scmCommand) {
- scmDeleteBlocksCommandStatusManager.onSent(
- dnId.getUuid(), scmCommand.getId());
+ scmDeleteBlocksCommandStatusManager.onSent(dnId.getID(), scmCommand.getId());
}
- public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
- Set<UUID> dnIds) {
+ Map<DatanodeID, Map<Long, CmdStatus>> getCommandStatusByTxId(Set<DatanodeID> dnIds) {
return scmDeleteBlocksCommandStatusManager.getCommandStatusByTxId(dnIds);
}
- public void recordTransactionCreated(
- UUID dnId, long scmCmdId, Set<Long> dnTxSet) {
+ void recordTransactionCreated(DatanodeID dnId, long scmCmdId, Set<Long> dnTxSet) {
scmDeleteBlocksCommandStatusManager.recordScmCommand(
SCMDeleteBlocksCommandStatusManager
.createScmCmdStatusData(dnId, scmCmdId, dnTxSet));
@@ -428,21 +420,19 @@ public void cleanAllTimeoutSCMCommand(long timeoutMs) {
scmDeleteBlocksCommandStatusManager.cleanAllTimeoutSCMCommand(timeoutMs);
}
- public void onDatanodeDead(UUID dnId) {
+ void onDatanodeDead(DatanodeID dnId) {
scmDeleteBlocksCommandStatusManager.onDatanodeDead(dnId);
}
- public boolean isDuplication(DatanodeDetails dnDetail, long tx,
- Map<UUID, Map<Long, CmdStatus>> commandStatus) {
- if (alreadyExecuted(dnDetail.getUuid(), tx)) {
+ boolean isDuplication(DatanodeID datanodeID, long tx, Map<DatanodeID, Map<Long, CmdStatus>> commandStatus) {
+ if (alreadyExecuted(datanodeID, tx)) {
return true;
}
- return inProcessing(dnDetail.getUuid(), tx, commandStatus);
+ return inProcessing(datanodeID, tx, commandStatus);
}
- public boolean alreadyExecuted(UUID dnId, long txId) {
- Set<UUID> dnsWithTransactionCommitted =
- transactionToDNsCommitMap.get(txId);
+ private boolean alreadyExecuted(DatanodeID dnId, long txId) {
+ final Set<DatanodeID> dnsWithTransactionCommitted = transactionToDNsCommitMap.get(txId);
return dnsWithTransactionCommitted != null && dnsWithTransactionCommitted
.contains(dnId);
}
@@ -455,11 +445,9 @@ public boolean alreadyExecuted(UUID dnId, long txId) {
* @param dnId - ID of datanode which acknowledges the delete block command.
*/
@VisibleForTesting
- public void commitTransactions(
- List<DeleteBlockTransactionResult> transactionResults, UUID dnId) {
+ public void commitTransactions(List<DeleteBlockTransactionResult> transactionResults, DatanodeID dnId) {
ArrayList<Long> txIDsToBeDeleted = new ArrayList<>();
- Set<UUID> dnsWithCommittedTxn;
for (DeleteBlockTransactionResult transactionResult :
transactionResults) {
if (isTransactionFailed(transactionResult)) {
@@ -470,7 +458,7 @@ public void commitTransactions(
metrics.incrBlockDeletionTransactionSuccess();
long txID = transactionResult.getTxID();
// set of dns which have successfully committed transaction txId.
- dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID);
+ final Set<DatanodeID> dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID);
final ContainerID containerId = ContainerID.valueOf(
transactionResult.getContainerID());
if (dnsWithCommittedTxn == null) {
@@ -494,9 +482,9 @@ public void commitTransactions(
// the nodes returned in the pipeline match the replication factor.
if (min(replicas.size(), dnsWithCommittedTxn.size())
>= container.getReplicationConfig().getRequiredNodes()) {
- List<UUID> containerDns = replicas.stream()
+ final List<DatanodeID> containerDns = replicas.stream()
.map(ContainerReplica::getDatanodeDetails)
- .map(DatanodeDetails::getUuid)
+ .map(DatanodeDetails::getID)
.collect(Collectors.toList());
if (dnsWithCommittedTxn.containsAll(containerDns)) {
transactionToDNsCommitMap.remove(txID);
@@ -526,34 +514,29 @@ public void commitTransactions(
}
@VisibleForTesting
- public void commitSCMCommandStatus(List<CommandStatus> deleteBlockStatus,
- UUID dnId) {
+ void commitSCMCommandStatus(List<CommandStatus> deleteBlockStatus, DatanodeID dnId) {
processSCMCommandStatus(deleteBlockStatus, dnId);
scmDeleteBlocksCommandStatusManager.
cleanTimeoutSCMCommand(dnId, scmCommandTimeoutMs);
}
- private boolean inProcessing(UUID dnId, long deletedBlocksTxId,
- Map<UUID, Map<Long, CmdStatus>> commandStatus) {
+ static boolean inProcessing(DatanodeID dnId, long deletedBlocksTxId,
+ Map<DatanodeID, Map<Long, CmdStatus>> commandStatus) {
Map<Long, CmdStatus> deletedBlocksTxStatus = commandStatus.get(dnId);
return deletedBlocksTxStatus != null &&
deletedBlocksTxStatus.get(deletedBlocksTxId) != null;
}
- private void processSCMCommandStatus(List<CommandStatus> deleteBlockStatus,
- UUID dnID) {
- Map<Long, CommandStatus> lastStatus = new HashMap<>();
+ private void processSCMCommandStatus(List<CommandStatus> deleteBlockStatus, DatanodeID dnID) {
Map<Long, CommandStatus.Status> summary = new HashMap<>();
-
// The CommandStatus is ordered in the report. So we can focus only on the
// last status in the command report.
deleteBlockStatus.forEach(cmdStatus -> {
- lastStatus.put(cmdStatus.getCmdId(), cmdStatus);
summary.put(cmdStatus.getCmdId(), cmdStatus.getStatus());
});
LOG.debug("CommandStatus {} from Datanode: {} ", summary, dnID);
- for (Map.Entry<Long, CommandStatus> entry : lastStatus.entrySet()) {
- CommandStatus.Status status = entry.getValue().getStatus();
+ for (Map.Entry<Long, CommandStatus.Status> entry : summary.entrySet()) {
+ final CommandStatus.Status status = entry.getValue();
scmDeleteBlocksCommandStatusManager.updateStatusByDNCommandStatus(
dnID, entry.getKey(), status);
}
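The final hunk above removes the redundant lastStatus map: Map.put overwrites on duplicate keys, so one pass over the ordered status report already leaves only the latest status per command id. A compact illustration (plain Java; the record below is a hypothetical stand-in for the protobuf CommandStatus message):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical stand-in for the protobuf CommandStatus message.
    record CmdReport(long cmdId, String status) { }

    final class LastStatusSketch {
      // Returns the last reported status per command id.
      static Map<Long, String> summarize(List<CmdReport> orderedReport) {
        final Map<Long, String> summary = new HashMap<>();
        // Later entries overwrite earlier ones, so only the last survives.
        orderedReport.forEach(r -> summary.put(r.cmdId(), r.status()));
        return summary;
      }

      private LastStatusSketch() { }
    }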
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/AbstractFindTargetGreedy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/AbstractFindTargetGreedy.java
index dad5ce8534..0735f6946a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/AbstractFindTargetGreedy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/AbstractFindTargetGreedy.java
@@ -22,7 +22,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -81,9 +80,7 @@ protected int compareByUsage(DatanodeUsageInfo a, DatanodeUsageInfo b) {
if (ret != 0) {
return ret;
}
- UUID uuidA = a.getDatanodeDetails().getUuid();
- UUID uuidB = b.getDatanodeDetails().getUuid();
- return uuidA.compareTo(uuidB);
+ return a.getDatanodeID().compareTo(b.getDatanodeID());
}
private void setConfiguration(ContainerBalancerConfiguration conf) {
@@ -228,8 +225,7 @@ public void increaseSizeEntering(DatanodeDetails target, long size) {
}
return;
}
- logger.warn("Cannot find {} in the candidates target nodes",
- target.getUuid());
+ logger.warn("Cannot find {} in the candidates target nodes", target);
}
/**
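This comparator change, and the identical one in FindSourceGreedy below, replaces the UUID unwrap with a direct DatanodeID comparison as the tie-break. A sketch of the pattern, assuming (as the patch implies) that DatanodeID is Comparable; the types are illustrative stand-ins:

    import java.util.Comparator;
    import java.util.UUID;

    record NodeId(UUID uuid) implements Comparable<NodeId> {
      @Override
      public int compareTo(NodeId other) {
        return uuid.compareTo(other.uuid());
      }
    }

    record UsageInfo(NodeId id, double utilization) { }

    final class UsageOrderSketch {
      // Primary key: utilization; tie-break: node id, for a stable total order.
      static final Comparator<UsageInfo> BY_USAGE_THEN_ID =
          Comparator.comparingDouble(UsageInfo::utilization)
              .thenComparing(UsageInfo::id);

      private UsageOrderSketch() { }
    }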
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceGreedy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceGreedy.java
index 6388684596..b641936bd8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceGreedy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceGreedy.java
@@ -23,7 +23,6 @@
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
@@ -56,9 +55,7 @@ public class FindSourceGreedy implements FindSourceStrategy {
if (ret != 0) {
return ret;
}
- UUID uuidA = a.getDatanodeDetails().getUuid();
- UUID uuidB = b.getDatanodeDetails().getUuid();
- return uuidA.compareTo(uuidB);
+ return a.getDatanodeID().compareTo(b.getDatanodeID());
});
this.nodeManager = nodeManager;
}
@@ -110,8 +107,7 @@ public void increaseSizeLeaving(DatanodeDetails dui, long size) {
addBackSourceDataNode(dui);
return;
}
- LOG.warn("Cannot find datanode {} in candidate source datanodes",
- dui.getUuid());
+ LOG.warn("Cannot find datanode {} in candidate source datanodes", dui);
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeUsageInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeUsageInfo.java
index 03fbba3041..f24a56c111 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeUsageInfo.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeUsageInfo.java
@@ -19,6 +19,7 @@
import java.util.Comparator;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeUsageInfoProto;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
@@ -128,6 +129,10 @@ public DatanodeDetails getDatanodeDetails() {
return datanodeDetails;
}
+ public DatanodeID getDatanodeID() {
+ return datanodeDetails.getID();
+ }
+
/**
* Gets SCMNodeStat of this DatanodeUsageInfo.
*
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 56a29536b4..2b7b082350 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -551,16 +551,15 @@ public void closeStalePipelines(DatanodeDetails datanodeDetails) {
@VisibleForTesting
List<Pipeline> getStalePipelines(DatanodeDetails datanodeDetails) {
- List<Pipeline> pipelines = getPipelines();
- return pipelines.stream()
- .filter(p -> p.getNodes().stream()
- .anyMatch(n -> n.getUuid()
- .equals(datanodeDetails.getUuid())
- && (!n.getIpAddress()
- .equals(datanodeDetails.getIpAddress())
- || !n.getHostName()
- .equals(datanodeDetails.getHostName()))))
- .collect(Collectors.toList());
+ return getPipelines().stream()
+ .filter(p -> p.getNodes().stream().anyMatch(n -> sameIdDifferentHostOrAddress(n, datanodeDetails)))
+ .collect(Collectors.toList());
+ }
+
+ static boolean sameIdDifferentHostOrAddress(DatanodeDetails left, DatanodeDetails right) {
+ return left.getID().equals(right.getID())
+ && (!left.getIpAddress().equals(right.getIpAddress())
+ || !left.getHostName().equals(right.getHostName()));
}
/**
@@ -904,12 +903,8 @@ private void recordMetricsForPipeline(Pipeline pipeline) {
metrics.incNumPipelineContainSameDatanodes();
//TODO remove until pipeline allocation is proved equally distributed.
for (Pipeline overlapPipeline : overlapPipelines) {
- LOG.info("Pipeline: " + pipeline.getId().toString() +
- " contains same datanodes as previous pipelines: " +
- overlapPipeline.getId().toString() + " nodeIds: " +
- pipeline.getNodes().get(0).getUuid().toString() +
- ", " + pipeline.getNodes().get(1).getUuid().toString() +
- ", " + pipeline.getNodes().get(2).getUuid().toString());
+ LOG.info("{} and {} have exactly the same set of datanodes: {}",
+ pipeline.getId(), overlapPipeline.getId(), pipeline.getNodeSet());
}
}
return;
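The getStalePipelines refactor above names the condition it was inlining: a pipeline member is stale when it carries the same datanode ID as the reporting node but a different IP address or hostname, i.e. the node has re-registered from elsewhere. A self-contained sketch of the extracted predicate (stand-in type for DatanodeDetails):

    import java.util.List;
    import java.util.stream.Collectors;

    // Stand-in holding only the fields the predicate reads.
    record Node(String id, String ipAddress, String hostName) { }

    final class StaleCheckSketch {
      static boolean sameIdDifferentHostOrAddress(Node left, Node right) {
        return left.id().equals(right.id())
            && (!left.ipAddress().equals(right.ipAddress())
                || !left.hostName().equals(right.hostName()));
      }

      // Usage mirroring getStalePipelines: keep pipelines containing a stale match.
      static List<List<Node>> stale(List<List<Node>> pipelines, Node reported) {
        return pipelines.stream()
            .filter(p -> p.stream()
                .anyMatch(n -> sameIdDifferentHostOrAddress(n, reported)))
            .collect(Collectors.toList());
      }

      private StaleCheckSketch() { }
    }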
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/DataNodeSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/DataNodeSafeModeRule.java
index cb487ce8a3..1389b14ee3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/DataNodeSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/DataNodeSafeModeRule.java
@@ -18,9 +18,10 @@
package org.apache.hadoop.hdds.scm.safemode;
import java.util.HashSet;
-import java.util.UUID;
+import java.util.Set;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
@@ -41,7 +42,7 @@ public class DataNodeSafeModeRule extends
private int requiredDns;
private int registeredDns = 0;
// Set to track registered DataNodes.
- private HashSet<UUID> registeredDnSet;
+ private final Set<DatanodeID> registeredDnSet;
private NodeManager nodeManager;
public DataNodeSafeModeRule(EventQueue eventQueue,
@@ -72,7 +73,7 @@ protected boolean validate() {
@Override
protected void process(NodeRegistrationContainerReport reportsProto) {
- registeredDnSet.add(reportsProto.getDatanodeDetails().getUuid());
+ registeredDnSet.add(reportsProto.getDatanodeDetails().getID());
registeredDns = registeredDnSet.size();
if (scmInSafeMode()) {
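Context for the DataNodeSafeModeRule hunks: the rule exits safe mode once enough distinct datanodes have registered, and a set keyed by DatanodeID makes repeated registration reports from the same node idempotent. A minimal sketch of that counting logic (String stands in for DatanodeID):

    import java.util.HashSet;
    import java.util.Set;

    final class DnCountSketch {
      private final Set<String> registered = new HashSet<>();
      private final int requiredDns;

      DnCountSketch(int requiredDns) {
        this.requiredDns = requiredDns;
      }

      void onRegistration(String datanodeId) {
        registered.add(datanodeId); // duplicate reports do not inflate the count
      }

      boolean validate() {
        return registered.size() >= requiredDns;
      }
    }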
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index dc79a21f8f..1f73de568e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -187,9 +187,7 @@ public List<SCMCommand<?>> dispatch(SCMHeartbeatRequestProto heartbeat) {
}
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Heartbeat dispatched: datanode=" + datanodeDetails.getUuid()
- + ", Commands= " + commands);
- }
+ LOG.debug("Heartbeat dispatched for datanode {} with commands {}", datanodeDetails, commands);
return commands;
}
@@ -320,7 +318,7 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
- return this.getDatanodeDetails().getUuid().hashCode();
+ return this.getDatanodeDetails().getID().hashCode();
}
@Override
@@ -368,7 +366,7 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
- return this.getDatanodeDetails().getUuid().hashCode();
+ return this.getDatanodeDetails().getID().hashCode();
}
@Override
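In the first hunk of this file, the isDebugEnabled() guard becomes unnecessary once the message uses SLF4J's parameterized form: the string is only assembled after the level check inside debug() passes, so disabled-level calls stay cheap. For reference:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class LogSketch {
      private static final Logger LOG = LoggerFactory.getLogger(LogSketch.class);

      void dispatched(Object datanode, Object commands) {
        // No guard needed: {} placeholders defer formatting until DEBUG is on.
        LOG.debug("Heartbeat dispatched for datanode {} with commands {}",
            datanode, commands);
      }
    }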
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index 140d45790f..59607591be 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -41,7 +41,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@@ -100,8 +99,8 @@ public class TestDeletedBlockLog {
private StorageContainerManager scm;
private List<DatanodeDetails> dnList;
private SCMHADBTransactionBuffer scmHADBTransactionBuffer;
- private Map<Long, ContainerInfo> containers = new HashMap<>();
- private Map<Long, Set<ContainerReplica>> replicas = new HashMap<>();
+ private final Map<ContainerID, ContainerInfo> containers = new HashMap<>();
+ private final Map<ContainerID, Set<ContainerReplica>> replicas = new HashMap<>();
private ScmBlockDeletingServiceMetrics metrics;
private static final int THREE = ReplicationFactor.THREE_VALUE;
private static final int ONE = ReplicationFactor.ONE_VALUE;
@@ -146,7 +145,7 @@ private void setupContainerManager() throws IOException {
when(containerManager.getContainerReplicas(any()))
.thenAnswer(invocationOnMock -> {
ContainerID cid = (ContainerID) invocationOnMock.getArguments()[0];
- return replicas.get(cid.getId());
+ return replicas.get(cid);
});
when(containerManager.getContainer(any()))
.thenAnswer(invocationOnMock -> {
@@ -159,7 +158,7 @@ private void setupContainerManager() throws IOException {
Map<ContainerID, Long> map =
(Map<ContainerID, Long>) invocationOnMock.getArguments()[0];
for (Map.Entry<ContainerID, Long> e : map.entrySet()) {
- ContainerInfo info = containers.get(e.getKey().getId());
+ ContainerInfo info = containers.get(e.getKey());
try {
assertThat(e.getValue()).isGreaterThan(info.getDeleteTransactionId());
} catch (AssertionError err) {
@@ -191,9 +190,10 @@ private void updateContainerMetadata(long cid,
.setDatanodeDetails(datanodeDetails)
.build())
.collect(Collectors.toSet());
- containers.put(cid, container);
- containerTable.put(ContainerID.valueOf(cid), container);
- replicas.put(cid, replicaSet);
+ final ContainerID containerID = container.containerID();
+ containers.put(containerID, container);
+ containerTable.put(containerID, container);
+ replicas.put(containerID, replicaSet);
}
@AfterEach
@@ -226,24 +226,21 @@ private Map<Long, List<Long>> generateData(int dataSize,
}
private void addTransactions(Map<Long, List<Long>> containerBlocksMap,
- boolean shouldFlush)
- throws IOException, TimeoutException {
+ boolean shouldFlush) throws IOException {
deletedBlockLog.addTransactions(containerBlocksMap);
if (shouldFlush) {
scmHADBTransactionBuffer.flush();
}
}
- private void incrementCount(List<Long> txIDs)
- throws IOException, TimeoutException {
+ private void incrementCount(List<Long> txIDs) throws IOException {
deletedBlockLog.incrementCount(txIDs);
scmHADBTransactionBuffer.flush();
// mock scmHADBTransactionBuffer does not flush deletedBlockLog
deletedBlockLog.onFlush();
}
- private void resetCount(List<Long> txIDs)
- throws IOException, TimeoutException {
+ private void resetCount(List<Long> txIDs) throws IOException {
deletedBlockLog.resetCount(txIDs);
scmHADBTransactionBuffer.flush();
deletedBlockLog.onFlush();
@@ -254,7 +251,7 @@ private void commitTransactions(
DatanodeDetails... dns) throws IOException {
for (DatanodeDetails dnDetails : dns) {
deletedBlockLog.getSCMDeletedBlockTransactionStatusManager()
- .commitTransactions(transactionResults, dnDetails.getUuid());
+ .commitTransactions(transactionResults, dnDetails.getID());
}
scmHADBTransactionBuffer.flush();
}
@@ -294,7 +291,7 @@ private List<DeletedBlocksTransaction> getAllTransactions()
throws Exception {
}
private List<DeletedBlocksTransaction> getTransactions(
- int maximumAllowedBlocksNum) throws IOException, TimeoutException {
+ int maximumAllowedBlocksNum) throws IOException {
DatanodeDeletedBlockTransactions transactions =
deletedBlockLog.getTransactions(maximumAllowedBlocksNum, new HashSet<>(dnList));
List<DeletedBlocksTransaction> txns = new LinkedList<>();
@@ -564,8 +561,7 @@ private void recordScmCommandToStatusManager(
}
private void sendSCMDeleteBlocksCommand(DatanodeID dnId, SCMCommand<?> scmCommand) {
- deletedBlockLog.onSent(
- DatanodeDetails.newBuilder().setUuid(dnId.getUuid()).build(), scmCommand);
+ deletedBlockLog.onSent(DatanodeDetails.newBuilder().setID(dnId).build(), scmCommand);
}
private void assertNoDuplicateTransactions(
@@ -619,7 +615,7 @@ private void commitSCMCommandStatus(Long scmCmdId, DatanodeID dnID,
.getProtoBufMessage());
deletedBlockLog.getSCMDeletedBlockTransactionStatusManager()
- .commitSCMCommandStatus(deleteBlockStatus, dnID.getUuid());
+ .commitSCMCommandStatus(deleteBlockStatus, dnID);
}
private void createDeleteBlocksCommandAndAction(
@@ -833,8 +829,7 @@ public void testPersistence() throws Exception {
}
@Test
- public void testDeletedBlockTransactions()
- throws IOException, TimeoutException {
+ public void testDeletedBlockTransactions() throws IOException {
deletedBlockLog.setScmCommandTimeoutMs(Long.MAX_VALUE);
mockContainerHealthResult(true);
int txNum = 10;
@@ -888,8 +883,7 @@ public void testDeletedBlockTransactions()
}
@Test
- public void testDeletedBlockTransactionsOfDeletedContainer()
- throws IOException, TimeoutException {
+ public void testDeletedBlockTransactionsOfDeletedContainer() throws IOException {
int txNum = 10;
List<DeletedBlocksTransaction> blocks;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMDeleteBlocksCommandStatusManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMDeleteBlocksCommandStatusManager.java
index dfb8ad83b5..a2f9699149 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMDeleteBlocksCommandStatusManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMDeleteBlocksCommandStatusManager.java
@@ -26,11 +26,12 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -41,8 +42,8 @@
public class TestSCMDeleteBlocksCommandStatusManager {
private SCMDeleteBlocksCommandStatusManager manager;
- private UUID dnId1;
- private UUID dnId2;
+ private DatanodeID dnId1;
+ private DatanodeID dnId2;
private long scmCmdId1;
private long scmCmdId2;
private long scmCmdId3;
@@ -56,8 +57,8 @@ public class TestSCMDeleteBlocksCommandStatusManager {
public void setup() throws Exception {
manager = new SCMDeleteBlocksCommandStatusManager();
// Create test data
- dnId1 = UUID.randomUUID();
- dnId2 = UUID.randomUUID();
+ dnId1 = DatanodeID.randomID();
+ dnId2 = DatanodeID.randomID();
scmCmdId1 = 1L;
scmCmdId2 = 2L;
scmCmdId3 = 3L;
@@ -208,10 +209,10 @@ public void testCleanAllTimeoutSCMCommand() {
// Transactions in states EXECUTED and NEED_RESEND will be cleaned up
// directly, while transactions in states PENDING_EXECUTED and SENT
// will be cleaned up after timeout
- recordAndSentCommand(manager, dnId1, Arrays.asList(scmCmdId1),
- Arrays.asList(deletedBlocksTxIds1));
- recordAndSentCommand(manager, dnId2, Arrays.asList(scmCmdId2),
- Arrays.asList(deletedBlocksTxIds2));
+ recordAndSentCommand(manager, dnId1, Collections.singletonList(scmCmdId1),
+ Collections.singletonList(deletedBlocksTxIds1));
+ recordAndSentCommand(manager, dnId2, Collections.singletonList(scmCmdId2),
+ Collections.singletonList(deletedBlocksTxIds2));
Map<Long, CmdStatusData> dn1StatusRecord =
manager.getScmCmdStatusRecord().get(dnId1);
@@ -238,7 +239,7 @@ public void testCleanAllTimeoutSCMCommand() {
private void recordAndSentCommand(
SCMDeleteBlocksCommandStatusManager statusManager,
- UUID dnId, List<Long> scmCmdIds, List<Set<Long>> txIds) {
+ DatanodeID dnId, List<Long> scmCmdIds, List<Set<Long>> txIds) {
assertEquals(scmCmdIds.size(), txIds.size());
for (int i = 0; i < scmCmdIds.size(); i++) {
long scmCmdId = scmCmdIds.get(i);
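Here, as with STATUSES_REQUIRING_TIMEOUT in the main class, one-element collections move from Arrays.asList (and HashSet wrappers) to Collections.singleton/singletonList, which return immutable single-element views and state the intent directly. For reference:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    final class SingletonSketch {
      // Before: a mutable set built from a one-element array.
      static final Set<String> BEFORE = new HashSet<>(Arrays.asList("SENT"));
      // After: immutable one-element views, no intermediate array copy.
      static final Set<String> AFTER = Collections.singleton("SENT");
      static final List<String> AFTER_LIST = Collections.singletonList("SENT");

      private SingletonSketch() { }
    }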
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index 8b20e279a0..c7b9aac971 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -59,9 +59,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
@@ -715,7 +713,7 @@ public void testAddContainerWithClosedPipeline() throws Exception {
}
@Test
- public void testPipelineCloseFlow() throws IOException, TimeoutException {
+ public void testPipelineCloseFlow() throws IOException {
LogCapturer logCapturer =
LogCapturer.captureLogs(PipelineManagerImpl.class);
PipelineManagerImpl pipelineManager = createPipelineManager(true);
Pipeline pipeline = pipelineManager.createPipeline(
@@ -754,17 +752,17 @@ public void testGetStalePipelines() throws IOException {
// For existing pipelines
List<Pipeline> pipelines = new ArrayList<>();
- UUID[] uuids = new UUID[3];
+ final DatanodeID[] ids = new DatanodeID[3];
String[] ipAddresses = new String[3];
String[] hostNames = new String[3];
for (int i = 0; i < 3; i++) {
- uuids[i] = UUID.randomUUID();
+ ids[i] = DatanodeID.randomID();
ipAddresses[i] = "1.2.3." + (i + 1);
hostNames[i] = "host" + i;
Pipeline pipeline = mock(Pipeline.class);
DatanodeDetails datanodeDetails = mock(DatanodeDetails.class);
- when(datanodeDetails.getUuid()).thenReturn(uuids[i]);
+ when(datanodeDetails.getID()).thenReturn(ids[i]);
when(datanodeDetails.getIpAddress()).thenReturn(ipAddresses[i]);
when(datanodeDetails.getHostName()).thenReturn(hostNames[i]);
List<DatanodeDetails> nodes = new ArrayList<>();
@@ -785,8 +783,8 @@ public void testGetStalePipelines() throws IOException {
// node with changed uuid
DatanodeDetails node0 = mock(DatanodeDetails.class);
- UUID changedUUID = UUID.randomUUID();
- when(node0.getUuid()).thenReturn(changedUUID);
+ DatanodeID changedUUID = DatanodeID.randomID();
+ when(node0.getID()).thenReturn(changedUUID);
when(node0.getIpAddress()).thenReturn(ipAddresses[0]);
when(node0.getHostName()).thenReturn(hostNames[0]);
@@ -795,7 +793,7 @@ public void testGetStalePipelines() throws IOException {
// node with changed IP
DatanodeDetails node1 = mock(DatanodeDetails.class);
- when(node1.getUuid()).thenReturn(uuids[0]);
+ when(node1.getID()).thenReturn(ids[0]);
when(node1.getIpAddress()).thenReturn("1.2.3.100");
when(node1.getHostName()).thenReturn(hostNames[0]);
@@ -807,7 +805,7 @@ public void testGetStalePipelines() throws IOException {
// node with changed host name
DatanodeDetails node2 = mock(DatanodeDetails.class);
- when(node2.getUuid()).thenReturn(uuids[0]);
+ when(node2.getID()).thenReturn(ids[0]);
when(node2.getIpAddress()).thenReturn(ipAddresses[0]);
when(node2.getHostName()).thenReturn("host100");
@@ -819,7 +817,7 @@ public void testGetStalePipelines() throws IOException {
}
@Test
- public void testCloseStalePipelines() throws IOException, TimeoutException {
+ public void testCloseStalePipelines() throws IOException {
SCMHADBTransactionBuffer buffer =
new SCMHADBTransactionBufferStub(dbStore);
PipelineManagerImpl pipelineManager =
@@ -842,8 +840,7 @@ public void testCloseStalePipelines() throws IOException, TimeoutException {
}
@Test
- public void testWaitForAllocatedPipeline()
- throws IOException, TimeoutException {
+ public void testWaitForAllocatedPipeline() throws IOException {
SCMHADBTransactionBuffer buffer =
new SCMHADBTransactionBufferStub(dbStore);
PipelineManagerImpl pipelineManager =
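The mock setup in testGetStalePipelines changes only the stubbed accessor: getID() returning a DatanodeID instead of getUuid() returning a UUID. A generic sketch of the same Mockito pattern (hypothetical interface; the Mockito calls are the real API):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    // Hypothetical stand-in for DatanodeDetails.
    interface Details {
      String getID();
      String getIpAddress();
    }

    final class MockSketch {
      static Details nodeWith(String id, String ip) {
        final Details d = mock(Details.class);
        when(d.getID()).thenReturn(id);        // stub the id accessor
        when(d.getIpAddress()).thenReturn(ip); // and the address
        return d;
      }

      private MockSketch() { }
    }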