This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f9037bca4a2 KAFKA-19700 : remove flaky 
testDescribeStreamsGroupWithShortTimeout test (#22000)
f9037bca4a2 is described below

commit f9037bca4a27512c02c60039656130fef3f2e434
Author: Murali Basani <[email protected]>
AuthorDate: Sat May 2 01:14:26 2026 +0200

    KAFKA-19700 : remove flaky testDescribeStreamsGroupWithShortTimeout test 
(#22000)
    
    Removed the flaky test testDescribeStreamsGroupWithShortTimeout.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../tools/streams/DescribeStreamsGroupTest.java    | 72 ++++++++++------------
 1 file changed, 32 insertions(+), 40 deletions(-)

diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
index f03805c37d2..0e62f2d7acb 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.tools.streams;
 
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.internals.Exit;
 import org.apache.kafka.streams.GroupProtocol;
@@ -46,7 +45,6 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -92,6 +90,7 @@ public class DescribeStreamsGroupTest {
     @AfterAll
     public static void closeCluster() {
         streams.close();
+        cluster.deleteTopics(INPUT_TOPIC, OUTPUT_TOPIC, INPUT_TOPIC_2, 
OUTPUT_TOPIC_2);
         cluster.stop();
         cluster = null;
     }
@@ -215,35 +214,37 @@ public class DescribeStreamsGroupTest {
             "Topic " + INPUT_TOPIC_2 + " not created"
         );
         KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, 
OUTPUT_TOPIC_2), streamsProp(APP_ID_2));
-        startApplicationAndWaitUntilRunning(streams2);
-
-        final List<String> expectedHeader = List.of("GROUP", 
"TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", 
"MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
-        final Set<List<String>> expectedRows1 = Set.of(
-            List.of(APP_ID, "", "0", "", "streams", "", "", "", "ACTIVE:", 
"0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"),
-            List.of(APP_ID, "", "0", "", "streams", "", "", "", "ACTIVE:", 
"0:[0];", "1:[0];", "TARGET-ACTIVE:", "0:[0];", "1:[0];"));
-        final Set<List<String>> expectedRows2 = Set.of(
-            List.of(APP_ID_2, "", "0", "", "streams", "", "", "", "ACTIVE:", 
"1:[0];", "TARGET-ACTIVE:", "1:[0];"),
-            List.of(APP_ID_2, "", "0", "", "streams", "", "", "", "ACTIVE:", 
"0:[0];", "TARGET-ACTIVE:", "0:[0];"));
-        final Map<String, Set<List<String>>> expectedRowsMap = new HashMap<>();
-        expectedRowsMap.put(APP_ID, expectedRows1);
-        expectedRowsMap.put(APP_ID_2, expectedRows2);
-
-        // The member and process names as well as client-id are not 
deterministic, so we don't care about them.
-        // The TARGET-ASSIGNMENT-EPOCH and MEMBER-EPOCH can vary due to 
rebalance timing, so we don't care about them either.
-        final List<Integer> dontCares = List.of(1, 3, 5, 6, 7);
-
-        validateDescribeOutput(
-            List.of("--bootstrap-server", bootstrapServers, "--describe", 
"--members", "--verbose", "--group", APP_ID, "--group", APP_ID_2),
-            expectedHeader, expectedRowsMap, dontCares);
-        validateDescribeOutput(
-            List.of("--bootstrap-server", bootstrapServers, "--describe", 
"--verbose", "--members", "--group", APP_ID, "--group", APP_ID_2),
-            expectedHeader, expectedRowsMap, dontCares);
-        validateDescribeOutput(
-            List.of("--bootstrap-server", bootstrapServers, "--describe", 
"--verbose", "--members", "--all-groups"),
-            expectedHeader, expectedRowsMap, dontCares);
-
-        streams2.close();
-        streams2.cleanUp();
+        try {
+            startApplicationAndWaitUntilRunning(streams2);
+
+            final List<String> expectedHeader = List.of("GROUP", 
"TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", 
"MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
+            final Set<List<String>> expectedRows1 = Set.of(
+                List.of(APP_ID, "", "0", "", "streams", "", "", "", "ACTIVE:", 
"0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"),
+                List.of(APP_ID, "", "0", "", "streams", "", "", "", "ACTIVE:", 
"0:[0];", "1:[0];", "TARGET-ACTIVE:", "0:[0];", "1:[0];"));
+            final Set<List<String>> expectedRows2 = Set.of(
+                List.of(APP_ID_2, "", "0", "", "streams", "", "", "", 
"ACTIVE:", "1:[0];", "TARGET-ACTIVE:", "1:[0];"),
+                List.of(APP_ID_2, "", "0", "", "streams", "", "", "", 
"ACTIVE:", "0:[0];", "TARGET-ACTIVE:", "0:[0];"));
+            final Map<String, Set<List<String>>> expectedRowsMap = new 
HashMap<>();
+            expectedRowsMap.put(APP_ID, expectedRows1);
+            expectedRowsMap.put(APP_ID_2, expectedRows2);
+
+            // The member and process names as well as client-id are not 
deterministic, so we don't care about them.
+            // The TARGET-ASSIGNMENT-EPOCH and MEMBER-EPOCH can vary due to 
rebalance timing, so we don't care about them either.
+            final List<Integer> dontCares = List.of(1, 3, 5, 6, 7);
+
+            validateDescribeOutput(
+                List.of("--bootstrap-server", bootstrapServers, "--describe", 
"--members", "--verbose", "--group", APP_ID, "--group", APP_ID_2),
+                expectedHeader, expectedRowsMap, dontCares);
+            validateDescribeOutput(
+                List.of("--bootstrap-server", bootstrapServers, "--describe", 
"--verbose", "--members", "--group", APP_ID, "--group", APP_ID_2),
+                expectedHeader, expectedRowsMap, dontCares);
+            validateDescribeOutput(
+                List.of("--bootstrap-server", bootstrapServers, "--describe", 
"--verbose", "--members", "--all-groups"),
+                expectedHeader, expectedRowsMap, dontCares);
+        } finally {
+            streams2.close();
+            streams2.cleanUp();
+        }
     }
 
     @Test
@@ -259,15 +260,6 @@ public class DescribeStreamsGroupTest {
             List.of("--bootstrap-server", bootstrapServers, "--describe", 
"--verbose", "--members", "--group", nonExistingGroup), errorMessage);
     }
 
-    @Test
-    public void testDescribeStreamsGroupWithShortTimeout() {
-        // Note: 1ms timeout may not always trigger timeout on fast machines 
with warm groups
-        // Using 0ms to ensure timeout
-        List<String> args = List.of("--bootstrap-server", bootstrapServers, 
"--describe", "--members", "--verbose", "--group", APP_ID, "--timeout", "0");
-        Throwable e = assertThrows(ExecutionException.class, () -> 
getStreamsGroupService(args.toArray(new String[0])).describeGroups());
-        assertEquals(TimeoutException.class, e.getCause().getClass());
-    }
-
     private static Topology topology(String inputTopic, String outputTopic) {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic, Consumed.with(Serdes.String(), 
Serdes.String()))

Reply via email to