This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b803e8be979 KAFKA-20329: Test headers dsl.store.format further (2/N) 
(#21812)
b803e8be979 is described below

commit b803e8be979175e950fea2b08ebd1561b5e4290b
Author: Alieh Saeedi <[email protected]>
AuthorDate: Wed Apr 1 00:39:34 2026 +0200

    KAFKA-20329: Test headers dsl.store.format further (2/N) (#21812)
    
    This PR adds withHeaders parameterization to Kafka Streams integration
    Store & Query tests to verify    that Interactive Query operations work
    correctly with both DSL store format configurations (regular    and
    headers-based format).  Due to current IQv2 limitations with headers
    stores, `headers` store types are not added to
    `IQv2StoreIntegrationTest.StoresToTest`.
    
      Changes:    - Converted @Test methods to @ParameterizedTest with
    withHeaders parameter    - When withHeaders=true, tests configure
    StreamsConfig.DSL_STORE_FORMAT_CONFIG to
    StreamsConfig.DSL_STORE_FORMAT_HEADERS
    
      This ensures comprehensive test coverage for Interactive Queries
    across different    store formats.
---
 .../integration/IQv2StoreIntegrationTest.java      | 20 ++++--
 .../integration/StoreQueryIntegrationTest.java     | 76 +++++++++++++---------
 2 files changed, 57 insertions(+), 39 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 06eb80ebd2a..8f27ff9bb90 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -362,7 +362,9 @@ public class IQv2StoreIntegrationTest {
                 for (final StoresToTest toTest : StoresToTest.values()) {
                     for (final String kind : Arrays.asList("DSL", "PAPI")) {
                         for (final String groupProtocol : 
Arrays.asList("classic", "streams")) {
-                            values.add(Arguments.of(cacheEnabled, logEnabled, 
toTest.name(), kind, groupProtocol));
+                            for (final boolean withHeaders : 
Arrays.asList(true, false)) {
+                                values.add(Arguments.of(cacheEnabled, 
logEnabled, toTest.name(), kind, groupProtocol, withHeaders));
+                            }
                         }
                     }
                 }
@@ -429,14 +431,15 @@ public class IQv2StoreIntegrationTest {
         ));
     }
 
-    public void setup(final boolean cache, final boolean log, final 
StoresToTest storeToTest, final String kind, final String groupProtocol) {
+    public void setup(final boolean cache, final boolean log, final 
StoresToTest storeToTest, final String kind, final String groupProtocol, final 
boolean withHeaders) {
         final StoreSupplier<?> supplier = storeToTest.supplier();
         final Properties streamsConfig = streamsConfiguration(
             cache,
             log,
             storeToTest.name(),
             kind,
-            groupProtocol
+            groupProtocol,
+            withHeaders
         );
 
         final StreamsBuilder builder = new StreamsBuilder();
@@ -769,8 +772,8 @@ public class IQv2StoreIntegrationTest {
 
     @ParameterizedTest
     @MethodSource("data")
-    public void verifyStore(final boolean cache, final boolean log, final 
StoresToTest storeToTest, final String kind, final String groupProtocol) {
-        setup(cache, log, storeToTest, kind, groupProtocol);
+    public void verifyStore(final boolean cache, final boolean log, final 
StoresToTest storeToTest, final String kind, final String groupProtocol, final 
boolean withHeaders) {
+        setup(cache, log, storeToTest, kind, groupProtocol, withHeaders);
         try {
             if (storeToTest.global()) {
                 // See KAFKA-13523
@@ -2026,10 +2029,10 @@ public class IQv2StoreIntegrationTest {
     }
 
     private static Properties streamsConfiguration(final boolean cache, final 
boolean log,
-                                                   final String supplier, 
final String kind, final String groupProtocol) {
+                                                   final String supplier, 
final String kind, final String groupProtocol, final boolean withHeaders) {
         final String safeTestName =
             IQv2StoreIntegrationTest.class.getName() + "-" + cache + "-" + log 
+ "-" + supplier
-                + "-" + kind + "-" + groupProtocol + "-" + RANDOM.nextInt();
+                + "-" + kind + "-" + groupProtocol + "-" + withHeaders + "-" + 
RANDOM.nextInt();
         final Properties config = new Properties();
         config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
@@ -2045,6 +2048,9 @@ public class IQv2StoreIntegrationTest {
         config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
         config.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
+        if (withHeaders) {
+            config.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
         return config;
     }
 }
\ No newline at end of file
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index da7a9076db7..b417d84a055 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -50,9 +50,10 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,8 +130,9 @@ public class StoreQueryIntegrationTest {
         CLUSTER.stop();
     }
 
-    @Test
-    public void shouldQueryOnlyActivePartitionStoresByDefault() throws 
Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldQueryOnlyActivePartitionStoresByDefault(final boolean 
withHeaders) throws Exception {
         final int batch1NumMessages = 100;
         final int key = 1;
         final Semaphore semaphore = new Semaphore(0);
@@ -138,9 +140,9 @@ public class StoreQueryIntegrationTest {
         final StreamsBuilder builder = new StreamsBuilder();
         getStreamsBuilderWithTopology(builder, semaphore);
 
-        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration());
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration(withHeaders));
         final int kafkaStreams1Port = port;
-        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration());
+        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration(withHeaders));
         final List<KafkaStreams> kafkaStreamsList = 
Arrays.asList(kafkaStreams1, kafkaStreams2);
 
         startApplicationAndWaitUntilRunning(kafkaStreamsList, 
Duration.ofSeconds(60));
@@ -177,8 +179,9 @@ public class StoreQueryIntegrationTest {
         });
     }
 
-    @Test
-    public void shouldQuerySpecificActivePartitionStores() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldQuerySpecificActivePartitionStores(final boolean 
withHeaders) throws Exception {
         final int batch1NumMessages = 100;
         final int key = 1;
         final Semaphore semaphore = new Semaphore(0);
@@ -186,9 +189,9 @@ public class StoreQueryIntegrationTest {
         final StreamsBuilder builder = new StreamsBuilder();
         getStreamsBuilderWithTopology(builder, semaphore);
 
-        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration());
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration(withHeaders));
         final int kafkaStreams1Port = port;
-        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration());
+        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration(withHeaders));
         final List<KafkaStreams> kafkaStreamsList = 
Arrays.asList(kafkaStreams1, kafkaStreams2);
 
         startApplicationAndWaitUntilRunning(kafkaStreamsList, 
Duration.ofSeconds(60));
@@ -263,8 +266,9 @@ public class StoreQueryIntegrationTest {
         });
     }
 
-    @Test
-    public void shouldQueryAllStalePartitionStores() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldQueryAllStalePartitionStores(final boolean withHeaders) 
throws Exception {
         final int batch1NumMessages = 100;
         final int key = 1;
         final Semaphore semaphore = new Semaphore(0);
@@ -272,8 +276,8 @@ public class StoreQueryIntegrationTest {
         final StreamsBuilder builder = new StreamsBuilder();
         getStreamsBuilderWithTopology(builder, semaphore);
 
-        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration());
-        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration());
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration(withHeaders));
+        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration(withHeaders));
         final List<KafkaStreams> kafkaStreamsList = 
Arrays.asList(kafkaStreams1, kafkaStreams2);
 
         startApplicationAndWaitUntilRunning(kafkaStreamsList, 
Duration.ofSeconds(60));
@@ -296,8 +300,9 @@ public class StoreQueryIntegrationTest {
         }, "store2 cannot find results for key");
     }
 
-    @Test
-    public void shouldQuerySpecificStalePartitionStores() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldQuerySpecificStalePartitionStores(final boolean 
withHeaders) throws Exception {
         final int batch1NumMessages = 100;
         final int key = 1;
         final Semaphore semaphore = new Semaphore(0);
@@ -305,8 +310,8 @@ public class StoreQueryIntegrationTest {
         final StreamsBuilder builder = new StreamsBuilder();
         getStreamsBuilderWithTopology(builder, semaphore);
 
-        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration());
-        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration());
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration(withHeaders));
+        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration(withHeaders));
         final List<KafkaStreams> kafkaStreamsList = 
Arrays.asList(kafkaStreams1, kafkaStreams2);
 
         startApplicationAndWaitUntilRunning(kafkaStreamsList, 
Duration.ofSeconds(60));
@@ -351,8 +356,9 @@ public class StoreQueryIntegrationTest {
         assertThat(store4.get(key), is(nullValue()));
     }
 
-    @Test
-    public void shouldQuerySpecificStalePartitionStoresMultiStreamThreads() 
throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void 
shouldQuerySpecificStalePartitionStoresMultiStreamThreads(final boolean 
withHeaders) throws Exception {
         final int batch1NumMessages = 100;
         final int key = 1;
         final Semaphore semaphore = new Semaphore(0);
@@ -361,10 +367,10 @@ public class StoreQueryIntegrationTest {
         final StreamsBuilder builder = new StreamsBuilder();
         getStreamsBuilderWithTopology(builder, semaphore);
 
-        final Properties streamsConfiguration1 = streamsConfiguration();
+        final Properties streamsConfiguration1 = 
streamsConfiguration(withHeaders);
         streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 
numStreamThreads);
 
-        final Properties streamsConfiguration2 = streamsConfiguration();
+        final Properties streamsConfiguration2 = 
streamsConfiguration(withHeaders);
         streamsConfiguration2.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 
numStreamThreads);
 
         final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration1);
@@ -415,18 +421,19 @@ public class StoreQueryIntegrationTest {
         assertThat(store4.get(key), is(nullValue()));
     }
 
-    @Test
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
     @SuppressWarnings("deprecation")
-    public void 
shouldQuerySpecificStalePartitionStoresMultiStreamThreadsNamedTopology() throws 
Exception {
+    public void 
shouldQuerySpecificStalePartitionStoresMultiStreamThreadsNamedTopology(final 
boolean withHeaders) throws Exception {
         final int batch1NumMessages = 100;
         final int key = 1;
         final Semaphore semaphore = new Semaphore(0);
         final int numStreamThreads = 2;
 
-        final Properties streamsConfiguration1 = streamsConfiguration();
+        final Properties streamsConfiguration1 = 
streamsConfiguration(withHeaders);
         streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 
numStreamThreads);
 
-        final Properties streamsConfiguration2 = streamsConfiguration();
+        final Properties streamsConfiguration2 = 
streamsConfiguration(withHeaders);
         streamsConfiguration2.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 
numStreamThreads);
 
         final String topologyA = "topology-A";
@@ -487,8 +494,9 @@ public class StoreQueryIntegrationTest {
         assertThat(store4.get(key), is(nullValue()));
     }
 
-    @Test
-    public void shouldQueryStoresAfterAddingAndRemovingStreamThread() throws 
Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldQueryStoresAfterAddingAndRemovingStreamThread(final 
boolean withHeaders) throws Exception {
         final int batch1NumMessages = 100;
         final int key = 1;
         final int key2 = 2;
@@ -498,7 +506,7 @@ public class StoreQueryIntegrationTest {
         final StreamsBuilder builder = new StreamsBuilder();
         getStreamsBuilderWithTopology(builder, semaphore);
 
-        final Properties streamsConfiguration1 = streamsConfiguration();
+        final Properties streamsConfiguration1 = 
streamsConfiguration(withHeaders);
         streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
 
         final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration1);
@@ -555,8 +563,9 @@ public class StoreQueryIntegrationTest {
         });
     }
 
-    @Test
-    public void 
shouldFailWithIllegalArgumentExceptionWhenIQPartitionerReturnsMultiplePartitions()
 throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void 
shouldFailWithIllegalArgumentExceptionWhenIQPartitionerReturnsMultiplePartitions(final
 boolean withHeaders) throws Exception {
 
         class BroadcastingPartitioner implements StreamPartitioner<Integer, 
String> {
             @Override
@@ -572,7 +581,7 @@ public class StoreQueryIntegrationTest {
         final StreamsBuilder builder = new StreamsBuilder();
         getStreamsBuilderWithTopology(builder, semaphore);
 
-        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration());
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration(withHeaders));
 
         
startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams1), 
Duration.ofSeconds(60));
         produceValueRange(key, 0, batch1NumMessages);
@@ -658,7 +667,7 @@ public class StoreQueryIntegrationTest {
             mockTime);
     }
 
-    private Properties streamsConfiguration() {
+    private Properties streamsConfiguration(final boolean withHeaders) {
         final Properties config = new Properties();
         config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + appId);
@@ -672,6 +681,9 @@ public class StoreQueryIntegrationTest {
         config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
         config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
         config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        if (withHeaders) {
+            config.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
         return config;
     }
 }

Reply via email to