This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new c3167f641a8 KAFKA-20512 : Fix flakiness - same output topic is being
used for all parameterized tests (#22112)
c3167f641a8 is described below
commit c3167f641a8a97fae1c96c08c5bb6990629560bc
Author: Murali Basani <[email protected]>
AuthorDate: Sat Apr 25 01:39:54 2026 +0200
KAFKA-20512 : Fix flakiness - same output topic is being used for all
parameterized tests (#22112)
- Same output topic is used for all parameterizations. Isolating them
with creating topic prefixing with app id
- wait for restoring task is called so that assert works for all of
those 10 unmatched recs
Reviewers: Matthias J. Sax <[email protected]>
---
.../OuterJoinListValueStoreRestorationTest.java | 68 +++++++++++++++-------
1 file changed, 47 insertions(+), 21 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/OuterJoinListValueStoreRestorationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/OuterJoinListValueStoreRestorationTest.java
index 98e49fa669b..bdf824290bf 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/OuterJoinListValueStoreRestorationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/OuterJoinListValueStoreRestorationTest.java
@@ -46,6 +46,9 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
@@ -53,7 +56,7 @@ import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.st
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Integration test for verifying ListValueStore deserialization behavior
after state restoration
@@ -66,20 +69,16 @@ public class OuterJoinListValueStoreRestorationTest {
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
- private static final String LEFT_TOPIC = "left-topic";
- private static final String RIGHT_TOPIC = "right-topic";
- private static final String OUTPUT_TOPIC = "output-topic";
-
private String applicationId;
+ private String leftTopic;
+ private String rightTopic;
+ private String outputTopic;
private Properties streamsConfig;
private KafkaStreams streams;
@BeforeAll
public static void startCluster() throws Exception {
CLUSTER.start();
- CLUSTER.createTopic(LEFT_TOPIC, 1, 1);
- CLUSTER.createTopic(RIGHT_TOPIC, 1, 1);
- CLUSTER.createTopic(OUTPUT_TOPIC, 1, 1);
}
@AfterAll
@@ -88,17 +87,24 @@ public class OuterJoinListValueStoreRestorationTest {
}
@BeforeEach
- public void before(final TestInfo testInfo) {
+ public void before(final TestInfo testInfo) throws Exception {
applicationId = "outer-join-restoration-test-" +
safeUniqueTestName(testInfo);
+ leftTopic = applicationId + "-left";
+ rightTopic = applicationId + "-right";
+ outputTopic = applicationId + "-output";
+ CLUSTER.createTopic(leftTopic, 1, 1);
+ CLUSTER.createTopic(rightTopic, 1, 1);
+ CLUSTER.createTopic(outputTopic, 1, 1);
streamsConfig = getStreamsConfig();
}
@AfterEach
- public void after() {
+ public void after() throws Exception {
if (streams != null) {
streams.close(Duration.ofSeconds(30));
streams.cleanUp();
}
+ CLUSTER.deleteAllTopics();
}
private static Stream<Arguments> processingGuaranteeAndStoreFormat() {
@@ -125,15 +131,15 @@ public class OuterJoinListValueStoreRestorationTest {
private KafkaStreams createOuterJoinTopology() {
final StreamsBuilder builder = new StreamsBuilder();
- final KStream<String, String> leftStream = builder.stream(LEFT_TOPIC);
- final KStream<String, String> rightStream =
builder.stream(RIGHT_TOPIC);
+ final KStream<String, String> leftStream = builder.stream(leftTopic);
+ final KStream<String, String> rightStream = builder.stream(rightTopic);
leftStream.outerJoin(
rightStream,
(leftValue, rightValue) -> "left=" + leftValue + ", right=" +
rightValue,
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(60)),
StreamJoined.with(Serdes.String(), Serdes.String(),
Serdes.String())
- ).to(OUTPUT_TOPIC);
+ ).to(outputTopic);
return new KafkaStreams(builder.build(), streamsConfig);
}
@@ -157,7 +163,7 @@ public class OuterJoinListValueStoreRestorationTest {
long timestamp = 1000L;
for (int i = 0; i < 10; i++) {
final String key = "key" + i;
- produceRecord(LEFT_TOPIC, key, "left-" + i, timestamp);
+ produceRecord(leftTopic, key, "left-" + i, timestamp);
timestamp += 100;
}
@@ -165,12 +171,12 @@ public class OuterJoinListValueStoreRestorationTest {
// Wait for processing and commit to changelog
// 1- Use a probe record to verify end-to-end: process + commit
- produceRecord(LEFT_TOPIC, "probe", "probe-left", timestamp);
- produceRecord(RIGHT_TOPIC, "probe", "probe-right", timestamp);
+ produceRecord(leftTopic, "probe", "probe-left", timestamp);
+ produceRecord(rightTopic, "probe", "probe-right", timestamp);
// 2- Wait for the join result - this proves processing happened
waitUntilMinKeyValueRecordsReceived(
getConsumerConfig(),
- OUTPUT_TOPIC,
+ outputTopic,
1,
30000
);
@@ -190,16 +196,36 @@ public class OuterJoinListValueStoreRestorationTest {
// NOW advance window to trigger emitNonJoinedOuterRecords()
final long timestampBeyondWindow = 62000L; // Beyond 60-second window
- produceRecord(LEFT_TOPIC, "trigger", "trigger-value",
timestampBeyondWindow);
+ produceRecord(leftTopic, "trigger", "trigger-value",
timestampBeyondWindow);
final List<KeyValue<String, String>> results =
waitUntilMinKeyValueRecordsReceived(
getConsumerConfig(),
- OUTPUT_TOPIC,
- 1,
+ outputTopic,
+ 10,
30000
);
- assertFalse(results.isEmpty(), "Should have received output records");
+ final Set<String> expectedKeys = IntStream.range(0, 10)
+ .mapToObj(i -> "key" + i)
+ .collect(Collectors.toSet());
+
+ final Set<String> unmatchedKeys = results.stream()
+ .filter(kv -> kv.value != null && kv.value.endsWith("right=null"))
+ .map(kv -> kv.key)
+ .collect(Collectors.toSet());
+
+ // assert based on record shape
+ assertEquals(expectedKeys, unmatchedKeys,
+ "All 10 unmatched left records should be emitted after restoration
with right=null shape");
+
+ final Set<String> nonProbeKeys = results.stream()
+ .filter(kv -> !"probe".equals(kv.key))
+ .map(kv -> kv.key)
+ .collect(Collectors.toSet());
+
+ // assert based on keys
+ assertEquals(expectedKeys, nonProbeKeys,
+ "No unexpected keys should appear on the output topic after
restoration");
}
private void produceRecord(final String topic, final String key, final
String value, final long timestamp) {