wuchong commented on code in PR #2926:
URL: https://github.com/apache/fluss/pull/2926#discussion_r2985598075


##########
fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java:
##########
@@ -374,6 +376,83 @@ tb, genMemoryLogRecordsWithWriterId(DATA1, writerId, 2, 
0)),
                 () -> 
assertThat(followerReplica.getLocalLogEndOffset()).isEqualTo(30L));
     }
 
+    @Test
+    void testFetchTimeoutReleasesPooledByteBuf() throws Exception {
+        // This test verifies that when a fetchLog RPC times out, the pooled 
ByteBuf
+        // held by the late-arriving FetchLogResponse is properly released.
+        // Without the fix, the ByteBuf would leak, causing Netty direct 
memory growth.
+
+        ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
+        try {
+            Configuration conf = new Configuration();
+            ServerNode followerNode =
+                    new ServerNode(
+                            followerServerId,
+                            "localhost",
+                            10001,
+                            ServerType.TABLET_SERVER,
+                            "rack2");
+            TestingLeaderEndpoint testingEndpoint =
+                    new TestingLeaderEndpoint(conf, leaderRM, followerNode);
+
+            // Append records to leader so fetch responses carry actual data
+            CompletableFuture<List<ProduceLogResultForBucket>> future = new 
CompletableFuture<>();
+            leaderRM.appendRecordsToLog(
+                    1000,
+                    1,
+                    Collections.singletonMap(tb, 
genMemoryLogRecordsByObject(DATA1)),
+                    null,
+                    future::complete);
+            assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 0, 10L));
+
+            // Configure the endpoint to delay responses by 3 seconds (longer 
than 1s timeout)
+            testingEndpoint.setFetchDelay(scheduler, 3000);
+
+            // Create a fetcher with a very short timeout (1 second) to 
trigger timeout quickly
+            ReplicaFetcherThread timeoutFetcher =
+                    new ReplicaFetcherThread(
+                            "test-timeout-fetcher",
+                            followerRM,
+                            testingEndpoint,
+                            1000,
+                            1 /* 1 second timeout */);
+
+            timeoutFetcher.addBuckets(
+                    Collections.singletonMap(
+                            tb,
+                            new InitialFetchStatus(
+                                    DATA1_TABLE_ID, DATA1_TABLE_PATH, 
leader.id(), 0L)));
+
+            // Start the fetcher - it will send fetches, each timing out after 
1s,
+            // then the delayed responses arrive after 3s
+            timeoutFetcher.start();
+
+            // Wait for at least one timeout + delayed response cycle to 
complete
+            Thread.sleep(5000);
+
+            // Shutdown the fetcher to stop new requests
+            timeoutFetcher.shutdown();
+
+            // Wait a bit more for any remaining delayed responses to arrive 
and be cleaned up
+            Thread.sleep(2000);

Review Comment:
   On slower CI hosts, or if the fetcher starts one more delayed request just 
before `shutdown()`, these fixed `Thread.sleep(5000)` / `Thread.sleep(2000)` 
calls do not guarantee that every scheduled late response has completed before 
the assertions run. In that case `allBufs` can still include an in-flight 
buffer with `refCnt() == 1`, so this new regression test will fail 
nondeterministically even when the production fix is correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to