This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 3c0bafae58 HDDS-13001. Use DatanodeID in SCMBlockDeletingService. (#8420)
3c0bafae58 is described below
commit 3c0bafae58d64ce8e53190148ad6c03f661232c0
Author: Nandakumar Vadivelu <[email protected]>
AuthorDate: Sun May 11 06:28:46 2025 +0530
HDDS-13001. Use DatanodeID in SCMBlockDeletingService. (#8420)
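
In short: SCM's block-deletion path now identifies datanodes with the
strongly typed DatanodeID instead of raw java.util.UUID. A before/after
sketch of the central pattern, with placeholder names (node, cmd):

    // before: raw UUID map keys and UUID-based command routing
    Map<UUID, List<DeletedBlocksTransaction>> txs;
    new CommandForDatanode<>(node.getUuid(), cmd);

    // after: DatanodeID end to end
    Map<DatanodeID, List<DeletedBlocksTransaction>> txs;
    new CommandForDatanode<>(node, cmd);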
---
.../apache/hadoop/hdds/protocol/DatanodeID.java | 4 ++-
.../protocol/commands/CommandForDatanode.java | 12 +++----
.../block/DatanodeDeletedBlockTransactions.java | 10 +++---
.../hadoop/hdds/scm/block/DeletedBlockLog.java | 6 ++--
.../hadoop/hdds/scm/block/DeletedBlockLogImpl.java | 17 ++++-----
.../hdds/scm/block/SCMBlockDeletingService.java | 6 ++--
.../scm/block/ScmBlockDeletingServiceMetrics.java | 12 +++----
.../scm/container/CloseContainerEventHandler.java | 2 +-
.../hadoop/hdds/scm/node/DeadNodeHandler.java | 2 +-
.../hadoop/hdds/scm/node/SCMNodeManager.java | 2 +-
.../hdds/scm/pipeline/PipelineActionHandler.java | 2 +-
.../hdds/scm/pipeline/PipelineReportHandler.java | 2 +-
.../hdds/scm/pipeline/RatisPipelineProvider.java | 4 +--
.../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 40 +++++++++++-----------
.../scm/block/TestSCMBlockDeletingService.java | 6 ++--
.../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 4 +--
.../hadoop/hdds/scm/node/TestSCMNodeManager.java | 8 ++---
17 files changed, 71 insertions(+), 68 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeID.java
index ba6b685fba..53a6b7b4df 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeID.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeID.java
@@ -118,7 +118,9 @@ private static HddsProtos.UUID toProto(final UUID id) {
}
// TODO: Remove this in follow-up Jira. (HDDS-12015)
- UUID getUuid() {
+ // Exposing this temporarily to help with refactoring.
+ @Deprecated
+ public UUID getUuid() {
return uuid;
}
}
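
The widened accessor above is a deliberate stopgap: new code should stay on
DatanodeID and reach for the UUID only at boundaries that have not been
migrated yet. A minimal sketch, assuming a DatanodeDetails named dn:

    DatanodeID id = dn.getID();  // preferred: stay in the new type
    UUID legacy = id.getUuid();  // deprecated bridge, goes away with HDDS-12015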
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
index fa2173218e..0326df66a9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
@@ -20,6 +20,7 @@
import com.google.protobuf.Message;
import java.util.UUID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
/**
@@ -28,22 +29,21 @@
public class CommandForDatanode<T extends Message> implements
IdentifiableEventPayload {
- private final UUID datanodeId;
-
+ private final DatanodeID datanodeId;
private final SCMCommand<T> command;
public CommandForDatanode(DatanodeDetails datanode, SCMCommand<T> command) {
- this(datanode.getUuid(), command);
+ this(datanode.getID(), command);
}
- // TODO: Command for datanode should take DatanodeDetails as parameter.
- public CommandForDatanode(UUID datanodeId, SCMCommand<T> command) {
+ public CommandForDatanode(DatanodeID datanodeId, SCMCommand<T> command) {
this.datanodeId = datanodeId;
this.command = command;
}
+ @Deprecated
public UUID getDatanodeId() {
- return datanodeId;
+ return datanodeId.getUuid();
}
public SCMCommand<T> getCommand() {
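
Both constructors now converge on DatanodeID; the UUID overload is gone, and
getDatanodeId() keeps a deprecated UUID view for callers that still need one.
A usage sketch with placeholder names (node, cmd):

    new CommandForDatanode<>(node, cmd);          // delegates to node.getID()
    new CommandForDatanode<>(node.getID(), cmd);  // explicit DatanodeID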
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
index 4547556c4e..989971492a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
@@ -22,8 +22,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.UUID;
import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
/**
@@ -32,7 +32,7 @@
*/
class DatanodeDeletedBlockTransactions {
// A list of TXs mapped to a certain datanode ID.
- private final Map<UUID, List<DeletedBlocksTransaction>> transactions =
+ private final Map<DatanodeID, List<DeletedBlocksTransaction>> transactions =
new HashMap<>();
// counts blocks deleted across datanodes. Blocks deleted will be counted
// for all the replicas and may not be unique.
@@ -41,7 +41,7 @@ class DatanodeDeletedBlockTransactions {
DatanodeDeletedBlockTransactions() {
}
- void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
+ void addTransactionToDN(DatanodeID dnID, DeletedBlocksTransaction tx) {
transactions.computeIfAbsent(dnID, k -> new LinkedList<>()).add(tx);
blocksDeleted += tx.getLocalIDCount();
if (SCMBlockDeletingService.LOG.isDebugEnabled()) {
@@ -51,7 +51,7 @@ void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
}
}
- Map<UUID, List<DeletedBlocksTransaction>> getDatanodeTransactionMap() {
+ Map<DatanodeID, List<DeletedBlocksTransaction>> getDatanodeTransactionMap() {
return transactions;
}
@@ -59,7 +59,7 @@ int getBlocksDeleted() {
return blocksDeleted;
}
- List<String> getTransactionIDList(UUID dnId) {
+ List<String> getTransactionIDList(DatanodeID dnId) {
return Optional.ofNullable(transactions.get(dnId))
.orElse(new LinkedList<>())
.stream()
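
Inserts and lookups on the transaction map now take the same typed key. A
minimal sketch (dn and tx are placeholders):

    DatanodeDeletedBlockTransactions txns = new DatanodeDeletedBlockTransactions();
    txns.addTransactionToDN(dn.getID(), tx);
    List<String> txIDs = txns.getTransactionIDList(dn.getID());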
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
index 1ac97dae3b..21e4d1b7c5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
@@ -22,8 +22,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -91,7 +91,7 @@ void incrementCount(List<Long> txIDs)
* @param dnTxSet Set of transaction IDs for the DataNode.
*/
void recordTransactionCreated(
- UUID dnId, long scmCmdId, Set<Long> dnTxSet);
+ DatanodeID dnId, long scmCmdId, Set<Long> dnTxSet);
/**
* Handles the cleanup process when a DataNode is reported dead. This method
@@ -100,7 +100,7 @@ void recordTransactionCreated(
*
* @param dnId The identifier of the dead DataNode.
*/
- void onDatanodeDead(UUID dnId);
+ void onDatanodeDead(DatanodeID dnId);
/**
* Records the event of sending a block deletion command to a DataNode. This
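
Taken together, the interface now tracks a transaction's lifecycle per
datanode by DatanodeID. A caller sketch (scmCmdId and dnTxSet are
placeholders):

    deletedBlockLog.recordTransactionCreated(dn.getID(), scmCmdId, dnTxSet);
    // ... and when the node is later reported dead (and not IN_MAINTENANCE):
    deletedBlockLog.onDatanodeDead(dn.getID());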
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 0fa5a09fe2..8c39de41d1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@@ -342,7 +343,7 @@ private void getTransaction(DeletedBlocksTransaction tx,
DatanodeDetails details = replica.getDatanodeDetails();
if (!transactionStatusManager.isDuplication(
details, updatedTxn.getTxID(), commandStatus)) {
- transactions.addTransactionToDN(details.getUuid(), updatedTxn);
+ transactions.addTransactionToDN(details.getID(), updatedTxn);
metrics.incrProcessedTransaction();
}
}
@@ -506,10 +507,10 @@ public void setScmCommandTimeoutMs(long scmCommandTimeoutMs) {
}
@Override
- public void recordTransactionCreated(UUID dnId, long scmCmdId,
+ public void recordTransactionCreated(DatanodeID dnId, long scmCmdId,
Set<Long> dnTxSet) {
getSCMDeletedBlockTransactionStatusManager()
- .recordTransactionCreated(dnId, scmCmdId, dnTxSet);
+ .recordTransactionCreated(dnId.getUuid(), scmCmdId, dnTxSet);
}
@Override
@@ -518,8 +519,8 @@ public int getTransactionToDNsCommitMapSize() {
}
@Override
- public void onDatanodeDead(UUID dnId) {
- getSCMDeletedBlockTransactionStatusManager().onDatanodeDead(dnId);
+ public void onDatanodeDead(DatanodeID dnId) {
+ getSCMDeletedBlockTransactionStatusManager().onDatanodeDead(dnId.getUuid());
}
@Override
@@ -536,7 +537,7 @@ public void onMessage(
}
DatanodeDetails details = deleteBlockStatus.getDatanodeDetails();
- UUID dnId = details.getUuid();
+ DatanodeID dnId = details.getID();
for (CommandStatus commandStatus : deleteBlockStatus.getCmdStatus()) {
CommandStatus.Status status = commandStatus.getStatus();
lock.lock();
@@ -545,7 +546,7 @@ public void onMessage(
ContainerBlocksDeletionACKProto ackProto =
commandStatus.getBlockDeletionAck();
getSCMDeletedBlockTransactionStatusManager()
- .commitTransactions(ackProto.getResultsList(), dnId);
+ .commitTransactions(ackProto.getResultsList(), dnId.getUuid());
metrics.incrBlockDeletionCommandSuccess();
metrics.incrDNCommandsSuccess(dnId, 1);
} else if (status == CommandStatus.Status.FAILED) {
@@ -557,7 +558,7 @@ public void onMessage(
}
getSCMDeletedBlockTransactionStatusManager()
- .commitSCMCommandStatus(deleteBlockStatus.getCmdStatus(), dnId);
+ .commitSCMCommandStatus(deleteBlockStatus.getCmdStatus(), dnId.getUuid());
} finally {
lock.unlock();
}
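
Note the bridging idiom in this file: the public methods accept DatanodeID but
unwrap to dnId.getUuid() at the SCMDeletedBlockTransactionStatusManager
boundary, which has not been migrated yet. Confining the unwrapping to this
one layer means the follow-up (HDDS-12015) can drop these getUuid() calls
without touching any caller.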
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 6d80dbf0b5..81b40729c4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -29,7 +29,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -38,6 +37,7 @@
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.scm.ScmConfig;
@@ -183,9 +183,9 @@ public EmptyTaskResult call() throws Exception {
}
Set<Long> processedTxIDs = new HashSet<>();
- for (Map.Entry<UUID, List<DeletedBlocksTransaction>> entry :
+ for (Map.Entry<DatanodeID, List<DeletedBlocksTransaction>> entry :
transactions.getDatanodeTransactionMap().entrySet()) {
- UUID dnId = entry.getKey();
+ DatanodeID dnId = entry.getKey();
List<DeletedBlocksTransaction> dnTXs = entry.getValue();
if (!dnTXs.isEmpty()) {
Set<Long> dnTxSet = dnTXs.stream()
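
The dispatch loop then walks the typed map directly; condensed from the hunk
above:

    for (Map.Entry<DatanodeID, List<DeletedBlocksTransaction>> entry
        : transactions.getDatanodeTransactionMap().entrySet()) {
      DatanodeID dnId = entry.getKey();
      List<DeletedBlocksTransaction> dnTXs = entry.getValue();
      // build and send one DeleteBlocksCommand per datanode
    }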
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/ScmBlockDeletingServiceMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/ScmBlockDeletingServiceMetrics.java
index 20293bbd4a..495d9bdf10 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/ScmBlockDeletingServiceMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/ScmBlockDeletingServiceMetrics.java
@@ -18,8 +18,8 @@
package org.apache.hadoop.hdds.scm.block;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
@@ -95,7 +95,7 @@ public final class ScmBlockDeletingServiceMetrics implements MetricsSource {
@Metric(about = "The number of dataNodes of delete transactions.")
private MutableGaugeLong numBlockDeletionTransactionDataNodes;
- private final Map<UUID, DatanodeCommandCounts> numCommandsDatanode = new ConcurrentHashMap<>();
+ private final Map<DatanodeID, DatanodeCommandCounts> numCommandsDatanode = new ConcurrentHashMap<>();
private ScmBlockDeletingServiceMetrics() {
this.registry = new MetricsRegistry(SOURCE_NAME);
@@ -164,17 +164,17 @@ public void setNumBlockDeletionTransactionDataNodes(long dataNodes) {
this.numBlockDeletionTransactionDataNodes.set(dataNodes);
}
- public void incrDNCommandsSent(UUID id, long delta) {
+ public void incrDNCommandsSent(DatanodeID id, long delta) {
numCommandsDatanode.computeIfAbsent(id, k -> new DatanodeCommandCounts())
.incrCommandsSent(delta);
}
- public void incrDNCommandsSuccess(UUID id, long delta) {
+ public void incrDNCommandsSuccess(DatanodeID id, long delta) {
numCommandsDatanode.computeIfAbsent(id, k -> new DatanodeCommandCounts())
.incrCommandsSuccess(delta);
}
- public void incrDNCommandsFailure(UUID id, long delta) {
+ public void incrDNCommandsFailure(DatanodeID id, long delta) {
numCommandsDatanode.computeIfAbsent(id, k -> new DatanodeCommandCounts())
.incrCommandsFailure(delta);
}
@@ -239,7 +239,7 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) {
numBlockDeletionTransactionDataNodes.snapshot(builder, all);
MetricsRecordBuilder recordBuilder = builder;
- for (Map.Entry<UUID, DatanodeCommandCounts> e : numCommandsDatanode.entrySet()) {
+ for (Map.Entry<DatanodeID, DatanodeCommandCounts> e : numCommandsDatanode.entrySet()) {
recordBuilder = recordBuilder.endRecord().addRecord(SOURCE_NAME)
.add(new MetricsTag(Interns.info("datanode",
"Datanode host for deletion commands"), e.getKey().toString()))
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 716762ba73..e21bcc7df2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -158,7 +158,7 @@ private Void triggerCloseCallback(
throws ContainerNotFoundException {
getNodes(container).forEach(node ->
publisher.fireEvent(DATANODE_COMMAND,
- new CommandForDatanode<>(node.getUuid(), command)));
+ new CommandForDatanode<>(node, command)));
return null;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index 9f69d9456d..bd6ee9503b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -116,7 +116,7 @@ public void onMessage(final DatanodeDetails datanodeDetails,
// remove DeleteBlocksCommand associated with the dead node unless it
// is IN_MAINTENANCE
if (deletedBlockLog != null && !isNodeInMaintenance) {
- deletedBlockLog.onDatanodeDead(datanodeDetails.getUuid());
+ deletedBlockLog.onDatanodeDead(datanodeDetails.getID());
}
//move dead datanode out of ClusterNetworkTopology
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index c269cf0144..edf934084d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -781,7 +781,7 @@ protected void sendFinalizeToDatanodeIfNeeded(DatanodeDetails datanodeDetails,
// Send Finalize command to the data node. Its OK to
// send Finalize command multiple times.
scmNodeEventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
- new CommandForDatanode<>(datanodeDetails.getUuid(),
+ new CommandForDatanode<>(datanodeDetails,
finalizeCmd));
} catch (NotLeaderException ex) {
LOG.warn("Skip sending finalize upgrade command since current SCM" +
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
index 94a17d5989..9d2ced1bea 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
@@ -119,7 +119,7 @@ private void closeUnknownPipeline(final EventPublisher publisher,
SCMCommand<?> command = new ClosePipelineCommand(pid);
command.setTerm(scmContext.getTermOfLeader());
publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
- new CommandForDatanode<>(datanode.getUuid(), command));
+ new CommandForDatanode<>(datanode, command));
} catch (NotLeaderException nle) {
LOG.info("Cannot process Pipeline Action for pipeline {} as " +
"current SCM is not leader anymore.", pid);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
index 7e3bf8d5b1..91c58c754c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -101,7 +101,7 @@ private void handlePipelineNotFoundException(final PipelineReport report,
final SCMCommand<?> command = new ClosePipelineCommand(pipelineID);
command.setTerm(scmContext.getTermOfLeader());
publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
- new CommandForDatanode<>(dn.getUuid(), command));
+ new CommandForDatanode<>(dn, command));
} catch (NotLeaderException ex) {
// Do nothing if the leader has changed.
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 5b80743eb9..491e4d15ad 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -208,7 +208,7 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig,
LOG.info("Sending CreatePipelineCommand for pipeline:{} to datanode:{}",
pipeline.getId(), node);
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
- new CommandForDatanode<>(node.getUuid(), createCommand));
+ new CommandForDatanode<>(node, createCommand));
});
return pipeline;
@@ -269,7 +269,7 @@ public void close(Pipeline pipeline) throws NotLeaderException {
closeCommand.setTerm(scmContext.getTermOfLeader());
pipeline.getNodes().forEach(node -> {
final CommandForDatanode<?> datanodeCommand =
- new CommandForDatanode<>(node.getUuid(), closeCommand);
+ new CommandForDatanode<>(node, closeCommand);
LOG.info("Send pipeline:{} close command to datanode {}",
pipeline.getId(), datanodeCommand.getDatanodeId());
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
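
With that, every event-publishing call site follows the same shape, passing
the DatanodeDetails straight through instead of unwrapping a UUID first:

    publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
        new CommandForDatanode<>(node, command));

The conversion to DatanodeID now happens exactly once, inside the
CommandForDatanode constructor.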
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index f2289ecb2f..140d45790f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -52,6 +52,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
@@ -295,16 +296,15 @@ private List<DeletedBlocksTransaction> getAllTransactions() throws Exception {
private List<DeletedBlocksTransaction> getTransactions(
int maximumAllowedBlocksNum) throws IOException, TimeoutException {
DatanodeDeletedBlockTransactions transactions =
- deletedBlockLog.getTransactions(maximumAllowedBlocksNum,
- dnList.stream().collect(Collectors.toSet()));
+ deletedBlockLog.getTransactions(maximumAllowedBlocksNum, new HashSet<>(dnList));
List<DeletedBlocksTransaction> txns = new LinkedList<>();
for (DatanodeDetails dn : dnList) {
txns.addAll(Optional.ofNullable(
- transactions.getDatanodeTransactionMap().get(dn.getUuid()))
+ transactions.getDatanodeTransactionMap().get(dn.getID()))
.orElseGet(LinkedList::new));
}
// Simulated transactions are sent
- for (Map.Entry<UUID, List<DeletedBlocksTransaction>> entry :
+ for (Map.Entry<DatanodeID, List<DeletedBlocksTransaction>> entry :
transactions.getDatanodeTransactionMap().entrySet()) {
DeleteBlocksCommand command = new DeleteBlocksCommand(entry.getValue());
recordScmCommandToStatusManager(entry.getKey(), command);
@@ -556,29 +556,29 @@ public void testCommitTransactions() throws Exception {
}
private void recordScmCommandToStatusManager(
- UUID dnId, DeleteBlocksCommand command) {
+ DatanodeID dnId, DeleteBlocksCommand command) {
Set<Long> dnTxSet = command.blocksTobeDeleted()
.stream().map(DeletedBlocksTransaction::getTxID)
.collect(Collectors.toSet());
deletedBlockLog.recordTransactionCreated(dnId, command.getId(), dnTxSet);
}
- private void sendSCMDeleteBlocksCommand(UUID dnId, SCMCommand<?> scmCommand) {
+ private void sendSCMDeleteBlocksCommand(DatanodeID dnId, SCMCommand<?> scmCommand) {
deletedBlockLog.onSent(
- DatanodeDetails.newBuilder().setUuid(dnId).build(), scmCommand);
+ DatanodeDetails.newBuilder().setUuid(dnId.getUuid()).build(), scmCommand);
}
private void assertNoDuplicateTransactions(
DatanodeDeletedBlockTransactions transactions1,
DatanodeDeletedBlockTransactions transactions2) {
- Map<UUID, List<DeletedBlocksTransaction>> map1 =
+ Map<DatanodeID, List<DeletedBlocksTransaction>> map1 =
transactions1.getDatanodeTransactionMap();
- Map<UUID, List<DeletedBlocksTransaction>> map2 =
+ Map<DatanodeID, List<DeletedBlocksTransaction>> map2 =
transactions2.getDatanodeTransactionMap();
- for (Map.Entry<UUID, List<DeletedBlocksTransaction>> entry :
+ for (Map.Entry<DatanodeID, List<DeletedBlocksTransaction>> entry :
map1.entrySet()) {
- UUID dnId = entry.getKey();
+ DatanodeID dnId = entry.getKey();
Set<DeletedBlocksTransaction> txSet1 = new HashSet<>(entry.getValue());
Set<DeletedBlocksTransaction> txSet2 = new HashSet<>(map2.get(dnId));
@@ -592,14 +592,14 @@ private void assertNoDuplicateTransactions(
private void assertContainsAllTransactions(
DatanodeDeletedBlockTransactions transactions1,
DatanodeDeletedBlockTransactions transactions2) {
- Map<UUID, List<DeletedBlocksTransaction>> map1 =
+ Map<DatanodeID, List<DeletedBlocksTransaction>> map1 =
transactions1.getDatanodeTransactionMap();
- Map<UUID, List<DeletedBlocksTransaction>> map2 =
+ Map<DatanodeID, List<DeletedBlocksTransaction>> map2 =
transactions2.getDatanodeTransactionMap();
- for (Map.Entry<UUID, List<DeletedBlocksTransaction>> entry :
+ for (Map.Entry<DatanodeID, List<DeletedBlocksTransaction>> entry :
map1.entrySet()) {
- UUID dnId = entry.getKey();
+ DatanodeID dnId = entry.getKey();
Set<DeletedBlocksTransaction> txSet1 = new HashSet<>(entry.getValue());
Set<DeletedBlocksTransaction> txSet2 = new HashSet<>(map2.get(dnId));
@@ -607,7 +607,7 @@ private void assertContainsAllTransactions(
}
}
- private void commitSCMCommandStatus(Long scmCmdId, UUID dnID,
+ private void commitSCMCommandStatus(Long scmCmdId, DatanodeID dnID,
StorageContainerDatanodeProtocolProtos.CommandStatus.Status status) {
List<StorageContainerDatanodeProtocolProtos
.CommandStatus> deleteBlockStatus = new ArrayList<>();
@@ -619,15 +619,15 @@ private void commitSCMCommandStatus(Long scmCmdId, UUID dnID,
.getProtoBufMessage());
deletedBlockLog.getSCMDeletedBlockTransactionStatusManager()
- .commitSCMCommandStatus(deleteBlockStatus, dnID);
+ .commitSCMCommandStatus(deleteBlockStatus, dnID.getUuid());
}
private void createDeleteBlocksCommandAndAction(
DatanodeDeletedBlockTransactions transactions,
- BiConsumer<UUID, DeleteBlocksCommand> afterCreate) {
- for (Map.Entry<UUID, List<DeletedBlocksTransaction>> entry :
+ BiConsumer<DatanodeID, DeleteBlocksCommand> afterCreate) {
+ for (Map.Entry<DatanodeID, List<DeletedBlocksTransaction>> entry :
transactions.getDatanodeTransactionMap().entrySet()) {
- UUID dnId = entry.getKey();
+ DatanodeID dnId = entry.getKey();
List<DeletedBlocksTransaction> dnTXs = entry.getValue();
DeleteBlocksCommand command = new DeleteBlocksCommand(dnTXs);
afterCreate.accept(dnId, command);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
index 5b77ef8dbd..ae07a9c3ce 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
@@ -88,9 +88,9 @@ public void setup() throws Exception {
when(nodeManager.getNodes(NodeStatus.inServiceHealthy())).thenReturn(
datanodeDetails);
DeletedBlocksTransaction tx1 = createTestDeleteTxn(1, Arrays.asList(1L), 1);
- ddbt.addTransactionToDN(datanode1.getUuid(), tx1);
- ddbt.addTransactionToDN(datanode2.getUuid(), tx1);
- ddbt.addTransactionToDN(datanode3.getUuid(), tx1);
+ ddbt.addTransactionToDN(datanode1.getID(), tx1);
+ ddbt.addTransactionToDN(datanode2.getID(), tx1);
+ ddbt.addTransactionToDN(datanode3.getID(), tx1);
DeletedBlockLog mockDeletedBlockLog = mock(DeletedBlockLog.class);
when(mockDeletedBlockLog.getTransactions(
anyInt(), anySet())).thenReturn(ddbt);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index c957d0497a..f7ffb6e0ff 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -237,7 +237,7 @@ public void testOnMessage(@TempDir File tempDir) throws Exception {
clearInvocations(publisher);
verify(deletedBlockLog, times(0))
- .onDatanodeDead(datanode1.getUuid());
+ .onDatanodeDead(datanode1.getID());
Set<ContainerReplica> container1Replicas = containerManager
.getContainerReplicas(ContainerID.valueOf(container1.getContainerID()));
@@ -266,7 +266,7 @@ public void testOnMessage(@TempDir File tempDir) throws Exception {
assertEquals(0, nodeManager.getCommandQueueCount(datanode1.getUuid(),
cmd.getType()));
verify(publisher).fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY,
datanode1);
- verify(deletedBlockLog).onDatanodeDead(datanode1.getUuid());
+ verify(deletedBlockLog).onDatanodeDead(datanode1.getID());
container1Replicas = containerManager
.getContainerReplicas(ContainerID.valueOf(container1.getContainerID()));
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index bd10fb3da0..9e9336767d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -1070,11 +1070,11 @@ public void testCommandCount()
HddsProtos.ReplicationFactor.THREE, emptyList());
nodeManager.onMessage(
- new CommandForDatanode<>(datanode1, closeContainerCommand), null);
+ new CommandForDatanode<>(DatanodeID.of(datanode1), closeContainerCommand), null);
nodeManager.onMessage(
- new CommandForDatanode<>(datanode1, closeContainerCommand), null);
+ new CommandForDatanode<>(DatanodeID.of(datanode1), closeContainerCommand), null);
nodeManager.onMessage(
- new CommandForDatanode<>(datanode1, createPipelineCommand), null);
+ new CommandForDatanode<>(DatanodeID.of(datanode1), createPipelineCommand), null);
assertEquals(2, nodeManager.getCommandQueueCount(
datanode1, SCMCommandProto.Type.closeContainerCommand));
@@ -1773,7 +1773,7 @@ public void testHandlingSCMCommandEvent()
Arrays.asList(report), emptyList()),
HddsTestUtils.getRandomPipelineReports());
eq.fireEvent(DATANODE_COMMAND,
- new CommandForDatanode<>(datanodeDetails.getUuid(),
+ new CommandForDatanode<>(datanodeDetails,
new CloseContainerCommand(1L,
PipelineID.randomId())));
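
The test updates follow the same two idioms as production code: call getID()
when a DatanodeDetails is at hand, or wrap a raw identifier with
DatanodeID.of(...) as TestSCMNodeManager does above. A combined sketch (dn,
rawId, tx and cmd are placeholders):

    ddbt.addTransactionToDN(dn.getID(), tx);              // from DatanodeDetails
    new CommandForDatanode<>(DatanodeID.of(rawId), cmd);  // from a raw identifier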
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]