This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6c651fbfa0a KAFKA-20329: Test headers dsl.store.format further (3/N)
(#21820)
6c651fbfa0a is described below
commit 6c651fbfa0a7b661d64a3e62ccd80b1c7750ae59
Author: Alieh Saeedi <[email protected]>
AuthorDate: Tue Apr 14 15:19:20 2026 +0200
KAFKA-20329: Test headers dsl.store.format further (3/N) (#21820)
This PR parametrizes 10 integration test classes (49 test methods total)
to cover both the default and the header-based DSL store formats. Each
test now runs once with the default store format and once with
dsl.store.format=headers; tests that were already parameterized on the
group protocol now run all four protocol/format combinations.
Reviewers: Chia-Ping Tsai <[email protected]>, Matthias J. Sax
<[email protected]>, TengYao Chi <[email protected]>
---
.../GlobalKTableEOSIntegrationTest.java | 36 ++++++----
.../integration/GlobalKTableIntegrationTest.java | 69 +++++++++++++------
.../integration/InternalTopicIntegrationTest.java | 31 +++++----
.../integration/MetricsIntegrationTest.java | 23 +++++--
.../integration/RestoreIntegrationTest.java | 78 +++++++++++++++-------
.../integration/RocksDBMetricsIntegrationTest.java | 9 ++-
.../StandbyTaskCreationIntegrationTest.java | 21 +++---
...amsUncaughtExceptionHandlerIntegrationTest.java | 58 ++++++++--------
.../SuppressionDurabilityIntegrationTest.java | 9 ++-
.../integration/SuppressionIntegrationTest.java | 60 ++++++++++-------
10 files changed, 249 insertions(+), 145 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index cf98c0226bf..11167c84fc1 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -48,9 +48,10 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.time.Duration;
@@ -136,12 +137,13 @@ public class GlobalKTableEOSIntegrationTest {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}
- @Test
- public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldKStreamGlobalKTableLeftJoin(final boolean withHeaders)
throws Exception {
final KStream<String, String> streamTableJoin =
stream.leftJoin(globalTable, keyMapper, joiner);
streamTableJoin.foreach(foreachAction);
produceInitialGlobalTableValues();
- startStreams();
+ startStreams(withHeaders);
produceTopicValues(streamTopic);
final Map<String, String> expected = new HashMap<>();
@@ -207,12 +209,13 @@ public class GlobalKTableEOSIntegrationTest {
);
}
- @Test
- public void shouldKStreamGlobalKTableJoin() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldKStreamGlobalKTableJoin(final boolean withHeaders)
throws Exception {
final KStream<String, String> streamTableJoin =
stream.join(globalTable, keyMapper, joiner);
streamTableJoin.foreach(foreachAction);
produceInitialGlobalTableValues();
- startStreams();
+ startStreams(withHeaders);
produceTopicValues(streamTopic);
final Map<String, String> expected = new HashMap<>();
@@ -277,11 +280,12 @@ public class GlobalKTableEOSIntegrationTest {
);
}
- @Test
- public void shouldRestoreTransactionalMessages() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldRestoreTransactionalMessages(final boolean withHeaders)
throws Exception {
produceInitialGlobalTableValues();
- startStreams();
+ startStreams(withHeaders);
final Map<Long, String> expected = new HashMap<>();
expected.put(1L, "A");
@@ -309,13 +313,14 @@ public class GlobalKTableEOSIntegrationTest {
);
}
- @Test
- public void shouldNotRestoreAbortedMessages() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldNotRestoreAbortedMessages(final boolean withHeaders)
throws Exception {
produceAbortedMessages();
produceInitialGlobalTableValues();
produceAbortedMessages();
- startStreams();
+ startStreams(withHeaders);
final Map<Long, String> expected = new HashMap<>();
expected.put(1L, "A");
@@ -350,7 +355,10 @@ public class GlobalKTableEOSIntegrationTest {
CLUSTER.createTopic(globalTableTopic, 2, 1);
}
- private void startStreams() {
+ private void startStreams(final boolean withHeaders) {
+ if (withHeaders) {
+ streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
startStreams(null);
}
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 9730eff0490..4fc0cf85eab 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -54,9 +54,10 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.time.Duration;
@@ -139,8 +140,10 @@ public class GlobalKTableIntegrationTest {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}
- @Test
- public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldKStreamGlobalKTableLeftJoin(final boolean withHeaders)
throws Exception {
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
final KStream<String, String> streamTableJoin =
stream.leftJoin(globalTable, keyMapper, joiner);
streamTableJoin.process(supplier);
produceInitialGlobalTableValues();
@@ -224,8 +227,10 @@ public class GlobalKTableIntegrationTest {
"waiting for final values");
}
- @Test
- public void shouldKStreamGlobalKTableJoin() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldKStreamGlobalKTableJoin(final boolean withHeaders)
throws Exception {
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
final KStream<String, String> streamTableJoin =
stream.join(globalTable, keyMapper, joiner);
streamTableJoin.process(supplier);
produceInitialGlobalTableValues();
@@ -309,8 +314,10 @@ public class GlobalKTableIntegrationTest {
"waiting for final values");
}
- @Test
- public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldRestoreGlobalInMemoryKTableOnRestart(final boolean
withHeaders) throws Exception {
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
builder = new StreamsBuilder();
globalTable = builder.globalTable(
globalTableTopic,
@@ -340,8 +347,10 @@ public class GlobalKTableIntegrationTest {
assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
}
- @Test
- public void shouldGetToRunningWithOnlyGlobalTopology() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldGetToRunningWithOnlyGlobalTopology(final boolean
withHeaders) throws Exception {
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
builder = new StreamsBuilder();
globalTable = builder.globalTable(
globalTableTopic,
@@ -375,8 +384,9 @@ public class GlobalKTableIntegrationTest {
);
}
- @Test
- public void
testProcessingExceptionHandlerContinueEnabledRestorationPhase() throws
Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void
testProcessingExceptionHandlerContinueEnabledRestorationPhase(final boolean
withHeaders) throws Exception {
createBuilderWithFailedProcessor();
// enable processing exception handler invoked config
TestGlobalProcessingExceptionHandler.shouldResume = true;
@@ -384,6 +394,8 @@ public class GlobalKTableIntegrationTest {
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
TestGlobalProcessingExceptionHandler.class);
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
produceInitialGlobalTableValues();
startStreams();
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING,
Duration.ofSeconds(30));
@@ -391,8 +403,9 @@ public class GlobalKTableIntegrationTest {
assertTrue(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
}
- @Test
- public void testProcessingExceptionHandlerFailEnabledRestorationPhase()
throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void
testProcessingExceptionHandlerFailEnabledRestorationPhase(final boolean
withHeaders) throws Exception {
createBuilderWithFailedProcessor();
// enable processing exception handler invoked config
TestGlobalProcessingExceptionHandler.shouldResume = false;
@@ -400,6 +413,8 @@ public class GlobalKTableIntegrationTest {
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
TestGlobalProcessingExceptionHandler.class);
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
produceInitialGlobalTableValues();
assertThrows(StreamsException.class, () -> {
startStreams();
@@ -408,8 +423,9 @@ public class GlobalKTableIntegrationTest {
}
- @Test
- public void testProcessingExceptionHandlerDisabledRestorationPhase()
throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testProcessingExceptionHandlerDisabledRestorationPhase(final
boolean withHeaders) throws Exception {
createBuilderWithFailedProcessor();
// disable processing exception handler invoked config
TestGlobalProcessingExceptionHandler.shouldResume = false;
@@ -417,6 +433,8 @@ public class GlobalKTableIntegrationTest {
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
TestGlobalProcessingExceptionHandler.class);
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
produceInitialGlobalTableValues();
assertThrows(StreamsException.class, () -> {
startStreams();
@@ -425,8 +443,9 @@ public class GlobalKTableIntegrationTest {
}
- @Test
- public void testProcessingExceptionHandlerContinueEnabledRunTimePhase()
throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void
testProcessingExceptionHandlerContinueEnabledRunTimePhase(final boolean
withHeaders) throws Exception {
createBuilderWithFailedProcessor();
// enable processing exception handler invoked config
TestGlobalProcessingExceptionHandler.shouldResume = true;
@@ -434,6 +453,8 @@ public class GlobalKTableIntegrationTest {
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
TestGlobalProcessingExceptionHandler.class);
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING,
Duration.ofSeconds(30));
produceInitialGlobalTableValues();
@@ -445,14 +466,17 @@ public class GlobalKTableIntegrationTest {
);
}
- @Test
- public void testProcessingExceptionHandlerFailEnabledRunTimePhase() throws
Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testProcessingExceptionHandlerFailEnabledRunTimePhase(final
boolean withHeaders) throws Exception {
createBuilderWithFailedProcessor();
// enable processing exception handler invoked config
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,
true);
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
TestGlobalProcessingExceptionHandler.class);
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING,
Duration.ofSeconds(30));
produceInitialGlobalTableValues();
@@ -460,14 +484,17 @@ public class GlobalKTableIntegrationTest {
assertTrue(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
}
- @Test
- public void testProcessingExceptionHandlerDisabledRunTimePhase() throws
Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testProcessingExceptionHandlerDisabledRunTimePhase(final
boolean withHeaders) throws Exception {
createBuilderWithFailedProcessor();
// disable processing exception handler invoked config
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,
false);
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
TestGlobalProcessingExceptionHandler.class);
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING,
Duration.ofSeconds(30));
produceInitialGlobalTableValues();
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 31e39a4c8b7..b197aa73650 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -51,7 +51,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
import java.io.IOException;
import java.time.Duration;
@@ -149,11 +149,14 @@ public class InternalTopicIntegrationTest {
return Admin.create(adminClientConfig);
}
- private void configureStreams(final boolean streamsProtocolEnabled, final
String appID) {
+ private void configureStreams(final boolean streamsProtocolEnabled, final
boolean withHeaders, final String appID) {
streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
if (streamsProtocolEnabled) {
streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
}
+ if (withHeaders) {
+ streamsProp.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
}
/*
@@ -161,10 +164,10 @@ public class InternalTopicIntegrationTest {
* for internal repartition topics. See KAFKA-10689
*/
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldGetToRunningWithWindowedTableInFKJ(final boolean
streamsProtocolEnabled) throws Exception {
- final String appID = APP_ID + "-windowed-FKJ-" +
streamsProtocolEnabled;
- configureStreams(streamsProtocolEnabled, appID);
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldGetToRunningWithWindowedTableInFKJ(final boolean
streamsProtocolEnabled, final boolean withHeaders) throws Exception {
+ final String appID = APP_ID + "-windowed-FKJ-" +
streamsProtocolEnabled + "-" + withHeaders;
+ configureStreams(streamsProtocolEnabled, withHeaders, appID);
final StreamsBuilder streamsBuilder = new StreamsBuilder();
final KStream<String, String> inputTopic =
streamsBuilder.stream(DEFAULT_INPUT_TOPIC);
@@ -191,10 +194,10 @@ public class InternalTopicIntegrationTest {
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldCompactTopicsForKeyValueStoreChangelogs(final boolean
streamsProtocolEnabled) throws Exception {
- final String appID = APP_ID + "-compact-" + streamsProtocolEnabled;
- configureStreams(streamsProtocolEnabled, appID);
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldCompactTopicsForKeyValueStoreChangelogs(final boolean
streamsProtocolEnabled, final boolean withHeaders) throws Exception {
+ final String appID = APP_ID + "-compact-" + streamsProtocolEnabled +
"-" + withHeaders;
+ configureStreams(streamsProtocolEnabled, withHeaders, appID);
//
// Step 1: Configure and start a simple word count topology
@@ -229,10 +232,10 @@ public class InternalTopicIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs(final
boolean streamsProtocolEnabled) throws Exception {
- final String appID = APP_ID + "-compact-delete-" +
streamsProtocolEnabled;
- configureStreams(streamsProtocolEnabled, appID);
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs(final
boolean streamsProtocolEnabled, final boolean withHeaders) throws Exception {
+ final String appID = APP_ID + "-compact-delete-" +
streamsProtocolEnabled + "-" + withHeaders;
+ configureStreams(streamsProtocolEnabled, withHeaders, appID);
//
// Step 1: Configure and start a simple word count topology
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index b31cee581b0..738e5cb5aa3 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -52,7 +52,7 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
import java.io.IOException;
import java.time.Duration;
@@ -343,11 +343,14 @@ public class MetricsIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldAddMetricsOnAllLevels(final boolean
streamsProtocolEnabled) throws Exception {
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldAddMetricsOnAllLevels(final boolean
streamsProtocolEnabled, final boolean withHeaders) throws Exception {
if (streamsProtocolEnabled) {
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
}
+ if (withHeaders) {
+ streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(),
Serdes.String()))
.to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(),
Serdes.String()));
@@ -382,11 +385,14 @@ public class MetricsIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldAddMetricsForWindowStoreAndSuppressionBuffer(final
boolean streamsProtocolEnabled) throws Exception {
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldAddMetricsForWindowStoreAndSuppressionBuffer(final
boolean streamsProtocolEnabled, final boolean withHeaders) throws Exception {
if (streamsProtocolEnabled) {
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
}
+ if (withHeaders) {
+ streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
final Duration windowSize = Duration.ofMillis(50);
builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(),
Serdes.String()))
@@ -415,11 +421,14 @@ public class MetricsIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldAddMetricsForSessionStore(final boolean
streamsProtocolEnabled) throws Exception {
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldAddMetricsForSessionStore(final boolean
streamsProtocolEnabled, final boolean withHeaders) throws Exception {
if (streamsProtocolEnabled) {
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
}
+ if (withHeaders) {
+ streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
final Duration inactivityGap = Duration.ofMillis(50);
builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(),
Serdes.String()))
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index b58dd655a2d..1681a7cceb5 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -76,7 +76,7 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -199,8 +199,8 @@ public class RestoreIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldRestoreNullRecord(final boolean useNewProtocol) throws
Exception {
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldRestoreNullRecord(final boolean useNewProtocol, final
boolean withHeaders) throws Exception {
final StreamsBuilder builder = new StreamsBuilder();
final String applicationId = appId;
@@ -220,6 +220,9 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
+ if (withHeaders) {
+ streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
CLUSTER.createTopics(inputTopic);
CLUSTER.createTopics(outputTopic);
@@ -269,8 +272,8 @@ public class RestoreIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final
boolean useNewProtocol) throws Exception {
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final
boolean useNewProtocol, final boolean withHeaders) throws Exception {
final AtomicInteger numReceived = new AtomicInteger(0);
final Topology topology = new Topology();
@@ -278,6 +281,9 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
+ if (withHeaders) {
+ props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
// restoring from 1000 to 4000 (committed), and then process from 4000
to 5000 on each of the two partitions
final int offsetLimitDelta = 1000;
@@ -330,8 +336,8 @@ public class RestoreIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean
useNewProtocol) throws Exception {
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean
useNewProtocol, final boolean withHeaders) throws Exception {
final AtomicInteger numReceived = new AtomicInteger(0);
final StreamsBuilder builder = new StreamsBuilder();
@@ -340,6 +346,9 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
+ if (withHeaders) {
+ props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
// restoring from 1000 to 4000 (committed), and then process from 4000
to 5000 on each of the two partitions
final int offsetLimitDelta = 1000;
@@ -395,8 +404,8 @@ public class RestoreIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldRestoreStateFromChangelogTopic(final boolean
useNewProtocol) throws Exception {
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldRestoreStateFromChangelogTopic(final boolean
useNewProtocol, final boolean withHeaders) throws Exception {
final String changelog = appId + "-store-changelog";
CLUSTER.createTopic(changelog, 2, 1);
@@ -408,6 +417,9 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
+ if (withHeaders) {
+ props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
// restoring from 1000 to 5000, and then process from 5000 to 10000 on
each of the two partitions
final int offsetCheckpointed = 1000;
@@ -446,8 +458,8 @@ public class RestoreIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean
useNewProtocol) throws InterruptedException {
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean
useNewProtocol, final boolean withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, Integer> stream = builder.stream(inputStream);
@@ -461,6 +473,9 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
+ if (withHeaders) {
+ props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
kafkaStreams = new KafkaStreams(builder.build(), props);
try {
startApplicationAndWaitUntilRunning(kafkaStreams);
@@ -470,8 +485,8 @@ public class RestoreIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean
useNewProtocol) throws InterruptedException {
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean
useNewProtocol, final boolean withHeaders) throws InterruptedException {
IntegrationTestUtils.produceKeyValuesSynchronously(inputStream,
asList(KeyValue.pair(1, 1),
KeyValue.pair(2, 2),
@@ -504,6 +519,9 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
+ if (withHeaders) {
+ props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
kafkaStreams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
@@ -520,8 +538,8 @@ public class RestoreIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void
shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final
boolean useNewProtocol) throws Exception {
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void
shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final
boolean useNewProtocol, final boolean withHeaders) throws Exception {
final StreamsBuilder builder = new StreamsBuilder();
builder.table(
inputStream,
@@ -539,6 +557,9 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
props1.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
+ if (withHeaders) {
+ props1.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
purgeLocalStreamsState(props1);
final KafkaStreams streams1 = new KafkaStreams(builder.build(),
props1);
@@ -548,6 +569,9 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
props2.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
+ if (withHeaders) {
+ props2.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
purgeLocalStreamsState(props2);
final KafkaStreams streams2 = new KafkaStreams(builder.build(),
props2);
@@ -603,8 +627,8 @@ public class RestoreIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldInvokeUserDefinedGlobalStateRestoreListener(final
boolean useNewProtocol) throws Exception {
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldInvokeUserDefinedGlobalStateRestoreListener(final
boolean useNewProtocol, final boolean withHeaders) throws Exception {
final String inputTopic = "inputTopic";
final String outputTopic = "outputTopic";
CLUSTER.createTopic(inputTopic, 5, 1);
@@ -633,7 +657,7 @@ public class RestoreIntegrationTest {
sendEvents(inputTopic, sampleData);
- kafkaStreams = startKafkaStreams(builder, null,
kafkaStreams1Configuration, useNewProtocol);
+ kafkaStreams = startKafkaStreams(builder, null,
kafkaStreams1Configuration, useNewProtocol, withHeaders);
validateReceivedMessages(sampleData, outputTopic);
@@ -642,7 +666,7 @@ public class RestoreIntegrationTest {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations);
final TestStateRestoreListener kafkaStreams1StateRestoreListener = new
TestStateRestoreListener("ks1", RESTORATION_DELAY);
- kafkaStreams = startKafkaStreams(builder,
kafkaStreams1StateRestoreListener, kafkaStreams1Configuration, useNewProtocol);
+ kafkaStreams = startKafkaStreams(builder,
kafkaStreams1StateRestoreListener, kafkaStreams1Configuration, useNewProtocol,
withHeaders);
// Ensure all the restoring tasks are in active state before starting
the new instance.
// Otherwise, the tasks which assigned to first kafka streams won't
encounter "restoring suspend" after being reassigned to the second instance.
@@ -659,7 +683,8 @@ public class RestoreIntegrationTest {
try (final KafkaStreams kafkaStreams2 = startKafkaStreams(builder,
kafkaStreams2StateRestoreListener,
kafkaStreams2Configuration,
-
useNewProtocol)) {
+
useNewProtocol,
+
withHeaders)) {
waitForCondition(() -> State.RUNNING == kafkaStreams2.state(),
90_000,
@@ -675,8 +700,8 @@ public class RestoreIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldRecordRestoreMetrics(final boolean useNewProtocol)
throws Exception {
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldRecordRestoreMetrics(final boolean useNewProtocol, final
boolean withHeaders) throws Exception {
final AtomicInteger numReceived = new AtomicInteger(0);
final StreamsBuilder builder = new StreamsBuilder();
@@ -685,6 +710,9 @@ public class RestoreIntegrationTest {
if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
+ if (withHeaders) {
+ props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
@@ -745,11 +773,15 @@ public class RestoreIntegrationTest {
private KafkaStreams startKafkaStreams(final StreamsBuilder streamsBuilder,
final StateRestoreListener
stateRestoreListener,
final Map<String, Object>
extraConfiguration,
- final boolean useNewProtocol) {
+ final boolean useNewProtocol,
+ final boolean withHeaders) {
final Properties streamsConfiguration =
props(mkObjectProperties(extraConfiguration));
if (useNewProtocol) {
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
+ if (withHeaders) {
+ streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
final KafkaStreams kafkaStreams = new
KafkaStreams(streamsBuilder.build(), streamsConfiguration);
kafkaStreams.setGlobalStateRestoreListener(stateRestoreListener);
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index d77355831bc..338b1b6529c 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -46,7 +46,7 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
import java.io.IOException;
import java.time.Duration;
@@ -146,12 +146,15 @@ public class RocksDBMetricsIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void
shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir(final boolean
streamsProtocolEnabled, final TestInfo testInfo) throws Exception {
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void
shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir(final boolean
streamsProtocolEnabled, final boolean withHeaders, final TestInfo testInfo)
throws Exception {
final Properties streamsConfiguration = streamsConfig(testInfo);
if (streamsProtocolEnabled) {
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
}
+ if (withHeaders) {
+ streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
final StreamsBuilder builder = builderForStateStores();
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
index 8c8ef3dae9c..c83dfe6d10d 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -42,7 +42,7 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
import java.io.IOException;
import java.time.Duration;
@@ -90,7 +90,7 @@ public class StandbyTaskCreationIntegrationTest {
client2.close(Duration.ofSeconds(60));
}
- private Properties streamsConfiguration(final boolean
streamsProtocolEnabled) {
+ private Properties streamsConfiguration(final boolean
streamsProtocolEnabled, final boolean withHeaders) {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" +
safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
@@ -103,12 +103,15 @@ public class StandbyTaskCreationIntegrationTest {
} else {
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
}
+ if (withHeaders) {
+ streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
return streamsConfiguration;
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void
shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled(final boolean
streamsProtocolEnabled) throws Exception {
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void
shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled(final boolean
streamsProtocolEnabled, final boolean withHeaders) throws Exception {
final StreamsBuilder builder = new StreamsBuilder();
final String stateStoreName = "myTransformState";
final StoreBuilder<KeyValueStore<Integer, Integer>>
keyValueStoreBuilder =
@@ -127,7 +130,7 @@ public class StandbyTaskCreationIntegrationTest {
final Topology topology = builder.build();
- createClients(topology, streamsConfiguration(streamsProtocolEnabled),
topology, streamsConfiguration(streamsProtocolEnabled));
+ createClients(topology, streamsConfiguration(streamsProtocolEnabled,
withHeaders), topology, streamsConfiguration(streamsProtocolEnabled,
withHeaders));
setStateListenersForVerification(thread ->
thread.standbyTasks().isEmpty() && !thread.activeTasks().isEmpty());
@@ -139,11 +142,11 @@ public class StandbyTaskCreationIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void
shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables(final boolean
streamsProtocolEnabled) throws Exception {
- final Properties streamsConfiguration1 =
streamsConfiguration(streamsProtocolEnabled);
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void
shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables(final boolean
streamsProtocolEnabled, final boolean withHeaders) throws Exception {
+ final Properties streamsConfiguration1 =
streamsConfiguration(streamsProtocolEnabled, withHeaders);
streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
- final Properties streamsConfiguration2 =
streamsConfiguration(streamsProtocolEnabled);
+ final Properties streamsConfiguration2 =
streamsConfiguration(streamsProtocolEnabled, withHeaders);
streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
final StreamsBuilder builder = new StreamsBuilder();
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 6f29b7b81f5..d588f607263 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -56,7 +56,7 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
import java.io.IOException;
import java.time.Duration;
@@ -114,14 +114,14 @@ public class
StreamsUncaughtExceptionHandlerIntegrationTest {
private Properties properties;
- private Properties basicProps(final boolean
streamsRebalanceProtocolEnabled) {
+ private Properties basicProps(final boolean
streamsRebalanceProtocolEnabled, final boolean withHeaders) {
final String protocol;
if (streamsRebalanceProtocolEnabled) {
protocol =
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault());
} else {
protocol =
GroupProtocol.CLASSIC.name().toLowerCase(Locale.getDefault());
}
- return mkObjectProperties(
+ final Properties props = mkObjectProperties(
mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers()),
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
@@ -133,6 +133,10 @@ public class
StreamsUncaughtExceptionHandlerIntegrationTest {
mkEntry(StreamsConfig.GROUP_PROTOCOL_CONFIG, protocol)
)
);
+ if (withHeaders) {
+ props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
+ return props;
}
@BeforeEach
@@ -154,9 +158,9 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void shouldShutdownClient(final boolean
streamsRebalanceProtocolEnabled) throws Exception {
- properties = basicProps(streamsRebalanceProtocolEnabled);
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldShutdownClient(final boolean
streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception {
+ properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders);
try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), properties)) {
kafkaStreams.setUncaughtExceptionHandler(exception ->
SHUTDOWN_CLIENT);
@@ -171,38 +175,38 @@ public class
StreamsUncaughtExceptionHandlerIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void shouldReplaceThreads(final boolean
streamsRebalanceProtocolEnabled) throws Exception {
- properties = basicProps(streamsRebalanceProtocolEnabled);
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldReplaceThreads(final boolean
streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception {
+ properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders);
testReplaceThreads(2);
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void shouldReplaceThreadsWithoutJavaHandler(final boolean
streamsRebalanceProtocolEnabled) throws Exception {
- properties = basicProps(streamsRebalanceProtocolEnabled);
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldReplaceThreadsWithoutJavaHandler(final boolean
streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception {
+ properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders);
Thread.setDefaultUncaughtExceptionHandler((t, e) -> fail("exception
thrown"));
testReplaceThreads(2);
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void shouldReplaceSingleThread(final boolean
streamsRebalanceProtocolEnabled) throws Exception {
- properties = basicProps(streamsRebalanceProtocolEnabled);
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldReplaceSingleThread(final boolean
streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception {
+ properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders);
testReplaceThreads(1);
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void shouldShutdownMultipleThreadApplication(final boolean
streamsRebalanceProtocolEnabled) throws Exception {
- properties = basicProps(streamsRebalanceProtocolEnabled);
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldShutdownMultipleThreadApplication(final boolean
streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception {
+ properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders);
testShutdownApplication(2);
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void shouldShutdownSingleThreadApplication(final boolean
streamsRebalanceProtocolEnabled) throws Exception {
- properties = basicProps(streamsRebalanceProtocolEnabled);
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldShutdownSingleThreadApplication(final boolean
streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception {
+ properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders);
testShutdownApplication(1);
}
@@ -234,9 +238,9 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void
shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread(final boolean
streamsRebalanceProtocolEnabled) throws Exception {
- properties = basicProps(streamsRebalanceProtocolEnabled);
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void
shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread(final boolean
streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception {
+ properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders);
builder.addGlobalStore(
new KeyValueStoreBuilder<>(
Stores.persistentKeyValueStore("globalStore"),
@@ -263,9 +267,9 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void shouldEmitSameRecordAfterFailover(final boolean
streamsRebalanceProtocolEnabled) throws Exception {
- properties = basicProps(streamsRebalanceProtocolEnabled);
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ public void shouldEmitSameRecordAfterFailover(final boolean
streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception {
+ properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders);
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L);
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index 7cb8108a2de..dbb703b35e5 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -47,9 +47,10 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,8 +102,9 @@ public class SuppressionDurabilityIntegrationTest {
private static final LongDeserializer LONG_DESERIALIZER = new
LongDeserializer();
private static final long COMMIT_INTERVAL = 100L;
- @Test
- public void shouldRecoverBufferAfterShutdown(final TestInfo testInfo) {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldRecoverBufferAfterShutdown(final boolean withHeaders,
final TestInfo testInfo) {
final String testId = safeUniqueTestName(testInfo);
final String appId = "appId_" + testId;
final String input = "input" + testId;
@@ -149,6 +151,7 @@ public class SuppressionDurabilityIntegrationTest {
));
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
COMMIT_INTERVAL);
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
KafkaStreams driver = getStartedStreams(streamsConfig, builder, true);
try {
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index 3d7c141129f..8c4000b9002 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -48,8 +48,9 @@ import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.time.Instant;
@@ -113,8 +114,9 @@ public class SuppressionIntegrationTest {
.count(Materialized.<String, Long, KeyValueStore<Bytes,
byte[]>>as("counts").withCachingDisabled());
}
- @Test
- public void shouldUseDefaultSerdes() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldUseDefaultSerdes(final boolean withHeaders) {
final String testId = "-shouldInheritSerdes";
final String appId =
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
final String input = "input" + testId;
@@ -140,7 +142,7 @@ public class SuppressionIntegrationTest {
.toStream()
.to(outputRaw);
- final Properties streamsConfig = getStreamsConfig(appId);
+ final Properties streamsConfig = getStreamsConfig(appId, withHeaders);
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
@@ -165,8 +167,9 @@ public class SuppressionIntegrationTest {
}
}
- @Test
- public void shouldInheritSerdes() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldInheritSerdes(final boolean withHeaders) {
final String testId = "-shouldInheritSerdes";
final String appId =
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
final String input = "input" + testId;
@@ -193,7 +196,7 @@ public class SuppressionIntegrationTest {
.toStream()
.to(outputRaw);
- final Properties streamsConfig = getStreamsConfig(appId);
+ final Properties streamsConfig = getStreamsConfig(appId, withHeaders);
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
@@ -246,8 +249,9 @@ public class SuppressionIntegrationTest {
}
}
- @Test
- public void shouldShutdownWhenRecordConstraintIsViolated() throws
InterruptedException {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldShutdownWhenRecordConstraintIsViolated(final boolean
withHeaders) throws InterruptedException {
final String testId = "-shouldShutdownWhenRecordConstraintIsViolated";
final String appId =
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
final String input = "input" + testId;
@@ -268,7 +272,8 @@ public class SuppressionIntegrationTest {
.toStream()
.to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
- final Properties streamsConfig = getStreamsConfig(appId);
+ final Properties streamsConfig = getStreamsConfig(appId, withHeaders);
+
final KafkaStreams driver =
IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
try {
produceSynchronously(
@@ -287,8 +292,9 @@ public class SuppressionIntegrationTest {
}
}
- @Test
- public void shouldShutdownWhenBytesConstraintIsViolated() throws
InterruptedException {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldShutdownWhenBytesConstraintIsViolated(final boolean
withHeaders) throws InterruptedException {
final String testId = "-shouldShutdownWhenBytesConstraintIsViolated";
final String appId =
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
final String input = "input" + testId;
@@ -310,7 +316,8 @@ public class SuppressionIntegrationTest {
.toStream()
.to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
- final Properties streamsConfig = getStreamsConfig(appId);
+ final Properties streamsConfig = getStreamsConfig(appId, withHeaders);
+
final KafkaStreams driver =
IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
try {
produceSynchronously(
@@ -329,8 +336,9 @@ public class SuppressionIntegrationTest {
}
}
- @Test
- public void shouldAllowOverridingChangelogConfig() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldAllowOverridingChangelogConfig(final boolean
withHeaders) {
final String testId = "-shouldAllowOverridingChangelogConfig";
final String appId =
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
final String input = "input" + testId;
@@ -360,7 +368,7 @@ public class SuppressionIntegrationTest {
.toStream()
.to(outputRaw);
- final Properties streamsConfig = getStreamsConfig(appId);
+ final Properties streamsConfig = getStreamsConfig(appId, withHeaders);
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
@@ -389,8 +397,9 @@ public class SuppressionIntegrationTest {
}
}
- @Test
- public void shouldCreateChangelogByDefault() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldCreateChangelogByDefault(final boolean withHeaders) {
final String testId = "-shouldCreateChangelogByDefault";
final String appId =
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
final String input = "input" + testId;
@@ -418,7 +427,7 @@ public class SuppressionIntegrationTest {
.toStream()
.to(outputRaw);
- final Properties streamsConfig = getStreamsConfig(appId);
+ final Properties streamsConfig = getStreamsConfig(appId, withHeaders);
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
@@ -445,8 +454,9 @@ public class SuppressionIntegrationTest {
}
}
- @Test
- public void shouldAllowDisablingChangelog() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldAllowDisablingChangelog(final boolean withHeaders) {
final String testId = "-shouldAllowDisablingChangelog";
final String appId =
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
final String input = "input" + testId;
@@ -474,7 +484,7 @@ public class SuppressionIntegrationTest {
.toStream()
.to(outputRaw);
- final Properties streamsConfig = getStreamsConfig(appId);
+ final Properties streamsConfig = getStreamsConfig(appId, withHeaders);
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
@@ -506,8 +516,8 @@ public class SuppressionIntegrationTest {
}
}
- private static Properties getStreamsConfig(final String appId) {
- return mkProperties(mkMap(
+ private static Properties getStreamsConfig(final String appId, final
boolean withHeaders) {
+ final Properties props = mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers()),
mkEntry(StreamsConfig.POLL_MS_CONFIG,
Integer.toString(COMMIT_INTERVAL)),
@@ -515,6 +525,8 @@ public class SuppressionIntegrationTest {
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE),
mkEntry(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath())
));
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+ return props;
}
/**