This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-5713 by this push:
     new 21dcb9c140 HDDS-13291. [DiskBalancer] Add Performance Test for 
VolumeChoosingPolicy in DiskBalancer (#8661)
21dcb9c140 is described below

commit 21dcb9c1400d4141e7f57033914d3bd86d3e3bbc
Author: Gargi Jaiswal <[email protected]>
AuthorDate: Wed Jul 9 13:45:51 2025 +0530

    HDDS-13291. [DiskBalancer] Add Performance Test for VolumeChoosingPolicy in 
DiskBalancer (#8661)
---
 .../ozone/scm/node/TestVolumeChoosingPolicy.java   | 254 +++++++++++++++++++++
 1 file changed, 254 insertions(+)

diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPolicy.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPolicy.java
new file mode 100644
index 0000000000..9f07588c20
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPolicy.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.scm.node;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory;
+import org.apache.hadoop.hdds.fs.MockSpaceUsageSource;
+import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
+import org.apache.hadoop.hdds.fs.SpaceUsagePersistence;
+import org.apache.hadoop.hdds.fs.SpaceUsageSource;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import 
org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeChoosingPolicy;
+import 
org.apache.hadoop.ozone.container.diskbalancer.policy.VolumeChoosingPolicy;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * This class tests the VolumeChoosingPolicy.
+ */
+public class TestVolumeChoosingPolicy {
+
+  private static final int NUM_VOLUMES = 20;
+  private static final int NUM_THREADS = 10;
+  private static final int NUM_ITERATIONS = 10000;
+  private static final double THRESHOLD = 0.1; // 10% threshold
+
+  private static final OzoneConfiguration CONF = new OzoneConfiguration();
+
+  @TempDir
+  private Path baseDir;
+
+  private MutableVolumeSet volumeSet;
+  private List<HddsVolume> hddsVolumes;
+  private VolumeChoosingPolicy volumeChoosingPolicy;
+  private ExecutorService executor;
+
+  // delta sizes for source volumes
+  private Map<HddsVolume, Long> deltaSizes = new ConcurrentHashMap<>();
+
+  @BeforeEach
+  public void setup() throws Exception {
+    hddsVolumes = new ArrayList<>();
+    createVolumes();
+    volumeChoosingPolicy = new DefaultVolumeChoosingPolicy();
+    executor = Executors.newFixedThreadPool(NUM_THREADS);
+  }
+
+  @AfterEach
+  public void cleanUp() {
+    hddsVolumes.forEach(HddsVolume::shutdown);
+
+    // Shutdown executor service
+    if (executor != null && !executor.isShutdown()) {
+      executor.shutdownNow();
+    }
+
+    // Clear in-progress delta sizes
+    if (deltaSizes != null) {
+      deltaSizes.clear();
+    }
+
+    // Clear VolumeSet
+    if (volumeSet != null) {
+      volumeSet.shutdown();
+      volumeSet = null;
+    }
+  }
+
+  @Test
+  @Timeout(300)
+  public void testConcurrentVolumeChoosingPerformance() throws Exception {
+    testPolicyPerformance("DefaultVolumeChoosingPolicy", volumeChoosingPolicy);
+  }
+
+  /**
+   * pairChosenCount: Number of successful volume pair choices from the policy.
+   * FailureCount: Failures due to any exceptions thrown during volume choice 
or null return.
+   */
+  private void testPolicyPerformance(String policyName, VolumeChoosingPolicy 
policy) throws Exception {
+    CountDownLatch latch = new CountDownLatch(NUM_THREADS);
+    AtomicInteger pairChosenCount = new AtomicInteger(0);
+    AtomicInteger pairNotChosenCount = new AtomicInteger(0);
+    AtomicInteger failureCount = new AtomicInteger(0);
+    AtomicLong totalTimeNanos = new AtomicLong(0);
+
+    Random rand = new Random();
+
+    for (int i = 0; i < NUM_THREADS; i++) {
+      executor.submit(() -> {
+        try {
+          int volumeChosen = 0;
+          int volumeNotChosen = 0;
+          int failures = 0;
+
+          for (int j = 0; j < NUM_ITERATIONS; j++) {
+            // Simulate background activity with a 5% probability in each
+            // iteration. This mimics other threads changing volume state
+            // (e.g., reserving space on a destination) while the policy is
+            // being evaluated, testing its behavior in a dynamic environment.
+            if (rand.nextDouble() < 0.05 && hddsVolumes.size() >= 2) {
+              HddsVolume sourceVolume = 
hddsVolumes.get(rand.nextInt(hddsVolumes.size()));
+
+              // Randomly pick a destination volume, ensuring it's different 
from the source
+              HddsVolume destVolume;
+              do {
+                destVolume = hddsVolumes.get(rand.nextInt(hddsVolumes.size()));
+              } while (sourceVolume.equals(destVolume));
+
+              long containerSize = rand.nextInt(100 * 1024 * 1024) + 1;
+
+              // Simulate deltaSizes update for the source volume(negative 
value)
+              deltaSizes.compute(sourceVolume, (k, v) -> (v == null ? 0L : v) 
- containerSize);
+
+              // Simulate committedBytes update for the destination volume 
(space reserved)
+              destVolume.incCommittedBytes(containerSize);
+            }
+
+            long threadStart = System.nanoTime();
+            try {
+              Pair<HddsVolume, HddsVolume> pair = 
policy.chooseVolume(volumeSet, THRESHOLD, deltaSizes);
+
+              if (pair == null) {
+                volumeNotChosen++;
+              } else {
+                volumeChosen++;
+                // Note: In a real DiskBalancerService, after a successful 
choice,
+                // the committed bytes on the destination and delta on the 
source
+                // would be reverted/adjusted once the move completes or fails.
+                // This simulation focuses on the state before the policy 
makes a choice,
+                // assuming some background activity. For simplicity, we are 
not
+                // reverting these simulated incCommittedBytes or deltaSizes 
updates
+                // within this loop for each "chosen" pair. The random updates
+                // provide the dynamic background.
+              }
+            } catch (Exception e) {
+              failures++;
+            } finally {
+              totalTimeNanos.addAndGet(System.nanoTime() - threadStart);
+            }
+          }
+
+          pairChosenCount.addAndGet(volumeChosen);
+          pairNotChosenCount.addAndGet(volumeNotChosen);
+          failureCount.addAndGet(failures);
+        } finally {
+          latch.countDown();
+        }
+      });
+    }
+
+    // Wait max 5 minutes for test completion
+    assertTrue(latch.await(5, TimeUnit.MINUTES), "Test timed out");
+
+    long totalOperations = (long) NUM_THREADS * NUM_ITERATIONS;
+    double avgTimePerOp = (double) totalTimeNanos.get() / totalOperations;
+    double opsPerSec = totalOperations / (totalTimeNanos.get() / 
1_000_000_000.0);
+
+    System.out.println("Performance results for " + policyName);
+    System.out.println("Total volumes: " + NUM_VOLUMES);
+    System.out.println("Total threads: " + NUM_THREADS);
+    System.out.println("Threshold(%): " + THRESHOLD * 100.0);
+    System.out.println("Total operations: " + totalOperations);
+    System.out.println("Volume Pair Chosen operations: " + 
pairChosenCount.get());
+    System.out.println("Volume Pair Not Chosen operations: " + 
pairNotChosenCount.get());
+    System.out.println("Failed operations: " + failureCount.get());
+    System.out.println("Total time (ms): " + totalTimeNanos.get() / 1_000_000);
+    System.out.println("Average time per operation (ns): " + avgTimePerOp);
+    System.out.println("Operations per second: " + opsPerSec);
+  }
+
+  private void createVolumes() throws IOException {
+    // set a dummy path for initialisation, as we will override its internal 
volumeMap.
+    String initialDnDir = 
baseDir.resolve("initialDnDir").toFile().getAbsolutePath();
+    Files.createDirectories(baseDir.resolve("initialDnDir"));
+    CONF.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, initialDnDir);
+
+    StateContext mockContext = mock(StateContext.class);
+    volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), CONF,
+        mockContext, StorageVolume.VolumeType.DATA_VOLUME, null);
+
+    // This map will replace the one inside 'volumeSet'
+    Map<String, StorageVolume> newVolumeMap = new ConcurrentHashMap<>();
+    Random random = new Random();
+    long startTime = System.currentTimeMillis();
+
+    for (int i = 0; i < NUM_VOLUMES; i++) {
+      String volumePath = baseDir.resolve("disk" + i).toString();
+      //capacity of each volume is 1TB
+      long capacity = 1L * 1024 * 1024 * 1024 * 1024;
+
+      // Distribute available space to create some variation in usage
+      // Between 5% and 55%
+      long usedBytes = (long) (capacity * (0.05 + random.nextDouble() * 0.5));
+      long available = capacity - usedBytes;
+
+      SpaceUsageSource source = MockSpaceUsageSource.fixed(capacity, 
available);
+      SpaceUsageCheckFactory factory = MockSpaceUsageCheckFactory.of(
+          source, Duration.ZERO, SpaceUsagePersistence.None.INSTANCE);
+      HddsVolume volume = new HddsVolume.Builder(volumePath)
+          .conf(CONF)
+          .usageCheckFactory(factory)
+          .build();
+      hddsVolumes.add(volume);
+      newVolumeMap.put(volume.getStorageDir().getPath(), volume);
+    }
+
+    // Initialize the volumeSet with the new volume map
+    volumeSet.setVolumeMap(newVolumeMap);
+    System.out.println("Created " +  NUM_VOLUMES + " volumes in " +
+        (System.currentTimeMillis() - startTime) + " ms");
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to