This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new caa71a3e748 MINOR: Deflake Headers store upgrade integration test (#21808)
caa71a3e748 is described below

commit caa71a3e748f56fc3bb4f08d8ab3f07a0df79466
Author: Alieh Saeedi <[email protected]>
AuthorDate: Wed Mar 18 16:44:44 2026 +0100

    MINOR: Deflake Headers store upgrade integration test (#21808)
    
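    This PR replaces kafkaStreams.start() with
    IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams)
    in HeadersStoreUpgradeIntegrationTest. This change:
    - Eliminates a race condition: the application is fully initialized
      before the tests query its state stores.
    - Makes the tests more deterministic: they wait for the RUNNING state
      instead of hoping the timing works out.
    - Especially helps in-memory stores, which need time to rebuild their
      state from scratch.
    - Follows Kafka Streams test best practices: it aligns with the
      pattern used elsewhere in the codebase.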
    
    Reviewers: TengYao Chi <[email protected]>
---
 .../HeadersStoreUpgradeIntegrationTest.java        | 60 +++++++++++-----------
 1 file changed, 30 insertions(+), 30 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
index f9614f5909a..25fb33dd182 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -157,7 +157,7 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         final Properties props = props();
         kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), 
props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         processKeyValueAndVerifyTimestampedValue("key1", "value1", 11L);
         processKeyValueAndVerifyTimestampedValue("key2", "value2", 22L);
@@ -177,7 +177,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
 
         kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), 
props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         // Verify legacy data can be read with empty headers
         verifyLegacyValuesWithEmptyHeaders("key1", "value1", 11L);
@@ -208,7 +208,7 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         final Properties props = props();
         kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), 
props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         processKeyValueAndVerifyTimestampedValue("key1", "value1", 11L);
         processKeyValueAndVerifyTimestampedValue("key2", "value2", 22L);
@@ -230,7 +230,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
 
         kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), 
props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         // Verify legacy data can be read with empty headers
         verifyLegacyValuesWithEmptyHeaders("key1", "value1", 11L);
@@ -271,7 +271,7 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         final Properties props = props();
         kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), 
props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         processKeyValueAndVerifyValue("key1", "value1");
         final long lastUpdateKeyOne = persistentStore ? -1L : 
CLUSTER.time.milliseconds() - 1L;
@@ -296,7 +296,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
 
         kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), 
props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         // Verify legacy data can be read with empty headers and timestamp
         verifyLegacyValuesWithEmptyHeaders("key1", "value1", lastUpdateKeyOne);
@@ -327,7 +327,7 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         final Properties props = props();
         kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), 
props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         processKeyValueAndVerifyValue("key1", "value1");
         processKeyValueAndVerifyValue("key2", "value2");
@@ -349,7 +349,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
 
         kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), 
props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         // Verify legacy data can be read with empty headers
         verifyLegacyValuesWithEmptyHeaders("key1", "value1", -1L);
@@ -636,7 +636,7 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         final Properties props = props();
         kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         final long baseTime = CLUSTER.time.milliseconds();
         processPlainWindowedKeyValueAndVerify("key1", "value1", baseTime + 
100);
@@ -659,7 +659,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             .process(TimestampedWindowedWithHeadersProcessor::new, 
WINDOW_STORE_NAME);
 
         kafkaStreams = new KafkaStreams(newBuilder.build(), props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key1", "value1", 
baseTime + 100, persistentStore ? -1L : baseTime + 100);
         verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key2", "value2", 
baseTime + 200, persistentStore ? -1L : baseTime + 200);
@@ -688,7 +688,7 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         final Properties props = props();
         kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         final long baseTime = CLUSTER.time.milliseconds();
         processPlainWindowedKeyValueAndVerify("key1", "value1", baseTime + 
100);
@@ -709,7 +709,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             .process(TimestampedWindowedWithHeadersProcessor::new, 
WINDOW_STORE_NAME);
 
         kafkaStreams = new KafkaStreams(newBuilder.build(), props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key1", "value1", 
baseTime + 100, -1L);
         verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key2", "value2", 
baseTime + 200, -1L);
@@ -757,7 +757,7 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         final Properties props = props();
         kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         final long baseTime = CLUSTER.time.milliseconds();
         processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime 
+ 100);
@@ -779,7 +779,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             .process(TimestampedWindowedWithHeadersProcessor::new, 
WINDOW_STORE_NAME);
 
         kafkaStreams = new KafkaStreams(newBuilder.build(), props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
         verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
@@ -808,7 +808,7 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         final Properties props = props();
         kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         final long baseTime = CLUSTER.time.milliseconds();
         processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime 
+ 100);
@@ -829,7 +829,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             .process(TimestampedWindowedWithHeadersProcessor::new, 
WINDOW_STORE_NAME);
 
         kafkaStreams = new KafkaStreams(newBuilder.build(), props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
         verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
@@ -1184,7 +1184,7 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         boolean exceptionThrown = false;
         try {
-            kafkaStreams.start();
+            
IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
         } catch (final Exception e) {
             Throwable cause = e;
             while (cause != null) {
@@ -1228,7 +1228,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             .process(KeyValueProcessor::new, STORE_NAME);
 
         kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         processKeyValueAndVerifyValue("key3", "value3");
         processKeyValueAndVerifyValue("key4", "value4");
@@ -1256,7 +1256,7 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         boolean exceptionThrown = false;
         try {
-            kafkaStreams.start();
+            
IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
         } catch (final Exception e) {
             Throwable cause = e;
             while (cause != null) {
@@ -1300,7 +1300,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             .process(TimestampedKeyValueProcessor::new, STORE_NAME);
 
         kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         // verify legacy key, values
         verifyLegacyTimestampedValue("key1", "value1", 11L);
@@ -1335,7 +1335,7 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         boolean exceptionThrown = false;
         try {
-            kafkaStreams.start();
+            
IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
         } catch (final Exception e) {
             Throwable cause = e;
             while (cause != null) {
@@ -1384,7 +1384,7 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         boolean exceptionThrown = false;
         try {
-            kafkaStreams.start();
+            
IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
         } catch (final Exception e) {
             Throwable cause = e;
             while (cause != null) {
@@ -1431,7 +1431,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             .process(PlainWindowedProcessor::new, WINDOW_STORE_NAME);
 
         kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         final long newTime = CLUSTER.time.milliseconds();
         processPlainWindowedKeyValueAndVerify("key3", "value3", newTime + 300);
@@ -1461,7 +1461,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
 
         kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         final long newTime = CLUSTER.time.milliseconds();
         processWindowedKeyValueAndVerifyTimestamped("key3", "value3", newTime 
+ 300);
@@ -1528,7 +1528,7 @@ public class HeadersStoreUpgradeIntegrationTest {
         return baseTime;
     }
 
-    private long setupWindowStoreWithHeaders(final Properties props) {
+    private long setupWindowStoreWithHeaders(final Properties props) throws 
Exception {
         final StreamsBuilder headersBuilder = new StreamsBuilder();
         headersBuilder.addStateStore(
                 Stores.timestampedWindowStoreWithHeadersBuilder(
@@ -1542,7 +1542,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             .process(TimestampedWindowedWithHeadersProcessor::new, 
WINDOW_STORE_NAME);
 
         kafkaStreams = new KafkaStreams(headersBuilder.build(), props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         return CLUSTER.time.milliseconds();
     }
@@ -1571,7 +1571,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
 
         kafkaStreams = new KafkaStreams(headersBuilder.build(), props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         final Headers headers = new RecordHeaders();
         headers.add("source", "test".getBytes());
@@ -1666,7 +1666,7 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         final Properties props = props();
         kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         final long baseTime = CLUSTER.time.milliseconds();
         processSessionKeyValueAndVerify("key1", "value1", baseTime + 100);
@@ -1731,7 +1731,7 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         boolean exceptionThrown = false;
         try {
-            kafkaStreams.start();
+            
IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
         } catch (final Exception e) {
             Throwable cause = e;
             while (cause != null) {
@@ -1774,7 +1774,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             .process(SessionProcessor::new, SESSION_STORE_NAME);
 
         kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
-        kafkaStreams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         final long newTime = CLUSTER.time.milliseconds();
         processSessionKeyValueAndVerify("key3", "value3", newTime + 300);

Reply via email to