This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new caa71a3e748 MINOR: Deflake Headers store upgrade integration test
(#21808)
caa71a3e748 is described below
commit caa71a3e748f56fc3bb4f08d8ab3f07a0df79466
Author: Alieh Saeedi <[email protected]>
AuthorDate: Wed Mar 18 16:44:44 2026 +0100
MINOR: Deflake Headers store upgrade integration test (#21808)
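This PR replaces the bare `kafkaStreams.start()` calls in
HeadersStoreUpgradeIntegrationTest with
`IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams)`
(the helper `setupWindowStoreWithHeaders` now declares `throws Exception`
as a result). This change:
- Eliminates a race condition: the application is fully initialized before
the store is queried
- Makes the tests more deterministic: they wait for the RUNNING state
instead of hoping the timing works out
- Especially helps in-memory stores, which need time to rebuild their state
from scratch
- Follows Kafka Streams test best practices: it aligns with patterns used
elsewhere in the codebase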
Reviewers: TengYao Chi <[email protected]>
---
.../HeadersStoreUpgradeIntegrationTest.java | 60 +++++++++++-----------
1 file changed, 30 insertions(+), 30 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
index f9614f5909a..25fb33dd182 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -157,7 +157,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final Properties props = props();
kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(),
props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
processKeyValueAndVerifyTimestampedValue("key1", "value1", 11L);
processKeyValueAndVerifyTimestampedValue("key2", "value2", 22L);
@@ -177,7 +177,7 @@ public class HeadersStoreUpgradeIntegrationTest {
.process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(),
props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
// Verify legacy data can be read with empty headers
verifyLegacyValuesWithEmptyHeaders("key1", "value1", 11L);
@@ -208,7 +208,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final Properties props = props();
kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(),
props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
processKeyValueAndVerifyTimestampedValue("key1", "value1", 11L);
processKeyValueAndVerifyTimestampedValue("key2", "value2", 22L);
@@ -230,7 +230,7 @@ public class HeadersStoreUpgradeIntegrationTest {
.process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(),
props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
// Verify legacy data can be read with empty headers
verifyLegacyValuesWithEmptyHeaders("key1", "value1", 11L);
@@ -271,7 +271,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final Properties props = props();
kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(),
props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
processKeyValueAndVerifyValue("key1", "value1");
final long lastUpdateKeyOne = persistentStore ? -1L :
CLUSTER.time.milliseconds() - 1L;
@@ -296,7 +296,7 @@ public class HeadersStoreUpgradeIntegrationTest {
.process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(),
props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
// Verify legacy data can be read with empty headers and timestamp
verifyLegacyValuesWithEmptyHeaders("key1", "value1", lastUpdateKeyOne);
@@ -327,7 +327,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final Properties props = props();
kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(),
props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
processKeyValueAndVerifyValue("key1", "value1");
processKeyValueAndVerifyValue("key2", "value2");
@@ -349,7 +349,7 @@ public class HeadersStoreUpgradeIntegrationTest {
.process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(),
props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
// Verify legacy data can be read with empty headers
verifyLegacyValuesWithEmptyHeaders("key1", "value1", -1L);
@@ -636,7 +636,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final Properties props = props();
kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
final long baseTime = CLUSTER.time.milliseconds();
processPlainWindowedKeyValueAndVerify("key1", "value1", baseTime +
100);
@@ -659,7 +659,7 @@ public class HeadersStoreUpgradeIntegrationTest {
.process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
kafkaStreams = new KafkaStreams(newBuilder.build(), props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key1", "value1",
baseTime + 100, persistentStore ? -1L : baseTime + 100);
verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key2", "value2",
baseTime + 200, persistentStore ? -1L : baseTime + 200);
@@ -688,7 +688,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final Properties props = props();
kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
final long baseTime = CLUSTER.time.milliseconds();
processPlainWindowedKeyValueAndVerify("key1", "value1", baseTime +
100);
@@ -709,7 +709,7 @@ public class HeadersStoreUpgradeIntegrationTest {
.process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
kafkaStreams = new KafkaStreams(newBuilder.build(), props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key1", "value1",
baseTime + 100, -1L);
verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key2", "value2",
baseTime + 200, -1L);
@@ -757,7 +757,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final Properties props = props();
kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
final long baseTime = CLUSTER.time.milliseconds();
processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime
+ 100);
@@ -779,7 +779,7 @@ public class HeadersStoreUpgradeIntegrationTest {
.process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
kafkaStreams = new KafkaStreams(newBuilder.build(), props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
@@ -808,7 +808,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final Properties props = props();
kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
final long baseTime = CLUSTER.time.milliseconds();
processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime
+ 100);
@@ -829,7 +829,7 @@ public class HeadersStoreUpgradeIntegrationTest {
.process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
kafkaStreams = new KafkaStreams(newBuilder.build(), props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
@@ -1184,7 +1184,7 @@ public class HeadersStoreUpgradeIntegrationTest {
boolean exceptionThrown = false;
try {
- kafkaStreams.start();
+
IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
} catch (final Exception e) {
Throwable cause = e;
while (cause != null) {
@@ -1228,7 +1228,7 @@ public class HeadersStoreUpgradeIntegrationTest {
.process(KeyValueProcessor::new, STORE_NAME);
kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
processKeyValueAndVerifyValue("key3", "value3");
processKeyValueAndVerifyValue("key4", "value4");
@@ -1256,7 +1256,7 @@ public class HeadersStoreUpgradeIntegrationTest {
boolean exceptionThrown = false;
try {
- kafkaStreams.start();
+
IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
} catch (final Exception e) {
Throwable cause = e;
while (cause != null) {
@@ -1300,7 +1300,7 @@ public class HeadersStoreUpgradeIntegrationTest {
.process(TimestampedKeyValueProcessor::new, STORE_NAME);
kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
// verify legacy key, values
verifyLegacyTimestampedValue("key1", "value1", 11L);
@@ -1335,7 +1335,7 @@ public class HeadersStoreUpgradeIntegrationTest {
boolean exceptionThrown = false;
try {
- kafkaStreams.start();
+
IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
} catch (final Exception e) {
Throwable cause = e;
while (cause != null) {
@@ -1384,7 +1384,7 @@ public class HeadersStoreUpgradeIntegrationTest {
boolean exceptionThrown = false;
try {
- kafkaStreams.start();
+
IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
} catch (final Exception e) {
Throwable cause = e;
while (cause != null) {
@@ -1431,7 +1431,7 @@ public class HeadersStoreUpgradeIntegrationTest {
.process(PlainWindowedProcessor::new, WINDOW_STORE_NAME);
kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
final long newTime = CLUSTER.time.milliseconds();
processPlainWindowedKeyValueAndVerify("key3", "value3", newTime + 300);
@@ -1461,7 +1461,7 @@ public class HeadersStoreUpgradeIntegrationTest {
.process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
final long newTime = CLUSTER.time.milliseconds();
processWindowedKeyValueAndVerifyTimestamped("key3", "value3", newTime
+ 300);
@@ -1528,7 +1528,7 @@ public class HeadersStoreUpgradeIntegrationTest {
return baseTime;
}
- private long setupWindowStoreWithHeaders(final Properties props) {
+ private long setupWindowStoreWithHeaders(final Properties props) throws
Exception {
final StreamsBuilder headersBuilder = new StreamsBuilder();
headersBuilder.addStateStore(
Stores.timestampedWindowStoreWithHeadersBuilder(
@@ -1542,7 +1542,7 @@ public class HeadersStoreUpgradeIntegrationTest {
.process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
kafkaStreams = new KafkaStreams(headersBuilder.build(), props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
return CLUSTER.time.milliseconds();
}
@@ -1571,7 +1571,7 @@ public class HeadersStoreUpgradeIntegrationTest {
.process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
kafkaStreams = new KafkaStreams(headersBuilder.build(), props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
final Headers headers = new RecordHeaders();
headers.add("source", "test".getBytes());
@@ -1666,7 +1666,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final Properties props = props();
kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
final long baseTime = CLUSTER.time.milliseconds();
processSessionKeyValueAndVerify("key1", "value1", baseTime + 100);
@@ -1731,7 +1731,7 @@ public class HeadersStoreUpgradeIntegrationTest {
boolean exceptionThrown = false;
try {
- kafkaStreams.start();
+
IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
} catch (final Exception e) {
Throwable cause = e;
while (cause != null) {
@@ -1774,7 +1774,7 @@ public class HeadersStoreUpgradeIntegrationTest {
.process(SessionProcessor::new, SESSION_STORE_NAME);
kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
- kafkaStreams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
final long newTime = CLUSTER.time.milliseconds();
processSessionKeyValueAndVerify("key3", "value3", newTime + 300);