This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-5713 by this push:
new fd3823a931e HDDS-13586. [DiskBalancer] Refactor DiskBalancer to remove unused methods and improve naming (#8956)
fd3823a931e is described below
commit fd3823a931e129b612c37e36c16f42564af7dae4
Author: Gargi Jaiswal <[email protected]>
AuthorDate: Wed Aug 20 15:54:42 2025 +0530
HDDS-13586. [DiskBalancer] Refactor DiskBalancer to remove unused methods and improve naming (#8956)
---
.../apache/hadoop/hdds/scm/client/ScmClient.java | 27 ++--
.../protocol/StorageContainerLocationProtocol.java | 27 ++--
.../scm/storage/DiskBalancerConfiguration.java | 24 ++-
.../apache/hadoop/hdds/utils/SlidingWindow.java | 179 ---------------------
.../hadoop/hdds/utils/TestSlidingWindow.java | 141 ----------------
.../container/common/interfaces/Container.java | 7 +-
.../container/diskbalancer/DiskBalancerInfo.java | 5 +-
.../diskbalancer/DiskBalancerService.java | 20 +--
.../policy/ContainerChoosingPolicy.java | 3 +-
.../policy/DefaultContainerChoosingPolicy.java | 6 +-
.../policy/DefaultVolumeChoosingPolicy.java | 29 ++--
.../container/keyvalue/KeyValueContainer.java | 35 +---
.../ozone/container/keyvalue/KeyValueHandler.java | 2 +-
.../diskbalancer/TestDiskBalancerTask.java | 57 ++++---
...inerLocationProtocolClientSideTranslatorPB.java | 69 +++++---
.../hadoop/hdds/scm/node/DiskBalancerManager.java | 55 ++++---
...inerLocationProtocolServerSideTranslatorPB.java | 26 +--
.../hdds/scm/server/SCMClientProtocolServer.java | 17 +-
.../hdds/scm/node/TestDiskBalancerManager.java | 7 +-
hadoop-ozone/cli-admin/pom.xml | 4 +
.../hdds/scm/cli/ContainerOperationClient.java | 17 +-
.../cli/datanode/DiskBalancerCommonOptions.java | 6 +-
.../cli/datanode/DiskBalancerStartSubcommand.java | 9 +-
.../cli/datanode/DiskBalancerStatusSubcommand.java | 5 +-
.../cli/datanode/DiskBalancerUpdateSubcommand.java | 9 +-
.../scm/node/TestContainerChoosingPolicy.java | 11 +-
.../hadoop/ozone/scm/node/TestDiskBalancer.java | 37 +++--
...skBalancerDuringDecommissionAndMaintenance.java | 55 +++----
28 files changed, 298 insertions(+), 591 deletions(-)
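
The bulk of this refactor replaces java.util.Optional method parameters with plain
@Nullable parameters across the DiskBalancer client and SCM interfaces. A minimal
sketch of the resulting calling convention (the argument values are illustrative only):

    // Before: every optional argument was wrapped, even when absent.
    scmClient.startDiskBalancer(Optional.of(10.0), Optional.empty(),
        Optional.empty(), Optional.empty(), Optional.empty());

    // After: absent arguments are passed as null and null-checked by the callee.
    scmClient.startDiskBalancer(10.0, null, null, null, null);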
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index 29aef020522..dec42494a33 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm.client;
+import jakarta.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
@@ -496,24 +497,24 @@ List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
* @throws IOException
*/
List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
- Optional<List<String>> hosts,
- Optional<HddsProtos.DiskBalancerRunningStatus> runningStatus)
+ @Nullable List<String> hosts,
+ @Nullable HddsProtos.DiskBalancerRunningStatus runningStatus)
throws IOException;
/**
* Start DiskBalancer.
*/
List<DatanodeAdminError> startDiskBalancer(
- Optional<Double> threshold,
- Optional<Long> bandwidthInMB,
- Optional<Integer> parallelThread,
- Optional<Boolean> stopAfterDiskEven,
- Optional<List<String>> hosts) throws IOException;
+ @Nullable Double threshold,
+ @Nullable Long bandwidthInMB,
+ @Nullable Integer parallelThread,
+ @Nullable Boolean stopAfterDiskEven,
+ @Nullable List<String> hosts) throws IOException;
/**
* Stop DiskBalancer.
*/
- List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
+ List<DatanodeAdminError> stopDiskBalancer(@Nullable List<String> hosts)
throws IOException;
@@ -521,9 +522,9 @@ List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
* Update DiskBalancer Configuration.
*/
List<DatanodeAdminError> updateDiskBalancerConfiguration(
- Optional<Double> threshold,
- Optional<Long> bandwidth,
- Optional<Integer> parallelThread,
- Optional<Boolean> stopAfterDiskEven,
- Optional<List<String>> hosts) throws IOException;
+ @Nullable Double threshold,
+ @Nullable Long bandwidth,
+ @Nullable Integer parallelThread,
+ @Nullable Boolean stopAfterDiskEven,
+ @Nullable List<String> hosts) throws IOException;
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index 9996d90cb87..84eb6283340 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm.protocol;
+import jakarta.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
@@ -499,35 +500,35 @@ List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
* Get DiskBalancer status.
*/
List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
- Optional<List<String>> hosts,
- Optional<HddsProtos.DiskBalancerRunningStatus> runningStatus,
+ @Nullable List<String> hosts,
+ @Nullable HddsProtos.DiskBalancerRunningStatus runningStatus,
int clientVersion) throws IOException;
/**
* Start DiskBalancer.
*/
List<DatanodeAdminError> startDiskBalancer(
- Optional<Double> threshold,
- Optional<Long> bandwidthInMB,
- Optional<Integer> parallelThread,
- Optional<Boolean> stopAfterDiskEven,
- Optional<List<String>> hosts) throws IOException;
+ @Nullable Double threshold,
+ @Nullable Long bandwidthInMB,
+ @Nullable Integer parallelThread,
+ @Nullable Boolean stopAfterDiskEven,
+ @Nullable List<String> hosts) throws IOException;
/**
* Stop DiskBalancer.
*/
- List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
+ List<DatanodeAdminError> stopDiskBalancer(@Nullable List<String> hosts)
throws IOException;
/**
* Update DiskBalancer Configuration.
*/
List<DatanodeAdminError> updateDiskBalancerConfiguration(
- Optional<Double> threshold,
- Optional<Long> bandwidthInMB,
- Optional<Integer> parallelThread,
- Optional<Boolean> stopAfterDiskEven,
- Optional<List<String>> hosts) throws IOException;
+ @Nullable Double threshold,
+ @Nullable Long bandwidthInMB,
+ @Nullable Integer parallelThread,
+ @Nullable Boolean stopAfterDiskEven,
+ @Nullable List<String> hosts) throws IOException;
/**
* Trigger a reconcile command to datanodes for the current container ID.
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
index 6984bb94a95..3580b54c8ef 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
@@ -21,7 +21,6 @@
import jakarta.annotation.Nonnull;
import java.time.Duration;
-import java.util.Optional;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
@@ -117,13 +116,22 @@ public final class DiskBalancerConfiguration {
description = "If true, the DiskBalancer will automatically stop once disks are balanced.")
private boolean stopAfterDiskEven = true;
- public DiskBalancerConfiguration(Optional<Double> threshold,
- Optional<Long> bandwidthInMB,
- Optional<Integer> parallelThread, Optional<Boolean> stopAfterDiskEven) {
- threshold.ifPresent(aDouble -> this.threshold = aDouble);
- bandwidthInMB.ifPresent(aLong -> this.diskBandwidthInMB = aLong);
- parallelThread.ifPresent(integer -> this.parallelThread = integer);
- stopAfterDiskEven.ifPresent(bool -> this.stopAfterDiskEven = bool);
+ public DiskBalancerConfiguration(Double threshold,
+ Long bandwidthInMB,
+ Integer parallelThread,
+ Boolean stopAfterDiskEven) {
+ if (threshold != null) {
+ this.threshold = threshold;
+ }
+ if (bandwidthInMB != null) {
+ this.diskBandwidthInMB = bandwidthInMB;
+ }
+ if (parallelThread != null) {
+ this.parallelThread = parallelThread;
+ }
+ if (stopAfterDiskEven != null) {
+ this.stopAfterDiskEven = stopAfterDiskEven;
+ }
}
public DiskBalancerConfiguration() {
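
The DiskBalancerConfiguration constructor now takes nullable boxed values and keeps
its configured default for every null argument. A usage sketch (the 5.0 threshold is
illustrative):

    // Override only the threshold; nulls keep the defaults for bandwidth,
    // parallel thread count and stop-after-disk-even.
    DiskBalancerConfiguration conf =
        new DiskBalancerConfiguration(5.0, null, null, null);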
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/SlidingWindow.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/SlidingWindow.java
deleted file mode 100644
index 744e842a398..00000000000
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/SlidingWindow.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.utils;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.time.Clock;
-import java.time.Duration;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.concurrent.TimeUnit;
-
-/**
- *
- * A sliding window implementation that combines time-based expiry with a
- * maximum size constraint. The window tracks event timestamps and maintains two
- * limits:
- * <ul>
- * <li>Time-based: Events older than the specified expiry duration are
- * automatically removed
- * <li>Size-based: The window maintains at most windowSize latest events,
removing
- * older events when this limit is exceeded
- * </ul>
- *
- * The window is considered full when the number of non-expired events exceeds
- * the specified window size. Events are automatically pruned based on both
- * their age and the maximum size constraint.
- */
-public class SlidingWindow {
- private final Object lock = new Object();
- private final int windowSize;
- private final Deque<Long> timestamps;
- private final long expiryDurationMillis;
- private final Clock clock;
-
- /**
- * Default constructor that uses a monotonic clock.
- *
- * @param windowSize the maximum number of events that are tracked
- * @param expiryDuration the duration after which an entry in the window expires
- */
- public SlidingWindow(int windowSize, Duration expiryDuration) {
- this(windowSize, expiryDuration, new MonotonicClock());
- }
-
- /**
- * Constructor with a custom clock for testing.
- *
- * @param windowSize the maximum number of events that are tracked
- * @param expiryDuration the duration after which an entry in the window expires
- * @param clock the clock to use for time measurements
- */
- public SlidingWindow(int windowSize, Duration expiryDuration, Clock clock) {
- if (windowSize < 0) {
- throw new IllegalArgumentException("Window size must be greater than 0");
- }
- if (expiryDuration.isNegative() || expiryDuration.isZero()) {
- throw new IllegalArgumentException("Expiry duration must be greater than
0");
- }
- this.windowSize = windowSize;
- this.expiryDurationMillis = expiryDuration.toMillis();
- this.clock = clock;
- // We limit the initial queue size to 100 to control the memory usage
- this.timestamps = new ArrayDeque<>(Math.min(windowSize + 1, 100));
- }
-
- public void add() {
- synchronized (lock) {
- if (isExceeded()) {
- timestamps.remove();
- }
-
- timestamps.add(getCurrentTime());
- }
- }
-
- /**
- * Checks if the sliding window has exceeded its maximum size.
- * This is useful to track if we have encountered more events than the window's defined limit.
- * @return true if the number of tracked timestamps in the sliding window
- * exceeds the specified window size, false otherwise.
- */
- public boolean isExceeded() {
- synchronized (lock) {
- removeExpired();
- return timestamps.size() > windowSize;
- }
- }
-
- /**
- * Returns the current number of events that are tracked within the sliding window queue.
- * The number of events can exceed the window size.
- * This method ensures that expired events are removed before computing the count.
- *
- * @return the number of valid timestamps currently in the sliding window
- */
- @VisibleForTesting
- public int getNumEvents() {
- synchronized (lock) {
- removeExpired();
- return timestamps.size();
- }
- }
-
- /**
- * Returns the current number of events that are tracked within the sliding window queue.
- * The number of events cannot exceed the window size.
- * This method ensures that expired events are removed before computing the count.
- *
- * @return the number of valid timestamps currently in the sliding window
- */
- public int getNumEventsInWindow() {
- synchronized (lock) {
- removeExpired();
- return Math.min(timestamps.size(), windowSize);
- }
- }
-
- private void removeExpired() {
- synchronized (lock) {
- long currentTime = getCurrentTime();
- long expirationThreshold = currentTime - expiryDurationMillis;
-
- while (!timestamps.isEmpty() && timestamps.peek() < expirationThreshold) {
- timestamps.remove();
- }
- }
- }
-
- public int getWindowSize() {
- return windowSize;
- }
-
- private long getCurrentTime() {
- return clock.millis();
- }
-
- /**
- * A custom monotonic clock implementation.
- * Implementation of Clock that uses System.nanoTime() for real usage.
- * See {@see org.apache.ozone.test.TestClock}
- */
- private static final class MonotonicClock extends Clock {
- @Override
- public long millis() {
- return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
- }
-
- @Override
- public java.time.Instant instant() {
- return java.time.Instant.ofEpochMilli(millis());
- }
-
- @Override
- public java.time.ZoneId getZone() {
- return java.time.ZoneOffset.UTC;
- }
-
- @Override
- public Clock withZone(java.time.ZoneId zone) {
- // Ignore zone for monotonic clock
- throw new UnsupportedOperationException("Sliding Window class does not allow changing the timezone");
- }
- }
-}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestSlidingWindow.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestSlidingWindow.java
deleted file mode 100644
index 369426bcfd0..00000000000
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestSlidingWindow.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.utils;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.time.Duration;
-import org.apache.ozone.test.TestClock;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/**
- * Tests for {@link SlidingWindow} class.
- */
-class TestSlidingWindow {
-
- private TestClock testClock;
-
- @BeforeEach
- void setup() {
- testClock = TestClock.newInstance();
- }
-
- @Test
- void testConstructorValidation() {
- // Test invalid window size
- assertThrows(IllegalArgumentException.class, () -> new SlidingWindow(-1, Duration.ofMillis(100)));
-
- // Test invalid expiry duration
- assertThrows(IllegalArgumentException.class, () -> new SlidingWindow(1, Duration.ofMillis(0)));
- assertThrows(IllegalArgumentException.class, () -> new SlidingWindow(1, Duration.ofMillis(-1)));
- }
-
- @Test
- void testAdd() {
- SlidingWindow slidingWindow = new SlidingWindow(3, Duration.ofSeconds(5), testClock);
- for (int i = 0; i < slidingWindow.getWindowSize(); i++) {
- slidingWindow.add();
- assertEquals(i + 1, slidingWindow.getNumEvents());
- assertFalse(slidingWindow.isExceeded());
- }
-
- slidingWindow.add();
- assertEquals(slidingWindow.getWindowSize() + 1, slidingWindow.getNumEvents());
- assertTrue(slidingWindow.isExceeded());
- }
-
- @Test
- void testEventExpiration() {
- SlidingWindow slidingWindow = new SlidingWindow(2, Duration.ofMillis(500), testClock);
-
- // Add events to reach threshold
- slidingWindow.add();
- slidingWindow.add();
- slidingWindow.add();
- assertEquals(3, slidingWindow.getNumEvents());
- assertTrue(slidingWindow.isExceeded());
-
- // Fast forward time to expire events
- testClock.fastForward(600);
-
- assertEquals(0, slidingWindow.getNumEvents());
- assertFalse(slidingWindow.isExceeded());
-
- // Add one more event - should not be enough to mark as full
- slidingWindow.add();
- assertEquals(1, slidingWindow.getNumEvents());
- assertFalse(slidingWindow.isExceeded());
- }
-
- @Test
- void testPartialExpiration() {
- SlidingWindow slidingWindow = new SlidingWindow(3, Duration.ofSeconds(1), testClock);
-
- slidingWindow.add();
- slidingWindow.add();
- slidingWindow.add();
- slidingWindow.add();
- assertEquals(4, slidingWindow.getNumEvents());
- assertTrue(slidingWindow.isExceeded());
-
- testClock.fastForward(600);
- slidingWindow.add(); // this will remove the oldest event as the window is full
- assertEquals(4, slidingWindow.getNumEvents());
-
- // Fast forward time to expire the oldest events
- testClock.fastForward(500);
- assertEquals(1, slidingWindow.getNumEvents());
- assertFalse(slidingWindow.isExceeded());
- }
-
- @Test
- void testZeroWindowSize() {
- SlidingWindow slidingWindow = new SlidingWindow(0, Duration.ofSeconds(5), testClock);
-
- // Verify initial state
- assertEquals(0, slidingWindow.getWindowSize());
- assertEquals(0, slidingWindow.getNumEvents());
- assertFalse(slidingWindow.isExceeded());
-
- // Add an event - with window size 0, any event should cause isExceeded to return true
- slidingWindow.add();
- assertEquals(1, slidingWindow.getNumEvents());
- assertTrue(slidingWindow.isExceeded());
-
- // Add another event - should replace the previous one as window is exceeded
- slidingWindow.add();
- assertEquals(1, slidingWindow.getNumEvents());
- assertTrue(slidingWindow.isExceeded());
-
- // Test expiration
- testClock.fastForward(6000); // Move past expiry time
- assertEquals(0, slidingWindow.getNumEvents());
- assertFalse(slidingWindow.isExceeded());
-
- // Add multiple events in sequence - should always keep only the latest one
- for (int i = 0; i < 5; i++) {
- slidingWindow.add();
- assertEquals(1, slidingWindow.getNumEvents());
- assertTrue(slidingWindow.isExceeded());
- }
- }
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
index 9c110bded02..d7f71e6e3b7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
@@ -144,11 +144,6 @@ void updateDataScanTimestamp(Instant timestamp)
void importContainerData(InputStream stream,
ContainerPacker<CONTAINERDATA> packer) throws IOException;
- /**
- * Import the container from a container path.
- */
- void importContainerData(Path containerPath) throws IOException;
-
/**
* Export all the data of the container to one output archive with the help
* of the packer.
@@ -204,7 +199,7 @@ DataScanResult scanData(DataTransferThrottler throttler, Canceler canceler)
/**
* Copy all the data of the container to the destination path.
*/
- void copyContainerData(Path destPath) throws IOException;
+ void copyContainerDirectory(Path destPath) throws IOException;
/** Acquire read lock. */
void readLock();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
index eaad116dcf3..579d3db23f2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.container.diskbalancer;
import java.util.Objects;
-import java.util.Optional;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
@@ -101,8 +100,8 @@ public void updateFromConf(DiskBalancerConfiguration diskBalancerConf) {
}
public StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto toDiskBalancerReportProto() {
- DiskBalancerConfiguration conf = new DiskBalancerConfiguration(Optional.of(threshold),
- Optional.of(bandwidthInMB), Optional.of(parallelThread), Optional.of(stopAfterDiskEven));
+ DiskBalancerConfiguration conf = new DiskBalancerConfiguration(threshold,
+ bandwidthInMB, parallelThread, stopAfterDiskEven);
HddsProtos.DiskBalancerConfigurationProto confProto =
conf.toProtobufBuilder().build();
StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto.Builder builder =
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
index b01217b74bc..b827d2251f6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
@@ -20,7 +20,6 @@
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.io.File;
import java.io.IOException;
@@ -29,6 +28,7 @@
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -43,6 +43,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
import org.apache.hadoop.hdds.server.ServerUtils;
@@ -60,6 +61,7 @@
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy;
import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeChoosingPolicy;
@@ -83,8 +85,6 @@ public class DiskBalancerService extends BackgroundService {
public static final String DISK_BALANCER_DIR = "diskBalancer";
- private static final String DISK_BALANCER_TMP_DIR = "tmp";
-
private OzoneContainer ozoneContainer;
private final ConfigurationSource conf;
@@ -102,7 +102,7 @@ public class DiskBalancerService extends BackgroundService {
private AtomicLong balancedBytesInLastWindow = new AtomicLong(0L);
private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow());
- private Set<Long> inProgressContainers;
+ private Set<ContainerID> inProgressContainers;
private static FaultInjector injector;
/**
@@ -159,7 +159,7 @@ public DiskBalancerService(OzoneContainer ozoneContainer,
this.conf = conf;
String diskBalancerInfoPath = getDiskBalancerInfoPath();
- Preconditions.checkNotNull(diskBalancerInfoPath);
+ Objects.requireNonNull(diskBalancerInfoPath);
diskBalancerInfoFile = new File(diskBalancerInfoPath);
inProgressContainers = ConcurrentHashMap.newKeySet();
@@ -176,7 +176,7 @@ public DiskBalancerService(OzoneContainer ozoneContainer,
.getContainerChoosingPolicyClass().newInstance();
} catch (Exception e) {
LOG.error("Got exception when initializing DiskBalancerService", e);
- throw new RuntimeException(e);
+ throw new IOException(e);
}
metrics = DiskBalancerServiceMetrics.create();
@@ -408,7 +408,7 @@ public BackgroundTaskQueue getTasks() {
DiskBalancerTask task = new DiskBalancerTask(toBalanceContainer,
sourceVolume,
destVolume);
queue.add(task);
- inProgressContainers.add(toBalanceContainer.getContainerID());
+ inProgressContainers.add(ContainerID.valueOf(toBalanceContainer.getContainerID()));
deltaSizes.put(sourceVolume, deltaSizes.getOrDefault(sourceVolume, 0L)
- toBalanceContainer.getBytesUsed());
} else {
@@ -620,7 +620,7 @@ public int getPriority() {
}
private void postCall(boolean success, long startTime) {
- inProgressContainers.remove(containerData.getContainerID());
+ inProgressContainers.remove(ContainerID.valueOf(containerData.getContainerID()));
deltaSizes.put(sourceVolume, deltaSizes.get(sourceVolume) +
containerData.getBytesUsed());
destVolume.incCommittedBytes(0 - containerDefaultSize);
@@ -677,7 +677,7 @@ public long calculateBytesToMove(MutableVolumeSet inputVolumeSet) {
private Path getDiskBalancerTmpDir(HddsVolume hddsVolume) {
return Paths.get(hddsVolume.getVolumeRootDir())
- .resolve(DISK_BALANCER_TMP_DIR).resolve(DISK_BALANCER_DIR);
+ .resolve(StorageVolume.TMP_DIR_NAME).resolve(DISK_BALANCER_DIR);
}
public DiskBalancerServiceMetrics getMetrics() {
@@ -708,7 +708,7 @@ public void setContainerChoosingPolicy(ContainerChoosingPolicy containerChoosing
}
@VisibleForTesting
- public Set<Long> getInProgressContainers() {
+ public Set<ContainerID> getInProgressContainers() {
return inProgressContainers;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java
index c3e89ab93d1..aa80ccdf867 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.diskbalancer.policy;
import java.util.Set;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -35,5 +36,5 @@ public interface ContainerChoosingPolicy {
* @return a Container
*/
ContainerData chooseContainer(OzoneContainer ozoneContainer,
- HddsVolume volume, Set<Long> inProgressContainerIDs);
+ HddsVolume volume, Set<ContainerID> inProgressContainerIDs);
}
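
The in-progress tracking set is now keyed by the typed ContainerID instead of a raw
Long. A minimal sketch of the lookup pattern this patch applies wherever the set is
consulted (containerData stands in for any ContainerData instance):

    // Raw protobuf long IDs are wrapped into the typed key before set operations.
    Set<ContainerID> inProgress = ConcurrentHashMap.newKeySet();
    boolean busy = inProgress.contains(
        ContainerID.valueOf(containerData.getContainerID()));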
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
index 76a0f30c149..bfc9d15a1da 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
@@ -25,6 +25,7 @@
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -48,7 +49,7 @@ public class DefaultContainerChoosingPolicy implements ContainerChoosingPolicy {
@Override
public ContainerData chooseContainer(OzoneContainer ozoneContainer,
- HddsVolume hddsVolume, Set<Long> inProgressContainerIDs) {
+ HddsVolume hddsVolume, Set<ContainerID> inProgressContainerIDs) {
Iterator<Container<?>> itr;
try {
itr = CACHE.get().get(hddsVolume,
@@ -61,7 +62,8 @@ public ContainerData chooseContainer(OzoneContainer ozoneContainer,
while (itr.hasNext()) {
ContainerData containerData = itr.next().getContainerData();
if (!inProgressContainerIDs.contains(
- containerData.getContainerID()) && (containerData.isClosed() || (test && containerData.isQuasiClosed()))) {
+ ContainerID.valueOf(containerData.getContainerID())) &&
+ (containerData.isClosed() || (test && containerData.isQuasiClosed()))) {
return containerData;
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java
index cba87740f71..20cd2aef0c4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java
@@ -22,6 +22,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.fs.SpaceUsageSource;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.AvailableSpaceFilter;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -58,20 +59,26 @@ public Pair<HddsVolume, HddsVolume> chooseVolume(MutableVolumeSet volumeSet,
List<HddsVolume> volumes = StorageVolumeUtil
.getHddsVolumesList(volumeSet.getVolumesList())
.stream()
- .filter(volume ->
- Math.abs(
- ((double)((volume.getCurrentUsage().getCapacity() - volume.getCurrentUsage().getAvailable())
+ .filter(volume -> {
+ SpaceUsageSource usage = volume.getCurrentUsage();
+
+ return Math.abs(
+ ((double)((usage.getCapacity() - usage.getAvailable())
+ deltaMap.getOrDefault(volume, 0L) + volume.getCommittedBytes()))
- / volume.getCurrentUsage().getCapacity() - idealUsage) >= normalizedThreshold)
- .sorted((v1, v2) ->
- Double.compare(
- (double) ((v2.getCurrentUsage().getCapacity() - v2.getCurrentUsage().getAvailable())
+ / usage.getCapacity() - idealUsage) >= normalizedThreshold;
+ }).sorted((v1, v2) -> {
+ SpaceUsageSource usage1 = v1.getCurrentUsage();
+ SpaceUsageSource usage2 = v2.getCurrentUsage();
+
+ return Double.compare(
+ (double) ((usage2.getCapacity() - usage2.getAvailable())
+ deltaMap.getOrDefault(v2, 0L) + v2.getCommittedBytes()) /
- v2.getCurrentUsage().getCapacity(),
- (double) ((v1.getCurrentUsage().getCapacity() - v1.getCurrentUsage().getAvailable())
+ usage2.getCapacity(),
+ (double) ((usage1.getCapacity() - usage1.getAvailable())
+ deltaMap.getOrDefault(v1, 0L) + v1.getCommittedBytes()) /
- v1.getCurrentUsage().getCapacity()))
- .collect(Collectors.toList());
+ usage1.getCapacity());
+ }).collect(Collectors.toList());
// Can not generate DiskBalancerTask if we have less than 2 results
if (volumes.size() <= 1) {
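
The rewritten filter and comparator above read getCurrentUsage() once per volume
into a local SpaceUsageSource instead of calling it repeatedly. Pulled out of the
lambda, the qualification check is roughly the following sketch (names taken from
the hunk above):

    // A volume qualifies for balancing when its projected usage fraction
    // deviates from the ideal usage by at least the normalized threshold.
    double projectedUsed = usage.getCapacity() - usage.getAvailable()
        + deltaMap.getOrDefault(volume, 0L) + volume.getCommittedBytes();
    boolean qualifies = Math.abs(
        projectedUsed / usage.getCapacity() - idealUsage) >= normalizedThreshold;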
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index b3ba31da8e8..b8214882f12 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -23,7 +23,6 @@
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_DESCRIPTOR_MISSING;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_FILES_CREATE_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_OPEN;
@@ -660,38 +659,6 @@ public void importContainerData(KeyValueContainerData originalContainerData)
update(originalContainerData.getMetadata(), true);
}
- @Override
- public void importContainerData(Path containerPath) throws IOException {
- writeLock();
- try {
- if (!getContainerFile().exists()) {
- String errorMessage = String.format(
- "Can't load container (cid=%d) data from a specific location"
- + " as the container descriptor (%s) is missing",
- getContainerData().getContainerID(),
- getContainerFile().getAbsolutePath());
- throw new StorageContainerException(errorMessage,
- CONTAINER_DESCRIPTOR_MISSING);
- }
- KeyValueContainerData originalContainerData =
- (KeyValueContainerData) ContainerDataYaml
- .readContainerFile(getContainerFile());
-
- importContainerData(originalContainerData);
- } catch (Exception ex) {
- if (ex instanceof StorageContainerException &&
- ((StorageContainerException) ex).getResult() ==
- CONTAINER_DESCRIPTOR_MISSING) {
- throw ex;
- }
- //delete all the temporary data in case of any exception.
- cleanupFailedImport();
- throw ex;
- } finally {
- writeUnlock();
- }
- }
-
private void cleanupFailedImport() {
try {
if (containerData.hasSchema(OzoneConsts.SCHEMA_V3)) {
@@ -935,7 +902,7 @@ public DataScanResult scanData(DataTransferThrottler throttler, Canceler cancele
}
@Override
- public void copyContainerData(Path destination) throws IOException {
+ public void copyContainerDirectory(Path destination) throws IOException {
readLock();
try {
// Closed/ Quasi closed containers are considered for replication by
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 9a2f5e48503..775c4b426f6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -1590,7 +1590,7 @@ public void closeContainer(Container container)
public void copyContainer(final Container container, Path destinationPath)
throws IOException {
final KeyValueContainer kvc = (KeyValueContainer) container;
- kvc.copyContainerData(destinationPath);
+ kvc.copyContainerDirectory(destinationPath);
}
private KeyValueContainer createNewContainer(
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
index ee72c347a60..f914d5f1c78 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
@@ -55,6 +55,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
import org.apache.hadoop.hdds.utils.FaultInjector;
@@ -71,6 +72,7 @@
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy;
+import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
@@ -84,7 +86,6 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
@@ -117,6 +118,7 @@ public class TestDiskBalancerTask {
private static final long CONTAINER_SIZE = 1024L * 1024L; // 1 MB
private final TestFaultInjector kvFaultInjector = new TestFaultInjector();
+ private String schemaVersion;
/**
* A FaultInjector that can be configured to throw an exception on a
@@ -306,8 +308,11 @@ public void moveSuccess(State containerState) throws IOException {
assertEquals(initialSourceDelta,
diskBalancerService.getDeltaSizes().get(sourceVolume));
}
- @Test
- public void moveFailsAfterCopy() throws IOException, InterruptedException, TimeoutException, ExecutionException {
+ @ContainerTestVersionInfo.ContainerTest
+ public void moveFailsAfterCopy(ContainerTestVersionInfo versionInfo)
+ throws IOException, InterruptedException, TimeoutException, ExecutionException {
+ setLayoutAndSchemaForTest(versionInfo);
+
Container container = createContainer(CONTAINER_ID, sourceVolume,
State.CLOSED);
long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
@@ -335,7 +340,7 @@ public void moveFailsAfterCopy() throws IOException, InterruptedException, Timeo
}
return false;
}, 100, 30000);
- assertTrue(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+ assertTrue(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID)));
serviceFaultInjector.resume();
// wait for task to be completed
@@ -351,12 +356,15 @@ public void moveFailsAfterCopy() throws IOException, InterruptedException, Timeo
assertFalse(Files.exists(tempContainerDir), "Temp container directory should be cleaned up");
assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
- assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+ assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID)));
assertEquals(initialSourceDelta,
diskBalancerService.getDeltaSizes().get(sourceVolume));
}
- @Test
- public void moveFailsOnAtomicMove() throws IOException, InterruptedException, TimeoutException, ExecutionException {
+ @ContainerTestVersionInfo.ContainerTest
+ public void moveFailsOnAtomicMove(ContainerTestVersionInfo versionInfo)
+ throws IOException, InterruptedException, TimeoutException, ExecutionException {
+ setLayoutAndSchemaForTest(versionInfo);
+
Container container = createContainer(CONTAINER_ID, sourceVolume,
State.CLOSED);
long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
@@ -394,7 +402,7 @@ public void moveFailsOnAtomicMove() throws IOException, InterruptedException, Ti
}
return false;
}, 100, 30000);
- assertTrue(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+ assertTrue(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID)));
serviceFaultInjector.resume();
completableFuture.get();
@@ -412,13 +420,15 @@ public void moveFailsOnAtomicMove() throws IOException, InterruptedException, Ti
assertTrue(testfile.toFile().exists(), "testfile should not be cleaned up");
assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
- assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+ assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID)));
assertEquals(initialSourceDelta,
diskBalancerService.getDeltaSizes().get(sourceVolume));
}
- @Test
- public void moveFailsDuringInMemoryUpdate()
+ @ContainerTestVersionInfo.ContainerTest
+ public void moveFailsDuringInMemoryUpdate(ContainerTestVersionInfo versionInfo)
throws IOException, InterruptedException, TimeoutException, ExecutionException {
+ setLayoutAndSchemaForTest(versionInfo);
+
Container container = createContainer(CONTAINER_ID, sourceVolume,
State.QUASI_CLOSED);
long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
@@ -453,7 +463,7 @@ public void moveFailsDuringInMemoryUpdate()
}
return false;
}, 100, 30000);
- assertTrue(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+ assertTrue(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID)));
serviceFaultInjector.resume();
// wait for task to be completed
completableFuture.get();
@@ -476,12 +486,14 @@ public void moveFailsDuringInMemoryUpdate()
"Moved container at destination should be cleaned up on failure");
assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
- assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+ assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID)));
assertEquals(initialSourceDelta,
diskBalancerService.getDeltaSizes().get(sourceVolume));
}
- @Test
- public void moveFailsDuringOldContainerRemove() throws IOException {
+ @ContainerTestVersionInfo.ContainerTest
+ public void moveFailsDuringOldContainerRemove(ContainerTestVersionInfo versionInfo) throws IOException {
+ setLayoutAndSchemaForTest(versionInfo);
+
Container container = createContainer(CONTAINER_ID, sourceVolume,
State.CLOSED);
long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
@@ -523,13 +535,15 @@ public void moveFailsDuringOldContainerRemove() throws IOException {
assertEquals(initialDestUsed + CONTAINER_SIZE,
destVolume.getCurrentUsage().getUsedSpace());
assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
- assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+ assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID)));
assertEquals(initialSourceDelta,
diskBalancerService.getDeltaSizes().get(sourceVolume));
}
}
- @Test
- public void testDestVolumeCommittedSpaceReleased() throws IOException {
+ @ContainerTestVersionInfo.ContainerTest
+ public void testDestVolumeCommittedSpaceReleased(ContainerTestVersionInfo versionInfo) throws IOException {
+ setLayoutAndSchemaForTest(versionInfo);
+
createContainer(CONTAINER_ID, sourceVolume, State.CLOSED);
long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
@@ -558,7 +572,7 @@ public void testDestVolumeCommittedSpaceReleased() throws IOException {
assertEquals(0, destVolume.getCommittedBytes() - initialDestCommitted);
assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
- assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID));
+ assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID)));
assertEquals(initialSourceDelta,
diskBalancerService.getDeltaSizes().get(sourceVolume));
}
@@ -585,4 +599,9 @@ private KeyValueContainer createContainer(long containerId, HddsVolume vol, Stat
private DiskBalancerService.DiskBalancerTask getTask() {
return (DiskBalancerService.DiskBalancerTask)
diskBalancerService.getTasks().poll();
}
+
+ private void setLayoutAndSchemaForTest(ContainerTestVersionInfo versionInfo) {
+ this.schemaVersion = versionInfo.getSchemaVersion();
+ ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
+ }
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 4c123c233e0..a6658e4c5e9 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
+import jakarta.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
@@ -1213,14 +1214,18 @@ public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
@Override
public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
- Optional<List<String>> hosts,
- Optional<HddsProtos.DiskBalancerRunningStatus> status,
+ @Nullable List<String> hosts,
+ @Nullable HddsProtos.DiskBalancerRunningStatus status,
int clientVersion) throws IOException {
DatanodeDiskBalancerInfoRequestProto.Builder requestBuilder =
DatanodeDiskBalancerInfoRequestProto.newBuilder()
.setInfoType(DatanodeDiskBalancerInfoType.status);
- hosts.ifPresent(requestBuilder::addAllHosts);
- status.ifPresent(requestBuilder::setStatus);
+ if (hosts != null && !hosts.isEmpty()) {
+ requestBuilder.addAllHosts(hosts);
+ }
+ if (status != null) {
+ requestBuilder.setStatus(status);
+ }
DatanodeDiskBalancerInfoRequestProto request = requestBuilder.build();
DatanodeDiskBalancerInfoResponseProto response =
@@ -1232,22 +1237,32 @@ public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
}
@Override
- public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
- Optional<Long> bandwidthInMB, Optional<Integer> parallelThread,
- Optional<Boolean> stopAfterDiskEven, Optional<List<String>> hosts)
+ public List<DatanodeAdminError> startDiskBalancer(Double threshold,
+ @Nullable Long bandwidthInMB, @Nullable Integer parallelThread,
+ @Nullable Boolean stopAfterDiskEven, @Nullable List<String> hosts)
throws IOException {
HddsProtos.DiskBalancerConfigurationProto.Builder confBuilder =
HddsProtos.DiskBalancerConfigurationProto.newBuilder();
- threshold.ifPresent(confBuilder::setThreshold);
- bandwidthInMB.ifPresent(confBuilder::setDiskBandwidthInMB);
- parallelThread.ifPresent(confBuilder::setParallelThread);
- stopAfterDiskEven.ifPresent(confBuilder::setStopAfterDiskEven);
+ if (threshold != null) {
+ confBuilder.setThreshold(threshold);
+ }
+ if (bandwidthInMB != null) {
+ confBuilder.setDiskBandwidthInMB(bandwidthInMB);
+ }
+ if (parallelThread != null) {
+ confBuilder.setParallelThread(parallelThread);
+ }
+ if (stopAfterDiskEven != null) {
+ confBuilder.setStopAfterDiskEven(stopAfterDiskEven);
+ }
DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
DatanodeDiskBalancerOpRequestProto.newBuilder()
.setOpType(HddsProtos.DiskBalancerOpType.START)
.setConf(confBuilder);
- hosts.ifPresent(requestBuilder::addAllHosts);
+ if (hosts != null && !hosts.isEmpty()) {
+ requestBuilder.addAllHosts(hosts);
+ }
DatanodeDiskBalancerOpResponseProto response =
submitRequest(Type.DatanodeDiskBalancerOp,
@@ -1263,12 +1278,14 @@ public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
}
@Override
- public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
+ public List<DatanodeAdminError> stopDiskBalancer(@Nullable List<String> hosts)
throws IOException {
DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
DatanodeDiskBalancerOpRequestProto.newBuilder()
.setOpType(HddsProtos.DiskBalancerOpType.STOP);
- hosts.ifPresent(requestBuilder::addAllHosts);
+ if (hosts != null && !hosts.isEmpty()) {
+ requestBuilder.addAllHosts(hosts);
+ }
DatanodeDiskBalancerOpResponseProto response =
submitRequest(Type.DatanodeDiskBalancerOp,
@@ -1285,21 +1302,31 @@ public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
@Override
public List<DatanodeAdminError> updateDiskBalancerConfiguration(
- Optional<Double> threshold, Optional<Long> bandwidthInMB,
- Optional<Integer> parallelThread, Optional<Boolean> stopAfterDiskEven, Optional<List<String>> hosts)
+ @Nullable Double threshold, @Nullable Long bandwidthInMB,
+ @Nullable Integer parallelThread, @Nullable Boolean stopAfterDiskEven, @Nullable List<String> hosts)
throws IOException {
HddsProtos.DiskBalancerConfigurationProto.Builder confBuilder =
HddsProtos.DiskBalancerConfigurationProto.newBuilder();
- threshold.ifPresent(confBuilder::setThreshold);
- bandwidthInMB.ifPresent(confBuilder::setDiskBandwidthInMB);
- parallelThread.ifPresent(confBuilder::setParallelThread);
- stopAfterDiskEven.ifPresent(confBuilder::setStopAfterDiskEven);
+ if (threshold != null) {
+ confBuilder.setThreshold(threshold);
+ }
+ if (bandwidthInMB != null) {
+ confBuilder.setDiskBandwidthInMB(bandwidthInMB);
+ }
+ if (parallelThread != null) {
+ confBuilder.setParallelThread(parallelThread);
+ }
+ if (stopAfterDiskEven != null) {
+ confBuilder.setStopAfterDiskEven(stopAfterDiskEven);
+ }
DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
DatanodeDiskBalancerOpRequestProto.newBuilder()
.setOpType(HddsProtos.DiskBalancerOpType.UPDATE)
.setConf(confBuilder);
- hosts.ifPresent(requestBuilder::addAllHosts);
+ if (hosts != null && !hosts.isEmpty()) {
+ requestBuilder.addAllHosts(hosts);
+ }
DatanodeDiskBalancerOpResponseProto response =
submitRequest(Type.DatanodeDiskBalancerOp,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
index bd4cb791965..5e66d081a25 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
@@ -27,7 +27,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -103,12 +102,12 @@ public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
* If hosts is null, return status of all datanodes in balancing.
*/
public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
- Optional<List<String>> hosts,
- Optional<HddsProtos.DiskBalancerRunningStatus> status,
+ List<String> hosts,
+ HddsProtos.DiskBalancerRunningStatus status,
int clientVersion) throws IOException {
List<DatanodeDetails> filterDns = null;
- if (hosts.isPresent() && !hosts.get().isEmpty()) {
- filterDns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
+ if (hosts != null && !hosts.isEmpty()) {
+ filterDns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts,
useHostnames).stream()
.filter(dn -> {
try {
@@ -127,7 +126,7 @@ public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
}
// Filter Running Status by default
- HddsProtos.DiskBalancerRunningStatus filterStatus = status.orElse(null);
+ HddsProtos.DiskBalancerRunningStatus filterStatus = status;
if (filterDns != null) {
return filterDns.stream()
@@ -154,12 +153,12 @@ public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
* @throws IOException
*/
public List<DatanodeAdminError> startDiskBalancer(
- Optional<Double> threshold, Optional<Long> bandwidthInMB,
- Optional<Integer> parallelThread, Optional<Boolean> stopAfterDiskEven,
- Optional<List<String>> hosts) throws IOException {
+ Double threshold, Long bandwidthInMB,
+ Integer parallelThread, Boolean stopAfterDiskEven,
+ List<String> hosts) throws IOException {
List<DatanodeDetails> dns;
- if (hosts.isPresent()) {
- dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
+ if (hosts != null && !hosts.isEmpty()) {
+ dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts,
useHostnames);
} else {
dns = nodeManager.getNodes(NodeStatus.inServiceHealthy());
@@ -193,11 +192,11 @@ public List<DatanodeAdminError> startDiskBalancer(
* If hosts is not specified, send commands to all datanodes.
* @param hosts Datanodes that command will apply on
* */
- public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
+ public List<DatanodeAdminError> stopDiskBalancer(List<String> hosts)
throws IOException {
List<DatanodeDetails> dns;
- if (hosts.isPresent()) {
- dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
+ if (hosts != null && !hosts.isEmpty()) {
+ dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts,
useHostnames);
} else {
dns = nodeManager.getNodes(NodeStatus.inServiceHealthy());
@@ -227,12 +226,12 @@ public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
* @throws IOException
*/
public List<DatanodeAdminError> updateDiskBalancerConfiguration(
- Optional<Double> threshold, Optional<Long> bandwidthInMB,
- Optional<Integer> parallelThread, Optional<Boolean> stopAfterDiskEven, Optional<List<String>> hosts)
+ Double threshold, Long bandwidthInMB,
+ Integer parallelThread, Boolean stopAfterDiskEven, List<String> hosts)
throws IOException {
List<DatanodeDetails> dns;
- if (hosts.isPresent()) {
- dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
+ if (hosts != null && !hosts.isEmpty()) {
+ dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts,
useHostnames);
} else {
dns = nodeManager.getNodes(NodeStatus.inServiceHealthy());
@@ -360,15 +359,23 @@ public void markStatusUnknown(DatanodeDetails dn) {
}
private DiskBalancerConfiguration attachDiskBalancerConf(
- DatanodeDetails dn, Optional<Double> threshold,
- Optional<Long> bandwidthInMB, Optional<Integer> parallelThread, Optional<Boolean> stopAfterDiskEven) {
+ DatanodeDetails dn, Double threshold,
+ Long bandwidthInMB, Integer parallelThread, Boolean stopAfterDiskEven) {
DiskBalancerConfiguration baseConf = statusMap.containsKey(dn) ?
statusMap.get(dn).getDiskBalancerConfiguration() :
new DiskBalancerConfiguration();
- threshold.ifPresent(baseConf::setThreshold);
- bandwidthInMB.ifPresent(baseConf::setDiskBandwidthInMB);
- parallelThread.ifPresent(baseConf::setParallelThread);
- stopAfterDiskEven.ifPresent(baseConf::setStopAfterDiskEven);
+ if (threshold != null) {
+ baseConf.setThreshold(threshold);
+ }
+ if (bandwidthInMB != null) {
+ baseConf.setDiskBandwidthInMB(bandwidthInMB);
+ }
+ if (parallelThread != null) {
+ baseConf.setParallelThread(parallelThread);
+ }
+ if (stopAfterDiskEven != null) {
+ baseConf.setStopAfterDiskEven(stopAfterDiskEven);
+ }
return baseConf;
}
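
Note on the pattern above: every Optional-typed parameter in DiskBalancerManager becomes a @Nullable reference, and each ifPresent(...) chain becomes an explicit null check. A minimal, self-contained sketch of the two equivalent styles (class and field names here are illustrative, not from this patch):

    import jakarta.annotation.Nullable;
    import java.util.Optional;

    class ConfSketch {
      private double threshold = 10.0;

      // Old style: Optional parameter, applied via ifPresent.
      void applyOld(Optional<Double> t) {
        t.ifPresent(v -> this.threshold = v);
      }

      // New style: @Nullable parameter, applied via explicit null check.
      void applyNew(@Nullable Double t) {
        if (t != null) {
          this.threshold = t;
        }
      }
    }

Both behave identically for an absent value; the @Nullable form avoids allocating Optional wrappers on every RPC call path.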
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index cfa64a3253f..c7f03636775 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -1384,10 +1384,10 @@ public DatanodeDiskBalancerInfoResponseProto getDatanodeDiskBalancerInfo(
break;
case status:
infoProtoList = impl.getDiskBalancerStatus(
- Optional.of(request.getHostsList()),
+ request.getHostsList().isEmpty() ? null : request.getHostsList(),
// If an optional proto enum field is not set, it will return the first
// enum value. So, we need to check if the field is set.
- request.hasStatus() ? Optional.of(request.getStatus()) : Optional.empty(),
+ request.hasStatus() ? request.getStatus() : null,
clientVersion);
break;
default:
@@ -1416,24 +1416,24 @@ public DatanodeDiskBalancerOpResponseProto getDatanodeDiskBalancerOp(
switch (request.getOpType()) {
case START:
errors = impl.startDiskBalancer(
- conf.hasThreshold() ? Optional.of(conf.getThreshold()) : Optional.empty(),
- conf.hasDiskBandwidthInMB() ? Optional.of(conf.getDiskBandwidthInMB()) : Optional.empty(),
- conf.hasParallelThread() ? Optional.of(conf.getParallelThread()) : Optional.empty(),
- conf.hasStopAfterDiskEven() ? Optional.of(conf.getStopAfterDiskEven()) : Optional.empty(),
- request.getHostsList().isEmpty() ? Optional.empty() : Optional.of(request.getHostsList()));
+ conf.hasThreshold() ? conf.getThreshold() : null,
+ conf.hasDiskBandwidthInMB() ? conf.getDiskBandwidthInMB() : null,
+ conf.hasParallelThread() ? conf.getParallelThread() : null,
+ conf.hasStopAfterDiskEven() ? conf.getStopAfterDiskEven() : null,
+ request.getHostsList().isEmpty() ? null : request.getHostsList());
break;
case UPDATE:
errors = impl.updateDiskBalancerConfiguration(
- conf.hasThreshold() ? Optional.of(conf.getThreshold()) : Optional.empty(),
- conf.hasDiskBandwidthInMB() ? Optional.of(conf.getDiskBandwidthInMB()) : Optional.empty(),
- conf.hasParallelThread() ? Optional.of(conf.getParallelThread()) : Optional.empty(),
- conf.hasStopAfterDiskEven() ? Optional.of(conf.getStopAfterDiskEven()) : Optional.empty(),
- request.getHostsList().isEmpty() ? Optional.empty() : Optional.of(request.getHostsList()));
+ conf.hasThreshold() ? conf.getThreshold() : null,
+ conf.hasDiskBandwidthInMB() ? conf.getDiskBandwidthInMB() : null,
+ conf.hasParallelThread() ? conf.getParallelThread() : null,
+ conf.hasStopAfterDiskEven() ? conf.getStopAfterDiskEven() : null,
+ request.getHostsList().isEmpty() ? null : request.getHostsList());
break;
case STOP:
errors = impl.stopDiskBalancer(
- request.getHostsList().isEmpty() ? Optional.empty() : Optional.of(request.getHostsList()));
+ request.getHostsList().isEmpty() ? null : request.getHostsList());
break;
default:
errors = new ArrayList<>();
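
The translator changes above all reduce to one presence-check idiom: generated proto getters return a type default when a field is unset, so hasX() must gate the read, and "unset" is now mapped to null instead of Optional.empty(). A hedged sketch with a hand-rolled stand-in for the generated class (ProtoConfSketch is not the real proto API):

    class ProtoConfSketch {
      private final Double threshold;  // null models an unset proto field

      ProtoConfSketch(Double threshold) {
        this.threshold = threshold;
      }

      boolean hasThreshold() {
        return threshold != null;
      }

      double getThreshold() {
        // Generated getters return a default (0.0 here) when unset,
        // which is why callers must consult hasThreshold() first.
        return threshold == null ? 0.0 : threshold;
      }

      public static void main(String[] args) {
        ProtoConfSketch conf = new ProtoConfSketch(null);
        // The call-site pattern used throughout the hunk above:
        Double t = conf.hasThreshold() ? conf.getThreshold() : null;
        System.out.println(t);  // prints: null
      }
    }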
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 24eaa586b31..c5efbe1bb8b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -35,6 +35,7 @@
import com.google.common.collect.Maps;
import com.google.protobuf.BlockingService;
import com.google.protobuf.ProtocolMessageEnum;
+import jakarta.annotation.Nullable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
@@ -1530,8 +1531,8 @@ public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
@Override
public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
- Optional<List<String>> hosts,
- Optional<HddsProtos.DiskBalancerRunningStatus> status,
+ @Nullable List<String> hosts,
+ @Nullable HddsProtos.DiskBalancerRunningStatus status,
int clientVersion) throws IOException {
checkDiskBalancerEnabled();
return scm.getDiskBalancerManager().getDiskBalancerStatus(hosts, status,
@@ -1539,9 +1540,9 @@ public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
}
@Override
- public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
- Optional<Long> bandwidthInMB, Optional<Integer> parallelThread,
- Optional<Boolean> stopAfterDiskEven, Optional<List<String>> hosts)
+ public List<DatanodeAdminError> startDiskBalancer(@Nullable Double threshold,
+ @Nullable Long bandwidthInMB, @Nullable Integer parallelThread,
+ @Nullable Boolean stopAfterDiskEven, @Nullable List<String> hosts)
throws IOException {
checkDiskBalancerEnabled();
@@ -1557,7 +1558,7 @@ public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
}
@Override
- public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
+ public List<DatanodeAdminError> stopDiskBalancer(@Nullable List<String> hosts)
throws IOException {
checkDiskBalancerEnabled();
@@ -1572,8 +1573,8 @@ public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
@Override
public List<DatanodeAdminError> updateDiskBalancerConfiguration(
- Optional<Double> threshold, Optional<Long> bandwidthInMB,
- Optional<Integer> parallelThread, Optional<Boolean> stopAfterDiskEven, Optional<List<String>> hosts)
+ @Nullable Double threshold, @Nullable Long bandwidthInMB,
+ @Nullable Integer parallelThread, @Nullable Boolean stopAfterDiskEven, @Nullable List<String> hosts)
throws IOException {
checkDiskBalancerEnabled();
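
One caveat for reviewers of the server side: jakarta.annotation.Nullable is purely advisory and adds no runtime enforcement, so each callee still needs its own guard. A small sketch of the normalization this null contract relies on (GuardSketch and normalizeHosts are illustrative names, not from this patch):

    import jakarta.annotation.Nullable;
    import java.util.Collections;
    import java.util.List;

    final class GuardSketch {
      // null or empty means "apply to all datanodes" in this contract;
      // the annotation documents intent but never checks anything.
      static List<String> normalizeHosts(@Nullable List<String> hosts) {
        return (hosts == null || hosts.isEmpty())
            ? Collections.emptyList()
            : hosts;
      }
    }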
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
index e3aaf0d87b0..b696ab6bd63 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
@@ -20,7 +20,6 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
-import java.util.Optional;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -88,8 +87,7 @@ public void testDatanodeDiskBalancerStatus() throws IOException {
Collectors.toList());
List<HddsProtos.DatanodeDiskBalancerInfoProto> statusProtoList =
- diskBalancerManager.getDiskBalancerStatus(Optional.of(dns),
- Optional.empty(),
+ diskBalancerManager.getDiskBalancerStatus(dns, null,
ClientVersion.CURRENT_VERSION);
Assertions.assertEquals(3, statusProtoList.size());
@@ -100,8 +98,7 @@ public void testDatanodeDiskBalancerStatus() throws IOException {
Collectors.toList());
statusProtoList =
- diskBalancerManager.getDiskBalancerStatus(Optional.of(dns),
- Optional.empty(),
+ diskBalancerManager.getDiskBalancerStatus(dns, null,
ClientVersion.CURRENT_VERSION);
Assertions.assertEquals(1, statusProtoList.size());
diff --git a/hadoop-ozone/cli-admin/pom.xml b/hadoop-ozone/cli-admin/pom.xml
index 7357cdeb3bf..e10864206e8 100644
--- a/hadoop-ozone/cli-admin/pom.xml
+++ b/hadoop-ozone/cli-admin/pom.xml
@@ -60,6 +60,10 @@
<groupId>info.picocli</groupId>
<artifactId>picocli</artifactId>
</dependency>
+ <dependency>
+ <groupId>jakarta.annotation</groupId>
+ <artifactId>jakarta.annotation-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index 55044e3dfec..cfca72a6711 100644
--- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -23,6 +23,7 @@
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT;
import com.google.common.base.Preconditions;
+import jakarta.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -611,23 +612,23 @@ public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
}
@Override
- public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
- Optional<Long> bandwidthInMB, Optional<Integer> parallelThread, Optional<Boolean> stopAfterDiskEven,
- Optional<List<String>> hosts) throws IOException {
+ public List<DatanodeAdminError> startDiskBalancer(@Nullable Double threshold,
+ @Nullable Long bandwidthInMB, @Nullable Integer parallelThread, @Nullable Boolean stopAfterDiskEven,
+ @Nullable List<String> hosts) throws IOException {
return storageContainerLocationClient.startDiskBalancer(threshold,
bandwidthInMB, parallelThread, stopAfterDiskEven, hosts);
}
@Override
- public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
+ public List<DatanodeAdminError> stopDiskBalancer(@Nullable List<String> hosts)
throws IOException {
return storageContainerLocationClient.stopDiskBalancer(hosts);
}
@Override
public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
- Optional<List<String>> hosts,
- Optional<HddsProtos.DiskBalancerRunningStatus> runningStatus)
+ @Nullable List<String> hosts,
+ @Nullable HddsProtos.DiskBalancerRunningStatus runningStatus)
throws IOException {
return storageContainerLocationClient.getDiskBalancerStatus(hosts,
runningStatus, ClientVersion.CURRENT_VERSION);
@@ -635,8 +636,8 @@ public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
@Override
public List<DatanodeAdminError> updateDiskBalancerConfiguration(
- Optional<Double> threshold, Optional<Long> bandwidth,
- Optional<Integer> parallelThread, Optional<Boolean> stopAfterDiskEven, Optional<List<String>> hosts)
+ @Nullable Double threshold, @Nullable Long bandwidth,
+ @Nullable Integer parallelThread, @Nullable Boolean stopAfterDiskEven, @Nullable List<String> hosts)
throws IOException {
return storageContainerLocationClient.updateDiskBalancerConfiguration(
threshold, bandwidth, parallelThread, stopAfterDiskEven, hosts);
diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommonOptions.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommonOptions.java
index d9cbf22f432..2abac224967 100644
--- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommonOptions.java
+++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommonOptions.java
@@ -19,7 +19,6 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
import picocli.CommandLine;
/**
@@ -60,9 +59,8 @@ public String getHostString() {
return isAllHosts() ? "All datanodes" : String.join("\n", getDatanodes());
}
- public Optional<List<String>> getSpecifiedDatanodes() {
- return getDatanodes().isEmpty() ?
- Optional.empty() : Optional.of(getDatanodes());
+ public List<String> getSpecifiedDatanodes() {
+ return getDatanodes().isEmpty() ? null : getDatanodes();
}
public boolean isAllHosts() {
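
Callers of getSpecifiedDatanodes() now branch on null rather than Optional; a runnable sketch of the new contract (OptionsSketch is a simplified stand-in, not the real mixin):

    import java.util.ArrayList;
    import java.util.List;

    class OptionsSketch {
      private final List<String> datanodes = new ArrayList<>();

      // Mirrors the refactored accessor: null signals "all hosts".
      List<String> getSpecifiedDatanodes() {
        return datanodes.isEmpty() ? null : datanodes;
      }

      public static void main(String[] args) {
        List<String> hosts = new OptionsSketch().getSpecifiedDatanodes();
        System.out.println(hosts == null ? "All datanodes" : hosts);
      }
    }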
diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java
index c37f5601986..49d1fe01050 100644
--- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java
+++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java
@@ -20,7 +20,6 @@
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.List;
-import java.util.Optional;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.cli.ScmSubcommand;
@@ -43,19 +42,19 @@ public class DiskBalancerStartSubcommand extends ScmSubcommand {
description = "Percentage deviation from average utilization of " +
"the disks after which a datanode will be rebalanced (for " +
"example, '10' for 10%%).")
- private Optional<Double> threshold;
+ private Double threshold;
@Option(names = {"-b", "--bandwidthInMB"},
description = "Maximum bandwidth for DiskBalancer per second.")
- private Optional<Long> bandwidthInMB;
+ private Long bandwidthInMB;
@Option(names = {"-p", "--parallelThread"},
description = "Max parallelThread for DiskBalancer.")
- private Optional<Integer> parallelThread;
+ private Integer parallelThread;
@Option(names = {"-s", "--stop-after-disk-even"},
description = "Stop DiskBalancer automatically after disk utilization is
even.")
- private Optional<Boolean> stopAfterDiskEven;
+ private Boolean stopAfterDiskEven;
@CommandLine.Mixin
private DiskBalancerCommonOptions commonOptions =
diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java
index d8d0113eb13..34a7abbadf6 100644
--- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java
+++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.cli.ScmSubcommand;
@@ -51,8 +50,8 @@ public class DiskBalancerStatusSubcommand extends ScmSubcommand {
public void execute(ScmClient scmClient) throws IOException {
List<HddsProtos.DatanodeDiskBalancerInfoProto> resultProto =
scmClient.getDiskBalancerStatus(
- hosts.isEmpty() ? Optional.empty() : Optional.of(hosts),
- state == null ? Optional.empty() : Optional.of(state));
+ hosts.isEmpty() ? null : hosts,
+ state);
System.out.println(generateStatus(resultProto));
}
diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java
index 6f1b7ea0d1d..b6d2dcc4510 100644
--- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java
+++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java
@@ -20,7 +20,6 @@
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.List;
-import java.util.Optional;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.cli.ScmSubcommand;
@@ -43,19 +42,19 @@ public class DiskBalancerUpdateSubcommand extends ScmSubcommand {
description = "Percentage deviation from average utilization of " +
"the disks after which a datanode will be rebalanced (for " +
"example, '10' for 10%%).")
- private Optional<Double> threshold;
+ private Double threshold;
@Option(names = {"-b", "--bandwidthInMB"},
description = "Maximum bandwidth for DiskBalancer per second.")
- private Optional<Long> bandwidthInMB;
+ private Long bandwidthInMB;
@Option(names = {"-p", "--parallelThread"},
description = "Max parallelThread for DiskBalancer.")
- private Optional<Integer> parallelThread;
+ private Integer parallelThread;
@Option(names = {"-s", "--stop-after-disk-even"},
description = "Stop DiskBalancer automatically after disk utilization is
even.")
- private Optional<Boolean> stopAfterDiskEven;
+ private Boolean stopAfterDiskEven;
@CommandLine.Mixin
private DiskBalancerCommonOptions commonOptions =
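
Dropping Optional from the @Option fields works because picocli leaves a field at its initialized value when the option is absent from the command line, so a wrapper-typed field simply stays null. A minimal sketch, assuming picocli is on the classpath (ThresholdSketch and --threshold are illustrative, not from this patch):

    import picocli.CommandLine;
    import picocli.CommandLine.Option;

    class ThresholdSketch implements Runnable {
      @Option(names = "--threshold")
      private Double threshold;  // stays null when --threshold is absent

      @Override
      public void run() {
        System.out.println(threshold == null
            ? "threshold not set" : "threshold=" + threshold);
      }

      public static void main(String[] args) {
        new CommandLine(new ThresholdSketch()).execute(args);
      }
    }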
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java
index 68b5aa4e940..2869e8de90e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.hdds.fs.SpaceUsagePersistence;
import org.apache.hadoop.hdds.fs.SpaceUsageSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataScanOrder;
@@ -90,7 +91,7 @@ public class TestContainerChoosingPolicy {
private ContainerController containerController;
// Simulate containers currently being balanced (in progress)
- private Set<Long> inProgressContainerIDs = ConcurrentHashMap.newKeySet();
+ private Set<ContainerID> inProgressContainerIDs = ConcurrentHashMap.newKeySet();
@BeforeEach
public void setup() throws Exception {
@@ -139,7 +140,7 @@ public void testContainerDeletionAfterIteratorGeneration() throws Exception {
ContainerData container =
containerChoosingPolicy.chooseContainer(ozoneContainer, volume,
inProgressContainerIDs);
assertEquals(containerList.get(0).getContainerData().getContainerID(),
container.getContainerID());
ozoneContainer.getContainerSet().removeContainer(containerList.get(1).getContainerData().getContainerID());
- inProgressContainerIDs.add(container.getContainerID());
+ inProgressContainerIDs.add(ContainerID.valueOf(container.getContainerID()));
container = containerChoosingPolicy.chooseContainer(ozoneContainer,
volume, inProgressContainerIDs);
assertEquals(containerList.get(1).getContainerData().getContainerID(),
container.getContainerID());
}
@@ -175,7 +176,7 @@ private void testPolicyPerformance(String policyName, ContainerChoosingPolicy po
} else {
containerChosen++;
if (inProgressContainerIDs.size() < MAX_IN_PROGRESS) {
- inProgressContainerIDs.add(c.getContainerID());
+ inProgressContainerIDs.add(ContainerID.valueOf(c.getContainerID()));
}
}
} catch (Exception e) {
@@ -231,7 +232,7 @@ public void createVolumes() throws IOException {
}
public void createContainers() {
- List<Long> closedContainerIDs = new ArrayList<>();
+ List<ContainerID> closedContainerIDs = new ArrayList<>();
Random random = new Random();
long startTime = System.currentTimeMillis();
@@ -256,7 +257,7 @@ public void createContainers() {
// Collect IDs of closed containers
if (!isOpen) {
- closedContainerIDs.add((long) i);
+ closedContainerIDs.add(ContainerID.valueOf((long) i));
}
}
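
The test now tracks typed ContainerID values instead of raw longs, converting with the same ContainerID.valueOf(long) the diff itself uses. Set membership still works because the ID type has value-based equality; a simplified stand-in (Cid is illustrative, not the real ContainerID) demonstrating why:

    import java.util.HashSet;
    import java.util.Set;

    final class Cid {
      private final long id;

      private Cid(long id) {
        this.id = id;
      }

      static Cid valueOf(long id) {
        return new Cid(id);
      }

      @Override
      public boolean equals(Object o) {
        return o instanceof Cid && ((Cid) o).id == id;
      }

      @Override
      public int hashCode() {
        return Long.hashCode(id);
      }

      public static void main(String[] args) {
        Set<Cid> inProgress = new HashSet<>();
        inProgress.add(Cid.valueOf(42L));
        System.out.println(inProgress.contains(Cid.valueOf(42L)));  // true
      }
    }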
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
index cde2a7ce2ac..5d39d260ead 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
@@ -28,7 +28,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -113,11 +112,11 @@ public void testDiskBalancerStopAfterEven() throws IOException,
// Start DiskBalancer on all datanodes
diskBalancerManager.startDiskBalancer(
- Optional.of(10.0), // threshold
- Optional.of(10L), // bandwidth in MB
- Optional.of(5), // parallel threads
- Optional.of(true), // stopAfterDiskEven
- Optional.empty()); // apply to all datanodes
+ 10.0, // threshold
+ 10L, // bandwidth in MB
+ 5, // parallel threads
+ true, // stopAfterDiskEven
+ null); // apply to all datanodes
// verify logs for all DNs has started
String logs = logCapturer.getOutput();
@@ -144,16 +143,16 @@ public void testDatanodeDiskBalancerStatus() throws IOException, InterruptedExce
DatanodeDetails toDecommission = dns.get(0).getDatanodeDetails();
diskBalancerManager.startDiskBalancer(
- Optional.of(10.0), // threshold
- Optional.of(10L), // bandwidth in MB
- Optional.of(5), // parallel threads
- Optional.of(true), // stopAfterDiskEven
- Optional.empty());
+ 10.0, // threshold
+ 10L, // bandwidth in MB
+ 5, // parallel threads
+ true, // stopAfterDiskEven
+ null);
//all DNs IN_SERVICE, so disk balancer status for all should be present
List<HddsProtos.DatanodeDiskBalancerInfoProto> statusProtoList =
- diskBalancerManager.getDiskBalancerStatus(Optional.empty(),
- Optional.empty(),
+ diskBalancerManager.getDiskBalancerStatus(null,
+ null,
ClientVersion.CURRENT_VERSION);
assertEquals(3, statusProtoList.size());
@@ -165,15 +164,15 @@ public void testDatanodeDiskBalancerStatus() throws IOException, InterruptedExce
waitForDnToReachOpState(nm, toDecommission, DECOMMISSIONING);
//one DN is in DECOMMISSIONING state, so disk balancer status for it should not be present
- statusProtoList = diskBalancerManager.getDiskBalancerStatus(Optional.empty(),
- Optional.empty(),
+ statusProtoList = diskBalancerManager.getDiskBalancerStatus(null,
+ null,
ClientVersion.CURRENT_VERSION);
assertEquals(2, statusProtoList.size());
// Check status for the decommissioned DN should not be present
statusProtoList = diskBalancerManager.getDiskBalancerStatus(
- Optional.of(Collections.singletonList(getDNHostAndPort(toDecommission))),
- Optional.empty(),
+ Collections.singletonList(getDNHostAndPort(toDecommission)),
+ null,
ClientVersion.CURRENT_VERSION);
assertEquals(0, statusProtoList.size());
@@ -183,8 +182,8 @@ public void testDatanodeDiskBalancerStatus() throws IOException, InterruptedExce
// Check status for the recommissioned DN should now be present
statusProtoList = diskBalancerManager.getDiskBalancerStatus(
- Optional.of(Collections.singletonList(getDNHostAndPort(toDecommission))),
- Optional.empty(),
+ Collections.singletonList(getDNHostAndPort(toDecommission)),
+ null,
ClientVersion.CURRENT_VERSION);
assertEquals(1, statusProtoList.size());
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancerDuringDecommissionAndMaintenance.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancerDuringDecommissionAndMaintenance.java
index b33b119ce80..81490a9b3ce 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancerDuringDecommissionAndMaintenance.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancerDuringDecommissionAndMaintenance.java
@@ -30,7 +30,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -103,7 +102,7 @@ public static void cleanup() throws Exception {
@AfterEach
public void stopDiskBalancer() throws IOException, InterruptedException,
TimeoutException {
// Stop disk balancer after each test
- diskBalancerManager.stopDiskBalancer(Optional.empty());
+ diskBalancerManager.stopDiskBalancer(null);
// Verify that all DNs have stopped DiskBalancerService
for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
GenericTestUtils.waitFor(() -> {
@@ -124,11 +123,11 @@ public void testDiskBalancerWithDecommissionAndMaintenanceNodes()
// Start disk balancer on all DNs
diskBalancerManager.startDiskBalancer(
- Optional.of(10.0),
- Optional.of(10L),
- Optional.of(5),
- Optional.of(false),
- Optional.empty());
+ 10.0,
+ 10L,
+ 5,
+ false,
+ null);
NodeManager nm = cluster.getStorageContainerManager().getScmNodeManager();
@@ -149,8 +148,8 @@ public void testDiskBalancerWithDecommissionAndMaintenanceNodes()
//get diskBalancer status
List<HddsProtos.DatanodeDiskBalancerInfoProto> statusProtoList =
- diskBalancerManager.getDiskBalancerStatus(Optional.empty(),
- Optional.empty(),
+ diskBalancerManager.getDiskBalancerStatus(null,
+ null,
ClientVersion.CURRENT_VERSION);
// Verify that decommissioning and maintenance DN is not
@@ -193,8 +192,8 @@ public void testDiskBalancerWithDecommissionAndMaintenanceNodes()
// Verify that recommissioned DN is included in DiskBalancer report and status
reportProtoList = diskBalancerManager.getDiskBalancerReport(5,
ClientVersion.CURRENT_VERSION);
- statusProtoList = diskBalancerManager.getDiskBalancerStatus(Optional.empty(),
- Optional.empty(),
+ statusProtoList = diskBalancerManager.getDiskBalancerStatus(null,
+ null,
ClientVersion.CURRENT_VERSION);
boolean isRecommissionedDnInReport = reportProtoList.stream()
@@ -228,18 +227,17 @@ public void testStopDiskBalancerOnDecommissioningNode() throws Exception {
// Start disk balancer on this specific DN
diskBalancerManager.startDiskBalancer(
- Optional.of(10.0),
- Optional.of(10L),
- Optional.of(1),
- Optional.of(false),
- Optional.of(dnAddressList));
+ 10.0,
+ 10L,
+ 1,
+ false,
+ dnAddressList);
// Verify diskBalancer is running
GenericTestUtils.waitFor(() -> {
try {
HddsProtos.DatanodeDiskBalancerInfoProto status =
- diskBalancerManager.getDiskBalancerStatus(Optional.of(dnAddressList),
- Optional.empty(),
+ diskBalancerManager.getDiskBalancerStatus(dnAddressList, null,
ClientVersion.CURRENT_VERSION).stream().findFirst().orElse(null);
return status != null && status.getRunningStatus() ==
HddsProtos.DiskBalancerRunningStatus.RUNNING;
} catch (IOException e) {
@@ -258,7 +256,7 @@ public void testStopDiskBalancerOnDecommissioningNode() throws Exception {
100, 5000);
// Attempt to stop disk balancer on the decommissioning DN
- diskBalancerManager.stopDiskBalancer(Optional.of(dnAddressList));
+ diskBalancerManager.stopDiskBalancer(dnAddressList);
// Verify disk balancer is now explicitly stopped (operationalState becomes STOPPED)
final String expectedLogForStop =
@@ -272,8 +270,7 @@ public void testStopDiskBalancerOnDecommissioningNode() throws Exception {
// Verify it does not automatically restart (since it was explicitly stopped)
HddsProtos.DatanodeDiskBalancerInfoProto statusAfterRecommission =
- diskBalancerManager.getDiskBalancerStatus(Optional.of(dnAddressList),
- Optional.empty(),
+ diskBalancerManager.getDiskBalancerStatus(dnAddressList, null,
ClientVersion.CURRENT_VERSION).stream().findFirst().orElse(null);
assertEquals(HddsProtos.DiskBalancerRunningStatus.STOPPED,
statusAfterRecommission.getRunningStatus());
}
@@ -292,8 +289,7 @@ public void testStartDiskBalancerOnDecommissioningNode() throws Exception {
GenericTestUtils.waitFor(() -> {
try {
HddsProtos.DatanodeDiskBalancerInfoProto status =
- diskBalancerManager.getDiskBalancerStatus(Optional.of(dnAddressList),
- Optional.empty(),
+ diskBalancerManager.getDiskBalancerStatus(dnAddressList, null,
ClientVersion.CURRENT_VERSION).stream().findFirst().orElse(null);
return status != null && status.getRunningStatus() ==
HddsProtos.DiskBalancerRunningStatus.STOPPED;
} catch (IOException e) {
@@ -312,11 +308,11 @@ public void testStartDiskBalancerOnDecommissioningNode() throws Exception {
// Attempt to start disk balancer on the decommissioning DN
diskBalancerManager.startDiskBalancer(
- Optional.of(10.0),
- Optional.of(10L),
- Optional.of(1),
- Optional.of(false),
- Optional.of(dnAddressList));
+ 10.0,
+ 10L,
+ 1,
+ false,
+ dnAddressList);
// Verify disk balancer goes to PAUSED_BY_NODE_STATE
final String expectedLogForPause =
@@ -332,8 +328,7 @@ public void testStartDiskBalancerOnDecommissioningNode() throws Exception {
GenericTestUtils.waitFor(() -> {
try {
HddsProtos.DatanodeDiskBalancerInfoProto status =
- diskBalancerManager.getDiskBalancerStatus(Optional.of(dnAddressList),
- Optional.empty(),
+ diskBalancerManager.getDiskBalancerStatus(dnAddressList, null,
ClientVersion.CURRENT_VERSION).stream().findFirst().orElse(null);
return status != null && status.getRunningStatus() ==
HddsProtos.DiskBalancerRunningStatus.RUNNING;
} catch (IOException e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]