This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b803e8be979 KAFKA-20329: Test headers dsl.store.format further (2/N)
(#21812)
b803e8be979 is described below
commit b803e8be979175e950fea2b08ebd1561b5e4290b
Author: Alieh Saeedi <[email protected]>
AuthorDate: Wed Apr 1 00:39:34 2026 +0200
KAFKA-20329: Test headers dsl.store.format further (2/N) (#21812)
This PR adds a `withHeaders` parameter to the Kafka Streams store-query
integration tests (`IQv2StoreIntegrationTest` and
`StoreQueryIntegrationTest`) to verify that Interactive Query operations
work correctly with both DSL store format configurations (the regular
format and the headers-based format). Due to current IQv2 limitations
with headers stores, `headers` store types are not added to
`IQv2StoreIntegrationTest.StoresToTest`.
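Changes:
- In `StoreQueryIntegrationTest`, converted `@Test` methods to
  `@ParameterizedTest` with a `withHeaders` parameter.
- In `IQv2StoreIntegrationTest`, added `withHeaders` as an extra argument
  of the already-parameterized `verifyStore` test.
- When `withHeaders=true`, the tests set
  `StreamsConfig.DSL_STORE_FORMAT_CONFIG` to
  `StreamsConfig.DSL_STORE_FORMAT_HEADERS`.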
This extends test coverage for Interactive Queries to both DSL store
formats.
---
.../integration/IQv2StoreIntegrationTest.java | 20 ++++--
.../integration/StoreQueryIntegrationTest.java | 76 +++++++++++++---------
2 files changed, 57 insertions(+), 39 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 06eb80ebd2a..8f27ff9bb90 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -362,7 +362,9 @@ public class IQv2StoreIntegrationTest {
for (final StoresToTest toTest : StoresToTest.values()) {
for (final String kind : Arrays.asList("DSL", "PAPI")) {
for (final String groupProtocol :
Arrays.asList("classic", "streams")) {
- values.add(Arguments.of(cacheEnabled, logEnabled,
toTest.name(), kind, groupProtocol));
+ for (final boolean withHeaders :
Arrays.asList(true, false)) {
+ values.add(Arguments.of(cacheEnabled,
logEnabled, toTest.name(), kind, groupProtocol, withHeaders));
+ }
}
}
}
@@ -429,14 +431,15 @@ public class IQv2StoreIntegrationTest {
));
}
- public void setup(final boolean cache, final boolean log, final
StoresToTest storeToTest, final String kind, final String groupProtocol) {
+ public void setup(final boolean cache, final boolean log, final
StoresToTest storeToTest, final String kind, final String groupProtocol, final
boolean withHeaders) {
final StoreSupplier<?> supplier = storeToTest.supplier();
final Properties streamsConfig = streamsConfiguration(
cache,
log,
storeToTest.name(),
kind,
- groupProtocol
+ groupProtocol,
+ withHeaders
);
final StreamsBuilder builder = new StreamsBuilder();
@@ -769,8 +772,8 @@ public class IQv2StoreIntegrationTest {
@ParameterizedTest
@MethodSource("data")
- public void verifyStore(final boolean cache, final boolean log, final
StoresToTest storeToTest, final String kind, final String groupProtocol) {
- setup(cache, log, storeToTest, kind, groupProtocol);
+ public void verifyStore(final boolean cache, final boolean log, final
StoresToTest storeToTest, final String kind, final String groupProtocol, final
boolean withHeaders) {
+ setup(cache, log, storeToTest, kind, groupProtocol, withHeaders);
try {
if (storeToTest.global()) {
// See KAFKA-13523
@@ -2026,10 +2029,10 @@ public class IQv2StoreIntegrationTest {
}
private static Properties streamsConfiguration(final boolean cache, final
boolean log,
- final String supplier,
final String kind, final String groupProtocol) {
+ final String supplier,
final String kind, final String groupProtocol, final boolean withHeaders) {
final String safeTestName =
IQv2StoreIntegrationTest.class.getName() + "-" + cache + "-" + log
+ "-" + supplier
- + "-" + kind + "-" + groupProtocol + "-" + RANDOM.nextInt();
+ + "-" + kind + "-" + groupProtocol + "-" + withHeaders + "-" +
RANDOM.nextInt();
final Properties config = new Properties();
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
@@ -2045,6 +2048,9 @@ public class IQv2StoreIntegrationTest {
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
config.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
+ if (withHeaders) {
+ config.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
return config;
}
}
\ No newline at end of file
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index da7a9076db7..b417d84a055 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -50,9 +50,10 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,8 +130,9 @@ public class StoreQueryIntegrationTest {
CLUSTER.stop();
}
- @Test
- public void shouldQueryOnlyActivePartitionStoresByDefault() throws
Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldQueryOnlyActivePartitionStoresByDefault(final boolean
withHeaders) throws Exception {
final int batch1NumMessages = 100;
final int key = 1;
final Semaphore semaphore = new Semaphore(0);
@@ -138,9 +140,9 @@ public class StoreQueryIntegrationTest {
final StreamsBuilder builder = new StreamsBuilder();
getStreamsBuilderWithTopology(builder, semaphore);
- final KafkaStreams kafkaStreams1 = createKafkaStreams(builder,
streamsConfiguration());
+ final KafkaStreams kafkaStreams1 = createKafkaStreams(builder,
streamsConfiguration(withHeaders));
final int kafkaStreams1Port = port;
- final KafkaStreams kafkaStreams2 = createKafkaStreams(builder,
streamsConfiguration());
+ final KafkaStreams kafkaStreams2 = createKafkaStreams(builder,
streamsConfiguration(withHeaders));
final List<KafkaStreams> kafkaStreamsList =
Arrays.asList(kafkaStreams1, kafkaStreams2);
startApplicationAndWaitUntilRunning(kafkaStreamsList,
Duration.ofSeconds(60));
@@ -177,8 +179,9 @@ public class StoreQueryIntegrationTest {
});
}
- @Test
- public void shouldQuerySpecificActivePartitionStores() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldQuerySpecificActivePartitionStores(final boolean
withHeaders) throws Exception {
final int batch1NumMessages = 100;
final int key = 1;
final Semaphore semaphore = new Semaphore(0);
@@ -186,9 +189,9 @@ public class StoreQueryIntegrationTest {
final StreamsBuilder builder = new StreamsBuilder();
getStreamsBuilderWithTopology(builder, semaphore);
- final KafkaStreams kafkaStreams1 = createKafkaStreams(builder,
streamsConfiguration());
+ final KafkaStreams kafkaStreams1 = createKafkaStreams(builder,
streamsConfiguration(withHeaders));
final int kafkaStreams1Port = port;
- final KafkaStreams kafkaStreams2 = createKafkaStreams(builder,
streamsConfiguration());
+ final KafkaStreams kafkaStreams2 = createKafkaStreams(builder,
streamsConfiguration(withHeaders));
final List<KafkaStreams> kafkaStreamsList =
Arrays.asList(kafkaStreams1, kafkaStreams2);
startApplicationAndWaitUntilRunning(kafkaStreamsList,
Duration.ofSeconds(60));
@@ -263,8 +266,9 @@ public class StoreQueryIntegrationTest {
});
}
- @Test
- public void shouldQueryAllStalePartitionStores() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldQueryAllStalePartitionStores(final boolean withHeaders)
throws Exception {
final int batch1NumMessages = 100;
final int key = 1;
final Semaphore semaphore = new Semaphore(0);
@@ -272,8 +276,8 @@ public class StoreQueryIntegrationTest {
final StreamsBuilder builder = new StreamsBuilder();
getStreamsBuilderWithTopology(builder, semaphore);
- final KafkaStreams kafkaStreams1 = createKafkaStreams(builder,
streamsConfiguration());
- final KafkaStreams kafkaStreams2 = createKafkaStreams(builder,
streamsConfiguration());
+ final KafkaStreams kafkaStreams1 = createKafkaStreams(builder,
streamsConfiguration(withHeaders));
+ final KafkaStreams kafkaStreams2 = createKafkaStreams(builder,
streamsConfiguration(withHeaders));
final List<KafkaStreams> kafkaStreamsList =
Arrays.asList(kafkaStreams1, kafkaStreams2);
startApplicationAndWaitUntilRunning(kafkaStreamsList,
Duration.ofSeconds(60));
@@ -296,8 +300,9 @@ public class StoreQueryIntegrationTest {
}, "store2 cannot find results for key");
}
- @Test
- public void shouldQuerySpecificStalePartitionStores() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldQuerySpecificStalePartitionStores(final boolean
withHeaders) throws Exception {
final int batch1NumMessages = 100;
final int key = 1;
final Semaphore semaphore = new Semaphore(0);
@@ -305,8 +310,8 @@ public class StoreQueryIntegrationTest {
final StreamsBuilder builder = new StreamsBuilder();
getStreamsBuilderWithTopology(builder, semaphore);
- final KafkaStreams kafkaStreams1 = createKafkaStreams(builder,
streamsConfiguration());
- final KafkaStreams kafkaStreams2 = createKafkaStreams(builder,
streamsConfiguration());
+ final KafkaStreams kafkaStreams1 = createKafkaStreams(builder,
streamsConfiguration(withHeaders));
+ final KafkaStreams kafkaStreams2 = createKafkaStreams(builder,
streamsConfiguration(withHeaders));
final List<KafkaStreams> kafkaStreamsList =
Arrays.asList(kafkaStreams1, kafkaStreams2);
startApplicationAndWaitUntilRunning(kafkaStreamsList,
Duration.ofSeconds(60));
@@ -351,8 +356,9 @@ public class StoreQueryIntegrationTest {
assertThat(store4.get(key), is(nullValue()));
}
- @Test
- public void shouldQuerySpecificStalePartitionStoresMultiStreamThreads()
throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void
shouldQuerySpecificStalePartitionStoresMultiStreamThreads(final boolean
withHeaders) throws Exception {
final int batch1NumMessages = 100;
final int key = 1;
final Semaphore semaphore = new Semaphore(0);
@@ -361,10 +367,10 @@ public class StoreQueryIntegrationTest {
final StreamsBuilder builder = new StreamsBuilder();
getStreamsBuilderWithTopology(builder, semaphore);
- final Properties streamsConfiguration1 = streamsConfiguration();
+ final Properties streamsConfiguration1 =
streamsConfiguration(withHeaders);
streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
numStreamThreads);
- final Properties streamsConfiguration2 = streamsConfiguration();
+ final Properties streamsConfiguration2 =
streamsConfiguration(withHeaders);
streamsConfiguration2.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
numStreamThreads);
final KafkaStreams kafkaStreams1 = createKafkaStreams(builder,
streamsConfiguration1);
@@ -415,18 +421,19 @@ public class StoreQueryIntegrationTest {
assertThat(store4.get(key), is(nullValue()));
}
- @Test
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
@SuppressWarnings("deprecation")
- public void
shouldQuerySpecificStalePartitionStoresMultiStreamThreadsNamedTopology() throws
Exception {
+ public void
shouldQuerySpecificStalePartitionStoresMultiStreamThreadsNamedTopology(final
boolean withHeaders) throws Exception {
final int batch1NumMessages = 100;
final int key = 1;
final Semaphore semaphore = new Semaphore(0);
final int numStreamThreads = 2;
- final Properties streamsConfiguration1 = streamsConfiguration();
+ final Properties streamsConfiguration1 =
streamsConfiguration(withHeaders);
streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
numStreamThreads);
- final Properties streamsConfiguration2 = streamsConfiguration();
+ final Properties streamsConfiguration2 =
streamsConfiguration(withHeaders);
streamsConfiguration2.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
numStreamThreads);
final String topologyA = "topology-A";
@@ -487,8 +494,9 @@ public class StoreQueryIntegrationTest {
assertThat(store4.get(key), is(nullValue()));
}
- @Test
- public void shouldQueryStoresAfterAddingAndRemovingStreamThread() throws
Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldQueryStoresAfterAddingAndRemovingStreamThread(final
boolean withHeaders) throws Exception {
final int batch1NumMessages = 100;
final int key = 1;
final int key2 = 2;
@@ -498,7 +506,7 @@ public class StoreQueryIntegrationTest {
final StreamsBuilder builder = new StreamsBuilder();
getStreamsBuilderWithTopology(builder, semaphore);
- final Properties streamsConfiguration1 = streamsConfiguration();
+ final Properties streamsConfiguration1 =
streamsConfiguration(withHeaders);
streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
final KafkaStreams kafkaStreams1 = createKafkaStreams(builder,
streamsConfiguration1);
@@ -555,8 +563,9 @@ public class StoreQueryIntegrationTest {
});
}
- @Test
- public void
shouldFailWithIllegalArgumentExceptionWhenIQPartitionerReturnsMultiplePartitions()
throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void
shouldFailWithIllegalArgumentExceptionWhenIQPartitionerReturnsMultiplePartitions(final
boolean withHeaders) throws Exception {
class BroadcastingPartitioner implements StreamPartitioner<Integer,
String> {
@Override
@@ -572,7 +581,7 @@ public class StoreQueryIntegrationTest {
final StreamsBuilder builder = new StreamsBuilder();
getStreamsBuilderWithTopology(builder, semaphore);
- final KafkaStreams kafkaStreams1 = createKafkaStreams(builder,
streamsConfiguration());
+ final KafkaStreams kafkaStreams1 = createKafkaStreams(builder,
streamsConfiguration(withHeaders));
startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams1),
Duration.ofSeconds(60));
produceValueRange(key, 0, batch1NumMessages);
@@ -658,7 +667,7 @@ public class StoreQueryIntegrationTest {
mockTime);
}
- private Properties streamsConfiguration() {
+ private Properties streamsConfiguration(final boolean withHeaders) {
final Properties config = new Properties();
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + appId);
@@ -672,6 +681,9 @@ public class StoreQueryIntegrationTest {
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+ if (withHeaders) {
+ config.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
return config;
}
}