This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 27914247233 MINOR: Use maybeSetDslStoreFormatHeaders helper in more streams test classes (#22057)
27914247233 is described below

commit 27914247233acae6d5093aa131212509dd9bc5a5
Author: Ken Huang <[email protected]>
AuthorDate: Thu Apr 16 13:38:54 2026 +0800

    MINOR: Use maybeSetDslStoreFormatHeaders helper in more streams test classes (#22057)
    
    See discussion:
    https://github.com/apache/kafka/pull/21820#discussion_r3051604424
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../GlobalKTableEOSIntegrationTest.java            |  5 +--
 .../integration/GlobalKTableIntegrationTest.java   | 21 ++++++-----
 .../integration/IQv2StoreIntegrationTest.java      |  5 +--
 .../integration/InternalTopicIntegrationTest.java  |  5 +--
 .../JoinGracePeriodDurabilityIntegrationTest.java  |  3 +-
 .../integration/JoinStoreIntegrationTest.java      |  5 ++-
 .../KStreamAggregationDedupIntegrationTest.java    |  7 ++--
 .../KStreamAggregationIntegrationTest.java         | 25 ++++++------
 ...yInnerJoinCustomPartitionerIntegrationTest.java | 13 ++++---
 ...bleForeignKeyInnerJoinMultiIntegrationTest.java |  7 ++--
 .../KTableKTableForeignKeyJoinDistributedTest.java |  5 ++-
 .../KTableKTableForeignKeyJoinIntegrationTest.java |  4 +-
 ...reignKeyJoinMaterializationIntegrationTest.java |  4 +-
 .../integration/MetricsIntegrationTest.java        | 13 ++-----
 .../RelaxedNullKeyRequirementJoinTest.java         |  4 +-
 .../integration/RestoreIntegrationTest.java        | 40 +++++---------------
 .../integration/RocksDBMetricsIntegrationTest.java |  5 +--
 .../SelfJoinUpgradeIntegrationTest.java            |  9 +++--
 .../SlidingWindowedKStreamIntegrationTest.java     |  7 ++--
 .../StandbyTaskCreationIntegrationTest.java        |  5 +--
 .../integration/StoreQueryIntegrationTest.java     |  5 +--
 .../StreamStreamJoinIntegrationTest.java           | 18 ++++-----
 .../StreamTableJoinIntegrationTest.java            | 10 ++---
 .../StreamTableJoinWithGraceIntegrationTest.java   |  8 ++--
 ...amsUncaughtExceptionHandlerIntegrationTest.java |  5 +--
 .../SuppressionDurabilityIntegrationTest.java      |  3 +-
 .../integration/SuppressionIntegrationTest.java    |  3 +-
 .../integration/TableTableJoinIntegrationTest.java | 44 +++++++++++-----------
 .../TimeWindowedKStreamIntegrationTest.java        |  7 ++--
 .../integration/utils/IntegrationTestUtils.java    | 14 -------
 .../internals/CogroupedKStreamImplTest.java        |  4 +-
 .../kstream/internals/KStreamKTableJoinTest.java   | 23 ++++-------
 .../internals/KStreamKTableLeftJoinTest.java       |  5 +--
 ...KStreamSessionWindowAggregateProcessorTest.java |  4 +-
 .../KStreamSlidingWindowAggregateTest.java         |  4 +-
 .../kstream/internals/KTableAggregateTest.java     |  4 +-
 .../KTableKTableForeignKeyJoinScenarioTest.java    | 13 ++-----
 .../internals/KTableKTableLeftJoinTest.java        |  4 +-
 .../internals/KTableTransformValuesTest.java       |  5 +--
 .../internals/SessionWindowedKStreamImplTest.java  |  4 +-
 .../internals/TimeWindowedKStreamImplTest.java     |  5 +--
 .../TimeOrderedSessionStoreUpgradeTest.java        |  4 +-
 .../org/apache/kafka/test/StreamsTestUtils.java    | 14 +++++++
 43 files changed, 175 insertions(+), 227 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 11167c84fc1..5f0925dca91 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -356,9 +357,7 @@ public class GlobalKTableEOSIntegrationTest {
     }
     
     private void startStreams(final boolean withHeaders) {
-        if (withHeaders) {
-            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
         startStreams(null);
     }
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 4fc0cf85eab..b4a73ccb603 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -47,6 +47,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -143,7 +144,7 @@ public class GlobalKTableIntegrationTest {
     @ParameterizedTest
     @ValueSource(booleans = {false, true})
     public void shouldKStreamGlobalKTableLeftJoin(final boolean withHeaders) 
throws Exception {
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
         final KStream<String, String> streamTableJoin = 
stream.leftJoin(globalTable, keyMapper, joiner);
         streamTableJoin.process(supplier);
         produceInitialGlobalTableValues();
@@ -230,7 +231,7 @@ public class GlobalKTableIntegrationTest {
     @ParameterizedTest
     @ValueSource(booleans = {false, true})
     public void shouldKStreamGlobalKTableJoin(final boolean withHeaders) 
throws Exception {
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
         final KStream<String, String> streamTableJoin = 
stream.join(globalTable, keyMapper, joiner);
         streamTableJoin.process(supplier);
         produceInitialGlobalTableValues();
@@ -317,7 +318,7 @@ public class GlobalKTableIntegrationTest {
     @ParameterizedTest
     @ValueSource(booleans = {false, true})
     public void shouldRestoreGlobalInMemoryKTableOnRestart(final boolean 
withHeaders) throws Exception {
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
         builder = new StreamsBuilder();
         globalTable = builder.globalTable(
             globalTableTopic,
@@ -350,7 +351,7 @@ public class GlobalKTableIntegrationTest {
     @ParameterizedTest
     @ValueSource(booleans = {false, true})
     public void shouldGetToRunningWithOnlyGlobalTopology(final boolean 
withHeaders) throws Exception {
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
         builder = new StreamsBuilder();
         globalTable = builder.globalTable(
             globalTableTopic,
@@ -394,7 +395,7 @@ public class GlobalKTableIntegrationTest {
         
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
             TestGlobalProcessingExceptionHandler.class);
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         produceInitialGlobalTableValues();
         startStreams();
@@ -413,7 +414,7 @@ public class GlobalKTableIntegrationTest {
         
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
                 TestGlobalProcessingExceptionHandler.class);
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         produceInitialGlobalTableValues();
         assertThrows(StreamsException.class, () -> {
@@ -433,7 +434,7 @@ public class GlobalKTableIntegrationTest {
         
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
                 TestGlobalProcessingExceptionHandler.class);
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         produceInitialGlobalTableValues();
         assertThrows(StreamsException.class, () -> {
@@ -453,7 +454,7 @@ public class GlobalKTableIntegrationTest {
         
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
                 TestGlobalProcessingExceptionHandler.class);
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
         waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
Duration.ofSeconds(30));
@@ -475,7 +476,7 @@ public class GlobalKTableIntegrationTest {
         
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
                 TestGlobalProcessingExceptionHandler.class);
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
         waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
Duration.ofSeconds(30));
@@ -493,7 +494,7 @@ public class GlobalKTableIntegrationTest {
         
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
                 TestGlobalProcessingExceptionHandler.class);
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
         waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
Duration.ofSeconds(30));
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 8f27ff9bb90..2c4307bba44 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -67,6 +67,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -2048,9 +2049,7 @@ public class IQv2StoreIntegrationTest {
         config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
         config.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
-        if (withHeaders) {
-            config.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(config, withHeaders);
         return config;
     }
 }
\ No newline at end of file
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index b197aa73650..f8cb0267bf4 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -154,9 +155,7 @@ public class InternalTopicIntegrationTest {
         if (streamsProtocolEnabled) {
             streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
         }
-        if (withHeaders) {
-            streamsProp.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsProp, 
withHeaders);
     }
 
     /*
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
index 96544a6d474..d8a09e12ca0 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -234,7 +235,7 @@ public class JoinGracePeriodDurabilityIntegrationTest {
         final boolean clean,
         final boolean withHeaders) {
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         return getStartedStreams(streamsConfig, builder, clean);
     }
 }
\ No newline at end of file
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
index 95a2725c8db..432cbaf7c35 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -118,7 +119,7 @@ public class JoinStoreIntegrationTest {
             JoinWindows.of(ofMillis(100)),
             StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer()).withStoreName("join-store"));
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(STREAMS_CONFIG, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(STREAMS_CONFIG, 
withHeaders);
         try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), STREAMS_CONFIG)) {
             kafkaStreams.setStateListener((newState, oldState) -> {
                 if (newState == KafkaStreams.State.RUNNING) {
@@ -156,7 +157,7 @@ public class JoinStoreIntegrationTest {
             JoinWindows.of(ofMillis(100)),
             StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer()).withStoreName("join-store"));
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(STREAMS_CONFIG, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(STREAMS_CONFIG, 
withHeaders);
 
         try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), STREAMS_CONFIG);
             final Admin admin = Admin.create(ADMIN_CONFIG)) {
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 254915ce880..a63cb04562e 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -134,7 +135,7 @@ public class KStreamAggregationDedupIntegrationTest {
                 .toStream()
                 .to(outputTopic, Produced.with(Serdes.String(), 
Serdes.String()));
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
 
@@ -168,7 +169,7 @@ public class KStreamAggregationDedupIntegrationTest {
             .toStream((windowedKey, value) -> windowedKey.key() + "@" + 
windowedKey.window().start())
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
 
@@ -207,7 +208,7 @@ public class KStreamAggregationDedupIntegrationTest {
             .toStream((windowedKey, value) -> windowedKey.key() + "@" + 
windowedKey.window().start())
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index baf0e7b3782..48e9ce88b9a 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -61,6 +61,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.consumer.ConsoleConsumer;
 import org.apache.kafka.tools.consumer.ConsoleConsumerOptions;
@@ -178,7 +179,7 @@ public class KStreamAggregationIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
 
@@ -239,7 +240,7 @@ public class KStreamAggregationIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
 
@@ -313,7 +314,7 @@ public class KStreamAggregationIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.Integer()));
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
 
@@ -365,7 +366,7 @@ public class KStreamAggregationIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
 
@@ -465,7 +466,7 @@ public class KStreamAggregationIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         shouldCountHelper(testInfo);
     }
@@ -479,7 +480,7 @@ public class KStreamAggregationIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         shouldCountHelper(testInfo);
     }
@@ -496,7 +497,7 @@ public class KStreamAggregationIntegrationTest {
             .count()
             .toStream((windowedKey, value) -> windowedKey.key() + "@" + 
windowedKey.window().start()).to(outputTopic, Produced.with(Serdes.String(), 
Serdes.Long()));
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
 
@@ -544,7 +545,7 @@ public class KStreamAggregationIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
 
@@ -657,7 +658,7 @@ public class KStreamAggregationIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
 
@@ -830,7 +831,7 @@ public class KStreamAggregationIntegrationTest {
         final Map<Windowed<String>, KeyValue<Long, Long>> results = new 
HashMap<>();
         final CountDownLatch latch = new CountDownLatch(13);
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), 
Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
@@ -877,7 +878,7 @@ public class KStreamAggregationIntegrationTest {
         final CountDownLatch latch = new CountDownLatch(13);
         final String userSessionsStore = "UserSessionsStore";
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), 
Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
@@ -1035,7 +1036,7 @@ public class KStreamAggregationIntegrationTest {
                 latch.countDown();
             });
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
         assertTrue(latch.await(30, TimeUnit.SECONDS));
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
index 7782fe20348..5740103078d 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -192,9 +193,9 @@ public class 
KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
         final String innerJoinType = "INNER";
         final String queryableName = innerJoinType + "-store1";
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigTwo, 
withHeaders);
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigThree, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigTwo, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigThree, 
withHeaders);
 
         streams = prepareTopologyWithNonSingletonPartitions(queryableName, 
streamsConfig);
         streamsTwo = prepareTopologyWithNonSingletonPartitions(queryableName, 
streamsConfigTwo);
@@ -221,9 +222,9 @@ public class 
KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
         final String innerJoinType = "INNER";
         final String queryableName = innerJoinType + "-store1";
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigTwo, 
withHeaders);
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigThree, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigTwo, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigThree, 
withHeaders);
 
         streams = prepareTopology(queryableName, streamsConfig);
         streamsTwo = prepareTopology(queryableName, streamsConfigTwo);
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
index 19adface8c1..0675938e579 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -193,9 +194,9 @@ public class 
KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
         final String queryableName = innerJoinType + "-store1";
         final String queryableNameTwo = innerJoinType + "-store2";
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigTwo, 
withHeaders);
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigThree, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigTwo, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigThree, 
withHeaders);
 
         streams = prepareTopology(queryableName, queryableNameTwo, 
streamsConfig);
         streamsTwo = prepareTopology(queryableName, queryableNameTwo, 
streamsConfigTwo);
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
index a86479d0617..c7e62ba24e4 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
@@ -32,6 +32,7 @@ import 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -165,8 +166,8 @@ public class KTableKTableForeignKeyJoinDistributedTest {
         final Properties streamsConfiguration1 = 
getStreamsConfiguration(safeTestName);
         final Properties streamsConfiguration2 = 
getStreamsConfiguration(safeTestName);
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration1, 
withHeaders);
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration2, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration1, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration2, 
withHeaders);
 
         //Each streams client needs to have it's own StreamsBuilder in order 
to simulate
         //a truly distributed run
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 3002ca1d638..c6daa0420c1 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -38,6 +37,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Assertions;
@@ -94,7 +94,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                 mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath()),
                 mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
optimization)
         ));
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         return props;
     }
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
index fd42b21d5b0..77d738a156c 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
@@ -26,13 +26,13 @@ import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -72,7 +72,7 @@ public class 
KTableKTableForeignKeyJoinMaterializationIntegrationTest {
     @ParameterizedTest
     @CsvSource({"false, false, false", "false, false, true", "true, false, 
false", "true, false, true", "true, true, false", "true, true, true"})
     public void shouldEmitTombstoneWhenDeletingNonJoiningRecords(final boolean 
materialized, final boolean queryable, final boolean withHeaders) {
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         final Topology topology = getTopology(streamsConfig, "store", 
materialized, queryable);
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index 738e5cb5aa3..020f6a2c315 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -348,9 +349,7 @@ public class MetricsIntegrationTest {
         if (streamsProtocolEnabled) {
             streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
         }
-        if (withHeaders) {
-            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), 
Serdes.String()))
             .to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), 
Serdes.String()));
@@ -390,9 +389,7 @@ public class MetricsIntegrationTest {
         if (streamsProtocolEnabled) {
             streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
         }
-        if (withHeaders) {
-            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         final Duration windowSize = Duration.ofMillis(50);
         builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), 
Serdes.String()))
@@ -426,9 +423,7 @@ public class MetricsIntegrationTest {
         if (streamsProtocolEnabled) {
             streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
         }
-        if (withHeaders) {
-            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         final Duration inactivityGap = Duration.ofMillis(50);
         builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), 
Serdes.String()))
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
index 08f419e3188..4713760d623 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
@@ -25,11 +25,11 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -139,7 +139,7 @@ public class RelaxedNullKeyRequirementJoinTest {
 
     private void initTopology() {
         final Properties props = props();
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         testDriver = new TopologyTestDriver(builder.build(), props);
 
         left = testDriver.createInputTopic(
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 1681a7cceb5..01caca64676 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -220,9 +220,7 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
-        if (withHeaders) {
-            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         CLUSTER.createTopics(inputTopic);
         CLUSTER.createTopics(outputTopic);
@@ -281,9 +279,7 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
-        if (withHeaders) {
-            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
 
         // restoring from 1000 to 4000 (committed), and then process from 4000 
to 5000 on each of the two partitions
         final int offsetLimitDelta = 1000;
@@ -346,9 +342,7 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
-        if (withHeaders) {
-            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
 
         // restoring from 1000 to 4000 (committed), and then process from 4000 
to 5000 on each of the two partitions
         final int offsetLimitDelta = 1000;
@@ -417,9 +411,7 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
-        if (withHeaders) {
-            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
 
         // restoring from 1000 to 5000, and then process from 5000 to 10000 on 
each of the two partitions
         final int offsetCheckpointed = 1000;
@@ -473,9 +465,7 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
-        if (withHeaders) {
-            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         kafkaStreams = new KafkaStreams(builder.build(), props);
         try {
             startApplicationAndWaitUntilRunning(kafkaStreams);
@@ -519,9 +509,7 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
-        if (withHeaders) {
-            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         kafkaStreams = new KafkaStreams(topology, props);
 
         final CountDownLatch latch = new CountDownLatch(1);
@@ -557,9 +545,7 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             props1.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
-        if (withHeaders) {
-            props1.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props1, withHeaders);
         purgeLocalStreamsState(props1);
         final KafkaStreams streams1 = new KafkaStreams(builder.build(), 
props1);
 
@@ -569,9 +555,7 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             props2.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
-        if (withHeaders) {
-            props2.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props2, withHeaders);
         purgeLocalStreamsState(props2);
         final KafkaStreams streams2 = new KafkaStreams(builder.build(), 
props2);
 
@@ -710,9 +694,7 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
-        if (withHeaders) {
-            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
 
         props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
 
@@ -779,9 +761,7 @@ public class RestoreIntegrationTest {
         if (useNewProtocol) {
             streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
         }
-        if (withHeaders) {
-            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
         final KafkaStreams kafkaStreams = new 
KafkaStreams(streamsBuilder.build(), streamsConfiguration);
 
         kafkaStreams.setGlobalStateRestoreListener(stateRestoreListener);
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 338b1b6529c..c9fabbdc053 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -152,9 +153,7 @@ public class RocksDBMetricsIntegrationTest {
         if (streamsProtocolEnabled) {
             streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
         }
-        if (withHeaders) {
-            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
         final StreamsBuilder builder = builderForStateStores();
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
index e9e53ab6cd9..6f6e65bf344 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -131,7 +132,7 @@ public class SelfJoinUpgradeIntegrationTest {
 
         final Properties props = props();
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
         kafkaStreams.start();
 
@@ -159,7 +160,7 @@ public class SelfJoinUpgradeIntegrationTest {
         kafkaStreams = null;
 
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
         kafkaStreams.start();
 
@@ -199,7 +200,7 @@ public class SelfJoinUpgradeIntegrationTest {
 
         final Properties props = props();
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
         kafkaStreams.start();
 
@@ -226,7 +227,7 @@ public class SelfJoinUpgradeIntegrationTest {
         kafkaStreams = null;
 
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
         kafkaStreams.start();
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
index b68a5f7a9e5..562a80d0f37 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.streams.kstream.WindowedSerdes;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -173,7 +174,7 @@ public class SlidingWindowedKStreamIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
 
@@ -241,7 +242,7 @@ public class SlidingWindowedKStreamIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
 
@@ -333,7 +334,7 @@ public class SlidingWindowedKStreamIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
index c83dfe6d10d..fe0e6adaff3 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -103,9 +104,7 @@ public class StandbyTaskCreationIntegrationTest {
         } else {
             
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         }
-        if (withHeaders) {
-            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
         return streamsConfiguration;
     }
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index b417d84a055..7631bbfa3bc 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -41,6 +41,7 @@ import 
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyS
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 
@@ -681,9 +682,7 @@ public class StoreQueryIntegrationTest {
         config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
         config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
         config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
-        if (withHeaders) {
-            config.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(config, withHeaders);
         return config;
     }
 }
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index a74965b22b4..f6fbe1c75f8 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -18,11 +18,11 @@ package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.StreamsTestUtils;
 
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Timeout;
@@ -54,7 +54,7 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-selfJoin");
         streamsConfig.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
@@ -99,7 +99,7 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
@@ -153,7 +153,7 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-repartitioned");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
@@ -209,7 +209,7 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-left");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
@@ -264,7 +264,7 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-left-repartitioned");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
@@ -321,7 +321,7 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-outer");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
@@ -377,7 +377,7 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-outer");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
@@ -435,7 +435,7 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-multi-inner");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 67d663eaa49..1a9329953c9 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -18,12 +18,12 @@ package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.StreamsTestUtils;
 
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Timeout;
@@ -54,7 +54,7 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
@@ -95,7 +95,7 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-left");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
@@ -137,7 +137,7 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
@@ -179,7 +179,7 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-left");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
index 7a078947e05..ba862f5556d 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.integration;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
@@ -28,6 +27,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.StreamsTestUtils;
 
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Timeout;
@@ -61,7 +61,7 @@ public class StreamTableJoinWithGraceIntegrationTest extends 
AbstractJoinIntegra
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled, 
false);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
 
         leftStream.join(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC, 
Produced.with(Serdes.Long(), Serdes.String()));
 
@@ -104,7 +104,7 @@ public class StreamTableJoinWithGraceIntegrationTest 
extends AbstractJoinIntegra
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled, 
true);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-left");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftStream.leftJoin(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC);
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
@@ -135,4 +135,4 @@ public class StreamTableJoinWithGraceIntegrationTest 
extends AbstractJoinIntegra
 
         runTestWithDriver(input, expectedResult, streamsConfig, 
builder.build(streamsConfig));
     }
-}
\ No newline at end of file
+}
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index d588f607263..94cde6824b3 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.apache.logging.log4j.Level;
@@ -133,9 +134,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest 
{
                 mkEntry(StreamsConfig.GROUP_PROTOCOL_CONFIG, protocol)
             )
         );
-        if (withHeaders) {
-            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         return props;
     }
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index dbb703b35e5..d6a1646dec9 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -42,6 +42,7 @@ import 
org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -151,7 +152,7 @@ public class SuppressionDurabilityIntegrationTest {
         ));
 
         streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
COMMIT_INTERVAL);
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
 
         KafkaStreams driver = getStartedStreams(streamsConfig, builder, true);
         try {
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index 8c4000b9002..9b1aa9496a7 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.hamcrest.Matchers;
@@ -525,7 +526,7 @@ public class SuppressionIntegrationTest {
             mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE),
             mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath())
         ));
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         return props;
     }
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
index 0a6aa71cf1f..322e9889193 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -20,12 +20,12 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.StreamsTestUtils;
 
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Timeout;
@@ -72,7 +72,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.join(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
@@ -119,7 +119,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-left");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.leftJoin(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
@@ -166,7 +166,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-outer");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.outerJoin(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
@@ -213,7 +213,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.join(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         // versioned stores do not support caching, so we expect the same 
result regardless of whether caching is enabled or not
@@ -257,7 +257,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-left");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.leftJoin(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         // versioned stores do not support caching, so we expect the same 
result regardless of whether caching is enabled or not
@@ -301,7 +301,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-outer");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.outerJoin(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         // versioned stores do not support caching, so we expect the same 
result regardless of whether caching is enabled or not
@@ -345,7 +345,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.join(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
@@ -392,7 +392,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-left");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.leftJoin(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
@@ -439,7 +439,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-outer");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.outerJoin(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
@@ -486,7 +486,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.join(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
@@ -533,7 +533,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-left");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.leftJoin(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
@@ -580,7 +580,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-outer");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.outerJoin(rightTable, valueJoiner, 
materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
@@ -627,7 +627,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-inner");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.join(rightTable, valueJoiner)
                  .join(rightTable, valueJoiner, materialized)
                  .toStream()
@@ -680,7 +680,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-left");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.join(rightTable, valueJoiner)
                  .leftJoin(rightTable, valueJoiner, materialized)
                  .toStream()
@@ -730,7 +730,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-outer");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.join(rightTable, valueJoiner)
                  .outerJoin(rightTable, valueJoiner, materialized)
                  .toStream()
@@ -783,7 +783,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-inner");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.leftJoin(rightTable, valueJoiner)
                  .join(rightTable, valueJoiner, materialized)
                  .toStream()
@@ -833,7 +833,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-left");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.leftJoin(rightTable, valueJoiner)
                  .leftJoin(rightTable, valueJoiner, materialized)
                  .toStream()
@@ -885,7 +885,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-outer");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.leftJoin(rightTable, valueJoiner)
                  .outerJoin(rightTable, valueJoiner, materialized)
                  .toStream()
@@ -937,7 +937,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-inner");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.outerJoin(rightTable, valueJoiner)
                  .join(rightTable, valueJoiner, materialized)
                  .toStream()
@@ -989,7 +989,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-left");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.outerJoin(rightTable, valueJoiner)
                  .leftJoin(rightTable, valueJoiner, materialized)
                  .toStream()
@@ -1043,7 +1043,7 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-inner-outer");
 
-        IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
         leftTable.outerJoin(rightTable, valueJoiner)
                  .outerJoin(rightTable, valueJoiner, materialized)
                  .toStream()
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
index 8f6cbe213d5..1d7d8053d37 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
@@ -46,6 +46,7 @@ import org.apache.kafka.streams.kstream.WindowedSerdes;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -169,7 +170,7 @@ public class TimeWindowedKStreamIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
 
@@ -246,7 +247,7 @@ public class TimeWindowedKStreamIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
 
@@ -339,7 +340,7 @@ public class TimeWindowedKStreamIntegrationTest {
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
 
-        
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, 
withHeaders);
 
         startStreams();
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 079565ea97d..832e8eb1a06 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -280,20 +280,6 @@ public class IntegrationTestUtils {
         }
     }
 
-    /**
-     * Configures the DSL store format to use headers if enabled.
-     * This is a helper method to reduce boilerplate in parameterized tests 
that test both
-     * with and without headers mode.
-     *
-     * @param streamsConfig The streams configuration properties to modify
-     * @param withHeaders Whether to enable headers mode
-     */
-    public static void maybeSetDslStoreFormatHeaders(final Properties 
streamsConfig, final boolean withHeaders) {
-        if (withHeaders) {
-            streamsConfig.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
-    }
-
     /**
      * @param topic          Kafka topic to write the data records to
      * @param records        Data records to write to Kafka
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
index dc6b851223d..c5b374bd96c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
@@ -83,9 +83,7 @@ public class CogroupedKStreamImplTest {
 
 
     private void setup(final boolean withHeaders) {
-        if (withHeaders) {
-            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> stream = builder.stream(TOPIC, 
Consumed.with(Serdes.String(), Serdes.String()));
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 222e3e9914e..9e913daa55c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -84,9 +84,7 @@ public class KStreamKTableJoinTest {
         table = builder.table(tableTopic, consumed);
         stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(supplier);
         final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
-        if (withHeaders) {
-            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         driver = new TopologyTestDriver(builder.build(), props);
         inputStreamTopic = driver.createInputTopic(streamTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
         inputTableTopic = driver.createInputTopic(tableTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
@@ -161,7 +159,7 @@ public class KStreamKTableJoinTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final Properties props = new Properties();
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
-        maybeSetDslStoreFormatHeaders(props, withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
         final KTable<String, String> tableB = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
 
@@ -179,7 +177,7 @@ public class KStreamKTableJoinTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final Properties props = new Properties();
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
-        maybeSetDslStoreFormatHeaders(props, withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
         final KTable<String, String> source = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()),
             Materialized.as(Stores.inMemoryKeyValueStore("tableB")));
@@ -201,7 +199,7 @@ public class KStreamKTableJoinTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final Properties props = new Properties();
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
-        maybeSetDslStoreFormatHeaders(props, withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
         final KTable<String, String> source = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()),
             Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", 
Duration.ofMinutes(5))));
@@ -220,7 +218,7 @@ public class KStreamKTableJoinTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final Properties props = new Properties();
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
-        maybeSetDslStoreFormatHeaders(props, withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
         final KTable<String, String> tableB = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()),
             Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", 
Duration.ofMinutes(5))));
@@ -237,7 +235,7 @@ public class KStreamKTableJoinTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final Properties props = new Properties();
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
-        maybeSetDslStoreFormatHeaders(props, withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
         final KTable<String, String> source = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()),
             Materialized.as(Stores.persistentVersionedKeyValueStore("V-grace", 
Duration.ofMinutes(0))));
@@ -319,7 +317,7 @@ public class KStreamKTableJoinTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final Properties props = new Properties();
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
-        maybeSetDslStoreFormatHeaders(props, withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
         final KTable<String, String> tableB = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
         final KTable<String, String> tableC = builder.table("topic3", 
Consumed.with(Serdes.String(), Serdes.String()));
@@ -336,7 +334,7 @@ public class KStreamKTableJoinTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final Properties props = new Properties();
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
-        maybeSetDslStoreFormatHeaders(props, withHeaders);
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
         final KTable<String, String> tableB = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
         final KTable<String, String> tableC = builder.table("topic3", 
Consumed.with(Serdes.String(), Serdes.String()));
@@ -601,9 +599,4 @@ public class KStreamKTableJoinTest {
                     + "      --> none\n"
                     + "      <-- KSTREAM-SOURCE-0000000005\n\n";
 
-    private static void maybeSetDslStoreFormatHeaders(final Properties 
streamsConfig, final boolean withHeaders) {
-        if (withHeaders) {
-            streamsConfig.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
-    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index a64985a019f..bcb6a54d941 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
@@ -78,9 +77,7 @@ public class KStreamKTableLeftJoinTest {
         stream.leftJoin(table, 
MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
         final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
-        if (withHeaders) {
-            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         driver = new TopologyTestDriver(builder.build(), props);
         inputStreamTopic = driver.createInputTopic(streamTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
         inputTableTopic = driver.createInputTopic(tableTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index f00f1806161..25d1774705c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -115,9 +115,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
         // Always process
         final Properties prop = StreamsTestUtils.getStreamsConfig();
         
prop.put(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION,
 0);
-        if (withHeaders) {
-            prop.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(prop, withHeaders);
         final StreamsConfig config = new StreamsConfig(prop);
 
         mockContext = new InternalMockProcessorContext<>(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index a62a86d9670..2c72add551d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -126,9 +126,7 @@ public class KStreamSlidingWindowAggregateTest {
         withCache = inputWithCache;
         emitFinal = type.equals(StrategyType.ON_WINDOW_CLOSE);
         emitStrategy = StrategyType.forType(type);
-        if (withHeaders) {
-            
props.put(org.apache.kafka.streams.StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
org.apache.kafka.streams.StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
     }
 
     @ParameterizedTest
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 35bdbee4d1d..653039db21e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -83,9 +83,7 @@ public class KTableAggregateTest {
         final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
         props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG,
                     
BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class.getName());
-        if (withHeaders) {
-            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         return new StreamsBuilder(new TopologyConfig(new 
StreamsConfig(props)));
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
index 2fa32d3cbf4..a2c564fc16a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.params.ParameterizedTest;
@@ -188,9 +189,7 @@ public class KTableKTableForeignKeyJoinScenarioTest {
             mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
             mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath())
         ));
-        if (withHeaders) {
-            streamsConfig.setProperty(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, 
withHeaders);
 
         final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
         final StreamsBuilder builder = new StreamsBuilder();
@@ -374,9 +373,7 @@ public class KTableKTableForeignKeyJoinScenarioTest {
         config.setProperty(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getAbsolutePath());
         config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
         config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
-        if (withHeaders) {
-            config.setProperty(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(config, withHeaders);
         return new TopologyTestDriver(builder.build(), config);
     }
 
@@ -385,9 +382,7 @@ public class KTableKTableForeignKeyJoinScenarioTest {
         config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.IntegerSerde.class.getName());
         config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class.getName());
         config.setProperty(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getAbsolutePath());
-        if (withHeaders) {
-            config.setProperty(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(config, withHeaders);
         try (final TopologyTestDriver topologyTestDriver = new 
TopologyTestDriver(builder.build(), config)) {
             final TestInputTopic<Integer, String> aTopic = 
topologyTestDriver.createInputTopic("A", new IntegerSerializer(), new 
StringSerializer());
             final TestInputTopic<Integer, String> bTopic = 
topologyTestDriver.createInputTopic("B", new IntegerSerializer(), new 
StringSerializer());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index de56b921787..20c6811ddbd 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -74,9 +74,7 @@ public class KTableKTableLeftJoinTest {
 
     private StreamsBuilder createStreamBuilderInMemory(final boolean 
withHeaders) {
         props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, 
BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class.getName());
-        if (withHeaders) {
-            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         return new StreamsBuilder(new TopologyConfig(new 
StreamsConfig(props)));
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 7c84d4437fe..75a64d580ec 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -48,6 +48,7 @@ import org.apache.kafka.streams.state.ValueTimestampHeaders;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.NoOpValueTransformerWithKeySupplier;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterEach;
@@ -535,9 +536,7 @@ public class KTableTransformValuesTest {
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getAbsolutePath());
         props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass().getName());
         props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass().getName());
-        if (withHeaders) {
-            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         return props;
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 15ba2ede5a8..941418549f5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -94,9 +94,7 @@ public class SessionWindowedKStreamImplTest {
         type = inputType;
         final EmitStrategy emitStrategy = 
EmitStrategy.StrategyType.forType(type);
         emitFinal = type.equals(EmitStrategy.StrategyType.ON_WINDOW_CLOSE);
-        if (withHeaders) {
-            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
 
         final KStream<String, String> stream = builder.stream(TOPIC, 
Consumed.with(Serdes.String(), Serdes.String()));
         this.stream = stream.groupByKey(Grouped.with(Serdes.String(), 
Serdes.String()))
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index 4fcdf72ae75..f46e882ac7e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -99,9 +98,7 @@ public class TimeWindowedKStreamImplTest {
         withCache = inputWithCache;
         emitFinal = type.equals(StrategyType.ON_WINDOW_CLOSE);
         emitStrategy = StrategyType.forType(type);
-        if (withHeaders) {
-            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
         final KStream<String, String> stream = builder.stream(TOPIC, 
Consumed.with(Serdes.String(), Serdes.String()));
         windowedStream = stream.groupByKey(Grouped.with(Serdes.String(), 
Serdes.String()))
             .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L)));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
index 524611f6da5..9c3ef69d7f8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
@@ -340,9 +340,7 @@ public class TimeOrderedSessionStoreUpgradeTest {
     @MethodSource("dslStoreFormats")
     public void shouldAggregateSessionsViaDslWithOnWindowClose(final boolean 
withHeaders) {
         final Properties dslProps = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
-        if (withHeaders) {
-            dslProps.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
-        }
+        StreamsTestUtils.maybeSetDslStoreFormatHeaders(dslProps, withHeaders);
 
         final StreamsBuilder streamsBuilder = new StreamsBuilder();
         streamsBuilder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), 
Serdes.String()))
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java 
b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 6833f023cbd..35b9f5aa3f6 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -316,6 +316,20 @@ public final class StreamsTestUtils {
         }
     }
 
+    /**
+     * Sets {@link StreamsConfig#DSL_STORE_FORMAT_CONFIG} to {@link StreamsConfig#DSL_STORE_FORMAT_HEADERS}
+     * when {@code withHeaders} is true; otherwise the given properties are left untouched.
+     * Lets tests parameterized over the headers store format avoid repeating this conditional.
+     *
+     * @param streamsConfig The streams configuration properties, modified in place
+     * @param withHeaders Whether to enable the headers store format
+     */
+    public static void maybeSetDslStoreFormatHeaders(final Properties 
streamsConfig, final boolean withHeaders) {
+        if (withHeaders) {
+            streamsConfig.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
+    }
+
     public static class TopologyMetadataBuilder {
         private final TopologyMetadata topologyMetadata;
 

Reply via email to