This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a0446e33339 KAFKA-20363 Fix flaky testIdleTimeCallback in
KafkaEventQueueTest (#21920)
a0446e33339 is described below
commit a0446e33339d0111207663c87c69f30489279a15
Author: Murali Basani <[email protected]>
AuthorDate: Fri Apr 10 11:16:28 2026 +0200
KAFKA-20363 Fix flaky testIdleTimeCallback in KafkaEventQueueTest (#21920)
https://issues.apache.org/jira/browse/KAFKA-20363
The test had a race condition between Test thread and Queue thread. We
actually wait until queue thread is sleeping/waiting so it guarantees
the order.
Reviewers: junvelop <[email protected]>,
TaiJuWu <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/queue/KafkaEventQueueTest.java | 20 ++++++++++++++++++++
1 file changed, 20 insertions(+)
diff --git
a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
index 893ef4ba9f5..ba4b562ae5c 100644
---
a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
+++
b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
@@ -425,6 +425,17 @@ public class KafkaEventQueueTest {
}
}
+ /**
+ * Wait for the queue's event handler thread to enter cond.await(),
ensuring
+ * that startIdleMs has been captured before the test advances MockTime.
+ */
+ private static void waitForQueueThreadToBeIdle(Thread queueThread) throws
InterruptedException {
+ TestUtils.waitForCondition(
+ () -> queueThread.getState() == Thread.State.WAITING,
+ "Queue thread should be waiting"
+ );
+ }
+
@Test
public void testIdleTimeCallback() throws Exception {
MockTime time = new MockTime();
@@ -440,6 +451,11 @@ public class KafkaEventQueueTest {
lastIdleTimeMs.set(idleDuration);
lastCurrentTimeMs.set(currentTime);
})) {
+ // Capture the queue's event handler thread so we can wait for it
to be idle.
+ CompletableFuture<Thread> queueThreadFuture = new
CompletableFuture<>();
+ queue.append(() ->
queueThreadFuture.complete(Thread.currentThread()));
+ Thread queueThread = queueThreadFuture.get();
+
time.sleep(2);
assertEquals(0, lastIdleTimeMs.get(), "Last idle time should be
0ms");
@@ -451,6 +467,8 @@ public class KafkaEventQueueTest {
}));
assertEquals("event1-processed", event1.get());
+ waitForQueueThreadToBeIdle(queueThread);
+
long timeBeforeWait = time.milliseconds();
long waitTime5Ms = 5;
time.sleep(waitTime5Ms);
@@ -464,6 +482,8 @@ public class KafkaEventQueueTest {
assertEquals(timeBeforeWait + waitTime5Ms,
lastCurrentTimeMs.get(), "Current time should be " + (timeBeforeWait +
waitTime5Ms) + "ms, was: " + lastCurrentTimeMs.get());
// Test 2: Deferred event
+ waitForQueueThreadToBeIdle(queueThread);
+
long timeBeforeDeferred = time.milliseconds();
long waitTime2Ms = 2;
CompletableFuture<Void> deferredEvent2 = new CompletableFuture<>();