This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f9037bca4a2 KAFKA-19700 : remove flaky
testDescribeStreamsGroupWithShortTimeout test (#22000)
f9037bca4a2 is described below
commit f9037bca4a27512c02c60039656130fef3f2e434
Author: Murali Basani <[email protected]>
AuthorDate: Sat May 2 01:14:26 2026 +0200
KAFKA-19700 : remove flaky testDescribeStreamsGroupWithShortTimeout test
(#22000)
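Removed the flaky test testDescribeStreamsGroupWithShortTimeout.
This change also deletes the test topics in closeCluster() before the
cluster is stopped, and wraps the second KafkaStreams instance (streams2)
in try/finally so that it is always closed and cleaned up, even when
startup or an output validation fails.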
Reviewers: Matthias J. Sax <[email protected]>
---
.../tools/streams/DescribeStreamsGroupTest.java | 72 ++++++++++------------
1 file changed, 32 insertions(+), 40 deletions(-)
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
index f03805c37d2..0e62f2d7acb 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.tools.streams;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.internals.Exit;
import org.apache.kafka.streams.GroupProtocol;
@@ -46,7 +45,6 @@ import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -92,6 +90,7 @@ public class DescribeStreamsGroupTest {
@AfterAll
public static void closeCluster() {
streams.close();
+ cluster.deleteTopics(INPUT_TOPIC, OUTPUT_TOPIC, INPUT_TOPIC_2,
OUTPUT_TOPIC_2);
cluster.stop();
cluster = null;
}
@@ -215,35 +214,37 @@ public class DescribeStreamsGroupTest {
"Topic " + INPUT_TOPIC_2 + " not created"
);
KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2,
OUTPUT_TOPIC_2), streamsProp(APP_ID_2));
- startApplicationAndWaitUntilRunning(streams2);
-
- final List<String> expectedHeader = List.of("GROUP",
"TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL",
"MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
- final Set<List<String>> expectedRows1 = Set.of(
- List.of(APP_ID, "", "0", "", "streams", "", "", "", "ACTIVE:",
"0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"),
- List.of(APP_ID, "", "0", "", "streams", "", "", "", "ACTIVE:",
"0:[0];", "1:[0];", "TARGET-ACTIVE:", "0:[0];", "1:[0];"));
- final Set<List<String>> expectedRows2 = Set.of(
- List.of(APP_ID_2, "", "0", "", "streams", "", "", "", "ACTIVE:",
"1:[0];", "TARGET-ACTIVE:", "1:[0];"),
- List.of(APP_ID_2, "", "0", "", "streams", "", "", "", "ACTIVE:",
"0:[0];", "TARGET-ACTIVE:", "0:[0];"));
- final Map<String, Set<List<String>>> expectedRowsMap = new HashMap<>();
- expectedRowsMap.put(APP_ID, expectedRows1);
- expectedRowsMap.put(APP_ID_2, expectedRows2);
-
- // The member and process names as well as client-id are not
deterministic, so we don't care about them.
- // The TARGET-ASSIGNMENT-EPOCH and MEMBER-EPOCH can vary due to
rebalance timing, so we don't care about them either.
- final List<Integer> dontCares = List.of(1, 3, 5, 6, 7);
-
- validateDescribeOutput(
- List.of("--bootstrap-server", bootstrapServers, "--describe",
"--members", "--verbose", "--group", APP_ID, "--group", APP_ID_2),
- expectedHeader, expectedRowsMap, dontCares);
- validateDescribeOutput(
- List.of("--bootstrap-server", bootstrapServers, "--describe",
"--verbose", "--members", "--group", APP_ID, "--group", APP_ID_2),
- expectedHeader, expectedRowsMap, dontCares);
- validateDescribeOutput(
- List.of("--bootstrap-server", bootstrapServers, "--describe",
"--verbose", "--members", "--all-groups"),
- expectedHeader, expectedRowsMap, dontCares);
-
- streams2.close();
- streams2.cleanUp();
+ try {
+ startApplicationAndWaitUntilRunning(streams2);
+
+ final List<String> expectedHeader = List.of("GROUP",
"TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL",
"MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
+ final Set<List<String>> expectedRows1 = Set.of(
+ List.of(APP_ID, "", "0", "", "streams", "", "", "", "ACTIVE:",
"0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"),
+ List.of(APP_ID, "", "0", "", "streams", "", "", "", "ACTIVE:",
"0:[0];", "1:[0];", "TARGET-ACTIVE:", "0:[0];", "1:[0];"));
+ final Set<List<String>> expectedRows2 = Set.of(
+ List.of(APP_ID_2, "", "0", "", "streams", "", "", "",
"ACTIVE:", "1:[0];", "TARGET-ACTIVE:", "1:[0];"),
+ List.of(APP_ID_2, "", "0", "", "streams", "", "", "",
"ACTIVE:", "0:[0];", "TARGET-ACTIVE:", "0:[0];"));
+ final Map<String, Set<List<String>>> expectedRowsMap = new
HashMap<>();
+ expectedRowsMap.put(APP_ID, expectedRows1);
+ expectedRowsMap.put(APP_ID_2, expectedRows2);
+
+ // The member and process names as well as client-id are not
deterministic, so we don't care about them.
+ // The TARGET-ASSIGNMENT-EPOCH and MEMBER-EPOCH can vary due to
rebalance timing, so we don't care about them either.
+ final List<Integer> dontCares = List.of(1, 3, 5, 6, 7);
+
+ validateDescribeOutput(
+ List.of("--bootstrap-server", bootstrapServers, "--describe",
"--members", "--verbose", "--group", APP_ID, "--group", APP_ID_2),
+ expectedHeader, expectedRowsMap, dontCares);
+ validateDescribeOutput(
+ List.of("--bootstrap-server", bootstrapServers, "--describe",
"--verbose", "--members", "--group", APP_ID, "--group", APP_ID_2),
+ expectedHeader, expectedRowsMap, dontCares);
+ validateDescribeOutput(
+ List.of("--bootstrap-server", bootstrapServers, "--describe",
"--verbose", "--members", "--all-groups"),
+ expectedHeader, expectedRowsMap, dontCares);
+ } finally {
+ streams2.close();
+ streams2.cleanUp();
+ }
}
@Test
@@ -259,15 +260,6 @@ public class DescribeStreamsGroupTest {
List.of("--bootstrap-server", bootstrapServers, "--describe",
"--verbose", "--members", "--group", nonExistingGroup), errorMessage);
}
- @Test
- public void testDescribeStreamsGroupWithShortTimeout() {
- // Note: 1ms timeout may not always trigger timeout on fast machines
with warm groups
- // Using 0ms to ensure timeout
- List<String> args = List.of("--bootstrap-server", bootstrapServers,
"--describe", "--members", "--verbose", "--group", APP_ID, "--timeout", "0");
- Throwable e = assertThrows(ExecutionException.class, () ->
getStreamsGroupService(args.toArray(new String[0])).describeGroups());
- assertEquals(TimeoutException.class, e.getCause().getClass());
- }
-
private static Topology topology(String inputTopic, String outputTopic) {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic, Consumed.with(Serdes.String(),
Serdes.String()))