This is an automated email from the ASF dual-hosted git repository.
sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new bb7cebcdda HDDS-13437. Avoid scheduling replications on full datanodes
by tracking pending op size in SCM (#8813)
bb7cebcdda is described below
commit bb7cebcdda93978f8a5c14da4ee6816b8a912f20
Author: Siddhant Sangwan <[email protected]>
AuthorDate: Mon Jul 28 19:39:38 2025 +0530
HDDS-13437. Avoid scheduling replications on full datanodes by tracking
pending op size in SCM (#8813)
---
.../container/replication/ContainerReplicaOp.java | 18 +-
.../replication/ContainerReplicaPendingOps.java | 98 +++++++++-
.../container/replication/ReplicationManager.java | 26 +--
.../replication/ReplicationManagerUtil.java | 51 ++++++
.../hdds/scm/server/StorageContainerManager.java | 7 +-
.../TestContainerReplicaPendingOps.java | 197 +++++++++++++++++----
.../replication/TestECUnderReplicationHandler.java | 1 +
.../replication/TestMisReplicationHandler.java | 1 +
...estQuasiClosedStuckUnderReplicationHandler.java | 1 +
.../TestRatisUnderReplicationHandler.java | 1 +
.../replication/TestReplicationManager.java | 6 +-
.../TestReplicationManagerScenarios.java | 3 +-
.../replication/TestReplicationManagerUtil.java | 122 +++++++++++++
.../scm/ReconStorageContainerManagerFacade.java | 3 +-
14 files changed, 481 insertions(+), 54 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
index 3c3e6fd712..2be951a181 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
@@ -30,19 +30,29 @@ public class ContainerReplicaOp {
private final int replicaIndex;
private final SCMCommand<?> command;
private final long deadlineEpochMillis;
+ private final long containerSize;
public static ContainerReplicaOp create(PendingOpType opType,
DatanodeDetails target, int replicaIndex) {
- return new ContainerReplicaOp(opType, target, replicaIndex, null,
System.currentTimeMillis());
+ return new ContainerReplicaOp(opType, target, replicaIndex, null,
+ System.currentTimeMillis(), 0);
}
public ContainerReplicaOp(PendingOpType opType,
- DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long
deadlineEpochMillis) {
+ DatanodeDetails target, int replicaIndex, SCMCommand<?> command,
+ long deadlineEpochMillis) {
+ this(opType, target, replicaIndex, command, deadlineEpochMillis, 0);
+ }
+
+ public ContainerReplicaOp(PendingOpType opType,
+ DatanodeDetails target, int replicaIndex, SCMCommand<?> command,
+ long deadlineEpochMillis, long containerSize) {
this.opType = opType;
this.target = target;
this.replicaIndex = replicaIndex;
this.command = command;
this.deadlineEpochMillis = deadlineEpochMillis;
+ this.containerSize = containerSize;
}
public PendingOpType getOpType() {
@@ -65,6 +75,10 @@ public long getDeadlineEpochMillis() {
return deadlineEpochMillis;
}
+ public long getContainerSize() {
+ return containerSize;
+ }
+
/**
* Enum representing different types of pending Ops.
*/
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
index d2d4389d73..8b1766cd97 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
@@ -34,6 +34,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -59,12 +60,49 @@ public class ContainerReplicaPendingOps {
private ReplicationManagerMetrics replicationMetrics = null;
private final List<ContainerReplicaPendingOpsSubscriber> subscribers =
new ArrayList<>();
+ // tracks how much data is pending to be added to a target Datanode because
of pending ADD ops
+ private final ConcurrentHashMap<DatanodeID, SizeAndTime>
containerSizeScheduled = new ConcurrentHashMap<>();
+ private ReplicationManager.ReplicationManagerConfiguration rmConf;
public ContainerReplicaPendingOps(Clock clock) {
this.clock = clock;
resetCounters();
}
+ public ContainerReplicaPendingOps(Clock clock,
ReplicationManager.ReplicationManagerConfiguration rmConf) {
+ this(clock);
+ this.rmConf = rmConf;
+ }
+
+ /**
+ * Used as the value of {@link
ContainerReplicaPendingOps#containerSizeScheduled} map for tracking the size of
+ * containers with pending ADD ops. Immutable.
+ */
+ public static class SizeAndTime {
+ // number of bytes pending ADD on a target DN
+ private final long size;
+ // timestamp (milliseconds since epoch) when the latest op was scheduled
for this DN
+ private final long lastUpdatedTime;
+
+ public SizeAndTime(long size, long lastUpdatedTime) {
+ this.size = size;
+ this.lastUpdatedTime = lastUpdatedTime;
+ }
+
+ public long getSize() {
+ return size;
+ }
+
+ public long getLastUpdatedTime() {
+ return lastUpdatedTime;
+ }
+
+ @Override
+ public String toString() {
+ return "Size: " + size + ", lastUpdatedTime: " + lastUpdatedTime;
+ }
+ }
+
private void resetCounters() {
for (PendingOpType opType: PendingOpType.values()) {
AtomicLong[] counters = new AtomicLong[2];
@@ -85,6 +123,7 @@ public void clear() {
try {
pendingOps.clear();
resetCounters();
+ containerSizeScheduled.clear();
} finally {
globalLock.writeLock().unlock();
}
@@ -123,8 +162,10 @@ public List<ContainerReplicaOp> getPendingOps(ContainerID
containerID) {
* be discarded.
*/
public void scheduleAddReplica(ContainerID containerID,
- DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long
deadlineEpochMillis) {
- addReplica(ADD, containerID, target, replicaIndex, command,
deadlineEpochMillis);
+ DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long
deadlineEpochMillis, long containerSize,
+ long scheduledEpochMillis) {
+ addReplica(ADD, containerID, target, replicaIndex, command,
deadlineEpochMillis, containerSize,
+ scheduledEpochMillis);
}
/**
@@ -139,7 +180,7 @@ public void scheduleAddReplica(ContainerID containerID,
*/
public void scheduleDeleteReplica(ContainerID containerID,
DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long
deadlineEpochMillis) {
- addReplica(DELETE, containerID, target, replicaIndex, command,
deadlineEpochMillis);
+ addReplica(DELETE, containerID, target, replicaIndex, command,
deadlineEpochMillis, 0L, clock.millis());
}
/**
@@ -226,6 +267,9 @@ public void removeExpiredEntries() {
// For delete ops, we don't remove them from the list as RM must
resend them, or they
// will be removed via a container report when they are
confirmed as deleted.
iterator.remove();
+ if (op.getOpType() == ADD) {
+ releaseScheduledContainerSize(op);
+ }
decrementCounter(op.getOpType(), op.getReplicaIndex());
}
expiredOps.add(op);
@@ -246,6 +290,22 @@ public void removeExpiredEntries() {
}
}
+ private void releaseScheduledContainerSize(ContainerReplicaOp op) {
+ containerSizeScheduled.computeIfPresent(op.getTarget().getID(), (k, v) -> {
+ long newSize = v.getSize() - op.getContainerSize();
+ boolean isSizeNonPositive = newSize <= 0;
+ boolean hasOpExpired = clock.millis() - v.getLastUpdatedTime() >
rmConf.getEventTimeout();
+ if (isSizeNonPositive || hasOpExpired) {
+ /*
+ If the scheduled size is now less than or equal to 0, or if the last
op has expired, implying that the ops
+ before it must have completed or expired, then remove this entry from
the map
+ */
+ return null;
+ }
+ return new SizeAndTime(newSize, v.getLastUpdatedTime());
+ });
+ }
+
private void updateTimeoutMetrics(ContainerReplicaOp op) {
if (op.getOpType() == ADD && isMetricsNotNull()) {
if (isEC(op.getReplicaIndex())) {
@@ -262,9 +322,10 @@ private void updateTimeoutMetrics(ContainerReplicaOp op) {
}
}
+ @SuppressWarnings("checkstyle:ParameterNumber")
private void addReplica(ContainerReplicaOp.PendingOpType opType,
ContainerID containerID, DatanodeDetails target, int replicaIndex,
SCMCommand<?> command,
- long deadlineEpochMillis) {
+ long deadlineEpochMillis, long containerSize, long scheduledEpochMillis)
{
Lock lock = writeLock(containerID);
lock(lock);
try {
@@ -274,7 +335,17 @@ private void addReplica(ContainerReplicaOp.PendingOpType
opType,
List<ContainerReplicaOp> ops = pendingOps.computeIfAbsent(
containerID, s -> new ArrayList<>());
ops.add(new ContainerReplicaOp(opType,
- target, replicaIndex, command, deadlineEpochMillis));
+ target, replicaIndex, command, deadlineEpochMillis, containerSize));
+ DatanodeID id = target.getID();
+ if (opType == ADD) {
+ containerSizeScheduled.compute(id, (k, v) -> {
+ if (v == null) {
+ return new SizeAndTime(containerSize, scheduledEpochMillis);
+ } else {
+ return new SizeAndTime(v.getSize() + containerSize,
scheduledEpochMillis);
+ }
+ });
+ }
incrementCounter(opType, replicaIndex);
} finally {
unlock(lock);
@@ -300,6 +371,15 @@ private boolean
completeOp(ContainerReplicaOp.PendingOpType opType,
found = true;
completedOps.add(op);
iterator.remove();
+ if (opType == ADD) {
+ containerSizeScheduled.computeIfPresent(target.getID(), (k, v)
-> {
+ long newSize = v.getSize() - op.getContainerSize();
+ if (newSize <= 0) {
+ return null;
+ }
+ return new SizeAndTime(newSize, v.getLastUpdatedTime());
+ });
+ }
decrementCounter(op.getOpType(), replicaIndex);
}
}
@@ -364,6 +444,14 @@ private void unlock(Lock lock) {
lock.unlock();
}
+ public ConcurrentHashMap<DatanodeID, SizeAndTime>
getContainerSizeScheduled() {
+ return containerSizeScheduled;
+ }
+
+ public Clock getClock() {
+ return clock;
+ }
+
private boolean isMetricsNotNull() {
return replicationMetrics != null;
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 89259cec08..33ab37a938 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -84,6 +84,7 @@
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
import
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.container.replication.ReplicationServer;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
@@ -205,7 +206,8 @@ public class ReplicationManager implements SCMService,
ContainerReplicaPendingOp
* @param replicaPendingOps The pendingOps instance
*/
@SuppressWarnings("parameternumber")
- public ReplicationManager(final ConfigurationSource conf,
+ public ReplicationManager(final ReplicationManagerConfiguration rmConf,
+ final ConfigurationSource conf,
final ContainerManager containerManager,
final PlacementPolicy ratisContainerPlacement,
final PlacementPolicy ecContainerPlacement,
@@ -217,7 +219,7 @@ public ReplicationManager(final ConfigurationSource conf,
throws IOException {
this.containerManager = containerManager;
this.scmContext = scmContext;
- this.rmConf = conf.getObject(ReplicationManagerConfiguration.class);
+ this.rmConf = rmConf;
this.replicationServerConf =
conf.getObject(ReplicationServer.ReplicationConfig.class);
this.running = false;
@@ -688,13 +690,15 @@ private void adjustPendingOpsAndMetrics(ContainerInfo
containerInfo,
ReconstructECContainersCommand rcc = (ReconstructECContainersCommand)
cmd;
List<DatanodeDetails> targets = rcc.getTargetDatanodes();
final ByteString targetIndexes = rcc.getMissingContainerIndexes();
+ long requiredSize =
HddsServerUtil.requiredReplicationSpace(containerInfo.getUsedBytes());
for (int i = 0; i < targetIndexes.size(); i++) {
- containerReplicaPendingOps.scheduleAddReplica(
- containerInfo.containerID(), targets.get(i),
targetIndexes.byteAt(i), cmd, scmDeadlineEpochMs);
+
containerReplicaPendingOps.scheduleAddReplica(containerInfo.containerID(),
targets.get(i),
+ targetIndexes.byteAt(i), cmd, scmDeadlineEpochMs, requiredSize,
clock.millis());
}
getMetrics().incrEcReconstructionCmdsSentTotal();
} else if (cmd.getType() == Type.replicateContainerCommand) {
ReplicateContainerCommand rcc = (ReplicateContainerCommand) cmd;
+ long requiredSize =
HddsServerUtil.requiredReplicationSpace(containerInfo.getUsedBytes());
if (rcc.getTargetDatanode() == null) {
/*
@@ -702,17 +706,15 @@ private void adjustPendingOpsAndMetrics(ContainerInfo
containerInfo,
op's target Datanode should be the Datanode this command is being
sent to.
*/
- containerReplicaPendingOps.scheduleAddReplica(
- containerInfo.containerID(),
- targetDatanode, rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs);
+
containerReplicaPendingOps.scheduleAddReplica(containerInfo.containerID(),
targetDatanode,
+ rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs, requiredSize,
clock.millis());
} else {
/*
This means the source will push replica to the target, so the op's
target Datanode should be the Datanode the replica will be pushed to.
*/
- containerReplicaPendingOps.scheduleAddReplica(
- containerInfo.containerID(),
- rcc.getTargetDatanode(), rcc.getReplicaIndex(), cmd,
scmDeadlineEpochMs);
+
containerReplicaPendingOps.scheduleAddReplica(containerInfo.containerID(),
rcc.getTargetDatanode(),
+ rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs, requiredSize,
clock.millis());
}
if (rcc.getReplicaIndex() > 0) {
@@ -1484,6 +1486,10 @@ static NodeStatus getNodeStatus(DatanodeDetails dn,
NodeManager nm) {
}
}
+ public NodeManager getNodeManager() {
+ return nodeManager;
+ }
+
private int getRemainingMaintenanceRedundancy(boolean isEC) {
return isEC
? rmConf.getMaintenanceRemainingRedundancy()
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
index 267da52173..8b2ad75824 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
@@ -24,6 +24,7 @@
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -34,7 +35,10 @@
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
@@ -195,9 +199,56 @@ An UNHEALTHY replica with unique origin node id of a
QUASI_CLOSED container shou
excludedNodes.add(pending.getTarget());
}
}
+ excludeFullNodes(replicationManager, container, excludedNodes);
return new ExcludedAndUsedNodes(excludedNodes, usedNodes);
}
+ private static void excludeFullNodes(ReplicationManager replicationManager,
+ ContainerInfo container, List<DatanodeDetails> excludedNodes) {
+ ContainerReplicaPendingOps pendingOps =
replicationManager.getContainerReplicaPendingOps();
+ Map<DatanodeID, ContainerReplicaPendingOps.SizeAndTime>
+ containerSizeScheduled = pendingOps.getContainerSizeScheduled();
+ if (containerSizeScheduled == null || containerSizeScheduled.isEmpty()) {
+ return;
+ }
+
+ final long requiredSize =
HddsServerUtil.requiredReplicationSpace(container.getUsedBytes());
+ NodeManager nodeManager = replicationManager.getNodeManager();
+
+ for (Map.Entry<DatanodeID, ContainerReplicaPendingOps.SizeAndTime> entry :
containerSizeScheduled.entrySet()) {
+ DatanodeDetails dn = nodeManager.getNode(entry.getKey());
+ if (dn == null || excludedNodes.contains(dn)) {
+ continue;
+ }
+
+ SCMNodeMetric nodeMetric = nodeManager.getNodeStat(dn);
+ if (nodeMetric == null) {
+ continue;
+ }
+
+ long scheduledSize = 0;
+ ContainerReplicaPendingOps.SizeAndTime sizeAndTime = entry.getValue();
+ if (sizeAndTime != null) {
+ // Only consider sizes added in the last event timeout window
+ if (pendingOps.getClock().millis() - sizeAndTime.getLastUpdatedTime()
+ < replicationManager.getConfig().getEventTimeout()) {
+ scheduledSize = sizeAndTime.getSize();
+ } else {
+ LOG.debug("Expired op {} found while computing exclude nodes",
entry);
+ }
+ }
+
+ SCMNodeStat scmNodeStat = nodeMetric.get();
+ if (scmNodeStat.getRemaining().get() -
scmNodeStat.getFreeSpaceToSpare().get() - scheduledSize
+ < requiredSize) {
+ LOG.debug("Adding datanode {} to exclude list. Remaining: {},
freeSpaceToSpare: {}, scheduledSize: {}, " +
+ "requiredSize: {}. ContainerInfo: {}.", dn,
scmNodeStat.getRemaining().get(),
+ scmNodeStat.getFreeSpaceToSpare().get(), scheduledSize,
requiredSize, container);
+ excludedNodes.add(dn);
+ }
+ }
+ }
+
/**
* Simple class to hold the excluded and used nodes lists.
*/
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 00163aa214..9149d07841 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -765,8 +765,10 @@ private void initializeSystemManagers(OzoneConfiguration
conf,
finalizationManager.buildUpgradeContext(scmNodeManager, pipelineManager,
scmContext);
+ ReplicationManager.ReplicationManagerConfiguration rmConf =
+
conf.getObject(ReplicationManager.ReplicationManagerConfiguration.class);
containerReplicaPendingOps =
- new ContainerReplicaPendingOps(systemClock);
+ new ContainerReplicaPendingOps(systemClock, rmConf);
long containerReplicaOpScrubberIntervalMs = conf.getTimeDuration(
ScmConfigKeys
@@ -819,6 +821,7 @@ private void initializeSystemManagers(OzoneConfiguration
conf,
replicationManager = configurator.getReplicationManager();
} else {
replicationManager = new ReplicationManager(
+ rmConf,
conf,
containerManager,
containerPlacementPolicy,
@@ -828,7 +831,7 @@ private void initializeSystemManagers(OzoneConfiguration
conf,
scmNodeManager,
systemClock,
containerReplicaPendingOps);
- reconfigurationHandler.register(replicationManager.getConfig());
+ reconfigurationHandler.register(rmConf);
}
serviceManager.register(replicationManager);
// RM gets notified of expired pending delete from
containerReplicaPendingOps by subscribing to it
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
index d645beb624..ee813f0942 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
@@ -22,6 +22,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -32,11 +34,12 @@
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.client.ReplicationType;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
@@ -61,18 +64,20 @@ public class TestContainerReplicaPendingOps {
private long deadline;
private SCMCommand<?> addCmd;
private SCMCommand<?> deleteCmd;
+ private static final long FIVE_GB_CONTAINER_SIZE = 5L * 1024 * 1024 * 1024;
+ private static final long THREE_GB_CONTAINER_SIZE = 3L * 1024 * 1024 * 1024;
+ private ReplicationManager.ReplicationManagerConfiguration rmConf;
@BeforeEach
public void setup() {
clock = new TestClock(Instant.now(), ZoneOffset.UTC);
deadline = clock.millis() + 10000; // Current time plus 10 seconds
- pendingOps = new ContainerReplicaPendingOps(clock);
- ConfigurationSource conf = new OzoneConfiguration();
- ReplicationManager.ReplicationManagerConfiguration rmConf = conf
- .getObject(ReplicationManager.ReplicationManagerConfiguration.class);
+ OzoneConfiguration conf = new OzoneConfiguration();
+ rmConf =
conf.getObject(ReplicationManager.ReplicationManagerConfiguration.class);
ReplicationManager rm = mock(ReplicationManager.class);
when(rm.getConfig()).thenReturn(rmConf);
+ pendingOps = new ContainerReplicaPendingOps(clock, rmConf);
metrics = ReplicationManagerMetrics.create(rm);
pendingOps.setReplicationMetrics(metrics);
dn1 = MockDatanodeDetails.randomDatanodeDetails();
@@ -99,7 +104,8 @@ public void testGetPendingOpsReturnsEmptyList() {
@Test
public void testClear() {
- pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd,
deadline);
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd,
deadline,
+ FIVE_GB_CONTAINER_SIZE, clock.millis());
pendingOps.scheduleDeleteReplica(ContainerID.valueOf(2), dn1, 0,
deleteCmd, deadline);
assertEquals(1,
pendingOps.getPendingOpCount(ContainerReplicaOp.PendingOpType.ADD));
@@ -116,16 +122,25 @@ public void testClear() {
@Test
public void testCanAddReplicasForAdd() {
- pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd,
deadline);
- pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn2, 0, addCmd,
deadline);
- pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn3, 0, addCmd,
deadline);
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd,
deadline,
+ FIVE_GB_CONTAINER_SIZE, clock.millis());
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn2, 0, addCmd,
deadline,
+ FIVE_GB_CONTAINER_SIZE, clock.millis());
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn3, 0, addCmd,
deadline,
+ FIVE_GB_CONTAINER_SIZE, clock.millis());
// Duplicate for DN2
- pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn2, 0, addCmd,
deadline + 1);
+ pendingOps.scheduleAddReplica(
+ ContainerID.valueOf(1), dn2, 0, addCmd, deadline + 1,
+ FIVE_GB_CONTAINER_SIZE, clock.millis());
// Not a duplicate for DN2 as different index. Should not happen in
practice as it is not valid to have 2 indexes
// on the same node.
- pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn2, 1, addCmd,
deadline);
- pendingOps.scheduleAddReplica(ContainerID.valueOf(2), dn1, 1, addCmd,
deadline);
- pendingOps.scheduleAddReplica(ContainerID.valueOf(2), dn1, 1, addCmd,
deadline + 1);
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn2, 1, addCmd,
deadline,
+ FIVE_GB_CONTAINER_SIZE, clock.millis());
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(2), dn1, 1, addCmd,
deadline,
+ THREE_GB_CONTAINER_SIZE, clock.millis());
+ pendingOps.scheduleAddReplica(
+ ContainerID.valueOf(2), dn1, 1, addCmd, deadline + 1,
+ THREE_GB_CONTAINER_SIZE, clock.millis());
List<ContainerReplicaOp> ops =
pendingOps.getPendingOps(ContainerID.valueOf(1));
@@ -185,9 +200,11 @@ public void testCanAddReplicasForDelete() {
@Test
public void testCompletingOps() {
pendingOps.scheduleDeleteReplica(ContainerID.valueOf(1), dn1, 0,
deleteCmd, deadline);
- pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd,
deadline);
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd,
deadline,
+ FIVE_GB_CONTAINER_SIZE, clock.millis());
pendingOps.scheduleDeleteReplica(ContainerID.valueOf(1), dn2, 0,
deleteCmd, deadline);
- pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn3, 0, addCmd,
deadline);
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn3, 0, addCmd,
deadline,
+ FIVE_GB_CONTAINER_SIZE, clock.millis());
pendingOps.scheduleDeleteReplica(ContainerID.valueOf(2), dn1, 1,
deleteCmd, deadline);
List<ContainerReplicaOp> ops =
@@ -218,9 +235,11 @@ public void testCompletingOps() {
@Test
public void testRemoveSpecificOp() {
pendingOps.scheduleDeleteReplica(ContainerID.valueOf(1), dn1, 0,
deleteCmd, deadline);
- pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd,
deadline);
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd,
deadline,
+ FIVE_GB_CONTAINER_SIZE, clock.millis());
pendingOps.scheduleDeleteReplica(ContainerID.valueOf(1), dn2, 0,
deleteCmd, deadline);
- pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn3, 0, addCmd,
deadline);
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn3, 0, addCmd,
deadline,
+ FIVE_GB_CONTAINER_SIZE, clock.millis());
pendingOps.scheduleDeleteReplica(ContainerID.valueOf(2), dn1, 1,
deleteCmd, deadline);
ContainerID cid = ContainerID.valueOf(1);
@@ -241,11 +260,14 @@ public void testRemoveExpiredEntries() {
long laterExpiry = clock.millis() + 2000;
long latestExpiry = clock.millis() + 3000;
pendingOps.scheduleDeleteReplica(ContainerID.valueOf(1), dn1, 0,
deleteCmd, expiry);
- pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd,
expiry);
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd,
expiry,
+ FIVE_GB_CONTAINER_SIZE, clock.millis());
pendingOps.scheduleDeleteReplica(ContainerID.valueOf(1), dn2, 0,
deleteCmd, laterExpiry);
- pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn3, 0, addCmd,
laterExpiry);
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn3, 0, addCmd,
laterExpiry,
+ FIVE_GB_CONTAINER_SIZE, clock.millis());
pendingOps.scheduleDeleteReplica(ContainerID.valueOf(2), dn1, 1,
deleteCmd, latestExpiry);
- pendingOps.scheduleAddReplica(ContainerID.valueOf(2), dn1, 1, addCmd,
latestExpiry);
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(2), dn1, 1, addCmd,
latestExpiry,
+ FIVE_GB_CONTAINER_SIZE, clock.millis());
List<ContainerReplicaOp> ops =
pendingOps.getPendingOps(ContainerID.valueOf(1));
@@ -302,10 +324,13 @@ private boolean isOpPresent(List<ContainerReplicaOp> ops,
DatanodeDetails dn,
public void testReplicationMetrics() {
long expiry = clock.millis() + 1000;
pendingOps.scheduleDeleteReplica(ContainerID.valueOf(1), dn1, 1,
deleteCmd, expiry);
- pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 2, addCmd,
expiry);
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 2, addCmd,
expiry,
+ FIVE_GB_CONTAINER_SIZE, clock.millis());
pendingOps.scheduleDeleteReplica(ContainerID.valueOf(2), dn2, 1,
deleteCmd, expiry);
- pendingOps.scheduleAddReplica(ContainerID.valueOf(2), dn3, 1, addCmd,
expiry);
- pendingOps.scheduleAddReplica(ContainerID.valueOf(3), dn3, 0, addCmd,
expiry);
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(2), dn3, 1, addCmd,
expiry,
+ THREE_GB_CONTAINER_SIZE, clock.millis());
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(3), dn3, 0, addCmd,
expiry,
+ THREE_GB_CONTAINER_SIZE, clock.millis());
pendingOps.scheduleDeleteReplica(ContainerID.valueOf(4), dn3, 0,
deleteCmd, expiry);
// InFlight Replication and Deletion
@@ -328,10 +353,13 @@ public void testReplicationMetrics() {
expiry = clock.millis() + 1000;
pendingOps.scheduleDeleteReplica(ContainerID.valueOf(3), dn1, 2,
deleteCmd, expiry);
- pendingOps.scheduleAddReplica(ContainerID.valueOf(3), dn1, 3, addCmd,
expiry);
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(3), dn1, 3, addCmd,
expiry,
+ FIVE_GB_CONTAINER_SIZE, clock.millis());
pendingOps.scheduleDeleteReplica(ContainerID.valueOf(4), dn2, 2,
deleteCmd, expiry);
- pendingOps.scheduleAddReplica(ContainerID.valueOf(4), dn3, 4, addCmd,
expiry);
- pendingOps.scheduleAddReplica(ContainerID.valueOf(5), dn3, 0, addCmd,
expiry);
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(4), dn3, 4, addCmd,
expiry,
+ THREE_GB_CONTAINER_SIZE, clock.millis());
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(5), dn3, 0, addCmd,
expiry,
+ THREE_GB_CONTAINER_SIZE, clock.millis());
pendingOps.scheduleDeleteReplica(ContainerID.valueOf(6), dn3, 0,
deleteCmd, expiry);
// InFlight Replication and Deletion. Previous Inflight should be
@@ -375,7 +403,7 @@ public void testNotifySubscribers() {
// schedule an ADD and a DELETE
ContainerID containerID = ContainerID.valueOf(1);
- pendingOps.scheduleAddReplica(containerID, dn1, 0, addCmd, deadline);
+ pendingOps.scheduleAddReplica(containerID, dn1, 0, addCmd, deadline,
FIVE_GB_CONTAINER_SIZE, clock.millis());
ContainerReplicaOp addOp = pendingOps.getPendingOps(containerID).get(0);
pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deleteCmd, deadline);
@@ -392,7 +420,7 @@ public void testNotifySubscribers() {
// now, test notification on expiration
pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deleteCmd, deadline);
- pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline);
+ pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline,
FIVE_GB_CONTAINER_SIZE, clock.millis());
for (ContainerReplicaOp op : pendingOps.getPendingOps(containerID)) {
if (op.getOpType() == ADD) {
addOp = op;
@@ -415,7 +443,7 @@ public void
subscribersShouldNotBeNotifiedWhenOpsHaveNotExpired() {
// schedule ops
pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deleteCmd, deadline);
- pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline);
+ pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline,
FIVE_GB_CONTAINER_SIZE, clock.millis());
// register subscriber
ContainerReplicaPendingOpsSubscriber subscriber1 = mock(
@@ -434,7 +462,7 @@ public void
subscribersShouldNotBeNotifiedWhenReplacingAnOpWithDuplicate() {
ContainerID containerID = ContainerID.valueOf(1);
// schedule ops
- pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline);
+ pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline,
FIVE_GB_CONTAINER_SIZE, clock.millis());
// register subscriber
ContainerReplicaPendingOpsSubscriber subscriber1 = mock(
@@ -442,9 +470,116 @@ public void
subscribersShouldNotBeNotifiedWhenReplacingAnOpWithDuplicate() {
pendingOps.registerSubscriber(subscriber1);
clock.fastForward(1000);
- pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline + 1);
+ pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline + 1,
+ FIVE_GB_CONTAINER_SIZE, clock.millis());
// no entries have expired, so there should be zero interactions with the
// subscriber
verifyNoMoreInteractions(subscriber1);
}
+
+ /**
+ * Tests that ContainerReplicaPendingOps correctly tracks how much size (of
containers) is being moved to a target
+ * Datanode because of pending ADD ops. This size should be correctly added
and reduced when ADD ops are triggered
+ * and completed.
+ */
+ @Test
+ public void testScheduledSizeIsCorrectlyTrackedAndCompleted() {
+ final long eventTimeout = rmConf.getEventTimeout();
+ long now = clock.millis();
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd,
+ now + eventTimeout, FIVE_GB_CONTAINER_SIZE, clock.millis());
+
+ // Assert that containerSizeScheduled has the correct size
+ ConcurrentHashMap<DatanodeID, ContainerReplicaPendingOps.SizeAndTime>
scheduled =
+ pendingOps.getContainerSizeScheduled();
+ assertEquals(1, scheduled.size());
+ assertEquals(FIVE_GB_CONTAINER_SIZE, scheduled.get(dn1.getID()).getSize());
+
+ // Schedule a second op for the same datanode
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(2), dn1, 0, addCmd,
+ now + eventTimeout, THREE_GB_CONTAINER_SIZE, clock.millis());
+ assertEquals(FIVE_GB_CONTAINER_SIZE + THREE_GB_CONTAINER_SIZE,
scheduled.get(dn1.getID()).getSize());
+
+ // Complete the first op
+ pendingOps.completeAddReplica(ContainerID.valueOf(1), dn1, 0);
+ assertEquals(THREE_GB_CONTAINER_SIZE,
scheduled.get(dn1.getID()).getSize());
+
+ // Complete the second op
+ pendingOps.completeAddReplica(ContainerID.valueOf(2), dn1, 0);
+ assertNull(scheduled.get(dn1.getID()));
+ }
+
+ /**
+ * When an ADD op (container replication) expires, the map in
ContainerReplicaPendingOps should be modified
+ * correctly. The entry should be removed if
ReplicationManagerConfiguration#eventTimeout milliseconds have passed
+ * since the entry's lastUpdatedTime.
+ */
+ @Test
+ public void testScheduledSizeIsCorrectlyTrackedAndExpired() {
+ final long eventTimeout = rmConf.getEventTimeout();
+
+ long now = clock.millis();
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(3), dn2, 0, addCmd,
+ now + eventTimeout, FIVE_GB_CONTAINER_SIZE, clock.millis());
+ ConcurrentHashMap<DatanodeID, ContainerReplicaPendingOps.SizeAndTime>
+ scheduled = pendingOps.getContainerSizeScheduled();
+ assertEquals(FIVE_GB_CONTAINER_SIZE, scheduled.get(dn2.getID()).getSize());
+ assertEquals(now, scheduled.get(dn2.getID()).getLastUpdatedTime());
+
+ // Advance clock so the op expires
+ clock.fastForward(eventTimeout + 1);
+ pendingOps.removeExpiredEntries();
+ // The entry should be removed from the map after expiration
+ assertNull(scheduled.get(dn2.getID()));
+ }
+
+ /**
+ * Tests that only the size of containers with expired ops is reduced from
the map tracking size of pending ops.
+ * For example, if target Datanode DN1 has two pending ADD ops 10GB + 15GB,
and the first op expires, then only
+ * 10GB should be subtracted.
+ */
+ @Test
+ public void testOnlyExpiredOpSizeIsRemovedFromSizeScheduledMap() {
+ final long eventTimeout = rmConf.getEventTimeout();
+ long now = clock.millis();
+ // Schedule first op
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(4), dn2, 0, addCmd,
+ now + eventTimeout, FIVE_GB_CONTAINER_SIZE, clock.millis());
+ // another replication scheduled for dn1 to receive a container - just
testing that this entry isn't removed or
+ // modified when other entries expire or are modified
+ pendingOps.scheduleAddReplica((ContainerID.valueOf(2)), dn1, 2, addCmd,
+ now + eventTimeout * 10, THREE_GB_CONTAINER_SIZE, clock.millis());
+ ConcurrentHashMap<DatanodeID, ContainerReplicaPendingOps.SizeAndTime>
+ scheduled = pendingOps.getContainerSizeScheduled();
+ assertEquals(FIVE_GB_CONTAINER_SIZE, scheduled.get(dn2.getID()).getSize());
+ assertEquals(THREE_GB_CONTAINER_SIZE,
scheduled.get(dn1.getID()).getSize());
+ assertEquals(now, scheduled.get(dn2.getID()).getLastUpdatedTime());
+ assertEquals(now, scheduled.get(dn1.getID()).getLastUpdatedTime());
+
+ clock.fastForward(eventTimeout - 1);
+ long updateTime = clock.millis();
+
+ // Schedule second op for dn2, which should update the lastUpdatedTime
+ pendingOps.scheduleAddReplica(ContainerID.valueOf(5), dn2, 1, addCmd,
+ updateTime + eventTimeout, THREE_GB_CONTAINER_SIZE, clock.millis());
+ assertEquals(FIVE_GB_CONTAINER_SIZE + THREE_GB_CONTAINER_SIZE,
scheduled.get(dn2.getID()).getSize());
+ assertEquals(THREE_GB_CONTAINER_SIZE,
scheduled.get(dn1.getID()).getSize());
+ assertEquals(updateTime, scheduled.get(dn2.getID()).getLastUpdatedTime());
+ assertEquals(now, scheduled.get(dn1.getID()).getLastUpdatedTime());
+
+ // Advance clock to expire the first op but not the second for dn2
+ clock.set(Instant.ofEpochMilli(now + eventTimeout + 1));
+ pendingOps.removeExpiredEntries();
+
+ // Assert the entry for dn2 still exists, but with reduced size
+ assertNotNull(scheduled.get(dn2.getID()));
+ assertEquals(THREE_GB_CONTAINER_SIZE,
scheduled.get(dn2.getID()).getSize());
+ assertEquals(THREE_GB_CONTAINER_SIZE,
scheduled.get(dn1.getID()).getSize());
+
+ // Advance clock again to expire the second op for dn2
+ clock.set(Instant.ofEpochMilli(updateTime + eventTimeout + 1));
+ pendingOps.removeExpiredEntries();
+ assertNull(scheduled.get(dn2.getID()));
+ assertEquals(THREE_GB_CONTAINER_SIZE,
scheduled.get(dn1.getID()).getSize());
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 8a18c1ba1d..5de354fb2d 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -141,6 +141,7 @@ public NodeStatus getNodeStatus(DatanodeDetails dd) {
.thenReturn(rmConf);
metrics = ReplicationManagerMetrics.create(replicationManager);
when(replicationManager.getMetrics()).thenReturn(metrics);
+
when(replicationManager.getContainerReplicaPendingOps()).thenReturn(mock(ContainerReplicaPendingOps.class));
when(replicationManager.getNodeStatus(any(DatanodeDetails.class)))
.thenAnswer(invocation -> {
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
index 59ffab79c3..d65cd2afc0 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
@@ -94,6 +94,7 @@ protected void setup(ReplicationConfig repConfig, File
testDir)
when(replicationManager.getConfig()).thenReturn(rmConf);
metrics = ReplicationManagerMetrics.create(replicationManager);
when(replicationManager.getMetrics()).thenReturn(metrics);
+
when(replicationManager.getContainerReplicaPendingOps()).thenReturn(mock(ContainerReplicaPendingOps.class));
commandsSent = new HashSet<>();
ReplicationTestUtil.mockRMSendDatanodeCommand(
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestQuasiClosedStuckUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestQuasiClosedStuckUnderReplicationHandler.java
index d88bcf86af..22fee58b7c 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestQuasiClosedStuckUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestQuasiClosedStuckUnderReplicationHandler.java
@@ -88,6 +88,7 @@ void setup(@TempDir File testDir) throws
NodeNotFoundException,
ReplicationManager.ReplicationManagerConfiguration.class));
metrics = ReplicationManagerMetrics.create(replicationManager);
when(replicationManager.getMetrics()).thenReturn(metrics);
+
when(replicationManager.getContainerReplicaPendingOps()).thenReturn(mock(ContainerReplicaPendingOps.class));
/*
Return NodeStatus with NodeOperationalState as specified in
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
index 21bc259b1a..dd2f9fd51d 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
@@ -106,6 +106,7 @@ void setup(@TempDir File testDir) throws
NodeNotFoundException,
ReplicationManagerConfiguration.class));
metrics = ReplicationManagerMetrics.create(replicationManager);
when(replicationManager.getMetrics()).thenReturn(metrics);
+
when(replicationManager.getContainerReplicaPendingOps()).thenReturn(mock(ContainerReplicaPendingOps.class));
/*
Return NodeStatus with NodeOperationalState as specified in
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 349da3dcaa..2777a0222b 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -201,6 +201,7 @@ void cleanup() {
private ReplicationManager createReplicationManager() throws IOException {
return new ReplicationManager(
+
configuration.getObject(ReplicationManager.ReplicationManagerConfiguration.class),
configuration,
containerManager,
ratisPlacementPolicy,
@@ -226,7 +227,7 @@ private void enableProcessAll() {
@Test
public void testPendingOpsClearedWhenStarting() {
containerReplicaPendingOps.scheduleAddReplica(ContainerID.valueOf(1),
- MockDatanodeDetails.randomDatanodeDetails(), 1, null,
Integer.MAX_VALUE);
+ MockDatanodeDetails.randomDatanodeDetails(), 1, null,
Integer.MAX_VALUE, 5L, clock.millis());
containerReplicaPendingOps.scheduleDeleteReplica(ContainerID.valueOf(2),
MockDatanodeDetails.randomDatanodeDetails(), 1, null,
Integer.MAX_VALUE);
assertEquals(1, containerReplicaPendingOps
@@ -618,7 +619,7 @@ public void testUnderReplicatedContainerFixedByPending()
addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4);
containerReplicaPendingOps.scheduleAddReplica(container.containerID(),
MockDatanodeDetails.randomDatanodeDetails(), 5, null,
- clock.millis() + 10000);
+ clock.millis() + 10000, 5L, clock.millis());
replicationManager.processContainer(
container, repQueue, repReport);
@@ -1660,6 +1661,7 @@ public void
testNotifyNodeStateChangeWakesUpThread(boolean queueIsEmpty)
ReplicationQueue queue = mock(ReplicationQueue.class);
when(queue.isEmpty()).thenReturn(queueIsEmpty);
final ReplicationManager customRM = new ReplicationManager(
+
configuration.getObject(ReplicationManager.ReplicationManagerConfiguration.class),
configuration,
containerManager,
ratisPlacementPolicy,
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java
index c0315738ae..e101bf6ae5 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java
@@ -222,6 +222,7 @@ public void setup() throws IOException,
NodeNotFoundException {
private ReplicationManager createReplicationManager() throws IOException {
return new ReplicationManager(
+
configuration.getObject(ReplicationManager.ReplicationManagerConfiguration.class),
configuration,
containerManager,
ratisPlacementPolicy,
@@ -250,7 +251,7 @@ private void loadPendingOps(ContainerInfo container,
Scenario scenario) {
for (PendingReplica r : scenario.getPendingReplicas()) {
if (r.getType() == ContainerReplicaOp.PendingOpType.ADD) {
containerReplicaPendingOps.scheduleAddReplica(container.containerID(),
r.getDatanodeDetails(),
- r.getReplicaIndex(), null, Long.MAX_VALUE);
+ r.getReplicaIndex(), null, Long.MAX_VALUE, 5L, clock.millis());
} else if (r.getType() == ContainerReplicaOp.PendingOpType.DELETE) {
containerReplicaPendingOps.scheduleDeleteReplica(container.containerID(),
r.getDatanodeDetails(),
r.getReplicaIndex(), null, Long.MAX_VALUE);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
index 97abe6be99..a7718bdf24 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
@@ -20,28 +20,38 @@
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps.SizeAndTime;
import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainer;
import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.time.Instant;
+import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import
org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ozone.test.TestClock;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -55,6 +65,8 @@ public class TestReplicationManagerUtil {
@BeforeEach
public void setup() {
replicationManager = mock(ReplicationManager.class);
+ ContainerReplicaPendingOps pendingOpsMock =
mock(ContainerReplicaPendingOps.class);
+
when(replicationManager.getContainerReplicaPendingOps()).thenReturn(pendingOpsMock);
}
@Test
@@ -222,4 +234,114 @@ public void
testGetUsedAndExcludedNodesForQuasiClosedContainer() throws NodeNotF
.contains(pendingDelete);
}
+ @Test
+ public void testDatanodesWithInSufficientDiskSpaceAreExcluded() throws
NodeNotFoundException {
+ ContainerInfo container = createContainer(HddsProtos.LifeCycleState.CLOSED,
+
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
+ final long oneGb = 1024L * 1024 * 1024;
+ container.setUsedBytes(5 * oneGb);
+ ContainerID cid = container.containerID();
+ Set<ContainerReplica> replicas = new HashSet<>();
+ ContainerReplica good = createContainerReplica(cid, 0,
+ IN_SERVICE, ContainerReplicaProto.State.CLOSED, 1);
+ replicas.add(good);
+
+ ContainerReplica remove = createContainerReplica(cid, 0,
+ IN_SERVICE, ContainerReplicaProto.State.CLOSED, 1);
+ replicas.add(remove);
+
+ ContainerReplica unhealthy = createContainerReplica(
+ cid, 0, IN_SERVICE, ContainerReplicaProto.State.UNHEALTHY, 1);
+ replicas.add(unhealthy);
+
+ ContainerReplica decommissioning =
+ createContainerReplica(cid, 0,
+ DECOMMISSIONING, ContainerReplicaProto.State.CLOSED, 1);
+ replicas.add(decommissioning);
+
+ ContainerReplica maintenance =
+ createContainerReplica(cid, 0,
+ IN_MAINTENANCE, ContainerReplicaProto.State.CLOSED, 1);
+ replicas.add(maintenance);
+
+ // Take one of the replicas and set it to be removed. It should be on the
+ // excluded list rather than the used list.
+ Set<ContainerReplica> toBeRemoved = new HashSet<>();
+ toBeRemoved.add(remove);
+
+ // Add a pending add and delete. The add should go onto the used
+ // list and the delete added to the excluded nodes.
+ DatanodeDetails pendingAdd = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails pendingDelete =
MockDatanodeDetails.randomDatanodeDetails();
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ ContainerReplicaOp.PendingOpType.ADD, pendingAdd, 0));
+ pending.add(ContainerReplicaOp.create(
+ ContainerReplicaOp.PendingOpType.DELETE, pendingDelete, 0));
+
+ // set up mocks such that ContainerReplicaPendingOps returns the
containerSizeScheduled map
+ ReplicationManagerConfiguration rmConf = new
ReplicationManagerConfiguration();
+ when(replicationManager.getConfig()).thenReturn(rmConf);
+ TestClock clock = new TestClock(Instant.now(), ZoneOffset.UTC);
+ ConcurrentHashMap<DatanodeID, SizeAndTime> sizeScheduledMap = new
ConcurrentHashMap<>();
+
+ // fullDn has 10GB size scheduled, 30GB available and 20GB min free space,
so it should be excluded
+ DatanodeDetails fullDn = MockDatanodeDetails.randomDatanodeDetails();
+ sizeScheduledMap.put(fullDn.getID(), new SizeAndTime(10 * oneGb,
clock.millis()));
+ // spaceAvailableDn should not be excluded as it has sufficient space
+ DatanodeDetails spaceAvailableDn =
MockDatanodeDetails.randomDatanodeDetails();
+ sizeScheduledMap.put(spaceAvailableDn.getID(), new SizeAndTime(10 * oneGb,
clock.millis()));
+ // expiredOpDn is the same as fullDn, however its op has expired - so it
should not be excluded
+ DatanodeDetails expiredOpDn = MockDatanodeDetails.randomDatanodeDetails();
+ sizeScheduledMap.put(expiredOpDn.getID(), new SizeAndTime(10 * oneGb,
+ clock.millis() - rmConf.getEventTimeout() - 1));
+
+ ContainerReplicaPendingOps pendingOpsMock =
mock(ContainerReplicaPendingOps.class);
+
when(pendingOpsMock.getContainerSizeScheduled()).thenReturn(sizeScheduledMap);
+ when(pendingOpsMock.getClock()).thenReturn(clock);
+
when(replicationManager.getContainerReplicaPendingOps()).thenReturn(pendingOpsMock);
+
+ NodeManager nodeManagerMock = mock(NodeManager.class);
+ when(replicationManager.getNodeManager()).thenReturn(nodeManagerMock);
+ doReturn(fullDn).when(nodeManagerMock).getNode(fullDn.getID());
+ doReturn(new SCMNodeMetric(50 * oneGb, 20 * oneGb, 30 * oneGb, 5 * oneGb,
+ 20 * oneGb)).when(nodeManagerMock).getNodeStat(fullDn);
+
doReturn(spaceAvailableDn).when(nodeManagerMock).getNode(spaceAvailableDn.getID());
+ doReturn(new SCMNodeMetric(50 * oneGb, 10 * oneGb, 40 * oneGb, 5 * oneGb,
+ 20 * oneGb)).when(nodeManagerMock).getNodeStat(spaceAvailableDn);
+ doReturn(expiredOpDn).when(nodeManagerMock).getNode(expiredOpDn.getID());
+ doReturn(new SCMNodeMetric(50 * oneGb, 20 * oneGb, 30 * oneGb, 5 * oneGb,
+ 20 * oneGb)).when(nodeManagerMock).getNodeStat(expiredOpDn);
+
+ when(replicationManager.getNodeStatus(any())).thenAnswer(
+ invocation -> {
+ final DatanodeDetails dn = invocation.getArgument(0);
+ for (ContainerReplica r : replicas) {
+ if (r.getDatanodeDetails().equals(dn)) {
+ return NodeStatus.valueOf(
+ r.getDatanodeDetails().getPersistedOpState(),
+ HddsProtos.NodeState.HEALTHY);
+ }
+ }
+ throw new NodeNotFoundException(dn.getID());
+ });
+
+ ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
+ ReplicationManagerUtil.getExcludedAndUsedNodes(container,
+ new ArrayList<>(replicas), toBeRemoved, pending,
+ replicationManager);
+
+ assertEquals(3, excludedAndUsedNodes.getUsedNodes().size());
+
assertThat(excludedAndUsedNodes.getUsedNodes()).contains(good.getDatanodeDetails());
+
assertThat(excludedAndUsedNodes.getUsedNodes()).contains(maintenance.getDatanodeDetails());
+ assertThat(excludedAndUsedNodes.getUsedNodes()).contains(pendingAdd);
+
+ assertEquals(5, excludedAndUsedNodes.getExcludedNodes().size());
+
assertThat(excludedAndUsedNodes.getExcludedNodes()).contains(unhealthy.getDatanodeDetails());
+
assertThat(excludedAndUsedNodes.getExcludedNodes()).contains(decommissioning.getDatanodeDetails());
+
assertThat(excludedAndUsedNodes.getExcludedNodes()).contains(remove.getDatanodeDetails());
+
assertThat(excludedAndUsedNodes.getExcludedNodes()).contains(pendingDelete);
+ assertThat(excludedAndUsedNodes.getExcludedNodes()).contains(fullDn);
+ }
+
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index cb773004b0..09c54fd937 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -245,7 +245,8 @@ public
ReconStorageContainerManagerFacade(OzoneConfiguration conf,
scmhaManager,
scmContext);
ContainerReplicaPendingOps pendingOps = new ContainerReplicaPendingOps(
- Clock.system(ZoneId.systemDefault()));
+ Clock.system(ZoneId.systemDefault()),
+
conf.getObject(ReplicationManager.ReplicationManagerConfiguration.class));
this.containerManager = new ReconContainerManager(conf,
dbStore,
ReconSCMDBDefinition.CONTAINERS.getTable(dbStore),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]