This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a0446e33339 KAFKA-20363 Fix flaky testIdleTimeCallback in KafkaEventQueueTest (#21920)
a0446e33339 is described below

commit a0446e33339d0111207663c87c69f30489279a15
Author: Murali Basani <[email protected]>
AuthorDate: Fri Apr 10 11:16:28 2026 +0200

    KAFKA-20363 Fix flaky testIdleTimeCallback in KafkaEventQueueTest (#21920)
    
    https://issues.apache.org/jira/browse/KAFKA-20363
    
    The test had a race condition between the test thread and the queue's
    event handler thread. The test now waits until the queue thread is
    idle (Thread.State.WAITING) before advancing MockTime, which
    guarantees the ordering.
    
    Reviewers: junvelop <[email protected]>,
     TaiJuWu <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../org/apache/kafka/queue/KafkaEventQueueTest.java  | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
index 893ef4ba9f5..ba4b562ae5c 100644
--- a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
+++ b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
@@ -425,6 +425,17 @@ public class KafkaEventQueueTest {
         }
     }
 
+    /**
+     * Wait for the queue's event handler thread to enter cond.await(), 
ensuring
+     * that startIdleMs has been captured before the test advances MockTime.
+     */
+    private static void waitForQueueThreadToBeIdle(Thread queueThread) throws 
InterruptedException {
+        TestUtils.waitForCondition(
+                () -> queueThread.getState() == Thread.State.WAITING,
+                "Queue thread should be waiting"
+        );
+    }
+
     @Test
     public void testIdleTimeCallback() throws Exception {
         MockTime time = new MockTime();
@@ -440,6 +451,11 @@ public class KafkaEventQueueTest {
                     lastIdleTimeMs.set(idleDuration);
                     lastCurrentTimeMs.set(currentTime);
                 })) {
+            // Capture the queue's event handler thread so we can wait for it 
to be idle.
+            CompletableFuture<Thread> queueThreadFuture = new 
CompletableFuture<>();
+            queue.append(() -> 
queueThreadFuture.complete(Thread.currentThread()));
+            Thread queueThread = queueThreadFuture.get();
+
             time.sleep(2);
             assertEquals(0, lastIdleTimeMs.get(), "Last idle time should be 
0ms");
 
@@ -451,6 +467,8 @@ public class KafkaEventQueueTest {
             }));
             assertEquals("event1-processed", event1.get());
 
+            waitForQueueThreadToBeIdle(queueThread);
+
             long timeBeforeWait = time.milliseconds();
             long waitTime5Ms = 5;
             time.sleep(waitTime5Ms);
@@ -464,6 +482,8 @@ public class KafkaEventQueueTest {
             assertEquals(timeBeforeWait + waitTime5Ms, 
lastCurrentTimeMs.get(), "Current time should be " + (timeBeforeWait + 
waitTime5Ms) + "ms, was: " + lastCurrentTimeMs.get());
 
             // Test 2: Deferred event
+            waitForQueueThreadToBeIdle(queueThread);
+
             long timeBeforeDeferred = time.milliseconds();
             long waitTime2Ms = 2;
             CompletableFuture<Void> deferredEvent2 = new CompletableFuture<>();

Reply via email to