This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-5713 by this push:
     new 2f2d7304aa HDDS-12437. [DiskBalancer] Estimate the total size pending to move before disk usage becomes even (#8056)
2f2d7304aa is described below

commit 2f2d7304aa35661d7ea9f7e26d45ea5ca7558222
Author: Gargi Jaiswal <[email protected]>
AuthorDate: Mon Mar 24 13:10:09 2025 +0530

    HDDS-12437. [DiskBalancer] Estimate the total size pending to move before disk usage becomes even (#8056)
---
 .../container/diskbalancer/DiskBalancerInfo.java   |  6 +-
 .../diskbalancer/DiskBalancerService.java          | 41 +++++++++++-
 .../diskbalancer/TestDiskBalancerService.java      | 72 ++++++++++++++++++++++
 .../interface-client/src/main/proto/hdds.proto     |  1 +
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |  1 +
 .../hadoop/hdds/scm/node/DiskBalancerManager.java  | 11 ++--
 .../hadoop/hdds/scm/node/DiskBalancerStatus.java   |  8 ++-
 .../cli/datanode/DiskBalancerStatusSubcommand.java | 30 ++++++++-
 .../cli/datanode/TestDiskBalancerSubCommand.java   |  4 +-
 9 files changed, 162 insertions(+), 12 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
index 87269f2a68..0a296d5d9f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
@@ -34,6 +34,7 @@ public class DiskBalancerInfo {
   private DiskBalancerVersion version;
   private long successCount;
   private long failureCount;
+  private long bytesToMove;
 
   public DiskBalancerInfo(boolean shouldRun, double threshold,
       long bandwidthInMB, int parallelThread) {
@@ -50,9 +51,10 @@ public DiskBalancerInfo(boolean shouldRun, double threshold,
     this.version = version;
   }
 
+  @SuppressWarnings("checkstyle:ParameterNumber")
   public DiskBalancerInfo(boolean shouldRun, double threshold,
       long bandwidthInMB, int parallelThread, DiskBalancerVersion version,
-      long successCount, long failureCount) {
+      long successCount, long failureCount, long bytesToMove) {
     this.shouldRun = shouldRun;
     this.threshold = threshold;
     this.bandwidthInMB = bandwidthInMB;
@@ -60,6 +62,7 @@ public DiskBalancerInfo(boolean shouldRun, double threshold,
     this.version = version;
     this.successCount = successCount;
     this.failureCount = failureCount;
+    this.bytesToMove = bytesToMove;
   }
 
   public DiskBalancerInfo(boolean shouldRun,
@@ -94,6 +97,7 @@ public StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto toDiskBala
     builder.setDiskBalancerConf(confProto);
     builder.setSuccessMoveCount(successCount);
     builder.setFailureMoveCount(failureCount);
+    builder.setBytesToMove(bytesToMove);
     return builder.build();
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
index 9897a0acdb..5bad084835 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
@@ -103,6 +103,7 @@ public class DiskBalancerService extends BackgroundService {
   private final File diskBalancerInfoFile;
 
   private DiskBalancerServiceMetrics metrics;
+  private long bytesToMove;
 
   public DiskBalancerService(OzoneContainer ozoneContainer,
       long serviceCheckInterval, long serviceCheckTimeout, TimeUnit timeUnit,
@@ -351,7 +352,10 @@ public BackgroundTaskQueue getTasks() {
 
     if (queue.isEmpty()) {
       metrics.incrIdleLoopNoAvailableVolumePairCount();
+    } else {
+      bytesToMove = calculateBytesToMove(volumeSet);
     }
+
     return queue;
   }
 
@@ -505,7 +509,42 @@ private void postCall() {
 
   public DiskBalancerInfo getDiskBalancerInfo() {
     return new DiskBalancerInfo(shouldRun, threshold, bandwidthInMB,
-        parallelThread, version, metrics.getSuccessCount(), metrics.getFailureCount());
+        parallelThread, version, metrics.getSuccessCount(),
+        metrics.getFailureCount(), bytesToMove);
+  }
+
+  public long calculateBytesToMove(MutableVolumeSet inputVolumeSet) {
+    long bytesPendingToMove = 0;
+    long totalUsedSpace = 0;
+    long totalCapacity = 0;
+
+    for (HddsVolume volume : StorageVolumeUtil.getHddsVolumesList(inputVolumeSet.getVolumesList())) {
+      totalUsedSpace += volume.getCurrentUsage().getUsedSpace();
+      totalCapacity += volume.getCurrentUsage().getCapacity();
+    }
+
+    if (totalCapacity == 0) {
+      return 0;
+    }
+
+    double datanodeUtilization = (double) totalUsedSpace / totalCapacity;
+
+    double thresholdFraction = threshold / 100.0;
+    double upperLimit = datanodeUtilization + thresholdFraction;
+
+    // Calculate excess data in overused volumes
+    for (HddsVolume volume : StorageVolumeUtil.getHddsVolumesList(inputVolumeSet.getVolumesList())) {
+      long usedSpace = volume.getCurrentUsage().getUsedSpace();
+      long capacity = volume.getCurrentUsage().getCapacity();
+      double volumeUtilization = (double) usedSpace / capacity;
+
+      // Consider only volumes exceeding the upper threshold
+      if (volumeUtilization > upperLimit) {
+        long excessData = usedSpace - (long) (upperLimit * capacity);
+        bytesPendingToMove += excessData;
+      }
+    }
+    return bytesPendingToMove;
   }
 
   private Path getDiskBalancerTmpDir(HddsVolume hddsVolume) {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
index 34e1c634ab..b5eb50f69a 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
@@ -28,7 +28,9 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.List;
 import java.util.UUID;
+import java.util.stream.Stream;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -37,6 +39,7 @@
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy;
@@ -50,6 +53,9 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 /**
  * This is a test class for DiskBalancerService.
@@ -89,6 +95,7 @@ public void init() throws IOException {
   public void cleanup() throws IOException {
     BlockUtils.shutdownCache(conf);
     FileUtils.deleteDirectory(testRoot);
+    volumeSet.shutdown();
   }
 
   @ContainerTestVersionInfo.ContainerTest
@@ -152,6 +159,10 @@ public void testPolicyClassInitialization(ContainerTestVersionInfo versionInfo)
   }
 
   private String generateVolumeLocation(String base, int volumeCount) {
+    if (volumeCount == 0) {
+      return "";
+    }
+
     StringBuilder sb = new StringBuilder();
     for (int i = 0; i < volumeCount; i++) {
       sb.append(base + "/vol" + i);
@@ -170,6 +181,67 @@ private DiskBalancerServiceTestImpl getDiskBalancerService(
         threadCount);
   }
 
+  public static Stream<Arguments> values() {
+    return Stream.of(
+        Arguments.arguments(0, 0, 0),
+        Arguments.arguments(1, 0, 0),
+        Arguments.arguments(1, 50, 0),
+        Arguments.arguments(2, 0, 0),
+        Arguments.arguments(2, 10, 0),
+        Arguments.arguments(2, 50, 40), // one disk is 50% above average, the other disk is 50% below average
+        Arguments.arguments(3, 0, 0),
+        Arguments.arguments(3, 10, 0),
+        Arguments.arguments(4, 0, 0),
+        Arguments.arguments(4, 50, 40) // two disks are 50% above average, the other two disks are 50% below average
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("values")
+  public void testCalculateBytesToMove(int volumeCount, int deltaUsagePercent,
+      long expectedBytesToMovePercent) throws IOException {
+    int updatedVolumeCount = volumeCount == 0 ? 1 : volumeCount;
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
+        generateVolumeLocation(testRoot.getAbsolutePath(), updatedVolumeCount));
+    volumeSet = new MutableVolumeSet(datanodeUuid, scmId, conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+    createDbInstancesForTestIfNeeded(volumeSet, scmId, scmId, conf);
+    if (volumeCount == 0) {
+      volumeSet.failVolume(((HddsVolume) volumeSet.getVolumesList().get(0)).getHddsRootDir().getAbsolutePath());
+    }
+
+    double avgUtilization = 0.5;
+    int totalOverUtilisedVolumes = 0;
+
+    List<StorageVolume> volumes = volumeSet.getVolumesList();
+    for (int i = 0; i < volumes.size(); i++) {
+      StorageVolume vol = volumes.get(i);
+      long totalCapacityPerVolume = vol.getCurrentUsage().getCapacity();
+      if (i % 2 == 0) {
+        vol.incrementUsedSpace((long) (totalCapacityPerVolume * (avgUtilization + deltaUsagePercent / 100.0)));
+        totalOverUtilisedVolumes++;
+      } else {
+        vol.incrementUsedSpace((long) (totalCapacityPerVolume * (avgUtilization - deltaUsagePercent / 100.0)));
+      }
+    }
+
+    ContainerSet containerSet = new ContainerSet(1000);
+    ContainerMetrics metrics = ContainerMetrics.create(conf);
+    KeyValueHandler keyValueHandler =
+        new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
+            metrics, c -> {
+        });
+    DiskBalancerServiceTestImpl svc =
+        getDiskBalancerService(containerSet, conf, keyValueHandler, null, 1);
+
+    long totalCapacity = volumes.isEmpty() ? 0 : volumes.get(0).getCurrentUsage().getCapacity();
+    long expectedBytesToMove = (long) Math.ceil(
+        (totalCapacity * expectedBytesToMovePercent) / 100.0 * totalOverUtilisedVolumes);
+
+    // data precision loss due to double data involved in calculation
+    assertEquals(Math.abs(expectedBytesToMove - svc.calculateBytesToMove(volumeSet)) <= 1, true);
+  }
+
   private OzoneContainer mockDependencies(ContainerSet containerSet,
       KeyValueHandler keyValueHandler, ContainerController controller) {
     OzoneContainer ozoneContainer = mock(OzoneContainer.class);
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 27ca714e8a..74934fe938 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -563,4 +563,5 @@ message DatanodeDiskBalancerInfoProto {
     optional DiskBalancerConfigurationProto diskBalancerConf = 4;
     optional uint64 successMoveCount = 5;
     optional uint64 failureMoveCount = 6;
+    optional uint64 bytesToMove = 7;
 }
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index 2f0bb37b59..487ac8a136 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -501,6 +501,7 @@ message DiskBalancerReportProto {
   optional DiskBalancerConfigurationProto diskBalancerConf = 3;
   optional uint64 successMoveCount = 4;
   optional uint64 failureMoveCount = 5;
+  optional uint64 bytesToMove = 6;
 }
 
 /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
index 6ca5e610d8..5567a61b03 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
@@ -267,7 +267,8 @@ private HddsProtos.DatanodeDiskBalancerInfoProto getInfoProto(
             .setCurrentVolumeDensitySum(volumeDensitySum)
             .setRunningStatus(status.getRunningStatus())
             .setSuccessMoveCount(status.getSuccessMoveCount())
-            .setFailureMoveCount(status.getFailureMoveCount());
+            .setFailureMoveCount(status.getFailureMoveCount())
+            .setBytesToMove(status.getBytesToMove());
     if (status.getRunningStatus() != DiskBalancerRunningStatus.UNKNOWN) {
       builder.setDiskBalancerConf(statusMap.get(dn)
           .getDiskBalancerConfiguration().toProtobufBuilder());
@@ -306,13 +307,14 @@ private double getVolumeDataDensitySumForDatanodeDetails(
 
   private DiskBalancerStatus getStatus(DatanodeDetails datanodeDetails) {
     return statusMap.computeIfAbsent(datanodeDetails,
-        dn -> new DiskBalancerStatus(DiskBalancerRunningStatus.UNKNOWN, new DiskBalancerConfiguration(), 0, 0));
+        dn -> new DiskBalancerStatus(DiskBalancerRunningStatus.UNKNOWN,
+            new DiskBalancerConfiguration(), 0, 0, 0));
   }
 
   @VisibleForTesting
   public void addRunningDatanode(DatanodeDetails datanodeDetails) {
     statusMap.put(datanodeDetails, new DiskBalancerStatus(DiskBalancerRunningStatus.RUNNING,
-        new DiskBalancerConfiguration(), 0, 0));
+        new DiskBalancerConfiguration(), 0, 0, 0));
   }
 
   public void processDiskBalancerReport(DiskBalancerReportProto reportProto,
@@ -325,9 +327,10 @@ public void processDiskBalancerReport(DiskBalancerReportProto reportProto,
             new DiskBalancerConfiguration();
     long successMoveCount = reportProto.getSuccessMoveCount();
     long failureMoveCount = reportProto.getFailureMoveCount();
+    long bytesToMove = reportProto.getBytesToMove();
     statusMap.put(dn, new DiskBalancerStatus(
         isRunning ? DiskBalancerRunningStatus.RUNNING : DiskBalancerRunningStatus.STOPPED,
-        diskBalancerConfiguration, successMoveCount, failureMoveCount));
+        diskBalancerConfiguration, successMoveCount, failureMoveCount, bytesToMove));
     if (reportProto.hasBalancedBytes()) {
       balancedBytesMap.put(dn, reportProto.getBalancedBytes());
     }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
index dc8f4836ab..0b99bfbaa3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
@@ -33,13 +33,15 @@ public class DiskBalancerStatus {
   private DiskBalancerConfiguration diskBalancerConfiguration;
   private long successMoveCount;
   private long failureMoveCount;
+  private long bytesToMove;
 
   public DiskBalancerStatus(DiskBalancerRunningStatus isRunning, DiskBalancerConfiguration conf,
-      long successMoveCount, long failureMoveCount) {
+      long successMoveCount, long failureMoveCount, long bytesToMove) {
     this.isRunning = isRunning;
     this.diskBalancerConfiguration = conf;
     this.successMoveCount = successMoveCount;
     this.failureMoveCount = failureMoveCount;
+    this.bytesToMove = bytesToMove;
   }
 
   public DiskBalancerRunningStatus getRunningStatus() {
@@ -57,4 +59,8 @@ public long getSuccessMoveCount() {
   public long getFailureMoveCount() {
     return failureMoveCount;
   }
+
+  public long getBytesToMove() {
+    return bytesToMove;
+  }
 }
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java
index 75d3759ffd..2f14079327 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java
@@ -60,7 +60,7 @@ public void execute(ScmClient scmClient) throws IOException {
   private String generateStatus(
       List<HddsProtos.DatanodeDiskBalancerInfoProto> protos) {
     StringBuilder formatBuilder = new StringBuilder("Status result:%n" +
-        "%-40s %-20s %-10s %-10s %-15s %-15s %-15s %-15s%n");
+        "%-35s %-25s %-15s %-15s %-15s %-12s %-12s %-12s %-12s %-12s%n");
 
     List<String> contentList = new ArrayList<>();
     contentList.add("Datanode");
@@ -71,11 +71,16 @@ private String generateStatus(
     contentList.add("Threads");
     contentList.add("SuccessMove");
     contentList.add("FailureMove");
+    contentList.add("EstBytesToMove(MB)");
+    contentList.add("EstTimeLeft(min)");
 
     for (HddsProtos.DatanodeDiskBalancerInfoProto proto: protos) {
-      formatBuilder.append("%-40s %-20s %-10s %-10s %-15s %-15s %-15s %-15s%n");
+      formatBuilder.append("%-35s %-25s %-15s %-15s %-15s %-12s %-12s %-12s %-12s %-12s%n");
+      long estimatedTimeLeft = calculateEstimatedTimeLeft(proto);
+      long bytesToMoveMB = proto.getBytesToMove() / (1024 * 1024);
+
       contentList.add(proto.getNode().getHostName());
-      contentList.add(String.valueOf(proto.getCurrentVolumeDensitySum()));
+      contentList.add(String.format("%.18f", proto.getCurrentVolumeDensitySum()));
       contentList.add(proto.getRunningStatus().name());
       contentList.add(
           String.format("%.4f", proto.getDiskBalancerConf().getThreshold()));
@@ -85,9 +90,28 @@ private String generateStatus(
           String.valueOf(proto.getDiskBalancerConf().getParallelThread()));
       contentList.add(String.valueOf(proto.getSuccessMoveCount()));
       contentList.add(String.valueOf(proto.getFailureMoveCount()));
+      contentList.add(String.valueOf(bytesToMoveMB));
+      contentList.add(estimatedTimeLeft >= 0 ? String.valueOf(estimatedTimeLeft) : "N/A");
     }
 
+    formatBuilder.append("%nNote: Estimated time left is calculated" +
+        " based on the estimated bytes to move and the configured disk bandwidth.");
+
     return String.format(formatBuilder.toString(),
         contentList.toArray(new String[0]));
   }
+
+  private long calculateEstimatedTimeLeft(HddsProtos.DatanodeDiskBalancerInfoProto proto) {
+    long bytesToMove = proto.getBytesToMove();
+
+    if (bytesToMove == 0) {
+      return 0;
+    }
+    long bandwidth = proto.getDiskBalancerConf().getDiskBandwidthInMB();
+
+    // Convert estimated data from bytes to MB
+    double estimatedDataPendingMB = bytesToMove / (1024.0 * 1024.0);
+    double estimatedTimeLeft = (bandwidth > 0) ? (estimatedDataPendingMB / bandwidth) / 60 : -1;
+    return (long) Math.ceil(estimatedTimeLeft);
+  }
 }
diff --git a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java
index 49ce4f3cdd..897eccba53 100644
--- a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java
+++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java
@@ -100,8 +100,8 @@ public void testDiskBalancerStatusSubcommand()
 
     statusCmd.execute(scmClient);
 
-    // 2 Headers + 10 results
-    assertEquals(12, newLineCount(outContent.toString(DEFAULT_ENCODING)));
+    // 2 Headers + 10 results + 1 note
+    assertEquals(13, newLineCount(outContent.toString(DEFAULT_ENCODING)));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to