This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 11f932d083c KAFKA-20158: Parameterize session store unit tests to run
with headers-aware store format (#21749)
11f932d083c is described below
commit 11f932d083cce15480828751db293e5742a72734
Author: Bill Bejeck <[email protected]>
AuthorDate: Sun Mar 15 14:40:33 2026 -0400
KAFKA-20158: Parameterize session store unit tests to run with
headers-aware store format (#21749)
Add headers-enabled variants to SessionWindowedKStreamImplTest and
SessionWindowedCogroupedKStreamImplTest to verify session windowed
aggregations work correctly with the DSL_STORE_FORMAT_HEADERS config.
Reviewers: Matthias J. Sax <[email protected]>
---
.../SessionWindowedCogroupedKStreamImplTest.java | 47 ++++--
.../internals/SessionWindowedKStreamImplTest.java | 160 +++++++++++----------
2 files changed, 120 insertions(+), 87 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
index d3ad500ce2e..6e44bcd0fd5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
@@ -44,6 +45,8 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.Properties;
@@ -83,6 +86,10 @@ public class SessionWindowedCogroupedKStreamImplTest {
windowedCogroupedStream =
cogroupedStream.windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(100),
ofDays(1)));
}
+ private void enableHeaders() {
+ props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
+
@Test
public void shouldNotHaveNullInitializerOnAggregate() {
assertThrows(NullPointerException.class, () ->
windowedCogroupedStream.aggregate(null, sessionMerger));
@@ -163,8 +170,12 @@ public class SessionWindowedCogroupedKStreamImplTest {
" <-- foo-cogroup-agg-0\n\n"));
}
- @Test
- public void sessionWindowAggregateTest() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void sessionWindowAggregateTest(final boolean withHeaders) {
+ if (withHeaders) {
+ enableHeaders();
+ }
final KTable<Windowed<String>, String> customers =
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500)))
.aggregate(MockInitializer.STRING_INIT, sessionMerger,
Materialized.with(Serdes.String(), Serdes.String()));
@@ -187,8 +198,12 @@ public class SessionWindowedCogroupedKStreamImplTest {
}
}
- @Test
- public void sessionWindowAggregate2Test() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void sessionWindowAggregate2Test(final boolean withHeaders) {
+ if (withHeaders) {
+ enableHeaders();
+ }
final KTable<Windowed<String>, String> customers =
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500)))
.aggregate(MockInitializer.STRING_INIT, sessionMerger,
Materialized.with(Serdes.String(), Serdes.String()));
@@ -214,8 +229,12 @@ public class SessionWindowedCogroupedKStreamImplTest {
}
- @Test
- public void sessionWindowAggregateTest2StreamsTest() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void sessionWindowAggregateTest2StreamsTest(final boolean
withHeaders) {
+ if (withHeaders) {
+ enableHeaders();
+ }
final KTable<Windowed<String>, String> customers =
windowedCogroupedStream.aggregate(
MockInitializer.STRING_INIT, sessionMerger,
Materialized.with(Serdes.String(), Serdes.String()));
customers.toStream().to(OUTPUT);
@@ -252,8 +271,12 @@ public class SessionWindowedCogroupedKStreamImplTest {
}
}
- @Test
- public void sessionWindowMixAggregatorsTest() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void sessionWindowMixAggregatorsTest(final boolean withHeaders) {
+ if (withHeaders) {
+ enableHeaders();
+ }
final KTable<Windowed<String>, String> customers =
windowedCogroupedStream.aggregate(
MockInitializer.STRING_INIT, sessionMerger,
Materialized.with(Serdes.String(), Serdes.String()));
customers.toStream().to(OUTPUT);
@@ -289,8 +312,12 @@ public class SessionWindowedCogroupedKStreamImplTest {
}
- @Test
- public void sessionWindowMixAggregatorsManyWindowsTest() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void sessionWindowMixAggregatorsManyWindowsTest(final boolean
withHeaders) {
+ if (withHeaders) {
+ enableHeaders();
+ }
final KTable<Windowed<String>, String> customers =
windowedCogroupedStream.aggregate(
MockInitializer.STRING_INIT, sessionMerger,
Materialized.with(Serdes.String(), Serdes.String()));
customers.toStream().to(OUTPUT);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index a78818ca028..6638b7c12dc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -40,8 +40,8 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore;
+import
org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStoreWithHeaders;
import org.apache.kafka.streams.state.internals.MeteredSessionStore;
-import org.apache.kafka.streams.state.internals.SessionToHeadersStoreAdapter;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockApiProcessorSupplier;
@@ -49,14 +49,17 @@ import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
+import java.util.stream.Stream;
import static java.time.Duration.ofMillis;
import static java.util.Arrays.asList;
@@ -78,10 +81,22 @@ public class SessionWindowedKStreamImplTest {
private boolean emitFinal;
- public void setup(final EmitStrategy.StrategyType inputType) {
+ static Stream<Arguments> emitStrategyAndHeaders() {
+ return Stream.of(
+ Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false),
+ Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, true),
+ Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_CLOSE, false),
+ Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_CLOSE, true)
+ );
+ }
+
+ public void setup(final EmitStrategy.StrategyType inputType, final boolean
withHeaders) {
type = inputType;
final EmitStrategy emitStrategy =
EmitStrategy.StrategyType.forType(type);
emitFinal = type.equals(EmitStrategy.StrategyType.ON_WINDOW_CLOSE);
+ if (withHeaders) {
+ props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
final KStream<String, String> stream = builder.stream(TOPIC,
Consumed.with(Serdes.String(), Serdes.String()));
this.stream = stream.groupByKey(Grouped.with(Serdes.String(),
Serdes.String()))
@@ -90,17 +105,17 @@ public class SessionWindowedKStreamImplTest {
}
@ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void shouldCountSessionWindowedWithCachingDisabled(final
EmitStrategy.StrategyType inputType) {
- setup(inputType);
+ @MethodSource("emitStrategyAndHeaders")
+ public void shouldCountSessionWindowedWithCachingDisabled(final
EmitStrategy.StrategyType inputType, final boolean withHeaders) {
+ setup(inputType, withHeaders);
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
shouldCountSessionWindowed();
}
@ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void shouldCountSessionWindowedWithCachingEnabled(final
EmitStrategy.StrategyType inputType) {
- setup(inputType);
+ @MethodSource("emitStrategyAndHeaders")
+ public void shouldCountSessionWindowedWithCachingEnabled(final
EmitStrategy.StrategyType inputType, final boolean withHeaders) {
+ setup(inputType, withHeaders);
shouldCountSessionWindowed();
}
@@ -141,9 +156,9 @@ public class SessionWindowedKStreamImplTest {
}
@ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void shouldReduceWindowed(final EmitStrategy.StrategyType
inputType) {
- setup(inputType);
+ @MethodSource("emitStrategyAndHeaders")
+ public void shouldReduceWindowed(final EmitStrategy.StrategyType
inputType, final boolean withHeaders) {
+ setup(inputType, withHeaders);
final MockApiProcessorSupplier<Windowed<String>, String, Void, Void>
supplier = new MockApiProcessorSupplier<>();
stream.reduce(MockReducer.STRING_ADDER)
.toStream()
@@ -180,9 +195,9 @@ public class SessionWindowedKStreamImplTest {
}
@ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void shouldAggregateSessionWindowed(final EmitStrategy.StrategyType
inputType) {
- setup(inputType);
+ @MethodSource("emitStrategyAndHeaders")
+ public void shouldAggregateSessionWindowed(final EmitStrategy.StrategyType
inputType, final boolean withHeaders) {
+ setup(inputType, withHeaders);
final MockApiProcessorSupplier<Windowed<String>, String, Void, Void>
supplier = new MockApiProcessorSupplier<>();
stream.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
@@ -221,9 +236,9 @@ public class SessionWindowedKStreamImplTest {
}
@ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void shouldMaterializeCount(final EmitStrategy.StrategyType
inputType) {
- setup(inputType);
+ @MethodSource("emitStrategyAndHeaders")
+ public void shouldMaterializeCount(final EmitStrategy.StrategyType
inputType, final boolean withHeaders) {
+ setup(inputType, withHeaders);
stream.count(Materialized.as("count-store"));
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
@@ -249,9 +264,9 @@ public class SessionWindowedKStreamImplTest {
}
@ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void shouldMaterializeReduced(final EmitStrategy.StrategyType
inputType) {
- setup(inputType);
+ @MethodSource("emitStrategyAndHeaders")
+ public void shouldMaterializeReduced(final EmitStrategy.StrategyType
inputType, final boolean withHeaders) {
+ setup(inputType, withHeaders);
stream.reduce(MockReducer.STRING_ADDER, Materialized.as("reduced"));
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
@@ -278,9 +293,9 @@ public class SessionWindowedKStreamImplTest {
}
@ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void shouldMaterializeAggregated(final EmitStrategy.StrategyType
inputType) {
- setup(inputType);
+ @MethodSource("emitStrategyAndHeaders")
+ public void shouldMaterializeAggregated(final EmitStrategy.StrategyType
inputType, final boolean withHeaders) {
+ setup(inputType, withHeaders);
stream.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
@@ -309,38 +324,33 @@ public class SessionWindowedKStreamImplTest {
}
}
- @ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void shouldThrowNullPointerOnAggregateIfInitializerIsNull(final
EmitStrategy.StrategyType inputType) {
- setup(inputType);
+ @Test
+ public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
+ setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
assertThrows(NullPointerException.class, () -> stream.aggregate(null,
MockAggregator.TOSTRING_ADDER, sessionMerger));
}
- @ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull(final
EmitStrategy.StrategyType inputType) {
- setup(inputType);
+ @Test
+ public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
+ setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
assertThrows(NullPointerException.class, () ->
stream.aggregate(MockInitializer.STRING_INIT, null, sessionMerger));
}
- @ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void shouldThrowNullPointerOnAggregateIfMergerIsNull(final
EmitStrategy.StrategyType inputType) {
- setup(inputType);
+ @Test
+ public void shouldThrowNullPointerOnAggregateIfMergerIsNull() {
+ setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
assertThrows(NullPointerException.class, () ->
stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER,
null));
}
- @ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void shouldThrowNullPointerOnReduceIfReducerIsNull(final
EmitStrategy.StrategyType inputType) {
- setup(inputType);
+ @Test
+ public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
+ setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
assertThrows(NullPointerException.class, () -> stream.reduce(null));
}
- @ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void
shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull(final
EmitStrategy.StrategyType inputType) {
- setup(inputType);
+ @Test
+ public void
shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
+ setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
assertThrows(NullPointerException.class, () -> stream.aggregate(
null,
MockAggregator.TOSTRING_ADDER,
@@ -348,10 +358,9 @@ public class SessionWindowedKStreamImplTest {
Materialized.as("store")));
}
- @ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void
shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull(final
EmitStrategy.StrategyType inputType) {
- setup(inputType);
+ @Test
+ public void
shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
+ setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
assertThrows(NullPointerException.class, () -> stream.aggregate(
MockInitializer.STRING_INIT,
null,
@@ -359,10 +368,9 @@ public class SessionWindowedKStreamImplTest {
Materialized.as("store")));
}
- @ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void
shouldThrowNullPointerOnMaterializedAggregateIfMergerIsNull(final
EmitStrategy.StrategyType inputType) {
- setup(inputType);
+ @Test
+ public void shouldThrowNullPointerOnMaterializedAggregateIfMergerIsNull() {
+ setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
assertThrows(NullPointerException.class, () -> stream.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
@@ -371,10 +379,9 @@ public class SessionWindowedKStreamImplTest {
}
@SuppressWarnings("unchecked")
- @ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void
shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull(final
EmitStrategy.StrategyType inputType) {
- setup(inputType);
+ @Test
+ public void
shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
+ setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
assertThrows(NullPointerException.class, () -> stream.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
@@ -382,39 +389,35 @@ public class SessionWindowedKStreamImplTest {
(Materialized) null));
}
- @ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void
shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull(final
EmitStrategy.StrategyType inputType) {
- setup(inputType);
+ @Test
+ public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
+ setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
assertThrows(NullPointerException.class, () -> stream.reduce(null,
Materialized.as("store")));
}
- @ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
@SuppressWarnings("unchecked")
- public void
shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull(final
EmitStrategy.StrategyType inputType) {
- setup(inputType);
+ @Test
+ public void
shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
+ setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
assertThrows(NullPointerException.class, () ->
stream.reduce(MockReducer.STRING_ADDER, (Materialized) null));
}
- @ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull(final
EmitStrategy.StrategyType inputType) {
- setup(inputType);
+ @Test
+ public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull() {
+ setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
assertThrows(NullPointerException.class, () ->
stream.reduce(MockReducer.STRING_ADDER, (Named) null));
}
- @ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void shouldThrowNullPointerOnCountIfMaterializedIsNull(final
EmitStrategy.StrategyType inputType) {
- setup(inputType);
+ @Test
+ public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
+ setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
assertThrows(NullPointerException.class, () ->
stream.count((Materialized<String, Long, SessionStore<Bytes, byte[]>>) null));
}
@ParameterizedTest
- @EnumSource(EmitStrategy.StrategyType.class)
- public void shouldNotEnableCachingWithEmitFinal(final
EmitStrategy.StrategyType inputType) {
- setup(inputType);
+ @MethodSource("emitStrategyAndHeaders")
+ public void shouldNotEnableCachingWithEmitFinal(final
EmitStrategy.StrategyType inputType, final boolean withHeaders) {
+ setup(inputType, withHeaders);
if (!emitFinal)
return;
@@ -428,8 +431,11 @@ public class SessionWindowedKStreamImplTest {
final StateStore store =
driver.getAllStateStores().get("aggregated");
final WrappedStateStore changeLogging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
assertThat(store, instanceOf(MeteredSessionStore.class));
- assertThat(changeLogging,
instanceOf(ChangeLoggingSessionBytesStore.class));
- assertThat(changeLogging.wrapped(),
instanceOf(SessionToHeadersStoreAdapter.class));
+ if (withHeaders) {
+ assertThat(changeLogging,
instanceOf(ChangeLoggingSessionBytesStoreWithHeaders.class));
+ } else {
+ assertThat(changeLogging,
instanceOf(ChangeLoggingSessionBytesStore.class));
+ }
}
}