This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 200a2430458 HDDS-13939. Potential channel leak in 
StreamingClient.stream() method (#9304)
200a2430458 is described below

commit 200a24304583d369faeb91325afdcd5ba4c6e99a
Author: GUAN-HAO HUANG <[email protected]>
AuthorDate: Tue Nov 18 00:03:43 2025 +0800

    HDDS-13939. Potential channel leak in StreamingClient.stream() method 
(#9304)
---
 .../ozone/container/stream/StreamingClient.java    | 18 +++++++---
 .../container/stream/TestStreamingServer.java      | 38 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 4 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java
index 55233e749bc..d4aff71b54d 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java
@@ -86,11 +86,17 @@ public void stream(String id) {
   }
 
   public void stream(String id, long timeout, TimeUnit unit) {
+    Channel channel = null;
     try {
-      Channel channel = bootstrap.connect(host, port).sync().channel();
-      channel.writeAndFlush(id + "\n")
-          .await(timeout, unit);
-      channel.closeFuture().await(timeout, unit);
+      channel = bootstrap.connect(host, port).sync().channel();
+      boolean writeSuccess = channel.writeAndFlush(id + "\n").await(timeout, 
unit);
+      if (!writeSuccess) {
+        throw new StreamingException("Failed to write id " + id + ": timed out 
" + timeout + " " + unit);
+      }
+      boolean closeSuccess = channel.closeFuture().await(timeout, unit);
+      if (!closeSuccess) {
+        throw new StreamingException("Failed to close channel for id " + id + 
": timed out " + timeout + " " + unit);
+      }
       if (!dirstreamClientHandler.isAtTheEnd()) {
         throw new StreamingException("Streaming is failed. Not all files " +
             "are streamed. Please check the log of the server." +
@@ -100,6 +106,10 @@ public void stream(String id, long timeout, TimeUnit unit) 
{
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new StreamingException(e);
+    } finally {
+      if (channel != null && channel.isActive()) {
+        channel.close();
+      }
     }
   }
 
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java
index 47e280e97ae..7de9e98d3f3 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java
@@ -159,6 +159,44 @@ public Map<String, Path> getFilesToStream(String id)
 
   }
 
+  @Test
+  public void testChannelLeakOnTimeoutWithoutClose() throws Exception {
+    Files.createDirectories(sourceDir.resolve(SUBDIR));
+    Files.write(sourceDir.resolve(SUBDIR).resolve("file1"), CONTENT);
+
+    try (StreamingServer server = new StreamingServer(
+        new DirectoryServerSource(sourceDir) {
+          @Override
+          public Map<String, Path> getFilesToStream(String id)
+              throws InterruptedException {
+            // Delay to cause timeout
+            Thread.sleep(3000L);
+            return super.getFilesToStream(id);
+          }
+        }, 0)) {
+      server.start();
+
+      // Create client WITHOUT try-with-resources to simulate resource leak
+      StreamingClient client = new StreamingClient("localhost", 
server.getPort(),
+          new DirectoryServerDestination(destDir));
+
+      try {
+        client.stream(SUBDIR, 1L, TimeUnit.SECONDS);
+        // Should not reach here
+        throw new AssertionError("Expected exception, but none was thrown");
+      } catch (StreamingException e) {
+        String message = e.getMessage();
+        if (!message.contains("timed out") && !message.contains("timeout")) {
+          throw new AssertionError(
+              "Expected timeout exception, but got: " + message + ". " +
+              "This indicates the bug: await() returned false but we didn't 
check it. " +
+              "Channel may be leaking.");
+        }
+      }
+      client.close();
+    }
+  }
+
   private void streamDir(String subdir) {
     try (StreamingServer server = new StreamingServer(
         new DirectoryServerSource(sourceDir), 0)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to