This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6c651fbfa0a KAFKA-20329: Test headers dsl.store.format further (3/N) 
(#21820)
6c651fbfa0a is described below

commit 6c651fbfa0a7b661d64a3e62ccd80b1c7750ae59
Author: Alieh Saeedi <[email protected]>
AuthorDate: Tue Apr 14 15:19:20 2026 +0200

    KAFKA-20329: Test headers dsl.store.format further (3/N) (#21820)
    
    This PR parametrizes 10 integration test classes (49 test methods total)
    to ensure comprehensive testing coverage for both default and
    header-based DSL store    formats. Each test now runs twice: once with
    the default store format and once with dsl.store.format=headers.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Matthias J. Sax
    <[email protected]>, TengYao Chi <[email protected]>
---
 .../GlobalKTableEOSIntegrationTest.java            | 36 ++++++----
 .../integration/GlobalKTableIntegrationTest.java   | 69 +++++++++++++------
 .../integration/InternalTopicIntegrationTest.java  | 31 +++++----
 .../integration/MetricsIntegrationTest.java        | 23 +++++--
 .../integration/RestoreIntegrationTest.java        | 78 +++++++++++++++-------
 .../integration/RocksDBMetricsIntegrationTest.java |  9 ++-
 .../StandbyTaskCreationIntegrationTest.java        | 21 +++---
 ...amsUncaughtExceptionHandlerIntegrationTest.java | 58 ++++++++--------
 .../SuppressionDurabilityIntegrationTest.java      |  9 ++-
 .../integration/SuppressionIntegrationTest.java    | 60 ++++++++++-------
 10 files changed, 249 insertions(+), 145 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index cf98c0226bf..11167c84fc1 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -48,9 +48,10 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -136,12 +137,13 @@ public class GlobalKTableEOSIntegrationTest {
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
     }
 
-    @Test
-    public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldKStreamGlobalKTableLeftJoin(final boolean withHeaders) 
throws Exception {
         final KStream<String, String> streamTableJoin = 
stream.leftJoin(globalTable, keyMapper, joiner);
         streamTableJoin.foreach(foreachAction);
         produceInitialGlobalTableValues();
-        startStreams();
+        startStreams(withHeaders);
         produceTopicValues(streamTopic);
 
         final Map<String, String> expected = new HashMap<>();
@@ -207,12 +209,13 @@ public class GlobalKTableEOSIntegrationTest {
         );
     }
 
-    @Test
-    public void shouldKStreamGlobalKTableJoin() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldKStreamGlobalKTableJoin(final boolean withHeaders) 
throws Exception {
         final KStream<String, String> streamTableJoin = 
stream.join(globalTable, keyMapper, joiner);
         streamTableJoin.foreach(foreachAction);
         produceInitialGlobalTableValues();
-        startStreams();
+        startStreams(withHeaders);
         produceTopicValues(streamTopic);
 
         final Map<String, String> expected = new HashMap<>();
@@ -277,11 +280,12 @@ public class GlobalKTableEOSIntegrationTest {
         );
     }
 
-    @Test
-    public void shouldRestoreTransactionalMessages() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldRestoreTransactionalMessages(final boolean withHeaders) 
throws Exception {
         produceInitialGlobalTableValues();
 
-        startStreams();
+        startStreams(withHeaders);
 
         final Map<Long, String> expected = new HashMap<>();
         expected.put(1L, "A");
@@ -309,13 +313,14 @@ public class GlobalKTableEOSIntegrationTest {
         );
     }
 
-    @Test
-    public void shouldNotRestoreAbortedMessages() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldNotRestoreAbortedMessages(final boolean withHeaders) 
throws Exception {
         produceAbortedMessages();
         produceInitialGlobalTableValues();
         produceAbortedMessages();
 
-        startStreams();
+        startStreams(withHeaders);
         
         final Map<Long, String> expected = new HashMap<>();
         expected.put(1L, "A");
@@ -350,7 +355,10 @@ public class GlobalKTableEOSIntegrationTest {
         CLUSTER.createTopic(globalTableTopic, 2, 1);
     }
     
-    private void startStreams() {
+    private void startStreams(final boolean withHeaders) {
+        if (withHeaders) {
+            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
         startStreams(null);
     }
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 9730eff0490..4fc0cf85eab 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -54,9 +54,10 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -139,8 +140,10 @@ public class GlobalKTableIntegrationTest {
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
     }
 
-    @Test
-    public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldKStreamGlobalKTableLeftJoin(final boolean withHeaders) 
throws Exception {
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
         final KStream<String, String> streamTableJoin = 
stream.leftJoin(globalTable, keyMapper, joiner);
         streamTableJoin.process(supplier);
         produceInitialGlobalTableValues();
@@ -224,8 +227,10 @@ public class GlobalKTableIntegrationTest {
             "waiting for final values");
     }
 
-    @Test
-    public void shouldKStreamGlobalKTableJoin() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldKStreamGlobalKTableJoin(final boolean withHeaders) 
throws Exception {
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
         final KStream<String, String> streamTableJoin = 
stream.join(globalTable, keyMapper, joiner);
         streamTableJoin.process(supplier);
         produceInitialGlobalTableValues();
@@ -309,8 +314,10 @@ public class GlobalKTableIntegrationTest {
             "waiting for final values");
     }
 
-    @Test
-    public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldRestoreGlobalInMemoryKTableOnRestart(final boolean 
withHeaders) throws Exception {
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
         builder = new StreamsBuilder();
         globalTable = builder.globalTable(
             globalTableTopic,
@@ -340,8 +347,10 @@ public class GlobalKTableIntegrationTest {
         assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
     }
 
-    @Test
-    public void shouldGetToRunningWithOnlyGlobalTopology() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldGetToRunningWithOnlyGlobalTopology(final boolean 
withHeaders) throws Exception {
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
         builder = new StreamsBuilder();
         globalTable = builder.globalTable(
             globalTableTopic,
@@ -375,8 +384,9 @@ public class GlobalKTableIntegrationTest {
         );
     }
 
-    @Test
-    public void 
testProcessingExceptionHandlerContinueEnabledRestorationPhase() throws 
Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void 
testProcessingExceptionHandlerContinueEnabledRestorationPhase(final boolean 
withHeaders) throws Exception {
         createBuilderWithFailedProcessor();
         // enable processing exception handler invoked config
         TestGlobalProcessingExceptionHandler.shouldResume = true;
@@ -384,6 +394,8 @@ public class GlobalKTableIntegrationTest {
         
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
             TestGlobalProcessingExceptionHandler.class);
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         produceInitialGlobalTableValues();
         startStreams();
         waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
Duration.ofSeconds(30));
@@ -391,8 +403,9 @@ public class GlobalKTableIntegrationTest {
         assertTrue(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
     }
 
-    @Test
-    public void testProcessingExceptionHandlerFailEnabledRestorationPhase() 
throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void 
testProcessingExceptionHandlerFailEnabledRestorationPhase(final boolean 
withHeaders) throws Exception {
         createBuilderWithFailedProcessor();
         // enable processing exception handler invoked config
         TestGlobalProcessingExceptionHandler.shouldResume = false;
@@ -400,6 +413,8 @@ public class GlobalKTableIntegrationTest {
         
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
                 TestGlobalProcessingExceptionHandler.class);
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         produceInitialGlobalTableValues();
         assertThrows(StreamsException.class, () -> {
             startStreams();
@@ -408,8 +423,9 @@ public class GlobalKTableIntegrationTest {
 
     }
 
-    @Test
-    public void testProcessingExceptionHandlerDisabledRestorationPhase() 
throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testProcessingExceptionHandlerDisabledRestorationPhase(final 
boolean withHeaders) throws Exception {
         createBuilderWithFailedProcessor();
         // disable processing exception handler invoked config
         TestGlobalProcessingExceptionHandler.shouldResume = false;
@@ -417,6 +433,8 @@ public class GlobalKTableIntegrationTest {
         
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
                 TestGlobalProcessingExceptionHandler.class);
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         produceInitialGlobalTableValues();
         assertThrows(StreamsException.class, () -> {
             startStreams();
@@ -425,8 +443,9 @@ public class GlobalKTableIntegrationTest {
 
     }
 
-    @Test
-    public void testProcessingExceptionHandlerContinueEnabledRunTimePhase() 
throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void 
testProcessingExceptionHandlerContinueEnabledRunTimePhase(final boolean 
withHeaders) throws Exception {
         createBuilderWithFailedProcessor();
         // enable processing exception handler invoked config
         TestGlobalProcessingExceptionHandler.shouldResume = true;
@@ -434,6 +453,8 @@ public class GlobalKTableIntegrationTest {
         
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
                 TestGlobalProcessingExceptionHandler.class);
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
         waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
Duration.ofSeconds(30));
         produceInitialGlobalTableValues();
@@ -445,14 +466,17 @@ public class GlobalKTableIntegrationTest {
         );
     }
 
-    @Test
-    public void testProcessingExceptionHandlerFailEnabledRunTimePhase() throws 
Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testProcessingExceptionHandlerFailEnabledRunTimePhase(final 
boolean withHeaders) throws Exception {
         createBuilderWithFailedProcessor();
         // enable processing exception handler invoked config
         
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,
 true);
         
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
                 TestGlobalProcessingExceptionHandler.class);
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
         waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
Duration.ofSeconds(30));
         produceInitialGlobalTableValues();
@@ -460,14 +484,17 @@ public class GlobalKTableIntegrationTest {
         assertTrue(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
     }
 
-    @Test
-    public void testProcessingExceptionHandlerDisabledRunTimePhase() throws 
Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testProcessingExceptionHandlerDisabledRunTimePhase(final 
boolean withHeaders) throws Exception {
         createBuilderWithFailedProcessor();
         // disable processing exception handler invoked config
         
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,
 false);
         
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
                 TestGlobalProcessingExceptionHandler.class);
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
         waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
Duration.ofSeconds(30));
         produceInitialGlobalTableValues();
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 31e39a4c8b7..b197aa73650 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -51,7 +51,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -149,11 +149,14 @@ public class InternalTopicIntegrationTest {
         return Admin.create(adminClientConfig);
     }
 
-    private void configureStreams(final boolean streamsProtocolEnabled, final 
String appID) {
+    private void configureStreams(final boolean streamsProtocolEnabled, final 
boolean withHeaders, final String appID) {
         streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
         if (streamsProtocolEnabled) {
             streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
         }
+        if (withHeaders) {
+            streamsProp.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
     }
 
     /*
@@ -161,10 +164,10 @@ public class InternalTopicIntegrationTest {
      * for internal repartition topics. See KAFKA-10689
      */
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldGetToRunningWithWindowedTableInFKJ(final boolean 
streamsProtocolEnabled) throws Exception {
-        final String appID = APP_ID + "-windowed-FKJ-" + 
streamsProtocolEnabled;
-        configureStreams(streamsProtocolEnabled, appID);
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldGetToRunningWithWindowedTableInFKJ(final boolean 
streamsProtocolEnabled, final boolean withHeaders) throws Exception {
+        final String appID = APP_ID + "-windowed-FKJ-" + 
streamsProtocolEnabled + "-" + withHeaders;
+        configureStreams(streamsProtocolEnabled, withHeaders, appID);
 
         final StreamsBuilder streamsBuilder = new StreamsBuilder();
         final KStream<String, String> inputTopic = 
streamsBuilder.stream(DEFAULT_INPUT_TOPIC);
@@ -191,10 +194,10 @@ public class InternalTopicIntegrationTest {
 
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldCompactTopicsForKeyValueStoreChangelogs(final boolean 
streamsProtocolEnabled) throws Exception {
-        final String appID = APP_ID + "-compact-" + streamsProtocolEnabled;
-        configureStreams(streamsProtocolEnabled, appID);
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldCompactTopicsForKeyValueStoreChangelogs(final boolean 
streamsProtocolEnabled, final boolean withHeaders) throws Exception {
+        final String appID = APP_ID + "-compact-" + streamsProtocolEnabled + 
"-" + withHeaders;
+        configureStreams(streamsProtocolEnabled, withHeaders, appID);
 
         //
         // Step 1: Configure and start a simple word count topology
@@ -229,10 +232,10 @@ public class InternalTopicIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs(final 
boolean streamsProtocolEnabled) throws Exception {
-        final String appID = APP_ID + "-compact-delete-" + 
streamsProtocolEnabled;
-        configureStreams(streamsProtocolEnabled, appID);
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs(final 
boolean streamsProtocolEnabled, final boolean withHeaders) throws Exception {
+        final String appID = APP_ID + "-compact-delete-" + 
streamsProtocolEnabled + "-" + withHeaders;
+        configureStreams(streamsProtocolEnabled, withHeaders, appID);
 
         //
         // Step 1: Configure and start a simple word count topology
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index b31cee581b0..738e5cb5aa3 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -52,7 +52,7 @@ import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -343,11 +343,14 @@ public class MetricsIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldAddMetricsOnAllLevels(final boolean 
streamsProtocolEnabled) throws Exception {
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldAddMetricsOnAllLevels(final boolean 
streamsProtocolEnabled, final boolean withHeaders) throws Exception {
         if (streamsProtocolEnabled) {
             streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
         }
+        if (withHeaders) {
+            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
 
         builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), 
Serdes.String()))
             .to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), 
Serdes.String()));
@@ -382,11 +385,14 @@ public class MetricsIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldAddMetricsForWindowStoreAndSuppressionBuffer(final 
boolean streamsProtocolEnabled) throws Exception {
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldAddMetricsForWindowStoreAndSuppressionBuffer(final 
boolean streamsProtocolEnabled, final boolean withHeaders) throws Exception {
         if (streamsProtocolEnabled) {
             streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
         }
+        if (withHeaders) {
+            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
 
         final Duration windowSize = Duration.ofMillis(50);
         builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), 
Serdes.String()))
@@ -415,11 +421,14 @@ public class MetricsIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldAddMetricsForSessionStore(final boolean 
streamsProtocolEnabled) throws Exception {
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldAddMetricsForSessionStore(final boolean 
streamsProtocolEnabled, final boolean withHeaders) throws Exception {
         if (streamsProtocolEnabled) {
             streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
         }
+        if (withHeaders) {
+            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
 
         final Duration inactivityGap = Duration.ofMillis(50);
         builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), 
Serdes.String()))
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index b58dd655a2d..1681a7cceb5 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -76,7 +76,7 @@ import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -199,8 +199,8 @@ public class RestoreIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldRestoreNullRecord(final boolean useNewProtocol) throws 
Exception {
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldRestoreNullRecord(final boolean useNewProtocol, final 
boolean withHeaders) throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final String applicationId = appId;
@@ -220,6 +220,9 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
+        if (withHeaders) {
+            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
 
         CLUSTER.createTopics(inputTopic);
         CLUSTER.createTopics(outputTopic);
@@ -269,8 +272,8 @@ public class RestoreIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final 
boolean useNewProtocol) throws Exception {
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final 
boolean useNewProtocol, final boolean withHeaders) throws Exception {
         final AtomicInteger numReceived = new AtomicInteger(0);
         final Topology topology = new Topology();
 
@@ -278,6 +281,9 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
+        if (withHeaders) {
+            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
 
         // restoring from 1000 to 4000 (committed), and then process from 4000 
to 5000 on each of the two partitions
         final int offsetLimitDelta = 1000;
@@ -330,8 +336,8 @@ public class RestoreIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean 
useNewProtocol) throws Exception {
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean 
useNewProtocol, final boolean withHeaders) throws Exception {
         final AtomicInteger numReceived = new AtomicInteger(0);
         final StreamsBuilder builder = new StreamsBuilder();
 
@@ -340,6 +346,9 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
+        if (withHeaders) {
+            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
 
         // restoring from 1000 to 4000 (committed), and then process from 4000 
to 5000 on each of the two partitions
         final int offsetLimitDelta = 1000;
@@ -395,8 +404,8 @@ public class RestoreIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldRestoreStateFromChangelogTopic(final boolean 
useNewProtocol) throws Exception {
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldRestoreStateFromChangelogTopic(final boolean 
useNewProtocol, final boolean withHeaders) throws Exception {
         final String changelog = appId + "-store-changelog";
         CLUSTER.createTopic(changelog, 2, 1);
 
@@ -408,6 +417,9 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
+        if (withHeaders) {
+            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
 
         // restoring from 1000 to 5000, and then process from 5000 to 10000 on 
each of the two partitions
         final int offsetCheckpointed = 1000;
@@ -446,8 +458,8 @@ public class RestoreIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean 
useNewProtocol) throws InterruptedException {
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean 
useNewProtocol, final boolean withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<Integer, Integer> stream = builder.stream(inputStream);
@@ -461,6 +473,9 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
+        if (withHeaders) {
+            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
         kafkaStreams = new KafkaStreams(builder.build(), props);
         try {
             startApplicationAndWaitUntilRunning(kafkaStreams);
@@ -470,8 +485,8 @@ public class RestoreIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean 
useNewProtocol) throws InterruptedException {
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean 
useNewProtocol, final boolean withHeaders) throws InterruptedException {
         IntegrationTestUtils.produceKeyValuesSynchronously(inputStream,
                 asList(KeyValue.pair(1, 1),
                         KeyValue.pair(2, 2),
@@ -504,6 +519,9 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
+        if (withHeaders) {
+            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
         kafkaStreams = new KafkaStreams(topology, props);
 
         final CountDownLatch latch = new CountDownLatch(1);
@@ -520,8 +538,8 @@ public class RestoreIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void 
shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final 
boolean useNewProtocol) throws Exception {
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void 
shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final 
boolean useNewProtocol, final boolean withHeaders) throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(
                 inputStream,
@@ -539,6 +557,9 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             props1.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
+        if (withHeaders) {
+            props1.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
         purgeLocalStreamsState(props1);
         final KafkaStreams streams1 = new KafkaStreams(builder.build(), 
props1);
 
@@ -548,6 +569,9 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             props2.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
+        if (withHeaders) {
+            props2.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
         purgeLocalStreamsState(props2);
         final KafkaStreams streams2 = new KafkaStreams(builder.build(), 
props2);
 
@@ -603,8 +627,8 @@ public class RestoreIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldInvokeUserDefinedGlobalStateRestoreListener(final 
boolean useNewProtocol) throws Exception {
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldInvokeUserDefinedGlobalStateRestoreListener(final 
boolean useNewProtocol, final boolean withHeaders) throws Exception {
         final String inputTopic = "inputTopic";
         final String outputTopic = "outputTopic";
         CLUSTER.createTopic(inputTopic, 5, 1);
@@ -633,7 +657,7 @@ public class RestoreIntegrationTest {
 
         sendEvents(inputTopic, sampleData);
 
-        kafkaStreams = startKafkaStreams(builder, null, 
kafkaStreams1Configuration, useNewProtocol);
+        kafkaStreams = startKafkaStreams(builder, null, 
kafkaStreams1Configuration, useNewProtocol, withHeaders);
 
         validateReceivedMessages(sampleData, outputTopic);
 
@@ -642,7 +666,7 @@ public class RestoreIntegrationTest {
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations);
 
         final TestStateRestoreListener kafkaStreams1StateRestoreListener = new 
TestStateRestoreListener("ks1", RESTORATION_DELAY);
-        kafkaStreams = startKafkaStreams(builder, 
kafkaStreams1StateRestoreListener, kafkaStreams1Configuration, useNewProtocol);
+        kafkaStreams = startKafkaStreams(builder, 
kafkaStreams1StateRestoreListener, kafkaStreams1Configuration, useNewProtocol, 
withHeaders);
 
         // Ensure all the restoring tasks are in active state before starting 
the new instance.
         // Otherwise, the tasks which assigned to first kafka streams won't 
encounter "restoring suspend" after being reassigned to the second instance.
@@ -659,7 +683,8 @@ public class RestoreIntegrationTest {
         try (final KafkaStreams kafkaStreams2 = startKafkaStreams(builder,
                                                                   
kafkaStreams2StateRestoreListener,
                                                                   
kafkaStreams2Configuration,
-                                                                  
useNewProtocol)) {
+                                                                  
useNewProtocol,
+                                                                  
withHeaders)) {
 
             waitForCondition(() -> State.RUNNING == kafkaStreams2.state(),
                              90_000,
@@ -675,8 +700,8 @@ public class RestoreIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldRecordRestoreMetrics(final boolean useNewProtocol) 
throws Exception {
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldRecordRestoreMetrics(final boolean useNewProtocol, final 
boolean withHeaders) throws Exception {
         final AtomicInteger numReceived = new AtomicInteger(0);
         final StreamsBuilder builder = new StreamsBuilder();
 
@@ -685,6 +710,9 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
+        if (withHeaders) {
+            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
 
         props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
 
@@ -745,11 +773,15 @@ public class RestoreIntegrationTest {
     private KafkaStreams startKafkaStreams(final StreamsBuilder streamsBuilder,
                                            final StateRestoreListener 
stateRestoreListener,
                                            final Map<String, Object> 
extraConfiguration,
-                                           final boolean useNewProtocol) {
+                                           final boolean useNewProtocol,
+                                           final boolean withHeaders) {
         final Properties streamsConfiguration = 
props(mkObjectProperties(extraConfiguration));
         if (useNewProtocol) {
             streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
+        if (withHeaders) {
+            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
         final KafkaStreams kafkaStreams = new 
KafkaStreams(streamsBuilder.build(), streamsConfiguration);
 
         kafkaStreams.setGlobalStateRestoreListener(stateRestoreListener);
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index d77355831bc..338b1b6529c 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -46,7 +46,7 @@ import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -146,12 +146,15 @@ public class RocksDBMetricsIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void 
shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir(final boolean 
streamsProtocolEnabled, final TestInfo testInfo) throws Exception {
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void 
shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir(final boolean 
streamsProtocolEnabled, final boolean withHeaders, final TestInfo testInfo) 
throws Exception {
         final Properties streamsConfiguration = streamsConfig(testInfo);
         if (streamsProtocolEnabled) {
             streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
         }
+        if (withHeaders) {
+            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
         final StreamsBuilder builder = builderForStateStores();
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
index 8c8ef3dae9c..c83dfe6d10d 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -42,7 +42,7 @@ import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -90,7 +90,7 @@ public class StandbyTaskCreationIntegrationTest {
         client2.close(Duration.ofSeconds(60));
     }
 
-    private Properties streamsConfiguration(final boolean 
streamsProtocolEnabled) {
+    private Properties streamsConfiguration(final boolean 
streamsProtocolEnabled, final boolean withHeaders) {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
@@ -103,12 +103,15 @@ public class StandbyTaskCreationIntegrationTest {
         } else {
             
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         }
+        if (withHeaders) {
+            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
         return streamsConfiguration;
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void 
shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled(final boolean 
streamsProtocolEnabled) throws Exception {
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void 
shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled(final boolean 
streamsProtocolEnabled, final boolean withHeaders) throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
         final String stateStoreName = "myTransformState";
         final StoreBuilder<KeyValueStore<Integer, Integer>> 
keyValueStoreBuilder =
@@ -127,7 +130,7 @@ public class StandbyTaskCreationIntegrationTest {
 
 
         final Topology topology = builder.build();
-        createClients(topology, streamsConfiguration(streamsProtocolEnabled), 
topology, streamsConfiguration(streamsProtocolEnabled));
+        createClients(topology, streamsConfiguration(streamsProtocolEnabled, 
withHeaders), topology, streamsConfiguration(streamsProtocolEnabled, 
withHeaders));
 
         setStateListenersForVerification(thread -> 
thread.standbyTasks().isEmpty() && !thread.activeTasks().isEmpty());
 
@@ -139,11 +142,11 @@ public class StandbyTaskCreationIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void 
shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables(final boolean 
streamsProtocolEnabled) throws Exception {
-        final Properties streamsConfiguration1 = 
streamsConfiguration(streamsProtocolEnabled);
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void 
shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables(final boolean 
streamsProtocolEnabled, final boolean withHeaders) throws Exception {
+        final Properties streamsConfiguration1 = 
streamsConfiguration(streamsProtocolEnabled, withHeaders);
         streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
-        final Properties streamsConfiguration2 = 
streamsConfiguration(streamsProtocolEnabled);
+        final Properties streamsConfiguration2 = 
streamsConfiguration(streamsProtocolEnabled, withHeaders);
         streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
 
         final StreamsBuilder builder = new StreamsBuilder();
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 6f29b7b81f5..d588f607263 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -56,7 +56,7 @@ import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -114,14 +114,14 @@ public class 
StreamsUncaughtExceptionHandlerIntegrationTest {
 
     private Properties properties;
 
-    private Properties basicProps(final boolean 
streamsRebalanceProtocolEnabled) {
+    private Properties basicProps(final boolean 
streamsRebalanceProtocolEnabled, final boolean withHeaders) {
         final String protocol;
         if (streamsRebalanceProtocolEnabled) {
             protocol = 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault());
         } else {
             protocol = 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.getDefault());
         }
-        return mkObjectProperties(
+        final Properties props = mkObjectProperties(
             mkMap(
                 mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
                 mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
@@ -133,6 +133,10 @@ public class 
StreamsUncaughtExceptionHandlerIntegrationTest {
                 mkEntry(StreamsConfig.GROUP_PROTOCOL_CONFIG, protocol)
             )
         );
+        if (withHeaders) {
+            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
+        return props;
     }
 
     @BeforeEach
@@ -154,9 +158,9 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest 
{
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    public void shouldShutdownClient(final boolean 
streamsRebalanceProtocolEnabled) throws Exception {
-        properties = basicProps(streamsRebalanceProtocolEnabled);
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldShutdownClient(final boolean 
streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception {
+        properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders);
         try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
 
             kafkaStreams.setUncaughtExceptionHandler(exception -> 
SHUTDOWN_CLIENT);
@@ -171,38 +175,38 @@ public class 
StreamsUncaughtExceptionHandlerIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    public void shouldReplaceThreads(final boolean 
streamsRebalanceProtocolEnabled) throws Exception {
-        properties = basicProps(streamsRebalanceProtocolEnabled);
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldReplaceThreads(final boolean 
streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception {
+        properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders);
         testReplaceThreads(2);
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    public void shouldReplaceThreadsWithoutJavaHandler(final boolean 
streamsRebalanceProtocolEnabled) throws Exception {
-        properties = basicProps(streamsRebalanceProtocolEnabled);
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldReplaceThreadsWithoutJavaHandler(final boolean 
streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception {
+        properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders);
         Thread.setDefaultUncaughtExceptionHandler((t, e) -> fail("exception 
thrown"));
         testReplaceThreads(2);
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    public void shouldReplaceSingleThread(final boolean 
streamsRebalanceProtocolEnabled) throws Exception {
-        properties = basicProps(streamsRebalanceProtocolEnabled);
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldReplaceSingleThread(final boolean 
streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception {
+        properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders);
         testReplaceThreads(1);
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    public void shouldShutdownMultipleThreadApplication(final boolean 
streamsRebalanceProtocolEnabled) throws Exception {
-        properties = basicProps(streamsRebalanceProtocolEnabled);
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldShutdownMultipleThreadApplication(final boolean 
streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception {
+        properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders);
         testShutdownApplication(2);
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    public void shouldShutdownSingleThreadApplication(final boolean 
streamsRebalanceProtocolEnabled) throws Exception {
-        properties = basicProps(streamsRebalanceProtocolEnabled);
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldShutdownSingleThreadApplication(final boolean 
streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception {
+        properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders);
         testShutdownApplication(1);
     }
 
@@ -234,9 +238,9 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest 
{
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    public void 
shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread(final boolean 
streamsRebalanceProtocolEnabled) throws Exception {
-        properties = basicProps(streamsRebalanceProtocolEnabled);
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void 
shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread(final boolean 
streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception {
+        properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders);
         builder.addGlobalStore(
                 new KeyValueStoreBuilder<>(
                         Stores.persistentKeyValueStore("globalStore"),
@@ -263,9 +267,9 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest 
{
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    public void shouldEmitSameRecordAfterFailover(final boolean 
streamsRebalanceProtocolEnabled) throws Exception {
-        properties = basicProps(streamsRebalanceProtocolEnabled);
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    public void shouldEmitSameRecordAfterFailover(final boolean 
streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception {
+        properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders);
         properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L);
         properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index 7cb8108a2de..dbb703b35e5 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -47,9 +47,10 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,8 +102,9 @@ public class SuppressionDurabilityIntegrationTest {
     private static final LongDeserializer LONG_DESERIALIZER = new 
LongDeserializer();
     private static final long COMMIT_INTERVAL = 100L;
 
-    @Test
-    public void shouldRecoverBufferAfterShutdown(final TestInfo testInfo) {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldRecoverBufferAfterShutdown(final boolean withHeaders, 
final TestInfo testInfo) {
         final String testId = safeUniqueTestName(testInfo);
         final String appId = "appId_" + testId;
         final String input = "input" + testId;
@@ -149,6 +151,7 @@ public class SuppressionDurabilityIntegrationTest {
         ));
 
         streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
COMMIT_INTERVAL);
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
 
         KafkaStreams driver = getStartedStreams(streamsConfig, builder, true);
         try {
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index 3d7c141129f..8c4000b9002 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -48,8 +48,9 @@ import org.hamcrest.Matchers;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.time.Instant;
@@ -113,8 +114,9 @@ public class SuppressionIntegrationTest {
             .count(Materialized.<String, Long, KeyValueStore<Bytes, 
byte[]>>as("counts").withCachingDisabled());
     }
 
-    @Test
-    public void shouldUseDefaultSerdes() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldUseDefaultSerdes(final boolean withHeaders) {
         final String testId = "-shouldInheritSerdes";
         final String appId = 
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
@@ -140,7 +142,7 @@ public class SuppressionIntegrationTest {
             .toStream()
             .to(outputRaw);
 
-        final Properties streamsConfig = getStreamsConfig(appId);
+        final Properties streamsConfig = getStreamsConfig(appId, withHeaders);
         streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
         streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
 
@@ -165,8 +167,9 @@ public class SuppressionIntegrationTest {
         }
     }
 
-    @Test
-    public void shouldInheritSerdes() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldInheritSerdes(final boolean withHeaders) {
         final String testId = "-shouldInheritSerdes";
         final String appId = 
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
@@ -193,7 +196,7 @@ public class SuppressionIntegrationTest {
             .toStream()
             .to(outputRaw);
 
-        final Properties streamsConfig = getStreamsConfig(appId);
+        final Properties streamsConfig = getStreamsConfig(appId, withHeaders);
         streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
         streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
 
@@ -246,8 +249,9 @@ public class SuppressionIntegrationTest {
         }
     }
 
-    @Test
-    public void shouldShutdownWhenRecordConstraintIsViolated() throws 
InterruptedException {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldShutdownWhenRecordConstraintIsViolated(final boolean 
withHeaders) throws InterruptedException {
         final String testId = "-shouldShutdownWhenRecordConstraintIsViolated";
         final String appId = 
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
@@ -268,7 +272,8 @@ public class SuppressionIntegrationTest {
             .toStream()
             .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
 
-        final Properties streamsConfig = getStreamsConfig(appId);
+        final Properties streamsConfig = getStreamsConfig(appId, withHeaders);
+
         final KafkaStreams driver = 
IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
         try {
             produceSynchronously(
@@ -287,8 +292,9 @@ public class SuppressionIntegrationTest {
         }
     }
 
-    @Test
-    public void shouldShutdownWhenBytesConstraintIsViolated() throws 
InterruptedException {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldShutdownWhenBytesConstraintIsViolated(final boolean 
withHeaders) throws InterruptedException {
         final String testId = "-shouldShutdownWhenBytesConstraintIsViolated";
         final String appId = 
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
@@ -310,7 +316,8 @@ public class SuppressionIntegrationTest {
             .toStream()
             .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
 
-        final Properties streamsConfig = getStreamsConfig(appId);
+        final Properties streamsConfig = getStreamsConfig(appId, withHeaders);
+
         final KafkaStreams driver = 
IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
         try {
             produceSynchronously(
@@ -329,8 +336,9 @@ public class SuppressionIntegrationTest {
         }
     }
 
-    @Test
-    public void shouldAllowOverridingChangelogConfig() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldAllowOverridingChangelogConfig(final boolean 
withHeaders) {
         final String testId = "-shouldAllowOverridingChangelogConfig";
         final String appId = 
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
@@ -360,7 +368,7 @@ public class SuppressionIntegrationTest {
             .toStream()
             .to(outputRaw);
 
-        final Properties streamsConfig = getStreamsConfig(appId);
+        final Properties streamsConfig = getStreamsConfig(appId, withHeaders);
         streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
         streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
 
@@ -389,8 +397,9 @@ public class SuppressionIntegrationTest {
         }
     }
 
-    @Test
-    public void shouldCreateChangelogByDefault() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldCreateChangelogByDefault(final boolean withHeaders) {
         final String testId = "-shouldCreateChangelogByDefault";
         final String appId = 
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
@@ -418,7 +427,7 @@ public class SuppressionIntegrationTest {
             .toStream()
             .to(outputRaw);
 
-        final Properties streamsConfig = getStreamsConfig(appId);
+        final Properties streamsConfig = getStreamsConfig(appId, withHeaders);
         streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
         streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
 
@@ -445,8 +454,9 @@ public class SuppressionIntegrationTest {
         }
     }
 
-    @Test
-    public void shouldAllowDisablingChangelog() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldAllowDisablingChangelog(final boolean withHeaders) {
         final String testId = "-shouldAllowDisablingChangelog";
         final String appId = 
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
@@ -474,7 +484,7 @@ public class SuppressionIntegrationTest {
             .toStream()
             .to(outputRaw);
 
-        final Properties streamsConfig = getStreamsConfig(appId);
+        final Properties streamsConfig = getStreamsConfig(appId, withHeaders);
         streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
         streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
 
@@ -506,8 +516,8 @@ public class SuppressionIntegrationTest {
         }
     }
 
-    private static Properties getStreamsConfig(final String appId) {
-        return mkProperties(mkMap(
+    private static Properties getStreamsConfig(final String appId, final 
boolean withHeaders) {
+        final Properties props = mkProperties(mkMap(
             mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
             mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
             mkEntry(StreamsConfig.POLL_MS_CONFIG, 
Integer.toString(COMMIT_INTERVAL)),
@@ -515,6 +525,8 @@ public class SuppressionIntegrationTest {
             mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE),
             mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath())
         ));
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+        return props;
     }
 
     /**

Reply via email to