This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-5713 by this push:
new a6db12ca30 HDDS-13278. [DiskBalancer] Incorrect tracking of DiskBalancer inProgressTask and balancedByteMap initialisation (#8650)
a6db12ca30 is described below
commit a6db12ca3022a03583ac82654fee5f33dfdc5027
Author: Gargi Jaiswal <[email protected]>
AuthorDate: Tue Jul 15 11:58:26 2025 +0530
HDDS-13278. [DiskBalancer] Incorrect tracking of DiskBalancer inProgressTask and balancedByteMap initialisation (#8650)
---
.../diskbalancer/DiskBalancerService.java | 24 +++++++--
.../diskbalancer/TestDiskBalancerService.java | 60 ++++++++++++++++++++++
.../hadoop/hdds/scm/node/DiskBalancerManager.java | 1 +
3 files changed, 80 insertions(+), 5 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
index be7f092852..52a6f74aca 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
@@ -92,7 +92,6 @@ public class DiskBalancerService extends BackgroundService {
private AtomicLong balancedBytesInLastWindow = new AtomicLong(0L);
private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow());
- private Map<DiskBalancerTask, Integer> inProgressTasks;
private Set<Long> inProgressContainers;
/**
@@ -151,7 +150,6 @@ public DiskBalancerService(OzoneContainer ozoneContainer,
Preconditions.checkNotNull(diskBalancerInfoPath);
diskBalancerInfoFile = new File(diskBalancerInfoPath);
- inProgressTasks = new ConcurrentHashMap<>();
inProgressContainers = ConcurrentHashMap.newKeySet();
deltaSizes = new ConcurrentHashMap<>();
volumeSet = ozoneContainer.getVolumeSet();
@@ -377,7 +375,7 @@ public BackgroundTaskQueue getTasks() {
return queue;
}
- int availableTaskCount = parallelThread - inProgressTasks.size();
+ int availableTaskCount = parallelThread - inProgressContainers.size();
if (availableTaskCount <= 0) {
LOG.info("No available thread for disk balancer service. " +
"Current thread count is {}.", parallelThread);
@@ -394,8 +392,9 @@ public BackgroundTaskQueue getTasks() {
ContainerData toBalanceContainer = containerChoosingPolicy
.chooseContainer(ozoneContainer, sourceVolume, inProgressContainers);
if (toBalanceContainer != null) {
- queue.add(new DiskBalancerTask(toBalanceContainer, sourceVolume,
- destVolume));
+ DiskBalancerTask task = new DiskBalancerTask(toBalanceContainer,
sourceVolume,
+ destVolume);
+ queue.add(task);
inProgressContainers.add(toBalanceContainer.getContainerID());
deltaSizes.put(sourceVolume, deltaSizes.getOrDefault(sourceVolume, 0L)
- toBalanceContainer.getBytesUsed());
@@ -643,6 +642,21 @@ public VolumeChoosingPolicy getVolumeChoosingPolicy() {
return volumeChoosingPolicy;
}
+ @VisibleForTesting
+ public void setVolumeChoosingPolicy(VolumeChoosingPolicy
volumeChoosingPolicy) {
+ this.volumeChoosingPolicy = volumeChoosingPolicy;
+ }
+
+ @VisibleForTesting
+ public void setContainerChoosingPolicy(ContainerChoosingPolicy
containerChoosingPolicy) {
+ this.containerChoosingPolicy = containerChoosingPolicy;
+ }
+
+ @VisibleForTesting
+ public Set<Long> getInProgressContainers() {
+ return inProgressContainers;
+ }
+
/**
* Handle state changes for DiskBalancerService.
*/
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
index 10b6ede721..2795e031b6 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
@@ -22,6 +22,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -30,29 +31,38 @@
import java.nio.file.Path;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import
org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService.DiskBalancerOperationalState;
+import
org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy;
import
org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy;
import
org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeChoosingPolicy;
+import
org.apache.hadoop.ozone.container.diskbalancer.policy.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils.LogCapturer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
@@ -87,6 +97,7 @@ public void init() throws IOException {
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testRoot.getAbsolutePath());
conf.set("hdds.datanode.du.factory.classname",
"org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory$HalfTera");
+ conf.setTimeDuration("hdds.datanode.disk.balancer.service.interval", 2,
TimeUnit.SECONDS);
datanodeUuid = UUID.randomUUID().toString();
volumeSet = new MutableVolumeSet(datanodeUuid, scmId, conf, null,
StorageVolume.VolumeType.DATA_VOLUME, null);
@@ -244,6 +255,55 @@ public void testCalculateBytesToMove(int volumeCount, int
deltaUsagePercent,
assertTrue(Math.abs(expectedBytesToMove -
svc.calculateBytesToMove(volumeSet)) <= 1);
}
+ @Test
+ public void testConcurrentTasksNotExceedThreadLimit() throws Exception {
+ LogCapturer serviceLog =
LogCapturer.captureLogs(DiskBalancerService.class);
+ int parallelThread = 3;
+
+ ContainerSet containerSet = ContainerSet.newReadOnlyContainerSet(1000);
+ ContainerMetrics metrics = ContainerMetrics.create(conf);
+ KeyValueHandler keyValueHandler =
+ new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
+ metrics, c -> {
+ }, new ContainerChecksumTreeManager(conf));
+ DiskBalancerServiceTestImpl svc =
+ getDiskBalancerService(containerSet, conf, keyValueHandler, null,
parallelThread);
+
+ // Set operational state to RUNNING
+ DiskBalancerInfo info = new DiskBalancerInfo(
+ DiskBalancerOperationalState.RUNNING, 10.0d, 100L, parallelThread,
+ false, DiskBalancerVersion.DEFAULT_VERSION);
+ svc.refresh(info);
+
+ VolumeChoosingPolicy volumePolicy = mock(VolumeChoosingPolicy.class);
+ ContainerChoosingPolicy containerPolicy =
mock(ContainerChoosingPolicy.class);
+ svc.setVolumeChoosingPolicy(volumePolicy);
+ svc.setContainerChoosingPolicy(containerPolicy);
+
+ List<StorageVolume> volumes = volumeSet.getVolumesList();
+ HddsVolume source = (HddsVolume) volumes.get(0);
+ HddsVolume dest = (HddsVolume) volumes.get(1);
+ ContainerData containerData = mock(ContainerData.class);
+
+ // Mock unique container IDs to correctly populate the Set
+ when(containerData.getContainerID()).thenAnswer(invocation ->
System.nanoTime());
+ when(containerData.getBytesUsed()).thenReturn(100L);
+
+ when(volumePolicy.chooseVolume(any(), anyDouble(),
any())).thenReturn(Pair.of(source, dest));
+ when(containerPolicy.chooseContainer(any(), any(),
any())).thenReturn(containerData);
+
+ // Test when no tasks are in progress, it should schedule up to the limit
+ BackgroundTaskQueue queue = svc.getTasks();
+ assertEquals(parallelThread, queue.size());
+ assertEquals(parallelThread, svc.getInProgressContainers().size());
+
+ // Test when in-progress tasks are at the limit, no new tasks are scheduled
+ svc.getTasks();
+ GenericTestUtils.waitFor(() -> serviceLog.getOutput().contains("No
available thread " +
+ "for disk balancer service. Current thread count is 3."),
+ 100, 5000);
+ }
+
private OzoneContainer mockDependencies(ContainerSet containerSet,
KeyValueHandler keyValueHandler, ContainerController controller) {
OzoneContainer ozoneContainer = mock(OzoneContainer.class);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
index 6b377da6b7..bd4cb79196 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
@@ -74,6 +74,7 @@ public DiskBalancerManager(OzoneConfiguration conf,
this.nodeManager = nodeManager;
this.useHostnames = conf.getBoolean(HDDS_DATANODE_USE_DN_HOSTNAME,
HDDS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
this.statusMap = new ConcurrentHashMap<>();
+ this.balancedBytesMap = new ConcurrentHashMap<>();
}
public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]