This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 433f06a1665 KAFKA-20329: Test `headers` dsl.store.format further (1/N)
(#21806)
433f06a1665 is described below
commit 433f06a1665399b30998c49ac546b2a3c016bdff
Author: Alieh Saeedi <[email protected]>
AuthorDate: Thu Apr 9 00:19:18 2026 +0200
KAFKA-20329: Test `headers` dsl.store.format further (1/N) (#21806)
This PR adds `withHeaders` parameterization to Kafka Streams integration
tests to verify that stream operations work correctly with both DSL
store format configurations (regular and headers-based format).
Changes:
- Added `withHeaders` boolean parameter to test methods
- Tests now run with both `withHeaders=false` and `withHeaders=true`
- When `withHeaders=true`, tests configure
  `StreamsConfig.DSL_STORE_FORMAT_CONFIG` to
  `StreamsConfig.DSL_STORE_FORMAT_HEADERS`
- For tests that produce data in @BeforeAll, moved data production to
  @BeforeEach to ensure fresh data for each parameterized test run
- Added topic cleanup in @AfterEach for proper isolation between test runs
This broadens test coverage for joins (including foreign key joins),
aggregations, and windowed operations across both store formats.
Reviewers: Matthias J. Sax <[email protected]>, TengYao Chi
<[email protected]>
---
.../JoinGracePeriodDurabilityIntegrationTest.java | 22 +++-
.../integration/JoinStoreIntegrationTest.java | 16 ++-
.../JoinWithIncompleteMetadataIntegrationTest.java | 2 +-
.../KStreamAggregationDedupIntegrationTest.java | 24 ++--
.../KStreamAggregationIntegrationTest.java | 80 ++++++++-----
...yInnerJoinCustomPartitionerIntegrationTest.java | 25 ++--
...bleForeignKeyInnerJoinMultiIntegrationTest.java | 70 ++++++-----
.../KTableKTableForeignKeyJoinDistributedTest.java | 17 ++-
.../KTableKTableForeignKeyJoinIntegrationTest.java | 63 ++++++----
...reignKeyJoinMaterializationIntegrationTest.java | 6 +-
.../RelaxedNullKeyRequirementJoinTest.java | 40 +++++--
.../SelfJoinUpgradeIntegrationTest.java | 18 ++-
.../SlidingWindowedKStreamIntegrationTest.java | 35 ++++--
.../StreamStreamJoinIntegrationTest.java | 51 +++++---
.../StreamTableJoinIntegrationTest.java | 27 +++--
.../StreamTableJoinWithGraceIntegrationTest.java | 15 ++-
.../integration/TableTableJoinIntegrationTest.java | 129 ++++++++++++++-------
.../TimeWindowedKStreamIntegrationTest.java | 33 +++++-
.../integration/utils/IntegrationTestUtils.java | 14 +++
19 files changed, 467 insertions(+), 220 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
index a2d34cb12ba..96544a6d474 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
@@ -42,9 +42,10 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.time.Duration;
@@ -88,8 +89,9 @@ public class JoinGracePeriodDurabilityIntegrationTest {
private static final Serde<String> STRING_SERDE = Serdes.String();
private static final long COMMIT_INTERVAL = 100L;
- @Test
- public void shouldRecoverBufferAfterShutdown(final TestInfo testInfo) {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldRecoverBufferAfterShutdown(final boolean withHeaders,
final TestInfo testInfo) {
final String testId = safeUniqueTestName(testInfo);
final String appId = "appId_" + testId;
final String streamInput = "Streaminput" + testId;
@@ -126,7 +128,7 @@ public class JoinGracePeriodDurabilityIntegrationTest {
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
COMMIT_INTERVAL);
- KafkaStreams driver = getStartedStreams(streamsConfig, builder, true);
+ KafkaStreams driver = startStream(streamsConfig, builder, true,
withHeaders);
try {
produceSynchronouslyToPartitionZero(
tableInput,
@@ -170,7 +172,7 @@ public class JoinGracePeriodDurabilityIntegrationTest {
// restart the driver
driver.close();
assertThat(driver.state(), is(KafkaStreams.State.NOT_RUNNING));
- driver = getStartedStreams(streamsConfig, builder, false);
+ driver = startStream(streamsConfig, builder, false, withHeaders);
// flush those recovered buffered events out.
@@ -225,4 +227,14 @@ public class JoinGracePeriodDurabilityIntegrationTest {
));
IntegrationTestUtils.produceSynchronously(producerConfig, false,
topic, Optional.of(0), toProduce);
}
+
+ private KafkaStreams startStream(
+ final Properties streamsConfig,
+ final StreamsBuilder builder,
+ final boolean clean,
+ final boolean withHeaders) {
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ return getStartedStreams(streamsConfig, builder, clean);
+ }
}
\ No newline at end of file
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
index 1041f323ae1..95a2725c8db 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -39,8 +39,9 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.Collection;
@@ -101,8 +102,9 @@ public class JoinStoreIntegrationTest {
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
}
- @Test
- public void providingAJoinStoreNameShouldNotMakeTheJoinResultQueryable()
throws InterruptedException {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void
providingAJoinStoreNameShouldNotMakeTheJoinResultQueryable(final boolean
withHeaders) throws InterruptedException {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-no-store-access");
final StreamsBuilder builder = new StreamsBuilder();
@@ -116,6 +118,7 @@ public class JoinStoreIntegrationTest {
JoinWindows.of(ofMillis(100)),
StreamJoined.with(Serdes.String(), Serdes.Integer(),
Serdes.Integer()).withStoreName("join-store"));
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(STREAMS_CONFIG,
withHeaders);
try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), STREAMS_CONFIG)) {
kafkaStreams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING) {
@@ -137,8 +140,9 @@ public class JoinStoreIntegrationTest {
}
}
- @Test
- public void
streamJoinChangelogTopicShouldBeConfiguredWithDeleteOnlyCleanupPolicy() throws
Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void
streamJoinChangelogTopicShouldBeConfiguredWithDeleteOnlyCleanupPolicy(final
boolean withHeaders) throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-changelog-cleanup-policy");
final StreamsBuilder builder = new StreamsBuilder();
@@ -152,6 +156,8 @@ public class JoinStoreIntegrationTest {
JoinWindows.of(ofMillis(100)),
StreamJoined.with(Serdes.String(), Serdes.Integer(),
Serdes.Integer()).withStoreName("join-store"));
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(STREAMS_CONFIG,
withHeaders);
+
try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), STREAMS_CONFIG);
final Admin admin = Admin.create(ADMIN_CONFIG)) {
kafkaStreams.setStateListener((newState, oldState) -> {
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
index 291401c82fd..36d48491ceb 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
@@ -98,7 +98,7 @@ public class JoinWithIncompleteMetadataIntegrationTest {
final String appId = APP_ID + "-" + (useNewProtocol ? "new" : "old");
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
-
+
if (useNewProtocol) {
STREAMS_CONFIG.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
}
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index cca699b9493..254915ce880 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -48,9 +48,10 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.time.Duration;
@@ -124,14 +125,17 @@ public class KStreamAggregationDedupIntegrationTest {
}
- @Test
- public void shouldReduce(final TestInfo testInfo) throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldReduce(final boolean withHeaders, final TestInfo
testInfo) throws Exception {
produceMessages(System.currentTimeMillis());
groupedStream
.reduce(reducer, Materialized.as("reduce-by-key"))
.toStream()
.to(outputTopic, Produced.with(Serdes.String(),
Serdes.String()));
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
final long timestamp = System.currentTimeMillis();
@@ -149,8 +153,9 @@ public class KStreamAggregationDedupIntegrationTest {
testInfo);
}
- @Test
- public void shouldReduceWindowed(final TestInfo testInfo) throws Exception
{
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldReduceWindowed(final boolean withHeaders, final TestInfo
testInfo) throws Exception {
final long firstBatchTimestamp = System.currentTimeMillis() - 1000;
produceMessages(firstBatchTimestamp);
final long secondBatchTimestamp = System.currentTimeMillis();
@@ -163,6 +168,8 @@ public class KStreamAggregationDedupIntegrationTest {
.toStream((windowedKey, value) -> windowedKey.key() + "@" +
windowedKey.window().start())
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
@@ -187,8 +194,9 @@ public class KStreamAggregationDedupIntegrationTest {
);
}
- @Test
- public void shouldGroupByKey(final TestInfo testInfo) throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldGroupByKey(final boolean withHeaders, final TestInfo
testInfo) throws Exception {
final long timestamp = mockTime.milliseconds();
produceMessages(timestamp);
produceMessages(timestamp);
@@ -199,6 +207,8 @@ public class KStreamAggregationDedupIntegrationTest {
.toStream((windowedKey, value) -> windowedKey.key() + "@" +
windowedKey.window().start())
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
final long window = timestamp / 500 * 500;
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 738b5cbb53b..baf0e7b3782 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -70,7 +70,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
@@ -170,14 +169,17 @@ public class KStreamAggregationIntegrationTest {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}
- @Test
- public void shouldReduce(final TestInfo testInfo) throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldReduce(final boolean withHeaders, final TestInfo
testInfo) throws Exception {
produceMessages(mockTime.milliseconds());
groupedStream
.reduce(reducer, Materialized.as("reduce-by-key"))
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
produceMessages(mockTime.milliseconds());
@@ -220,8 +222,9 @@ public class KStreamAggregationIntegrationTest {
return keyComparison;
}
- @Test
- public void shouldReduceWindowed(final TestInfo testInfo) throws Exception
{
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldReduceWindowed(final boolean withHeaders, final TestInfo
testInfo) throws Exception {
final long firstBatchTimestamp = mockTime.milliseconds();
mockTime.sleep(1000);
produceMessages(firstBatchTimestamp);
@@ -236,6 +239,8 @@ public class KStreamAggregationIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
final List<KeyValueTimestamp<Windowed<String>, String>> windowedOutput
= receiveMessages(
@@ -297,8 +302,9 @@ public class KStreamAggregationIntegrationTest {
}
}
- @Test
- public void shouldAggregate(final TestInfo testInfo) throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldAggregate(final boolean withHeaders, final TestInfo
testInfo) throws Exception {
produceMessages(mockTime.milliseconds());
groupedStream.aggregate(
initializer,
@@ -307,6 +313,8 @@ public class KStreamAggregationIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Integer()));
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
produceMessages(mockTime.milliseconds());
@@ -337,8 +345,9 @@ public class KStreamAggregationIntegrationTest {
);
}
- @Test
- public void shouldAggregateWindowed(final TestInfo testInfo) throws
Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldAggregateWindowed(final boolean withHeaders, final
TestInfo testInfo) throws Exception {
final long firstTimestamp = mockTime.milliseconds();
mockTime.sleep(1000);
produceMessages(firstTimestamp);
@@ -356,6 +365,8 @@ public class KStreamAggregationIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
final List<KeyValueTimestamp<Windowed<String>, Integer>>
windowedMessages = receiveMessagesWithTimestamp(
@@ -445,30 +456,37 @@ public class KStreamAggregationIntegrationTest {
);
}
- @Test
- public void shouldCount(final TestInfo testInfo) throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldCount(final boolean withHeaders, final TestInfo
testInfo) throws Exception {
produceMessages(mockTime.milliseconds());
groupedStream.count(Materialized.as("count-by-key"))
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
shouldCountHelper(testInfo);
}
- @Test
- public void shouldCountWithInternalStore(final TestInfo testInfo) throws
Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldCountWithInternalStore(final boolean withHeaders, final
TestInfo testInfo) throws Exception {
produceMessages(mockTime.milliseconds());
groupedStream.count()
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
shouldCountHelper(testInfo);
}
- @Test
- public void shouldGroupByKey(final TestInfo testInfo) throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldGroupByKey(final boolean withHeaders, final TestInfo
testInfo) throws Exception {
final long timestamp = mockTime.milliseconds();
produceMessages(timestamp);
produceMessages(timestamp);
@@ -478,6 +496,8 @@ public class KStreamAggregationIntegrationTest {
.count()
.toStream((windowedKey, value) -> windowedKey.key() + "@" +
windowedKey.window().start()).to(outputTopic, Produced.with(Serdes.String(),
Serdes.Long()));
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
final List<KeyValueTimestamp<String, Long>> results = receiveMessages(
@@ -506,8 +526,9 @@ public class KStreamAggregationIntegrationTest {
);
}
- @Test
- public void shouldReduceSlidingWindows(final TestInfo testInfo) throws
Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldReduceSlidingWindows(final boolean withHeaders, final
TestInfo testInfo) throws Exception {
final long firstBatchTimestamp = mockTime.milliseconds();
final long timeDifference = 500L;
produceMessages(firstBatchTimestamp);
@@ -523,6 +544,8 @@ public class KStreamAggregationIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
final List<KeyValueTimestamp<Windowed<String>, String>> windowedOutput
= receiveMessages(
@@ -613,8 +636,9 @@ public class KStreamAggregationIntegrationTest {
}
}
- @Test
- public void shouldAggregateSlidingWindows(final TestInfo testInfo) throws
Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldAggregateSlidingWindows(final boolean withHeaders, final
TestInfo testInfo) throws Exception {
final long firstBatchTimestamp = mockTime.milliseconds();
final long timeDifference = 500L;
produceMessages(firstBatchTimestamp);
@@ -633,6 +657,8 @@ public class KStreamAggregationIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
final List<KeyValueTimestamp<Windowed<String>, Integer>>
windowedMessages = receiveMessagesWithTimestamp(
@@ -804,9 +830,7 @@ public class KStreamAggregationIntegrationTest {
final Map<Windowed<String>, KeyValue<Long, Long>> results = new
HashMap<>();
final CountDownLatch latch = new CountDownLatch(13);
- if (withHeaders) {
- streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
builder.stream(userSessionsStream, Consumed.with(Serdes.String(),
Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
@@ -853,9 +877,7 @@ public class KStreamAggregationIntegrationTest {
final CountDownLatch latch = new CountDownLatch(13);
final String userSessionsStore = "UserSessionsStore";
- if (withHeaders) {
- streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
- }
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
builder.stream(userSessionsStream, Consumed.with(Serdes.String(),
Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
@@ -941,8 +963,9 @@ public class KStreamAggregationIntegrationTest {
}
}
- @Test
- public void shouldCountUnlimitedWindows() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldCountUnlimitedWindows(final boolean withHeaders) throws
Exception {
final long startTime = mockTime.milliseconds() -
TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS) + 1;
final long incrementTime = Duration.ofDays(1).toMillis();
@@ -1011,6 +1034,9 @@ public class KStreamAggregationIntegrationTest {
results.put(record.key(), KeyValue.pair(record.value(),
record.timestamp()));
latch.countDown();
});
+
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
assertTrue(latch.await(30, TimeUnit.SECONDS));
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
index 615ebac78c1..7782fe20348 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
@@ -49,9 +49,10 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.time.Duration;
@@ -173,22 +174,28 @@ public class
KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
IntegrationTestUtils.purgeLocalStreamsState(asList(streamsConfig,
streamsConfigTwo, streamsConfigThree));
}
- @Test
- public void shouldInnerJoinMultiPartitionQueryable() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldInnerJoinMultiPartitionQueryable(final boolean
withHeaders) throws Exception {
final Set<KeyValue<String, String>> expectedOne = new HashSet<>();
expectedOne.add(new KeyValue<>("ID123-1",
"value1=ID123-A1,value2=BBB"));
expectedOne.add(new KeyValue<>("ID123-2",
"value1=ID123-A2,value2=BBB"));
expectedOne.add(new KeyValue<>("ID123-3",
"value1=ID123-A3,value2=BBB"));
expectedOne.add(new KeyValue<>("ID123-4",
"value1=ID123-A4,value2=BBB"));
- verifyKTableKTableJoin(expectedOne);
+ verifyKTableKTableJoin(expectedOne, withHeaders);
}
- @Test
- public void
shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions()
throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void
shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions(final
boolean withHeaders) throws Exception {
final String innerJoinType = "INNER";
final String queryableName = innerJoinType + "-store1";
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigTwo,
withHeaders);
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigThree,
withHeaders);
+
streams = prepareTopologyWithNonSingletonPartitions(queryableName,
streamsConfig);
streamsTwo = prepareTopologyWithNonSingletonPartitions(queryableName,
streamsConfigTwo);
streamsThree =
prepareTopologyWithNonSingletonPartitions(queryableName, streamsConfigThree);
@@ -210,10 +217,14 @@ public class
KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
waitForApplicationState(Arrays.asList(streams, streamsTwo,
streamsThree), KafkaStreams.State.ERROR, ofSeconds(240));
}
- private void verifyKTableKTableJoin(final Set<KeyValue<String, String>>
expectedResult) throws Exception {
+ private void verifyKTableKTableJoin(final Set<KeyValue<String, String>>
expectedResult, final boolean withHeaders) throws Exception {
final String innerJoinType = "INNER";
final String queryableName = innerJoinType + "-store1";
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigTwo,
withHeaders);
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigThree,
withHeaders);
+
streams = prepareTopology(queryableName, streamsConfig);
streamsTwo = prepareTopology(queryableName, streamsConfigTwo);
streamsThree = prepareTopology(queryableName, streamsConfigThree);
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
index 45a7f6f4534..19adface8c1 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
@@ -47,10 +47,10 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
-import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
@@ -88,14 +88,8 @@ public class
KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
private static final Properties PRODUCER_CONFIG_3 = new Properties();
@BeforeAll
- public static void startCluster() throws IOException, InterruptedException
{
+ public static void startCluster() throws Exception {
CLUSTER.start();
- //Use multiple partitions to ensure distribution of keys.
-
- CLUSTER.createTopic(TABLE_1, 3, 1);
- CLUSTER.createTopic(TABLE_2, 5, 1);
- CLUSTER.createTopic(TABLE_3, 7, 1);
- CLUSTER.createTopic(OUTPUT, 11, 1);
PRODUCER_CONFIG_1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
PRODUCER_CONFIG_1.put(ProducerConfig.ACKS_CONFIG, "all");
@@ -112,6 +106,30 @@ public class
KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
PRODUCER_CONFIG_3.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
IntegerSerializer.class);
PRODUCER_CONFIG_3.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
+ CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG,
"ktable-ktable-consumer");
+ CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
IntegerDeserializer.class);
+ CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
+ }
+
+ @AfterAll
+ public static void closeCluster() {
+ CLUSTER.stop();
+ }
+
+ @BeforeEach
+ public void before() throws Exception {
+ final String stateDirBasePath = TestUtils.tempDirectory().getPath();
+ streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath +
"-1");
+ streamsConfigTwo.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath
+ "-2");
+ streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG,
stateDirBasePath + "-3");
+
+ //Use multiple partitions to ensure distribution of keys.
+ CLUSTER.createTopic(TABLE_1, 3, 1);
+ CLUSTER.createTopic(TABLE_2, 5, 1);
+ CLUSTER.createTopic(TABLE_3, 7, 1);
+ CLUSTER.createTopic(OUTPUT, 11, 1);
+
final List<KeyValue<Integer, Float>> table1 = asList(
new KeyValue<>(1, 1.33f),
new KeyValue<>(2, 2.22f),
@@ -141,28 +159,10 @@ public class
KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1,
PRODUCER_CONFIG_1, MOCK_TIME);
IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2,
PRODUCER_CONFIG_2, MOCK_TIME);
IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_3, table3,
PRODUCER_CONFIG_3, MOCK_TIME);
-
- CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
- CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG,
"ktable-ktable-consumer");
- CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
IntegerDeserializer.class);
- CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
- }
-
- @AfterAll
- public static void closeCluster() {
- CLUSTER.stop();
- }
-
- @BeforeEach
- public void before() throws IOException {
- final String stateDirBasePath = TestUtils.tempDirectory().getPath();
- streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath +
"-1");
- streamsConfigTwo.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath
+ "-2");
- streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG,
stateDirBasePath + "-3");
}
@AfterEach
- public void after() throws IOException {
+ public void after() throws Exception {
if (streams != null) {
streams.close(Duration.ofSeconds(60));
streams = null;
@@ -176,21 +176,27 @@ public class
KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
streamsThree = null;
}
IntegrationTestUtils.purgeLocalStreamsState(asList(streamsConfig,
streamsConfigTwo, streamsConfigThree));
+ CLUSTER.deleteTopics(TABLE_1, TABLE_2, TABLE_3, OUTPUT);
}
- @Test
- public void shouldInnerJoinMultiPartitionQueryable() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldInnerJoinMultiPartitionQueryable(final boolean
withHeaders) throws Exception {
final Set<KeyValue<Integer, String>> expectedOne = new HashSet<>();
expectedOne.add(new KeyValue<>(1,
"value1=1.33,value2=10,value3=waffle"));
- verifyKTableKTableJoin(expectedOne);
+ verifyKTableKTableJoin(expectedOne, withHeaders);
}
- private void verifyKTableKTableJoin(final Set<KeyValue<Integer, String>>
expectedResult) throws Exception {
+ private void verifyKTableKTableJoin(final Set<KeyValue<Integer, String>>
expectedResult, final boolean withHeaders) throws Exception {
final String innerJoinType = "INNER";
final String queryableName = innerJoinType + "-store1";
final String queryableNameTwo = innerJoinType + "-store2";
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigTwo,
withHeaders);
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfigThree,
withHeaders);
+
streams = prepareTopology(queryableName, queryableNameTwo,
streamsConfig);
streamsTwo = prepareTopology(queryableName, queryableNameTwo,
streamsConfigTwo);
streamsThree = prepareTopology(queryableName, queryableNameTwo,
streamsConfigThree);
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
index 84d9c9d0fcd..a86479d0617 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
@@ -39,11 +39,11 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
-import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
@@ -67,7 +67,7 @@ public class KTableKTableForeignKeyJoinDistributedTest {
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeAll
- public static void startCluster() throws IOException, InterruptedException
{
+ public static void startCluster() throws Exception {
CLUSTER.start();
CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
}
@@ -121,11 +121,12 @@ public class KTableKTableForeignKeyJoinDistributedTest {
}
@AfterEach
- public void after() {
+ public void after() throws InterruptedException {
client1.close(Duration.ofSeconds(60));
client2.close(Duration.ofSeconds(60));
quietlyCleanStateAfterTest(CLUSTER, client1);
quietlyCleanStateAfterTest(CLUSTER, client2);
+ CLUSTER.deleteTopics(LEFT_TABLE, RIGHT_TABLE, OUTPUT);
}
public Properties getStreamsConfiguration(final String safeTestName) {
@@ -157,12 +158,16 @@ public class KTableKTableForeignKeyJoinDistributedTest {
.to(OUTPUT);
}
- @Test
- public void shouldBeInitializedWithDefaultSerde(final TestInfo testInfo)
throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldBeInitializedWithDefaultSerde(final boolean withHeaders,
final TestInfo testInfo) throws Exception {
final String safeTestName = safeUniqueTestName(testInfo);
final Properties streamsConfiguration1 =
getStreamsConfiguration(safeTestName);
final Properties streamsConfiguration2 =
getStreamsConfiguration(safeTestName);
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration1,
withHeaders);
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration2,
withHeaders);
+
//Each streams client needs to have it's own StreamsBuilder in order
to simulate
//a truly distributed run
final StreamsBuilder builder1 = new StreamsBuilder();
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index fa289ca5959..3002ca1d638 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
@@ -88,11 +89,13 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
baseTimestamp = time.milliseconds();
}
- private static Properties getStreamsProperties(final String optimization) {
- return mkProperties(mkMap(
+ private static Properties getStreamsProperties(final String optimization,
final boolean withHeaders) {
+ final Properties props = mkProperties(mkMap(
mkEntry(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
optimization)
));
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+ return props;
}
// versioning is disabled for these tests, even though the code supports
building a
@@ -105,7 +108,8 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
final List<Boolean> rejoin = Arrays.asList(true, false);
final List<Boolean> leftVersioned = Collections.singletonList(false);
final List<Boolean> rightVersioned = Collections.singletonList(false);
- return buildParameters(leftJoin, optimization, materialized, rejoin,
leftVersioned, rightVersioned);
+ final List<Boolean> withHeaders = Arrays.asList(true, false);
+ return buildParameters(leftJoin, optimization, materialized, rejoin,
leftVersioned, rightVersioned, withHeaders);
}
// optimizations and rejoin are disabled for these tests, as these tests
focus on versioning.
@@ -117,7 +121,8 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
final List<Boolean> rejoin = Collections.singletonList(false);
final List<Boolean> leftVersioned = Arrays.asList(true, false);
final List<Boolean> rightVersioned = Arrays.asList(true, false);
- return buildParameters(leftJoin, optimization, materialized, rejoin,
leftVersioned, rightVersioned);
+ final List<Boolean> withHeaders = Arrays.asList(true, false);
+ return buildParameters(leftJoin, optimization, materialized, rejoin,
leftVersioned, rightVersioned, withHeaders);
}
// deduplicate test cases in data and versionedData
@@ -171,8 +176,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
final boolean materialized,
final boolean rejoin,
final boolean leftVersioned,
- final boolean
rightVersioned) {
- final Properties streamsConfig = getStreamsProperties(optimization);
+ final boolean
rightVersioned,
+ final boolean withHeaders) {
+ final Properties streamsConfig = getStreamsProperties(optimization,
withHeaders);
final Topology topology = getTopology(streamsConfig, materialized ?
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
try (final TopologyTestDriver driver = new
TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> right =
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new
StringSerializer());
@@ -296,8 +302,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
final boolean
materialized,
final boolean rejoin,
final boolean
leftVersioned,
- final boolean
rightVersioned) {
- final Properties streamsConfig = getStreamsProperties(optimization);
+ final boolean
rightVersioned,
+ final boolean
withHeaders) {
+ final Properties streamsConfig = getStreamsProperties(optimization,
withHeaders);
final Topology topology = getTopology(streamsConfig, materialized ?
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
try (final TopologyTestDriver driver = new
TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> right =
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new
StringSerializer());
@@ -369,8 +376,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
final boolean
materialized,
final boolean rejoin,
final boolean
leftVersioned,
- final boolean
rightVersioned) {
- final Properties streamsConfig = getStreamsProperties(optimization);
+ final boolean
rightVersioned,
+ final boolean
withHeaders) {
+ final Properties streamsConfig = getStreamsProperties(optimization,
withHeaders);
final Topology topology = getTopology(streamsConfig, materialized ?
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
try (final TopologyTestDriver driver = new
TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> right =
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new
StringSerializer());
@@ -489,8 +497,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
final boolean
materialized,
final boolean
rejoin,
final boolean
leftVersioned,
- final boolean
rightVersioned) {
- final Properties streamsConfig = getStreamsProperties(optimization);
+ final boolean
rightVersioned,
+ final boolean
withHeaders) {
+ final Properties streamsConfig = getStreamsProperties(optimization,
withHeaders);
final Topology topology = getTopology(streamsConfig, materialized ?
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
try (final TopologyTestDriver driver = new
TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> left =
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new
StringSerializer());
@@ -555,8 +564,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
final
boolean materialized,
final
boolean rejoin,
final
boolean leftVersioned,
- final
boolean rightVersioned) {
- final Properties streamsConfig = getStreamsProperties(optimization);
+ final
boolean rightVersioned,
+ final
boolean withHeaders) {
+ final Properties streamsConfig = getStreamsProperties(optimization,
withHeaders);
final Topology topology = getTopology(streamsConfig, materialized ?
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
try (final TopologyTestDriver driver = new
TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> left =
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new
StringSerializer());
@@ -587,8 +597,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
final
boolean materialized,
final
boolean rejoin,
final
boolean leftVersioned,
- final
boolean rightVersioned) {
- final Properties streamsConfig = getStreamsProperties(optimization);
+ final
boolean rightVersioned,
+ final
boolean withHeaders) {
+ final Properties streamsConfig = getStreamsProperties(optimization,
withHeaders);
final Topology topology = getTopology(streamsConfig, materialized ?
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
try (final TopologyTestDriver driver = new
TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> right =
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new
StringSerializer());
@@ -694,8 +705,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
final
boolean materialized,
final
boolean rejoin,
final
boolean leftVersioned,
- final
boolean rightVersioned) {
- final Properties streamsConfig = getStreamsProperties(optimization);
+ final
boolean rightVersioned,
+ final
boolean withHeaders) {
+ final Properties streamsConfig = getStreamsProperties(optimization,
withHeaders);
final Topology topology = getTopology(streamsConfig, materialized ?
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
try (final TopologyTestDriver driver = new
TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> right =
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new
StringSerializer());
@@ -779,8 +791,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
final boolean
materialized,
final boolean
rejoin,
final boolean
leftVersioned,
- final boolean
rightVersioned) {
- final Properties streamsConfig = getStreamsProperties(optimization);
+ final boolean
rightVersioned,
+ final boolean
withHeaders) {
+ final Properties streamsConfig = getStreamsProperties(optimization,
withHeaders);
final Topology topology = getTopology(streamsConfig, materialized ?
"store" : null, true, rejoin, leftVersioned, rightVersioned, value -> null);
try (final TopologyTestDriver driver = new
TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> left =
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new
StringSerializer());
@@ -806,7 +819,8 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
final boolean
materialized,
final boolean rejoin,
final boolean
leftVersioned,
- final boolean
rightVersioned) {
+ final boolean
rightVersioned,
+ final boolean
withHeaders) {
final Function<String, String> foreignKeyExtractor = value -> {
final String split = value.split("\\|")[1];
if (split.equals("returnNull")) {
@@ -817,7 +831,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
return split;
}
};
- final Properties streamsConfig = getStreamsProperties(optimization);
+ final Properties streamsConfig = getStreamsProperties(optimization,
withHeaders);
final Topology topology = getTopology(streamsConfig, materialized ?
"store" : null, true, rejoin, leftVersioned, rightVersioned,
foreignKeyExtractor);
try (final TopologyTestDriver driver = new
TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> left =
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new
StringSerializer());
@@ -1003,8 +1017,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
final boolean
materialized,
final boolean rejoin,
final boolean
leftVersioned,
- final boolean
rightVersioned) {
- final Properties streamsConfig = getStreamsProperties(optimization);
+ final boolean
rightVersioned,
+ final boolean
withHeaders) {
+ final Properties streamsConfig = getStreamsProperties(optimization,
withHeaders);
final Topology topology = getTopology(streamsConfig, materialized ?
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
try (final TopologyTestDriver driver = new
TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> right =
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new
StringSerializer());
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
index e0e51b84b7b..fd42b21d5b0 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
@@ -69,8 +70,9 @@ public class
KTableKTableForeignKeyJoinMaterializationIntegrationTest {
}
@ParameterizedTest
- @CsvSource({"false, false", "true, false", "true, true"})
- public void shouldEmitTombstoneWhenDeletingNonJoiningRecords(final boolean
materialized, final boolean queryable) {
+ @CsvSource({"false, false, false", "false, false, true", "true, false,
false", "true, false, true", "true, true, false", "true, true, true"})
+ public void shouldEmitTombstoneWhenDeletingNonJoiningRecords(final boolean
materialized, final boolean queryable, final boolean withHeaders) {
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
final Topology topology = getTopology(streamsConfig, "store",
materialized, queryable);
try (final TopologyTestDriver driver = new
TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> left =
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new
StringSerializer());
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
index 8789dadc090..08f419e3188 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
@@ -32,7 +33,8 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
import java.util.Arrays;
@@ -56,6 +58,7 @@ public class RelaxedNullKeyRequirementJoinTest {
private TestInputTopic<String, String> left;
private TestInputTopic<String, String> right;
private TestOutputTopic<String, String> out;
+ private boolean withHeaders;
@BeforeEach
void beforeEach() {
@@ -69,8 +72,10 @@ public class RelaxedNullKeyRequirementJoinTest {
testDriver.close();
}
- @Test
- void testRelaxedLeftStreamStreamJoin() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testRelaxedLeftStreamStreamJoin(final boolean withHeaders) {
+ this.withHeaders = withHeaders;
leftStream
.leftJoin(rightStream, JOINER, WINDOW)
.to(OUT);
@@ -79,8 +84,10 @@ public class RelaxedNullKeyRequirementJoinTest {
assertEquals(Collections.singletonList(new KeyValue<>(null,
"leftValue|null")), out.readKeyValuesToList());
}
- @Test
- void testRelaxedLeftStreamTableJoin() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testRelaxedLeftStreamTableJoin(final boolean withHeaders) {
+ this.withHeaders = withHeaders;
leftStream
.leftJoin(rightStream.toTable(), JOINER)
.to(OUT);
@@ -89,8 +96,10 @@ public class RelaxedNullKeyRequirementJoinTest {
assertEquals(Collections.singletonList(new KeyValue<>(null,
"leftValue|null")), out.readKeyValuesToList());
}
- @Test
- void testRelaxedOuterStreamStreamJoin() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testRelaxedOuterStreamStreamJoin(final boolean withHeaders) {
+ this.withHeaders = withHeaders;
leftStream
.outerJoin(rightStream, JOINER, WINDOW)
.to(OUT);
@@ -103,8 +112,10 @@ public class RelaxedNullKeyRequirementJoinTest {
);
}
- @Test
- void testRelaxedLeftStreamGlobalTableJoin() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testRelaxedLeftStreamGlobalTableJoin(final boolean withHeaders) {
+ this.withHeaders = withHeaders;
final GlobalKTable<String, String> global =
builder.globalTable("global");
leftStream
.leftJoin(global, (key, value) -> null, JOINER)
@@ -114,8 +125,10 @@ public class RelaxedNullKeyRequirementJoinTest {
assertEquals(Collections.singletonList(new KeyValue<>(null,
"leftValue|null")), out.readKeyValuesToList());
}
- @Test
- void
testDropNullKeyRecordsForRepartitionNodesWithNoRelaxedJoinDownstream() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void
testDropNullKeyRecordsForRepartitionNodesWithNoRelaxedJoinDownstream(final
boolean withHeaders) {
+ this.withHeaders = withHeaders;
leftStream
.repartition()
.to(OUT);
@@ -125,7 +138,10 @@ public class RelaxedNullKeyRequirementJoinTest {
}
private void initTopology() {
- testDriver = new TopologyTestDriver(builder.build(), props());
+ final Properties props = props();
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
+ testDriver = new TopologyTestDriver(builder.build(), props);
+
left = testDriver.createInputTopic(
LEFT,
new StringSerializer(),
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
index c9b498d4054..e9e53ab6cd9 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
@@ -41,8 +41,9 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.time.Duration;
@@ -56,6 +57,7 @@ import static
org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
+
@Tag("integration")
public class SelfJoinUpgradeIntegrationTest {
public static final String INPUT_TOPIC = "selfjoin-input";
@@ -112,8 +114,9 @@ public class SelfJoinUpgradeIntegrationTest {
}
- @Test
- public void shouldUpgradeWithTopologyOptimizationOff() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldUpgradeWithTopologyOptimizationOff(final boolean
withHeaders) throws Exception {
final StreamsBuilder streamsBuilderOld = new StreamsBuilder();
final KStream<String, String> leftOld = streamsBuilderOld.stream(
@@ -128,6 +131,7 @@ public class SelfJoinUpgradeIntegrationTest {
final Properties props = props();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.NO_OPTIMIZATION);
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
kafkaStreams.start();
@@ -155,6 +159,7 @@ public class SelfJoinUpgradeIntegrationTest {
kafkaStreams = null;
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
kafkaStreams.start();
@@ -176,8 +181,9 @@ public class SelfJoinUpgradeIntegrationTest {
kafkaStreams.close();
}
- @Test
- public void shouldRestartWithTopologyOptimizationOn() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldRestartWithTopologyOptimizationOn(final boolean
withHeaders) throws Exception {
final StreamsBuilder streamsBuilderOld = new StreamsBuilder();
final KStream<String, String> leftOld = streamsBuilderOld.stream(
@@ -193,6 +199,7 @@ public class SelfJoinUpgradeIntegrationTest {
final Properties props = props();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
kafkaStreams.start();
@@ -219,6 +226,7 @@ public class SelfJoinUpgradeIntegrationTest {
kafkaStreams = null;
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders);
kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
kafkaStreams.start();
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
index 2954fa9806d..b68a5f7a9e5 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
@@ -54,7 +54,8 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.time.Duration;
@@ -63,6 +64,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
+import java.util.stream.Stream;
import static java.time.Duration.ofMillis;
import static java.util.Arrays.asList;
@@ -130,9 +132,22 @@ public class SlidingWindowedKStreamIntegrationTest {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}
+ public static Stream<Arguments> data() {
+ return Stream.of(
+ Arguments.of(StrategyType.ON_WINDOW_UPDATE, true, false),
+ Arguments.of(StrategyType.ON_WINDOW_UPDATE, true, true),
+ Arguments.of(StrategyType.ON_WINDOW_UPDATE, false, false),
+ Arguments.of(StrategyType.ON_WINDOW_UPDATE, false, true),
+ Arguments.of(StrategyType.ON_WINDOW_CLOSE, true, false),
+ Arguments.of(StrategyType.ON_WINDOW_CLOSE, true, true),
+ Arguments.of(StrategyType.ON_WINDOW_CLOSE, false, false),
+ Arguments.of(StrategyType.ON_WINDOW_CLOSE, false, true)
+ );
+ }
+
@ParameterizedTest
- @CsvSource({"ON_WINDOW_UPDATE, true", "ON_WINDOW_UPDATE, false",
"ON_WINDOW_CLOSE, true", "ON_WINDOW_CLOSE, false"})
- public void shouldAggregateWindowedWithNoGrace(final StrategyType
strategyType, final boolean withCache) throws Exception {
+ @MethodSource("data")
+ public void shouldAggregateWindowedWithNoGrace(final StrategyType
strategyType, final boolean withCache, final boolean withHeaders) throws
Exception {
produceMessages(
streamOneInput,
new KeyValueTimestamp<>("A", "1", 0), // Create [0, 10](0+1)
@@ -158,6 +173,8 @@ public class SlidingWindowedKStreamIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
final boolean emitFinal =
strategyType.equals(StrategyType.ON_WINDOW_CLOSE);
@@ -197,8 +214,8 @@ public class SlidingWindowedKStreamIntegrationTest {
}
@ParameterizedTest
- @CsvSource({"ON_WINDOW_UPDATE, true", "ON_WINDOW_UPDATE, false",
"ON_WINDOW_CLOSE, true", "ON_WINDOW_CLOSE, false"})
- public void shouldAggregateWindowedWithGrace(final StrategyType
strategyType, final boolean withCache) throws Exception {
+ @MethodSource("data")
+ public void shouldAggregateWindowedWithGrace(final StrategyType
strategyType, final boolean withCache, final boolean withHeaders) throws
Exception {
produceMessages(
streamOneInput,
new KeyValueTimestamp<>("A", "1", 0), // Create [0, 10](0+1)
@@ -224,6 +241,8 @@ public class SlidingWindowedKStreamIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
final boolean emitFinal =
strategyType.equals(StrategyType.ON_WINDOW_CLOSE);
@@ -272,8 +291,8 @@ public class SlidingWindowedKStreamIntegrationTest {
}
@ParameterizedTest
- @CsvSource({"ON_WINDOW_UPDATE, true", "ON_WINDOW_UPDATE, false",
"ON_WINDOW_CLOSE, true", "ON_WINDOW_CLOSE, false"})
- public void shouldRestoreAfterJoinRestart(final StrategyType strategyType,
final boolean withCache) throws Exception {
+ @MethodSource("data")
+ public void shouldRestoreAfterJoinRestart(final StrategyType strategyType,
final boolean withCache, final boolean withHeaders) throws Exception {
produceMessages(
streamOneInput,
new KeyValueTimestamp<>("A", "L1", 0),
@@ -314,6 +333,8 @@ public class SlidingWindowedKStreamIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
final boolean emitFinal =
strategyType.equals(StrategyType.ON_WINDOW_CLOSE);
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index dc3d0bc6d4f..a74965b22b4 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.test.TestRecord;
@@ -26,7 +27,7 @@ import org.apache.kafka.test.MockMapper;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
import java.util.Arrays;
import java.util.Collections;
@@ -45,14 +46,16 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
private static final String APP_ID = "stream-stream-join-integration-test";
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testSelfJoin(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testSelfJoin(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> leftStream =
builder.stream(INPUT_TOPIC_LEFT);
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-selfJoin");
streamsConfig.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-A",
null, 2L)),
@@ -88,14 +91,16 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testInner(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testInner(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> leftStream =
builder.stream(INPUT_TOPIC_LEFT);
final KStream<Long, String> rightStream =
builder.stream(INPUT_TOPIC_RIGHT);
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner");
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
null,
@@ -140,14 +145,16 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testInnerRepartitioned(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testInnerRepartitioned(final boolean cacheEnabled, final
boolean withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> leftStream =
builder.stream(INPUT_TOPIC_LEFT);
final KStream<Long, String> rightStream =
builder.stream(INPUT_TOPIC_RIGHT);
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-repartitioned");
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
null,
@@ -194,14 +201,16 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testLeft(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testLeft(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> leftStream =
builder.stream(INPUT_TOPIC_LEFT);
final KStream<Long, String> rightStream =
builder.stream(INPUT_TOPIC_RIGHT);
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-left");
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
null,
@@ -247,14 +256,16 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testLeftRepartitioned(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testLeftRepartitioned(final boolean cacheEnabled, final
boolean withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> leftStream =
builder.stream(INPUT_TOPIC_LEFT);
final KStream<Long, String> rightStream =
builder.stream(INPUT_TOPIC_RIGHT);
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-left-repartitioned");
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
null,
@@ -302,14 +313,16 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testOuter(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testOuter(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> leftStream =
builder.stream(INPUT_TOPIC_LEFT);
final KStream<Long, String> rightStream =
builder.stream(INPUT_TOPIC_RIGHT);
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-outer");
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
null,
@@ -356,14 +369,16 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testOuterRepartitioned(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testOuterRepartitioned(final boolean cacheEnabled, final
boolean withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> leftStream =
builder.stream(INPUT_TOPIC_LEFT);
final KStream<Long, String> rightStream =
builder.stream(INPUT_TOPIC_RIGHT);
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-outer");
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
null,
@@ -412,14 +427,16 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testMultiInner(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testMultiInner(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> leftStream =
builder.stream(INPUT_TOPIC_LEFT);
final KStream<Long, String> rightStream =
builder.stream(INPUT_TOPIC_RIGHT);
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-multi-inner");
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
null,
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 18ce26abe08..67d663eaa49 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
@@ -27,7 +28,7 @@ import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
import java.time.Duration;
import java.util.Arrays;
@@ -45,13 +46,15 @@ public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest
private static final String APP_ID = "stream-table-join-integration-test";
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testInner(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testInner(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> leftStream =
builder.stream(INPUT_TOPIC_LEFT);
final KTable<Long, String> rightTable =
builder.table(INPUT_TOPIC_RIGHT);
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
@@ -84,13 +87,15 @@ public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testLeft(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testLeft(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> leftStream =
builder.stream(INPUT_TOPIC_LEFT);
final KTable<Long, String> rightTable =
builder.table(INPUT_TOPIC_RIGHT);
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-left");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
@@ -123,14 +128,16 @@ public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testInnerWithVersionedStore(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testInnerWithVersionedStore(final boolean cacheEnabled, final
boolean withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> leftStream =
builder.stream(INPUT_TOPIC_LEFT);
final KTable<Long, String> rightTable =
builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
Stores.persistentVersionedKeyValueStore(STORE_NAME,
Duration.ofMinutes(5))));
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
@@ -163,14 +170,16 @@ public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testLeftWithVersionedStore(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testLeftWithVersionedStore(final boolean cacheEnabled, final
boolean withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> leftStream =
builder.stream(INPUT_TOPIC_LEFT);
final KTable<Long, String> rightTable =
builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
Stores.persistentVersionedKeyValueStore(STORE_NAME,
Duration.ofMinutes(5))));
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-left");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
index 6195cbeb281..7a078947e05 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
@@ -31,7 +32,7 @@ import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
import java.time.Duration;
import java.util.Arrays;
@@ -51,8 +52,8 @@ public class StreamTableJoinWithGraceIntegrationTest extends
AbstractJoinIntegra
Joined.with(Serdes.Long(), Serdes.String(), Serdes.String(),
"Grace", Duration.ofMillis(2));
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testInnerWithVersionedStore(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testInnerWithVersionedStore(final boolean cacheEnabled, final
boolean withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> leftStream =
builder.stream(INPUT_TOPIC_LEFT, Consumed.with(Serdes.Long(), Serdes.String()));
final KTable<Long, String> rightTable =
builder.table(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.Long(), Serdes.String()),
Materialized.as(
@@ -60,6 +61,8 @@ public class StreamTableJoinWithGraceIntegrationTest extends
AbstractJoinIntegra
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled,
false);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner");
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
+
leftStream.join(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC,
Produced.with(Serdes.Long(), Serdes.String()));
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
@@ -92,14 +95,16 @@ public class StreamTableJoinWithGraceIntegrationTest
extends AbstractJoinIntegra
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testLeftWithVersionedStore(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testLeftWithVersionedStore(final boolean cacheEnabled, final
boolean withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> leftStream =
builder.stream(INPUT_TOPIC_LEFT);
final KTable<Long, String> rightTable =
builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
Stores.persistentVersionedKeyValueStore(STORE_NAME,
Duration.ofMinutes(5))));
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled,
true);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-left");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftStream.leftJoin(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC);
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
index a7cb0871286..0a6aa71cf1f 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -29,7 +30,7 @@ import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
import java.time.Duration;
import java.util.Arrays;
@@ -61,8 +62,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
.withLoggingDisabled();
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testInner(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testInner(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("left").withLoggingDisabled());
@@ -70,6 +71,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("right").withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.join(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
if (cacheEnabled) {
@@ -106,8 +109,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testLeft(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testLeft(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("left").withLoggingDisabled());
@@ -115,6 +118,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("right").withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-left");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.leftJoin(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
if (cacheEnabled) {
@@ -151,8 +156,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testOuter(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testOuter(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("left").withLoggingDisabled());
@@ -160,6 +165,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("right").withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-outer");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.outerJoin(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
if (cacheEnabled) {
@@ -196,8 +203,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testInnerWithVersionedStores(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testInnerWithVersionedStores(final boolean cacheEnabled, final
boolean withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long,
String>as(Stores.persistentVersionedKeyValueStore("left",
Duration.ofMinutes(5))).withLoggingDisabled());
@@ -205,6 +212,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long,
String>as(Stores.persistentVersionedKeyValueStore("right",
Duration.ofMinutes(5))).withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.join(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
// versioned stores do not support caching, so we expect the same
result regardless of whether caching is enabled or not
@@ -238,8 +247,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testLeftWithVersionedStores(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testLeftWithVersionedStores(final boolean cacheEnabled, final
boolean withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long,
String>as(Stores.persistentVersionedKeyValueStore("left",
Duration.ofMinutes(5))).withLoggingDisabled());
@@ -247,6 +256,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long,
String>as(Stores.persistentVersionedKeyValueStore("right",
Duration.ofMinutes(5))).withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-left");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.leftJoin(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
// versioned stores do not support caching, so we expect the same
result regardless of whether caching is enabled or not
@@ -280,8 +291,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testOuterWithVersionedStores(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testOuterWithVersionedStores(final boolean cacheEnabled, final
boolean withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long,
String>as(Stores.persistentVersionedKeyValueStore("left",
Duration.ofMinutes(5))).withLoggingDisabled());
@@ -289,6 +300,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long,
String>as(Stores.persistentVersionedKeyValueStore("right",
Duration.ofMinutes(5))).withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-outer");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.outerJoin(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
// versioned stores do not support caching, so we expect the same
result regardless of whether caching is enabled or not
@@ -322,8 +335,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testInnerWithLeftVersionedOnly(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testInnerWithLeftVersionedOnly(final boolean cacheEnabled,
final boolean withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long,
String>as(Stores.persistentVersionedKeyValueStore("left",
Duration.ofMinutes(5))).withLoggingDisabled());
@@ -331,6 +344,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("right").withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.join(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
if (cacheEnabled) {
@@ -367,8 +382,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testLeftWithLeftVersionedOnly(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testLeftWithLeftVersionedOnly(final boolean cacheEnabled,
final boolean withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long,
String>as(Stores.persistentVersionedKeyValueStore("left",
Duration.ofMinutes(5))).withLoggingDisabled());
@@ -376,6 +391,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("right").withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-left");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.leftJoin(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
if (cacheEnabled) {
@@ -412,8 +429,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testOuterWithLeftVersionedOnly(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testOuterWithLeftVersionedOnly(final boolean cacheEnabled,
final boolean withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long,
String>as(Stores.persistentVersionedKeyValueStore("left",
Duration.ofMinutes(5))).withLoggingDisabled());
@@ -421,6 +438,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("right").withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-outer");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.outerJoin(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
if (cacheEnabled) {
@@ -457,8 +476,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testInnerWithRightVersionedOnly(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testInnerWithRightVersionedOnly(final boolean cacheEnabled,
final boolean withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("left").withLoggingDisabled());
@@ -466,6 +485,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long,
String>as(Stores.persistentVersionedKeyValueStore("right",
Duration.ofMinutes(5))).withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.join(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
if (cacheEnabled) {
@@ -502,8 +523,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testLeftWithRightVersionedOnly(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testLeftWithRightVersionedOnly(final boolean cacheEnabled,
final boolean withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("left").withLoggingDisabled());
@@ -511,6 +532,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long,
String>as(Stores.persistentVersionedKeyValueStore("right",
Duration.ofMinutes(5))).withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-left");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.leftJoin(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
if (cacheEnabled) {
@@ -547,8 +570,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testOuterWithRightVersionedOnly(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testOuterWithRightVersionedOnly(final boolean cacheEnabled,
final boolean withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("left").withLoggingDisabled());
@@ -556,6 +579,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long,
String>as(Stores.persistentVersionedKeyValueStore("right",
Duration.ofMinutes(5))).withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-outer");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.outerJoin(rightTable, valueJoiner,
materialized).toStream().to(OUTPUT_TOPIC);
if (cacheEnabled) {
@@ -592,8 +617,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testInnerInner(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testInnerInner(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("left").withLoggingDisabled());
@@ -601,6 +626,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("right").withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-inner");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.join(rightTable, valueJoiner)
.join(rightTable, valueJoiner, materialized)
.toStream()
@@ -643,8 +670,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testInnerLeft(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testInnerLeft(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("left").withLoggingDisabled());
@@ -652,6 +679,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("right").withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-left");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.join(rightTable, valueJoiner)
.leftJoin(rightTable, valueJoiner, materialized)
.toStream()
@@ -691,8 +720,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testInnerOuter(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testInnerOuter(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("left").withLoggingDisabled());
@@ -700,6 +729,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("right").withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-outer");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.join(rightTable, valueJoiner)
.outerJoin(rightTable, valueJoiner, materialized)
.toStream()
@@ -742,8 +773,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testLeftInner(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testLeftInner(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("left").withLoggingDisabled());
@@ -751,6 +782,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("right").withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-inner");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.leftJoin(rightTable, valueJoiner)
.join(rightTable, valueJoiner, materialized)
.toStream()
@@ -790,8 +823,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testLeftLeft(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testLeftLeft(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("left").withLoggingDisabled());
@@ -799,6 +832,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("right").withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-left");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.leftJoin(rightTable, valueJoiner)
.leftJoin(rightTable, valueJoiner, materialized)
.toStream()
@@ -840,8 +875,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testLeftOuter(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testLeftOuter(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("left").withLoggingDisabled());
@@ -849,6 +884,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("right").withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-outer");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.leftJoin(rightTable, valueJoiner)
.outerJoin(rightTable, valueJoiner, materialized)
.toStream()
@@ -890,8 +927,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testOuterInner(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testOuterInner(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("left").withLoggingDisabled());
@@ -899,6 +936,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("right").withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-inner");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.outerJoin(rightTable, valueJoiner)
.join(rightTable, valueJoiner, materialized)
.toStream()
@@ -940,8 +979,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testOuterLeft(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testOuterLeft(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("left").withLoggingDisabled());
@@ -949,6 +988,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("right").withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-left");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.outerJoin(rightTable, valueJoiner)
.leftJoin(rightTable, valueJoiner, materialized)
.toStream()
@@ -992,8 +1033,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testOuterOuter(final boolean cacheEnabled) {
+ @CsvSource({"true, false", "true, true", "false, false", "false, true"})
+ public void testOuterOuter(final boolean cacheEnabled, final boolean
withHeaders) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> leftTable = builder.table(INPUT_TOPIC_LEFT,
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("left").withLoggingDisabled());
@@ -1001,6 +1042,8 @@ public class TableTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("right").withLoggingDisabled());
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-inner-outer");
+
+ IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig,
withHeaders);
leftTable.outerJoin(rightTable, valueJoiner)
.outerJoin(rightTable, valueJoiner, materialized)
.toStream()
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
index 53746cd7fd6..8f6cbe213d5 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
@@ -138,8 +138,13 @@ public class TimeWindowedKStreamIntegrationTest {
}
@ParameterizedTest
- @CsvSource({"ON_WINDOW_UPDATE, true", "ON_WINDOW_UPDATE, false",
"ON_WINDOW_CLOSE, true", "ON_WINDOW_CLOSE, false"})
- public void shouldAggregateWindowedWithNoGrace(final StrategyType type,
final boolean withCache) throws Exception {
+ @CsvSource({
+ "ON_WINDOW_UPDATE, true, false", "ON_WINDOW_UPDATE, true, true",
+ "ON_WINDOW_UPDATE, false, false", "ON_WINDOW_UPDATE, false, true",
+ "ON_WINDOW_CLOSE, true, false", "ON_WINDOW_CLOSE, true, true",
+ "ON_WINDOW_CLOSE, false, false", "ON_WINDOW_CLOSE, false, true"
+ })
+ public void shouldAggregateWindowedWithNoGrace(final StrategyType type,
final boolean withCache, final boolean withHeaders) throws Exception {
produceMessages(
streamOneInput,
new KeyValueTimestamp<>("A", "1", 0),
@@ -164,6 +169,8 @@ public class TimeWindowedKStreamIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
// on window close
@@ -208,8 +215,13 @@ public class TimeWindowedKStreamIntegrationTest {
}
@ParameterizedTest
- @CsvSource({"ON_WINDOW_UPDATE, true", "ON_WINDOW_UPDATE, false",
"ON_WINDOW_CLOSE, true", "ON_WINDOW_CLOSE, false"})
- public void shouldAggregateWindowedWithGrace(final StrategyType type,
final boolean withCache) throws Exception {
+ @CsvSource({
+ "ON_WINDOW_UPDATE, true, false", "ON_WINDOW_UPDATE, true, true",
+ "ON_WINDOW_UPDATE, false, false", "ON_WINDOW_UPDATE, false, true",
+ "ON_WINDOW_CLOSE, true, false", "ON_WINDOW_CLOSE, true, true",
+ "ON_WINDOW_CLOSE, false, false", "ON_WINDOW_CLOSE, false, true"
+ })
+ public void shouldAggregateWindowedWithGrace(final StrategyType type,
final boolean withCache, final boolean withHeaders) throws Exception {
produceMessages(
streamOneInput,
new KeyValueTimestamp<>("A", "1", 0),
@@ -234,6 +246,8 @@ public class TimeWindowedKStreamIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
// on window close
@@ -278,8 +292,13 @@ public class TimeWindowedKStreamIntegrationTest {
}
@ParameterizedTest
- @CsvSource({"ON_WINDOW_UPDATE, true", "ON_WINDOW_UPDATE, false",
"ON_WINDOW_CLOSE, true", "ON_WINDOW_CLOSE, false"})
- public void shouldRestoreAfterJoinRestart(final StrategyType type, final
boolean withCache) throws Exception {
+ @CsvSource({
+ "ON_WINDOW_UPDATE, true, false", "ON_WINDOW_UPDATE, true, true",
+ "ON_WINDOW_UPDATE, false, false", "ON_WINDOW_UPDATE, false, true",
+ "ON_WINDOW_CLOSE, true, false", "ON_WINDOW_CLOSE, true, true",
+ "ON_WINDOW_CLOSE, false, false", "ON_WINDOW_CLOSE, false, true"
+ })
+ public void shouldRestoreAfterJoinRestart(final StrategyType type, final
boolean withCache, final boolean withHeaders) throws Exception {
produceMessages(
streamOneInput,
new KeyValueTimestamp<>("A", "L1", 0),
@@ -320,6 +339,8 @@ public class TimeWindowedKStreamIntegrationTest {
.toStream()
.to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
+
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration,
withHeaders);
+
startStreams();
// ON_WINDOW_CLOSE expires all records.
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 832e8eb1a06..079565ea97d 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -280,6 +280,20 @@ public class IntegrationTestUtils {
}
}
+ /**
+ * Configures the DSL store format to use headers if enabled.
+ * This is a helper method to reduce boilerplate in parameterized tests that test both
+ * with and without headers mode.
+ *
+ * @param streamsConfig The streams configuration properties to modify
+ * @param withHeaders Whether to enable headers mode
+ */
+ public static void maybeSetDslStoreFormatHeaders(final Properties
streamsConfig, final boolean withHeaders) {
+ if (withHeaders) {
+ streamsConfig.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
+ }
+
/**
* @param topic Kafka topic to write the data records to
* @param records Data records to write to Kafka