This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 433f06a1665 KAFKA-20329: Test `headers` dsl.store.format further (1/N) 
(#21806)
433f06a1665 is described below

commit 433f06a1665399b30998c49ac546b2a3c016bdff
Author: Alieh Saeedi <[email protected]>
AuthorDate: Thu Apr 9 00:19:18 2026 +0200

    KAFKA-20329: Test `headers` dsl.store.format further (1/N) (#21806)
    
    This PR adds `withHeaders` parameterization to Kafka Streams integration
    tests to verify that stream operations work correctly with both DSL
    store format configurations (the regular and the headers-based format).
    
    Changes:
    - Added a `withHeaders` boolean parameter to test methods.
    - Tests now run with both `withHeaders=false` and `withHeaders=true`.
    - When `withHeaders=true`, tests configure
      `StreamsConfig.DSL_STORE_FORMAT_CONFIG` to
      `StreamsConfig.DSL_STORE_FORMAT_HEADERS`.
    - For tests that produce data in @BeforeAll, moved data production to
      @BeforeEach to ensure fresh data for each parameterized test run.
    - Added topic cleanup in @AfterEach for proper isolation between test
      runs.
    
    This ensures comprehensive test coverage for foreign key joins across
    different store formats.
    
    Reviewers: Matthias J. Sax <[email protected]>, TengYao Chi
     <[email protected]>
---
 .../JoinGracePeriodDurabilityIntegrationTest.java  |  22 +++-
 .../integration/JoinStoreIntegrationTest.java      |  16 ++-
 .../JoinWithIncompleteMetadataIntegrationTest.java |   2 +-
 .../KStreamAggregationDedupIntegrationTest.java    |  24 ++--
 .../KStreamAggregationIntegrationTest.java         |  80 ++++++++-----
 ...yInnerJoinCustomPartitionerIntegrationTest.java |  25 ++--
 ...bleForeignKeyInnerJoinMultiIntegrationTest.java |  70 ++++++-----
 .../KTableKTableForeignKeyJoinDistributedTest.java |  17 ++-
 .../KTableKTableForeignKeyJoinIntegrationTest.java |  63 ++++++----
 ...reignKeyJoinMaterializationIntegrationTest.java |   6 +-
 .../RelaxedNullKeyRequirementJoinTest.java         |  40 +++++--
 .../SelfJoinUpgradeIntegrationTest.java            |  18 ++-
 .../SlidingWindowedKStreamIntegrationTest.java     |  35 ++++--
 .../StreamStreamJoinIntegrationTest.java           |  51 +++++---
 .../StreamTableJoinIntegrationTest.java            |  27 +++--
 .../StreamTableJoinWithGraceIntegrationTest.java   |  15 ++-
 .../integration/TableTableJoinIntegrationTest.java | 129 ++++++++++++++-------
 .../TimeWindowedKStreamIntegrationTest.java        |  33 +++++-
 .../integration/utils/IntegrationTestUtils.java    |  14 +++
 19 files changed, 467 insertions(+), 220 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
index a2d34cb12ba..96544a6d474 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
@@ -42,9 +42,10 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -88,8 +89,9 @@ public class JoinGracePeriodDurabilityIntegrationTest {
     private static final Serde<String> STRING_SERDE = Serdes.String();
     private static final long COMMIT_INTERVAL = 100L;
 
-    @Test
-    public void shouldRecoverBufferAfterShutdown(final TestInfo testInfo) {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldRecoverBufferAfterShutdown(final boolean withHeaders, 
final TestInfo testInfo) {
         final String testId = safeUniqueTestName(testInfo);
         final String appId = "appId_" + testId;
         final String streamInput = "Streaminput" + testId;
@@ -126,7 +128,7 @@ public class JoinGracePeriodDurabilityIntegrationTest {
 
         streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
COMMIT_INTERVAL);
 
-        KafkaStreams driver = getStartedStreams(streamsConfig, builder, true);
+        KafkaStreams driver = startStream(streamsConfig, builder, true, 
withHeaders);
         try {
             produceSynchronouslyToPartitionZero(
                 tableInput,
@@ -170,7 +172,7 @@ public class JoinGracePeriodDurabilityIntegrationTest {
             // restart the driver
             driver.close();
             assertThat(driver.state(), is(KafkaStreams.State.NOT_RUNNING));
-            driver = getStartedStreams(streamsConfig, builder, false);
+            driver = startStream(streamsConfig, builder, false, withHeaders);
 
 
             // flush those recovered buffered events out.
@@ -225,4 +227,14 @@ public class JoinGracePeriodDurabilityIntegrationTest {
         ));
         IntegrationTestUtils.produceSynchronously(producerConfig, false, 
topic, Optional.of(0), toProduce);
     }
+
+    private KafkaStreams startStream(
+        final Properties streamsConfig,
+        final StreamsBuilder builder,
+        final boolean clean,
+        final boolean withHeaders) {
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        return getStartedStreams(streamsConfig, builder, clean);
+    }
 }
\ No newline at end of file
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
index 1041f323ae1..95a2725c8db 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -39,8 +39,9 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -101,8 +102,9 @@ public class JoinStoreIntegrationTest {
         IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
     }
 
-    @Test
-    public void providingAJoinStoreNameShouldNotMakeTheJoinResultQueryable() 
throws InterruptedException {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void 
providingAJoinStoreNameShouldNotMakeTheJoinResultQueryable(final boolean 
withHeaders) throws InterruptedException {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-no-store-access");
         final StreamsBuilder builder = new StreamsBuilder();
 
@@ -116,6 +118,7 @@ public class JoinStoreIntegrationTest {
             JoinWindows.of(ofMillis(100)),
             StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer()).withStoreName("join-store"));
 
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(STREAMS_CONFIG, 
withHeaders);
         try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), STREAMS_CONFIG)) {
             kafkaStreams.setStateListener((newState, oldState) -> {
                 if (newState == KafkaStreams.State.RUNNING) {
@@ -137,8 +140,9 @@ public class JoinStoreIntegrationTest {
         }
     }
 
-    @Test
-    public void 
streamJoinChangelogTopicShouldBeConfiguredWithDeleteOnlyCleanupPolicy() throws 
Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void 
streamJoinChangelogTopicShouldBeConfiguredWithDeleteOnlyCleanupPolicy(final 
boolean withHeaders) throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-changelog-cleanup-policy");
         final StreamsBuilder builder = new StreamsBuilder();
 
@@ -152,6 +156,8 @@ public class JoinStoreIntegrationTest {
             JoinWindows.of(ofMillis(100)),
             StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer()).withStoreName("join-store"));
 
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(STREAMS_CONFIG, 
withHeaders);
+
         try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), STREAMS_CONFIG);
             final Admin admin = Admin.create(ADMIN_CONFIG)) {
             kafkaStreams.setStateListener((newState, oldState) -> {
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
index 291401c82fd..36d48491ceb 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
@@ -98,7 +98,7 @@ public class JoinWithIncompleteMetadataIntegrationTest {
         final String appId = APP_ID + "-" + (useNewProtocol ? "new" : "old");
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
         STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
-        
+
         if (useNewProtocol) {
             STREAMS_CONFIG.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
         }
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index cca699b9493..254915ce880 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -48,9 +48,10 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -124,14 +125,17 @@ public class KStreamAggregationDedupIntegrationTest {
     }
 
 
-    @Test
-    public void shouldReduce(final TestInfo testInfo) throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldReduce(final boolean withHeaders, final TestInfo 
testInfo) throws Exception {
         produceMessages(System.currentTimeMillis());
         groupedStream
                 .reduce(reducer, Materialized.as("reduce-by-key"))
                 .toStream()
                 .to(outputTopic, Produced.with(Serdes.String(), 
Serdes.String()));
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
 
         final long timestamp = System.currentTimeMillis();
@@ -149,8 +153,9 @@ public class KStreamAggregationDedupIntegrationTest {
                 testInfo);
     }
 
-    @Test
-    public void shouldReduceWindowed(final TestInfo testInfo) throws Exception 
{
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldReduceWindowed(final boolean withHeaders, final TestInfo 
testInfo) throws Exception {
         final long firstBatchTimestamp = System.currentTimeMillis() - 1000;
         produceMessages(firstBatchTimestamp);
         final long secondBatchTimestamp = System.currentTimeMillis();
@@ -163,6 +168,8 @@ public class KStreamAggregationDedupIntegrationTest {
             .toStream((windowedKey, value) -> windowedKey.key() + "@" + 
windowedKey.window().start())
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
 
         final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
@@ -187,8 +194,9 @@ public class KStreamAggregationDedupIntegrationTest {
         );
     }
 
-    @Test
-    public void shouldGroupByKey(final TestInfo testInfo) throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldGroupByKey(final boolean withHeaders, final TestInfo 
testInfo) throws Exception {
         final long timestamp = mockTime.milliseconds();
         produceMessages(timestamp);
         produceMessages(timestamp);
@@ -199,6 +207,8 @@ public class KStreamAggregationDedupIntegrationTest {
             .toStream((windowedKey, value) -> windowedKey.key() + "@" + 
windowedKey.window().start())
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
 
         final long window = timestamp / 500 * 500;
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 738b5cbb53b..baf0e7b3782 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -70,7 +70,6 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -170,14 +169,17 @@ public class KStreamAggregationIntegrationTest {
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
     }
 
-    @Test
-    public void shouldReduce(final TestInfo testInfo) throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldReduce(final boolean withHeaders, final TestInfo 
testInfo) throws Exception {
         produceMessages(mockTime.milliseconds());
         groupedStream
             .reduce(reducer, Materialized.as("reduce-by-key"))
             .toStream()
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
 
         produceMessages(mockTime.milliseconds());
@@ -220,8 +222,9 @@ public class KStreamAggregationIntegrationTest {
         return keyComparison;
     }
 
-    @Test
-    public void shouldReduceWindowed(final TestInfo testInfo) throws Exception 
{
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldReduceWindowed(final boolean withHeaders, final TestInfo 
testInfo) throws Exception {
         final long firstBatchTimestamp = mockTime.milliseconds();
         mockTime.sleep(1000);
         produceMessages(firstBatchTimestamp);
@@ -236,6 +239,8 @@ public class KStreamAggregationIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
 
         final List<KeyValueTimestamp<Windowed<String>, String>> windowedOutput 
= receiveMessages(
@@ -297,8 +302,9 @@ public class KStreamAggregationIntegrationTest {
         }
     }
 
-    @Test
-    public void shouldAggregate(final TestInfo testInfo) throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldAggregate(final boolean withHeaders, final TestInfo 
testInfo) throws Exception {
         produceMessages(mockTime.milliseconds());
         groupedStream.aggregate(
             initializer,
@@ -307,6 +313,8 @@ public class KStreamAggregationIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.Integer()));
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
 
         produceMessages(mockTime.milliseconds());
@@ -337,8 +345,9 @@ public class KStreamAggregationIntegrationTest {
         );
     }
 
-    @Test
-    public void shouldAggregateWindowed(final TestInfo testInfo) throws 
Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldAggregateWindowed(final boolean withHeaders, final 
TestInfo testInfo) throws Exception {
         final long firstTimestamp = mockTime.milliseconds();
         mockTime.sleep(1000);
         produceMessages(firstTimestamp);
@@ -356,6 +365,8 @@ public class KStreamAggregationIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
 
         final List<KeyValueTimestamp<Windowed<String>, Integer>> 
windowedMessages = receiveMessagesWithTimestamp(
@@ -445,30 +456,37 @@ public class KStreamAggregationIntegrationTest {
         );
     }
 
-    @Test
-    public void shouldCount(final TestInfo testInfo) throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldCount(final boolean withHeaders, final TestInfo 
testInfo) throws Exception {
         produceMessages(mockTime.milliseconds());
 
         groupedStream.count(Materialized.as("count-by-key"))
             .toStream()
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         shouldCountHelper(testInfo);
     }
 
-    @Test
-    public void shouldCountWithInternalStore(final TestInfo testInfo) throws 
Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldCountWithInternalStore(final boolean withHeaders, final 
TestInfo testInfo) throws Exception {
         produceMessages(mockTime.milliseconds());
 
         groupedStream.count()
             .toStream()
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         shouldCountHelper(testInfo);
     }
 
-    @Test
-    public void shouldGroupByKey(final TestInfo testInfo) throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldGroupByKey(final boolean withHeaders, final TestInfo 
testInfo) throws Exception {
         final long timestamp = mockTime.milliseconds();
         produceMessages(timestamp);
         produceMessages(timestamp);
@@ -478,6 +496,8 @@ public class KStreamAggregationIntegrationTest {
             .count()
             .toStream((windowedKey, value) -> windowedKey.key() + "@" + 
windowedKey.window().start()).to(outputTopic, Produced.with(Serdes.String(), 
Serdes.Long()));
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
 
         final List<KeyValueTimestamp<String, Long>> results = receiveMessages(
@@ -506,8 +526,9 @@ public class KStreamAggregationIntegrationTest {
         );
     }
 
-    @Test
-    public void shouldReduceSlidingWindows(final TestInfo testInfo) throws 
Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldReduceSlidingWindows(final boolean withHeaders, final 
TestInfo testInfo) throws Exception {
         final long firstBatchTimestamp = mockTime.milliseconds();
         final long timeDifference = 500L;
         produceMessages(firstBatchTimestamp);
@@ -523,6 +544,8 @@ public class KStreamAggregationIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
 
         final List<KeyValueTimestamp<Windowed<String>, String>> windowedOutput 
= receiveMessages(
@@ -613,8 +636,9 @@ public class KStreamAggregationIntegrationTest {
         }
     }
 
-    @Test
-    public void shouldAggregateSlidingWindows(final TestInfo testInfo) throws 
Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldAggregateSlidingWindows(final boolean withHeaders, final 
TestInfo testInfo) throws Exception {
         final long firstBatchTimestamp = mockTime.milliseconds();
         final long timeDifference = 500L;
         produceMessages(firstBatchTimestamp);
@@ -633,6 +657,8 @@ public class KStreamAggregationIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
 
         final List<KeyValueTimestamp<Windowed<String>, Integer>> 
windowedMessages = receiveMessagesWithTimestamp(
@@ -804,9 +830,7 @@ public class KStreamAggregationIntegrationTest {
         final Map<Windowed<String>, KeyValue<Long, Long>> results = new 
HashMap<>();
         final CountDownLatch latch = new CountDownLatch(13);
 
-        if (withHeaders) {
-            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), 
Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
@@ -853,9 +877,7 @@ public class KStreamAggregationIntegrationTest {
         final CountDownLatch latch = new CountDownLatch(13);
         final String userSessionsStore = "UserSessionsStore";
 
-        if (withHeaders) {
-            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), 
Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
@@ -941,8 +963,9 @@ public class KStreamAggregationIntegrationTest {
         }
     }
     
-    @Test
-    public void shouldCountUnlimitedWindows() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldCountUnlimitedWindows(final boolean withHeaders) throws 
Exception {
         final long startTime = mockTime.milliseconds() - 
TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS) + 1;
         final long incrementTime = Duration.ofDays(1).toMillis();
 
@@ -1011,6 +1034,9 @@ public class KStreamAggregationIntegrationTest {
                 results.put(record.key(), KeyValue.pair(record.value(), 
record.timestamp()));
                 latch.countDown();
             });
+
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
         assertTrue(latch.await(30, TimeUnit.SECONDS));
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
index 615ebac78c1..7782fe20348 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
@@ -49,9 +49,10 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -173,22 +174,28 @@ public class 
KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
         IntegrationTestUtils.purgeLocalStreamsState(asList(streamsConfig, 
streamsConfigTwo, streamsConfigThree));
     }
 
-    @Test
-    public void shouldInnerJoinMultiPartitionQueryable() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldInnerJoinMultiPartitionQueryable(final boolean 
withHeaders) throws Exception {
         final Set<KeyValue<String, String>> expectedOne = new HashSet<>();
         expectedOne.add(new KeyValue<>("ID123-1", 
"value1=ID123-A1,value2=BBB"));
         expectedOne.add(new KeyValue<>("ID123-2", 
"value1=ID123-A2,value2=BBB"));
         expectedOne.add(new KeyValue<>("ID123-3", 
"value1=ID123-A3,value2=BBB"));
         expectedOne.add(new KeyValue<>("ID123-4", 
"value1=ID123-A4,value2=BBB"));
 
-        verifyKTableKTableJoin(expectedOne);
+        verifyKTableKTableJoin(expectedOne, withHeaders);
     }
 
-    @Test
-    public void 
shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions()
 throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void 
shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions(final
 boolean withHeaders) throws Exception {
         final String innerJoinType = "INNER";
         final String queryableName = innerJoinType + "-store1";
 
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigTwo, 
withHeaders);
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigThree, 
withHeaders);
+
         streams = prepareTopologyWithNonSingletonPartitions(queryableName, 
streamsConfig);
         streamsTwo = prepareTopologyWithNonSingletonPartitions(queryableName, 
streamsConfigTwo);
         streamsThree = 
prepareTopologyWithNonSingletonPartitions(queryableName, streamsConfigThree);
@@ -210,10 +217,14 @@ public class 
KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
         waitForApplicationState(Arrays.asList(streams, streamsTwo, 
streamsThree), KafkaStreams.State.ERROR, ofSeconds(240));
     }
 
-    private void verifyKTableKTableJoin(final Set<KeyValue<String, String>> 
expectedResult) throws Exception {
+    private void verifyKTableKTableJoin(final Set<KeyValue<String, String>> 
expectedResult, final boolean withHeaders) throws Exception {
         final String innerJoinType = "INNER";
         final String queryableName = innerJoinType + "-store1";
 
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigTwo, 
withHeaders);
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigThree, 
withHeaders);
+
         streams = prepareTopology(queryableName, streamsConfig);
         streamsTwo = prepareTopology(queryableName, streamsConfigTwo);
         streamsThree = prepareTopology(queryableName, streamsConfigThree);
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
index 45a7f6f4534..19adface8c1 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
@@ -47,10 +47,10 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
-import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashSet;
@@ -88,14 +88,8 @@ public class 
KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
     private static final Properties PRODUCER_CONFIG_3 = new Properties();
 
     @BeforeAll
-    public static void startCluster() throws IOException, InterruptedException 
{
+    public static void startCluster() throws Exception {
         CLUSTER.start();
-        //Use multiple partitions to ensure distribution of keys.
-
-        CLUSTER.createTopic(TABLE_1, 3, 1);
-        CLUSTER.createTopic(TABLE_2, 5, 1);
-        CLUSTER.createTopic(TABLE_3, 7, 1);
-        CLUSTER.createTopic(OUTPUT, 11, 1);
 
         PRODUCER_CONFIG_1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         PRODUCER_CONFIG_1.put(ProducerConfig.ACKS_CONFIG, "all");
@@ -112,6 +106,30 @@ public class 
KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
         PRODUCER_CONFIG_3.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class);
         PRODUCER_CONFIG_3.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
 
+        CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, 
"ktable-ktable-consumer");
+        CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
IntegerDeserializer.class);
+        CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+    }
+
+    @AfterAll
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @BeforeEach
+    public void before() throws Exception {
+        final String stateDirBasePath = TestUtils.tempDirectory().getPath();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + 
"-1");
+        streamsConfigTwo.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath 
+ "-2");
+        streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG, 
stateDirBasePath + "-3");
+
+        //Use multiple partitions to ensure distribution of keys.
+        CLUSTER.createTopic(TABLE_1, 3, 1);
+        CLUSTER.createTopic(TABLE_2, 5, 1);
+        CLUSTER.createTopic(TABLE_3, 7, 1);
+        CLUSTER.createTopic(OUTPUT, 11, 1);
+
         final List<KeyValue<Integer, Float>> table1 = asList(
             new KeyValue<>(1, 1.33f),
             new KeyValue<>(2, 2.22f),
@@ -141,28 +159,10 @@ public class 
KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
         IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, 
PRODUCER_CONFIG_1, MOCK_TIME);
         IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, 
PRODUCER_CONFIG_2, MOCK_TIME);
         IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_3, table3, 
PRODUCER_CONFIG_3, MOCK_TIME);
-
-        CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
-        CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, 
"ktable-ktable-consumer");
-        CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
IntegerDeserializer.class);
-        CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-    }
-
-    @AfterAll
-    public static void closeCluster() {
-        CLUSTER.stop();
-    }
-
-    @BeforeEach
-    public void before() throws IOException {
-        final String stateDirBasePath = TestUtils.tempDirectory().getPath();
-        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + 
"-1");
-        streamsConfigTwo.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath 
+ "-2");
-        streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG, 
stateDirBasePath + "-3");
     }
 
     @AfterEach
-    public void after() throws IOException {
+    public void after() throws Exception {
         if (streams != null) {
             streams.close(Duration.ofSeconds(60));
             streams = null;
@@ -176,21 +176,27 @@ public class 
KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
             streamsThree = null;
         }
         IntegrationTestUtils.purgeLocalStreamsState(asList(streamsConfig, 
streamsConfigTwo, streamsConfigThree));
+        CLUSTER.deleteTopics(TABLE_1, TABLE_2, TABLE_3, OUTPUT);
     }
 
-    @Test
-    public void shouldInnerJoinMultiPartitionQueryable() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldInnerJoinMultiPartitionQueryable(final boolean 
withHeaders) throws Exception {
         final Set<KeyValue<Integer, String>> expectedOne = new HashSet<>();
         expectedOne.add(new KeyValue<>(1, 
"value1=1.33,value2=10,value3=waffle"));
 
-        verifyKTableKTableJoin(expectedOne);
+        verifyKTableKTableJoin(expectedOne, withHeaders);
     }
 
-    private void verifyKTableKTableJoin(final Set<KeyValue<Integer, String>> 
expectedResult) throws Exception {
+    private void verifyKTableKTableJoin(final Set<KeyValue<Integer, String>> 
expectedResult, final boolean withHeaders) throws Exception {
         final String innerJoinType = "INNER";
         final String queryableName = innerJoinType + "-store1";
         final String queryableNameTwo = innerJoinType + "-store2";
 
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigTwo, 
withHeaders);
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigThree, 
withHeaders);
+
         streams = prepareTopology(queryableName, queryableNameTwo, 
streamsConfig);
         streamsTwo = prepareTopology(queryableName, queryableNameTwo, 
streamsConfigTwo);
         streamsThree = prepareTopology(queryableName, queryableNameTwo, 
streamsConfigThree);
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
index 84d9c9d0fcd..a86479d0617 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
@@ -39,11 +39,11 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
-import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -67,7 +67,7 @@ public class KTableKTableForeignKeyJoinDistributedTest {
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
 
     @BeforeAll
-    public static void startCluster() throws IOException, InterruptedException 
{
+    public static void startCluster() throws Exception {
         CLUSTER.start();
         CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
     }
@@ -121,11 +121,12 @@ public class KTableKTableForeignKeyJoinDistributedTest {
     }
 
     @AfterEach
-    public void after() {
+    public void after() throws InterruptedException {
         client1.close(Duration.ofSeconds(60));
         client2.close(Duration.ofSeconds(60));
         quietlyCleanStateAfterTest(CLUSTER, client1);
         quietlyCleanStateAfterTest(CLUSTER, client2);
+        CLUSTER.deleteTopics(LEFT_TABLE, RIGHT_TABLE, OUTPUT);
     }
 
     public Properties getStreamsConfiguration(final String safeTestName) {
@@ -157,12 +158,16 @@ public class KTableKTableForeignKeyJoinDistributedTest {
                 .to(OUTPUT);
     }
 
-    @Test
-    public void shouldBeInitializedWithDefaultSerde(final TestInfo testInfo) 
throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldBeInitializedWithDefaultSerde(final boolean withHeaders, 
final TestInfo testInfo) throws Exception {
         final String safeTestName = safeUniqueTestName(testInfo);
         final Properties streamsConfiguration1 = 
getStreamsConfiguration(safeTestName);
         final Properties streamsConfiguration2 = 
getStreamsConfiguration(safeTestName);
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration1, 
withHeaders);
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration2, 
withHeaders);
+
         //Each streams client needs to have it's own StreamsBuilder in order 
to simulate
         //a truly distributed run
         final StreamsBuilder builder1 = new StreamsBuilder();
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index fa289ca5959..3002ca1d638 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -88,11 +89,13 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         baseTimestamp = time.milliseconds();
     }
 
-    private static Properties getStreamsProperties(final String optimization) {
-        return mkProperties(mkMap(
+    private static Properties getStreamsProperties(final String optimization, 
final boolean withHeaders) {
+        final Properties props = mkProperties(mkMap(
                 mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath()),
                 mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
optimization)
         ));
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+        return props;
     }
 
     // versioning is disabled for these tests, even though the code supports 
building a
@@ -105,7 +108,8 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         final List<Boolean> rejoin = Arrays.asList(true, false);
         final List<Boolean> leftVersioned = Collections.singletonList(false);
         final List<Boolean> rightVersioned = Collections.singletonList(false);
-        return buildParameters(leftJoin, optimization, materialized, rejoin, 
leftVersioned, rightVersioned);
+        final List<Boolean> withHeaders = Arrays.asList(true, false);
+        return buildParameters(leftJoin, optimization, materialized, rejoin, 
leftVersioned, rightVersioned, withHeaders);
     }
 
     // optimizations and rejoin are disabled for these tests, as these tests 
focus on versioning.
@@ -117,7 +121,8 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         final List<Boolean> rejoin = Collections.singletonList(false);
         final List<Boolean> leftVersioned = Arrays.asList(true, false);
         final List<Boolean> rightVersioned = Arrays.asList(true, false);
-        return buildParameters(leftJoin, optimization, materialized, rejoin, 
leftVersioned, rightVersioned);
+        final List<Boolean> withHeaders = Arrays.asList(true, false);
+        return buildParameters(leftJoin, optimization, materialized, rejoin, 
leftVersioned, rightVersioned, withHeaders);
     }
 
     // deduplicate test cases in data and versionedData
@@ -171,8 +176,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                                                    final boolean materialized,
                                                    final boolean rejoin,
                                                    final boolean leftVersioned,
-                                                   final boolean 
rightVersioned) {
-        final Properties streamsConfig = getStreamsProperties(optimization);
+                                                   final boolean 
rightVersioned,
+                                                   final boolean withHeaders) {
+        final Properties streamsConfig = getStreamsProperties(optimization, 
withHeaders);
         final Topology topology = getTopology(streamsConfig, materialized ? 
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
@@ -296,8 +302,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                                                          final boolean 
materialized,
                                                          final boolean rejoin,
                                                          final boolean 
leftVersioned,
-                                                         final boolean 
rightVersioned) {
-        final Properties streamsConfig = getStreamsProperties(optimization);
+                                                         final boolean 
rightVersioned,
+                                                         final boolean 
withHeaders) {
+        final Properties streamsConfig = getStreamsProperties(optimization, 
withHeaders);
         final Topology topology = getTopology(streamsConfig, materialized ? 
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
@@ -369,8 +376,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                                                      final boolean 
materialized,
                                                      final boolean rejoin,
                                                      final boolean 
leftVersioned,
-                                                     final boolean 
rightVersioned) {
-        final Properties streamsConfig = getStreamsProperties(optimization);
+                                                     final boolean 
rightVersioned,
+                                                     final boolean 
withHeaders) {
+        final Properties streamsConfig = getStreamsProperties(optimization, 
withHeaders);
         final Topology topology = getTopology(streamsConfig, materialized ? 
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
@@ -489,8 +497,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                                                                  final boolean 
materialized,
                                                                  final boolean 
rejoin,
                                                                  final boolean 
leftVersioned,
-                                                                 final boolean 
rightVersioned) {
-        final Properties streamsConfig = getStreamsProperties(optimization);
+                                                                 final boolean 
rightVersioned,
+                                                                 final boolean 
withHeaders) {
+        final Properties streamsConfig = getStreamsProperties(optimization, 
withHeaders);
         final Topology topology = getTopology(streamsConfig, materialized ? 
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
@@ -555,8 +564,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                                                                       final 
boolean materialized,
                                                                       final 
boolean rejoin,
                                                                       final 
boolean leftVersioned,
-                                                                      final 
boolean rightVersioned) {
-        final Properties streamsConfig = getStreamsProperties(optimization);
+                                                                      final 
boolean rightVersioned,
+                                                                      final 
boolean withHeaders) {
+        final Properties streamsConfig = getStreamsProperties(optimization, 
withHeaders);
         final Topology topology = getTopology(streamsConfig, materialized ? 
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
@@ -587,8 +597,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                                                                         final 
boolean materialized,
                                                                         final 
boolean rejoin,
                                                                         final 
boolean leftVersioned,
-                                                                        final 
boolean rightVersioned) {
-        final Properties streamsConfig = getStreamsProperties(optimization);
+                                                                        final 
boolean rightVersioned,
+                                                                        final 
boolean withHeaders) {
+        final Properties streamsConfig = getStreamsProperties(optimization, 
withHeaders);
         final Topology topology = getTopology(streamsConfig, materialized ? 
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
@@ -694,8 +705,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                                                                   final 
boolean materialized,
                                                                   final 
boolean rejoin,
                                                                   final 
boolean leftVersioned,
-                                                                  final 
boolean rightVersioned) {
-        final Properties streamsConfig = getStreamsProperties(optimization);
+                                                                  final 
boolean rightVersioned,
+                                                                  final 
boolean withHeaders) {
+        final Properties streamsConfig = getStreamsProperties(optimization, 
withHeaders);
         final Topology topology = getTopology(streamsConfig, materialized ? 
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
@@ -779,8 +791,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                                                              final boolean 
materialized,
                                                              final boolean 
rejoin,
                                                              final boolean 
leftVersioned,
-                                                             final boolean 
rightVersioned) {
-        final Properties streamsConfig = getStreamsProperties(optimization);
+                                                             final boolean 
rightVersioned,
+                                                             final boolean 
withHeaders) {
+        final Properties streamsConfig = getStreamsProperties(optimization, 
withHeaders);
         final Topology topology = getTopology(streamsConfig, materialized ? 
"store" : null, true, rejoin, leftVersioned, rightVersioned, value -> null);
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
@@ -806,7 +819,8 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                                                       final boolean 
materialized,
                                                       final boolean rejoin,
                                                       final boolean 
leftVersioned,
-                                                      final boolean 
rightVersioned) {
+                                                      final boolean 
rightVersioned,
+                                                      final boolean 
withHeaders) {
         final Function<String, String> foreignKeyExtractor = value -> {
             final String split = value.split("\\|")[1];
             if (split.equals("returnNull")) {
@@ -817,7 +831,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                 return split;
             }
         };
-        final Properties streamsConfig = getStreamsProperties(optimization);
+        final Properties streamsConfig = getStreamsProperties(optimization, 
withHeaders);
         final Topology topology = getTopology(streamsConfig, materialized ? 
"store" : null, true, rejoin, leftVersioned, rightVersioned, 
foreignKeyExtractor);
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
@@ -1003,8 +1017,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                                                           final boolean 
materialized,
                                                           final boolean rejoin,
                                                           final boolean 
leftVersioned,
-                                                          final boolean 
rightVersioned) {
-        final Properties streamsConfig = getStreamsProperties(optimization);
+                                                          final boolean 
rightVersioned,
+                                                          final boolean 
withHeaders) {
+        final Properties streamsConfig = getStreamsProperties(optimization, 
withHeaders);
         final Topology topology = getTopology(streamsConfig, materialized ? 
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
index e0e51b84b7b..fd42b21d5b0 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -69,8 +70,9 @@ public class 
KTableKTableForeignKeyJoinMaterializationIntegrationTest {
     }
 
     @ParameterizedTest
-    @CsvSource({"false, false", "true, false", "true, true"})
-    public void shouldEmitTombstoneWhenDeletingNonJoiningRecords(final boolean 
materialized, final boolean queryable) {
+    @CsvSource({"false, false, false", "false, false, true", "true, false, 
false", "true, false, true", "true, true, false", "true, true, true"})
+    public void shouldEmitTombstoneWhenDeletingNonJoiningRecords(final boolean 
materialized, final boolean queryable, final boolean withHeaders) {
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         final Topology topology = getTopology(streamsConfig, "store", 
materialized, queryable);
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
index 8789dadc090..08f419e3188 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
@@ -32,7 +33,8 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
 import java.util.Arrays;
@@ -56,6 +58,7 @@ public class RelaxedNullKeyRequirementJoinTest {
     private TestInputTopic<String, String> left;
     private TestInputTopic<String, String> right;
     private TestOutputTopic<String, String> out;
+    private boolean withHeaders;
 
     @BeforeEach
     void beforeEach() {
@@ -69,8 +72,10 @@ public class RelaxedNullKeyRequirementJoinTest {
         testDriver.close();
     }
 
-    @Test
-    void testRelaxedLeftStreamStreamJoin() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testRelaxedLeftStreamStreamJoin(final boolean withHeaders) {
+        this.withHeaders = withHeaders;
         leftStream
             .leftJoin(rightStream, JOINER, WINDOW)
             .to(OUT);
@@ -79,8 +84,10 @@ public class RelaxedNullKeyRequirementJoinTest {
         assertEquals(Collections.singletonList(new KeyValue<>(null, 
"leftValue|null")), out.readKeyValuesToList());
     }
 
-    @Test
-    void testRelaxedLeftStreamTableJoin() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testRelaxedLeftStreamTableJoin(final boolean withHeaders) {
+        this.withHeaders = withHeaders;
         leftStream
             .leftJoin(rightStream.toTable(), JOINER)
             .to(OUT);
@@ -89,8 +96,10 @@ public class RelaxedNullKeyRequirementJoinTest {
         assertEquals(Collections.singletonList(new KeyValue<>(null, 
"leftValue|null")), out.readKeyValuesToList());
     }
 
-    @Test
-    void testRelaxedOuterStreamStreamJoin() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testRelaxedOuterStreamStreamJoin(final boolean withHeaders) {
+        this.withHeaders = withHeaders;
         leftStream
             .outerJoin(rightStream, JOINER, WINDOW)
             .to(OUT);
@@ -103,8 +112,10 @@ public class RelaxedNullKeyRequirementJoinTest {
         );
     }
 
-    @Test
-    void testRelaxedLeftStreamGlobalTableJoin() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testRelaxedLeftStreamGlobalTableJoin(final boolean withHeaders) {
+        this.withHeaders = withHeaders;
         final GlobalKTable<String, String> global = 
builder.globalTable("global");
         leftStream
             .leftJoin(global, (key, value) -> null, JOINER)
@@ -114,8 +125,10 @@ public class RelaxedNullKeyRequirementJoinTest {
         assertEquals(Collections.singletonList(new KeyValue<>(null, 
"leftValue|null")), out.readKeyValuesToList());
     }
 
-    @Test
-    void 
testDropNullKeyRecordsForRepartitionNodesWithNoRelaxedJoinDownstream() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void 
testDropNullKeyRecordsForRepartitionNodesWithNoRelaxedJoinDownstream(final 
boolean withHeaders) {
+        this.withHeaders = withHeaders;
         leftStream
             .repartition()
             .to(OUT);
@@ -125,7 +138,10 @@ public class RelaxedNullKeyRequirementJoinTest {
     }
 
     private void initTopology() {
-        testDriver = new TopologyTestDriver(builder.build(), props());
+        final Properties props = props();
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+        testDriver = new TopologyTestDriver(builder.build(), props);
+
         left = testDriver.createInputTopic(
             LEFT,
             new StringSerializer(),
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
index c9b498d4054..e9e53ab6cd9 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
@@ -41,8 +41,9 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -56,6 +57,7 @@ import static 
org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 
+
 @Tag("integration")
 public class SelfJoinUpgradeIntegrationTest {
     public static final String INPUT_TOPIC = "selfjoin-input";
@@ -112,8 +114,9 @@ public class SelfJoinUpgradeIntegrationTest {
     }
 
 
-    @Test
-    public void shouldUpgradeWithTopologyOptimizationOff() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldUpgradeWithTopologyOptimizationOff(final boolean 
withHeaders) throws Exception {
 
         final StreamsBuilder streamsBuilderOld = new StreamsBuilder();
         final KStream<String, String> leftOld = streamsBuilderOld.stream(
@@ -128,6 +131,7 @@ public class SelfJoinUpgradeIntegrationTest {
 
         final Properties props = props();
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
         kafkaStreams.start();
 
@@ -155,6 +159,7 @@ public class SelfJoinUpgradeIntegrationTest {
         kafkaStreams = null;
 
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
         kafkaStreams.start();
 
@@ -176,8 +181,9 @@ public class SelfJoinUpgradeIntegrationTest {
         kafkaStreams.close();
     }
 
-    @Test
-    public void shouldRestartWithTopologyOptimizationOn() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldRestartWithTopologyOptimizationOn(final boolean 
withHeaders) throws Exception {
 
         final StreamsBuilder streamsBuilderOld = new StreamsBuilder();
         final KStream<String, String> leftOld = streamsBuilderOld.stream(
@@ -193,6 +199,7 @@ public class SelfJoinUpgradeIntegrationTest {
 
         final Properties props = props();
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
         kafkaStreams.start();
 
@@ -219,6 +226,7 @@ public class SelfJoinUpgradeIntegrationTest {
         kafkaStreams = null;
 
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
         kafkaStreams.start();
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
index 2954fa9806d..b68a5f7a9e5 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
@@ -54,7 +54,8 @@ import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -63,6 +64,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.stream.Stream;
 
 import static java.time.Duration.ofMillis;
 import static java.util.Arrays.asList;
@@ -130,9 +132,22 @@ public class SlidingWindowedKStreamIntegrationTest {
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
     }
 
+    public static Stream<Arguments> data() {
+        return Stream.of(
+            Arguments.of(StrategyType.ON_WINDOW_UPDATE, true, false),
+            Arguments.of(StrategyType.ON_WINDOW_UPDATE, true, true),
+            Arguments.of(StrategyType.ON_WINDOW_UPDATE, false, false),
+            Arguments.of(StrategyType.ON_WINDOW_UPDATE, false, true),
+            Arguments.of(StrategyType.ON_WINDOW_CLOSE, true, false),
+            Arguments.of(StrategyType.ON_WINDOW_CLOSE, true, true),
+            Arguments.of(StrategyType.ON_WINDOW_CLOSE, false, false),
+            Arguments.of(StrategyType.ON_WINDOW_CLOSE, false, true)
+        );
+    }
+
     @ParameterizedTest
-    @CsvSource({"ON_WINDOW_UPDATE, true", "ON_WINDOW_UPDATE, false", 
"ON_WINDOW_CLOSE, true", "ON_WINDOW_CLOSE, false"})
-    public void shouldAggregateWindowedWithNoGrace(final StrategyType 
strategyType, final boolean withCache) throws Exception {
+    @MethodSource("data")
+    public void shouldAggregateWindowedWithNoGrace(final StrategyType 
strategyType, final boolean withCache, final boolean withHeaders) throws 
Exception {
         produceMessages(
             streamOneInput,
             new KeyValueTimestamp<>("A", "1", 0),  // Create [0, 10](0+1)
@@ -158,6 +173,8 @@ public class SlidingWindowedKStreamIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
 
         final boolean emitFinal = 
strategyType.equals(StrategyType.ON_WINDOW_CLOSE);
@@ -197,8 +214,8 @@ public class SlidingWindowedKStreamIntegrationTest {
     }
 
     @ParameterizedTest
-    @CsvSource({"ON_WINDOW_UPDATE, true", "ON_WINDOW_UPDATE, false", 
"ON_WINDOW_CLOSE, true", "ON_WINDOW_CLOSE, false"})
-    public void shouldAggregateWindowedWithGrace(final StrategyType 
strategyType, final boolean withCache) throws Exception {
+    @MethodSource("data")
+    public void shouldAggregateWindowedWithGrace(final StrategyType 
strategyType, final boolean withCache, final boolean withHeaders) throws 
Exception {
         produceMessages(
             streamOneInput,
             new KeyValueTimestamp<>("A", "1", 0),  // Create [0, 10](0+1)
@@ -224,6 +241,8 @@ public class SlidingWindowedKStreamIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
 
         final boolean emitFinal = 
strategyType.equals(StrategyType.ON_WINDOW_CLOSE);
@@ -272,8 +291,8 @@ public class SlidingWindowedKStreamIntegrationTest {
     }
 
     @ParameterizedTest
-    @CsvSource({"ON_WINDOW_UPDATE, true", "ON_WINDOW_UPDATE, false", 
"ON_WINDOW_CLOSE, true", "ON_WINDOW_CLOSE, false"})
-    public void shouldRestoreAfterJoinRestart(final StrategyType strategyType, 
final boolean withCache) throws Exception {
+    @MethodSource("data")
+    public void shouldRestoreAfterJoinRestart(final StrategyType strategyType, 
final boolean withCache, final boolean withHeaders) throws Exception {
         produceMessages(
             streamOneInput,
             new KeyValueTimestamp<>("A", "L1", 0),
@@ -314,6 +333,8 @@ public class SlidingWindowedKStreamIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
 
         final boolean emitFinal = 
strategyType.equals(StrategyType.ON_WINDOW_CLOSE);
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index dc3d0bc6d4f..a74965b22b4 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.test.TestRecord;
@@ -26,7 +27,7 @@ import org.apache.kafka.test.MockMapper;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -45,14 +46,16 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
     private static final String APP_ID = "stream-stream-join-integration-test";
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testSelfJoin(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testSelfJoin(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Long, String> leftStream = 
builder.stream(INPUT_TOPIC_LEFT);
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-selfJoin");
         streamsConfig.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
 
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
             Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-A", 
null, 2L)),
@@ -88,14 +91,16 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testInner(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testInner(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Long, String> leftStream = 
builder.stream(INPUT_TOPIC_LEFT);
         final KStream<Long, String> rightStream = 
builder.stream(INPUT_TOPIC_RIGHT);
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner");
 
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
             null,
@@ -140,14 +145,16 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testInnerRepartitioned(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testInnerRepartitioned(final boolean cacheEnabled, final 
boolean withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Long, String> leftStream = 
builder.stream(INPUT_TOPIC_LEFT);
         final KStream<Long, String> rightStream = 
builder.stream(INPUT_TOPIC_RIGHT);
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-repartitioned");
 
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
             null,
@@ -194,14 +201,16 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testLeft(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testLeft(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Long, String> leftStream = 
builder.stream(INPUT_TOPIC_LEFT);
         final KStream<Long, String> rightStream = 
builder.stream(INPUT_TOPIC_RIGHT);
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-left");
 
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
             null,
@@ -247,14 +256,16 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testLeftRepartitioned(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testLeftRepartitioned(final boolean cacheEnabled, final 
boolean withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Long, String> leftStream = 
builder.stream(INPUT_TOPIC_LEFT);
         final KStream<Long, String> rightStream = 
builder.stream(INPUT_TOPIC_RIGHT);
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-left-repartitioned");
 
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
             null,
@@ -302,14 +313,16 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testOuter(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testOuter(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Long, String> leftStream = 
builder.stream(INPUT_TOPIC_LEFT);
         final KStream<Long, String> rightStream = 
builder.stream(INPUT_TOPIC_RIGHT);
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-outer");
 
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
             null,
@@ -356,14 +369,16 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testOuterRepartitioned(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testOuterRepartitioned(final boolean cacheEnabled, final 
boolean withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Long, String> leftStream = 
builder.stream(INPUT_TOPIC_LEFT);
         final KStream<Long, String> rightStream = 
builder.stream(INPUT_TOPIC_RIGHT);
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-outer");
 
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
             null,
@@ -412,14 +427,16 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testMultiInner(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testMultiInner(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Long, String> leftStream = 
builder.stream(INPUT_TOPIC_LEFT);
         final KStream<Long, String> rightStream = 
builder.stream(INPUT_TOPIC_RIGHT);
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-multi-inner");
 
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
             null,
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 18ce26abe08..67d663eaa49 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -27,7 +28,7 @@ import org.apache.kafka.streams.test.TestRecord;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
 
 import java.time.Duration;
 import java.util.Arrays;
@@ -45,13 +46,15 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
     private static final String APP_ID = "stream-table-join-integration-test";
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testInner(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testInner(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Long, String> leftStream = 
builder.stream(INPUT_TOPIC_LEFT);
         final KTable<Long, String> rightTable = 
builder.table(INPUT_TOPIC_RIGHT);
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
@@ -84,13 +87,15 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testLeft(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testLeft(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Long, String> leftStream = 
builder.stream(INPUT_TOPIC_LEFT);
         final KTable<Long, String> rightTable = 
builder.table(INPUT_TOPIC_RIGHT);
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-left");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
@@ -123,14 +128,16 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testInnerWithVersionedStore(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testInnerWithVersionedStore(final boolean cacheEnabled, final 
boolean withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Long, String> leftStream = 
builder.stream(INPUT_TOPIC_LEFT);
         final KTable<Long, String> rightTable = 
builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
                 Stores.persistentVersionedKeyValueStore(STORE_NAME, 
Duration.ofMinutes(5))));
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
@@ -163,14 +170,16 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testLeftWithVersionedStore(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testLeftWithVersionedStore(final boolean cacheEnabled, final 
boolean withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Long, String> leftStream = 
builder.stream(INPUT_TOPIC_LEFT);
         final KTable<Long, String> rightTable = 
builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
                 Stores.persistentVersionedKeyValueStore(STORE_NAME, 
Duration.ofMinutes(5))));
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-left");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
index 6195cbeb281..7a078947e05 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.integration;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
@@ -31,7 +32,7 @@ import org.apache.kafka.streams.test.TestRecord;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
 
 import java.time.Duration;
 import java.util.Arrays;
@@ -51,8 +52,8 @@ public class StreamTableJoinWithGraceIntegrationTest extends 
AbstractJoinIntegra
             Joined.with(Serdes.Long(), Serdes.String(), Serdes.String(), 
"Grace", Duration.ofMillis(2));
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testInnerWithVersionedStore(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testInnerWithVersionedStore(final boolean cacheEnabled, final 
boolean withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Long, String> leftStream = 
builder.stream(INPUT_TOPIC_LEFT, Consumed.with(Serdes.Long(), Serdes.String()));
         final KTable<Long, String> rightTable = 
builder.table(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.Long(), Serdes.String()), 
Materialized.as(
@@ -60,6 +61,8 @@ public class StreamTableJoinWithGraceIntegrationTest extends 
AbstractJoinIntegra
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled, 
false);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner");
 
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+
         leftStream.join(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC, 
Produced.with(Serdes.Long(), Serdes.String()));
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
@@ -92,14 +95,16 @@ public class StreamTableJoinWithGraceIntegrationTest 
extends AbstractJoinIntegra
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testLeftWithVersionedStore(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testLeftWithVersionedStore(final boolean cacheEnabled, final 
boolean withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Long, String> leftStream = 
builder.stream(INPUT_TOPIC_LEFT);
         final KTable<Long, String> rightTable = 
builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
                 Stores.persistentVersionedKeyValueStore(STORE_NAME, 
Duration.ofMinutes(5))));
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled, 
true);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-left");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftStream.leftJoin(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC);
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
index a7cb0871286..0a6aa71cf1f 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -29,7 +30,7 @@ import org.apache.kafka.streams.test.TestRecord;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
 
 import java.time.Duration;
 import java.util.Arrays;
@@ -61,8 +62,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
             .withLoggingDisabled();
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testInner(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testInner(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("left").withLoggingDisabled());
@@ -70,6 +71,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("right").withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.join(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
@@ -106,8 +109,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testLeft(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testLeft(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("left").withLoggingDisabled());
@@ -115,6 +118,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("right").withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-left");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.leftJoin(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
@@ -151,8 +156,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testOuter(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testOuter(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("left").withLoggingDisabled());
@@ -160,6 +165,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("right").withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-outer");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.outerJoin(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
@@ -196,8 +203,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testInnerWithVersionedStores(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testInnerWithVersionedStores(final boolean cacheEnabled, final 
boolean withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, 
String>as(Stores.persistentVersionedKeyValueStore("left", 
Duration.ofMinutes(5))).withLoggingDisabled());
@@ -205,6 +212,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, 
String>as(Stores.persistentVersionedKeyValueStore("right", 
Duration.ofMinutes(5))).withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.join(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         // versioned stores do not support caching, so we expect the same 
result regardless of whether caching is enabled or not
@@ -238,8 +247,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testLeftWithVersionedStores(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testLeftWithVersionedStores(final boolean cacheEnabled, final 
boolean withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, 
String>as(Stores.persistentVersionedKeyValueStore("left", 
Duration.ofMinutes(5))).withLoggingDisabled());
@@ -247,6 +256,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, 
String>as(Stores.persistentVersionedKeyValueStore("right", 
Duration.ofMinutes(5))).withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-left");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.leftJoin(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         // versioned stores do not support caching, so we expect the same 
result regardless of whether caching is enabled or not
@@ -280,8 +291,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testOuterWithVersionedStores(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testOuterWithVersionedStores(final boolean cacheEnabled, final 
boolean withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, 
String>as(Stores.persistentVersionedKeyValueStore("left", 
Duration.ofMinutes(5))).withLoggingDisabled());
@@ -289,6 +300,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, 
String>as(Stores.persistentVersionedKeyValueStore("right", 
Duration.ofMinutes(5))).withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-outer");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.outerJoin(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         // versioned stores do not support caching, so we expect the same 
result regardless of whether caching is enabled or not
@@ -322,8 +335,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testInnerWithLeftVersionedOnly(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testInnerWithLeftVersionedOnly(final boolean cacheEnabled, 
final boolean withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, 
String>as(Stores.persistentVersionedKeyValueStore("left", 
Duration.ofMinutes(5))).withLoggingDisabled());
@@ -331,6 +344,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("right").withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.join(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
@@ -367,8 +382,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testLeftWithLeftVersionedOnly(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testLeftWithLeftVersionedOnly(final boolean cacheEnabled, 
final boolean withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, 
String>as(Stores.persistentVersionedKeyValueStore("left", 
Duration.ofMinutes(5))).withLoggingDisabled());
@@ -376,6 +391,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("right").withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-left");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.leftJoin(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
@@ -412,8 +429,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testOuterWithLeftVersionedOnly(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testOuterWithLeftVersionedOnly(final boolean cacheEnabled, 
final boolean withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, 
String>as(Stores.persistentVersionedKeyValueStore("left", 
Duration.ofMinutes(5))).withLoggingDisabled());
@@ -421,6 +438,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("right").withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-outer");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.outerJoin(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
@@ -457,8 +476,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testInnerWithRightVersionedOnly(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testInnerWithRightVersionedOnly(final boolean cacheEnabled, 
final boolean withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("left").withLoggingDisabled());
@@ -466,6 +485,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, 
String>as(Stores.persistentVersionedKeyValueStore("right", 
Duration.ofMinutes(5))).withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.join(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
@@ -502,8 +523,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testLeftWithRightVersionedOnly(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testLeftWithRightVersionedOnly(final boolean cacheEnabled, 
final boolean withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("left").withLoggingDisabled());
@@ -511,6 +532,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, 
String>as(Stores.persistentVersionedKeyValueStore("right", 
Duration.ofMinutes(5))).withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-left");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.leftJoin(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
@@ -547,8 +570,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testOuterWithRightVersionedOnly(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testOuterWithRightVersionedOnly(final boolean cacheEnabled, 
final boolean withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("left").withLoggingDisabled());
@@ -556,6 +579,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, 
String>as(Stores.persistentVersionedKeyValueStore("right", 
Duration.ofMinutes(5))).withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-outer");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.outerJoin(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
@@ -592,8 +617,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testInnerInner(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testInnerInner(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("left").withLoggingDisabled());
@@ -601,6 +626,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("right").withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-inner");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.join(rightTable, valueJoiner)
                  .join(rightTable, valueJoiner, materialized)
                  .toStream()
@@ -643,8 +670,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testInnerLeft(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testInnerLeft(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("left").withLoggingDisabled());
@@ -652,6 +679,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("right").withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-left");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.join(rightTable, valueJoiner)
                  .leftJoin(rightTable, valueJoiner, materialized)
                  .toStream()
@@ -691,8 +720,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testInnerOuter(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testInnerOuter(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("left").withLoggingDisabled());
@@ -700,6 +729,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("right").withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-outer");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.join(rightTable, valueJoiner)
                  .outerJoin(rightTable, valueJoiner, materialized)
                  .toStream()
@@ -742,8 +773,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testLeftInner(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testLeftInner(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("left").withLoggingDisabled());
@@ -751,6 +782,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("right").withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-inner");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.leftJoin(rightTable, valueJoiner)
                  .join(rightTable, valueJoiner, materialized)
                  .toStream()
@@ -790,8 +823,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testLeftLeft(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testLeftLeft(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("left").withLoggingDisabled());
@@ -799,6 +832,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("right").withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-left");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.leftJoin(rightTable, valueJoiner)
                  .leftJoin(rightTable, valueJoiner, materialized)
                  .toStream()
@@ -840,8 +875,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testLeftOuter(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testLeftOuter(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("left").withLoggingDisabled());
@@ -849,6 +884,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("right").withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-outer");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.leftJoin(rightTable, valueJoiner)
                  .outerJoin(rightTable, valueJoiner, materialized)
                  .toStream()
@@ -890,8 +927,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testOuterInner(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testOuterInner(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("left").withLoggingDisabled());
@@ -899,6 +936,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("right").withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-inner");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.outerJoin(rightTable, valueJoiner)
                  .join(rightTable, valueJoiner, materialized)
                  .toStream()
@@ -940,8 +979,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testOuterLeft(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testOuterLeft(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("left").withLoggingDisabled());
@@ -949,6 +988,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("right").withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-left");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.outerJoin(rightTable, valueJoiner)
                  .leftJoin(rightTable, valueJoiner, materialized)
                  .toStream()
@@ -992,8 +1033,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testOuterOuter(final boolean cacheEnabled) {
+    @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+    public void testOuterOuter(final boolean cacheEnabled, final boolean 
withHeaders) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("left").withLoggingDisabled());
@@ -1001,6 +1042,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
                 Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("right").withLoggingDisabled());
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-outer");
+
+        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.outerJoin(rightTable, valueJoiner)
                  .outerJoin(rightTable, valueJoiner, materialized)
                  .toStream()
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
index 53746cd7fd6..8f6cbe213d5 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
@@ -138,8 +138,13 @@ public class TimeWindowedKStreamIntegrationTest {
     }
 
     @ParameterizedTest
-    @CsvSource({"ON_WINDOW_UPDATE, true", "ON_WINDOW_UPDATE, false", 
"ON_WINDOW_CLOSE, true", "ON_WINDOW_CLOSE, false"})
-    public void shouldAggregateWindowedWithNoGrace(final StrategyType type, 
final boolean withCache) throws Exception {
+    @CsvSource({
+        "ON_WINDOW_UPDATE, true, false", "ON_WINDOW_UPDATE, true, true",
+        "ON_WINDOW_UPDATE, false, false", "ON_WINDOW_UPDATE, false, true",
+        "ON_WINDOW_CLOSE, true, false", "ON_WINDOW_CLOSE, true, true",
+        "ON_WINDOW_CLOSE, false, false", "ON_WINDOW_CLOSE, false, true"
+    })
+    public void shouldAggregateWindowedWithNoGrace(final StrategyType type, 
final boolean withCache, final boolean withHeaders) throws Exception {
         produceMessages(
             streamOneInput,
             new KeyValueTimestamp<>("A", "1", 0),
@@ -164,6 +169,8 @@ public class TimeWindowedKStreamIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
 
         // on window close
@@ -208,8 +215,13 @@ public class TimeWindowedKStreamIntegrationTest {
     }
 
     @ParameterizedTest
-    @CsvSource({"ON_WINDOW_UPDATE, true", "ON_WINDOW_UPDATE, false", 
"ON_WINDOW_CLOSE, true", "ON_WINDOW_CLOSE, false"})
-    public void shouldAggregateWindowedWithGrace(final StrategyType type, 
final boolean withCache) throws Exception {
+    @CsvSource({
+        "ON_WINDOW_UPDATE, true, false", "ON_WINDOW_UPDATE, true, true",
+        "ON_WINDOW_UPDATE, false, false", "ON_WINDOW_UPDATE, false, true",
+        "ON_WINDOW_CLOSE, true, false", "ON_WINDOW_CLOSE, true, true",
+        "ON_WINDOW_CLOSE, false, false", "ON_WINDOW_CLOSE, false, true"
+    })
+    public void shouldAggregateWindowedWithGrace(final StrategyType type, 
final boolean withCache, final boolean withHeaders) throws Exception {
         produceMessages(
             streamOneInput,
             new KeyValueTimestamp<>("A", "1", 0),
@@ -234,6 +246,8 @@ public class TimeWindowedKStreamIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
 
         // on window close
@@ -278,8 +292,13 @@ public class TimeWindowedKStreamIntegrationTest {
     }
 
     @ParameterizedTest
-    @CsvSource({"ON_WINDOW_UPDATE, true", "ON_WINDOW_UPDATE, false", 
"ON_WINDOW_CLOSE, true", "ON_WINDOW_CLOSE, false"})
-    public void shouldRestoreAfterJoinRestart(final StrategyType type, final 
boolean withCache) throws Exception {
+    @CsvSource({
+        "ON_WINDOW_UPDATE, true, false", "ON_WINDOW_UPDATE, true, true",
+        "ON_WINDOW_UPDATE, false, false", "ON_WINDOW_UPDATE, false, true",
+        "ON_WINDOW_CLOSE, true, false", "ON_WINDOW_CLOSE, true, true",
+        "ON_WINDOW_CLOSE, false, false", "ON_WINDOW_CLOSE, false, true"
+    })
+    public void shouldRestoreAfterJoinRestart(final StrategyType type, final 
boolean withCache, final boolean withHeaders) throws Exception {
         produceMessages(
             streamOneInput,
             new KeyValueTimestamp<>("A", "L1", 0),
@@ -320,6 +339,8 @@ public class TimeWindowedKStreamIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
 
+        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+
         startStreams();
 
         // ON_WINDOW_CLOSE expires all records.
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 832e8eb1a06..079565ea97d 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -280,6 +280,20 @@ public class IntegrationTestUtils {
         }
     }
 
+    /**
+     * Configures the DSL store format to use headers if enabled; when
+     * {@code withHeaders} is false the configuration is left untouched.
+     * Helper to reduce boilerplate in tests parameterized on headers mode.
+     *
+     * @param streamsConfig The streams configuration properties to modify
+     * @param withHeaders Whether to set the headers DSL store format
+     */
+    public static void maybeSetDslStoreFormatHeaders(final Properties 
streamsConfig, final boolean withHeaders) {
+        if (withHeaders) {
+            streamsConfig.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
+    }
+
     /**
      * @param topic          Kafka topic to write the data records to
      * @param records        Data records to write to Kafka

Reply via email to