This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 27914247233 MINOR: Use maybeSetDslStoreFormatHeaders helper in more
streams test classes (#22057)
27914247233 is described below
commit 27914247233acae6d5093aa131212509dd9bc5a5
Author: Ken Huang <[email protected]>
AuthorDate: Thu Apr 16 13:38:54 2026 +0800
MINOR: Use maybeSetDslStoreFormatHeaders helper in more streams test
classes (#22057)
See discussion:
https://github.com/apache/kafka/pull/21820#discussion_r3051604424
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../GlobalKTableEOSIntegrationTest.java | 5 +--
.../integration/GlobalKTableIntegrationTest.java | 21 ++++++-----
.../integration/IQv2StoreIntegrationTest.java | 5 +--
.../integration/InternalTopicIntegrationTest.java | 5 +--
.../JoinGracePeriodDurabilityIntegrationTest.java | 3 +-
.../integration/JoinStoreIntegrationTest.java | 5 ++-
.../KStreamAggregationDedupIntegrationTest.java | 7 ++--
.../KStreamAggregationIntegrationTest.java | 25 ++++++------
...yInnerJoinCustomPartitionerIntegrationTest.java | 13 ++++---
...bleForeignKeyInnerJoinMultiIntegrationTest.java | 7 ++--
.../KTableKTableForeignKeyJoinDistributedTest.java | 5 ++-
.../KTableKTableForeignKeyJoinIntegrationTest.java | 4 +-
...reignKeyJoinMaterializationIntegrationTest.java | 4 +-
.../integration/MetricsIntegrationTest.java | 13 ++-----
.../RelaxedNullKeyRequirementJoinTest.java | 4 +-
.../integration/RestoreIntegrationTest.java | 40 +++++---------------
.../integration/RocksDBMetricsIntegrationTest.java | 5 +--
.../SelfJoinUpgradeIntegrationTest.java | 9 +++--
.../SlidingWindowedKStreamIntegrationTest.java | 7 ++--
.../StandbyTaskCreationIntegrationTest.java | 5 +--
.../integration/StoreQueryIntegrationTest.java | 5 +--
.../StreamStreamJoinIntegrationTest.java | 18 ++++-----
.../StreamTableJoinIntegrationTest.java | 10 ++---
.../StreamTableJoinWithGraceIntegrationTest.java | 8 ++--
...amsUncaughtExceptionHandlerIntegrationTest.java | 5 +--
.../SuppressionDurabilityIntegrationTest.java | 3 +-
.../integration/SuppressionIntegrationTest.java | 3 +-
.../integration/TableTableJoinIntegrationTest.java | 44 +++++++++++-----------
.../TimeWindowedKStreamIntegrationTest.java | 7 ++--
.../integration/utils/IntegrationTestUtils.java | 14 -------
.../internals/CogroupedKStreamImplTest.java | 4 +-
.../kstream/internals/KStreamKTableJoinTest.java | 23 ++++-------
.../internals/KStreamKTableLeftJoinTest.java | 5 +--
...KStreamSessionWindowAggregateProcessorTest.java | 4 +-
.../KStreamSlidingWindowAggregateTest.java | 4 +-
.../kstream/internals/KTableAggregateTest.java | 4 +-
.../KTableKTableForeignKeyJoinScenarioTest.java | 13 ++-----
.../internals/KTableKTableLeftJoinTest.java | 4 +-
.../internals/KTableTransformValuesTest.java | 5 +--
.../internals/SessionWindowedKStreamImplTest.java | 4 +-
.../internals/TimeWindowedKStreamImplTest.java | 5 +--
.../TimeOrderedSessionStoreUpgradeTest.java | 4 +-
.../org/apache/kafka/test/StreamsTestUtils.java | 14 +++++++
43 files changed, 175 insertions(+), 227 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 11167c84fc1..5f0925dca91 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -356,9 +357,7 @@ public class GlobalKTableEOSIntegrationTest {
}
private void startStreams(final boolean withHeaders) {
- if (withHeaders) {
- streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams(null);
}
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 4fc0cf85eab..b4a73ccb603 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -47,6 +47,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -143,7 +144,7 @@ public class GlobalKTableIntegrationTest {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldKStreamGlobalKTableLeftJoin(final boolean withHeaders)
throws Exception {
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
final KStream<String, String> streamTableJoin =
stream.leftJoin(globalTable, keyMapper, joiner);
streamTableJoin.process(supplier);
produceInitialGlobalTableValues();
@@ -230,7 +231,7 @@ public class GlobalKTableIntegrationTest {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldKStreamGlobalKTableJoin(final boolean withHeaders)
throws Exception {
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
final KStream<String, String> streamTableJoin =
stream.join(globalTable, keyMapper, joiner);
streamTableJoin.process(supplier);
produceInitialGlobalTableValues();
@@ -317,7 +318,7 @@ public class GlobalKTableIntegrationTest {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldRestoreGlobalInMemoryKTableOnRestart(final boolean
withHeaders) throws Exception {
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
builder = new StreamsBuilder();
globalTable = builder.globalTable(
globalTableTopic,
@@ -350,7 +351,7 @@ public class GlobalKTableIntegrationTest {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldGetToRunningWithOnlyGlobalTopology(final boolean
withHeaders) throws Exception {
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
builder = new StreamsBuilder();
globalTable = builder.globalTable(
globalTableTopic,
@@ -394,7 +395,7 @@ public class GlobalKTableIntegrationTest {
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
TestGlobalProcessingExceptionHandler.class);
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
produceInitialGlobalTableValues();
startStreams();
@@ -413,7 +414,7 @@ public class GlobalKTableIntegrationTest {
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
TestGlobalProcessingExceptionHandler.class);
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
produceInitialGlobalTableValues();
assertThrows(StreamsException.class, () -> {
@@ -433,7 +434,7 @@ public class GlobalKTableIntegrationTest {
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
TestGlobalProcessingExceptionHandler.class);
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
produceInitialGlobalTableValues();
assertThrows(StreamsException.class, () -> {
@@ -453,7 +454,7 @@ public class GlobalKTableIntegrationTest {
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
TestGlobalProcessingExceptionHandler.class);
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING,
Duration.ofSeconds(30));
@@ -475,7 +476,7 @@ public class GlobalKTableIntegrationTest {
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
TestGlobalProcessingExceptionHandler.class);
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING,
Duration.ofSeconds(30));
@@ -493,7 +494,7 @@ public class GlobalKTableIntegrationTest {
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
TestGlobalProcessingExceptionHandler.class);
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING,
Duration.ofSeconds(30));
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 8f27ff9bb90..2c4307bba44 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -67,6 +67,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -2048,9 +2049,7 @@ public class IQv2StoreIntegrationTest {
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
config.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
- if (withHeaders) {
- config.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(config, withHeaders);
return config;
}
}
\ No newline at end of file
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index b197aa73650..f8cb0267bf4 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -154,9 +155,7 @@ public class InternalTopicIntegrationTest {
if (streamsProtocolEnabled) {
streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
}
- if (withHeaders) {
- streamsProp.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsProp,
withHeaders);
}
/*
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
index 96544a6d474..d8a09e12ca0 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -234,7 +235,7 @@ public class JoinGracePeriodDurabilityIntegrationTest {
final boolean clean,
final boolean withHeaders) {
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
return getStartedStreams(streamsConfig, builder, clean);
}
}
\ No newline at end of file
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
index 95a2725c8db..432cbaf7c35 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -118,7 +119,7 @@ public class JoinStoreIntegrationTest {
JoinWindows.of(ofMillis(100)),
StreamJoined.with(Serdes.String(), Serdes.Integer(),
Serdes.Integer()).withStoreName("join-store"));
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(STREAMS_CONFIG,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(STREAMS_CONFIG,
withHeaders);
try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), STREAMS_CONFIG)) {
kafkaStreams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING) {
@@ -156,7 +157,7 @@ public class JoinStoreIntegrationTest {
JoinWindows.of(ofMillis(100)),
StreamJoined.with(Serdes.String(), Serdes.Integer(),
Serdes.Integer()).withStoreName("join-store"));
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(STREAMS_CONFIG,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(STREAMS_CONFIG,
withHeaders);
try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), STREAMS_CONFIG);
final Admin admin = Admin.create(ADMIN_CONFIG)) {
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 254915ce880..a63cb04562e 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -134,7 +135,7 @@ public class KStreamAggregationDedupIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(Serdes.String(),
Serdes.String()));
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
@@ -168,7 +169,7 @@ public class KStreamAggregationDedupIntegrationTest {
.toStream((windowedKey, value) -> windowedKey.key() + "@" +
windowedKey.window().start())
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
@@ -207,7 +208,7 @@ public class KStreamAggregationDedupIntegrationTest {
.toStream((windowedKey, value) -> windowedKey.key() + "@" +
windowedKey.window().start())
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index baf0e7b3782..48e9ce88b9a 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -61,6 +61,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.consumer.ConsoleConsumer;
import org.apache.kafka.tools.consumer.ConsoleConsumerOptions;
@@ -178,7 +179,7 @@ public class KStreamAggregationIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
@@ -239,7 +240,7 @@ public class KStreamAggregationIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
@@ -313,7 +314,7 @@ public class KStreamAggregationIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Integer()));
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
@@ -365,7 +366,7 @@ public class KStreamAggregationIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
@@ -465,7 +466,7 @@ public class KStreamAggregationIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
shouldCountHelper(testInfo);
}
@@ -479,7 +480,7 @@ public class KStreamAggregationIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
shouldCountHelper(testInfo);
}
@@ -496,7 +497,7 @@ public class KStreamAggregationIntegrationTest {
.count()
.toStream((windowedKey, value) -> windowedKey.key() + "@" +
windowedKey.window().start()).to(outputTopic, Produced.with(Serdes.String(),
Serdes.Long()));
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
@@ -544,7 +545,7 @@ public class KStreamAggregationIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
@@ -657,7 +658,7 @@ public class KStreamAggregationIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
@@ -830,7 +831,7 @@ public class KStreamAggregationIntegrationTest {
final Map<Windowed<String>, KeyValue<Long, Long>> results = new
HashMap<>();
final CountDownLatch latch = new CountDownLatch(13);
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
builder.stream(userSessionsStream, Consumed.with(Serdes.String(),
Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
@@ -877,7 +878,7 @@ public class KStreamAggregationIntegrationTest {
final CountDownLatch latch = new CountDownLatch(13);
final String userSessionsStore = "UserSessionsStore";
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
builder.stream(userSessionsStream, Consumed.with(Serdes.String(),
Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
@@ -1035,7 +1036,7 @@ public class KStreamAggregationIntegrationTest {
latch.countDown();
});
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
assertTrue(latch.await(30, TimeUnit.SECONDS));
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
index 7782fe20348..5740103078d 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -192,9 +193,9 @@ public class
KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
final String innerJoinType = "INNER";
final String queryableName = innerJoinType + "-store1";
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigTwo,
withHeaders);
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigThree,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigTwo,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigThree,
withHeaders);
streams = prepareTopologyWithNonSingletonPartitions(queryableName,
streamsConfig);
streamsTwo = prepareTopologyWithNonSingletonPartitions(queryableName,
streamsConfigTwo);
@@ -221,9 +222,9 @@ public class
KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
final String innerJoinType = "INNER";
final String queryableName = innerJoinType + "-store1";
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigTwo,
withHeaders);
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigThree,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigTwo,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigThree,
withHeaders);
streams = prepareTopology(queryableName, streamsConfig);
streamsTwo = prepareTopology(queryableName, streamsConfigTwo);
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
index 19adface8c1..0675938e579 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -193,9 +194,9 @@ public class
KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
final String queryableName = innerJoinType + "-store1";
final String queryableNameTwo = innerJoinType + "-store2";
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigTwo,
withHeaders);
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigThree,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigTwo,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigThree,
withHeaders);
streams = prepareTopology(queryableName, queryableNameTwo,
streamsConfig);
streamsTwo = prepareTopology(queryableName, queryableNameTwo,
streamsConfigTwo);
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
index a86479d0617..c7e62ba24e4 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
@@ -32,6 +32,7 @@ import
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -165,8 +166,8 @@ public class KTableKTableForeignKeyJoinDistributedTest {
final Properties streamsConfiguration1 =
getStreamsConfiguration(safeTestName);
final Properties streamsConfiguration2 =
getStreamsConfiguration(safeTestName);
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration1,
withHeaders);
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration2,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration1,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration2,
withHeaders);
//Each streams client needs to have it's own StreamsBuilder in order
to simulate
//a truly distributed run
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 3002ca1d638..c6daa0420c1 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
@@ -38,6 +37,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
@@ -94,7 +94,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
mkEntry(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
optimization)
));
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
return props;
}
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
index fd42b21d5b0..77d738a156c 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
@@ -26,13 +26,13 @@ import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
@@ -72,7 +72,7 @@ public class
KTableKTableForeignKeyJoinMaterializationIntegrationTest {
@ParameterizedTest
@CsvSource({"false, false, false", "false, false, true", "true, false,
false", "true, false, true", "true, true, false", "true, true, true"})
public void shouldEmitTombstoneWhenDeletingNonJoiningRecords(final boolean
materialized, final boolean queryable, final boolean withHeaders) {
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
final Topology topology = getTopology(streamsConfig, "store",
materialized, queryable);
try (final TopologyTestDriver driver = new
TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> left =
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new
StringSerializer());
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index 738e5cb5aa3..020f6a2c315 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -348,9 +349,7 @@ public class MetricsIntegrationTest {
if (streamsProtocolEnabled) {
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
}
- if (withHeaders) {
- streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(),
Serdes.String()))
.to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(),
Serdes.String()));
@@ -390,9 +389,7 @@ public class MetricsIntegrationTest {
if (streamsProtocolEnabled) {
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
}
- if (withHeaders) {
- streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
final Duration windowSize = Duration.ofMillis(50);
builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(),
Serdes.String()))
@@ -426,9 +423,7 @@ public class MetricsIntegrationTest {
if (streamsProtocolEnabled) {
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
}
- if (withHeaders) {
- streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
final Duration inactivityGap = Duration.ofMillis(50);
builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(),
Serdes.String()))
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
index 08f419e3188..4713760d623 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
@@ -25,11 +25,11 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -139,7 +139,7 @@ public class RelaxedNullKeyRequirementJoinTest {
private void initTopology() {
final Properties props = props();
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
testDriver = new TopologyTestDriver(builder.build(), props);
left = testDriver.createInputTopic(
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 1681a7cceb5..01caca64676 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -220,9 +220,7 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
- if (withHeaders) {
- streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
CLUSTER.createTopics(inputTopic);
CLUSTER.createTopics(outputTopic);
@@ -281,9 +279,7 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
- if (withHeaders) {
- props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
// restoring from 1000 to 4000 (committed), and then process from 4000
to 5000 on each of the two partitions
final int offsetLimitDelta = 1000;
@@ -346,9 +342,7 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
- if (withHeaders) {
- props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
// restoring from 1000 to 4000 (committed), and then process from 4000
to 5000 on each of the two partitions
final int offsetLimitDelta = 1000;
@@ -417,9 +411,7 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
- if (withHeaders) {
- props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
// restoring from 1000 to 5000, and then process from 5000 to 10000 on
each of the two partitions
final int offsetCheckpointed = 1000;
@@ -473,9 +465,7 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
- if (withHeaders) {
- props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
kafkaStreams = new KafkaStreams(builder.build(), props);
try {
startApplicationAndWaitUntilRunning(kafkaStreams);
@@ -519,9 +509,7 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
- if (withHeaders) {
- props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
kafkaStreams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
@@ -557,9 +545,7 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
props1.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
- if (withHeaders) {
- props1.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props1, withHeaders);
purgeLocalStreamsState(props1);
final KafkaStreams streams1 = new KafkaStreams(builder.build(),
props1);
@@ -569,9 +555,7 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
props2.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
- if (withHeaders) {
- props2.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props2, withHeaders);
purgeLocalStreamsState(props2);
final KafkaStreams streams2 = new KafkaStreams(builder.build(),
props2);
@@ -710,9 +694,7 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
- if (withHeaders) {
- props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
@@ -779,9 +761,7 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
- if (withHeaders) {
- streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
final KafkaStreams kafkaStreams = new
KafkaStreams(streamsBuilder.build(), streamsConfiguration);
kafkaStreams.setGlobalStateRestoreListener(stateRestoreListener);
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 338b1b6529c..c9fabbdc053 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -152,9 +153,7 @@ public class RocksDBMetricsIntegrationTest {
if (streamsProtocolEnabled) {
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
}
- if (withHeaders) {
- streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
final StreamsBuilder builder = builderForStateStores();
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
index e9e53ab6cd9..6f6e65bf344 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -131,7 +132,7 @@ public class SelfJoinUpgradeIntegrationTest {
final Properties props = props();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.NO_OPTIMIZATION);
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
kafkaStreams.start();
@@ -159,7 +160,7 @@ public class SelfJoinUpgradeIntegrationTest {
kafkaStreams = null;
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
kafkaStreams.start();
@@ -199,7 +200,7 @@ public class SelfJoinUpgradeIntegrationTest {
final Properties props = props();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
kafkaStreams.start();
@@ -226,7 +227,7 @@ public class SelfJoinUpgradeIntegrationTest {
kafkaStreams = null;
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
kafkaStreams.start();
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
index b68a5f7a9e5..562a80d0f37 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -173,7 +174,7 @@ public class SlidingWindowedKStreamIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
@@ -241,7 +242,7 @@ public class SlidingWindowedKStreamIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
@@ -333,7 +334,7 @@ public class SlidingWindowedKStreamIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
index c83dfe6d10d..fe0e6adaff3 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -103,9 +104,7 @@ public class StandbyTaskCreationIntegrationTest {
} else {
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
}
- if (withHeaders) {
- streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
return streamsConfiguration;
}
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index b417d84a055..7631bbfa3bc 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -41,6 +41,7 @@ import
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyS
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@@ -681,9 +682,7 @@ public class StoreQueryIntegrationTest {
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
- if (withHeaders) {
- config.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(config, withHeaders);
return config;
}
}
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index a74965b22b4..f6fbe1c75f8 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -18,11 +18,11 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
@@ -54,7 +54,7 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-selfJoin");
streamsConfig.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
@@ -99,7 +99,7 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
@@ -153,7 +153,7 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-repartitioned");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
@@ -209,7 +209,7 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-left");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
@@ -264,7 +264,7 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-left-repartitioned");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
@@ -321,7 +321,7 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-outer");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
@@ -377,7 +377,7 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-outer");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
@@ -435,7 +435,7 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-multi-inner");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 67d663eaa49..1a9329953c9 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -18,12 +18,12 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
@@ -54,7 +54,7 @@ public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
@@ -95,7 +95,7 @@ public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-left");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
@@ -137,7 +137,7 @@ public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
@@ -179,7 +179,7 @@ public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-left");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
index 7a078947e05..ba862f5556d 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
@@ -28,6 +27,7 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
@@ -61,7 +61,7 @@ public class StreamTableJoinWithGraceIntegrationTest extends
AbstractJoinIntegra
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled,
false);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftStream.join(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC,
Produced.with(Serdes.Long(), Serdes.String()));
@@ -104,7 +104,7 @@ public class StreamTableJoinWithGraceIntegrationTest
extends AbstractJoinIntegra
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled,
true);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-left");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftStream.leftJoin(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC);
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
@@ -135,4 +135,4 @@ public class StreamTableJoinWithGraceIntegrationTest
extends AbstractJoinIntegra
runTestWithDriver(input, expectedResult, streamsConfig,
builder.build(streamsConfig));
}
-}
\ No newline at end of file
+}
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index d588f607263..94cde6824b3 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.apache.logging.log4j.Level;
@@ -133,9 +134,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
mkEntry(StreamsConfig.GROUP_PROTOCOL_CONFIG, protocol)
)
);
- if (withHeaders) {
- props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
return props;
}
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index dbb703b35e5..d6a1646dec9 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -42,6 +42,7 @@ import
org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -151,7 +152,7 @@ public class SuppressionDurabilityIntegrationTest {
));
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
COMMIT_INTERVAL);
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
KafkaStreams driver = getStartedStreams(streamsConfig, builder, true);
try {
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index 8c4000b9002..9b1aa9496a7 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matchers;
@@ -525,7 +526,7 @@ public class SuppressionIntegrationTest {
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE),
mkEntry(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath())
));
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
return props;
}
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
index 0a6aa71cf1f..322e9889193 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -20,12 +20,12 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
@@ -72,7 +72,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.join(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
if (cacheEnabled) {
@@ -119,7 +119,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-left");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.leftJoin(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
if (cacheEnabled) {
@@ -166,7 +166,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-outer");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.outerJoin(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
if (cacheEnabled) {
@@ -213,7 +213,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.join(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
// versioned stores do not support caching, so we expect the same
result regardless of whether caching is enabled or not
@@ -257,7 +257,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-left");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.leftJoin(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
// versioned stores do not support caching, so we expect the same
result regardless of whether caching is enabled or not
@@ -301,7 +301,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-outer");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.outerJoin(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
// versioned stores do not support caching, so we expect the same
result regardless of whether caching is enabled or not
@@ -345,7 +345,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.join(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
if (cacheEnabled) {
@@ -392,7 +392,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-left");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.leftJoin(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
if (cacheEnabled) {
@@ -439,7 +439,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-outer");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.outerJoin(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
if (cacheEnabled) {
@@ -486,7 +486,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.join(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
if (cacheEnabled) {
@@ -533,7 +533,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-left");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.leftJoin(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
if (cacheEnabled) {
@@ -580,7 +580,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-outer");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.outerJoin(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
if (cacheEnabled) {
@@ -627,7 +627,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-inner");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.join(rightTable, valueJoiner)
.join(rightTable, valueJoiner, materialized)
.toStream()
@@ -680,7 +680,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-left");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.join(rightTable, valueJoiner)
.leftJoin(rightTable, valueJoiner, materialized)
.toStream()
@@ -730,7 +730,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-outer");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.join(rightTable, valueJoiner)
.outerJoin(rightTable, valueJoiner, materialized)
.toStream()
@@ -783,7 +783,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-inner");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.leftJoin(rightTable, valueJoiner)
.join(rightTable, valueJoiner, materialized)
.toStream()
@@ -833,7 +833,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-left");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.leftJoin(rightTable, valueJoiner)
.leftJoin(rightTable, valueJoiner, materialized)
.toStream()
@@ -885,7 +885,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-outer");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.leftJoin(rightTable, valueJoiner)
.outerJoin(rightTable, valueJoiner, materialized)
.toStream()
@@ -937,7 +937,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-inner");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.outerJoin(rightTable, valueJoiner)
.join(rightTable, valueJoiner, materialized)
.toStream()
@@ -989,7 +989,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-left");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.outerJoin(rightTable, valueJoiner)
.leftJoin(rightTable, valueJoiner, materialized)
.toStream()
@@ -1043,7 +1043,7 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-outer");
- IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.outerJoin(rightTable, valueJoiner)
.outerJoin(rightTable, valueJoiner, materialized)
.toStream()
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
index 8f6cbe213d5..1d7d8053d37 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
@@ -46,6 +46,7 @@ import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -169,7 +170,7 @@ public class TimeWindowedKStreamIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
@@ -246,7 +247,7 @@ public class TimeWindowedKStreamIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
@@ -339,7 +340,7 @@ public class TimeWindowedKStreamIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
-
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
startStreams();
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 079565ea97d..832e8eb1a06 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -280,20 +280,6 @@ public class IntegrationTestUtils {
}
}
- /**
- * Configures the DSL store format to use headers if enabled.
- * This is a helper method to reduce boilerplate in parameterized tests
that test both
- * with and without headers mode.
- *
- * @param streamsConfig The streams configuration properties to modify
- * @param withHeaders Whether to enable headers mode
- */
- public static void maybeSetDslStoreFormatHeaders(final Properties
streamsConfig, final boolean withHeaders) {
- if (withHeaders) {
- streamsConfig.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
- }
-
/**
* @param topic Kafka topic to write the data records to
* @param records Data records to write to Kafka
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
index dc6b851223d..c5b374bd96c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
@@ -83,9 +83,7 @@ public class CogroupedKStreamImplTest {
private void setup(final boolean withHeaders) {
- if (withHeaders) {
- props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> stream = builder.stream(TOPIC,
Consumed.with(Serdes.String(), Serdes.String()));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 222e3e9914e..9e913daa55c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -84,9 +84,7 @@ public class KStreamKTableJoinTest {
table = builder.table(tableTopic, consumed);
stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(supplier);
final Properties props =
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
- if (withHeaders) {
- props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
driver = new TopologyTestDriver(builder.build(), props);
inputStreamTopic = driver.createInputTopic(streamTopic, new
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L),
Duration.ZERO);
inputTableTopic = driver.createInputTopic(tableTopic, new
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L),
Duration.ZERO);
@@ -161,7 +159,7 @@ public class KStreamKTableJoinTest {
final StreamsBuilder builder = new StreamsBuilder();
final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.NO_OPTIMIZATION);
- maybeSetDslStoreFormatHeaders(props, withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
final KStream<String, String> streamA = builder.stream("topic",
Consumed.with(Serdes.String(), Serdes.String()));
final KTable<String, String> tableB = builder.table("topic2",
Consumed.with(Serdes.String(), Serdes.String()));
@@ -179,7 +177,7 @@ public class KStreamKTableJoinTest {
final StreamsBuilder builder = new StreamsBuilder();
final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.NO_OPTIMIZATION);
- maybeSetDslStoreFormatHeaders(props, withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
final KStream<String, String> streamA = builder.stream("topic",
Consumed.with(Serdes.String(), Serdes.String()));
final KTable<String, String> source = builder.table("topic2",
Consumed.with(Serdes.String(), Serdes.String()),
Materialized.as(Stores.inMemoryKeyValueStore("tableB")));
@@ -201,7 +199,7 @@ public class KStreamKTableJoinTest {
final StreamsBuilder builder = new StreamsBuilder();
final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.NO_OPTIMIZATION);
- maybeSetDslStoreFormatHeaders(props, withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
final KStream<String, String> streamA = builder.stream("topic",
Consumed.with(Serdes.String(), Serdes.String()));
final KTable<String, String> source = builder.table("topic2",
Consumed.with(Serdes.String(), Serdes.String()),
Materialized.as(Stores.persistentVersionedKeyValueStore("tableB",
Duration.ofMinutes(5))));
@@ -220,7 +218,7 @@ public class KStreamKTableJoinTest {
final StreamsBuilder builder = new StreamsBuilder();
final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.NO_OPTIMIZATION);
- maybeSetDslStoreFormatHeaders(props, withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
final KStream<String, String> streamA = builder.stream("topic",
Consumed.with(Serdes.String(), Serdes.String()));
final KTable<String, String> tableB = builder.table("topic2",
Consumed.with(Serdes.String(), Serdes.String()),
Materialized.as(Stores.persistentVersionedKeyValueStore("tableB",
Duration.ofMinutes(5))));
@@ -237,7 +235,7 @@ public class KStreamKTableJoinTest {
final StreamsBuilder builder = new StreamsBuilder();
final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.NO_OPTIMIZATION);
- maybeSetDslStoreFormatHeaders(props, withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
final KStream<String, String> streamA = builder.stream("topic",
Consumed.with(Serdes.String(), Serdes.String()));
final KTable<String, String> source = builder.table("topic2",
Consumed.with(Serdes.String(), Serdes.String()),
Materialized.as(Stores.persistentVersionedKeyValueStore("V-grace",
Duration.ofMinutes(0))));
@@ -319,7 +317,7 @@ public class KStreamKTableJoinTest {
final StreamsBuilder builder = new StreamsBuilder();
final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.NO_OPTIMIZATION);
- maybeSetDslStoreFormatHeaders(props, withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
final KStream<String, String> streamA = builder.stream("topic",
Consumed.with(Serdes.String(), Serdes.String()));
final KTable<String, String> tableB = builder.table("topic2",
Consumed.with(Serdes.String(), Serdes.String()));
final KTable<String, String> tableC = builder.table("topic3",
Consumed.with(Serdes.String(), Serdes.String()));
@@ -336,7 +334,7 @@ public class KStreamKTableJoinTest {
final StreamsBuilder builder = new StreamsBuilder();
final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.NO_OPTIMIZATION);
- maybeSetDslStoreFormatHeaders(props, withHeaders);
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
final KStream<String, String> streamA = builder.stream("topic",
Consumed.with(Serdes.String(), Serdes.String()));
final KTable<String, String> tableB = builder.table("topic2",
Consumed.with(Serdes.String(), Serdes.String()));
final KTable<String, String> tableC = builder.table("topic3",
Consumed.with(Serdes.String(), Serdes.String()));
@@ -601,9 +599,4 @@ public class KStreamKTableJoinTest {
+ " --> none\n"
+ " <-- KSTREAM-SOURCE-0000000005\n\n";
- private static void maybeSetDslStoreFormatHeaders(final Properties
streamsConfig, final boolean withHeaders) {
- if (withHeaders) {
- streamsConfig.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
- }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index a64985a019f..bcb6a54d941 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
@@ -78,9 +77,7 @@ public class KStreamKTableLeftJoinTest {
stream.leftJoin(table,
MockValueJoiner.TOSTRING_JOINER).process(supplier);
final Properties props =
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
- if (withHeaders) {
- props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
driver = new TopologyTestDriver(builder.build(), props);
inputStreamTopic = driver.createInputTopic(streamTopic, new
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L),
Duration.ZERO);
inputTableTopic = driver.createInputTopic(tableTopic, new
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L),
Duration.ZERO);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index f00f1806161..25d1774705c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -115,9 +115,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
// Always process
final Properties prop = StreamsTestUtils.getStreamsConfig();
prop.put(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION,
0);
- if (withHeaders) {
- prop.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(prop, withHeaders);
final StreamsConfig config = new StreamsConfig(prop);
mockContext = new InternalMockProcessorContext<>(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index a62a86d9670..2c72add551d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -126,9 +126,7 @@ public class KStreamSlidingWindowAggregateTest {
withCache = inputWithCache;
emitFinal = type.equals(StrategyType.ON_WINDOW_CLOSE);
emitStrategy = StrategyType.forType(type);
- if (withHeaders) {
-
props.put(org.apache.kafka.streams.StreamsConfig.DSL_STORE_FORMAT_CONFIG,
org.apache.kafka.streams.StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
}
@ParameterizedTest
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 35bdbee4d1d..653039db21e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -83,9 +83,7 @@ public class KTableAggregateTest {
final Properties props =
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG,
BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class.getName());
- if (withHeaders) {
- props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
return new StreamsBuilder(new TopologyConfig(new
StreamsConfig(props)));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
index 2fa32d3cbf4..a2c564fc16a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.params.ParameterizedTest;
@@ -188,9 +189,7 @@ public class KTableKTableForeignKeyJoinScenarioTest {
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
mkEntry(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath())
));
- if (withHeaders) {
- streamsConfig.setProperty(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
final StreamsBuilder builder = new StreamsBuilder();
@@ -374,9 +373,7 @@ public class KTableKTableForeignKeyJoinScenarioTest {
config.setProperty(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getAbsolutePath());
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
- if (withHeaders) {
- config.setProperty(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(config, withHeaders);
return new TopologyTestDriver(builder.build(), config);
}
@@ -385,9 +382,7 @@ public class KTableKTableForeignKeyJoinScenarioTest {
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.IntegerSerde.class.getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class.getName());
config.setProperty(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getAbsolutePath());
- if (withHeaders) {
- config.setProperty(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(config, withHeaders);
try (final TopologyTestDriver topologyTestDriver = new
TopologyTestDriver(builder.build(), config)) {
final TestInputTopic<Integer, String> aTopic =
topologyTestDriver.createInputTopic("A", new IntegerSerializer(), new
StringSerializer());
final TestInputTopic<Integer, String> bTopic =
topologyTestDriver.createInputTopic("B", new IntegerSerializer(), new
StringSerializer());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index de56b921787..20c6811ddbd 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -74,9 +74,7 @@ public class KTableKTableLeftJoinTest {
private StreamsBuilder createStreamBuilderInMemory(final boolean
withHeaders) {
props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG,
BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class.getName());
- if (withHeaders) {
- props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
return new StreamsBuilder(new TopologyConfig(new
StreamsConfig(props)));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 7c84d4437fe..75a64d580ec 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -48,6 +48,7 @@ import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.NoOpValueTransformerWithKeySupplier;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
@@ -535,9 +536,7 @@ public class KTableTransformValuesTest {
props.setProperty(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getAbsolutePath());
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.Integer().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.Integer().getClass().getName());
- if (withHeaders) {
- props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
return props;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 15ba2ede5a8..941418549f5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -94,9 +94,7 @@ public class SessionWindowedKStreamImplTest {
type = inputType;
final EmitStrategy emitStrategy =
EmitStrategy.StrategyType.forType(type);
emitFinal = type.equals(EmitStrategy.StrategyType.ON_WINDOW_CLOSE);
- if (withHeaders) {
- props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
final KStream<String, String> stream = builder.stream(TOPIC,
Consumed.with(Serdes.String(), Serdes.String()));
this.stream = stream.groupByKey(Grouped.with(Serdes.String(),
Serdes.String()))
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index 4fcdf72ae75..f46e882ac7e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
@@ -99,9 +98,7 @@ public class TimeWindowedKStreamImplTest {
withCache = inputWithCache;
emitFinal = type.equals(StrategyType.ON_WINDOW_CLOSE);
emitStrategy = StrategyType.forType(type);
- if (withHeaders) {
- props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
final KStream<String, String> stream = builder.stream(TOPIC,
Consumed.with(Serdes.String(), Serdes.String()));
windowedStream = stream.groupByKey(Grouped.with(Serdes.String(),
Serdes.String()))
.windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L)));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
index 524611f6da5..9c3ef69d7f8 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
@@ -340,9 +340,7 @@ public class TimeOrderedSessionStoreUpgradeTest {
@MethodSource("dslStoreFormats")
public void shouldAggregateSessionsViaDslWithOnWindowClose(final boolean
withHeaders) {
final Properties dslProps =
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
- if (withHeaders) {
- dslProps.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+ StreamsTestUtils.maybeSetDslStoreFormatHeaders(dslProps, withHeaders);
final StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(),
Serdes.String()))
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 6833f023cbd..35b9f5aa3f6 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -316,6 +316,20 @@ public final class StreamsTestUtils {
}
}
+ /**
+ * Configures the DSL store format to use headers if enabled.
+ * This is a helper method to reduce boilerplate in parameterized tests
that test both
+ * with and without headers mode.
+ *
+ * @param streamsConfig The streams configuration properties to modify
+ * @param withHeaders Whether to enable headers mode
+ */
+ public static void maybeSetDslStoreFormatHeaders(final Properties
streamsConfig, final boolean withHeaders) {
+ if (withHeaders) {
+ streamsConfig.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
+ }
+
public static class TopologyMetadataBuilder {
private final TopologyMetadata topologyMetadata;