This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f31c264e38 HDDS-13045. Implement Immediate Triggering of Heartbeat when Volume Full (#8590)
f31c264e38 is described below

commit f31c264e38b72b34532a2d15488246c7d50011f5
Author: Siddhant Sangwan <[email protected]>
AuthorDate: Fri Jun 13 17:48:08 2025 +0530

    HDDS-13045. Implement Immediate Triggering of Heartbeat when Volume Full (#8590)
---
 .../ozone/container/common/impl/ContainerData.java |  7 +++
 .../container/common/impl/HddsDispatcher.java      | 36 +++++++++-------
 .../ozone/container/common/volume/HddsVolume.java  |  6 +++
 .../container/common/impl/TestHddsDispatcher.java  | 50 ++++++++++++++++++++--
 4 files changed, 80 insertions(+), 19 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index 1ef5ac23d8..ce0d17477e 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -42,6 +42,7 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
@@ -61,6 +62,8 @@ public abstract class ContainerData {
   // For now, we support only KeyValueContainer.
   private final ContainerType containerType;
 
+  private final AtomicBoolean immediateCloseActionSent = new 
AtomicBoolean(false);
+
   // Unique identifier for the container
   private final long containerID;
 
@@ -162,6 +165,10 @@ public long getContainerID() {
     return containerID;
   }
 
+  public AtomicBoolean getImmediateCloseActionSent() {
+    return immediateCloseActionSent;
+  }
+
   /**
    * Returns the path to base dir of the container.
    * @return Path to base dir.
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index c4e03e4533..6c35e30bcc 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -32,6 +32,7 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -580,7 +581,9 @@ public void validateContainerCommand(
    */
   private void sendCloseContainerActionIfNeeded(Container container) {
     // We have to find a more efficient way to close a container.
-    boolean isSpaceFull = isContainerFull(container) || 
isVolumeFull(container);
+    boolean isOpen = container != null && container.getContainerState() == 
State.OPEN;
+    boolean isVolumeFull = isOpen && 
isVolumeFullExcludingCommittedSpace(container);
+    boolean isSpaceFull = isVolumeFull || isContainerFull(container);
     boolean shouldClose = isSpaceFull || isContainerUnhealthy(container);
     if (shouldClose) {
       ContainerData containerData = container.getContainerData();
@@ -591,6 +594,21 @@ private void sendCloseContainerActionIfNeeded(Container 
container) {
           .setContainerID(containerData.getContainerID())
           .setAction(ContainerAction.Action.CLOSE).setReason(reason).build();
       context.addContainerActionIfAbsent(action);
+      if (isVolumeFull) {
+        HddsVolume volume = containerData.getVolume();
+        LOG.warn("Volume [{}] is full. containerID: {}. Volume usage: [{}].", 
volume, containerData.getContainerID(),
+            volume.getCurrentUsage());
+      }
+      AtomicBoolean immediateCloseActionSent = 
containerData.getImmediateCloseActionSent();
+      // if an immediate heartbeat has not been triggered already, trigger it 
now
+      if (immediateCloseActionSent.compareAndSet(false, true)) {
+        context.getParent().triggerHeartbeat();
+        if (isVolumeFull) {
+          // log only if volume is full
+          // don't want to log if only container is full because that is 
expected to happen frequently
+          LOG.warn("Triggered immediate heartbeat because of full volume.");
+        }
+      }
     }
   }
 
@@ -608,20 +626,8 @@ private boolean isContainerFull(Container container) {
     }
   }
 
-  private boolean isVolumeFull(Container container) {
-    boolean isOpen = Optional.ofNullable(container)
-        .map(cont -> cont.getContainerState() == ContainerDataProto.State.OPEN)
-        .orElse(Boolean.FALSE);
-    if (isOpen) {
-      HddsVolume volume = container.getContainerData().getVolume();
-      StorageLocationReport volumeReport = volume.getReport();
-      boolean full = volumeReport.getUsableSpace() <= 0;
-      if (full) {
-        LOG.info("Container {} volume is full: {}", 
container.getContainerData().getContainerID(), volumeReport);
-      }
-      return full;
-    }
-    return false;
+  private boolean isVolumeFullExcludingCommittedSpace(Container container) {
+    return container.getContainerData().getVolume().isVolumeFull();
   }
 
   private boolean isContainerUnhealthy(Container container) {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
index c34a443d15..ca82a61d13 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -195,6 +195,12 @@ public VolumeInfoMetrics getVolumeInfoStats() {
     return volumeInfoMetrics;
   }
 
+  public boolean isVolumeFull() {
+    SpaceUsageSource currentUsage = getCurrentUsage();
+    // if the volume is failed, this method will implicitly return true 
because available space will be 0
+    return currentUsage.getAvailable() - 
getFreeSpaceToSpare(currentUsage.getCapacity()) <= 0;
+  }
+
   @Override
   protected StorageLocationReport.Builder reportBuilder() {
     StorageLocationReport.Builder builder = super.reportBuilder();
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index d209adbb43..1451a14f0f 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -79,6 +79,7 @@
 import 
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.Op;
@@ -125,6 +126,16 @@ public static void init() {
     volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(new 
OzoneConfiguration());
   }
 
+  /**
+   * Tests that close container action is sent when a container is full. First 
two containers are created. Then we
+   * write to one of them to confirm normal writes are successful. Then we 
increase the used space of both containers
+   * such that they're close to full, and write to both of them 
simultaneously. The expectation is that close
+   * container action should be added for both of them and two immediate 
heartbeats should be sent. Next, we write
+   * again to the first container. This time the close container action should 
be queued but immediate heartbeat
+   * should not be sent because of throttling. This confirms that the 
throttling is per container.
+   * @param layout
+   * @throws IOException
+   */
   @ContainerLayoutTestInfo.ContainerTest
   public void testContainerCloseActionWhenFull(
       ContainerLayoutVersion layout) throws IOException {
@@ -141,16 +152,23 @@ public void testContainerCloseActionWhenFull(
       UUID scmId = UUID.randomUUID();
       ContainerSet containerSet = newContainerSet();
       StateContext context = ContainerTestUtils.getMockContext(dd, conf);
+      // create both containers
       KeyValueContainerData containerData = new KeyValueContainerData(1L,
           layout,
           (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
           dd.getUuidString());
+      KeyValueContainerData containerData2 = new KeyValueContainerData(2L,
+          layout, (long) StorageUnit.GB.toBytes(1), 
UUID.randomUUID().toString(), dd.getUuidString());
       Container container = new KeyValueContainer(containerData, conf);
+      Container container2 = new KeyValueContainer(containerData2, conf);
       StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
           .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
       container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
           scmId.toString());
+      container2.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
+          scmId.toString());
       containerSet.addContainer(container);
+      containerSet.addContainer(container2);
       ContainerMetrics metrics = ContainerMetrics.create(conf);
       Map<ContainerType, Handler> handlers = Maps.newHashMap();
       for (ContainerType containerType : ContainerType.values()) {
@@ -159,6 +177,7 @@ public void testContainerCloseActionWhenFull(
                 context.getParent().getDatanodeDetails().getUuidString(),
                 containerSet, volumeSet, volumeChoosingPolicy, metrics, 
NO_OP_ICR_SENDER));
       }
+      // write successfully to first container
       HddsDispatcher hddsDispatcher = new HddsDispatcher(
           conf, containerSet, volumeSet, handlers, context, metrics, null);
       hddsDispatcher.setClusterId(scmId.toString());
@@ -168,15 +187,30 @@ public void testContainerCloseActionWhenFull(
           responseOne.getResult());
       verify(context, times(0))
           .addContainerActionIfAbsent(any(ContainerAction.class));
+      // increment used space of both containers
       containerData.getStatistics().setBlockBytesForTesting(Double.valueOf(
           StorageUnit.MB.toBytes(950)).longValue());
+      containerData2.getStatistics().setBlockBytesForTesting(Double.valueOf(
+          StorageUnit.MB.toBytes(950)).longValue());
       ContainerCommandResponseProto responseTwo = hddsDispatcher
           .dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 2L), null);
+      ContainerCommandResponseProto responseThree = hddsDispatcher
+          .dispatch(getWriteChunkRequest(dd.getUuidString(), 2L, 1L), null);
       assertEquals(ContainerProtos.Result.SUCCESS,
           responseTwo.getResult());
-      verify(context, times(1))
+      assertEquals(ContainerProtos.Result.SUCCESS, responseThree.getResult());
+      // container action should be added for both containers
+      verify(context, times(2))
           .addContainerActionIfAbsent(any(ContainerAction.class));
-
+      DatanodeStateMachine stateMachine = context.getParent();
+      // immediate heartbeat should be triggered for both the containers
+      verify(stateMachine, times(2)).triggerHeartbeat();
+
+      // if we write again to container 1, the container action should get 
added but heartbeat should not get triggered
+      // again because of throttling
+      hddsDispatcher.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 
3L), null);
+      verify(context, 
times(3)).addContainerActionIfAbsent(any(ContainerAction.class));
+      verify(stateMachine, times(2)).triggerHeartbeat(); // was called twice 
before
     } finally {
       volumeSet.shutdown();
       ContainerMetrics.remove();
@@ -276,6 +310,7 @@ public void testContainerCloseActionWhenVolumeFull(
       UUID scmId = UUID.randomUUID();
       ContainerSet containerSet = newContainerSet();
       StateContext context = ContainerTestUtils.getMockContext(dd, conf);
+      DatanodeStateMachine stateMachine = context.getParent();
       // create a 50 byte container
       // available (160) > 100 (min free space) + 50 (container size)
       KeyValueContainerData containerData = new KeyValueContainerData(1L,
@@ -300,14 +335,21 @@ public void testContainerCloseActionWhenVolumeFull(
           conf, containerSet, volumeSet, handlers, context, metrics, null);
       hddsDispatcher.setClusterId(scmId.toString());
       containerData.getVolume().getVolumeUsage()
-          .ifPresent(usage -> usage.incrementUsedSpace(50));
-      usedSpace.addAndGet(50);
+          .ifPresent(usage -> usage.incrementUsedSpace(60));
+      usedSpace.addAndGet(60);
       ContainerCommandResponseProto response = hddsDispatcher
           .dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 1L), null);
       assertEquals(ContainerProtos.Result.SUCCESS,
           response.getResult());
       verify(context, times(1))
           .addContainerActionIfAbsent(any(ContainerAction.class));
+      // verify that immediate heartbeat is triggered
+      verify(stateMachine, times(1)).triggerHeartbeat();
+      // the volume has reached the min free space boundary but this time the 
heartbeat should not be triggered because
+      // of throttling
+      hddsDispatcher.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 
2L), null);
+      verify(context, 
times(2)).addContainerActionIfAbsent(any(ContainerAction.class));
+      verify(stateMachine, times(1)).triggerHeartbeat(); // was called once 
before
 
       // try creating another container now as the volume used has crossed
       // threshold


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to