This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-5713 by this push:
new bf4f26c871 HDDS-12654. [DiskBalancer]Add a
hdds.datanode.disk.balancer.stop.after.disk.even property (#8239)
bf4f26c871 is described below
commit bf4f26c87101082bcf8b6821c0589943fb7d492d
Author: Gargi Jaiswal <[email protected]>
AuthorDate: Fri May 9 19:59:42 2025 +0530
HDDS-12654. [DiskBalancer]Add a
hdds.datanode.disk.balancer.stop.after.disk.even property (#8239)
---
.../apache/hadoop/hdds/scm/client/ScmClient.java | 2 +
.../protocol/StorageContainerLocationProtocol.java | 2 +
.../scm/storage/DiskBalancerConfiguration.java | 27 ++++++++++++--
.../container/diskbalancer/DiskBalancerInfo.java | 28 +++++++++++---
.../diskbalancer/DiskBalancerService.java | 38 +++++++++++++++----
.../container/diskbalancer/DiskBalancerYaml.java | 14 ++++++-
.../states/endpoint/TestHeartbeatEndpointTask.java | 2 +-
.../diskbalancer/TestDiskBalancerService.java | 5 ++-
.../diskbalancer/TestDiskBalancerYaml.java | 3 +-
...inerLocationProtocolClientSideTranslatorPB.java | 7 +++-
.../interface-client/src/main/proto/hdds.proto | 1 +
.../hadoop/hdds/scm/node/DiskBalancerManager.java | 15 ++++----
...inerLocationProtocolServerSideTranslatorPB.java | 2 +
.../hdds/scm/server/SCMClientProtocolServer.java | 9 +++--
.../hdds/scm/cli/ContainerOperationClient.java | 8 ++--
.../scm/cli/datanode/DiskBalancerCommands.java | 6 ++-
.../cli/datanode/DiskBalancerStartSubcommand.java | 6 ++-
.../cli/datanode/DiskBalancerUpdateSubcommand.java | 6 ++-
.../cli/datanode/TestDiskBalancerSubCommand.java | 8 ++--
.../hadoop/ozone/scm/node/TestDiskBalancer.java | 43 ++++++++++++++++++++++
20 files changed, 187 insertions(+), 45 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index 8857913c9a..17417f8c98 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -496,6 +496,7 @@ List<DatanodeAdminError> startDiskBalancer(
Optional<Double> threshold,
Optional<Long> bandwidthInMB,
Optional<Integer> parallelThread,
+ Optional<Boolean> stopAfterDiskEven,
Optional<List<String>> hosts) throws IOException;
/**
@@ -512,5 +513,6 @@ List<DatanodeAdminError> updateDiskBalancerConfiguration(
Optional<Double> threshold,
Optional<Long> bandwidth,
Optional<Integer> parallelThread,
+ Optional<Boolean> stopAfterDiskEven,
Optional<List<String>> hosts) throws IOException;
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index 8774e6eb07..7b9ad4c10f 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -508,6 +508,7 @@ List<DatanodeAdminError> startDiskBalancer(
Optional<Double> threshold,
Optional<Long> bandwidthInMB,
Optional<Integer> parallelThread,
+ Optional<Boolean> stopAfterDiskEven,
Optional<List<String>> hosts) throws IOException;
/**
@@ -523,5 +524,6 @@ List<DatanodeAdminError> updateDiskBalancerConfiguration(
Optional<Double> threshold,
Optional<Long> bandwidthInMB,
Optional<Integer> parallelThread,
+ Optional<Boolean> stopAfterDiskEven,
Optional<List<String>> hosts) throws IOException;
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
index 951cc62a8a..6984bb94a9 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
@@ -110,12 +110,20 @@ public final class DiskBalancerConfiguration {
"service.")
private Class<?> containerChoosingPolicyClass;
+ @Config(key = "stop.after.disk.even",
+ type = ConfigType.BOOLEAN,
+ defaultValue = "true",
+ tags = {ConfigTag.DISKBALANCER},
+ description = "If true, the DiskBalancer will automatically stop once
disks are balanced.")
+ private boolean stopAfterDiskEven = true;
+
public DiskBalancerConfiguration(Optional<Double> threshold,
Optional<Long> bandwidthInMB,
- Optional<Integer> parallelThread) {
+ Optional<Integer> parallelThread, Optional<Boolean> stopAfterDiskEven) {
threshold.ifPresent(aDouble -> this.threshold = aDouble);
bandwidthInMB.ifPresent(aLong -> this.diskBandwidthInMB = aLong);
parallelThread.ifPresent(integer -> this.parallelThread = integer);
+ stopAfterDiskEven.ifPresent(bool -> this.stopAfterDiskEven = bool);
}
public DiskBalancerConfiguration() {
@@ -157,6 +165,14 @@ public Class<?> getContainerChoosingPolicyClass() {
return containerChoosingPolicyClass;
}
+ public boolean isStopAfterDiskEven() {
+ return stopAfterDiskEven;
+ }
+
+ public void setStopAfterDiskEven(boolean stopAfterDiskEven) {
+ this.stopAfterDiskEven = stopAfterDiskEven;
+ }
+
/**
* Gets the threshold value for DiskBalancer.
*
@@ -234,10 +250,11 @@ public String toString() {
"%-50s %s%n" +
"%-50s %s%n" +
"%-50s %s%n" +
+ "%-50s %s%n" +
"%-50s %s%n",
"Key", "Value",
"Threshold", threshold, "Max disk bandwidth", diskBandwidthInMB,
- "Parallel Thread", parallelThread);
+ "Parallel Thread", parallelThread, "Stop After Disk Even",
stopAfterDiskEven);
}
public HddsProtos.DiskBalancerConfigurationProto.Builder toProtobufBuilder()
{
@@ -246,7 +263,8 @@ public HddsProtos.DiskBalancerConfigurationProto.Builder
toProtobufBuilder() {
builder.setThreshold(threshold)
.setDiskBandwidthInMB(diskBandwidthInMB)
- .setParallelThread(parallelThread);
+ .setParallelThread(parallelThread)
+ .setStopAfterDiskEven(stopAfterDiskEven);
return builder;
}
@@ -262,6 +280,9 @@ public static DiskBalancerConfiguration fromProtobuf(
if (proto.hasParallelThread()) {
config.setParallelThread(proto.getParallelThread());
}
+ if (proto.hasStopAfterDiskEven()) {
+ config.setStopAfterDiskEven(proto.getStopAfterDiskEven());
+ }
return config;
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
index 236c22063f..60eabf2376 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
@@ -31,6 +31,7 @@ public class DiskBalancerInfo {
private double threshold;
private long bandwidthInMB;
private int parallelThread;
+ private boolean stopAfterDiskEven;
private DiskBalancerVersion version;
private long successCount;
private long failureCount;
@@ -38,28 +39,30 @@ public class DiskBalancerInfo {
private long balancedBytes;
public DiskBalancerInfo(boolean shouldRun, double threshold,
- long bandwidthInMB, int parallelThread) {
- this(shouldRun, threshold, bandwidthInMB, parallelThread,
+ long bandwidthInMB, int parallelThread, boolean stopAfterDiskEven) {
+ this(shouldRun, threshold, bandwidthInMB, parallelThread,
stopAfterDiskEven,
DiskBalancerVersion.DEFAULT_VERSION);
}
public DiskBalancerInfo(boolean shouldRun, double threshold,
- long bandwidthInMB, int parallelThread, DiskBalancerVersion version) {
+ long bandwidthInMB, int parallelThread, boolean stopAfterDiskEven,
DiskBalancerVersion version) {
this.shouldRun = shouldRun;
this.threshold = threshold;
this.bandwidthInMB = bandwidthInMB;
this.parallelThread = parallelThread;
+ this.stopAfterDiskEven = stopAfterDiskEven;
this.version = version;
}
@SuppressWarnings("checkstyle:ParameterNumber")
public DiskBalancerInfo(boolean shouldRun, double threshold,
- long bandwidthInMB, int parallelThread, DiskBalancerVersion version,
+ long bandwidthInMB, int parallelThread, boolean stopAfterDiskEven,
DiskBalancerVersion version,
long successCount, long failureCount, long bytesToMove, long
balancedBytes) {
this.shouldRun = shouldRun;
this.threshold = threshold;
this.bandwidthInMB = bandwidthInMB;
this.parallelThread = parallelThread;
+ this.stopAfterDiskEven = stopAfterDiskEven;
this.version = version;
this.successCount = successCount;
this.failureCount = failureCount;
@@ -73,6 +76,7 @@ public DiskBalancerInfo(boolean shouldRun,
this.threshold = diskBalancerConf.getThreshold();
this.bandwidthInMB = diskBalancerConf.getDiskBandwidthInMB();
this.parallelThread = diskBalancerConf.getParallelThread();
+ this.stopAfterDiskEven = diskBalancerConf.isStopAfterDiskEven();
this.version = DiskBalancerVersion.DEFAULT_VERSION;
}
@@ -86,11 +90,14 @@ public void updateFromConf(DiskBalancerConfiguration
diskBalancerConf) {
if (parallelThread != diskBalancerConf.getParallelThread()) {
setParallelThread(diskBalancerConf.getParallelThread());
}
+ if (stopAfterDiskEven != diskBalancerConf.isStopAfterDiskEven()) {
+ setStopAfterDiskEven(diskBalancerConf.isStopAfterDiskEven());
+ }
}
public StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto
toDiskBalancerReportProto() {
DiskBalancerConfiguration conf = new
DiskBalancerConfiguration(Optional.of(threshold),
- Optional.of(bandwidthInMB), Optional.of(parallelThread));
+ Optional.of(bandwidthInMB), Optional.of(parallelThread),
Optional.of(stopAfterDiskEven));
HddsProtos.DiskBalancerConfigurationProto confProto =
conf.toProtobufBuilder().build();
StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto.Builder
builder =
@@ -136,6 +143,14 @@ public void setParallelThread(int parallelThread) {
this.parallelThread = parallelThread;
}
+ public boolean isStopAfterDiskEven() {
+ return stopAfterDiskEven;
+ }
+
+ public void setStopAfterDiskEven(boolean stopAfterDiskEven) {
+ this.stopAfterDiskEven = stopAfterDiskEven;
+ }
+
public DiskBalancerVersion getVersion() {
return version;
}
@@ -157,12 +172,13 @@ public boolean equals(Object o) {
Double.compare(that.threshold, threshold) == 0 &&
bandwidthInMB == that.bandwidthInMB &&
parallelThread == that.parallelThread &&
+ stopAfterDiskEven == that.stopAfterDiskEven &&
version == that.version;
}
@Override
public int hashCode() {
- return Objects.hash(shouldRun, threshold, bandwidthInMB, parallelThread,
+ return Objects.hash(shouldRun, threshold, bandwidthInMB, parallelThread,
stopAfterDiskEven,
version);
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
index 558b806b39..610fe816a8 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
@@ -32,6 +32,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -58,7 +59,6 @@
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
import org.apache.hadoop.util.Time;
-import org.apache.ratis.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +82,7 @@ public class DiskBalancerService extends BackgroundService {
private double threshold;
private long bandwidthInMB;
private int parallelThread;
+ private boolean stopAfterDiskEven;
private DiskBalancerVersion version;
@@ -161,8 +162,8 @@ private void constructTmpDir() throws IOException {
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())) {
Path tmpDir = getDiskBalancerTmpDir(volume);
try {
- FileUtils.deleteFully(tmpDir);
- FileUtils.createDirectories(tmpDir);
+ FileUtils.deleteDirectory(tmpDir.toFile());
+ FileUtils.forceMkdir(tmpDir.toFile());
} catch (IOException ex) {
LOG.warn("Can not reconstruct tmp directory under volume {}", volume,
ex);
@@ -206,6 +207,7 @@ private void applyDiskBalancerInfo(DiskBalancerInfo
diskBalancerInfo)
setThreshold(diskBalancerInfo.getThreshold());
setBandwidthInMB(diskBalancerInfo.getBandwidthInMB());
setParallelThread(diskBalancerInfo.getParallelThread());
+ setStopAfterDiskEven(diskBalancerInfo.isStopAfterDiskEven());
setVersion(diskBalancerInfo.getVersion());
// Default executorService is ScheduledThreadPoolExecutor, so we can
@@ -295,6 +297,10 @@ public void setParallelThread(int parallelThread) {
this.parallelThread = parallelThread;
}
+ public void setStopAfterDiskEven(boolean stopAfterDiskEven) {
+ this.stopAfterDiskEven = stopAfterDiskEven;
+ }
+
public void setVersion(DiskBalancerVersion version) {
this.version = version;
}
@@ -309,6 +315,7 @@ public DiskBalancerReportProto getDiskBalancerReportProto()
{
.setThreshold(threshold)
.setDiskBandwidthInMB(bandwidthInMB)
.setParallelThread(parallelThread)
+ .setStopAfterDiskEven(stopAfterDiskEven)
.build())
.build();
}
@@ -354,6 +361,18 @@ public BackgroundTaskQueue getTasks() {
}
if (queue.isEmpty()) {
+ bytesToMove = 0;
+ if (stopAfterDiskEven) {
+ LOG.info("Disk balancer is stopped due to disk even as" +
+ " the property StopAfterDiskEven is set to true.");
+ setShouldRun(false);
+ try {
+ // Persist the updated shouldRun status into the YAML file
+ writeDiskBalancerInfoTo(getDiskBalancerInfo(), diskBalancerInfoFile);
+ } catch (IOException e) {
+ LOG.warn("Failed to persist updated DiskBalancerInfo to file.", e);
+ }
+ }
metrics.incrIdleLoopNoAvailableVolumePairCount();
} else {
bytesToMove = calculateBytesToMove(volumeSet);
@@ -463,9 +482,11 @@ public BackgroundTaskResult call() {
totalBalancedBytes.addAndGet(containerSize);
} catch (IOException e) {
moveSucceeded = false;
+ LOG.warn("Failed to move container {}", containerId, e);
if (diskBalancerTmpDir != null) {
try {
- Files.deleteIfExists(diskBalancerTmpDir);
+ File dir = new File(String.valueOf(diskBalancerTmpDir));
+ FileUtils.deleteDirectory(dir);
} catch (IOException ex) {
LOG.warn("Failed to delete tmp directory {}", diskBalancerTmpDir,
ex);
@@ -473,10 +494,11 @@ public BackgroundTaskResult call() {
}
if (diskBalancerDestDir != null) {
try {
- Files.deleteIfExists(diskBalancerDestDir);
+ File dir = new File(String.valueOf(diskBalancerDestDir));
+ FileUtils.deleteDirectory(dir);
} catch (IOException ex) {
- LOG.warn("Failed to delete dest directory {}: {}.",
- diskBalancerDestDir, ex.getMessage());
+ LOG.warn("Failed to delete dest directory {}",
+ diskBalancerDestDir, ex);
}
}
// Only need to check for destVolume, sourceVolume's usedSpace is
@@ -514,7 +536,7 @@ private void postCall() {
public DiskBalancerInfo getDiskBalancerInfo() {
return new DiskBalancerInfo(shouldRun, threshold, bandwidthInMB,
- parallelThread, version, metrics.getSuccessCount(),
+ parallelThread, stopAfterDiskEven, version, metrics.getSuccessCount(),
metrics.getFailureCount(), bytesToMove, metrics.getSuccessBytes());
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java
index 381efdd08e..d33e75cded 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java
@@ -79,6 +79,7 @@ public static DiskBalancerInfo readDiskBalancerInfoFile(File
path)
diskBalancerInfoYaml.getThreshold(),
diskBalancerInfoYaml.getBandwidthInMB(),
diskBalancerInfoYaml.getParallelThread(),
+ diskBalancerInfoYaml.isStopAfterDiskEven(),
DiskBalancerVersion.getDiskBalancerVersion(
diskBalancerInfoYaml.version));
}
@@ -94,6 +95,7 @@ public static class DiskBalancerInfoYaml {
private double threshold;
private long bandwidthInMB;
private int parallelThread;
+ private boolean stopAfterDiskEven;
private int version;
@@ -102,11 +104,12 @@ public DiskBalancerInfoYaml() {
}
private DiskBalancerInfoYaml(boolean shouldRun, double threshold,
- long bandwidthInMB, int parallelThread, int version) {
+ long bandwidthInMB, int parallelThread, boolean stopAfterDiskEven, int
version) {
this.shouldRun = shouldRun;
this.threshold = threshold;
this.bandwidthInMB = bandwidthInMB;
this.parallelThread = parallelThread;
+ this.stopAfterDiskEven = stopAfterDiskEven;
this.version = version;
}
@@ -142,6 +145,14 @@ public int getParallelThread() {
return this.parallelThread;
}
+ public boolean isStopAfterDiskEven() {
+ return stopAfterDiskEven;
+ }
+
+ public void setStopAfterDiskEven(boolean stopAfterDiskEven) {
+ this.stopAfterDiskEven = stopAfterDiskEven;
+ }
+
public void setVersion(int version) {
this.version = version;
}
@@ -159,6 +170,7 @@ private static DiskBalancerInfoYaml getDiskBalancerInfoYaml(
diskBalancerInfo.getThreshold(),
diskBalancerInfo.getBandwidthInMB(),
diskBalancerInfo.getParallelThread(),
+ diskBalancerInfo.isStopAfterDiskEven(),
diskBalancerInfo.getVersion().getVersion());
}
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 831d9c4782..47cd07a950 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -79,7 +79,7 @@ public class TestHeartbeatEndpointTask {
public void setup() {
datanodeStateMachine = mock(DatanodeStateMachine.class);
container = mock(OzoneContainer.class);
- when(container.getDiskBalancerInfo()).thenReturn(new
DiskBalancerInfo(true, 10, 20, 30));
+ when(container.getDiskBalancerInfo()).thenReturn(new
DiskBalancerInfo(true, 10, 20, 30, true));
when(datanodeStateMachine.getContainer()).thenReturn(container);
PipelineReportsProto pipelineReportsProto =
mock(PipelineReportsProto.class);
when(pipelineReportsProto.getPipelineReportList()).thenReturn(Collections.emptyList());
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
index 9c9360dc4b..eb591215d6 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
@@ -120,6 +120,7 @@ public void testUpdateService(ContainerTestVersionInfo
versionInfo) throws Excep
svc.setThreshold(10.0d);
svc.setBandwidthInMB(1L);
svc.setParallelThread(5);
+ svc.setStopAfterDiskEven(true);
svc.setVersion(DiskBalancerVersion.DEFAULT_VERSION);
svc.start();
@@ -128,14 +129,16 @@ public void testUpdateService(ContainerTestVersionInfo
versionInfo) throws Excep
assertEquals(10, svc.getDiskBalancerInfo().getThreshold(), 0.0);
assertEquals(1, svc.getDiskBalancerInfo().getBandwidthInMB());
assertEquals(5, svc.getDiskBalancerInfo().getParallelThread());
+ assertTrue(svc.getDiskBalancerInfo().isStopAfterDiskEven());
- DiskBalancerInfo newInfo = new DiskBalancerInfo(false, 20.0d, 5L, 10);
+ DiskBalancerInfo newInfo = new DiskBalancerInfo(false, 20.0d, 5L, 10,
false);
svc.refresh(newInfo);
assertFalse(svc.getDiskBalancerInfo().isShouldRun());
assertEquals(20, svc.getDiskBalancerInfo().getThreshold(), 0.0);
assertEquals(5, svc.getDiskBalancerInfo().getBandwidthInMB());
assertEquals(10, svc.getDiskBalancerInfo().getParallelThread());
+ assertFalse(svc.getDiskBalancerInfo().isStopAfterDiskEven());
svc.shutdown();
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerYaml.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerYaml.java
index 7eccda2e47..6cdd087097 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerYaml.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerYaml.java
@@ -39,13 +39,14 @@ public void testCreateYaml() throws IOException {
double threshold = 10;
long bandwidthInMB = 100;
int parallelThread = 5;
+ boolean stopAfterDiskEven = true;
DiskBalancerVersion version = DiskBalancerVersion.DEFAULT_VERSION;
File file = new File(tmpDir.toString(),
OZONE_SCM_DATANODE_DISK_BALANCER_INFO_FILE_DEFAULT);
DiskBalancerInfo info = new DiskBalancerInfo(shouldRun, threshold,
- bandwidthInMB, parallelThread, version);
+ bandwidthInMB, parallelThread, stopAfterDiskEven, version);
DiskBalancerYaml.createDiskBalancerInfoFile(info, file);
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index a5fd9cfbb7..43cff0f52d 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -1218,12 +1218,14 @@ public List<HddsProtos.DatanodeDiskBalancerInfoProto>
getDiskBalancerStatus(
@Override
public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
Optional<Long> bandwidthInMB, Optional<Integer> parallelThread,
- Optional<List<String>> hosts) throws IOException {
+ Optional<Boolean> stopAfterDiskEven, Optional<List<String>> hosts)
+ throws IOException {
HddsProtos.DiskBalancerConfigurationProto.Builder confBuilder =
HddsProtos.DiskBalancerConfigurationProto.newBuilder();
threshold.ifPresent(confBuilder::setThreshold);
bandwidthInMB.ifPresent(confBuilder::setDiskBandwidthInMB);
parallelThread.ifPresent(confBuilder::setParallelThread);
+ stopAfterDiskEven.ifPresent(confBuilder::setStopAfterDiskEven);
DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
DatanodeDiskBalancerOpRequestProto.newBuilder()
@@ -1268,13 +1270,14 @@ public List<DatanodeAdminError>
stopDiskBalancer(Optional<List<String>> hosts)
@Override
public List<DatanodeAdminError> updateDiskBalancerConfiguration(
Optional<Double> threshold, Optional<Long> bandwidthInMB,
- Optional<Integer> parallelThread, Optional<List<String>> hosts)
+ Optional<Integer> parallelThread, Optional<Boolean> stopAfterDiskEven,
Optional<List<String>> hosts)
throws IOException {
HddsProtos.DiskBalancerConfigurationProto.Builder confBuilder =
HddsProtos.DiskBalancerConfigurationProto.newBuilder();
threshold.ifPresent(confBuilder::setThreshold);
bandwidthInMB.ifPresent(confBuilder::setDiskBandwidthInMB);
parallelThread.ifPresent(confBuilder::setParallelThread);
+ stopAfterDiskEven.ifPresent(confBuilder::setStopAfterDiskEven);
DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
DatanodeDiskBalancerOpRequestProto.newBuilder()
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 10f1f197a1..da8aedd310 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -552,6 +552,7 @@ message DiskBalancerConfigurationProto {
optional double threshold = 1;
optional uint64 diskBandwidthInMB = 2;
optional int32 parallelThread = 3;
+ optional bool stopAfterDiskEven = 4;
}
enum DiskBalancerRunningStatus {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
index b6a7032007..4e18758e89 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
@@ -140,8 +140,8 @@ public List<HddsProtos.DatanodeDiskBalancerInfoProto>
getDiskBalancerStatus(
*/
public List<DatanodeAdminError> startDiskBalancer(
Optional<Double> threshold, Optional<Long> bandwidthInMB,
- Optional<Integer> parallelThread, Optional<List<String>> hosts)
- throws IOException {
+ Optional<Integer> parallelThread, Optional<Boolean> stopAfterDiskEven,
+ Optional<List<String>> hosts) throws IOException {
List<DatanodeDetails> dns;
if (hosts.isPresent()) {
dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
@@ -161,7 +161,7 @@ public List<DatanodeAdminError> startDiskBalancer(
// If command doesn't have configuration change, then we reuse the
// latest configuration reported from Datanodes
DiskBalancerConfiguration updateConf = attachDiskBalancerConf(dn,
- threshold, bandwidthInMB, parallelThread);
+ threshold, bandwidthInMB, parallelThread, stopAfterDiskEven);
DiskBalancerCommand command = new DiskBalancerCommand(
HddsProtos.DiskBalancerOpType.START, updateConf);
sendCommand(dn, command);
@@ -213,7 +213,7 @@ public List<DatanodeAdminError>
stopDiskBalancer(Optional<List<String>> hosts)
*/
public List<DatanodeAdminError> updateDiskBalancerConfiguration(
Optional<Double> threshold, Optional<Long> bandwidthInMB,
- Optional<Integer> parallelThread, Optional<List<String>> hosts)
+ Optional<Integer> parallelThread, Optional<Boolean> stopAfterDiskEven,
Optional<List<String>> hosts)
throws IOException {
List<DatanodeDetails> dns;
if (hosts.isPresent()) {
@@ -229,7 +229,7 @@ public List<DatanodeAdminError>
updateDiskBalancerConfiguration(
// If command doesn't have configuration change, then we reuse the
// latest configuration reported from Datanodes
DiskBalancerConfiguration updateConf = attachDiskBalancerConf(dn,
- threshold, bandwidthInMB, parallelThread);
+ threshold, bandwidthInMB, parallelThread, stopAfterDiskEven);
DiskBalancerCommand command = new DiskBalancerCommand(
HddsProtos.DiskBalancerOpType.UPDATE, updateConf);
sendCommand(dn, command);
@@ -329,7 +329,7 @@ public void
processDiskBalancerReport(DiskBalancerReportProto reportProto,
statusMap.put(dn, new DiskBalancerStatus(
isRunning ? DiskBalancerRunningStatus.RUNNING :
DiskBalancerRunningStatus.STOPPED,
diskBalancerConfiguration, successMoveCount, failureMoveCount,
bytesToMove, balancedBytes));
- if (reportProto.hasBalancedBytes()) {
+ if (reportProto.hasBalancedBytes() && balancedBytesMap != null) {
balancedBytesMap.put(dn, reportProto.getBalancedBytes());
}
}
@@ -346,13 +346,14 @@ public void markStatusUnknown(DatanodeDetails dn) {
private DiskBalancerConfiguration attachDiskBalancerConf(
DatanodeDetails dn, Optional<Double> threshold,
- Optional<Long> bandwidthInMB, Optional<Integer> parallelThread) {
+ Optional<Long> bandwidthInMB, Optional<Integer> parallelThread,
Optional<Boolean> stopAfterDiskEven) {
DiskBalancerConfiguration baseConf = statusMap.containsKey(dn) ?
statusMap.get(dn).getDiskBalancerConfiguration() :
new DiskBalancerConfiguration();
threshold.ifPresent(baseConf::setThreshold);
bandwidthInMB.ifPresent(baseConf::setDiskBandwidthInMB);
parallelThread.ifPresent(baseConf::setParallelThread);
+ stopAfterDiskEven.ifPresent(baseConf::setStopAfterDiskEven);
return baseConf;
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 9b6647924f..87cbd06401 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -1404,6 +1404,7 @@ public DatanodeDiskBalancerOpResponseProto
getDatanodeDiskBalancerOp(
conf.hasThreshold() ? Optional.of(conf.getThreshold()) :
Optional.empty(),
conf.hasDiskBandwidthInMB() ?
Optional.of(conf.getDiskBandwidthInMB()) : Optional.empty(),
conf.hasParallelThread() ? Optional.of(conf.getParallelThread()) :
Optional.empty(),
+ conf.hasStopAfterDiskEven() ?
Optional.of(conf.getStopAfterDiskEven()) : Optional.empty(),
request.getHostsList().isEmpty() ? Optional.empty() :
Optional.of(request.getHostsList()));
break;
case UPDATE:
@@ -1412,6 +1413,7 @@ public DatanodeDiskBalancerOpResponseProto
getDatanodeDiskBalancerOp(
conf.hasThreshold() ? Optional.of(conf.getThreshold()) :
Optional.empty(),
conf.hasDiskBandwidthInMB() ?
Optional.of(conf.getDiskBandwidthInMB()) : Optional.empty(),
conf.hasParallelThread() ? Optional.of(conf.getParallelThread()) :
Optional.empty(),
+ conf.hasStopAfterDiskEven() ?
Optional.of(conf.getStopAfterDiskEven()) : Optional.empty(),
request.getHostsList().isEmpty() ? Optional.empty() :
Optional.of(request.getHostsList()));
break;
case STOP:
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 1ab3ede316..2a7f259e79 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -1530,7 +1530,8 @@ public List<HddsProtos.DatanodeDiskBalancerInfoProto>
getDiskBalancerStatus(
@Override
public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
Optional<Long> bandwidthInMB, Optional<Integer> parallelThread,
- Optional<List<String>> hosts) throws IOException {
+ Optional<Boolean> stopAfterDiskEven, Optional<List<String>> hosts)
+ throws IOException {
try {
getScm().checkAdminAccess(getRemoteUser(), false);
} catch (IOException e) {
@@ -1539,7 +1540,7 @@ public List<DatanodeAdminError>
startDiskBalancer(Optional<Double> threshold,
}
return scm.getDiskBalancerManager()
- .startDiskBalancer(threshold, bandwidthInMB, parallelThread, hosts);
+ .startDiskBalancer(threshold, bandwidthInMB, parallelThread,
stopAfterDiskEven, hosts);
}
@Override
@@ -1557,7 +1558,7 @@ public List<DatanodeAdminError>
stopDiskBalancer(Optional<List<String>> hosts)
@Override
public List<DatanodeAdminError> updateDiskBalancerConfiguration(
Optional<Double> threshold, Optional<Long> bandwidthInMB,
- Optional<Integer> parallelThread, Optional<List<String>> hosts)
+ Optional<Integer> parallelThread, Optional<Boolean> stopAfterDiskEven,
Optional<List<String>> hosts)
throws IOException {
try {
getScm().checkAdminAccess(getRemoteUser(), false);
@@ -1567,7 +1568,7 @@ public List<DatanodeAdminError>
updateDiskBalancerConfiguration(
}
return scm.getDiskBalancerManager().updateDiskBalancerConfiguration(
- threshold, bandwidthInMB, parallelThread, hosts);
+ threshold, bandwidthInMB, parallelThread, stopAfterDiskEven, hosts);
}
/**
diff --git
a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index cbe071818c..7e94a0aebf 100644
---
a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++
b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -607,10 +607,10 @@ public List<HddsProtos.DatanodeDiskBalancerInfoProto>
getDiskBalancerReport(
@Override
public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
- Optional<Long> bandwidthInMB, Optional<Integer> parallelThread,
+ Optional<Long> bandwidthInMB, Optional<Integer> parallelThread, Optional<Boolean> stopAfterDiskEven,
Optional<List<String>> hosts) throws IOException {
return storageContainerLocationClient.startDiskBalancer(threshold,
- bandwidthInMB, parallelThread, hosts);
+ bandwidthInMB, parallelThread, stopAfterDiskEven, hosts);
}
@Override
@@ -631,9 +631,9 @@ public List<HddsProtos.DatanodeDiskBalancerInfoProto>
getDiskBalancerStatus(
@Override
public List<DatanodeAdminError> updateDiskBalancerConfiguration(
Optional<Double> threshold, Optional<Long> bandwidth,
- Optional<Integer> parallelThread, Optional<List<String>> hosts)
+ Optional<Integer> parallelThread, Optional<Boolean> stopAfterDiskEven, Optional<List<String>> hosts)
throws IOException {
return storageContainerLocationClient.updateDiskBalancerConfiguration(
- threshold, bandwidth, parallelThread, hosts);
+ threshold, bandwidth, parallelThread, stopAfterDiskEven, hosts);
}
}
diff --git
a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommands.java
b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommands.java
index eec770f517..fd8a98468f 100644
---
a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommands.java
+++
b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommands.java
@@ -34,6 +34,7 @@
* [ -t/--threshold {@literal <threshold>}]
* [ -b/--bandwidthInMB {@literal <bandwidthInMB>}]
* [ -p/--parallelThread {@literal <parallelThread>}]
+ * [ -s/--stop-after-disk-even {@literal <stopAfterDiskEven>}]
* [ -a/--all {@literal <alldatanodes>}]
* [ -d/--datanodes {@literal <datanodes>}]
* [ {@literal <hosts>}]
@@ -43,13 +44,16 @@
* datanodes
* ozone admin datanode diskbalancer start -a
* start balancer with default values in the configuration on all
- * datanodes in the cluster
+ * datanodes in the cluster and stop automatically after balancing
* ozone admin datanode diskbalancer start -t 5 -d {@literal <hosts>}
* start balancer with a threshold of 5%
* ozone admin datanode diskbalancer start -b 20 -d {@literal <hosts>}
* start balancer with maximum 20MB/s diskbandwidth
* ozone admin datanode diskbalancer start -p 5 -d {@literal <hosts>}
* start balancer with 5 parallel thread on each datanode
+ * ozone admin datanode diskbalancer start -s=false -a
+ * start balancer on all datanodes and keep it running even after
+ * disks are balanced, until stopped by the stop command.
* To stop:
* ozone admin datanode diskbalancer stop -a
* stop diskblancer on all datanodes
diff --git
a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java
b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java
index 23084b8947..c37f560198 100644
---
a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java
+++
b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java
@@ -53,6 +53,10 @@ public class DiskBalancerStartSubcommand extends
ScmSubcommand {
description = "Max parallelThread for DiskBalancer.")
private Optional<Integer> parallelThread;
+ @Option(names = {"-s", "--stop-after-disk-even"},
+ description = "Stop DiskBalancer automatically after disk utilization is even.")
+ private Optional<Boolean> stopAfterDiskEven;
+
@CommandLine.Mixin
private DiskBalancerCommonOptions commonOptions =
new DiskBalancerCommonOptions();
@@ -63,7 +67,7 @@ public void execute(ScmClient scmClient) throws IOException {
return;
}
List<DatanodeAdminError> errors =
- scmClient.startDiskBalancer(threshold, bandwidthInMB, parallelThread,
+ scmClient.startDiskBalancer(threshold, bandwidthInMB, parallelThread, stopAfterDiskEven,
commonOptions.getSpecifiedDatanodes());
System.out.println("Start DiskBalancer on datanode(s):\n" +
diff --git
a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java
b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java
index 08deec169b..6f1b7ea0d1 100644
---
a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java
+++
b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java
@@ -53,6 +53,10 @@ public class DiskBalancerUpdateSubcommand extends
ScmSubcommand {
description = "Max parallelThread for DiskBalancer.")
private Optional<Integer> parallelThread;
+ @Option(names = {"-s", "--stop-after-disk-even"},
+ description = "Stop DiskBalancer automatically after disk utilization is even.")
+ private Optional<Boolean> stopAfterDiskEven;
+
@CommandLine.Mixin
private DiskBalancerCommonOptions commonOptions =
new DiskBalancerCommonOptions();
@@ -64,7 +68,7 @@ public void execute(ScmClient scmClient) throws IOException {
}
List<DatanodeAdminError> errors =
scmClient.updateDiskBalancerConfiguration(threshold, bandwidthInMB,
- parallelThread, commonOptions.getSpecifiedDatanodes());
+ parallelThread, stopAfterDiskEven, commonOptions.getSpecifiedDatanodes());
System.out.println("Update DiskBalancer Configuration on datanode(s):\n" +
commonOptions.getHostString());
diff --git
a/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java
b/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java
index ac97a27f25..091ae68154 100644
---
a/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java
+++
b/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java
@@ -116,7 +116,7 @@ public void testDiskBalancerStartSubcommand() throws
IOException {
// Return error
Mockito.when(scmClient.startDiskBalancer(Mockito.any(), Mockito.any(),
- Mockito.any(), Mockito.any()))
+ Mockito.any(), Mockito.any(), Mockito.any()))
.thenReturn(generateError(10));
try {
@@ -127,7 +127,7 @@ public void testDiskBalancerStartSubcommand() throws
IOException {
// Do not return error
Mockito.when(scmClient.startDiskBalancer(Mockito.any(), Mockito.any(),
- Mockito.any(), Mockito.any()))
+ Mockito.any(), Mockito.any(), Mockito.any()))
.thenReturn(generateError(0));
try {
@@ -146,7 +146,7 @@ public void testDiskBalancerUpdateSubcommand() throws
IOException {
// Return error
Mockito.when(scmClient.updateDiskBalancerConfiguration(Mockito.any(),
- Mockito.any(), Mockito.any(), Mockito.any()))
+ Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
.thenReturn(generateError(10));
try {
@@ -157,7 +157,7 @@ public void testDiskBalancerUpdateSubcommand() throws
IOException {
// Do not return error
Mockito.when(scmClient.updateDiskBalancerConfiguration(Mockito.any(),
- Mockito.any(), Mockito.any(), Mockito.any()))
+ Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
.thenReturn(generateError(0));
try {
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
index c9083a0b33..13d361ad2f 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
@@ -21,7 +21,11 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -33,7 +37,11 @@
import
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
import org.apache.hadoop.hdds.scm.node.DiskBalancerManager;
+import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils.LogCapturer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -55,6 +63,7 @@ public static void setup() throws Exception {
ozoneConf = new OzoneConfiguration();
ozoneConf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementCapacity.class, PlacementPolicy.class);
+ ozoneConf.setTimeDuration("hdds.datanode.disk.balancer.service.interval", 3, TimeUnit.SECONDS);
cluster =
MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build();
storageClient = new ContainerOperationClient(ozoneConf);
cluster.waitForClusterToBeReady();
@@ -86,6 +95,40 @@ public void testDatanodeDiskBalancerReport() throws
IOException {
>= reportProtoList.get(1).getCurrentVolumeDensitySum());
}
+ @Test
+ public void testDiskBalancerStopAfterEven() throws IOException,
+ InterruptedException, TimeoutException {
+ // Capture logs for DiskBalancerManager and DiskBalancerService
+ LogCapturer logCapturer = LogCapturer.captureLogs(DiskBalancerManager.LOG);
+ LogCapturer dnLogCapturer = LogCapturer.captureLogs(DiskBalancerService.class);
+
+ // Start DiskBalancer on all datanodes
+ diskBalancerManager.startDiskBalancer(
+ Optional.of(10.0), // threshold
+ Optional.of(10L), // bandwidth in MB
+ Optional.of(5), // parallel threads
+ Optional.of(true), // stopAfterDiskEven
+ Optional.empty()); // apply to all datanodes
+
+ // Verify that the START command was logged for every datanode
+ String logs = logCapturer.getOutput();
+ for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+ String uuid = dn.getDatanodeDetails().getUuidString();
+ assertTrue(logs.contains("Sending diskBalancerCommand: opType=START") &&
+ logs.contains(uuid));
+ }
+
+ // Wait up to 5 seconds for all DNs to log the stop message
+ GenericTestUtils.waitFor(() -> {
+ String dnLogs = dnLogCapturer.getOutput();
+ long count = Arrays.stream(dnLogs.split("\n"))
+ .filter(line -> line.contains("Disk balancer is stopped due to disk even as" +
+ " the property StopAfterDiskEven is set to true"))
+ .count();
+ return count >= cluster.getHddsDatanodes().size();
+ }, 100, 5000); // check every 100ms, timeout after 5s
+ }
+
@Test
public void testDatanodeDiskBalancerStatus() throws IOException {
// TODO: Test status command with datanodes in balancing
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]