This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0dad6a7c9c0 KAFKA-20512 : Fix flakiness - same output topic is being 
used for all parameterized tests  (#22112)
0dad6a7c9c0 is described below

commit 0dad6a7c9c0a365eed29b62e4331744fab30ccf3
Author: Murali Basani <[email protected]>
AuthorDate: Sat Apr 25 01:39:54 2026 +0200

    KAFKA-20512 : Fix flakiness - same output topic is being used for all 
parameterized tests  (#22112)
    
    - The same output topic was used for all parameterizations. Isolate them
    by creating topics prefixed with the application id.
    - A wait for the restoring task is now called so that the assertion holds
    for all 10 unmatched records.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../OuterJoinListValueStoreRestorationTest.java    | 68 +++++++++++++++-------
 1 file changed, 47 insertions(+), 21 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/OuterJoinListValueStoreRestorationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/OuterJoinListValueStoreRestorationTest.java
index 98e49fa669b..bdf824290bf 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/OuterJoinListValueStoreRestorationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/OuterJoinListValueStoreRestorationTest.java
@@ -46,6 +46,9 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.time.Duration;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
@@ -53,7 +56,7 @@ import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.st
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
 import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
  * Integration test for verifying ListValueStore deserialization behavior 
after state restoration
@@ -66,20 +69,16 @@ public class OuterJoinListValueStoreRestorationTest {
     private static final int NUM_BROKERS = 1;
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
 
-    private static final String LEFT_TOPIC = "left-topic";
-    private static final String RIGHT_TOPIC = "right-topic";
-    private static final String OUTPUT_TOPIC = "output-topic";
-
     private String applicationId;
+    private String leftTopic;
+    private String rightTopic;
+    private String outputTopic;
     private Properties streamsConfig;
     private KafkaStreams streams;
 
     @BeforeAll
     public static void startCluster() throws Exception {
         CLUSTER.start();
-        CLUSTER.createTopic(LEFT_TOPIC, 1, 1);
-        CLUSTER.createTopic(RIGHT_TOPIC, 1, 1);
-        CLUSTER.createTopic(OUTPUT_TOPIC, 1, 1);
     }
 
     @AfterAll
@@ -88,17 +87,24 @@ public class OuterJoinListValueStoreRestorationTest {
     }
 
     @BeforeEach
-    public void before(final TestInfo testInfo) {
+    public void before(final TestInfo testInfo) throws Exception {
         applicationId = "outer-join-restoration-test-" + 
safeUniqueTestName(testInfo);
+        leftTopic = applicationId + "-left";
+        rightTopic = applicationId + "-right";
+        outputTopic = applicationId + "-output";
+        CLUSTER.createTopic(leftTopic, 1, 1);
+        CLUSTER.createTopic(rightTopic, 1, 1);
+        CLUSTER.createTopic(outputTopic, 1, 1);
         streamsConfig = getStreamsConfig();
     }
 
     @AfterEach
-    public void after() {
+    public void after() throws Exception {
         if (streams != null) {
             streams.close(Duration.ofSeconds(30));
             streams.cleanUp();
         }
+        CLUSTER.deleteAllTopics();
     }
 
     private static Stream<Arguments> processingGuaranteeAndStoreFormat() {
@@ -125,15 +131,15 @@ public class OuterJoinListValueStoreRestorationTest {
     private KafkaStreams createOuterJoinTopology() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final KStream<String, String> leftStream = builder.stream(LEFT_TOPIC);
-        final KStream<String, String> rightStream = 
builder.stream(RIGHT_TOPIC);
+        final KStream<String, String> leftStream = builder.stream(leftTopic);
+        final KStream<String, String> rightStream = builder.stream(rightTopic);
 
         leftStream.outerJoin(
             rightStream,
             (leftValue, rightValue) -> "left=" + leftValue + ", right=" + 
rightValue,
             JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(60)),
             StreamJoined.with(Serdes.String(), Serdes.String(), 
Serdes.String())
-        ).to(OUTPUT_TOPIC);
+        ).to(outputTopic);
 
         return new KafkaStreams(builder.build(), streamsConfig);
     }
@@ -157,7 +163,7 @@ public class OuterJoinListValueStoreRestorationTest {
         long timestamp = 1000L;
         for (int i = 0; i < 10; i++) {
             final String key = "key" + i;
-            produceRecord(LEFT_TOPIC, key, "left-" + i, timestamp);
+            produceRecord(leftTopic, key, "left-" + i, timestamp);
             timestamp += 100;
         }
 
@@ -165,12 +171,12 @@ public class OuterJoinListValueStoreRestorationTest {
         // Wait for processing and commit to changelog
 
         // 1- Use a probe record to verify end-to-end: process + commit
-        produceRecord(LEFT_TOPIC, "probe", "probe-left", timestamp);
-        produceRecord(RIGHT_TOPIC, "probe", "probe-right", timestamp);
+        produceRecord(leftTopic, "probe", "probe-left", timestamp);
+        produceRecord(rightTopic, "probe", "probe-right", timestamp);
         // 2- Wait for the join result - this proves processing happened
         waitUntilMinKeyValueRecordsReceived(
             getConsumerConfig(),
-            OUTPUT_TOPIC,
+            outputTopic,
             1,
             30000
         );
@@ -190,16 +196,36 @@ public class OuterJoinListValueStoreRestorationTest {
 
         // NOW advance window to trigger emitNonJoinedOuterRecords()
         final long timestampBeyondWindow = 62000L; // Beyond 60-second window
-        produceRecord(LEFT_TOPIC, "trigger", "trigger-value", 
timestampBeyondWindow);
+        produceRecord(leftTopic, "trigger", "trigger-value", 
timestampBeyondWindow);
 
         final List<KeyValue<String, String>> results = 
waitUntilMinKeyValueRecordsReceived(
             getConsumerConfig(),
-            OUTPUT_TOPIC,
-            1,
+            outputTopic,
+            10,
             30000
         );
 
-        assertFalse(results.isEmpty(), "Should have received output records");
+        final Set<String> expectedKeys = IntStream.range(0, 10)
+            .mapToObj(i -> "key" + i)
+            .collect(Collectors.toSet());
+
+        final Set<String> unmatchedKeys = results.stream()
+            .filter(kv -> kv.value != null && kv.value.endsWith("right=null"))
+            .map(kv -> kv.key)
+            .collect(Collectors.toSet());
+
+        // assert based on record shape
+        assertEquals(expectedKeys, unmatchedKeys,
+            "All 10 unmatched left records should be emitted after restoration 
with right=null shape");
+
+        final Set<String> nonProbeKeys = results.stream()
+            .filter(kv -> !"probe".equals(kv.key))
+            .map(kv -> kv.key)
+            .collect(Collectors.toSet());
+
+        // assert based on keys
+        assertEquals(expectedKeys, nonProbeKeys,
+            "No unexpected keys should appear on the output topic after 
restoration");
     }
 
     private void produceRecord(final String topic, final String key, final 
String value, final long timestamp) {

Reply via email to