platinumhamburg commented on code in PR #2926:
URL: https://github.com/apache/fluss/pull/2926#discussion_r2986065942


##########
fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java:
##########
@@ -374,6 +376,83 @@ tb, genMemoryLogRecordsWithWriterId(DATA1, writerId, 2, 0)),
                 () -> assertThat(followerReplica.getLocalLogEndOffset()).isEqualTo(30L));
     }
 
+    @Test
+    void testFetchTimeoutReleasesPooledByteBuf() throws Exception {
+        // This test verifies that when a fetchLog RPC times out, the pooled ByteBuf
+        // held by the late-arriving FetchLogResponse is properly released.
+        // Without the fix, the ByteBuf would leak, causing Netty direct memory growth.
+
+        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+        try {
+            Configuration conf = new Configuration();
+            ServerNode followerNode =
+                    new ServerNode(
+                            followerServerId,
+                            "localhost",
+                            10001,
+                            ServerType.TABLET_SERVER,
+                            "rack2");
+            TestingLeaderEndpoint testingEndpoint =
+                    new TestingLeaderEndpoint(conf, leaderRM, followerNode);
+
+            // Append records to leader so fetch responses carry actual data
+            CompletableFuture<List<ProduceLogResultForBucket>> future = new CompletableFuture<>();
+            leaderRM.appendRecordsToLog(
+                    1000,
+                    1,
+                    Collections.singletonMap(tb, genMemoryLogRecordsByObject(DATA1)),
+                    null,
+                    future::complete);
+            assertThat(future.get()).containsOnly(new ProduceLogResultForBucket(tb, 0, 10L));
+
+            // Configure the endpoint to delay responses by 3 seconds (longer than 1s timeout)
+            testingEndpoint.setFetchDelay(scheduler, 3000);
+
+            // Create a fetcher with a very short timeout (1 second) to trigger timeout quickly
+            ReplicaFetcherThread timeoutFetcher =
+                    new ReplicaFetcherThread(
+                            "test-timeout-fetcher",
+                            followerRM,
+                            testingEndpoint,
+                            1000,
+                            1 /* 1 second timeout */);
+
+            timeoutFetcher.addBuckets(
+                    Collections.singletonMap(
+                            tb,
+                            new InitialFetchStatus(
+                                    DATA1_TABLE_ID, DATA1_TABLE_PATH, leader.id(), 0L)));
+
+            // Start the fetcher - it will send fetches, each timing out after 1s,
+            // then the delayed responses arrive after 3s
+            timeoutFetcher.start();
+
+            // Wait for at least one timeout + delayed response cycle to complete
+            Thread.sleep(5000);
+
+            // Shutdown the fetcher to stop new requests
+            timeoutFetcher.shutdown();
+
+            // Wait a bit more for any remaining delayed responses to arrive and be cleaned up
+            Thread.sleep(2000);

Review Comment:
   Yes, this test was flaky. I replaced the Thread.sleep calls with
   condition-based waiting via retry(): the test now polls until all allocated
   ByteBufs reach refCnt == 0 (with a 15s timeout), so it no longer depends on
   fixed sleep durations and won't fail due to timing on slow CI hosts. See the
   follow-up commit.
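
The condition-based waiting described above can be sketched roughly as follows. This is a minimal illustration, not the code from the follow-up commit: `ConditionWait`, `waitUntil`, and `allReleased` are hypothetical names, the helper is not the Fluss `retry()` utility, and `AtomicInteger` counters stand in for the `refCnt()` values of the pooled ByteBufs.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical helper (an assumption, not the Fluss retry() utility):
// polls a condition until it holds or a deadline passes.
final class ConditionWait {
    private ConditionWait() {}

    static void waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            // Fail with a clear message instead of silently passing a stale state.
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError("Condition not met within " + timeout);
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for condition", e);
            }
        }
    }

    // Stand-in for checking that every tracked buffer has refCnt == 0.
    static boolean allReleased(List<AtomicInteger> refCounts) {
        return refCounts.stream().allMatch(count -> count.get() == 0);
    }
}
```

In a test, a call such as `ConditionWait.waitUntil(() -> ConditionWait.allReleased(refCounts), Duration.ofSeconds(15), Duration.ofMillis(100))` would take the place of the fixed sleeps. It returns as soon as the buffers are released on a fast host and only fails once the full 15s budget is used up on a slow one.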



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.