This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new f31c264e38 HDDS-13045. Implement Immediate Triggering of Heartbeat when Volume Full (#8590)
f31c264e38 is described below
commit f31c264e38b72b34532a2d15488246c7d50011f5
Author: Siddhant Sangwan <[email protected]>
AuthorDate: Fri Jun 13 17:48:08 2025 +0530
HDDS-13045. Implement Immediate Triggering of Heartbeat when Volume Full (#8590)
---
.../ozone/container/common/impl/ContainerData.java | 7 +++
.../container/common/impl/HddsDispatcher.java | 36 +++++++++-------
.../ozone/container/common/volume/HddsVolume.java | 6 +++
.../container/common/impl/TestHddsDispatcher.java | 50 ++++++++++++++++++++--
4 files changed, 80 insertions(+), 19 deletions(-)
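For context before the diff: the dispatcher change gates the immediate heartbeat behind a per-container AtomicBoolean, so the datanode sends an out-of-band heartbeat at most once per container no matter how many writes later find the container or volume full. Below is a minimal, self-contained sketch of that compare-and-set pattern; the class and method names (PerContainerHeartbeatGate, onCloseActionQueued) and the Runnable stand-in for the heartbeat trigger are illustrative, not the actual Ozone API.

import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: mirrors the compareAndSet gate added in HddsDispatcher,
// with a Runnable standing in for StateContext/DatanodeStateMachine.
class PerContainerHeartbeatGate {
  // one flag per container, analogous to ContainerData#immediateCloseActionSent
  private final AtomicBoolean immediateCloseActionSent = new AtomicBoolean(false);

  void onCloseActionQueued(boolean volumeFull, Runnable triggerHeartbeat) {
    // compareAndSet succeeds only for the first caller, so the immediate
    // heartbeat fires at most once per container
    if (immediateCloseActionSent.compareAndSet(false, true)) {
      triggerHeartbeat.run();
      if (volumeFull) {
        System.out.println("Triggered immediate heartbeat because of full volume.");
      }
    }
  }

  public static void main(String[] args) {
    PerContainerHeartbeatGate gate = new PerContainerHeartbeatGate();
    Runnable heartbeat = () -> System.out.println("heartbeat sent");
    gate.onCloseActionQueued(true, heartbeat);  // triggers the heartbeat
    gate.onCloseActionQueued(true, heartbeat);  // throttled: no second heartbeat
  }
}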
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index 1ef5ac23d8..ce0d17477e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -42,6 +42,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
@@ -61,6 +62,8 @@ public abstract class ContainerData {
// For now, we support only KeyValueContainer.
private final ContainerType containerType;
+ private final AtomicBoolean immediateCloseActionSent = new AtomicBoolean(false);
+
// Unique identifier for the container
private final long containerID;
@@ -162,6 +165,10 @@ public long getContainerID() {
return containerID;
}
+ public AtomicBoolean getImmediateCloseActionSent() {
+ return immediateCloseActionSent;
+ }
+
/**
* Returns the path to base dir of the container.
* @return Path to base dir.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index c4e03e4533..6c35e30bcc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -32,6 +32,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.BlockID;
@@ -580,7 +581,9 @@ public void validateContainerCommand(
*/
private void sendCloseContainerActionIfNeeded(Container container) {
// We have to find a more efficient way to close a container.
- boolean isSpaceFull = isContainerFull(container) || isVolumeFull(container);
+ boolean isOpen = container != null && container.getContainerState() == State.OPEN;
+ boolean isVolumeFull = isOpen && isVolumeFullExcludingCommittedSpace(container);
+ boolean isSpaceFull = isVolumeFull || isContainerFull(container);
boolean shouldClose = isSpaceFull || isContainerUnhealthy(container);
if (shouldClose) {
ContainerData containerData = container.getContainerData();
@@ -591,6 +594,21 @@ private void sendCloseContainerActionIfNeeded(Container container) {
.setContainerID(containerData.getContainerID())
.setAction(ContainerAction.Action.CLOSE).setReason(reason).build();
context.addContainerActionIfAbsent(action);
+ if (isVolumeFull) {
+ HddsVolume volume = containerData.getVolume();
+ LOG.warn("Volume [{}] is full. containerID: {}. Volume usage: [{}].",
volume, containerData.getContainerID(),
+ volume.getCurrentUsage());
+ }
+ AtomicBoolean immediateCloseActionSent = containerData.getImmediateCloseActionSent();
+ // if an immediate heartbeat has not been triggered already, trigger it now
+ if (immediateCloseActionSent.compareAndSet(false, true)) {
+ context.getParent().triggerHeartbeat();
+ if (isVolumeFull) {
+ // log only if volume is full
+ // don't want to log if only container is full because that is expected to happen frequently
+ LOG.warn("Triggered immediate heartbeat because of full volume.");
+ }
+ }
}
}
@@ -608,20 +626,8 @@ private boolean isContainerFull(Container container) {
}
}
- private boolean isVolumeFull(Container container) {
- boolean isOpen = Optional.ofNullable(container)
- .map(cont -> cont.getContainerState() == ContainerDataProto.State.OPEN)
- .orElse(Boolean.FALSE);
- if (isOpen) {
- HddsVolume volume = container.getContainerData().getVolume();
- StorageLocationReport volumeReport = volume.getReport();
- boolean full = volumeReport.getUsableSpace() <= 0;
- if (full) {
- LOG.info("Container {} volume is full: {}",
container.getContainerData().getContainerID(), volumeReport);
- }
- return full;
- }
- return false;
+ private boolean isVolumeFullExcludingCommittedSpace(Container container) {
+ return container.getContainerData().getVolume().isVolumeFull();
}
private boolean isContainerUnhealthy(Container container) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
index c34a443d15..ca82a61d13 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -195,6 +195,12 @@ public VolumeInfoMetrics getVolumeInfoStats() {
return volumeInfoMetrics;
}
+ public boolean isVolumeFull() {
+ SpaceUsageSource currentUsage = getCurrentUsage();
+ // if the volume is failed, this method will implicitly return true because available space will be 0
+ return currentUsage.getAvailable() - getFreeSpaceToSpare(currentUsage.getCapacity()) <= 0;
+ }
+
@Override
protected StorageLocationReport.Builder reportBuilder() {
StorageLocationReport.Builder builder = super.reportBuilder();
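The new isVolumeFull above treats the volume as full once available space minus the free space to spare reaches zero, which also covers failed volumes, since their available space reports as 0. A small hedged example of that arithmetic with made-up numbers follows; the flat 100-byte reserve stands in for the real, configuration-driven getFreeSpaceToSpare and is only an assumption for illustration.

// Illustrative arithmetic only; values and the reserve rule are made up.
public class VolumeFullCheckSketch {
  // stand-in for HddsVolume#getFreeSpaceToSpare(capacity): assume a flat 100-byte reserve
  static long freeSpaceToSpare(long capacity) {
    return 100;
  }

  static boolean isVolumeFull(long capacity, long available) {
    // same shape as the committed check: available minus the reserve must stay positive
    return available - freeSpaceToSpare(capacity) <= 0;
  }

  public static void main(String[] args) {
    System.out.println(isVolumeFull(1000, 160)); // false: 160 - 100 = 60 > 0
    System.out.println(isVolumeFull(1000, 90));  // true: 90 - 100 = -10 <= 0
    System.out.println(isVolumeFull(1000, 0));   // true: a failed volume reports 0 available
  }
}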
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index d209adbb43..1451a14f0f 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -79,6 +79,7 @@
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.Op;
@@ -125,6 +126,16 @@ public static void init() {
volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(new OzoneConfiguration());
}
+ /**
+ * Tests that close container action is sent when a container is full. First two containers are created. Then we
+ * write to one of them to confirm normal writes are successful. Then we increase the used space of both containers
+ * such that they're close to full, and write to both of them simultaneously. The expectation is that close
+ * container action should be added for both of them and two immediate heartbeats should be sent. Next, we write
+ * again to the first container. This time the close container action should be queued but immediate heartbeat
+ * should not be sent because of throttling. This confirms that the throttling is per container.
+ * @param layout
+ * @throws IOException
+ */
@ContainerLayoutTestInfo.ContainerTest
public void testContainerCloseActionWhenFull(
ContainerLayoutVersion layout) throws IOException {
@@ -141,16 +152,23 @@ public void testContainerCloseActionWhenFull(
UUID scmId = UUID.randomUUID();
ContainerSet containerSet = newContainerSet();
StateContext context = ContainerTestUtils.getMockContext(dd, conf);
+ // create both containers
KeyValueContainerData containerData = new KeyValueContainerData(1L, layout,
(long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
dd.getUuidString());
+ KeyValueContainerData containerData2 = new KeyValueContainerData(2L,
+ layout, (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(), dd.getUuidString());
Container container = new KeyValueContainer(containerData, conf);
+ Container container2 = new KeyValueContainer(containerData2, conf);
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
.forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
scmId.toString());
+ container2.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
+ scmId.toString());
containerSet.addContainer(container);
+ containerSet.addContainer(container2);
ContainerMetrics metrics = ContainerMetrics.create(conf);
Map<ContainerType, Handler> handlers = Maps.newHashMap();
for (ContainerType containerType : ContainerType.values()) {
@@ -159,6 +177,7 @@ public void testContainerCloseActionWhenFull(
context.getParent().getDatanodeDetails().getUuidString(),
containerSet, volumeSet, volumeChoosingPolicy, metrics,
NO_OP_ICR_SENDER));
}
+ // write successfully to first container
HddsDispatcher hddsDispatcher = new HddsDispatcher(
conf, containerSet, volumeSet, handlers, context, metrics, null);
hddsDispatcher.setClusterId(scmId.toString());
@@ -168,15 +187,30 @@ public void testContainerCloseActionWhenFull(
responseOne.getResult());
verify(context, times(0))
.addContainerActionIfAbsent(any(ContainerAction.class));
+ // increment used space of both containers
containerData.getStatistics().setBlockBytesForTesting(Double.valueOf(
StorageUnit.MB.toBytes(950)).longValue());
+ containerData2.getStatistics().setBlockBytesForTesting(Double.valueOf(
+ StorageUnit.MB.toBytes(950)).longValue());
ContainerCommandResponseProto responseTwo = hddsDispatcher
.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 2L), null);
+ ContainerCommandResponseProto responseThree = hddsDispatcher
+ .dispatch(getWriteChunkRequest(dd.getUuidString(), 2L, 1L), null);
assertEquals(ContainerProtos.Result.SUCCESS,
responseTwo.getResult());
- verify(context, times(1))
+ assertEquals(ContainerProtos.Result.SUCCESS, responseThree.getResult());
+ // container action should be added for both containers
+ verify(context, times(2))
.addContainerActionIfAbsent(any(ContainerAction.class));
-
+ DatanodeStateMachine stateMachine = context.getParent();
+ // immediate heartbeat should be triggered for both the containers
+ verify(stateMachine, times(2)).triggerHeartbeat();
+
+ // if we write again to container 1, the container action should get added but heartbeat should not get triggered
+ // again because of throttling
+ hddsDispatcher.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 3L), null);
+ verify(context, times(3)).addContainerActionIfAbsent(any(ContainerAction.class));
+ verify(stateMachine, times(2)).triggerHeartbeat(); // was called twice before
} finally {
volumeSet.shutdown();
ContainerMetrics.remove();
@@ -276,6 +310,7 @@ public void testContainerCloseActionWhenVolumeFull(
UUID scmId = UUID.randomUUID();
ContainerSet containerSet = newContainerSet();
StateContext context = ContainerTestUtils.getMockContext(dd, conf);
+ DatanodeStateMachine stateMachine = context.getParent();
// create a 50 byte container
// available (160) > 100 (min free space) + 50 (container size)
KeyValueContainerData containerData = new KeyValueContainerData(1L,
@@ -300,14 +335,21 @@ public void testContainerCloseActionWhenVolumeFull(
conf, containerSet, volumeSet, handlers, context, metrics, null);
hddsDispatcher.setClusterId(scmId.toString());
containerData.getVolume().getVolumeUsage()
- .ifPresent(usage -> usage.incrementUsedSpace(50));
- usedSpace.addAndGet(50);
+ .ifPresent(usage -> usage.incrementUsedSpace(60));
+ usedSpace.addAndGet(60);
ContainerCommandResponseProto response = hddsDispatcher
.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 1L), null);
assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());
verify(context, times(1))
.addContainerActionIfAbsent(any(ContainerAction.class));
+ // verify that immediate heartbeat is triggered
+ verify(stateMachine, times(1)).triggerHeartbeat();
+ // the volume has reached the min free space boundary but this time the heartbeat should not be triggered because
+ // of throttling
+ hddsDispatcher.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 2L), null);
+ verify(context, times(2)).addContainerActionIfAbsent(any(ContainerAction.class));
+ verify(stateMachine, times(1)).triggerHeartbeat(); // was called once before
// try creating another container now as the volume used has crossed
// threshold
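A note on the test assertions above: the throttling checks rely on Mockito's verify counts being cumulative over the mock's lifetime, so verifying times(2) (or times(1)) again after the extra write passes only if no further triggerHeartbeat() call happened. A tiny self-contained illustration of that semantic, using a plain Runnable mock rather than the Ozone classes, is sketched below.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

// Illustrative only: shows that Mockito verify counts are cumulative,
// which is what the test uses to prove the heartbeat was not re-triggered.
public class CumulativeVerifySketch {
  public static void main(String[] args) {
    Runnable heartbeat = mock(Runnable.class);
    heartbeat.run();
    heartbeat.run();
    verify(heartbeat, times(2)).run();  // two calls so far
    // ... later work that must NOT trigger another heartbeat ...
    verify(heartbeat, times(2)).run();  // still passes only if the count stayed at 2
  }
}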
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]