This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-5713 by this push:
     new c531e7519a HDDS-12439. [DiskBalancer] Reserve committed space during 
volume choosing process (#8882)
c531e7519a is described below

commit c531e7519a0206367dd33c6409966a322ca4fe61
Author: Sammi Chen <[email protected]>
AuthorDate: Fri Aug 8 18:04:22 2025 +0800

    HDDS-12439. [DiskBalancer] Reserve committed space during volume choosing 
process (#8882)
---
 .../ozone/container/common/interfaces/Handler.java |   5 +-
 .../container/common/utils/HddsVolumeUtil.java     |  25 --
 .../common/volume/AvailableSpaceFilter.java        |   2 +-
 .../volume/CapacityVolumeChoosingPolicy.java       |  86 +++--
 .../volume/RoundRobinVolumeChoosingPolicy.java     |  46 ++-
 .../container/common/volume/StorageVolume.java     |  10 +-
 .../common/volume/VolumeChoosingPolicyFactory.java |  13 +-
 .../diskbalancer/DiskBalancerService.java          | 230 ++++++------
 .../diskbalancer/DiskBalancerServiceMetrics.java   |   4 +-
 .../policy/DefaultContainerChoosingPolicy.java     |  11 +-
 .../policy/DefaultVolumeChoosingPolicy.java        |  83 +++--
 .../policy/DiskBalancerVolumeChoosingPolicy.java   |   3 +-
 .../container/keyvalue/KeyValueContainer.java      |  40 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  |  27 +-
 .../container/ozoneimpl/ContainerController.java   |  12 +-
 .../diskbalancer/TestDiskBalancerService.java      |   3 +-
 .../diskbalancer/TestDiskBalancerTask.java         | 412 ++++++++++++++++-----
 .../ozone/scm/node/TestVolumeChoosingPolicy.java   |  21 +-
 18 files changed, 679 insertions(+), 354 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index b8a40fff24..e36940a509 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -256,9 +256,8 @@ public abstract void copyContainer(
       throws IOException;
 
   /**
-   * Imports container from a container path.
+   * Imports container from a container which is under the temp directory.
    */
-  public abstract Container importContainer(
-      ContainerData containerData, Path containerPath) throws IOException;
+  public abstract Container importContainer(ContainerData 
targetTempContainerData) throws IOException;
 }
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
index cf2c548ee5..6ea9e87541 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
@@ -26,7 +26,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.volume.DbVolume;
@@ -140,28 +139,4 @@ private static void mapDbVolumesToDataVolumesIfNeeded(
         hddsVolume.setDbVolume(globalDbVolumeMap.getOrDefault(
             hddsVolume.getStorageID(), null)));
   }
-
-  /**
-   * Get the HddsVolume according to the path.
-   * @param volumes volume list to match from
-   * @param pathStr path to match
-   */
-  public static HddsVolume matchHddsVolume(List<HddsVolume> volumes,
-      String pathStr) throws IOException {
-    assert pathStr != null;
-    List<HddsVolume> resList = new ArrayList<>();
-    for (HddsVolume hddsVolume: volumes) {
-      if (pathStr.startsWith(hddsVolume.getVolumeRootDir())) {
-        resList.add(hddsVolume);
-      }
-    }
-    if (resList.size() == 1) {
-      return resList.get(0);
-    } else if (resList.size() > 1) {
-      throw new IOException("Get multi volumes " +
-          resList.stream().map(HddsVolume::getVolumeRootDir).collect(
-              Collectors.joining(",")) + " matching path " + pathStr);
-    }
-    return null;
-  }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java
index 6769a8eeae..bbd2bc9751 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java
@@ -32,7 +32,7 @@ public class AvailableSpaceFilter implements 
Predicate<HddsVolume> {
   private final List<StorageLocationReport> fullVolumes = new LinkedList<>();
   private long mostAvailableSpace = Long.MIN_VALUE;
 
-  AvailableSpaceFilter(long requiredSpace) {
+  public AvailableSpaceFilter(long requiredSpace) {
     this.requiredSpace = requiredSpace;
   }
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java
index e323eeb4b1..08b64327b0 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java
@@ -20,9 +20,11 @@
 import static 
org.apache.hadoop.ozone.container.common.volume.VolumeChoosingUtil.logIfSomeVolumesOutOfSpace;
 import static 
org.apache.hadoop.ozone.container.common.volume.VolumeChoosingUtil.throwDiskOutOfSpace;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import 
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -43,9 +45,20 @@ public class CapacityVolumeChoosingPolicy implements 
VolumeChoosingPolicy {
 
   private static final Logger LOG = LoggerFactory.getLogger(
       CapacityVolumeChoosingPolicy.class);
+  private final ReentrantLock lock;
+
+  public CapacityVolumeChoosingPolicy(ReentrantLock globalLock) {
+    lock = globalLock;
+  }
+
+  // only for testing purposes
+  @VisibleForTesting
+  public CapacityVolumeChoosingPolicy() {
+    lock = new ReentrantLock();
+  }
 
   @Override
-  public synchronized HddsVolume chooseVolume(List<HddsVolume> volumes,
+  public HddsVolume chooseVolume(List<HddsVolume> volumes,
       long maxContainerSize) throws IOException {
 
     // No volumes available to choose from
@@ -55,43 +68,48 @@ public synchronized HddsVolume 
chooseVolume(List<HddsVolume> volumes,
 
     AvailableSpaceFilter filter = new AvailableSpaceFilter(maxContainerSize);
 
-    List<HddsVolume> volumesWithEnoughSpace = volumes.stream()
-        .filter(filter)
-        .collect(Collectors.toList());
+    lock.lock();
+    try {
+      List<HddsVolume> volumesWithEnoughSpace = volumes.stream()
+          .filter(filter)
+          .collect(Collectors.toList());
 
-    if (volumesWithEnoughSpace.isEmpty()) {
-      throwDiskOutOfSpace(filter, LOG);
-    } else {
-      logIfSomeVolumesOutOfSpace(filter, LOG);
-    }
+      if (volumesWithEnoughSpace.isEmpty()) {
+        throwDiskOutOfSpace(filter, LOG);
+      } else {
+        logIfSomeVolumesOutOfSpace(filter, LOG);
+      }
 
-    int count = volumesWithEnoughSpace.size();
-    HddsVolume selectedVolume = volumesWithEnoughSpace.get(0);
-    if (count > 1) {
-      // Even if we don't have too many volumes in volumesWithEnoughSpace, this
-      // algorithm will still help us choose the volume with larger
-      // available space than other volumes.
-      // Say we have vol1 with more available space than vol2, for two choices,
-      // the distribution of possibility is as follows:
-      // 1. vol1 + vol2: 25%, result is vol1
-      // 2. vol1 + vol1: 25%, result is vol1
-      // 3. vol2 + vol1: 25%, result is vol1
-      // 4. vol2 + vol2: 25%, result is vol2
-      // So we have a total of 75% chances to choose vol1, which meets our
-      // expectation.
-      int firstIndex = ThreadLocalRandom.current().nextInt(count);
-      int secondIndex = ThreadLocalRandom.current().nextInt(count);
+      int count = volumesWithEnoughSpace.size();
+      HddsVolume selectedVolume = volumesWithEnoughSpace.get(0);
+      if (count > 1) {
+        // Even if we don't have too many volumes in volumesWithEnoughSpace, 
this
+        // algorithm will still help us choose the volume with larger
+        // available space than other volumes.
+        // Say we have vol1 with more available space than vol2, for two 
choices,
+        // the distribution of possibility is as follows:
+        // 1. vol1 + vol2: 25%, result is vol1
+        // 2. vol1 + vol1: 25%, result is vol1
+        // 3. vol2 + vol1: 25%, result is vol1
+        // 4. vol2 + vol2: 25%, result is vol2
+        // So we have a total of 75% chances to choose vol1, which meets our
+        // expectation.
+        int firstIndex = ThreadLocalRandom.current().nextInt(count);
+        int secondIndex = ThreadLocalRandom.current().nextInt(count);
 
-      HddsVolume firstVolume = volumesWithEnoughSpace.get(firstIndex);
-      HddsVolume secondVolume = volumesWithEnoughSpace.get(secondIndex);
+        HddsVolume firstVolume = volumesWithEnoughSpace.get(firstIndex);
+        HddsVolume secondVolume = volumesWithEnoughSpace.get(secondIndex);
 
-      long firstAvailable = firstVolume.getCurrentUsage().getAvailable()
-          - firstVolume.getCommittedBytes();
-      long secondAvailable = secondVolume.getCurrentUsage().getAvailable()
-          - secondVolume.getCommittedBytes();
-      selectedVolume = firstAvailable < secondAvailable ? secondVolume : 
firstVolume;
+        long firstAvailable = firstVolume.getCurrentUsage().getAvailable()
+            - firstVolume.getCommittedBytes();
+        long secondAvailable = secondVolume.getCurrentUsage().getAvailable()
+            - secondVolume.getCommittedBytes();
+        selectedVolume = firstAvailable < secondAvailable ? secondVolume : 
firstVolume;
+      }
+      selectedVolume.incCommittedBytes(maxContainerSize);
+      return selectedVolume;
+    } finally {
+      lock.unlock();
     }
-    selectedVolume.incCommittedBytes(maxContainerSize);
-    return selectedVolume;
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java
index 52c8c59970..ff6f571300 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java
@@ -20,8 +20,10 @@
 import static 
org.apache.hadoop.ozone.container.common.volume.VolumeChoosingUtil.logIfSomeVolumesOutOfSpace;
 import static 
org.apache.hadoop.ozone.container.common.volume.VolumeChoosingUtil.throwDiskOutOfSpace;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
 import 
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.slf4j.Logger;
@@ -38,9 +40,20 @@ public class RoundRobinVolumeChoosingPolicy implements 
VolumeChoosingPolicy {
 
   // Stores the index of the next volume to be returned.
   private int nextVolumeIndex = 0;
+  private final ReentrantLock lock;
+
+  public RoundRobinVolumeChoosingPolicy(ReentrantLock globalLock) {
+    lock = globalLock;
+  }
+
+  // only for testing purposes
+  @VisibleForTesting
+  public RoundRobinVolumeChoosingPolicy() {
+    lock = new ReentrantLock();
+  }
 
   @Override
-  public synchronized HddsVolume chooseVolume(List<HddsVolume> volumes,
+  public HddsVolume chooseVolume(List<HddsVolume> volumes,
       long maxContainerSize) throws IOException {
 
     // No volumes available to choose from
@@ -56,23 +69,28 @@ public synchronized HddsVolume 
chooseVolume(List<HddsVolume> volumes,
 
     int startVolumeIndex = currentVolumeIndex;
 
-    while (true) {
-      final HddsVolume volume = volumes.get(currentVolumeIndex);
-      // adjust for remaining capacity in Open containers
-      boolean hasEnoughSpace = filter.test(volume);
+    lock.lock();
+    try {
+      while (true) {
+        final HddsVolume volume = volumes.get(currentVolumeIndex);
+        // adjust for remaining capacity in Open containers
+        boolean hasEnoughSpace = filter.test(volume);
 
-      currentVolumeIndex = (currentVolumeIndex + 1) % volumes.size();
+        currentVolumeIndex = (currentVolumeIndex + 1) % volumes.size();
 
-      if (hasEnoughSpace) {
-        logIfSomeVolumesOutOfSpace(filter, LOG);
-        nextVolumeIndex = currentVolumeIndex;
-        volume.incCommittedBytes(maxContainerSize);
-        return volume;
-      }
+        if (hasEnoughSpace) {
+          logIfSomeVolumesOutOfSpace(filter, LOG);
+          nextVolumeIndex = currentVolumeIndex;
+          volume.incCommittedBytes(maxContainerSize);
+          return volume;
+        }
 
-      if (currentVolumeIndex == startVolumeIndex) {
-        throwDiskOutOfSpace(filter, LOG);
+        if (currentVolumeIndex == startVolumeIndex) {
+          throwDiskOutOfSpace(filter, LOG);
+        }
       }
+    } finally {
+      lock.unlock();
     }
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
index 3ef4008463..2ebfee6e20 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
@@ -87,8 +87,8 @@ public abstract class StorageVolume implements 
Checkable<Boolean, VolumeCheckRes
   private long cTime;             // creation time of the file system state
   private int layoutVersion;      // layout version of the storage data
 
-  private final ConfigurationSource conf;
-  private final DatanodeConfiguration dnConf;
+  private ConfigurationSource conf;
+  private DatanodeConfiguration dnConf;
 
   private final StorageType storageType;
   private final String volumeRoot;
@@ -578,6 +578,12 @@ public ConfigurationSource getConf() {
     return conf;
   }
 
+  @VisibleForTesting
+  public void setConf(ConfigurationSource newConf) {
+    this.conf = newConf;
+    this.dnConf = newConf.getObject(DatanodeConfiguration.class);
+  }
+
   public DatanodeConfiguration getDatanodeConfig() {
     return dnConf;
   }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeChoosingPolicyFactory.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeChoosingPolicyFactory.java
index 28ad10a158..ae2eafa735 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeChoosingPolicyFactory.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeChoosingPolicyFactory.java
@@ -19,9 +19,12 @@
 
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
 
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
 import 
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import 
org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeChoosingPolicy;
 import org.apache.ratis.util.ReflectionUtils;
 
 /**
@@ -32,6 +35,8 @@ public final class VolumeChoosingPolicyFactory {
 
   private static final Class<? extends VolumeChoosingPolicy>
       DEFAULT_VOLUME_CHOOSING_POLICY = CapacityVolumeChoosingPolicy.class;
+  // a lock to coordinate space reservation between multiple policies and 
threads
+  private static final ReentrantLock LOCK = new ReentrantLock();
 
   private VolumeChoosingPolicyFactory() {
   }
@@ -40,6 +45,12 @@ public static VolumeChoosingPolicy 
getPolicy(ConfigurationSource conf) {
     Class<? extends VolumeChoosingPolicy> policyClass = conf.getClass(
         HDDS_DATANODE_VOLUME_CHOOSING_POLICY,
         DEFAULT_VOLUME_CHOOSING_POLICY, VolumeChoosingPolicy.class);
-    return ReflectionUtils.newInstance(policyClass);
+    return ReflectionUtils.newInstance(policyClass, new Class<?>[] 
{ReentrantLock.class}, LOCK);
+  }
+
+  public static DiskBalancerVolumeChoosingPolicy 
getDiskBalancerPolicy(ConfigurationSource conf) {
+    Class<?> policyClass = 
conf.getObject(DiskBalancerConfiguration.class).getVolumeChoosingPolicyClass();
+    return (DiskBalancerVolumeChoosingPolicy) ReflectionUtils.newInstance(
+        policyClass, new Class<?>[]{ReentrantLock.class}, LOCK);
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
index 724ec8f9ff..b01217b74b 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.ozone.container.diskbalancer;
 
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -35,16 +37,20 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
 import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.FaultInjector;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
@@ -54,6 +60,7 @@
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import 
org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
 import 
org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy;
 import 
org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
@@ -96,6 +103,7 @@ public class DiskBalancerService extends BackgroundService {
   private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow());
 
   private Set<Long> inProgressContainers;
+  private static FaultInjector injector;
 
   /**
    * A map that tracks the total bytes which will be freed from each source 
volume
@@ -115,6 +123,7 @@ public class DiskBalancerService extends BackgroundService {
 
   private DiskBalancerServiceMetrics metrics;
   private long bytesToMove;
+  private long containerDefaultSize;
 
   /**
    * Defines the operational states of the DiskBalancerService.
@@ -156,11 +165,12 @@ public DiskBalancerService(OzoneContainer ozoneContainer,
     inProgressContainers = ConcurrentHashMap.newKeySet();
     deltaSizes = new ConcurrentHashMap<>();
     volumeSet = ozoneContainer.getVolumeSet();
+    containerDefaultSize = (long) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
 
     try {
-      volumeChoosingPolicy = (DiskBalancerVolumeChoosingPolicy)
-          conf.getObject(DiskBalancerConfiguration.class)
-          .getVolumeChoosingPolicyClass().newInstance();
+      volumeChoosingPolicy = 
VolumeChoosingPolicyFactory.getDiskBalancerPolicy(conf);
       containerChoosingPolicy = (ContainerChoosingPolicy)
           conf.getObject(DiskBalancerConfiguration.class)
               .getContainerChoosingPolicyClass().newInstance();
@@ -387,7 +397,7 @@ public BackgroundTaskQueue getTasks() {
 
     for (int i = 0; i < availableTaskCount; i++) {
       Pair<HddsVolume, HddsVolume> pair = volumeChoosingPolicy
-          .chooseVolume(volumeSet, threshold, deltaSizes);
+          .chooseVolume(volumeSet, threshold, deltaSizes, 
containerDefaultSize);
       if (pair == null) {
         continue;
       }
@@ -401,7 +411,9 @@ public BackgroundTaskQueue getTasks() {
         inProgressContainers.add(toBalanceContainer.getContainerID());
         deltaSizes.put(sourceVolume, deltaSizes.getOrDefault(sourceVolume, 0L)
             - toBalanceContainer.getBytesUsed());
-        destVolume.incCommittedBytes(toBalanceContainer.getBytesUsed());
+      } else {
+        // release destVolume committed bytes
+        destVolume.incCommittedBytes(0 - containerDefaultSize);
       }
     }
 
@@ -468,109 +480,98 @@ public BackgroundTaskResult call() {
       long startTime = Time.monotonicNow();
       boolean moveSucceeded = true;
       long containerId = containerData.getContainerID();
-      boolean destVolumeIncreased = false;
+      Container container = 
ozoneContainer.getContainerSet().getContainer(containerId);
+      boolean readLockReleased = false;
       Path diskBalancerTmpDir = null, diskBalancerDestDir = null;
       long containerSize = containerData.getBytesUsed();
-      String originalContainerChecksum = 
containerData.getContainerFileChecksum();
+      if (container == null) {
+        LOG.warn("Container " + containerId + " doesn't exist in 
ContainerSet");
+        postCall(false, startTime);
+        return BackgroundTaskResult.EmptyTaskResult.newResult();
+      }
+      // hold read lock on the container first, to prevent other threads from 
updating the container state,
+      // such as block deletion.
+      container.readLock();
       try {
         // Step 1: Copy container to new Volume's tmp Dir
         diskBalancerTmpDir = destVolume.getTmpDir().toPath()
             .resolve(DISK_BALANCER_DIR).resolve(String.valueOf(containerId));
-        ozoneContainer.getController().copyContainer(containerData,
-            diskBalancerTmpDir);
+        ozoneContainer.getController().copyContainer(containerData, 
diskBalancerTmpDir);
 
         // Step 2: verify checksum and Transition Temp container to Temp 
C1-RECOVERING
-        File tempContainerFile = ContainerUtils.getContainerFile(
-            diskBalancerTmpDir.toFile());
+        File tempContainerFile = 
ContainerUtils.getContainerFile(diskBalancerTmpDir.toFile());
         if (!tempContainerFile.exists()) {
           throw new IOException("ContainerFile for container " + containerId
-              + " doesn't exist in temp directory "
-              + tempContainerFile.getAbsolutePath());
-        }
-        ContainerData tempContainerData = ContainerDataYaml
-            .readContainerFile(tempContainerFile);
-        String copiedContainerChecksum = 
tempContainerData.getContainerFileChecksum();
-        if (!originalContainerChecksum.equals(copiedContainerChecksum)) {
-          throw new IOException("Container checksum mismatch for container "
-              + containerId + ". Original: " + originalContainerChecksum
-              + ", Copied: " + copiedContainerChecksum);
+              + " doesn't exist in temp directory " + 
tempContainerFile.getAbsolutePath());
         }
+        ContainerData tempContainerData = 
ContainerDataYaml.readContainerFile(tempContainerFile);
+        ContainerUtils.verifyContainerFileChecksum(tempContainerData, conf);
+        // Before moving the container directory to its final place, the 
destination dir is empty and doesn't have
+        // a metadata directory. Writing the .container file will fail as the 
metadata dir doesn't exist.
+        // So we instead save the container file to the diskBalancerTmpDir.
+        ContainerProtos.ContainerDataProto.State originalState = 
tempContainerData.getState();
         
tempContainerData.setState(ContainerProtos.ContainerDataProto.State.RECOVERING);
-
+        // update tempContainerData volume to point to destVolume
+        tempContainerData.setVolume(destVolume);
         // overwrite the .container file with the new state.
         ContainerDataYaml.createContainerFile(tempContainerData, 
tempContainerFile);
+        // reset to original state
+        tempContainerData.setState(originalState);
 
-        // Step 3: Move container directory to final place on new volume
+        // Step 3: Move container directory to final place on new volume and 
import
         String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(
             destVolume, destVolume.getClusterID());
         diskBalancerDestDir =
             Paths.get(KeyValueContainerLocationUtil.getBaseContainerLocation(
                 destVolume.getHddsRootDir().toString(), idDir,
                 containerData.getContainerID()));
-        Path destDirParent = diskBalancerDestDir.getParent();
-        if (destDirParent != null) {
-          Files.createDirectories(destDirParent);
+        if (!Files.exists(diskBalancerDestDir)) {
+          Files.createDirectories(diskBalancerDestDir);
         }
-        Files.move(diskBalancerTmpDir, diskBalancerDestDir,
-            StandardCopyOption.ATOMIC_MOVE,
-            StandardCopyOption.REPLACE_EXISTING);
-
-        // Generate a new Container based on destDir which is in C1-RECOVERING 
state.
-        File containerFile = ContainerUtils.getContainerFile(
-            diskBalancerDestDir.toFile());
-        if (!containerFile.exists()) {
-          throw new IOException("ContainerFile for container " + containerId
-          + " doesn't exists.");
-        }
-        ContainerData originalContainerData = ContainerDataYaml
-            .readContainerFile(containerFile);
-        Container newContainer = ozoneContainer.getController()
-            .importContainer(originalContainerData, diskBalancerDestDir);
-        newContainer.getContainerData().getVolume()
-            .incrementUsedSpace(containerSize);
-        destVolumeIncreased = true;
-
-        // The import process loaded the temporary RECOVERING state from disk.
-        // Now, restore the original state and persist it to the .container 
file.
-        newContainer.getContainerData().setState(containerData.getState());
-        newContainer.update(newContainer.getContainerData().getMetadata(), 
true);
-
-        // Step 5: Update container for containerID and delete old container.
-        Container oldContainer = ozoneContainer.getContainerSet()
-            .getContainer(containerId);
-        oldContainer.writeLock();
-        try {
-          // First, update the in-memory set to point to the new replica.
-          ozoneContainer.getContainerSet().updateContainer(newContainer);
 
-          // Mark old container as DELETED and persist state.
-          oldContainer.getContainerData().setState(
-              ContainerProtos.ContainerDataProto.State.DELETED);
-          oldContainer.update(oldContainer.getContainerData().getMetadata(),
-              true);
-
-          // Remove the old container from the KeyValueContainerUtil.
-          try {
-            KeyValueContainerUtil.removeContainer(
-                (KeyValueContainerData) oldContainer.getContainerData(), conf);
-            oldContainer.delete();
-          } catch (IOException ex) {
-            LOG.warn("Failed to cleanup old container {} after move. It is " +
-                    "marked DELETED and will be handled by background 
scanners.",
-                containerId, ex);
-          }
-        } finally {
-          oldContainer.writeUnlock();
+        if (FileUtils.isEmptyDirectory(diskBalancerDestDir.toFile())) {
+          Files.move(diskBalancerTmpDir, diskBalancerDestDir,
+              StandardCopyOption.ATOMIC_MOVE, 
StandardCopyOption.REPLACE_EXISTING);
+        } else {
+          String errorMessage = "Container " + containerId +
+              " move failed because Container Directory " +
+              diskBalancerDestDir.toAbsolutePath() + " already exists and are 
not empty";
+          throw new StorageContainerException(errorMessage, 
CONTAINER_ALREADY_EXISTS);
         }
 
-        //The move is now successful.
-        oldContainer.getContainerData().getVolume()
-            .decrementUsedSpace(containerSize);
+        // Import the container. importContainer will reset container back to 
original state
+        Container newContainer = 
ozoneContainer.getController().importContainer(tempContainerData);
+
+        // Step 4: Update container for containerID and mark old container for 
deletion
+        // first, update the in-memory set to point to the new replica.
+        // After this update, new caller will get the new Container object,
+        // old caller can still hold the old Container object.
+        ozoneContainer.getContainerSet().updateContainer(newContainer);
+        destVolume.incrementUsedSpace(containerSize);
+        // Mark old container as DELETED and persist state.
+        // markContainerForDelete requires writeLock, so release readLock first
+        container.readUnlock();
+        readLockReleased = true;
+        try {
+          container.markContainerForDelete();
+        } catch (Throwable e) {
+          // Failure to mark the container for deletion will not fail the whole 
process; it will leave both old and new replicas
+          // on disk, while the new container is in the ContainerSet.
+          LOG.warn("Failed to mark the old container {} for delete. " +
+              "It will be handled after DN restart.", containerId, e);
+        }
+        // The move is now successful.
         balancedBytesInLastWindow.addAndGet(containerSize);
-        metrics.incrSuccessCount(1);
         metrics.incrSuccessBytes(containerSize);
         totalBalancedBytes.addAndGet(containerSize);
       } catch (IOException e) {
+        if (injector != null) {
+          try {
+            injector.pause();
+          } catch (IOException ex) {
+            // do nothing
+          }
+        }
         moveSucceeded = false;
         LOG.warn("Failed to move container {}", containerId, e);
         if (diskBalancerTmpDir != null) {
@@ -578,35 +579,37 @@ public BackgroundTaskResult call() {
             File dir = new File(String.valueOf(diskBalancerTmpDir));
             FileUtils.deleteDirectory(dir);
           } catch (IOException ex) {
-            LOG.warn("Failed to delete tmp directory {}", diskBalancerTmpDir,
-                ex);
+            LOG.warn("Failed to delete tmp directory {}", diskBalancerTmpDir, 
ex);
           }
         }
-        if (diskBalancerDestDir != null) {
+        if (diskBalancerDestDir != null && e instanceof 
StorageContainerException
+            && ((StorageContainerException) e).getResult() != 
CONTAINER_ALREADY_EXISTS) {
           try {
             File dir = new File(String.valueOf(diskBalancerDestDir));
             FileUtils.deleteDirectory(dir);
           } catch (IOException ex) {
-            LOG.warn("Failed to delete dest directory {}",
-                diskBalancerDestDir, ex);
+            LOG.warn("Failed to delete dest directory {}", 
diskBalancerDestDir, ex);
           }
         }
-        // Only need to check for destVolume, sourceVolume's usedSpace is
-        // updated at last, if it reaches there, there is no exception.
-        if (destVolumeIncreased) {
-          destVolume.decrementUsedSpace(containerSize);
-        }
-        metrics.incrFailureCount();
       } finally {
-        long endTime = Time.monotonicNow();
+        if (!readLockReleased) {
+          container.readUnlock();
+        }
         if (moveSucceeded) {
-          metrics.getMoveSuccessTime().add(endTime - startTime);
+          // Remove the old container from the KeyValueContainerUtil.
+          try {
+            KeyValueContainerUtil.removeContainer(
+                (KeyValueContainerData) container.getContainerData(), conf);
+            container.delete();
+            
container.getContainerData().getVolume().decrementUsedSpace(containerSize);
+          } catch (IOException ex) {
+            LOG.warn("Failed to move or delete old container {} after it's 
marked as DELETED. " +
+                    "It will be handled by background scanners.", containerId, 
ex);
+          }
           ContainerLogger.logMoveSuccess(containerId, sourceVolume,
-              destVolume, containerSize, endTime - startTime);
-        } else {
-          metrics.getMoveFailureTime().add(endTime - startTime);
+              destVolume, containerSize, Time.monotonicNow() - startTime);
         }
-        postCall();
+        postCall(moveSucceeded, startTime);
       }
       return BackgroundTaskResult.EmptyTaskResult.newResult();
     }
@@ -616,11 +619,19 @@ public int getPriority() {
       return BackgroundTask.super.getPriority();
     }
 
-    private void postCall() {
+    private void postCall(boolean success, long startTime) {
       inProgressContainers.remove(containerData.getContainerID());
       deltaSizes.put(sourceVolume, deltaSizes.get(sourceVolume) +
           containerData.getBytesUsed());
-      destVolume.incCommittedBytes(-containerData.getBytesUsed());
+      destVolume.incCommittedBytes(0 - containerDefaultSize);
+      long endTime = Time.monotonicNow();
+      if (success) {
+        metrics.incrSuccessCount(1);
+        metrics.getMoveSuccessTime().add(endTime - startTime);
+      } else {
+        metrics.incrFailureCount(1);
+        metrics.getMoveFailureTime().add(endTime - startTime);
+      }
     }
   }
 
@@ -669,10 +680,6 @@ private Path getDiskBalancerTmpDir(HddsVolume hddsVolume) {
         .resolve(DISK_BALANCER_TMP_DIR).resolve(DISK_BALANCER_DIR);
   }
 
-  public boolean isBalancingContainer(long containerId) {
-    return inProgressContainers.contains(containerId);
-  }
-
   public DiskBalancerServiceMetrics getMetrics() {
     return metrics;
   }
@@ -690,15 +697,6 @@ public DiskBalancerVolumeChoosingPolicy 
getVolumeChoosingPolicy() {
     return volumeChoosingPolicy;
   }
 
-  @VisibleForTesting
-  public DiskBalancerTask createDiskBalancerTask(ContainerData containerData, 
HddsVolume source, HddsVolume dest) {
-    inProgressContainers.add(containerData.getContainerID());
-    deltaSizes.put(source, deltaSizes.getOrDefault(source, 0L)
-        - containerData.getBytesUsed());
-    dest.incCommittedBytes(containerData.getBytesUsed());
-    return new DiskBalancerTask(containerData, source, dest);
-  }
-
   @VisibleForTesting
   public void setVolumeChoosingPolicy(DiskBalancerVolumeChoosingPolicy 
volumeChoosingPolicy) {
     this.volumeChoosingPolicy = volumeChoosingPolicy;
@@ -714,6 +712,11 @@ public Set<Long> getInProgressContainers() {
     return inProgressContainers;
   }
 
+  @VisibleForTesting
+  public Map<HddsVolume, Long> getDeltaSizes() {
+    return deltaSizes;
+  }
+
   /**
    * Handle state changes for DiskBalancerService.
    */
@@ -756,4 +759,9 @@ public void shutdown() {
       DiskBalancerServiceMetrics.unRegister();
     }
   }
+
+  @VisibleForTesting
+  public static void setInjector(FaultInjector instance) {
+    injector = instance;
+  }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceMetrics.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceMetrics.java
index 38e8df7dec..5b7ee19ba4 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceMetrics.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceMetrics.java
@@ -89,8 +89,8 @@ public void incrSuccessBytes(long bytes) {
     this.successBytes.incr(bytes);
   }
 
-  public void incrFailureCount() {
-    this.failureCount.incr();
+  public void incrFailureCount(long count) {
+    this.failureCount.incr(count);
   }
 
   public void incrRunningLoopCount() {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
index f75b94ad16..76a0f30c14 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
@@ -19,6 +19,7 @@
 
 import static java.util.concurrent.TimeUnit.HOURS;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import java.util.Iterator;
@@ -42,6 +43,9 @@ public class DefaultContainerChoosingPolicy implements 
ContainerChoosingPolicy {
       ThreadLocal.withInitial(
           () -> CacheBuilder.newBuilder().recordStats().expireAfterAccess(1, 
HOURS).build());
 
+  // for test
+  private static boolean test = false;
+
   @Override
   public ContainerData chooseContainer(OzoneContainer ozoneContainer,
       HddsVolume hddsVolume, Set<Long> inProgressContainerIDs) {
@@ -57,7 +61,7 @@ public ContainerData chooseContainer(OzoneContainer 
ozoneContainer,
     while (itr.hasNext()) {
       ContainerData containerData = itr.next().getContainerData();
       if (!inProgressContainerIDs.contains(
-          containerData.getContainerID()) && containerData.isClosed()) {
+          containerData.getContainerID()) && (containerData.isClosed() || 
(test && containerData.isQuasiClosed()))) {
         return containerData;
       }
     }
@@ -67,4 +71,9 @@ public ContainerData chooseContainer(OzoneContainer 
ozoneContainer,
     }
     return null;
   }
+
+  @VisibleForTesting
+  public static void setTest(boolean isTest) {
+    test = isTest;
+  }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java
index d867b3b0f2..cba87740f7 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java
@@ -19,16 +19,18 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.AvailableSpaceFilter;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Choose a random volume for balancing.
+ * Choose a random volume for disk balancing.
  *
  * Source volumes use deltaMap to simulate space that will be freed 
(pre-deleted).
  * Destination volumes use committedBytes to account for space already 
reserved.
@@ -38,37 +40,64 @@ public class DefaultVolumeChoosingPolicy implements 
DiskBalancerVolumeChoosingPo
 
   public static final Logger LOG = LoggerFactory.getLogger(
       DefaultVolumeChoosingPolicy.class);
+  private final ReentrantLock lock;
+
+  public DefaultVolumeChoosingPolicy(ReentrantLock globalLock) {
+    lock = globalLock;
+  }
 
   @Override
   public Pair<HddsVolume, HddsVolume> chooseVolume(MutableVolumeSet volumeSet,
-      double threshold, Map<HddsVolume, Long> deltaMap) {
-    double idealUsage = volumeSet.getIdealUsage();
+      double threshold, Map<HddsVolume, Long> deltaMap, long containerSize) {
+    lock.lock();
+    try {
+      double idealUsage = volumeSet.getIdealUsage();
 
-    // Threshold is given as a percentage
-    double normalizedThreshold = threshold / 100;
-    List<HddsVolume> volumes = StorageVolumeUtil
-        .getHddsVolumesList(volumeSet.getVolumesList())
-        .stream()
-        .filter(volume ->
-            Math.abs(
-                ((double)((volume.getCurrentUsage().getCapacity() - 
volume.getCurrentUsage().getAvailable())
-                    + deltaMap.getOrDefault(volume, 0L) + 
volume.getCommittedBytes()))
-                    / volume.getCurrentUsage().getCapacity() - idealUsage) >= 
normalizedThreshold)
-        .sorted((v1, v2) ->
-            Double.compare(
-                (double) ((v2.getCurrentUsage().getCapacity() - 
v2.getCurrentUsage().getAvailable())
-                    + deltaMap.getOrDefault(v2, 0L) + v2.getCommittedBytes()) /
-                    v2.getCurrentUsage().getCapacity(),
-                (double) ((v1.getCurrentUsage().getCapacity() - 
v1.getCurrentUsage().getAvailable())
-                    + deltaMap.getOrDefault(v1, 0L) + v1.getCommittedBytes()) /
-                    v1.getCurrentUsage().getCapacity()))
-        .collect(Collectors.toList());
+      // Threshold is given as a percentage
+      double normalizedThreshold = threshold / 100;
+      List<HddsVolume> volumes = StorageVolumeUtil
+          .getHddsVolumesList(volumeSet.getVolumesList())
+          .stream()
+          .filter(volume ->
+              Math.abs(
+                  ((double)((volume.getCurrentUsage().getCapacity() - 
volume.getCurrentUsage().getAvailable())
+                      + deltaMap.getOrDefault(volume, 0L) + 
volume.getCommittedBytes()))
+                      / volume.getCurrentUsage().getCapacity() - idealUsage) 
>= normalizedThreshold)
+          .sorted((v1, v2) ->
+              Double.compare(
+                  (double) ((v2.getCurrentUsage().getCapacity() - 
v2.getCurrentUsage().getAvailable())
+                      + deltaMap.getOrDefault(v2, 0L) + 
v2.getCommittedBytes()) /
+                      v2.getCurrentUsage().getCapacity(),
+                  (double) ((v1.getCurrentUsage().getCapacity() - 
v1.getCurrentUsage().getAvailable())
+                      + deltaMap.getOrDefault(v1, 0L) + 
v1.getCommittedBytes()) /
+                      v1.getCurrentUsage().getCapacity()))
+          .collect(Collectors.toList());
 
-    // Can not generate DiskBalancerTask if we have less than 2 results
-    if (volumes.size() <= 1) {
-      LOG.debug("Can not find appropriate Source volume and Dest Volume.");
-      return null;
+      // Can not generate DiskBalancerTask if we have less than 2 results
+      if (volumes.size() <= 1) {
+        LOG.debug("Can not find appropriate Source volume and Dest Volume.");
+        return null;
+      }
+      AvailableSpaceFilter filter = new AvailableSpaceFilter(containerSize);
+      HddsVolume srcVolume = volumes.get(0);
+      HddsVolume destVolume = volumes.get(volumes.size() - 1);
+      while (!filter.test(destVolume)) {
+        // If the destination volume does not have enough space, try the next
+        // one in the list.
+        LOG.debug("Destination volume {} does not have enough space, trying 
next volume.",
+            destVolume.getStorageID());
+        volumes.remove(destVolume);
+        if (volumes.size() <= 1) {
+          LOG.debug("Can not find appropriate Source volume and Dest Volume.");
+          return null;
+        }
+        destVolume = volumes.get(volumes.size() - 1);
+      }
+      // reserve space for the dest volume
+      destVolume.incCommittedBytes(containerSize);
+      return Pair.of(srcVolume, destVolume);
+    } finally {
+      lock.unlock();
     }
-    return Pair.of(volumes.get(0), volumes.get(volumes.size() - 1));
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeChoosingPolicy.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeChoosingPolicy.java
index 7bb805f464..01520fea37 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeChoosingPolicy.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeChoosingPolicy.java
@@ -32,8 +32,9 @@ public interface DiskBalancerVolumeChoosingPolicy {
    * @param volumeSet - volumes to choose from.
    * @param threshold - the threshold to choose source and dest volumes.
    * @param deltaSizes - the sizes changes of inProgress balancing jobs.
+   * @param containerSize - the estimated size of container to be moved.
    * @return Source volume and Dest volume.
    */
   Pair<HddsVolume, HddsVolume> chooseVolume(MutableVolumeSet volumeSet,
-      double threshold, Map<HddsVolume, Long> deltaSizes);
+      double threshold, Map<HddsVolume, Long> deltaSizes, long containerSize);
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 3b952c5952..b3ba31da8e 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -64,6 +64,7 @@
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.utils.FaultInjector;
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.nativeio.NativeIO;
@@ -120,6 +121,7 @@ public class KeyValueContainer implements 
Container<KeyValueContainerData> {
   private Set<Long> pendingPutBlockCache;
 
   private boolean bCheckChunksFilePath;
+  private static FaultInjector faultInjector;
 
   public KeyValueContainer(KeyValueContainerData containerData,
       ConfigurationSource ozoneConfig) {
@@ -934,7 +936,7 @@ public DataScanResult scanData(DataTransferThrottler 
throttler, Canceler cancele
 
   @Override
   public void copyContainerData(Path destination) throws IOException {
-    writeLock();
+    readLock();
     try {
       // Closed/ Quasi closed containers are considered for replication by
       // replication manager if they are under-replicated.
@@ -948,16 +950,11 @@ public void copyContainerData(Path destination) throws 
IOException {
                 " is in state " + state);
       }
 
-      try {
-        if (!containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
-          compactDB();
-          // Close DB (and remove from cache) to avoid concurrent modification
-          // while copying it.
-          BlockUtils.removeDB(containerData, config);
-        }
-      } finally {
-        readLock();
-        writeUnlock();
+      if (!containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
+        compactDB();
+        // Close DB (and remove from cache) to avoid concurrent modification
+        // while copying it.
+        BlockUtils.removeDB(containerData, config);
       }
 
       if (containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
@@ -972,19 +969,28 @@ public void copyContainerData(Path destination) throws 
IOException {
       } else {
         copyContainerToDestination(destination);
       }
-    } catch (Exception e) {
+      if (getInjector() != null && getInjector().getException() != null) {
+        throw new IOException("Fault injection", getInjector().getException());
+      }
+    } catch (IOException e) {
       LOG.error("Got exception when copying container {} to {}",
           containerData.getContainerID(), destination, e);
       throw e;
     } finally {
-      if (lock.isWriteLockedByCurrentThread()) {
-        writeUnlock();
-      } else {
-        readUnlock();
-      }
+      readUnlock();
     }
   }
 
+  @VisibleForTesting
+  public static FaultInjector getInjector() {
+    return faultInjector;
+  }
+
+  @VisibleForTesting
+  public static void setInjector(FaultInjector instance) {
+    faultInjector = instance;
+  }
+
   /**
    * Set all of the path realted container data fields based on the name
    * conventions.
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 622515d0f8..2176338227 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -146,7 +146,6 @@
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
-import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import 
org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
@@ -1329,7 +1328,15 @@ public Container importContainer(ContainerData 
originalContainerData,
     ContainerLogger.logImported(container.getContainerData());
     sendICR(container);
     return container;
+  }
 
+  @Override
+  public Container importContainer(ContainerData targetTempContainerData) 
throws IOException {
+    KeyValueContainer container = createNewContainer(targetTempContainerData);
+    HddsVolume targetVolume = targetTempContainerData.getVolume();
+    populateContainerPathFields(container, targetVolume);
+    container.importContainerData((KeyValueContainerData) 
targetTempContainerData);
+    return container;
   }
 
   @Override
@@ -1557,24 +1564,6 @@ public void copyContainer(final Container container, 
Path destinationPath)
     kvc.copyContainerData(destinationPath);
   }
 
-  @Override
-  public Container importContainer(ContainerData originalContainerData,
-      final Path containerPath) throws IOException {
-    KeyValueContainer container = createNewContainer(originalContainerData);
-
-    HddsVolume volume = HddsVolumeUtil.matchHddsVolume(
-        StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
-        containerPath.toString());
-    if (volume == null ||
-        !containerPath.startsWith(volume.getVolumeRootDir())) {
-      throw new IOException("ContainerPath " + containerPath
-          + " doesn't match volume " + volume);
-    }
-    container.populatePathFields(volume, containerPath);
-    container.importContainerData(containerPath);
-    return container;
-  }
-
   private KeyValueContainer createNewContainer(
       ContainerData originalContainerData) {
     Preconditions.checkState(originalContainerData instanceof
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
index 097d7cfa34..7b13929713 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
@@ -211,6 +211,10 @@ public Container importContainer(
         .importContainer(containerData, rawContainerStream, packer);
   }
 
+  public Container importContainer(final ContainerData 
targetTempContainerData) throws IOException {
+    return 
handlers.get(targetTempContainerData.getContainerType()).importContainer(targetTempContainerData);
+  }
+
   public void copyContainer(final ContainerData containerData,
       final Path destinationPath) throws IOException {
     handlers.get(containerData.getContainerType())
@@ -219,14 +223,6 @@ public void copyContainer(final ContainerData 
containerData,
             destinationPath);
   }
 
-  public Container importContainer(
-      final ContainerData containerData,
-      final Path containerPath)
-      throws IOException {
-    return handlers.get(containerData.getContainerType())
-        .importContainer(containerData, containerPath);
-  }
-
   public void exportContainer(final ContainerType type,
       final long containerId, final OutputStream outputStream,
       final TarContainerPacker packer) throws IOException {
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
index 07d05fcb0b..9bd803ab60 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
@@ -23,6 +23,7 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyDouble;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -289,7 +290,7 @@ public void testConcurrentTasksNotExceedThreadLimit() 
throws Exception {
     when(containerData.getContainerID()).thenAnswer(invocation -> 
System.nanoTime());
     when(containerData.getBytesUsed()).thenReturn(100L);
 
-    when(volumePolicy.chooseVolume(any(), anyDouble(), 
any())).thenReturn(Pair.of(source, dest));
+    when(volumePolicy.chooseVolume(any(), anyDouble(), any(), 
anyLong())).thenReturn(Pair.of(source, dest));
     when(containerPolicy.chooseContainer(any(), any(), 
any())).thenReturn(containerData);
 
     // Test when no tasks are in progress, it should schedule up to the limit
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
index 9208eacf09..ee72c347a6 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
@@ -23,7 +23,9 @@
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -40,16 +42,24 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory;
+import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.hdds.utils.FaultInjector;
 import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -60,6 +70,7 @@
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import 
org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
@@ -68,11 +79,16 @@
 import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.ozone.test.GenericTestUtils;
+import org.assertj.core.api.Fail;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 
@@ -100,6 +116,87 @@ public class TestDiskBalancerTask {
   private static final long CONTAINER_ID = 1L;
   private static final long CONTAINER_SIZE = 1024L * 1024L; // 1 MB
 
+  private final TestFaultInjector kvFaultInjector = new TestFaultInjector();
+
+  /**
+   * A FaultInjector that can be configured to throw an exception on a
+   * specific invocation number. This allows tests to target failure points
+   * that occur after initial checks.
+   */
+  private static class TestFaultInjector extends FaultInjector {
+    private Throwable exception;
+    private int throwOnInvocation = -1; // -1 means never throw
+    private int invocationCount = 0;
+    private CountDownLatch ready;
+    private CountDownLatch wait;
+
+    TestFaultInjector() {
+      init();
+    }
+
+    @Override
+    public void init() {
+      this.ready = new CountDownLatch(1);
+      this.wait = new CountDownLatch(1);
+    }
+
+    @Override
+    public void pause() throws IOException {
+      ready.countDown();
+      try {
+        wait.await();
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public void resume() throws IOException {
+      // Make sure injector pauses before resuming.
+      try {
+        ready.await();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+        Assertions.assertTrue(Fail.fail("resume interrupted"));
+      }
+      wait.countDown();
+    }
+
+    /**
+     * Sets an exception to be thrown on a specific invocation.
+     * @param e The exception to throw.
+     * @param onInvocation The invocation number to throw on (e.g., 1 for the
+     * first call, 2 for the second, etc.).
+     */
+    public void setException(Throwable e, int onInvocation) {
+      this.exception = e;
+      this.throwOnInvocation = onInvocation;
+      this.invocationCount = 0; // Reset count for each new test setup
+    }
+
+    @Override
+    public void setException(Throwable e) {
+      // Default to throwing on the first invocation if no number is specified.
+      setException(e, 1);
+    }
+
+    @Override
+    public Throwable getException() {
+      invocationCount++;
+      if (exception != null && invocationCount == throwOnInvocation) {
+        return exception;
+      }
+      return null;
+    }
+
+    @Override
+    public void reset() {
+      this.exception = null;
+      this.throwOnInvocation = -1;
+      this.invocationCount = 0;
+    }
+  }
+
   @BeforeEach
   public void setup() throws Exception {
     testRoot = tmpDir.toFile();
@@ -109,15 +206,28 @@ public void setup() throws Exception {
     conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
         testRoot.getAbsolutePath() + "/vol1," + testRoot.getAbsolutePath()
             + "/vol2");
+    conf.setClass(SpaceUsageCheckFactory.Conf.configKeyForClassName(),
+        MockSpaceUsageCheckFactory.HalfTera.class,
+        SpaceUsageCheckFactory.class);
     volumeSet = new MutableVolumeSet(datanodeUuid, scmId, conf, null,
         StorageVolume.VolumeType.DATA_VOLUME, null);
     createDbInstancesForTestIfNeeded(volumeSet, scmId, scmId, conf);
 
+    List<StorageVolume> volumes = volumeSet.getVolumesList();
+    sourceVolume = (HddsVolume) volumes.get(0);
+    destVolume = (HddsVolume) volumes.get(1);
+
+    // reset volume's usedBytes
+    sourceVolume.incrementUsedSpace(0 - 
sourceVolume.getCurrentUsage().getUsedSpace());
+    destVolume.incrementUsedSpace(0 - 
destVolume.getCurrentUsage().getUsedSpace());
+    
sourceVolume.incrementUsedSpace(sourceVolume.getCurrentUsage().getCapacity() / 
2);
+
     containerSet = ContainerSet.newReadOnlyContainerSet(1000);
     ContainerMetrics containerMetrics = ContainerMetrics.create(conf);
     KeyValueHandler keyValueHandler = new KeyValueHandler(conf, datanodeUuid,
         containerSet, volumeSet, containerMetrics, c -> {
     }, new ContainerChecksumTreeManager(conf));
+    keyValueHandler.setClusterID(scmId);
 
     Map<ContainerProtos.ContainerType, Handler> handlers = new HashMap<>();
     handlers.put(ContainerProtos.ContainerType.KeyValueContainer, 
keyValueHandler);
@@ -129,12 +239,13 @@ public void setup() throws Exception {
     when(ozoneContainer.getDispatcher())
         .thenReturn(mock(ContainerDispatcher.class));
 
+    DiskBalancerConfiguration diskBalancerConfiguration = 
conf.getObject(DiskBalancerConfiguration.class);
+    diskBalancerConfiguration.setDiskBalancerShouldRun(true);
+    conf.setFromObject(diskBalancerConfiguration);
     diskBalancerService = new DiskBalancerServiceTestImpl(ozoneContainer,
         100, conf, 1);
 
-    List<StorageVolume> volumes = volumeSet.getVolumesList();
-    sourceVolume = (HddsVolume) volumes.get(0);
-    destVolume = (HddsVolume) volumes.get(1);
+    KeyValueContainer.setInjector(kvFaultInjector);
   }
 
   @AfterEach
@@ -150,51 +261,85 @@ public void cleanup() throws IOException {
     if (testRoot.exists()) {
       FileUtils.deleteDirectory(testRoot);
     }
+
+    kvFaultInjector.reset();
+    KeyValueContainer.setInjector(null);
+    DiskBalancerService.setInjector(null);
+    DefaultContainerChoosingPolicy.setTest(false);
   }
 
-  @Test
-  public void moveSuccess() throws IOException {
-    Container container = createContainer(CONTAINER_ID, sourceVolume);
+  @ParameterizedTest
+  @EnumSource(names = {"CLOSED", "QUASI_CLOSED"})
+  public void moveSuccess(State containerState) throws IOException {
     long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
     long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
-    String oldContainerPath = container.getContainerData().getContainerPath();
+    long initialDestCommitted = destVolume.getCommittedBytes();
+    long initialSourceDelta = 
diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
+        0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
 
-    DiskBalancerService.DiskBalancerTask task = 
getTask(container.getContainerData());
+    Container container = createContainer(CONTAINER_ID, sourceVolume, 
containerState);
+    State originalState = container.getContainerState();
+    assertEquals(initialSourceUsed + CONTAINER_SIZE, 
sourceVolume.getCurrentUsage().getUsedSpace());
+    assertEquals(initialDestUsed, destVolume.getCurrentUsage().getUsedSpace());
+
+    String oldContainerPath = container.getContainerData().getContainerPath();
+    if (containerState == State.QUASI_CLOSED) {
+      DefaultContainerChoosingPolicy.setTest(true);
+    }
+    DiskBalancerService.DiskBalancerTask task = getTask();
     task.call();
+    assertEquals(State.DELETED, container.getContainerState());
 
+    // Asserts
     Container newContainer = containerSet.getContainer(CONTAINER_ID);
     assertNotNull(newContainer);
     assertNotEquals(container, newContainer);
+    assertEquals(originalState, newContainer.getContainerState());
     assertEquals(destVolume, newContainer.getContainerData().getVolume());
-    assertEquals(initialSourceUsed - CONTAINER_SIZE,
-        sourceVolume.getCurrentUsage().getUsedSpace());
-    assertEquals(initialDestUsed + CONTAINER_SIZE,
-        destVolume.getCurrentUsage().getUsedSpace());
+    assertEquals(initialSourceUsed, 
sourceVolume.getCurrentUsage().getUsedSpace());
+    assertEquals(initialDestUsed + CONTAINER_SIZE, 
destVolume.getCurrentUsage().getUsedSpace());
     assertFalse(new File(oldContainerPath).exists());
-    assertTrue(
-        new File(newContainer.getContainerData().getContainerPath()).exists());
-    assertEquals(1,
-        diskBalancerService.getMetrics().getSuccessCount());
-    assertEquals(CONTAINER_SIZE,
-        diskBalancerService.getMetrics().getSuccessBytes());
+    assertTrue(new 
File(newContainer.getContainerData().getContainerPath()).exists());
+    assertEquals(1, diskBalancerService.getMetrics().getSuccessCount());
+    assertEquals(CONTAINER_SIZE, 
diskBalancerService.getMetrics().getSuccessBytes());
+    assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
+    assertEquals(initialSourceDelta, 
diskBalancerService.getDeltaSizes().get(sourceVolume));
   }
 
   @Test
-  public void moveFailsOnCopy() throws IOException {
-    Container container = createContainer(CONTAINER_ID, sourceVolume);
+  public void moveFailsAfterCopy() throws IOException, InterruptedException, 
TimeoutException, ExecutionException {
+    Container container = createContainer(CONTAINER_ID, sourceVolume, 
State.CLOSED);
     long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
     long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
+    long initialDestCommitted = destVolume.getCommittedBytes();
+    long initialSourceDelta = 
diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
+        0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
     String oldContainerPath = container.getContainerData().getContainerPath();
 
-    // Use spy ContainerController to inject failure during copy
-    ContainerController spyController = spy(controller);
-    doThrow(new IOException("Mockito spy: copy failed"))
-        .when(spyController).copyContainer(any(ContainerData.class), 
any(Path.class));
-    when(ozoneContainer.getController()).thenReturn(spyController);
-
-    DiskBalancerService.DiskBalancerTask task = 
getTask(container.getContainerData());
-    task.call();
-
+    // verify temp container directory doesn't exist before task execution
+    Path tempContainerDir = destVolume.getTmpDir().toPath()
+        
.resolve(DiskBalancerService.DISK_BALANCER_DIR).resolve(String.valueOf(CONTAINER_ID));
+    File dir = new File(String.valueOf(tempContainerDir));
+    assertFalse(dir.exists(), "Temp container directory should not exist 
before task starts");
+
+    kvFaultInjector.setException(new IOException("Fault injection: copy 
failed"), 1);
+    final TestFaultInjector serviceFaultInjector = new TestFaultInjector();
+    DiskBalancerService.setInjector(serviceFaultInjector);
+    DiskBalancerService.DiskBalancerTask task = getTask();
+    CompletableFuture completableFuture = CompletableFuture.runAsync(() -> 
task.call());
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return Files.exists(tempContainerDir) && 
!FileUtils.isEmptyDirectory(tempContainerDir.toFile());
+      } catch (IOException e) {
+        fail("Failed to check temp container directory existence", e);
+      }
+      return false;
+    }, 100, 30000);
+    
assertTrue(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+
+    serviceFaultInjector.resume();
+    // wait for task to be completed
+    completableFuture.get();
     Container originalContainer = containerSet.getContainer(CONTAINER_ID);
     assertNotNull(originalContainer);
     assertEquals(container, originalContainer);
@@ -203,30 +348,58 @@ public void moveFailsOnCopy() throws IOException {
     assertEquals(initialSourceUsed, 
sourceVolume.getCurrentUsage().getUsedSpace());
     assertEquals(initialDestUsed, destVolume.getCurrentUsage().getUsedSpace());
     assertTrue(new File(oldContainerPath).exists());
-    Path tempDir = destVolume.getTmpDir().toPath()
-        .resolve(DiskBalancerService.DISK_BALANCER_DIR);
-    assertFalse(Files.exists(tempDir),
-        "Temp directory should be cleaned up");
+    assertFalse(Files.exists(tempContainerDir), "Temp container directory 
should be cleaned up");
     assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
+    assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
+    
assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+    assertEquals(initialSourceDelta, 
diskBalancerService.getDeltaSizes().get(sourceVolume));
   }
 
   @Test
-  public void moveFailsOnImportContainer() throws IOException {
-    Container container = createContainer(CONTAINER_ID, sourceVolume);
+  public void moveFailsOnAtomicMove() throws IOException, 
InterruptedException, TimeoutException, ExecutionException {
+    Container container = createContainer(CONTAINER_ID, sourceVolume, 
State.CLOSED);
     long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
     long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
+    long initialDestCommitted = destVolume.getCommittedBytes();
+    long initialSourceDelta = 
diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
+        0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
     String oldContainerPath = container.getContainerData().getContainerPath();
-
-    // Use spy to inject failure during the atomic move
-    ContainerController spyController = spy(controller);
-    doThrow(new IOException("Mockito spy: container import failed"))
-        .when(spyController).importContainer(any(ContainerData.class), 
any(Path.class));
-    when(ozoneContainer.getController()).thenReturn(spyController);
-
-    DiskBalancerService.DiskBalancerTask task = getTask(
-        container.getContainerData());
-    task.call();
-
+    Path tempDir = destVolume.getTmpDir().toPath()
+        .resolve(DiskBalancerService.DISK_BALANCER_DIR)
+        .resolve(String.valueOf(CONTAINER_ID));
+    assertFalse(Files.exists(tempDir), "Temp container directory should not 
exist");
+    Path destDirPath = Paths.get(
+        KeyValueContainerLocationUtil.getBaseContainerLocation(
+            destVolume.getHddsRootDir().toString(), scmId,
+            container.getContainerData().getContainerID()));
+    assertFalse(Files.exists(destDirPath), "Dest container directory should 
not exist");
+
+    // create dest container directory
+    assertTrue(destDirPath.toFile().mkdirs());
+    // create one file in the dest container directory
+    Path testfile = destDirPath.resolve("testfile");
+    assertTrue(testfile.toFile().createNewFile());
+
+    GenericTestUtils.LogCapturer serviceLog = 
GenericTestUtils.LogCapturer.captureLogs(DiskBalancerService.class);
+    final TestFaultInjector serviceFaultInjector = new TestFaultInjector();
+    DiskBalancerService.setInjector(serviceFaultInjector);
+    DiskBalancerService.DiskBalancerTask task = getTask();
+    CompletableFuture completableFuture = CompletableFuture.runAsync(() -> 
task.call());
+    // wait for temp container directory to be created
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return Files.exists(tempDir) && 
!FileUtils.isEmptyDirectory(tempDir.toFile());
+      } catch (IOException e) {
+        fail("Failed to check temp container directory existence", e);
+      }
+      return false;
+    }, 100, 30000);
+    
assertTrue(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+    serviceFaultInjector.resume();
+    completableFuture.get();
+
+    String expectedString = "Container Directory " + destDirPath + " already 
exists and are not empty";
+    assertTrue(serviceLog.getOutput().contains(expectedString));
     Container originalContainer = containerSet.getContainer(CONTAINER_ID);
     assertNotNull(originalContainer);
     assertEquals(container, originalContainer);
@@ -234,19 +407,31 @@ public void moveFailsOnImportContainer() throws 
IOException {
     assertEquals(initialSourceUsed, 
sourceVolume.getCurrentUsage().getUsedSpace());
     assertEquals(initialDestUsed, destVolume.getCurrentUsage().getUsedSpace());
     assertTrue(new File(oldContainerPath).exists());
-    Path tempDir = destVolume.getTmpDir().toPath()
-        .resolve(DiskBalancerService.DISK_BALANCER_DIR)
-        .resolve(String.valueOf(CONTAINER_ID));
     assertFalse(Files.exists(tempDir), "Temp copy should be cleaned up");
+    assertTrue(Files.exists(destDirPath), "Dest container directory should not 
be cleaned up");
+    assertTrue(testfile.toFile().exists(), "testfile should not be cleaned 
up");
     assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
+    assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
+    
assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+    assertEquals(initialSourceDelta, 
diskBalancerService.getDeltaSizes().get(sourceVolume));
   }
 
   @Test
-  public void moveFailsDuringInMemoryUpdate() throws IOException {
-    Container container = createContainer(CONTAINER_ID, sourceVolume);
+  public void moveFailsDuringInMemoryUpdate()
+      throws IOException, InterruptedException, TimeoutException, 
ExecutionException {
+    Container container = createContainer(CONTAINER_ID, sourceVolume, 
State.QUASI_CLOSED);
     long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
     long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
+    long initialDestCommitted = destVolume.getCommittedBytes();
+    long initialSourceDelta = 
diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
+        0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
     String oldContainerPath = container.getContainerData().getContainerPath();
+    Path destDirPath = Paths.get(
+        KeyValueContainerLocationUtil.getBaseContainerLocation(
+            destVolume.getHddsRootDir().toString(), scmId,
+            container.getContainerData().getContainerID()));
+    assertFalse(Files.exists(destDirPath),
+        "Destination container should not exist before task execution");
 
     ContainerSet spyContainerSet = spy(containerSet);
     doThrow(new StorageContainerException("Mockito spy: updateContainer 
failed",
@@ -254,10 +439,24 @@ public void moveFailsDuringInMemoryUpdate() throws 
IOException {
         .when(spyContainerSet).updateContainer(any(Container.class));
     when(ozoneContainer.getContainerSet()).thenReturn(spyContainerSet);
 
-
-    DiskBalancerService.DiskBalancerTask task = getTask(
-        container.getContainerData());
-    task.call();
+    DefaultContainerChoosingPolicy.setTest(true);
+    DiskBalancerService.DiskBalancerTask task = getTask();
+    CompletableFuture completableFuture = CompletableFuture.runAsync(() -> 
task.call());
+
+    final TestFaultInjector serviceFaultInjector = new TestFaultInjector();
+    DiskBalancerService.setInjector(serviceFaultInjector);
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return Files.exists(destDirPath) && 
!FileUtils.isEmptyDirectory(destDirPath.toFile());
+      } catch (IOException e) {
+        fail("Failed to check dest container directory existence", e);
+      }
+      return false;
+    }, 100, 30000);
+    
assertTrue(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+    serviceFaultInjector.resume();
+    // wait for task to be completed
+    completableFuture.get();
 
     // Asserts for rollback
     // The move succeeded on disk but should be reverted by the catch block
@@ -268,23 +467,27 @@ public void moveFailsDuringInMemoryUpdate() throws 
IOException {
     assertEquals(initialSourceUsed, 
sourceVolume.getCurrentUsage().getUsedSpace());
     assertEquals(initialDestUsed, destVolume.getCurrentUsage().getUsedSpace());
     assertTrue(new File(oldContainerPath).exists());
+    assertFalse(FileUtils.isEmptyDirectory(new File(oldContainerPath)));
+    assertEquals(State.QUASI_CLOSED, originalContainer.getContainerState(),
+        "Container state should remain QUASI_CLOSED after rollback");
 
     // Verify the partially moved container at destination is cleaned up
-    String idDir = container.getContainerData().getOriginNodeId();
-    Path finalDestPath = Paths.get(
-        KeyValueContainerLocationUtil.getBaseContainerLocation(
-            destVolume.getHddsRootDir().toString(), idDir,
-            container.getContainerData().getContainerID()));
-    assertFalse(Files.exists(finalDestPath),
+    assertFalse(Files.exists(destDirPath),
         "Moved container at destination should be cleaned up on failure");
     assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
+    assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
+    
assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+    assertEquals(initialSourceDelta, 
diskBalancerService.getDeltaSizes().get(sourceVolume));
   }
 
   @Test
   public void moveFailsDuringOldContainerRemove() throws IOException {
-    Container container = createContainer(CONTAINER_ID, sourceVolume);
+    Container container = createContainer(CONTAINER_ID, sourceVolume, 
State.CLOSED);
     long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
     long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
+    long initialDestCommitted = destVolume.getCommittedBytes();
+    long initialSourceDelta = 
diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
+        0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
 
     // Use a static mock for the KeyValueContainer utility class
     try (MockedStatic<KeyValueContainerUtil> mockedUtil =
@@ -294,35 +497,77 @@ public void moveFailsDuringOldContainerRemove() throws 
IOException {
               any(KeyValueContainerData.class), any(OzoneConfiguration.class)))
           .thenThrow(new IOException("Mockito: old container delete() 
failed"));
 
-      DiskBalancerService.DiskBalancerTask task = getTask(
-          container.getContainerData());
+      DiskBalancerService.DiskBalancerTask task = getTask();
       task.call();
+
+      // Assertions for successful move despite old container cleanup failure
+      assertEquals(1, diskBalancerService.getMetrics().getSuccessCount());
+      assertEquals(0, diskBalancerService.getMetrics().getFailureCount());
+      assertEquals(CONTAINER_SIZE, 
diskBalancerService.getMetrics().getSuccessBytes());
+
+      // Verify new container is active on the destination volume
+      Container newContainer = containerSet.getContainer(CONTAINER_ID);
+      assertNotNull(newContainer);
+      assertNotEquals(container, newContainer);
+      assertEquals(destVolume, newContainer.getContainerData().getVolume());
+      assertTrue(new 
File(newContainer.getContainerData().getContainerPath()).exists());
+
+      // Verify old container still exists
+      assertTrue(new 
File(container.getContainerData().getContainerPath()).exists());
+      assertFalse(FileUtils.isEmptyDirectory(new 
File(container.getContainerData().getContainerPath())));
+      assertEquals(State.DELETED, container.getContainerState());
+
+      // Verify volume usage is updated correctly
+      assertEquals(initialSourceUsed,
+          sourceVolume.getCurrentUsage().getUsedSpace());
+      assertEquals(initialDestUsed + CONTAINER_SIZE,
+          destVolume.getCurrentUsage().getUsedSpace());
+      assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
+      
assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+      assertEquals(initialSourceDelta, 
diskBalancerService.getDeltaSizes().get(sourceVolume));
     }
+  }
 
-    // Assertions for successful move despite old container cleanup failure
-    assertEquals(1, diskBalancerService.getMetrics().getSuccessCount());
-    assertEquals(0, diskBalancerService.getMetrics().getFailureCount());
-    assertEquals(CONTAINER_SIZE, 
diskBalancerService.getMetrics().getSuccessBytes());
+  @Test
+  public void testDestVolumeCommittedSpaceReleased() throws IOException {
+    createContainer(CONTAINER_ID, sourceVolume, State.CLOSED);
+    long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
+    long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
+    long initialDestCommitted = destVolume.getCommittedBytes();
+    long initialSourceDelta = 
diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
+        0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
 
-    // Verify new container is active on the destination volume
-    Container newContainer = containerSet.getContainer(CONTAINER_ID);
-    assertNotNull(newContainer);
-    assertEquals(destVolume, newContainer.getContainerData().getVolume());
-    assertTrue(new 
File(newContainer.getContainerData().getContainerPath()).exists());
+    GenericTestUtils.LogCapturer serviceLog = 
GenericTestUtils.LogCapturer.captureLogs(DiskBalancerService.class);
+    DiskBalancerService.DiskBalancerTask task = getTask();
+    long defaultContainerSize = (long) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+    // verify committed space is reserved for destination volume
+    assertEquals(defaultContainerSize, destVolume.getCommittedBytes() - 
initialDestCommitted);
 
-    // Verify volume usage is updated correctly
-    assertEquals(initialSourceUsed - CONTAINER_SIZE,
-        sourceVolume.getCurrentUsage().getUsedSpace());
-    assertEquals(initialDestUsed + CONTAINER_SIZE,
-        destVolume.getCurrentUsage().getUsedSpace());
+    // delete the container from containerSet to simulate a failure
+    containerSet.removeContainer(CONTAINER_ID);
+
+    task.call();
+    String expectedString = "Container " + CONTAINER_ID + " doesn't exist in 
ContainerSet";
+    assertTrue(serviceLog.getOutput().contains(expectedString));
+    Container originalContainer = containerSet.getContainer(CONTAINER_ID);
+    assertNull(originalContainer);
+    assertEquals(initialSourceUsed, 
sourceVolume.getCurrentUsage().getUsedSpace());
+    assertEquals(initialDestUsed, destVolume.getCurrentUsage().getUsedSpace());
+    assertEquals(0, destVolume.getCommittedBytes() - initialDestCommitted);
+    assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
+    assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
+    
assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+    assertEquals(initialSourceDelta, 
diskBalancerService.getDeltaSizes().get(sourceVolume));
   }
 
-  private KeyValueContainer createContainer(long containerId, HddsVolume vol)
+  private KeyValueContainer createContainer(long containerId, HddsVolume vol, 
State state)
       throws IOException {
     KeyValueContainerData containerData = new KeyValueContainerData(
         containerId, ContainerLayoutVersion.FILE_PER_BLOCK, CONTAINER_SIZE,
         UUID.randomUUID().toString(), datanodeUuid);
-    containerData.setState(State.CLOSED);
+    containerData.setState(state);
     containerData.getStatistics().setBlockBytesForTesting(CONTAINER_SIZE);
 
     KeyValueContainer container = new KeyValueContainer(containerData, conf);
@@ -337,8 +582,7 @@ private KeyValueContainer createContainer(long containerId, 
HddsVolume vol)
     return container;
   }
 
-  private DiskBalancerService.DiskBalancerTask getTask(ContainerData data) {
-    return diskBalancerService.createDiskBalancerTask(data, sourceVolume,
-        destVolume);
+  private DiskBalancerService.DiskBalancerTask getTask() {
+    return (DiskBalancerService.DiskBalancerTask) 
diskBalancerService.getTasks().poll();
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPolicy.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPolicy.java
index e300f8da6e..b65cfa637c 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPolicy.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPolicy.java
@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.ozone.scm.node;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
@@ -36,6 +38,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory;
@@ -83,7 +86,7 @@ public class TestVolumeChoosingPolicy {
   public void setup() throws Exception {
     hddsVolumes = new ArrayList<>();
     createVolumes();
-    volumeChoosingPolicy = new DefaultVolumeChoosingPolicy();
+    volumeChoosingPolicy = new DefaultVolumeChoosingPolicy(new 
ReentrantLock());
     executor = Executors.newFixedThreadPool(NUM_THREADS);
   }
 
@@ -108,6 +111,18 @@ public void cleanUp() {
     }
   }
 
+  @Test
+  @Timeout(30)
+  public void testVolumeChoosingFailureDueToDiskFull() {
+    // update the volume configuration: set a huge min free space
+    CONF.set("hdds.datanode.volume.min.free.space", "990GB");
+    for (StorageVolume volume: volumeSet.getVolumesList()) {
+      volume.setConf(CONF);
+    }
+    assertNull(volumeChoosingPolicy.chooseVolume(volumeSet, THRESHOLD, 
deltaSizes, 0));
+    assertEquals(NUM_VOLUMES, volumeSet.getVolumesList().size());
+  }
+
   @Test
   @Timeout(300)
   public void testConcurrentVolumeChoosingPerformance() throws Exception {
@@ -159,7 +174,7 @@ private void testPolicyPerformance(String policyName, 
DiskBalancerVolumeChoosing
 
             long threadStart = System.nanoTime();
             try {
-              Pair<HddsVolume, HddsVolume> pair = 
policy.chooseVolume(volumeSet, THRESHOLD, deltaSizes);
+              Pair<HddsVolume, HddsVolume> pair = 
policy.chooseVolume(volumeSet, THRESHOLD, deltaSizes, 0);
 
               if (pair == null) {
                 volumeNotChosen++;
@@ -200,7 +215,7 @@ private void testPolicyPerformance(String policyName, 
DiskBalancerVolumeChoosing
     System.out.println("Performance results for " + policyName);
     System.out.println("Total volumes: " + NUM_VOLUMES);
     System.out.println("Total threads: " + NUM_THREADS);
-    System.out.println("Threshold(%): " + THRESHOLD * 100.0);
+    System.out.println("Threshold(%): " + THRESHOLD);
     System.out.println("Total operations: " + totalOperations);
     System.out.println("Volume Pair Chosen operations: " + 
pairChosenCount.get());
     System.out.println("Volume Pair Not Chosen operations: " + 
pairNotChosenCount.get());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to