This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-5713 by this push:
     new a6db12ca30 HDDS-13278. [DiskBalancer] Incorrect tracking of DiskBalancer inProgressTask and balancedByteMap initialisation (#8650)
a6db12ca30 is described below

commit a6db12ca3022a03583ac82654fee5f33dfdc5027
Author: Gargi Jaiswal <[email protected]>
AuthorDate: Tue Jul 15 11:58:26 2025 +0530

    HDDS-13278. [DiskBalancer] Incorrect tracking of DiskBalancer inProgressTask and balancedByteMap initialisation (#8650)
---
 .../diskbalancer/DiskBalancerService.java          | 24 +++++++--
 .../diskbalancer/TestDiskBalancerService.java      | 60 ++++++++++++++++++++++
 .../hadoop/hdds/scm/node/DiskBalancerManager.java  |  1 +
 3 files changed, 80 insertions(+), 5 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
index be7f092852..52a6f74aca 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
@@ -92,7 +92,6 @@ public class DiskBalancerService extends BackgroundService {
   private AtomicLong balancedBytesInLastWindow = new AtomicLong(0L);
   private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow());
 
-  private Map<DiskBalancerTask, Integer> inProgressTasks;
   private Set<Long> inProgressContainers;
 
   /**
@@ -151,7 +150,6 @@ public DiskBalancerService(OzoneContainer ozoneContainer,
     Preconditions.checkNotNull(diskBalancerInfoPath);
     diskBalancerInfoFile = new File(diskBalancerInfoPath);
 
-    inProgressTasks = new ConcurrentHashMap<>();
     inProgressContainers = ConcurrentHashMap.newKeySet();
     deltaSizes = new ConcurrentHashMap<>();
     volumeSet = ozoneContainer.getVolumeSet();
@@ -377,7 +375,7 @@ public BackgroundTaskQueue getTasks() {
       return queue;
     }
 
-    int availableTaskCount = parallelThread - inProgressTasks.size();
+    int availableTaskCount = parallelThread - inProgressContainers.size();
     if (availableTaskCount <= 0) {
       LOG.info("No available thread for disk balancer service. " +
           "Current thread count is {}.", parallelThread);
@@ -394,8 +392,9 @@ public BackgroundTaskQueue getTasks() {
       ContainerData toBalanceContainer = containerChoosingPolicy
           .chooseContainer(ozoneContainer, sourceVolume, inProgressContainers);
       if (toBalanceContainer != null) {
-        queue.add(new DiskBalancerTask(toBalanceContainer, sourceVolume,
-            destVolume));
+        DiskBalancerTask task = new DiskBalancerTask(toBalanceContainer, sourceVolume,
+            destVolume);
+        queue.add(task);
         inProgressContainers.add(toBalanceContainer.getContainerID());
         deltaSizes.put(sourceVolume, deltaSizes.getOrDefault(sourceVolume, 0L)
             - toBalanceContainer.getBytesUsed());
@@ -643,6 +642,21 @@ public VolumeChoosingPolicy getVolumeChoosingPolicy() {
     return volumeChoosingPolicy;
   }
 
+  @VisibleForTesting
+  public void setVolumeChoosingPolicy(VolumeChoosingPolicy volumeChoosingPolicy) {
+    this.volumeChoosingPolicy = volumeChoosingPolicy;
+  }
+
+  @VisibleForTesting
+  public void setContainerChoosingPolicy(ContainerChoosingPolicy containerChoosingPolicy) {
+    this.containerChoosingPolicy = containerChoosingPolicy;
+  }
+
+  @VisibleForTesting
+  public Set<Long> getInProgressContainers() {
+    return inProgressContainers;
+  }
+
   /**
    * Handle state changes for DiskBalancerService.
    */
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
index 10b6ede721..2795e031b6 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
@@ -22,6 +22,7 @@
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyDouble;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -30,29 +31,38 @@
 import java.nio.file.Path;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
 import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService.DiskBalancerOperationalState;
+import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy;
 import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy;
 import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.diskbalancer.policy.VolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils.LogCapturer;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -87,6 +97,7 @@ public void init() throws IOException {
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testRoot.getAbsolutePath());
     conf.set("hdds.datanode.du.factory.classname",
         "org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory$HalfTera");
+    conf.setTimeDuration("hdds.datanode.disk.balancer.service.interval", 2, TimeUnit.SECONDS);
     datanodeUuid = UUID.randomUUID().toString();
     volumeSet = new MutableVolumeSet(datanodeUuid, scmId, conf, null,
         StorageVolume.VolumeType.DATA_VOLUME, null);
@@ -244,6 +255,55 @@ public void testCalculateBytesToMove(int volumeCount, int deltaUsagePercent,
     assertTrue(Math.abs(expectedBytesToMove - svc.calculateBytesToMove(volumeSet)) <= 1);
   }
 
+  @Test
+  public void testConcurrentTasksNotExceedThreadLimit() throws Exception {
+    LogCapturer serviceLog = LogCapturer.captureLogs(DiskBalancerService.class);
+    int parallelThread = 3;
+
+    ContainerSet containerSet = ContainerSet.newReadOnlyContainerSet(1000);
+    ContainerMetrics metrics = ContainerMetrics.create(conf);
+    KeyValueHandler keyValueHandler =
+        new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
+            metrics, c -> {
+        }, new ContainerChecksumTreeManager(conf));
+    DiskBalancerServiceTestImpl svc =
+        getDiskBalancerService(containerSet, conf, keyValueHandler, null, parallelThread);
+
+    // Set operational state to RUNNING
+    DiskBalancerInfo info = new DiskBalancerInfo(
+        DiskBalancerOperationalState.RUNNING, 10.0d, 100L, parallelThread,
+        false, DiskBalancerVersion.DEFAULT_VERSION);
+    svc.refresh(info);
+
+    VolumeChoosingPolicy volumePolicy = mock(VolumeChoosingPolicy.class);
+    ContainerChoosingPolicy containerPolicy = mock(ContainerChoosingPolicy.class);
+    svc.setVolumeChoosingPolicy(volumePolicy);
+    svc.setContainerChoosingPolicy(containerPolicy);
+
+    List<StorageVolume> volumes = volumeSet.getVolumesList();
+    HddsVolume source = (HddsVolume) volumes.get(0);
+    HddsVolume dest = (HddsVolume) volumes.get(1);
+    ContainerData containerData = mock(ContainerData.class);
+
+    // Mock unique container IDs to correctly populate the Set
+    when(containerData.getContainerID()).thenAnswer(invocation -> System.nanoTime());
+    when(containerData.getBytesUsed()).thenReturn(100L);
+
+    when(volumePolicy.chooseVolume(any(), anyDouble(), any())).thenReturn(Pair.of(source, dest));
+    when(containerPolicy.chooseContainer(any(), any(), any())).thenReturn(containerData);
+
+    // Test when no tasks are in progress, it should schedule up to the limit
+    BackgroundTaskQueue queue = svc.getTasks();
+    assertEquals(parallelThread, queue.size());
+    assertEquals(parallelThread, svc.getInProgressContainers().size());
+
+    // Test when in-progress tasks are at the limit, no new tasks are scheduled
+    svc.getTasks();
+    GenericTestUtils.waitFor(() -> serviceLog.getOutput().contains("No available thread " +
+            "for disk balancer service. Current thread count is 3."),
+        100, 5000);
+  }
+
   private OzoneContainer mockDependencies(ContainerSet containerSet,
       KeyValueHandler keyValueHandler, ContainerController controller) {
     OzoneContainer ozoneContainer = mock(OzoneContainer.class);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
index 6b377da6b7..bd4cb79196 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
@@ -74,6 +74,7 @@ public DiskBalancerManager(OzoneConfiguration conf,
     this.nodeManager = nodeManager;
     this.useHostnames = conf.getBoolean(HDDS_DATANODE_USE_DN_HOSTNAME, HDDS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
     this.statusMap = new ConcurrentHashMap<>();
+    this.balancedBytesMap = new ConcurrentHashMap<>();
   }
 
   public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
