This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ba96e092fec KAFKA-20211, KAFKA-20345: Fix group coordinator background
metrics test flake (#21770)
ba96e092fec is described below
commit ba96e092fec2a31b27c9d8e8b85e64dd56bc0996
Author: Sean Quah <[email protected]>
AuthorDate: Wed Mar 25 21:50:18 2026 -0500
KAFKA-20211, KAFKA-20345: Fix group coordinator background metrics test
flake (#21770)
Executor task futures complete before metrics are recorded, so we have
to take care to not advance the clock until after metrics are recorded.
Reviewers: Zheguang Zhao <[email protected]>, ChickenchickenLove
<[email protected]>, Ken Huang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
...CoordinatorBackgroundThreadPoolExecutorTest.java | 21 +++++++++++++--------
1 file changed, 13 insertions(+), 8 deletions(-)
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorBackgroundThreadPoolExecutorTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorBackgroundThreadPoolExecutorTest.java
index 61266537c7d..f021cfe2447 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorBackgroundThreadPoolExecutorTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorBackgroundThreadPoolExecutorTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -80,7 +81,7 @@ public class CoordinatorBackgroundThreadPoolExecutorTest {
task1Unblocked.countDown();
task1.get(5, TimeUnit.SECONDS);
- // Task 3 starts.
+ // Task 3 starts after task 1's metrics are recorded.
task3Started.await();
// Task 2 takes 500 ms.
@@ -88,20 +89,24 @@ public class CoordinatorBackgroundThreadPoolExecutorTest {
task2Unblocked.countDown();
task2.get(5, TimeUnit.SECONDS);
+ // Wait until the metrics are recorded before advancing the clock.
+ verify(metrics,
timeout(5000).times(1)).recordBackgroundProcessingTime(500L);
+ verify(metrics,
timeout(5000).times(1)).recordBackgroundThreadBusyTime(250.0);
+
// Task 3 takes 500 ms.
mockTime.sleep(100);
task3Unblocked.countDown();
task3.get(5, TimeUnit.SECONDS);
-
- verify(metrics, times(2)).recordBackgroundQueueTime(0);
- verify(metrics, times(1)).recordBackgroundQueueTime(100);
- verify(metrics, times(1)).recordBackgroundProcessingTime(100);
- verify(metrics, times(2)).recordBackgroundProcessingTime(500);
- verify(metrics, times(1)).recordBackgroundThreadBusyTime(50.0);
- verify(metrics, times(2)).recordBackgroundThreadBusyTime(250.0);
} finally {
threadPoolExecutor.shutdown();
threadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS);
}
+
+ verify(metrics, times(2)).recordBackgroundQueueTime(0L);
+ verify(metrics, times(1)).recordBackgroundQueueTime(100L);
+ verify(metrics, times(1)).recordBackgroundProcessingTime(100L);
+ verify(metrics, times(2)).recordBackgroundProcessingTime(500L);
+ verify(metrics, times(1)).recordBackgroundThreadBusyTime(50.0);
+ verify(metrics, times(2)).recordBackgroundThreadBusyTime(250.0);
}
}