This is an automated email from the ASF dual-hosted git repository.

sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new bb7cebcdda HDDS-13437. Avoid scheduling replications on full datanodes 
by tracking pending op size in SCM (#8813)
bb7cebcdda is described below

commit bb7cebcdda93978f8a5c14da4ee6816b8a912f20
Author: Siddhant Sangwan <[email protected]>
AuthorDate: Mon Jul 28 19:39:38 2025 +0530

    HDDS-13437. Avoid scheduling replications on full datanodes by tracking 
pending op size in SCM (#8813)
---
 .../container/replication/ContainerReplicaOp.java  |  18 +-
 .../replication/ContainerReplicaPendingOps.java    |  98 +++++++++-
 .../container/replication/ReplicationManager.java  |  26 +--
 .../replication/ReplicationManagerUtil.java        |  51 ++++++
 .../hdds/scm/server/StorageContainerManager.java   |   7 +-
 .../TestContainerReplicaPendingOps.java            | 197 +++++++++++++++++----
 .../replication/TestECUnderReplicationHandler.java |   1 +
 .../replication/TestMisReplicationHandler.java     |   1 +
 ...estQuasiClosedStuckUnderReplicationHandler.java |   1 +
 .../TestRatisUnderReplicationHandler.java          |   1 +
 .../replication/TestReplicationManager.java        |   6 +-
 .../TestReplicationManagerScenarios.java           |   3 +-
 .../replication/TestReplicationManagerUtil.java    | 122 +++++++++++++
 .../scm/ReconStorageContainerManagerFacade.java    |   3 +-
 14 files changed, 481 insertions(+), 54 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
index 3c3e6fd712..2be951a181 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
@@ -30,19 +30,29 @@ public class ContainerReplicaOp {
   private final int replicaIndex;
   private final SCMCommand<?> command;
   private final long deadlineEpochMillis;
+  private final long containerSize;
 
   public static ContainerReplicaOp create(PendingOpType opType,
       DatanodeDetails target, int replicaIndex) {
-    return new ContainerReplicaOp(opType, target, replicaIndex, null, 
System.currentTimeMillis());
+    return new ContainerReplicaOp(opType, target, replicaIndex, null,
+        System.currentTimeMillis(), 0);
   }
 
   public ContainerReplicaOp(PendingOpType opType,
-      DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long 
deadlineEpochMillis) {
+      DatanodeDetails target, int replicaIndex, SCMCommand<?> command,
+      long deadlineEpochMillis) {
+    this(opType, target, replicaIndex, command, deadlineEpochMillis, 0);
+  }
+
+  public ContainerReplicaOp(PendingOpType opType,
+      DatanodeDetails target, int replicaIndex, SCMCommand<?> command,
+      long deadlineEpochMillis, long containerSize) {
     this.opType = opType;
     this.target = target;
     this.replicaIndex = replicaIndex;
     this.command = command;
     this.deadlineEpochMillis = deadlineEpochMillis;
+    this.containerSize = containerSize;
   }
 
   public PendingOpType getOpType() {
@@ -65,6 +75,10 @@ public long getDeadlineEpochMillis() {
     return deadlineEpochMillis;
   }
 
+  public long getContainerSize() {
+    return containerSize;
+  }
+
   /**
    * Enum representing different types of pending Ops.
    */
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
index d2d4389d73..8b1766cd97 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
@@ -34,6 +34,7 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
@@ -59,12 +60,49 @@ public class ContainerReplicaPendingOps {
   private ReplicationManagerMetrics replicationMetrics = null;
   private final List<ContainerReplicaPendingOpsSubscriber> subscribers =
       new ArrayList<>();
+  // tracks how much data is pending to be added to a target Datanode because 
of pending ADD ops
+  private final ConcurrentHashMap<DatanodeID, SizeAndTime> 
containerSizeScheduled = new ConcurrentHashMap<>();
+  private ReplicationManager.ReplicationManagerConfiguration rmConf;
 
   public ContainerReplicaPendingOps(Clock clock) {
     this.clock = clock;
     resetCounters();
   }
 
+  public ContainerReplicaPendingOps(Clock clock, 
ReplicationManager.ReplicationManagerConfiguration rmConf) {
+    this(clock);
+    this.rmConf = rmConf;
+  }
+
+  /**
+   * Used as the value of {@link 
ContainerReplicaPendingOps#containerSizeScheduled} map for tracking the size of
+   * containers with pending ADD ops. Immutable.
+   */
+  public static class SizeAndTime {
+    // number of bytes pending ADD on a target DN
+    private final long size;
+    // timestamp (milliseconds since epoch) when the latest op was scheduled 
for this DN
+    private final long lastUpdatedTime;
+
+    public SizeAndTime(long size, long lastUpdatedTime) {
+      this.size = size;
+      this.lastUpdatedTime = lastUpdatedTime;
+    }
+
+    public long getSize() {
+      return size;
+    }
+
+    public long getLastUpdatedTime() {
+      return lastUpdatedTime;
+    }
+
+    @Override
+    public String toString() {
+      return "Size: " + size + ", lastUpdatedTime: " + lastUpdatedTime;
+    }
+  }
+
   private void resetCounters() {
     for (PendingOpType opType: PendingOpType.values()) {
       AtomicLong[] counters = new AtomicLong[2];
@@ -85,6 +123,7 @@ public void clear() {
     try {
       pendingOps.clear();
       resetCounters();
+      containerSizeScheduled.clear();
     } finally {
       globalLock.writeLock().unlock();
     }
@@ -123,8 +162,10 @@ public List<ContainerReplicaOp> getPendingOps(ContainerID 
containerID) {
    *                            be discarded.
    */
   public void scheduleAddReplica(ContainerID containerID,
-      DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long 
deadlineEpochMillis) {
-    addReplica(ADD, containerID, target, replicaIndex, command, 
deadlineEpochMillis);
+      DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long 
deadlineEpochMillis, long containerSize,
+      long scheduledEpochMillis) {
+    addReplica(ADD, containerID, target, replicaIndex, command, 
deadlineEpochMillis, containerSize,
+        scheduledEpochMillis);
   }
 
   /**
@@ -139,7 +180,7 @@ public void scheduleAddReplica(ContainerID containerID,
    */
   public void scheduleDeleteReplica(ContainerID containerID,
       DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long 
deadlineEpochMillis) {
-    addReplica(DELETE, containerID, target, replicaIndex, command, 
deadlineEpochMillis);
+    addReplica(DELETE, containerID, target, replicaIndex, command, 
deadlineEpochMillis, 0L, clock.millis());
   }
 
   /**
@@ -226,6 +267,9 @@ public void removeExpiredEntries() {
               // For delete ops, we don't remove them from the list as RM must 
resend them, or they
               // will be removed via a container report when they are 
confirmed as deleted.
               iterator.remove();
+              if (op.getOpType() == ADD) {
+                releaseScheduledContainerSize(op);
+              }
               decrementCounter(op.getOpType(), op.getReplicaIndex());
             }
             expiredOps.add(op);
@@ -246,6 +290,22 @@ public void removeExpiredEntries() {
     }
   }
 
+  private void releaseScheduledContainerSize(ContainerReplicaOp op) {
+    containerSizeScheduled.computeIfPresent(op.getTarget().getID(), (k, v) -> {
+      long newSize = v.getSize() - op.getContainerSize();
+      boolean isSizeNonPositive = newSize <= 0;
+      boolean hasOpExpired = clock.millis() - v.getLastUpdatedTime() > 
rmConf.getEventTimeout();
+      if (isSizeNonPositive || hasOpExpired) {
+        /*
+        If the scheduled size is now less than or equal to 0, or if the last 
op has expired, implying that the ops
+        before it must have completed or expired, then remove this entry from 
the map
+        */
+        return null;
+      }
+      return new SizeAndTime(newSize, v.getLastUpdatedTime());
+    });
+  }
+
   private void updateTimeoutMetrics(ContainerReplicaOp op) {
     if (op.getOpType() == ADD && isMetricsNotNull()) {
       if (isEC(op.getReplicaIndex())) {
@@ -262,9 +322,10 @@ private void updateTimeoutMetrics(ContainerReplicaOp op) {
     }
   }
 
+  @SuppressWarnings("checkstyle:ParameterNumber")
   private void addReplica(ContainerReplicaOp.PendingOpType opType,
       ContainerID containerID, DatanodeDetails target, int replicaIndex, 
SCMCommand<?> command,
-      long deadlineEpochMillis) {
+      long deadlineEpochMillis, long containerSize, long scheduledEpochMillis) 
{
     Lock lock = writeLock(containerID);
     lock(lock);
     try {
@@ -274,7 +335,17 @@ private void addReplica(ContainerReplicaOp.PendingOpType 
opType,
       List<ContainerReplicaOp> ops = pendingOps.computeIfAbsent(
           containerID, s -> new ArrayList<>());
       ops.add(new ContainerReplicaOp(opType,
-          target, replicaIndex, command, deadlineEpochMillis));
+          target, replicaIndex, command, deadlineEpochMillis, containerSize));
+      DatanodeID id = target.getID();
+      if (opType == ADD) {
+        containerSizeScheduled.compute(id, (k, v) -> {
+          if (v == null) {
+            return new SizeAndTime(containerSize, scheduledEpochMillis);
+          } else {
+            return new SizeAndTime(v.getSize() + containerSize, 
scheduledEpochMillis);
+          }
+        });
+      }
       incrementCounter(opType, replicaIndex);
     } finally {
       unlock(lock);
@@ -300,6 +371,15 @@ private boolean 
completeOp(ContainerReplicaOp.PendingOpType opType,
             found = true;
             completedOps.add(op);
             iterator.remove();
+            if (opType == ADD) {
+              containerSizeScheduled.computeIfPresent(target.getID(), (k, v) 
-> {
+                long newSize = v.getSize() - op.getContainerSize();
+                if (newSize <= 0) {
+                  return null;
+                }
+                return new SizeAndTime(newSize, v.getLastUpdatedTime());
+              });
+            }
             decrementCounter(op.getOpType(), replicaIndex);
           }
         }
@@ -364,6 +444,14 @@ private void unlock(Lock lock) {
     lock.unlock();
   }
 
+  public ConcurrentHashMap<DatanodeID, SizeAndTime> 
getContainerSizeScheduled() {
+    return containerSizeScheduled;
+  }
+
+  public Clock getClock() {
+    return clock;
+  }
+
   private boolean isMetricsNotNull() {
     return replicationMetrics != null;
   }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 89259cec08..33ab37a938 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -84,6 +84,7 @@
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import 
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.apache.hadoop.ozone.container.replication.ReplicationServer;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
@@ -205,7 +206,8 @@ public class ReplicationManager implements SCMService, 
ContainerReplicaPendingOp
    * @param replicaPendingOps The pendingOps instance
    */
   @SuppressWarnings("parameternumber")
-  public ReplicationManager(final ConfigurationSource conf,
+  public ReplicationManager(final ReplicationManagerConfiguration rmConf,
+             final ConfigurationSource conf,
              final ContainerManager containerManager,
              final PlacementPolicy ratisContainerPlacement,
              final PlacementPolicy ecContainerPlacement,
@@ -217,7 +219,7 @@ public ReplicationManager(final ConfigurationSource conf,
              throws IOException {
     this.containerManager = containerManager;
     this.scmContext = scmContext;
-    this.rmConf = conf.getObject(ReplicationManagerConfiguration.class);
+    this.rmConf = rmConf;
     this.replicationServerConf =
         conf.getObject(ReplicationServer.ReplicationConfig.class);
     this.running = false;
@@ -688,13 +690,15 @@ private void adjustPendingOpsAndMetrics(ContainerInfo 
containerInfo,
       ReconstructECContainersCommand rcc = (ReconstructECContainersCommand) 
cmd;
       List<DatanodeDetails> targets = rcc.getTargetDatanodes();
       final ByteString targetIndexes = rcc.getMissingContainerIndexes();
+      long requiredSize = 
HddsServerUtil.requiredReplicationSpace(containerInfo.getUsedBytes());
       for (int i = 0; i < targetIndexes.size(); i++) {
-        containerReplicaPendingOps.scheduleAddReplica(
-            containerInfo.containerID(), targets.get(i), 
targetIndexes.byteAt(i), cmd, scmDeadlineEpochMs);
+        
containerReplicaPendingOps.scheduleAddReplica(containerInfo.containerID(), 
targets.get(i),
+            targetIndexes.byteAt(i), cmd, scmDeadlineEpochMs, requiredSize, 
clock.millis());
       }
       getMetrics().incrEcReconstructionCmdsSentTotal();
     } else if (cmd.getType() == Type.replicateContainerCommand) {
       ReplicateContainerCommand rcc = (ReplicateContainerCommand) cmd;
+      long requiredSize = 
HddsServerUtil.requiredReplicationSpace(containerInfo.getUsedBytes());
 
       if (rcc.getTargetDatanode() == null) {
         /*
@@ -702,17 +706,15 @@ private void adjustPendingOpsAndMetrics(ContainerInfo 
containerInfo,
         op's target Datanode should be the Datanode this command is being
         sent to.
          */
-        containerReplicaPendingOps.scheduleAddReplica(
-            containerInfo.containerID(),
-            targetDatanode, rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs);
+        
containerReplicaPendingOps.scheduleAddReplica(containerInfo.containerID(), 
targetDatanode,
+            rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs, requiredSize, 
clock.millis());
       } else {
         /*
         This means the source will push replica to the target, so the op's
         target Datanode should be the Datanode the replica will be pushed to.
          */
-        containerReplicaPendingOps.scheduleAddReplica(
-            containerInfo.containerID(),
-            rcc.getTargetDatanode(), rcc.getReplicaIndex(), cmd, 
scmDeadlineEpochMs);
+        
containerReplicaPendingOps.scheduleAddReplica(containerInfo.containerID(), 
rcc.getTargetDatanode(),
+            rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs, requiredSize, 
clock.millis());
       }
 
       if (rcc.getReplicaIndex() > 0) {
@@ -1484,6 +1486,10 @@ static NodeStatus getNodeStatus(DatanodeDetails dn, 
NodeManager nm) {
     }
   }
 
+  public NodeManager getNodeManager() {
+    return nodeManager;
+  }
+
   private int getRemainingMaintenanceRedundancy(boolean isEC) {
     return isEC
         ? rmConf.getMaintenanceRemainingRedundancy()
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
index 267da52173..8b2ad75824 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
@@ -24,6 +24,7 @@
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -34,7 +35,10 @@
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
@@ -195,9 +199,56 @@ An UNHEALTHY replica with unique origin node id of a 
QUASI_CLOSED container shou
         excludedNodes.add(pending.getTarget());
       }
     }
+    excludeFullNodes(replicationManager, container, excludedNodes);
     return new ExcludedAndUsedNodes(excludedNodes, usedNodes);
   }
 
+  private static void excludeFullNodes(ReplicationManager replicationManager,
+      ContainerInfo container, List<DatanodeDetails> excludedNodes) {
+    ContainerReplicaPendingOps pendingOps = 
replicationManager.getContainerReplicaPendingOps();
+    Map<DatanodeID, ContainerReplicaPendingOps.SizeAndTime>
+        containerSizeScheduled = pendingOps.getContainerSizeScheduled();
+    if (containerSizeScheduled == null || containerSizeScheduled.isEmpty()) {
+      return;
+    }
+
+    final long requiredSize = 
HddsServerUtil.requiredReplicationSpace(container.getUsedBytes());
+    NodeManager nodeManager = replicationManager.getNodeManager();
+
+    for (Map.Entry<DatanodeID, ContainerReplicaPendingOps.SizeAndTime> entry : 
containerSizeScheduled.entrySet()) {
+      DatanodeDetails dn = nodeManager.getNode(entry.getKey());
+      if (dn == null || excludedNodes.contains(dn)) {
+        continue;
+      }
+
+      SCMNodeMetric nodeMetric = nodeManager.getNodeStat(dn);
+      if (nodeMetric == null) {
+        continue;
+      }
+
+      long scheduledSize = 0;
+      ContainerReplicaPendingOps.SizeAndTime sizeAndTime = entry.getValue();
+      if (sizeAndTime != null) {
+        // Only consider sizes added in the last event timeout window
+        if (pendingOps.getClock().millis() - sizeAndTime.getLastUpdatedTime()
+            < replicationManager.getConfig().getEventTimeout()) {
+          scheduledSize = sizeAndTime.getSize();
+        } else {
+          LOG.debug("Expired op {} found while computing exclude nodes", 
entry);
+        }
+      }
+
+      SCMNodeStat scmNodeStat = nodeMetric.get();
+      if (scmNodeStat.getRemaining().get() - 
scmNodeStat.getFreeSpaceToSpare().get() - scheduledSize
+          < requiredSize) {
+        LOG.debug("Adding datanode {} to exclude list. Remaining: {}, 
freeSpaceToSpare: {}, scheduledSize: {}, " +
+            "requiredSize: {}. ContainerInfo: {}.", dn, 
scmNodeStat.getRemaining().get(),
+            scmNodeStat.getFreeSpaceToSpare().get(), scheduledSize, 
requiredSize, container);
+        excludedNodes.add(dn);
+      }
+    }
+  }
+
   /**
    * Simple class to hold the excluded and used nodes lists.
    */
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 00163aa214..9149d07841 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -765,8 +765,10 @@ private void initializeSystemManagers(OzoneConfiguration 
conf,
     finalizationManager.buildUpgradeContext(scmNodeManager, pipelineManager,
         scmContext);
 
+    ReplicationManager.ReplicationManagerConfiguration rmConf =
+        
conf.getObject(ReplicationManager.ReplicationManagerConfiguration.class);
     containerReplicaPendingOps =
-        new ContainerReplicaPendingOps(systemClock);
+        new ContainerReplicaPendingOps(systemClock, rmConf);
 
     long containerReplicaOpScrubberIntervalMs = conf.getTimeDuration(
         ScmConfigKeys
@@ -819,6 +821,7 @@ private void initializeSystemManagers(OzoneConfiguration 
conf,
       replicationManager = configurator.getReplicationManager();
     }  else {
       replicationManager = new ReplicationManager(
+          rmConf,
           conf,
           containerManager,
           containerPlacementPolicy,
@@ -828,7 +831,7 @@ private void initializeSystemManagers(OzoneConfiguration 
conf,
           scmNodeManager,
           systemClock,
           containerReplicaPendingOps);
-      reconfigurationHandler.register(replicationManager.getConfig());
+      reconfigurationHandler.register(rmConf);
     }
     serviceManager.register(replicationManager);
     // RM gets notified of expired pending delete from 
containerReplicaPendingOps by subscribing to it
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
index d645beb624..ee813f0942 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
@@ -22,6 +22,8 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -32,11 +34,12 @@
 import java.time.Instant;
 import java.time.ZoneOffset;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.client.ReplicationType;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
@@ -61,18 +64,20 @@ public class TestContainerReplicaPendingOps {
   private long deadline;
   private SCMCommand<?> addCmd;
   private SCMCommand<?> deleteCmd;
+  private static final long FIVE_GB_CONTAINER_SIZE = 5L * 1024 * 1024 * 1024;
+  private static final long THREE_GB_CONTAINER_SIZE = 3L * 1024 * 1024 * 1024;
+  private ReplicationManager.ReplicationManagerConfiguration rmConf;
 
   @BeforeEach
   public void setup() {
     clock = new TestClock(Instant.now(), ZoneOffset.UTC);
     deadline = clock.millis() + 10000; // Current time plus 10 seconds
-    pendingOps = new ContainerReplicaPendingOps(clock);
 
-    ConfigurationSource conf = new OzoneConfiguration();
-    ReplicationManager.ReplicationManagerConfiguration rmConf = conf
-        .getObject(ReplicationManager.ReplicationManagerConfiguration.class);
+    OzoneConfiguration conf = new OzoneConfiguration();
+    rmConf = 
conf.getObject(ReplicationManager.ReplicationManagerConfiguration.class);
     ReplicationManager rm = mock(ReplicationManager.class);
     when(rm.getConfig()).thenReturn(rmConf);
+    pendingOps = new ContainerReplicaPendingOps(clock, rmConf);
     metrics = ReplicationManagerMetrics.create(rm);
     pendingOps.setReplicationMetrics(metrics);
     dn1 = MockDatanodeDetails.randomDatanodeDetails();
@@ -99,7 +104,8 @@ public void testGetPendingOpsReturnsEmptyList() {
 
   @Test
   public void testClear() {
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd, 
deadline);
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd, 
deadline,
+        FIVE_GB_CONTAINER_SIZE, clock.millis());
     pendingOps.scheduleDeleteReplica(ContainerID.valueOf(2), dn1, 0, 
deleteCmd, deadline);
 
     assertEquals(1, 
pendingOps.getPendingOpCount(ContainerReplicaOp.PendingOpType.ADD));
@@ -116,16 +122,25 @@ public void testClear() {
 
   @Test
   public void testCanAddReplicasForAdd() {
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd, 
deadline);
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn2, 0, addCmd, 
deadline);
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn3, 0, addCmd, 
deadline);
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd, 
deadline,
+        FIVE_GB_CONTAINER_SIZE, clock.millis());
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn2, 0, addCmd, 
deadline,
+        FIVE_GB_CONTAINER_SIZE, clock.millis());
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn3, 0, addCmd, 
deadline,
+        FIVE_GB_CONTAINER_SIZE, clock.millis());
     // Duplicate for DN2
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn2, 0, addCmd, 
deadline + 1);
+    pendingOps.scheduleAddReplica(
+        ContainerID.valueOf(1), dn2, 0, addCmd, deadline + 1,
+        FIVE_GB_CONTAINER_SIZE, clock.millis());
     // Not a duplicate for DN2 as different index. Should not happen in 
practice as it is not valid to have 2 indexes
     // on the same node.
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn2, 1, addCmd, 
deadline);
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(2), dn1, 1, addCmd, 
deadline);
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(2), dn1, 1, addCmd, 
deadline + 1);
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn2, 1, addCmd, 
deadline,
+        FIVE_GB_CONTAINER_SIZE, clock.millis());
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(2), dn1, 1, addCmd, 
deadline,
+        THREE_GB_CONTAINER_SIZE, clock.millis());
+    pendingOps.scheduleAddReplica(
+        ContainerID.valueOf(2), dn1, 1, addCmd, deadline + 1,
+        THREE_GB_CONTAINER_SIZE, clock.millis());
 
     List<ContainerReplicaOp> ops =
         pendingOps.getPendingOps(ContainerID.valueOf(1));
@@ -185,9 +200,11 @@ public void testCanAddReplicasForDelete() {
   @Test
   public void testCompletingOps() {
     pendingOps.scheduleDeleteReplica(ContainerID.valueOf(1), dn1, 0, 
deleteCmd, deadline);
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd, 
deadline);
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd, 
deadline,
+        FIVE_GB_CONTAINER_SIZE, clock.millis());
     pendingOps.scheduleDeleteReplica(ContainerID.valueOf(1), dn2, 0, 
deleteCmd, deadline);
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn3, 0, addCmd, 
deadline);
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn3, 0, addCmd, 
deadline,
+        FIVE_GB_CONTAINER_SIZE, clock.millis());
     pendingOps.scheduleDeleteReplica(ContainerID.valueOf(2), dn1, 1, 
deleteCmd, deadline);
 
     List<ContainerReplicaOp> ops =
@@ -218,9 +235,11 @@ public void testCompletingOps() {
   @Test
   public void testRemoveSpecificOp() {
     pendingOps.scheduleDeleteReplica(ContainerID.valueOf(1), dn1, 0, 
deleteCmd, deadline);
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd, 
deadline);
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd, 
deadline,
+        FIVE_GB_CONTAINER_SIZE, clock.millis());
     pendingOps.scheduleDeleteReplica(ContainerID.valueOf(1), dn2, 0, 
deleteCmd, deadline);
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn3, 0, addCmd, 
deadline);
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn3, 0, addCmd, 
deadline,
+        FIVE_GB_CONTAINER_SIZE, clock.millis());
     pendingOps.scheduleDeleteReplica(ContainerID.valueOf(2), dn1, 1, 
deleteCmd, deadline);
 
     ContainerID cid = ContainerID.valueOf(1);
@@ -241,11 +260,14 @@ public void testRemoveExpiredEntries() {
     long laterExpiry =  clock.millis() + 2000;
     long latestExpiry = clock.millis() + 3000;
     pendingOps.scheduleDeleteReplica(ContainerID.valueOf(1), dn1, 0, 
deleteCmd, expiry);
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd, 
expiry);
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd, 
expiry,
+        FIVE_GB_CONTAINER_SIZE, clock.millis());
     pendingOps.scheduleDeleteReplica(ContainerID.valueOf(1), dn2, 0, 
deleteCmd, laterExpiry);
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn3, 0, addCmd, 
laterExpiry);
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn3, 0, addCmd, 
laterExpiry,
+        FIVE_GB_CONTAINER_SIZE, clock.millis());
     pendingOps.scheduleDeleteReplica(ContainerID.valueOf(2), dn1, 1, 
deleteCmd, latestExpiry);
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(2), dn1, 1, addCmd, 
latestExpiry);
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(2), dn1, 1, addCmd, 
latestExpiry,
+        FIVE_GB_CONTAINER_SIZE, clock.millis());
 
     List<ContainerReplicaOp> ops =
         pendingOps.getPendingOps(ContainerID.valueOf(1));
@@ -302,10 +324,13 @@ private boolean isOpPresent(List<ContainerReplicaOp> ops, 
DatanodeDetails dn,
   public void testReplicationMetrics() {
     long expiry = clock.millis() + 1000;
     pendingOps.scheduleDeleteReplica(ContainerID.valueOf(1), dn1, 1, 
deleteCmd, expiry);
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 2, addCmd, 
expiry);
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 2, addCmd, 
expiry,
+        FIVE_GB_CONTAINER_SIZE, clock.millis());
     pendingOps.scheduleDeleteReplica(ContainerID.valueOf(2), dn2, 1, 
deleteCmd, expiry);
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(2), dn3, 1, addCmd, 
expiry);
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(3), dn3, 0, addCmd, 
expiry);
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(2), dn3, 1, addCmd, 
expiry,
+        THREE_GB_CONTAINER_SIZE, clock.millis());
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(3), dn3, 0, addCmd, 
expiry,
+        THREE_GB_CONTAINER_SIZE, clock.millis());
     pendingOps.scheduleDeleteReplica(ContainerID.valueOf(4), dn3, 0, 
deleteCmd, expiry);
 
     // InFlight Replication and Deletion
@@ -328,10 +353,13 @@ public void testReplicationMetrics() {
 
     expiry = clock.millis() + 1000;
     pendingOps.scheduleDeleteReplica(ContainerID.valueOf(3), dn1, 2, 
deleteCmd, expiry);
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(3), dn1, 3, addCmd, 
expiry);
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(3), dn1, 3, addCmd, 
expiry,
+        FIVE_GB_CONTAINER_SIZE, clock.millis());
     pendingOps.scheduleDeleteReplica(ContainerID.valueOf(4), dn2, 2, 
deleteCmd, expiry);
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(4), dn3, 4, addCmd, 
expiry);
-    pendingOps.scheduleAddReplica(ContainerID.valueOf(5), dn3, 0, addCmd, 
expiry);
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(4), dn3, 4, addCmd, 
expiry,
+        THREE_GB_CONTAINER_SIZE, clock.millis());
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(5), dn3, 0, addCmd, 
expiry,
+        THREE_GB_CONTAINER_SIZE, clock.millis());
     pendingOps.scheduleDeleteReplica(ContainerID.valueOf(6), dn3, 0, 
deleteCmd, expiry);
 
     // InFlight Replication and Deletion. Previous Inflight should be
@@ -375,7 +403,7 @@ public void testNotifySubscribers() {
 
     // schedule an ADD and a DELETE
     ContainerID containerID = ContainerID.valueOf(1);
-    pendingOps.scheduleAddReplica(containerID, dn1, 0, addCmd, deadline);
+    pendingOps.scheduleAddReplica(containerID, dn1, 0, addCmd, deadline, 
FIVE_GB_CONTAINER_SIZE, clock.millis());
     ContainerReplicaOp addOp = pendingOps.getPendingOps(containerID).get(0);
     pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deleteCmd, deadline);
 
@@ -392,7 +420,7 @@ public void testNotifySubscribers() {
 
     // now, test notification on expiration
     pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deleteCmd, deadline);
-    pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline);
+    pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline, 
FIVE_GB_CONTAINER_SIZE, clock.millis());
     for (ContainerReplicaOp op : pendingOps.getPendingOps(containerID)) {
       if (op.getOpType() == ADD) {
         addOp = op;
@@ -415,7 +443,7 @@ public void 
subscribersShouldNotBeNotifiedWhenOpsHaveNotExpired() {
 
     // schedule ops
     pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deleteCmd, deadline);
-    pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline);
+    pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline, 
FIVE_GB_CONTAINER_SIZE, clock.millis());
 
     // register subscriber
     ContainerReplicaPendingOpsSubscriber subscriber1 = mock(
@@ -434,7 +462,7 @@ public void 
subscribersShouldNotBeNotifiedWhenReplacingAnOpWithDuplicate() {
     ContainerID containerID = ContainerID.valueOf(1);
 
     // schedule ops
-    pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline);
+    pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline, 
FIVE_GB_CONTAINER_SIZE, clock.millis());
 
     // register subscriber
     ContainerReplicaPendingOpsSubscriber subscriber1 = mock(
@@ -442,9 +470,116 @@ public void 
subscribersShouldNotBeNotifiedWhenReplacingAnOpWithDuplicate() {
     pendingOps.registerSubscriber(subscriber1);
 
     clock.fastForward(1000);
-    pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline + 1);
+    pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline + 1,
+        FIVE_GB_CONTAINER_SIZE, clock.millis());
     // no entries have expired, so there should be zero interactions with the
     // subscriber
     verifyNoMoreInteractions(subscriber1);
   }
+
+  /**
+   * Tests that ContainerReplicaPendingOps correctly tracks the total size (of 
containers) being moved to a target
+   * Datanode because of pending ADD ops. This size should be correctly added 
and reduced when ADD ops are triggered
+   * and completed.
+   */
+  @Test
+  public void testScheduledSizeIsCorrectlyTrackedAndCompleted() {
+    final long eventTimeout = rmConf.getEventTimeout();
+    long now = clock.millis();
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(1), dn1, 0, addCmd,
+        now + eventTimeout, FIVE_GB_CONTAINER_SIZE, clock.millis());
+
+    // Assert that containerSizeScheduled has the correct size
+    ConcurrentHashMap<DatanodeID, ContainerReplicaPendingOps.SizeAndTime> 
scheduled =
+        pendingOps.getContainerSizeScheduled();
+    assertEquals(1, scheduled.size());
+    assertEquals(FIVE_GB_CONTAINER_SIZE, scheduled.get(dn1.getID()).getSize());
+
+    // Schedule a second op for the same datanode
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(2), dn1, 0, addCmd,
+        now + eventTimeout, THREE_GB_CONTAINER_SIZE, clock.millis());
+    assertEquals(FIVE_GB_CONTAINER_SIZE + THREE_GB_CONTAINER_SIZE, 
scheduled.get(dn1.getID()).getSize());
+
+    // Complete the first op
+    pendingOps.completeAddReplica(ContainerID.valueOf(1), dn1, 0);
+    assertEquals(THREE_GB_CONTAINER_SIZE, 
scheduled.get(dn1.getID()).getSize());
+
+    // Complete the second op
+    pendingOps.completeAddReplica(ContainerID.valueOf(2), dn1, 0);
+    assertNull(scheduled.get(dn1.getID()));
+  }
+
+  /**
+   * When an ADD op (container replication) expires, the map in 
ContainerReplicaPendingOps should be modified
+   * correctly. The entry should be removed if 
ReplicationManagerConfiguration#eventTimeout milliseconds have passed
+   * since the entry's lastUpdatedTime.
+   */
+  @Test
+  public void testScheduledSizeIsCorrectlyTrackedAndExpired() {
+    final long eventTimeout = rmConf.getEventTimeout();
+
+    long now = clock.millis();
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(3), dn2, 0, addCmd,
+        now + eventTimeout, FIVE_GB_CONTAINER_SIZE, clock.millis());
+    ConcurrentHashMap<DatanodeID, ContainerReplicaPendingOps.SizeAndTime>
+        scheduled = pendingOps.getContainerSizeScheduled();
+    assertEquals(FIVE_GB_CONTAINER_SIZE, scheduled.get(dn2.getID()).getSize());
+    assertEquals(now, scheduled.get(dn2.getID()).getLastUpdatedTime());
+
+    // Advance clock so the op expires
+    clock.fastForward(eventTimeout + 1);
+    pendingOps.removeExpiredEntries();
+    // The entry should be removed from the map after expiration
+    assertNull(scheduled.get(dn2.getID()));
+  }
+
+  /**
+   * Tests that only the size of containers with expired ops is reduced from 
the map tracking size of pending ops.
+   * For example, if target Datanode DN1 has two pending ADD ops 10GB + 15GB, 
and the first op expires, then only
+   * 10GB should be subtracted.
+   */
+  @Test
+  public void testOnlyExpiredOpSizeIsRemovedFromSizeScheduledMap() {
+    final long eventTimeout = rmConf.getEventTimeout();
+    long now = clock.millis();
+    // Schedule first op
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(4), dn2, 0, addCmd,
+        now + eventTimeout, FIVE_GB_CONTAINER_SIZE, clock.millis());
+    //  another replication scheduled for dn1 to receive a container - just 
testing that this entry isn't removed or
+    //  modified when other entries expire or are modified
+    pendingOps.scheduleAddReplica((ContainerID.valueOf(2)), dn1, 2, addCmd,
+        now + eventTimeout * 10, THREE_GB_CONTAINER_SIZE, clock.millis());
+    ConcurrentHashMap<DatanodeID, ContainerReplicaPendingOps.SizeAndTime>
+        scheduled = pendingOps.getContainerSizeScheduled();
+    assertEquals(FIVE_GB_CONTAINER_SIZE, scheduled.get(dn2.getID()).getSize());
+    assertEquals(THREE_GB_CONTAINER_SIZE, 
scheduled.get(dn1.getID()).getSize());
+    assertEquals(now, scheduled.get(dn2.getID()).getLastUpdatedTime());
+    assertEquals(now, scheduled.get(dn1.getID()).getLastUpdatedTime());
+
+    clock.fastForward(eventTimeout - 1);
+    long updateTime = clock.millis();
+
+    // Schedule second op for dn2, which should update the lastUpdatedTime
+    pendingOps.scheduleAddReplica(ContainerID.valueOf(5), dn2, 1, addCmd,
+        updateTime + eventTimeout, THREE_GB_CONTAINER_SIZE, clock.millis());
+    assertEquals(FIVE_GB_CONTAINER_SIZE + THREE_GB_CONTAINER_SIZE, 
scheduled.get(dn2.getID()).getSize());
+    assertEquals(THREE_GB_CONTAINER_SIZE, 
scheduled.get(dn1.getID()).getSize());
+    assertEquals(updateTime, scheduled.get(dn2.getID()).getLastUpdatedTime());
+    assertEquals(now, scheduled.get(dn1.getID()).getLastUpdatedTime());
+
+    // Advance clock to expire the first op but not the second for dn2
+    clock.set(Instant.ofEpochMilli(now + eventTimeout + 1));
+    pendingOps.removeExpiredEntries();
+
+    // Assert the entry for dn2 still exists, but with reduced size
+    assertNotNull(scheduled.get(dn2.getID()));
+    assertEquals(THREE_GB_CONTAINER_SIZE, 
scheduled.get(dn2.getID()).getSize());
+    assertEquals(THREE_GB_CONTAINER_SIZE, 
scheduled.get(dn1.getID()).getSize());
+
+    // Advance clock again to expire the second op for dn2
+    clock.set(Instant.ofEpochMilli(updateTime + eventTimeout + 1));
+    pendingOps.removeExpiredEntries();
+    assertNull(scheduled.get(dn2.getID()));
+    assertEquals(THREE_GB_CONTAINER_SIZE, 
scheduled.get(dn1.getID()).getSize());
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 8a18c1ba1d..5de354fb2d 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -141,6 +141,7 @@ public NodeStatus getNodeStatus(DatanodeDetails dd) {
         .thenReturn(rmConf);
     metrics = ReplicationManagerMetrics.create(replicationManager);
     when(replicationManager.getMetrics()).thenReturn(metrics);
+    
when(replicationManager.getContainerReplicaPendingOps()).thenReturn(mock(ContainerReplicaPendingOps.class));
 
     when(replicationManager.getNodeStatus(any(DatanodeDetails.class)))
         .thenAnswer(invocation -> {
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
index 59ffab79c3..d65cd2afc0 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
@@ -94,6 +94,7 @@ protected void setup(ReplicationConfig repConfig, File 
testDir)
     when(replicationManager.getConfig()).thenReturn(rmConf);
     metrics = ReplicationManagerMetrics.create(replicationManager);
     when(replicationManager.getMetrics()).thenReturn(metrics);
+    
when(replicationManager.getContainerReplicaPendingOps()).thenReturn(mock(ContainerReplicaPendingOps.class));
 
     commandsSent = new HashSet<>();
     ReplicationTestUtil.mockRMSendDatanodeCommand(
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestQuasiClosedStuckUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestQuasiClosedStuckUnderReplicationHandler.java
index d88bcf86af..22fee58b7c 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestQuasiClosedStuckUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestQuasiClosedStuckUnderReplicationHandler.java
@@ -88,6 +88,7 @@ void setup(@TempDir File testDir) throws 
NodeNotFoundException,
             ReplicationManager.ReplicationManagerConfiguration.class));
     metrics = ReplicationManagerMetrics.create(replicationManager);
     when(replicationManager.getMetrics()).thenReturn(metrics);
+    
when(replicationManager.getContainerReplicaPendingOps()).thenReturn(mock(ContainerReplicaPendingOps.class));
 
     /*
       Return NodeStatus with NodeOperationalState as specified in
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
index 21bc259b1a..dd2f9fd51d 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
@@ -106,6 +106,7 @@ void setup(@TempDir File testDir) throws 
NodeNotFoundException,
             ReplicationManagerConfiguration.class));
     metrics = ReplicationManagerMetrics.create(replicationManager);
     when(replicationManager.getMetrics()).thenReturn(metrics);
+    
when(replicationManager.getContainerReplicaPendingOps()).thenReturn(mock(ContainerReplicaPendingOps.class));
 
     /*
       Return NodeStatus with NodeOperationalState as specified in
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 349da3dcaa..2777a0222b 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -201,6 +201,7 @@ void cleanup() {
 
   private ReplicationManager createReplicationManager() throws IOException {
     return new ReplicationManager(
+        
configuration.getObject(ReplicationManager.ReplicationManagerConfiguration.class),
         configuration,
         containerManager,
         ratisPlacementPolicy,
@@ -226,7 +227,7 @@ private void enableProcessAll() {
   @Test
   public void testPendingOpsClearedWhenStarting() {
     containerReplicaPendingOps.scheduleAddReplica(ContainerID.valueOf(1),
-        MockDatanodeDetails.randomDatanodeDetails(), 1, null, 
Integer.MAX_VALUE);
+        MockDatanodeDetails.randomDatanodeDetails(), 1, null, 
Integer.MAX_VALUE, 5L, clock.millis());
     containerReplicaPendingOps.scheduleDeleteReplica(ContainerID.valueOf(2),
         MockDatanodeDetails.randomDatanodeDetails(), 1, null, 
Integer.MAX_VALUE);
     assertEquals(1, containerReplicaPendingOps
@@ -618,7 +619,7 @@ public void testUnderReplicatedContainerFixedByPending()
     addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4);
     containerReplicaPendingOps.scheduleAddReplica(container.containerID(),
         MockDatanodeDetails.randomDatanodeDetails(), 5, null,
-        clock.millis() + 10000);
+        clock.millis() + 10000, 5L, clock.millis());
 
     replicationManager.processContainer(
         container, repQueue, repReport);
@@ -1660,6 +1661,7 @@ public void 
testNotifyNodeStateChangeWakesUpThread(boolean queueIsEmpty)
     ReplicationQueue queue = mock(ReplicationQueue.class);
     when(queue.isEmpty()).thenReturn(queueIsEmpty);
     final ReplicationManager customRM = new ReplicationManager(
+        
configuration.getObject(ReplicationManager.ReplicationManagerConfiguration.class),
         configuration,
         containerManager,
         ratisPlacementPolicy,
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java
index c0315738ae..e101bf6ae5 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java
@@ -222,6 +222,7 @@ public void setup() throws IOException, 
NodeNotFoundException {
 
   private ReplicationManager createReplicationManager() throws IOException {
     return new ReplicationManager(
+        
configuration.getObject(ReplicationManager.ReplicationManagerConfiguration.class),
         configuration,
         containerManager,
         ratisPlacementPolicy,
@@ -250,7 +251,7 @@ private void loadPendingOps(ContainerInfo container, 
Scenario scenario) {
     for (PendingReplica r : scenario.getPendingReplicas()) {
       if (r.getType() == ContainerReplicaOp.PendingOpType.ADD) {
         containerReplicaPendingOps.scheduleAddReplica(container.containerID(), 
r.getDatanodeDetails(),
-            r.getReplicaIndex(), null, Long.MAX_VALUE);
+            r.getReplicaIndex(), null, Long.MAX_VALUE, 5L, clock.millis());
       } else if (r.getType() == ContainerReplicaOp.PendingOpType.DELETE) {
         
containerReplicaPendingOps.scheduleDeleteReplica(container.containerID(), 
r.getDatanodeDetails(),
             r.getReplicaIndex(), null, Long.MAX_VALUE);
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
index 97abe6be99..a7718bdf24 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
@@ -20,28 +20,38 @@
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static 
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps.SizeAndTime;
 import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainer;
 import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.time.Instant;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import 
org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ozone.test.TestClock;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -55,6 +65,8 @@ public class TestReplicationManagerUtil {
   @BeforeEach
   public void setup() {
     replicationManager = mock(ReplicationManager.class);
+    ContainerReplicaPendingOps pendingOpsMock = 
mock(ContainerReplicaPendingOps.class);
+    
when(replicationManager.getContainerReplicaPendingOps()).thenReturn(pendingOpsMock);
   }
 
   @Test
@@ -222,4 +234,114 @@ public void 
testGetUsedAndExcludedNodesForQuasiClosedContainer() throws NodeNotF
         .contains(pendingDelete);
   }
 
+  @Test
+  public void testDatanodesWithInSufficientDiskSpaceAreExcluded() throws 
NodeNotFoundException {
+    ContainerInfo container = createContainer(HddsProtos.LifeCycleState.CLOSED,
+        
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
+    final long oneGb = 1024L * 1024 * 1024;
+    container.setUsedBytes(5 * oneGb);
+    ContainerID cid = container.containerID();
+    Set<ContainerReplica> replicas = new HashSet<>();
+    ContainerReplica good = createContainerReplica(cid, 0,
+        IN_SERVICE, ContainerReplicaProto.State.CLOSED, 1);
+    replicas.add(good);
+
+    ContainerReplica remove = createContainerReplica(cid, 0,
+        IN_SERVICE, ContainerReplicaProto.State.CLOSED, 1);
+    replicas.add(remove);
+
+    ContainerReplica unhealthy = createContainerReplica(
+        cid, 0, IN_SERVICE, ContainerReplicaProto.State.UNHEALTHY, 1);
+    replicas.add(unhealthy);
+
+    ContainerReplica decommissioning =
+        createContainerReplica(cid, 0,
+            DECOMMISSIONING, ContainerReplicaProto.State.CLOSED, 1);
+    replicas.add(decommissioning);
+
+    ContainerReplica maintenance =
+        createContainerReplica(cid, 0,
+            IN_MAINTENANCE, ContainerReplicaProto.State.CLOSED, 1);
+    replicas.add(maintenance);
+
+    // Take one of the replicas and set it to be removed. It should be on the
+    // excluded list rather than the used list.
+    Set<ContainerReplica> toBeRemoved = new HashSet<>();
+    toBeRemoved.add(remove);
+
+    // Add a pending add and delete. The add should go onto the used
+    // list and the delete added to the excluded nodes.
+    DatanodeDetails pendingAdd = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails pendingDelete = 
MockDatanodeDetails.randomDatanodeDetails();
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        ContainerReplicaOp.PendingOpType.ADD, pendingAdd, 0));
+    pending.add(ContainerReplicaOp.create(
+        ContainerReplicaOp.PendingOpType.DELETE, pendingDelete, 0));
+
+    // set up mocks such that ContainerReplicaPendingOps returns the 
containerSizeScheduled map
+    ReplicationManagerConfiguration rmConf = new 
ReplicationManagerConfiguration();
+    when(replicationManager.getConfig()).thenReturn(rmConf);
+    TestClock clock = new TestClock(Instant.now(), ZoneOffset.UTC);
+    ConcurrentHashMap<DatanodeID, SizeAndTime> sizeScheduledMap = new 
ConcurrentHashMap<>();
+
+    // fullDn has 10GB size scheduled, 30GB available and 20GB min free space, 
so it should be excluded
+    DatanodeDetails fullDn = MockDatanodeDetails.randomDatanodeDetails();
+    sizeScheduledMap.put(fullDn.getID(), new SizeAndTime(10 * oneGb, 
clock.millis()));
+    // spaceAvailableDn should not be excluded as it has sufficient space
+    DatanodeDetails spaceAvailableDn = 
MockDatanodeDetails.randomDatanodeDetails();
+    sizeScheduledMap.put(spaceAvailableDn.getID(), new SizeAndTime(10 * oneGb, 
clock.millis()));
+    // expiredOpDn is the same as fullDn, however its op has expired - so it 
should not be excluded
+    DatanodeDetails expiredOpDn = MockDatanodeDetails.randomDatanodeDetails();
+    sizeScheduledMap.put(expiredOpDn.getID(), new SizeAndTime(10 * oneGb,
+        clock.millis() - rmConf.getEventTimeout() - 1));
+
+    ContainerReplicaPendingOps pendingOpsMock = 
mock(ContainerReplicaPendingOps.class);
+    
when(pendingOpsMock.getContainerSizeScheduled()).thenReturn(sizeScheduledMap);
+    when(pendingOpsMock.getClock()).thenReturn(clock);
+    
when(replicationManager.getContainerReplicaPendingOps()).thenReturn(pendingOpsMock);
+
+    NodeManager nodeManagerMock = mock(NodeManager.class);
+    when(replicationManager.getNodeManager()).thenReturn(nodeManagerMock);
+    doReturn(fullDn).when(nodeManagerMock).getNode(fullDn.getID());
+    doReturn(new SCMNodeMetric(50 * oneGb, 20 * oneGb, 30 * oneGb, 5 * oneGb,
+        20 * oneGb)).when(nodeManagerMock).getNodeStat(fullDn);
+    
doReturn(spaceAvailableDn).when(nodeManagerMock).getNode(spaceAvailableDn.getID());
+    doReturn(new SCMNodeMetric(50 * oneGb, 10 * oneGb, 40 * oneGb, 5 * oneGb,
+        20 * oneGb)).when(nodeManagerMock).getNodeStat(spaceAvailableDn);
+    doReturn(expiredOpDn).when(nodeManagerMock).getNode(expiredOpDn.getID());
+    doReturn(new SCMNodeMetric(50 * oneGb, 20 * oneGb, 30 * oneGb, 5 * oneGb,
+        20 * oneGb)).when(nodeManagerMock).getNodeStat(expiredOpDn);
+
+    when(replicationManager.getNodeStatus(any())).thenAnswer(
+        invocation -> {
+          final DatanodeDetails dn = invocation.getArgument(0);
+          for (ContainerReplica r : replicas) {
+            if (r.getDatanodeDetails().equals(dn)) {
+              return NodeStatus.valueOf(
+                  r.getDatanodeDetails().getPersistedOpState(),
+                  HddsProtos.NodeState.HEALTHY);
+            }
+          }
+          throw new NodeNotFoundException(dn.getID());
+        });
+
+    ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
+        ReplicationManagerUtil.getExcludedAndUsedNodes(container,
+            new ArrayList<>(replicas), toBeRemoved, pending,
+            replicationManager);
+
+    assertEquals(3, excludedAndUsedNodes.getUsedNodes().size());
+    
assertThat(excludedAndUsedNodes.getUsedNodes()).contains(good.getDatanodeDetails());
+    
assertThat(excludedAndUsedNodes.getUsedNodes()).contains(maintenance.getDatanodeDetails());
+    assertThat(excludedAndUsedNodes.getUsedNodes()).contains(pendingAdd);
+
+    assertEquals(5, excludedAndUsedNodes.getExcludedNodes().size());
+    
assertThat(excludedAndUsedNodes.getExcludedNodes()).contains(unhealthy.getDatanodeDetails());
+    
assertThat(excludedAndUsedNodes.getExcludedNodes()).contains(decommissioning.getDatanodeDetails());
+    
assertThat(excludedAndUsedNodes.getExcludedNodes()).contains(remove.getDatanodeDetails());
+    
assertThat(excludedAndUsedNodes.getExcludedNodes()).contains(pendingDelete);
+    assertThat(excludedAndUsedNodes.getExcludedNodes()).contains(fullDn);
+  }
+
 }
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index cb773004b0..09c54fd937 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -245,7 +245,8 @@ public 
ReconStorageContainerManagerFacade(OzoneConfiguration conf,
         scmhaManager,
         scmContext);
     ContainerReplicaPendingOps pendingOps = new ContainerReplicaPendingOps(
-        Clock.system(ZoneId.systemDefault()));
+        Clock.system(ZoneId.systemDefault()),
+        
conf.getObject(ReplicationManager.ReplicationManagerConfiguration.class));
     this.containerManager = new ReconContainerManager(conf,
         dbStore,
         ReconSCMDBDefinition.CONTAINERS.getTable(dbStore),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to