This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-5713 by this push:
new c531e7519a HDDS-12439. [DiskBalancer] Reserve committed space during
volume choosing process (#8882)
c531e7519a is described below
commit c531e7519a0206367dd33c6409966a322ca4fe61
Author: Sammi Chen <[email protected]>
AuthorDate: Fri Aug 8 18:04:22 2025 +0800
HDDS-12439. [DiskBalancer] Reserve committed space during volume choosing
process (#8882)
---
.../ozone/container/common/interfaces/Handler.java | 5 +-
.../container/common/utils/HddsVolumeUtil.java | 25 --
.../common/volume/AvailableSpaceFilter.java | 2 +-
.../volume/CapacityVolumeChoosingPolicy.java | 86 +++--
.../volume/RoundRobinVolumeChoosingPolicy.java | 46 ++-
.../container/common/volume/StorageVolume.java | 10 +-
.../common/volume/VolumeChoosingPolicyFactory.java | 13 +-
.../diskbalancer/DiskBalancerService.java | 230 ++++++------
.../diskbalancer/DiskBalancerServiceMetrics.java | 4 +-
.../policy/DefaultContainerChoosingPolicy.java | 11 +-
.../policy/DefaultVolumeChoosingPolicy.java | 83 +++--
.../policy/DiskBalancerVolumeChoosingPolicy.java | 3 +-
.../container/keyvalue/KeyValueContainer.java | 40 +-
.../ozone/container/keyvalue/KeyValueHandler.java | 27 +-
.../container/ozoneimpl/ContainerController.java | 12 +-
.../diskbalancer/TestDiskBalancerService.java | 3 +-
.../diskbalancer/TestDiskBalancerTask.java | 412 ++++++++++++++++-----
.../ozone/scm/node/TestVolumeChoosingPolicy.java | 21 +-
18 files changed, 679 insertions(+), 354 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index b8a40fff24..e36940a509 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -256,9 +256,8 @@ public abstract void copyContainer(
throws IOException;
/**
- * Imports container from a container path.
+ * Imports container from a container which is under the temp directory.
*/
- public abstract Container importContainer(
- ContainerData containerData, Path containerPath) throws IOException;
+ public abstract Container importContainer(ContainerData
targetTempContainerData) throws IOException;
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
index cf2c548ee5..6ea9e87541 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
@@ -26,7 +26,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.volume.DbVolume;
@@ -140,28 +139,4 @@ private static void mapDbVolumesToDataVolumesIfNeeded(
hddsVolume.setDbVolume(globalDbVolumeMap.getOrDefault(
hddsVolume.getStorageID(), null)));
}
-
- /**
- * Get the HddsVolume according to the path.
- * @param volumes volume list to match from
- * @param pathStr path to match
- */
- public static HddsVolume matchHddsVolume(List<HddsVolume> volumes,
- String pathStr) throws IOException {
- assert pathStr != null;
- List<HddsVolume> resList = new ArrayList<>();
- for (HddsVolume hddsVolume: volumes) {
- if (pathStr.startsWith(hddsVolume.getVolumeRootDir())) {
- resList.add(hddsVolume);
- }
- }
- if (resList.size() == 1) {
- return resList.get(0);
- } else if (resList.size() > 1) {
- throw new IOException("Get multi volumes " +
- resList.stream().map(HddsVolume::getVolumeRootDir).collect(
- Collectors.joining(",")) + " matching path " + pathStr);
- }
- return null;
- }
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java
index 6769a8eeae..bbd2bc9751 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java
@@ -32,7 +32,7 @@ public class AvailableSpaceFilter implements
Predicate<HddsVolume> {
private final List<StorageLocationReport> fullVolumes = new LinkedList<>();
private long mostAvailableSpace = Long.MIN_VALUE;
- AvailableSpaceFilter(long requiredSpace) {
+ public AvailableSpaceFilter(long requiredSpace) {
this.requiredSpace = requiredSpace;
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java
index e323eeb4b1..08b64327b0 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java
@@ -20,9 +20,11 @@
import static
org.apache.hadoop.ozone.container.common.volume.VolumeChoosingUtil.logIfSomeVolumesOutOfSpace;
import static
org.apache.hadoop.ozone.container.common.volume.VolumeChoosingUtil.throwDiskOutOfSpace;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -43,9 +45,20 @@ public class CapacityVolumeChoosingPolicy implements
VolumeChoosingPolicy {
private static final Logger LOG = LoggerFactory.getLogger(
CapacityVolumeChoosingPolicy.class);
+ private final ReentrantLock lock;
+
+ public CapacityVolumeChoosingPolicy(ReentrantLock globalLock) {
+ lock = globalLock;
+ }
+
+ // only for testing purposes
+ @VisibleForTesting
+ public CapacityVolumeChoosingPolicy() {
+ lock = new ReentrantLock();
+ }
@Override
- public synchronized HddsVolume chooseVolume(List<HddsVolume> volumes,
+ public HddsVolume chooseVolume(List<HddsVolume> volumes,
long maxContainerSize) throws IOException {
// No volumes available to choose from
@@ -55,43 +68,48 @@ public synchronized HddsVolume
chooseVolume(List<HddsVolume> volumes,
AvailableSpaceFilter filter = new AvailableSpaceFilter(maxContainerSize);
- List<HddsVolume> volumesWithEnoughSpace = volumes.stream()
- .filter(filter)
- .collect(Collectors.toList());
+ lock.lock();
+ try {
+ List<HddsVolume> volumesWithEnoughSpace = volumes.stream()
+ .filter(filter)
+ .collect(Collectors.toList());
- if (volumesWithEnoughSpace.isEmpty()) {
- throwDiskOutOfSpace(filter, LOG);
- } else {
- logIfSomeVolumesOutOfSpace(filter, LOG);
- }
+ if (volumesWithEnoughSpace.isEmpty()) {
+ throwDiskOutOfSpace(filter, LOG);
+ } else {
+ logIfSomeVolumesOutOfSpace(filter, LOG);
+ }
- int count = volumesWithEnoughSpace.size();
- HddsVolume selectedVolume = volumesWithEnoughSpace.get(0);
- if (count > 1) {
- // Even if we don't have too many volumes in volumesWithEnoughSpace, this
- // algorithm will still help us choose the volume with larger
- // available space than other volumes.
- // Say we have vol1 with more available space than vol2, for two choices,
- // the distribution of possibility is as follows:
- // 1. vol1 + vol2: 25%, result is vol1
- // 2. vol1 + vol1: 25%, result is vol1
- // 3. vol2 + vol1: 25%, result is vol1
- // 4. vol2 + vol2: 25%, result is vol2
- // So we have a total of 75% chances to choose vol1, which meets our
- // expectation.
- int firstIndex = ThreadLocalRandom.current().nextInt(count);
- int secondIndex = ThreadLocalRandom.current().nextInt(count);
+ int count = volumesWithEnoughSpace.size();
+ HddsVolume selectedVolume = volumesWithEnoughSpace.get(0);
+ if (count > 1) {
+ // Even if we don't have too many volumes in volumesWithEnoughSpace,
this
+ // algorithm will still help us choose the volume with larger
+ // available space than other volumes.
+ // Say we have vol1 with more available space than vol2, for two
choices,
+ // the distribution of possibility is as follows:
+ // 1. vol1 + vol2: 25%, result is vol1
+ // 2. vol1 + vol1: 25%, result is vol1
+ // 3. vol2 + vol1: 25%, result is vol1
+ // 4. vol2 + vol2: 25%, result is vol2
+ // So we have a total of 75% chances to choose vol1, which meets our
+ // expectation.
+ int firstIndex = ThreadLocalRandom.current().nextInt(count);
+ int secondIndex = ThreadLocalRandom.current().nextInt(count);
- HddsVolume firstVolume = volumesWithEnoughSpace.get(firstIndex);
- HddsVolume secondVolume = volumesWithEnoughSpace.get(secondIndex);
+ HddsVolume firstVolume = volumesWithEnoughSpace.get(firstIndex);
+ HddsVolume secondVolume = volumesWithEnoughSpace.get(secondIndex);
- long firstAvailable = firstVolume.getCurrentUsage().getAvailable()
- - firstVolume.getCommittedBytes();
- long secondAvailable = secondVolume.getCurrentUsage().getAvailable()
- - secondVolume.getCommittedBytes();
- selectedVolume = firstAvailable < secondAvailable ? secondVolume :
firstVolume;
+ long firstAvailable = firstVolume.getCurrentUsage().getAvailable()
+ - firstVolume.getCommittedBytes();
+ long secondAvailable = secondVolume.getCurrentUsage().getAvailable()
+ - secondVolume.getCommittedBytes();
+ selectedVolume = firstAvailable < secondAvailable ? secondVolume :
firstVolume;
+ }
+ selectedVolume.incCommittedBytes(maxContainerSize);
+ return selectedVolume;
+ } finally {
+ lock.unlock();
}
- selectedVolume.incCommittedBytes(maxContainerSize);
- return selectedVolume;
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java
index 52c8c59970..ff6f571300 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java
@@ -20,8 +20,10 @@
import static
org.apache.hadoop.ozone.container.common.volume.VolumeChoosingUtil.logIfSomeVolumesOutOfSpace;
import static
org.apache.hadoop.ozone.container.common.volume.VolumeChoosingUtil.throwDiskOutOfSpace;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
import
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.slf4j.Logger;
@@ -38,9 +40,20 @@ public class RoundRobinVolumeChoosingPolicy implements
VolumeChoosingPolicy {
// Stores the index of the next volume to be returned.
private int nextVolumeIndex = 0;
+ private final ReentrantLock lock;
+
+ public RoundRobinVolumeChoosingPolicy(ReentrantLock globalLock) {
+ lock = globalLock;
+ }
+
+ // only for testing purposes
+ @VisibleForTesting
+ public RoundRobinVolumeChoosingPolicy() {
+ lock = new ReentrantLock();
+ }
@Override
- public synchronized HddsVolume chooseVolume(List<HddsVolume> volumes,
+ public HddsVolume chooseVolume(List<HddsVolume> volumes,
long maxContainerSize) throws IOException {
// No volumes available to choose from
@@ -56,23 +69,28 @@ public synchronized HddsVolume
chooseVolume(List<HddsVolume> volumes,
int startVolumeIndex = currentVolumeIndex;
- while (true) {
- final HddsVolume volume = volumes.get(currentVolumeIndex);
- // adjust for remaining capacity in Open containers
- boolean hasEnoughSpace = filter.test(volume);
+ lock.lock();
+ try {
+ while (true) {
+ final HddsVolume volume = volumes.get(currentVolumeIndex);
+ // adjust for remaining capacity in Open containers
+ boolean hasEnoughSpace = filter.test(volume);
- currentVolumeIndex = (currentVolumeIndex + 1) % volumes.size();
+ currentVolumeIndex = (currentVolumeIndex + 1) % volumes.size();
- if (hasEnoughSpace) {
- logIfSomeVolumesOutOfSpace(filter, LOG);
- nextVolumeIndex = currentVolumeIndex;
- volume.incCommittedBytes(maxContainerSize);
- return volume;
- }
+ if (hasEnoughSpace) {
+ logIfSomeVolumesOutOfSpace(filter, LOG);
+ nextVolumeIndex = currentVolumeIndex;
+ volume.incCommittedBytes(maxContainerSize);
+ return volume;
+ }
- if (currentVolumeIndex == startVolumeIndex) {
- throwDiskOutOfSpace(filter, LOG);
+ if (currentVolumeIndex == startVolumeIndex) {
+ throwDiskOutOfSpace(filter, LOG);
+ }
}
+ } finally {
+ lock.unlock();
}
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
index 3ef4008463..2ebfee6e20 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
@@ -87,8 +87,8 @@ public abstract class StorageVolume implements
Checkable<Boolean, VolumeCheckRes
private long cTime; // creation time of the file system state
private int layoutVersion; // layout version of the storage data
- private final ConfigurationSource conf;
- private final DatanodeConfiguration dnConf;
+ private ConfigurationSource conf;
+ private DatanodeConfiguration dnConf;
private final StorageType storageType;
private final String volumeRoot;
@@ -578,6 +578,12 @@ public ConfigurationSource getConf() {
return conf;
}
+ @VisibleForTesting
+ public void setConf(ConfigurationSource newConf) {
+ this.conf = newConf;
+ this.dnConf = newConf.getObject(DatanodeConfiguration.class);
+ }
+
public DatanodeConfiguration getDatanodeConfig() {
return dnConf;
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeChoosingPolicyFactory.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeChoosingPolicyFactory.java
index 28ad10a158..ae2eafa735 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeChoosingPolicyFactory.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeChoosingPolicyFactory.java
@@ -19,9 +19,12 @@
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
import
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import
org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeChoosingPolicy;
import org.apache.ratis.util.ReflectionUtils;
/**
@@ -32,6 +35,8 @@ public final class VolumeChoosingPolicyFactory {
private static final Class<? extends VolumeChoosingPolicy>
DEFAULT_VOLUME_CHOOSING_POLICY = CapacityVolumeChoosingPolicy.class;
+ // a lock to coordinate space reservation between multiple policies and
threads
+ private static final ReentrantLock LOCK = new ReentrantLock();
private VolumeChoosingPolicyFactory() {
}
@@ -40,6 +45,12 @@ public static VolumeChoosingPolicy
getPolicy(ConfigurationSource conf) {
Class<? extends VolumeChoosingPolicy> policyClass = conf.getClass(
HDDS_DATANODE_VOLUME_CHOOSING_POLICY,
DEFAULT_VOLUME_CHOOSING_POLICY, VolumeChoosingPolicy.class);
- return ReflectionUtils.newInstance(policyClass);
+ return ReflectionUtils.newInstance(policyClass, new Class<?>[]
{ReentrantLock.class}, LOCK);
+ }
+
+ public static DiskBalancerVolumeChoosingPolicy
getDiskBalancerPolicy(ConfigurationSource conf) {
+ Class<?> policyClass =
conf.getObject(DiskBalancerConfiguration.class).getVolumeChoosingPolicyClass();
+ return (DiskBalancerVolumeChoosingPolicy) ReflectionUtils.newInstance(
+ policyClass, new Class<?>[]{ReentrantLock.class}, LOCK);
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
index 724ec8f9ff..b01217b74b 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
@@ -17,6 +17,8 @@
package org.apache.hadoop.ozone.container.diskbalancer;
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -35,16 +37,20 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.FaultInjector;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
@@ -54,6 +60,7 @@
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import
org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
import
org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy;
import
org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
@@ -96,6 +103,7 @@ public class DiskBalancerService extends BackgroundService {
private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow());
private Set<Long> inProgressContainers;
+ private static FaultInjector injector;
/**
* A map that tracks the total bytes which will be freed from each source
volume
@@ -115,6 +123,7 @@ public class DiskBalancerService extends BackgroundService {
private DiskBalancerServiceMetrics metrics;
private long bytesToMove;
+ private long containerDefaultSize;
/**
* Defines the operational states of the DiskBalancerService.
@@ -156,11 +165,12 @@ public DiskBalancerService(OzoneContainer ozoneContainer,
inProgressContainers = ConcurrentHashMap.newKeySet();
deltaSizes = new ConcurrentHashMap<>();
volumeSet = ozoneContainer.getVolumeSet();
+ containerDefaultSize = (long) conf.getStorageSize(
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
try {
- volumeChoosingPolicy = (DiskBalancerVolumeChoosingPolicy)
- conf.getObject(DiskBalancerConfiguration.class)
- .getVolumeChoosingPolicyClass().newInstance();
+ volumeChoosingPolicy =
VolumeChoosingPolicyFactory.getDiskBalancerPolicy(conf);
containerChoosingPolicy = (ContainerChoosingPolicy)
conf.getObject(DiskBalancerConfiguration.class)
.getContainerChoosingPolicyClass().newInstance();
@@ -387,7 +397,7 @@ public BackgroundTaskQueue getTasks() {
for (int i = 0; i < availableTaskCount; i++) {
Pair<HddsVolume, HddsVolume> pair = volumeChoosingPolicy
- .chooseVolume(volumeSet, threshold, deltaSizes);
+ .chooseVolume(volumeSet, threshold, deltaSizes,
containerDefaultSize);
if (pair == null) {
continue;
}
@@ -401,7 +411,9 @@ public BackgroundTaskQueue getTasks() {
inProgressContainers.add(toBalanceContainer.getContainerID());
deltaSizes.put(sourceVolume, deltaSizes.getOrDefault(sourceVolume, 0L)
- toBalanceContainer.getBytesUsed());
- destVolume.incCommittedBytes(toBalanceContainer.getBytesUsed());
+ } else {
+ // release destVolume committed bytes
+ destVolume.incCommittedBytes(0 - containerDefaultSize);
}
}
@@ -468,109 +480,98 @@ public BackgroundTaskResult call() {
long startTime = Time.monotonicNow();
boolean moveSucceeded = true;
long containerId = containerData.getContainerID();
- boolean destVolumeIncreased = false;
+ Container container =
ozoneContainer.getContainerSet().getContainer(containerId);
+ boolean readLockReleased = false;
Path diskBalancerTmpDir = null, diskBalancerDestDir = null;
long containerSize = containerData.getBytesUsed();
- String originalContainerChecksum =
containerData.getContainerFileChecksum();
+ if (container == null) {
+ LOG.warn("Container " + containerId + " doesn't exist in
ContainerSet");
+ postCall(false, startTime);
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+ // hold read lock on the container first, to avoid other threads to
update the container state,
+ // such as block deletion.
+ container.readLock();
try {
// Step 1: Copy container to new Volume's tmp Dir
diskBalancerTmpDir = destVolume.getTmpDir().toPath()
.resolve(DISK_BALANCER_DIR).resolve(String.valueOf(containerId));
- ozoneContainer.getController().copyContainer(containerData,
- diskBalancerTmpDir);
+ ozoneContainer.getController().copyContainer(containerData,
diskBalancerTmpDir);
// Step 2: verify checksum and Transition Temp container to Temp
C1-RECOVERING
- File tempContainerFile = ContainerUtils.getContainerFile(
- diskBalancerTmpDir.toFile());
+ File tempContainerFile =
ContainerUtils.getContainerFile(diskBalancerTmpDir.toFile());
if (!tempContainerFile.exists()) {
throw new IOException("ContainerFile for container " + containerId
- + " doesn't exist in temp directory "
- + tempContainerFile.getAbsolutePath());
- }
- ContainerData tempContainerData = ContainerDataYaml
- .readContainerFile(tempContainerFile);
- String copiedContainerChecksum =
tempContainerData.getContainerFileChecksum();
- if (!originalContainerChecksum.equals(copiedContainerChecksum)) {
- throw new IOException("Container checksum mismatch for container "
- + containerId + ". Original: " + originalContainerChecksum
- + ", Copied: " + copiedContainerChecksum);
+ + " doesn't exist in temp directory " +
tempContainerFile.getAbsolutePath());
}
+ ContainerData tempContainerData =
ContainerDataYaml.readContainerFile(tempContainerFile);
+ ContainerUtils.verifyContainerFileChecksum(tempContainerData, conf);
+ // Before move the container directory to final place, the destination
dir is empty and doesn't have
+ // a metadata directory. Writing the .container file will fail as the
metadata dir doesn't exist.
+ // So we instead save the container file to the diskBalancerTmpDir.
+ ContainerProtos.ContainerDataProto.State originalState =
tempContainerData.getState();
tempContainerData.setState(ContainerProtos.ContainerDataProto.State.RECOVERING);
-
+ // update tempContainerData volume to point to destVolume
+ tempContainerData.setVolume(destVolume);
// overwrite the .container file with the new state.
ContainerDataYaml.createContainerFile(tempContainerData,
tempContainerFile);
+ // reset to original state
+ tempContainerData.setState(originalState);
- // Step 3: Move container directory to final place on new volume
+ // Step 3: Move container directory to final place on new volume and
import
String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(
destVolume, destVolume.getClusterID());
diskBalancerDestDir =
Paths.get(KeyValueContainerLocationUtil.getBaseContainerLocation(
destVolume.getHddsRootDir().toString(), idDir,
containerData.getContainerID()));
- Path destDirParent = diskBalancerDestDir.getParent();
- if (destDirParent != null) {
- Files.createDirectories(destDirParent);
+ if (!Files.exists(diskBalancerDestDir)) {
+ Files.createDirectories(diskBalancerDestDir);
}
- Files.move(diskBalancerTmpDir, diskBalancerDestDir,
- StandardCopyOption.ATOMIC_MOVE,
- StandardCopyOption.REPLACE_EXISTING);
-
- // Generate a new Container based on destDir which is in C1-RECOVERING
state.
- File containerFile = ContainerUtils.getContainerFile(
- diskBalancerDestDir.toFile());
- if (!containerFile.exists()) {
- throw new IOException("ContainerFile for container " + containerId
- + " doesn't exists.");
- }
- ContainerData originalContainerData = ContainerDataYaml
- .readContainerFile(containerFile);
- Container newContainer = ozoneContainer.getController()
- .importContainer(originalContainerData, diskBalancerDestDir);
- newContainer.getContainerData().getVolume()
- .incrementUsedSpace(containerSize);
- destVolumeIncreased = true;
-
- // The import process loaded the temporary RECOVERING state from disk.
- // Now, restore the original state and persist it to the .container
file.
- newContainer.getContainerData().setState(containerData.getState());
- newContainer.update(newContainer.getContainerData().getMetadata(),
true);
-
- // Step 5: Update container for containerID and delete old container.
- Container oldContainer = ozoneContainer.getContainerSet()
- .getContainer(containerId);
- oldContainer.writeLock();
- try {
- // First, update the in-memory set to point to the new replica.
- ozoneContainer.getContainerSet().updateContainer(newContainer);
- // Mark old container as DELETED and persist state.
- oldContainer.getContainerData().setState(
- ContainerProtos.ContainerDataProto.State.DELETED);
- oldContainer.update(oldContainer.getContainerData().getMetadata(),
- true);
-
- // Remove the old container from the KeyValueContainerUtil.
- try {
- KeyValueContainerUtil.removeContainer(
- (KeyValueContainerData) oldContainer.getContainerData(), conf);
- oldContainer.delete();
- } catch (IOException ex) {
- LOG.warn("Failed to cleanup old container {} after move. It is " +
- "marked DELETED and will be handled by background
scanners.",
- containerId, ex);
- }
- } finally {
- oldContainer.writeUnlock();
+ if (FileUtils.isEmptyDirectory(diskBalancerDestDir.toFile())) {
+ Files.move(diskBalancerTmpDir, diskBalancerDestDir,
+ StandardCopyOption.ATOMIC_MOVE,
StandardCopyOption.REPLACE_EXISTING);
+ } else {
+ String errorMessage = "Container " + containerId +
+ " move failed because Container Directory " +
+ diskBalancerDestDir.toAbsolutePath() + " already exists and are
not empty";
+ throw new StorageContainerException(errorMessage,
CONTAINER_ALREADY_EXISTS);
}
- //The move is now successful.
- oldContainer.getContainerData().getVolume()
- .decrementUsedSpace(containerSize);
+ // Import the container. importContainer will reset container back to
original state
+ Container newContainer =
ozoneContainer.getController().importContainer(tempContainerData);
+
+ // Step 4: Update container for containerID and mark old container for
deletion
+ // first, update the in-memory set to point to the new replica.
+ // After this update, new caller will get the new Container object,
+ // old caller can still hold the old Container object.
+ ozoneContainer.getContainerSet().updateContainer(newContainer);
+ destVolume.incrementUsedSpace(containerSize);
+ // Mark old container as DELETED and persist state.
+ // markContainerForDelete require writeLock, so release readLock first
+ container.readUnlock();
+ readLockReleased = true;
+ try {
+ container.markContainerForDelete();
+ } catch (Throwable e) {
+ // mark container for deleted failure will not fail the whole
process, it will leave both old and new replica
+ // on disk, while new container in the ContainerSet.
+ LOG.warn("Failed to mark the old container {} for delete. " +
+ "It will be handled after DN restart.", containerId, e);
+ }
+ // The move is now successful.
balancedBytesInLastWindow.addAndGet(containerSize);
- metrics.incrSuccessCount(1);
metrics.incrSuccessBytes(containerSize);
totalBalancedBytes.addAndGet(containerSize);
} catch (IOException e) {
+ if (injector != null) {
+ try {
+ injector.pause();
+ } catch (IOException ex) {
+ // do nothing
+ }
+ }
moveSucceeded = false;
LOG.warn("Failed to move container {}", containerId, e);
if (diskBalancerTmpDir != null) {
@@ -578,35 +579,37 @@ public BackgroundTaskResult call() {
File dir = new File(String.valueOf(diskBalancerTmpDir));
FileUtils.deleteDirectory(dir);
} catch (IOException ex) {
- LOG.warn("Failed to delete tmp directory {}", diskBalancerTmpDir,
- ex);
+ LOG.warn("Failed to delete tmp directory {}", diskBalancerTmpDir,
ex);
}
}
- if (diskBalancerDestDir != null) {
+ if (diskBalancerDestDir != null && e instanceof
StorageContainerException
+ && ((StorageContainerException) e).getResult() !=
CONTAINER_ALREADY_EXISTS) {
try {
File dir = new File(String.valueOf(diskBalancerDestDir));
FileUtils.deleteDirectory(dir);
} catch (IOException ex) {
- LOG.warn("Failed to delete dest directory {}",
- diskBalancerDestDir, ex);
+ LOG.warn("Failed to delete dest directory {}",
diskBalancerDestDir, ex);
}
}
- // Only need to check for destVolume, sourceVolume's usedSpace is
- // updated at last, if it reaches there, there is no exception.
- if (destVolumeIncreased) {
- destVolume.decrementUsedSpace(containerSize);
- }
- metrics.incrFailureCount();
} finally {
- long endTime = Time.monotonicNow();
+ if (!readLockReleased) {
+ container.readUnlock();
+ }
if (moveSucceeded) {
- metrics.getMoveSuccessTime().add(endTime - startTime);
+ // Remove the old container from the KeyValueContainerUtil.
+ try {
+ KeyValueContainerUtil.removeContainer(
+ (KeyValueContainerData) container.getContainerData(), conf);
+ container.delete();
+
container.getContainerData().getVolume().decrementUsedSpace(containerSize);
+ } catch (IOException ex) {
+ LOG.warn("Failed to move or delete old container {} after it's
marked as DELETED. " +
+ "It will be handled by background scanners.", containerId,
ex);
+ }
ContainerLogger.logMoveSuccess(containerId, sourceVolume,
- destVolume, containerSize, endTime - startTime);
- } else {
- metrics.getMoveFailureTime().add(endTime - startTime);
+ destVolume, containerSize, Time.monotonicNow() - startTime);
}
- postCall();
+ postCall(moveSucceeded, startTime);
}
return BackgroundTaskResult.EmptyTaskResult.newResult();
}
@@ -616,11 +619,19 @@ public int getPriority() {
return BackgroundTask.super.getPriority();
}
- private void postCall() {
+ private void postCall(boolean success, long startTime) {
inProgressContainers.remove(containerData.getContainerID());
deltaSizes.put(sourceVolume, deltaSizes.get(sourceVolume) +
containerData.getBytesUsed());
- destVolume.incCommittedBytes(-containerData.getBytesUsed());
+ destVolume.incCommittedBytes(0 - containerDefaultSize);
+ long endTime = Time.monotonicNow();
+ if (success) {
+ metrics.incrSuccessCount(1);
+ metrics.getMoveSuccessTime().add(endTime - startTime);
+ } else {
+ metrics.incrFailureCount(1);
+ metrics.getMoveFailureTime().add(endTime - startTime);
+ }
}
}
@@ -669,10 +680,6 @@ private Path getDiskBalancerTmpDir(HddsVolume hddsVolume) {
.resolve(DISK_BALANCER_TMP_DIR).resolve(DISK_BALANCER_DIR);
}
- public boolean isBalancingContainer(long containerId) {
- return inProgressContainers.contains(containerId);
- }
-
public DiskBalancerServiceMetrics getMetrics() {
return metrics;
}
@@ -690,15 +697,6 @@ public DiskBalancerVolumeChoosingPolicy
getVolumeChoosingPolicy() {
return volumeChoosingPolicy;
}
- @VisibleForTesting
- public DiskBalancerTask createDiskBalancerTask(ContainerData containerData,
HddsVolume source, HddsVolume dest) {
- inProgressContainers.add(containerData.getContainerID());
- deltaSizes.put(source, deltaSizes.getOrDefault(source, 0L)
- - containerData.getBytesUsed());
- dest.incCommittedBytes(containerData.getBytesUsed());
- return new DiskBalancerTask(containerData, source, dest);
- }
-
@VisibleForTesting
public void setVolumeChoosingPolicy(DiskBalancerVolumeChoosingPolicy
volumeChoosingPolicy) {
this.volumeChoosingPolicy = volumeChoosingPolicy;
@@ -714,6 +712,11 @@ public Set<Long> getInProgressContainers() {
return inProgressContainers;
}
+ @VisibleForTesting
+ public Map<HddsVolume, Long> getDeltaSizes() {
+ return deltaSizes;
+ }
+
/**
* Handle state changes for DiskBalancerService.
*/
@@ -756,4 +759,9 @@ public void shutdown() {
DiskBalancerServiceMetrics.unRegister();
}
}
+
+ @VisibleForTesting
+ public static void setInjector(FaultInjector instance) {
+ injector = instance;
+ }
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceMetrics.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceMetrics.java
index 38e8df7dec..5b7ee19ba4 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceMetrics.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceMetrics.java
@@ -89,8 +89,8 @@ public void incrSuccessBytes(long bytes) {
this.successBytes.incr(bytes);
}
- public void incrFailureCount() {
- this.failureCount.incr();
+ public void incrFailureCount(long count) {
+ this.failureCount.incr(count);
}
public void incrRunningLoopCount() {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
index f75b94ad16..76a0f30c14 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
@@ -19,6 +19,7 @@
import static java.util.concurrent.TimeUnit.HOURS;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.Iterator;
@@ -42,6 +43,9 @@ public class DefaultContainerChoosingPolicy implements
ContainerChoosingPolicy {
ThreadLocal.withInitial(
() -> CacheBuilder.newBuilder().recordStats().expireAfterAccess(1,
HOURS).build());
+ // for test
+ private static boolean test = false;
+
@Override
public ContainerData chooseContainer(OzoneContainer ozoneContainer,
HddsVolume hddsVolume, Set<Long> inProgressContainerIDs) {
@@ -57,7 +61,7 @@ public ContainerData chooseContainer(OzoneContainer
ozoneContainer,
while (itr.hasNext()) {
ContainerData containerData = itr.next().getContainerData();
if (!inProgressContainerIDs.contains(
- containerData.getContainerID()) && containerData.isClosed()) {
+ containerData.getContainerID()) && (containerData.isClosed() ||
(test && containerData.isQuasiClosed()))) {
return containerData;
}
}
@@ -67,4 +71,9 @@ public ContainerData chooseContainer(OzoneContainer
ozoneContainer,
}
return null;
}
+
+ @VisibleForTesting
+ public static void setTest(boolean isTest) {
+ test = isTest;
+ }
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java
index d867b3b0f2..cba87740f7 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java
@@ -19,16 +19,18 @@
import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.AvailableSpaceFilter;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Choose a random volume for balancing.
+ * Choose a random volume for disk balancing.
*
* Source volumes use deltaMap to simulate space that will be freed
(pre-deleted).
* Destination volumes use committedBytes to account for space already
reserved.
@@ -38,37 +40,64 @@ public class DefaultVolumeChoosingPolicy implements
DiskBalancerVolumeChoosingPo
public static final Logger LOG = LoggerFactory.getLogger(
DefaultVolumeChoosingPolicy.class);
+ private final ReentrantLock lock;
+
+ public DefaultVolumeChoosingPolicy(ReentrantLock globalLock) {
+ lock = globalLock;
+ }
@Override
public Pair<HddsVolume, HddsVolume> chooseVolume(MutableVolumeSet volumeSet,
- double threshold, Map<HddsVolume, Long> deltaMap) {
- double idealUsage = volumeSet.getIdealUsage();
+ double threshold, Map<HddsVolume, Long> deltaMap, long containerSize) {
+ lock.lock();
+ try {
+ double idealUsage = volumeSet.getIdealUsage();
- // Threshold is given as a percentage
- double normalizedThreshold = threshold / 100;
- List<HddsVolume> volumes = StorageVolumeUtil
- .getHddsVolumesList(volumeSet.getVolumesList())
- .stream()
- .filter(volume ->
- Math.abs(
- ((double)((volume.getCurrentUsage().getCapacity() -
volume.getCurrentUsage().getAvailable())
- + deltaMap.getOrDefault(volume, 0L) +
volume.getCommittedBytes()))
- / volume.getCurrentUsage().getCapacity() - idealUsage) >=
normalizedThreshold)
- .sorted((v1, v2) ->
- Double.compare(
- (double) ((v2.getCurrentUsage().getCapacity() -
v2.getCurrentUsage().getAvailable())
- + deltaMap.getOrDefault(v2, 0L) + v2.getCommittedBytes()) /
- v2.getCurrentUsage().getCapacity(),
- (double) ((v1.getCurrentUsage().getCapacity() -
v1.getCurrentUsage().getAvailable())
- + deltaMap.getOrDefault(v1, 0L) + v1.getCommittedBytes()) /
- v1.getCurrentUsage().getCapacity()))
- .collect(Collectors.toList());
+ // Threshold is given as a percentage
+ double normalizedThreshold = threshold / 100;
+ List<HddsVolume> volumes = StorageVolumeUtil
+ .getHddsVolumesList(volumeSet.getVolumesList())
+ .stream()
+ .filter(volume ->
+ Math.abs(
+ ((double)((volume.getCurrentUsage().getCapacity() -
volume.getCurrentUsage().getAvailable())
+ + deltaMap.getOrDefault(volume, 0L) +
volume.getCommittedBytes()))
+ / volume.getCurrentUsage().getCapacity() - idealUsage)
>= normalizedThreshold)
+ .sorted((v1, v2) ->
+ Double.compare(
+ (double) ((v2.getCurrentUsage().getCapacity() -
v2.getCurrentUsage().getAvailable())
+ + deltaMap.getOrDefault(v2, 0L) +
v2.getCommittedBytes()) /
+ v2.getCurrentUsage().getCapacity(),
+ (double) ((v1.getCurrentUsage().getCapacity() -
v1.getCurrentUsage().getAvailable())
+ + deltaMap.getOrDefault(v1, 0L) +
v1.getCommittedBytes()) /
+ v1.getCurrentUsage().getCapacity()))
+ .collect(Collectors.toList());
- // Can not generate DiskBalancerTask if we have less than 2 results
- if (volumes.size() <= 1) {
- LOG.debug("Can not find appropriate Source volume and Dest Volume.");
- return null;
+ // Can not generate DiskBalancerTask if we have less than 2 results
+ if (volumes.size() <= 1) {
+ LOG.debug("Can not find appropriate Source volume and Dest Volume.");
+ return null;
+ }
+ AvailableSpaceFilter filter = new AvailableSpaceFilter(containerSize);
+ HddsVolume srcVolume = volumes.get(0);
+ HddsVolume destVolume = volumes.get(volumes.size() - 1);
+ while (!filter.test(destVolume)) {
+ // If the destination volume does not have enough space, try the next
+ // one in the list.
+ LOG.debug("Destination volume {} does not have enough space, trying
next volume.",
+ destVolume.getStorageID());
+ volumes.remove(destVolume);
+ if (volumes.size() <= 1) {
+ LOG.debug("Can not find appropriate Source volume and Dest Volume.");
+ return null;
+ }
+ destVolume = volumes.get(volumes.size() - 1);
+ }
+ // reserve space for the dest volume
+ destVolume.incCommittedBytes(containerSize);
+ return Pair.of(srcVolume, destVolume);
+ } finally {
+ lock.unlock();
}
- return Pair.of(volumes.get(0), volumes.get(volumes.size() - 1));
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeChoosingPolicy.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeChoosingPolicy.java
index 7bb805f464..01520fea37 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeChoosingPolicy.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeChoosingPolicy.java
@@ -32,8 +32,9 @@ public interface DiskBalancerVolumeChoosingPolicy {
* @param volumeSet - volumes to choose from.
* @param threshold - the threshold to choose source and dest volumes.
* @param deltaSizes - the sizes changes of inProgress balancing jobs.
+ * @param containerSize - the estimated size of container to be moved.
* @return Source volume and Dest volume.
*/
Pair<HddsVolume, HddsVolume> chooseVolume(MutableVolumeSet volumeSet,
- double threshold, Map<HddsVolume, Long> deltaSizes);
+ double threshold, Map<HddsVolume, Long> deltaSizes, long containerSize);
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 3b952c5952..b3ba31da8e 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -64,6 +64,7 @@
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.utils.FaultInjector;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.nativeio.NativeIO;
@@ -120,6 +121,7 @@ public class KeyValueContainer implements
Container<KeyValueContainerData> {
private Set<Long> pendingPutBlockCache;
private boolean bCheckChunksFilePath;
+ private static FaultInjector faultInjector;
public KeyValueContainer(KeyValueContainerData containerData,
ConfigurationSource ozoneConfig) {
@@ -934,7 +936,7 @@ public DataScanResult scanData(DataTransferThrottler
throttler, Canceler cancele
@Override
public void copyContainerData(Path destination) throws IOException {
- writeLock();
+ readLock();
try {
// Closed/ Quasi closed containers are considered for replication by
// replication manager if they are under-replicated.
@@ -948,16 +950,11 @@ public void copyContainerData(Path destination) throws
IOException {
" is in state " + state);
}
- try {
- if (!containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
- compactDB();
- // Close DB (and remove from cache) to avoid concurrent modification
- // while copying it.
- BlockUtils.removeDB(containerData, config);
- }
- } finally {
- readLock();
- writeUnlock();
+ if (!containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
+ compactDB();
+ // Close DB (and remove from cache) to avoid concurrent modification
+ // while copying it.
+ BlockUtils.removeDB(containerData, config);
}
if (containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
@@ -972,19 +969,28 @@ public void copyContainerData(Path destination) throws
IOException {
} else {
copyContainerToDestination(destination);
}
- } catch (Exception e) {
+ if (getInjector() != null && getInjector().getException() != null) {
+ throw new IOException("Fault injection", getInjector().getException());
+ }
+ } catch (IOException e) {
LOG.error("Got exception when copying container {} to {}",
containerData.getContainerID(), destination, e);
throw e;
} finally {
- if (lock.isWriteLockedByCurrentThread()) {
- writeUnlock();
- } else {
- readUnlock();
- }
+ readUnlock();
}
}
+ @VisibleForTesting
+ public static FaultInjector getInjector() {
+ return faultInjector;
+ }
+
+ @VisibleForTesting
+ public static void setInjector(FaultInjector instance) {
+ faultInjector = instance;
+ }
+
/**
* Set all of the path related container data fields based on the name
* conventions.
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 622515d0f8..2176338227 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -146,7 +146,6 @@
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
-import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import
org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
@@ -1329,7 +1328,15 @@ public Container importContainer(ContainerData
originalContainerData,
ContainerLogger.logImported(container.getContainerData());
sendICR(container);
return container;
+ }
+ @Override
+ public Container importContainer(ContainerData targetTempContainerData)
throws IOException {
+ KeyValueContainer container = createNewContainer(targetTempContainerData);
+ HddsVolume targetVolume = targetTempContainerData.getVolume();
+ populateContainerPathFields(container, targetVolume);
+ container.importContainerData((KeyValueContainerData)
targetTempContainerData);
+ return container;
}
@Override
@@ -1557,24 +1564,6 @@ public void copyContainer(final Container container,
Path destinationPath)
kvc.copyContainerData(destinationPath);
}
- @Override
- public Container importContainer(ContainerData originalContainerData,
- final Path containerPath) throws IOException {
- KeyValueContainer container = createNewContainer(originalContainerData);
-
- HddsVolume volume = HddsVolumeUtil.matchHddsVolume(
- StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
- containerPath.toString());
- if (volume == null ||
- !containerPath.startsWith(volume.getVolumeRootDir())) {
- throw new IOException("ContainerPath " + containerPath
- + " doesn't match volume " + volume);
- }
- container.populatePathFields(volume, containerPath);
- container.importContainerData(containerPath);
- return container;
- }
-
private KeyValueContainer createNewContainer(
ContainerData originalContainerData) {
Preconditions.checkState(originalContainerData instanceof
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
index 097d7cfa34..7b13929713 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
@@ -211,6 +211,10 @@ public Container importContainer(
.importContainer(containerData, rawContainerStream, packer);
}
+ public Container importContainer(final ContainerData
targetTempContainerData) throws IOException {
+ return
handlers.get(targetTempContainerData.getContainerType()).importContainer(targetTempContainerData);
+ }
+
public void copyContainer(final ContainerData containerData,
final Path destinationPath) throws IOException {
handlers.get(containerData.getContainerType())
@@ -219,14 +223,6 @@ public void copyContainer(final ContainerData
containerData,
destinationPath);
}
- public Container importContainer(
- final ContainerData containerData,
- final Path containerPath)
- throws IOException {
- return handlers.get(containerData.getContainerType())
- .importContainer(containerData, containerPath);
- }
-
public void exportContainer(final ContainerType type,
final long containerId, final OutputStream outputStream,
final TarContainerPacker packer) throws IOException {
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
index 07d05fcb0b..9bd803ab60 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
@@ -23,6 +23,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyDouble;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -289,7 +290,7 @@ public void testConcurrentTasksNotExceedThreadLimit()
throws Exception {
when(containerData.getContainerID()).thenAnswer(invocation ->
System.nanoTime());
when(containerData.getBytesUsed()).thenReturn(100L);
- when(volumePolicy.chooseVolume(any(), anyDouble(),
any())).thenReturn(Pair.of(source, dest));
+ when(volumePolicy.chooseVolume(any(), anyDouble(), any(),
anyLong())).thenReturn(Pair.of(source, dest));
when(containerPolicy.chooseContainer(any(), any(),
any())).thenReturn(containerData);
// Test when no tasks are in progress, it should schedule up to the limit
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
index 9208eacf09..ee72c347a6 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
@@ -23,7 +23,9 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -40,16 +42,24 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory;
+import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.hdds.utils.FaultInjector;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -60,6 +70,7 @@
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import
org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
@@ -68,11 +79,16 @@
import
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.ozone.test.GenericTestUtils;
+import org.assertj.core.api.Fail;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
@@ -100,6 +116,87 @@ public class TestDiskBalancerTask {
private static final long CONTAINER_ID = 1L;
private static final long CONTAINER_SIZE = 1024L * 1024L; // 1 MB
+ private final TestFaultInjector kvFaultInjector = new TestFaultInjector();
+
+ /**
+ * A FaultInjector that can be configured to throw an exception on a
+ * specific invocation number. This allows tests to target failure points
+ * that occur after initial checks.
+ */
+ private static class TestFaultInjector extends FaultInjector {
+ private Throwable exception;
+ private int throwOnInvocation = -1; // -1 means never throw
+ private int invocationCount = 0;
+ private CountDownLatch ready;
+ private CountDownLatch wait;
+
+ TestFaultInjector() {
+ init();
+ }
+
+ @Override
+ public void init() {
+ this.ready = new CountDownLatch(1);
+ this.wait = new CountDownLatch(1);
+ }
+
+ @Override
+ public void pause() throws IOException {
+ ready.countDown();
+ try {
+ wait.await();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void resume() throws IOException {
+ // Make sure injector pauses before resuming.
+ try {
+ ready.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assertions.assertTrue(Fail.fail("resume interrupted"));
+ }
+ wait.countDown();
+ }
+
+ /**
+ * Sets an exception to be thrown on a specific invocation.
+ * @param e The exception to throw.
+ * @param onInvocation The invocation number to throw on (e.g., 1 for the
+ * first call, 2 for the second, etc.).
+ */
+ public void setException(Throwable e, int onInvocation) {
+ this.exception = e;
+ this.throwOnInvocation = onInvocation;
+ this.invocationCount = 0; // Reset count for each new test setup
+ }
+
+ @Override
+ public void setException(Throwable e) {
+ // Default to throwing on the first invocation if no number is specified.
+ setException(e, 1);
+ }
+
+ @Override
+ public Throwable getException() {
+ invocationCount++;
+ if (exception != null && invocationCount == throwOnInvocation) {
+ return exception;
+ }
+ return null;
+ }
+
+ @Override
+ public void reset() {
+ this.exception = null;
+ this.throwOnInvocation = -1;
+ this.invocationCount = 0;
+ }
+ }
+
@BeforeEach
public void setup() throws Exception {
testRoot = tmpDir.toFile();
@@ -109,15 +206,28 @@ public void setup() throws Exception {
conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
testRoot.getAbsolutePath() + "/vol1," + testRoot.getAbsolutePath()
+ "/vol2");
+ conf.setClass(SpaceUsageCheckFactory.Conf.configKeyForClassName(),
+ MockSpaceUsageCheckFactory.HalfTera.class,
+ SpaceUsageCheckFactory.class);
volumeSet = new MutableVolumeSet(datanodeUuid, scmId, conf, null,
StorageVolume.VolumeType.DATA_VOLUME, null);
createDbInstancesForTestIfNeeded(volumeSet, scmId, scmId, conf);
+ List<StorageVolume> volumes = volumeSet.getVolumesList();
+ sourceVolume = (HddsVolume) volumes.get(0);
+ destVolume = (HddsVolume) volumes.get(1);
+
+ // reset volume's usedBytes
+ sourceVolume.incrementUsedSpace(0 -
sourceVolume.getCurrentUsage().getUsedSpace());
+ destVolume.incrementUsedSpace(0 -
destVolume.getCurrentUsage().getUsedSpace());
+
sourceVolume.incrementUsedSpace(sourceVolume.getCurrentUsage().getCapacity() /
2);
+
containerSet = ContainerSet.newReadOnlyContainerSet(1000);
ContainerMetrics containerMetrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler = new KeyValueHandler(conf, datanodeUuid,
containerSet, volumeSet, containerMetrics, c -> {
}, new ContainerChecksumTreeManager(conf));
+ keyValueHandler.setClusterID(scmId);
Map<ContainerProtos.ContainerType, Handler> handlers = new HashMap<>();
handlers.put(ContainerProtos.ContainerType.KeyValueContainer,
keyValueHandler);
@@ -129,12 +239,13 @@ public void setup() throws Exception {
when(ozoneContainer.getDispatcher())
.thenReturn(mock(ContainerDispatcher.class));
+ DiskBalancerConfiguration diskBalancerConfiguration =
conf.getObject(DiskBalancerConfiguration.class);
+ diskBalancerConfiguration.setDiskBalancerShouldRun(true);
+ conf.setFromObject(diskBalancerConfiguration);
diskBalancerService = new DiskBalancerServiceTestImpl(ozoneContainer,
100, conf, 1);
- List<StorageVolume> volumes = volumeSet.getVolumesList();
- sourceVolume = (HddsVolume) volumes.get(0);
- destVolume = (HddsVolume) volumes.get(1);
+ KeyValueContainer.setInjector(kvFaultInjector);
}
@AfterEach
@@ -150,51 +261,85 @@ public void cleanup() throws IOException {
if (testRoot.exists()) {
FileUtils.deleteDirectory(testRoot);
}
+
+ kvFaultInjector.reset();
+ KeyValueContainer.setInjector(null);
+ DiskBalancerService.setInjector(null);
+ DefaultContainerChoosingPolicy.setTest(false);
}
- @Test
- public void moveSuccess() throws IOException {
- Container container = createContainer(CONTAINER_ID, sourceVolume);
+ @ParameterizedTest
+ @EnumSource(names = {"CLOSED", "QUASI_CLOSED"})
+ public void moveSuccess(State containerState) throws IOException {
long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
- String oldContainerPath = container.getContainerData().getContainerPath();
+ long initialDestCommitted = destVolume.getCommittedBytes();
+ long initialSourceDelta =
diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
+ 0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
- DiskBalancerService.DiskBalancerTask task =
getTask(container.getContainerData());
+ Container container = createContainer(CONTAINER_ID, sourceVolume,
containerState);
+ State originalState = container.getContainerState();
+ assertEquals(initialSourceUsed + CONTAINER_SIZE,
sourceVolume.getCurrentUsage().getUsedSpace());
+ assertEquals(initialDestUsed, destVolume.getCurrentUsage().getUsedSpace());
+
+ String oldContainerPath = container.getContainerData().getContainerPath();
+ if (containerState == State.QUASI_CLOSED) {
+ DefaultContainerChoosingPolicy.setTest(true);
+ }
+ DiskBalancerService.DiskBalancerTask task = getTask();
task.call();
+ assertEquals(State.DELETED, container.getContainerState());
+ // Asserts
Container newContainer = containerSet.getContainer(CONTAINER_ID);
assertNotNull(newContainer);
assertNotEquals(container, newContainer);
+ assertEquals(originalState, newContainer.getContainerState());
assertEquals(destVolume, newContainer.getContainerData().getVolume());
- assertEquals(initialSourceUsed - CONTAINER_SIZE,
- sourceVolume.getCurrentUsage().getUsedSpace());
- assertEquals(initialDestUsed + CONTAINER_SIZE,
- destVolume.getCurrentUsage().getUsedSpace());
+ assertEquals(initialSourceUsed,
sourceVolume.getCurrentUsage().getUsedSpace());
+ assertEquals(initialDestUsed + CONTAINER_SIZE,
destVolume.getCurrentUsage().getUsedSpace());
assertFalse(new File(oldContainerPath).exists());
- assertTrue(
- new File(newContainer.getContainerData().getContainerPath()).exists());
- assertEquals(1,
- diskBalancerService.getMetrics().getSuccessCount());
- assertEquals(CONTAINER_SIZE,
- diskBalancerService.getMetrics().getSuccessBytes());
+ assertTrue(new
File(newContainer.getContainerData().getContainerPath()).exists());
+ assertEquals(1, diskBalancerService.getMetrics().getSuccessCount());
+ assertEquals(CONTAINER_SIZE,
diskBalancerService.getMetrics().getSuccessBytes());
+ assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
+ assertEquals(initialSourceDelta,
diskBalancerService.getDeltaSizes().get(sourceVolume));
}
@Test
- public void moveFailsOnCopy() throws IOException {
- Container container = createContainer(CONTAINER_ID, sourceVolume);
+ public void moveFailsAfterCopy() throws IOException, InterruptedException,
TimeoutException, ExecutionException {
+ Container container = createContainer(CONTAINER_ID, sourceVolume,
State.CLOSED);
long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
+ long initialDestCommitted = destVolume.getCommittedBytes();
+ long initialSourceDelta =
diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
+ 0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
String oldContainerPath = container.getContainerData().getContainerPath();
- // Use spy ContainerController to inject failure during copy
- ContainerController spyController = spy(controller);
- doThrow(new IOException("Mockito spy: copy failed"))
- .when(spyController).copyContainer(any(ContainerData.class),
any(Path.class));
- when(ozoneContainer.getController()).thenReturn(spyController);
-
- DiskBalancerService.DiskBalancerTask task =
getTask(container.getContainerData());
- task.call();
-
+ // verify temp container directory doesn't exist before task execution
+ Path tempContainerDir = destVolume.getTmpDir().toPath()
+
.resolve(DiskBalancerService.DISK_BALANCER_DIR).resolve(String.valueOf(CONTAINER_ID));
+ File dir = new File(String.valueOf(tempContainerDir));
+ assertFalse(dir.exists(), "Temp container directory should not exist
before task starts");
+
+ kvFaultInjector.setException(new IOException("Fault injection: copy
failed"), 1);
+ final TestFaultInjector serviceFaultInjector = new TestFaultInjector();
+ DiskBalancerService.setInjector(serviceFaultInjector);
+ DiskBalancerService.DiskBalancerTask task = getTask();
+ CompletableFuture completableFuture = CompletableFuture.runAsync(() ->
task.call());
+ GenericTestUtils.waitFor(() -> {
+ try {
+ return Files.exists(tempContainerDir) &&
!FileUtils.isEmptyDirectory(tempContainerDir.toFile());
+ } catch (IOException e) {
+ fail("Failed to check temp container directory existence", e);
+ }
+ return false;
+ }, 100, 30000);
+
assertTrue(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+
+ serviceFaultInjector.resume();
+ // wait for task to be completed
+ completableFuture.get();
Container originalContainer = containerSet.getContainer(CONTAINER_ID);
assertNotNull(originalContainer);
assertEquals(container, originalContainer);
@@ -203,30 +348,58 @@ public void moveFailsOnCopy() throws IOException {
assertEquals(initialSourceUsed,
sourceVolume.getCurrentUsage().getUsedSpace());
assertEquals(initialDestUsed, destVolume.getCurrentUsage().getUsedSpace());
assertTrue(new File(oldContainerPath).exists());
- Path tempDir = destVolume.getTmpDir().toPath()
- .resolve(DiskBalancerService.DISK_BALANCER_DIR);
- assertFalse(Files.exists(tempDir),
- "Temp directory should be cleaned up");
+ assertFalse(Files.exists(tempContainerDir), "Temp container directory
should be cleaned up");
assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
+ assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
+
assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+ assertEquals(initialSourceDelta,
diskBalancerService.getDeltaSizes().get(sourceVolume));
}
@Test
- public void moveFailsOnImportContainer() throws IOException {
- Container container = createContainer(CONTAINER_ID, sourceVolume);
+ public void moveFailsOnAtomicMove() throws IOException,
InterruptedException, TimeoutException, ExecutionException {
+ Container container = createContainer(CONTAINER_ID, sourceVolume,
State.CLOSED);
long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
+ long initialDestCommitted = destVolume.getCommittedBytes();
+ long initialSourceDelta =
diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
+ 0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
String oldContainerPath = container.getContainerData().getContainerPath();
-
- // Use spy to inject failure during the atomic move
- ContainerController spyController = spy(controller);
- doThrow(new IOException("Mockito spy: container import failed"))
- .when(spyController).importContainer(any(ContainerData.class),
any(Path.class));
- when(ozoneContainer.getController()).thenReturn(spyController);
-
- DiskBalancerService.DiskBalancerTask task = getTask(
- container.getContainerData());
- task.call();
-
+ Path tempDir = destVolume.getTmpDir().toPath()
+ .resolve(DiskBalancerService.DISK_BALANCER_DIR)
+ .resolve(String.valueOf(CONTAINER_ID));
+ assertFalse(Files.exists(tempDir), "Temp container directory should not
exist");
+ Path destDirPath = Paths.get(
+ KeyValueContainerLocationUtil.getBaseContainerLocation(
+ destVolume.getHddsRootDir().toString(), scmId,
+ container.getContainerData().getContainerID()));
+ assertFalse(Files.exists(destDirPath), "Dest container directory should
not exist");
+
+ // create dest container directory
+ assertTrue(destDirPath.toFile().mkdirs());
+ // create one file in the dest container directory
+ Path testfile = destDirPath.resolve("testfile");
+ assertTrue(testfile.toFile().createNewFile());
+
+ GenericTestUtils.LogCapturer serviceLog =
GenericTestUtils.LogCapturer.captureLogs(DiskBalancerService.class);
+ final TestFaultInjector serviceFaultInjector = new TestFaultInjector();
+ DiskBalancerService.setInjector(serviceFaultInjector);
+ DiskBalancerService.DiskBalancerTask task = getTask();
+ CompletableFuture completableFuture = CompletableFuture.runAsync(() ->
task.call());
+ // wait for temp container directory to be created
+ GenericTestUtils.waitFor(() -> {
+ try {
+ return Files.exists(tempDir) &&
!FileUtils.isEmptyDirectory(tempDir.toFile());
+ } catch (IOException e) {
+ fail("Failed to check temp container directory existence", e);
+ }
+ return false;
+ }, 100, 30000);
+
assertTrue(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+ serviceFaultInjector.resume();
+ completableFuture.get();
+
+ String expectedString = "Container Directory " + destDirPath + " already
exists and are not empty";
+ assertTrue(serviceLog.getOutput().contains(expectedString));
Container originalContainer = containerSet.getContainer(CONTAINER_ID);
assertNotNull(originalContainer);
assertEquals(container, originalContainer);
@@ -234,19 +407,31 @@ public void moveFailsOnImportContainer() throws
IOException {
assertEquals(initialSourceUsed,
sourceVolume.getCurrentUsage().getUsedSpace());
assertEquals(initialDestUsed, destVolume.getCurrentUsage().getUsedSpace());
assertTrue(new File(oldContainerPath).exists());
- Path tempDir = destVolume.getTmpDir().toPath()
- .resolve(DiskBalancerService.DISK_BALANCER_DIR)
- .resolve(String.valueOf(CONTAINER_ID));
assertFalse(Files.exists(tempDir), "Temp copy should be cleaned up");
+ assertTrue(Files.exists(destDirPath), "Dest container directory should not
be cleaned up");
+ assertTrue(testfile.toFile().exists(), "testfile should not be cleaned
up");
assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
+ assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
+
assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+ assertEquals(initialSourceDelta,
diskBalancerService.getDeltaSizes().get(sourceVolume));
}
@Test
- public void moveFailsDuringInMemoryUpdate() throws IOException {
- Container container = createContainer(CONTAINER_ID, sourceVolume);
+ public void moveFailsDuringInMemoryUpdate()
+ throws IOException, InterruptedException, TimeoutException,
ExecutionException {
+ Container container = createContainer(CONTAINER_ID, sourceVolume,
State.QUASI_CLOSED);
long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
+ long initialDestCommitted = destVolume.getCommittedBytes();
+ long initialSourceDelta =
diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
+ 0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
String oldContainerPath = container.getContainerData().getContainerPath();
+ Path destDirPath = Paths.get(
+ KeyValueContainerLocationUtil.getBaseContainerLocation(
+ destVolume.getHddsRootDir().toString(), scmId,
+ container.getContainerData().getContainerID()));
+ assertFalse(Files.exists(destDirPath),
+ "Destination container should not exist before task execution");
ContainerSet spyContainerSet = spy(containerSet);
doThrow(new StorageContainerException("Mockito spy: updateContainer
failed",
@@ -254,10 +439,24 @@ public void moveFailsDuringInMemoryUpdate() throws
IOException {
.when(spyContainerSet).updateContainer(any(Container.class));
when(ozoneContainer.getContainerSet()).thenReturn(spyContainerSet);
-
- DiskBalancerService.DiskBalancerTask task = getTask(
- container.getContainerData());
- task.call();
+ DefaultContainerChoosingPolicy.setTest(true);
+ DiskBalancerService.DiskBalancerTask task = getTask();
+ CompletableFuture completableFuture = CompletableFuture.runAsync(() ->
task.call());
+
+ final TestFaultInjector serviceFaultInjector = new TestFaultInjector();
+ DiskBalancerService.setInjector(serviceFaultInjector);
+ GenericTestUtils.waitFor(() -> {
+ try {
+ return Files.exists(destDirPath) &&
!FileUtils.isEmptyDirectory(destDirPath.toFile());
+ } catch (IOException e) {
+ fail("Failed to check dest container directory existence", e);
+ }
+ return false;
+ }, 100, 30000);
+
assertTrue(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+ serviceFaultInjector.resume();
+ // wait for task to be completed
+ completableFuture.get();
// Asserts for rollback
// The move succeeded on disk but should be reverted by the catch block
@@ -268,23 +467,27 @@ public void moveFailsDuringInMemoryUpdate() throws
IOException {
assertEquals(initialSourceUsed,
sourceVolume.getCurrentUsage().getUsedSpace());
assertEquals(initialDestUsed, destVolume.getCurrentUsage().getUsedSpace());
assertTrue(new File(oldContainerPath).exists());
+ assertFalse(FileUtils.isEmptyDirectory(new File(oldContainerPath)));
+ assertEquals(State.QUASI_CLOSED, originalContainer.getContainerState(),
+ "Container state should remain QUASI_CLOSED after rollback");
// Verify the partially moved container at destination is cleaned up
- String idDir = container.getContainerData().getOriginNodeId();
- Path finalDestPath = Paths.get(
- KeyValueContainerLocationUtil.getBaseContainerLocation(
- destVolume.getHddsRootDir().toString(), idDir,
- container.getContainerData().getContainerID()));
- assertFalse(Files.exists(finalDestPath),
+ assertFalse(Files.exists(destDirPath),
"Moved container at destination should be cleaned up on failure");
assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
+ assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
+
assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+ assertEquals(initialSourceDelta,
diskBalancerService.getDeltaSizes().get(sourceVolume));
}
@Test
public void moveFailsDuringOldContainerRemove() throws IOException {
- Container container = createContainer(CONTAINER_ID, sourceVolume);
+ Container container = createContainer(CONTAINER_ID, sourceVolume,
State.CLOSED);
long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
+ long initialDestCommitted = destVolume.getCommittedBytes();
+ long initialSourceDelta =
diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
+ 0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
// Use a static mock for the KeyValueContainer utility class
try (MockedStatic<KeyValueContainerUtil> mockedUtil =
@@ -294,35 +497,77 @@ public void moveFailsDuringOldContainerRemove() throws
IOException {
any(KeyValueContainerData.class), any(OzoneConfiguration.class)))
.thenThrow(new IOException("Mockito: old container delete()
failed"));
- DiskBalancerService.DiskBalancerTask task = getTask(
- container.getContainerData());
+ DiskBalancerService.DiskBalancerTask task = getTask();
task.call();
+
+ // Assertions for successful move despite old container cleanup failure
+ assertEquals(1, diskBalancerService.getMetrics().getSuccessCount());
+ assertEquals(0, diskBalancerService.getMetrics().getFailureCount());
+ assertEquals(CONTAINER_SIZE,
diskBalancerService.getMetrics().getSuccessBytes());
+
+ // Verify new container is active on the destination volume
+ Container newContainer = containerSet.getContainer(CONTAINER_ID);
+ assertNotNull(newContainer);
+ assertNotEquals(container, newContainer);
+ assertEquals(destVolume, newContainer.getContainerData().getVolume());
+ assertTrue(new
File(newContainer.getContainerData().getContainerPath()).exists());
+
+ // Verify old container still exists
+ assertTrue(new
File(container.getContainerData().getContainerPath()).exists());
+ assertFalse(FileUtils.isEmptyDirectory(new
File(container.getContainerData().getContainerPath())));
+ assertEquals(State.DELETED, container.getContainerState());
+
+ // Verify volume usage is updated correctly
+ assertEquals(initialSourceUsed,
+ sourceVolume.getCurrentUsage().getUsedSpace());
+ assertEquals(initialDestUsed + CONTAINER_SIZE,
+ destVolume.getCurrentUsage().getUsedSpace());
+ assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
+
assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+ assertEquals(initialSourceDelta,
diskBalancerService.getDeltaSizes().get(sourceVolume));
}
+ }
- // Assertions for successful move despite old container cleanup failure
- assertEquals(1, diskBalancerService.getMetrics().getSuccessCount());
- assertEquals(0, diskBalancerService.getMetrics().getFailureCount());
- assertEquals(CONTAINER_SIZE,
diskBalancerService.getMetrics().getSuccessBytes());
+ @Test
+ public void testDestVolumeCommittedSpaceReleased() throws IOException {
+ createContainer(CONTAINER_ID, sourceVolume, State.CLOSED);
+ long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
+ long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
+ long initialDestCommitted = destVolume.getCommittedBytes();
+ long initialSourceDelta =
diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
+ 0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
- // Verify new container is active on the destination volume
- Container newContainer = containerSet.getContainer(CONTAINER_ID);
- assertNotNull(newContainer);
- assertEquals(destVolume, newContainer.getContainerData().getVolume());
- assertTrue(new
File(newContainer.getContainerData().getContainerPath()).exists());
+ GenericTestUtils.LogCapturer serviceLog =
GenericTestUtils.LogCapturer.captureLogs(DiskBalancerService.class);
+ DiskBalancerService.DiskBalancerTask task = getTask();
+ long defaultContainerSize = (long) conf.getStorageSize(
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+ // verify committed space is reserved for destination volume
+ assertEquals(defaultContainerSize, destVolume.getCommittedBytes() -
initialDestCommitted);
- // Verify volume usage is updated correctly
- assertEquals(initialSourceUsed - CONTAINER_SIZE,
- sourceVolume.getCurrentUsage().getUsedSpace());
- assertEquals(initialDestUsed + CONTAINER_SIZE,
- destVolume.getCurrentUsage().getUsedSpace());
+ // delete the container from containerSet to simulate a failure
+ containerSet.removeContainer(CONTAINER_ID);
+
+ task.call();
+ String expectedString = "Container " + CONTAINER_ID + " doesn't exist in
ContainerSet";
+ assertTrue(serviceLog.getOutput().contains(expectedString));
+ Container originalContainer = containerSet.getContainer(CONTAINER_ID);
+ assertNull(originalContainer);
+ assertEquals(initialSourceUsed,
sourceVolume.getCurrentUsage().getUsedSpace());
+ assertEquals(initialDestUsed, destVolume.getCurrentUsage().getUsedSpace());
+ assertEquals(0, destVolume.getCommittedBytes() - initialDestCommitted);
+ assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
+ assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
+
assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+ assertEquals(initialSourceDelta,
diskBalancerService.getDeltaSizes().get(sourceVolume));
}
- private KeyValueContainer createContainer(long containerId, HddsVolume vol)
+ private KeyValueContainer createContainer(long containerId, HddsVolume vol,
State state)
throws IOException {
KeyValueContainerData containerData = new KeyValueContainerData(
containerId, ContainerLayoutVersion.FILE_PER_BLOCK, CONTAINER_SIZE,
UUID.randomUUID().toString(), datanodeUuid);
- containerData.setState(State.CLOSED);
+ containerData.setState(state);
containerData.getStatistics().setBlockBytesForTesting(CONTAINER_SIZE);
KeyValueContainer container = new KeyValueContainer(containerData, conf);
@@ -337,8 +582,7 @@ private KeyValueContainer createContainer(long containerId,
HddsVolume vol)
return container;
}
- private DiskBalancerService.DiskBalancerTask getTask(ContainerData data) {
- return diskBalancerService.createDiskBalancerTask(data, sourceVolume,
- destVolume);
+ private DiskBalancerService.DiskBalancerTask getTask() {
+ return (DiskBalancerService.DiskBalancerTask)
diskBalancerService.getTasks().poll();
}
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPolicy.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPolicy.java
index e300f8da6e..b65cfa637c 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPolicy.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPolicy.java
@@ -17,6 +17,8 @@
package org.apache.hadoop.ozone.scm.node;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -36,6 +38,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory;
@@ -83,7 +86,7 @@ public class TestVolumeChoosingPolicy {
public void setup() throws Exception {
hddsVolumes = new ArrayList<>();
createVolumes();
- volumeChoosingPolicy = new DefaultVolumeChoosingPolicy();
+ volumeChoosingPolicy = new DefaultVolumeChoosingPolicy(new
ReentrantLock());
executor = Executors.newFixedThreadPool(NUM_THREADS);
}
@@ -108,6 +111,18 @@ public void cleanUp() {
}
}
+ @Test
+ @Timeout(30)
+ public void testVolumeChoosingFailureDueToDiskFull() {
+ // update volume configure, set a huge min free space
+ CONF.set("hdds.datanode.volume.min.free.space", "990GB");
+ for (StorageVolume volume: volumeSet.getVolumesList()) {
+ volume.setConf(CONF);
+ }
+ assertNull(volumeChoosingPolicy.chooseVolume(volumeSet, THRESHOLD,
deltaSizes, 0));
+ assertEquals(NUM_VOLUMES, volumeSet.getVolumesList().size());
+ }
+
@Test
@Timeout(300)
public void testConcurrentVolumeChoosingPerformance() throws Exception {
@@ -159,7 +174,7 @@ private void testPolicyPerformance(String policyName,
DiskBalancerVolumeChoosing
long threadStart = System.nanoTime();
try {
- Pair<HddsVolume, HddsVolume> pair =
policy.chooseVolume(volumeSet, THRESHOLD, deltaSizes);
+ Pair<HddsVolume, HddsVolume> pair =
policy.chooseVolume(volumeSet, THRESHOLD, deltaSizes, 0);
if (pair == null) {
volumeNotChosen++;
@@ -200,7 +215,7 @@ private void testPolicyPerformance(String policyName,
DiskBalancerVolumeChoosing
System.out.println("Performance results for " + policyName);
System.out.println("Total volumes: " + NUM_VOLUMES);
System.out.println("Total threads: " + NUM_THREADS);
- System.out.println("Threshold(%): " + THRESHOLD * 100.0);
+ System.out.println("Threshold(%): " + THRESHOLD);
System.out.println("Total operations: " + totalOperations);
System.out.println("Volume Pair Chosen operations: " +
pairChosenCount.get());
System.out.println("Volume Pair Not Chosen operations: " +
pairNotChosenCount.get());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]