This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 11f932d083c KAFKA-20158: Parameterize session store unit tests to run with headers-aware store format (#21749)
11f932d083c is described below

commit 11f932d083cce15480828751db293e5742a72734
Author: Bill Bejeck <[email protected]>
AuthorDate: Sun Mar 15 14:40:33 2026 -0400

    KAFKA-20158: Parameterize session store unit tests to run with headers-aware store format (#21749)
    
    Add headers-enabled variants to SessionWindowedKStreamImplTest and
    SessionWindowedCogroupedKStreamImplTest to verify that session-windowed
    aggregations work correctly when DSL_STORE_FORMAT_CONFIG is set to
    DSL_STORE_FORMAT_HEADERS.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../SessionWindowedCogroupedKStreamImplTest.java   |  47 ++++--
 .../internals/SessionWindowedKStreamImplTest.java  | 160 +++++++++++----------
 2 files changed, 120 insertions(+), 87 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
index d3ad500ce2e..6e44bcd0fd5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
@@ -44,6 +45,8 @@ import org.apache.kafka.test.StreamsTestUtils;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Properties;
 
@@ -83,6 +86,10 @@ public class SessionWindowedCogroupedKStreamImplTest {
         windowedCogroupedStream = 
cogroupedStream.windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(100),
 ofDays(1)));
     }
 
+    private void enableHeaders() {
+        props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+    }
+
     @Test
     public void shouldNotHaveNullInitializerOnAggregate() {
         assertThrows(NullPointerException.class, () -> 
windowedCogroupedStream.aggregate(null, sessionMerger));
@@ -163,8 +170,12 @@ public class SessionWindowedCogroupedKStreamImplTest {
                 "      <-- foo-cogroup-agg-0\n\n"));
     }
 
-    @Test
-    public void sessionWindowAggregateTest() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void sessionWindowAggregateTest(final boolean withHeaders) {
+        if (withHeaders) {
+            enableHeaders();
+        }
         final KTable<Windowed<String>, String> customers = 
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
                 
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500)))
                 .aggregate(MockInitializer.STRING_INIT, sessionMerger, 
Materialized.with(Serdes.String(), Serdes.String()));
@@ -187,8 +198,12 @@ public class SessionWindowedCogroupedKStreamImplTest {
         }
     }
 
-    @Test
-    public void sessionWindowAggregate2Test() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void sessionWindowAggregate2Test(final boolean withHeaders) {
+        if (withHeaders) {
+            enableHeaders();
+        }
         final KTable<Windowed<String>, String> customers = 
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
                 
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500)))
                 .aggregate(MockInitializer.STRING_INIT, sessionMerger, 
Materialized.with(Serdes.String(), Serdes.String()));
@@ -214,8 +229,12 @@ public class SessionWindowedCogroupedKStreamImplTest {
 
     }
 
-    @Test
-    public void sessionWindowAggregateTest2StreamsTest() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void sessionWindowAggregateTest2StreamsTest(final boolean 
withHeaders) {
+        if (withHeaders) {
+            enableHeaders();
+        }
         final KTable<Windowed<String>, String> customers = 
windowedCogroupedStream.aggregate(
                 MockInitializer.STRING_INIT, sessionMerger, 
Materialized.with(Serdes.String(), Serdes.String()));
         customers.toStream().to(OUTPUT);
@@ -252,8 +271,12 @@ public class SessionWindowedCogroupedKStreamImplTest {
         }
     }
 
-    @Test
-    public void sessionWindowMixAggregatorsTest() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void sessionWindowMixAggregatorsTest(final boolean withHeaders) {
+        if (withHeaders) {
+            enableHeaders();
+        }
         final KTable<Windowed<String>, String> customers = 
windowedCogroupedStream.aggregate(
                 MockInitializer.STRING_INIT, sessionMerger, 
Materialized.with(Serdes.String(), Serdes.String()));
         customers.toStream().to(OUTPUT);
@@ -289,8 +312,12 @@ public class SessionWindowedCogroupedKStreamImplTest {
 
     }
 
-    @Test
-    public void sessionWindowMixAggregatorsManyWindowsTest() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void sessionWindowMixAggregatorsManyWindowsTest(final boolean 
withHeaders) {
+        if (withHeaders) {
+            enableHeaders();
+        }
         final KTable<Windowed<String>, String> customers = 
windowedCogroupedStream.aggregate(
                 MockInitializer.STRING_INIT, sessionMerger, 
Materialized.with(Serdes.String(), Serdes.String()));
         customers.toStream().to(OUTPUT);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index a78818ca028..6638b7c12dc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -40,8 +40,8 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore;
+import 
org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStoreWithHeaders;
 import org.apache.kafka.streams.state.internals.MeteredSessionStore;
-import org.apache.kafka.streams.state.internals.SessionToHeadersStoreAdapter;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockApiProcessorSupplier;
@@ -49,14 +49,17 @@ import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
 
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
+import java.util.stream.Stream;
 
 import static java.time.Duration.ofMillis;
 import static java.util.Arrays.asList;
@@ -78,10 +81,22 @@ public class SessionWindowedKStreamImplTest {
 
     private boolean emitFinal;
 
-    public void setup(final EmitStrategy.StrategyType inputType) {
+    static Stream<Arguments> emitStrategyAndHeaders() {
+        return Stream.of(
+            Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false),
+            Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, true),
+            Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_CLOSE, false),
+            Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_CLOSE, true)
+        );
+    }
+
+    public void setup(final EmitStrategy.StrategyType inputType, final boolean 
withHeaders) {
         type = inputType;
         final EmitStrategy emitStrategy = 
EmitStrategy.StrategyType.forType(type);
         emitFinal = type.equals(EmitStrategy.StrategyType.ON_WINDOW_CLOSE);
+        if (withHeaders) {
+            props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
 
         final KStream<String, String> stream = builder.stream(TOPIC, 
Consumed.with(Serdes.String(), Serdes.String()));
         this.stream = stream.groupByKey(Grouped.with(Serdes.String(), 
Serdes.String()))
@@ -90,17 +105,17 @@ public class SessionWindowedKStreamImplTest {
     }
 
     @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void shouldCountSessionWindowedWithCachingDisabled(final 
EmitStrategy.StrategyType inputType) {
-        setup(inputType);
+    @MethodSource("emitStrategyAndHeaders")
+    public void shouldCountSessionWindowedWithCachingDisabled(final 
EmitStrategy.StrategyType inputType, final boolean withHeaders) {
+        setup(inputType, withHeaders);
         props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
         shouldCountSessionWindowed();
     }
 
     @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void shouldCountSessionWindowedWithCachingEnabled(final 
EmitStrategy.StrategyType inputType) {
-        setup(inputType);
+    @MethodSource("emitStrategyAndHeaders")
+    public void shouldCountSessionWindowedWithCachingEnabled(final 
EmitStrategy.StrategyType inputType, final boolean withHeaders) {
+        setup(inputType, withHeaders);
         shouldCountSessionWindowed();
     }
 
@@ -141,9 +156,9 @@ public class SessionWindowedKStreamImplTest {
     }
 
     @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void shouldReduceWindowed(final EmitStrategy.StrategyType 
inputType) {
-        setup(inputType);
+    @MethodSource("emitStrategyAndHeaders")
+    public void shouldReduceWindowed(final EmitStrategy.StrategyType 
inputType, final boolean withHeaders) {
+        setup(inputType, withHeaders);
         final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> 
supplier = new MockApiProcessorSupplier<>();
         stream.reduce(MockReducer.STRING_ADDER)
             .toStream()
@@ -180,9 +195,9 @@ public class SessionWindowedKStreamImplTest {
     }
 
     @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void shouldAggregateSessionWindowed(final EmitStrategy.StrategyType 
inputType) {
-        setup(inputType);
+    @MethodSource("emitStrategyAndHeaders")
+    public void shouldAggregateSessionWindowed(final EmitStrategy.StrategyType 
inputType, final boolean withHeaders) {
+        setup(inputType, withHeaders);
         final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> 
supplier = new MockApiProcessorSupplier<>();
         stream.aggregate(MockInitializer.STRING_INIT,
                          MockAggregator.TOSTRING_ADDER,
@@ -221,9 +236,9 @@ public class SessionWindowedKStreamImplTest {
     }
 
     @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void shouldMaterializeCount(final EmitStrategy.StrategyType 
inputType) {
-        setup(inputType);
+    @MethodSource("emitStrategyAndHeaders")
+    public void shouldMaterializeCount(final EmitStrategy.StrategyType 
inputType, final boolean withHeaders) {
+        setup(inputType, withHeaders);
         stream.count(Materialized.as("count-store"));
 
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
@@ -249,9 +264,9 @@ public class SessionWindowedKStreamImplTest {
     }
 
     @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void shouldMaterializeReduced(final EmitStrategy.StrategyType 
inputType) {
-        setup(inputType);
+    @MethodSource("emitStrategyAndHeaders")
+    public void shouldMaterializeReduced(final EmitStrategy.StrategyType 
inputType, final boolean withHeaders) {
+        setup(inputType, withHeaders);
         stream.reduce(MockReducer.STRING_ADDER, Materialized.as("reduced"));
 
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
@@ -278,9 +293,9 @@ public class SessionWindowedKStreamImplTest {
     }
 
     @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void shouldMaterializeAggregated(final EmitStrategy.StrategyType 
inputType) {
-        setup(inputType);
+    @MethodSource("emitStrategyAndHeaders")
+    public void shouldMaterializeAggregated(final EmitStrategy.StrategyType 
inputType, final boolean withHeaders) {
+        setup(inputType, withHeaders);
         stream.aggregate(
             MockInitializer.STRING_INIT,
             MockAggregator.TOSTRING_ADDER,
@@ -309,38 +324,33 @@ public class SessionWindowedKStreamImplTest {
         }
     }
 
-    @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull(final 
EmitStrategy.StrategyType inputType) {
-        setup(inputType);
+    @Test
+    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
+        setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
         assertThrows(NullPointerException.class, () -> stream.aggregate(null, 
MockAggregator.TOSTRING_ADDER, sessionMerger));
     }
 
-    @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull(final 
EmitStrategy.StrategyType inputType) {
-        setup(inputType);
+    @Test
+    public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
+        setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
         assertThrows(NullPointerException.class, () -> 
stream.aggregate(MockInitializer.STRING_INIT, null, sessionMerger));
     }
 
-    @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void shouldThrowNullPointerOnAggregateIfMergerIsNull(final 
EmitStrategy.StrategyType inputType) {
-        setup(inputType);
+    @Test
+    public void shouldThrowNullPointerOnAggregateIfMergerIsNull() {
+        setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
         assertThrows(NullPointerException.class, () -> 
stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, 
null));
     }
 
-    @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void shouldThrowNullPointerOnReduceIfReducerIsNull(final 
EmitStrategy.StrategyType inputType) {
-        setup(inputType);
+    @Test
+    public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
+        setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
         assertThrows(NullPointerException.class, () -> stream.reduce(null));
     }
 
-    @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void 
shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull(final 
EmitStrategy.StrategyType inputType) {
-        setup(inputType);
+    @Test
+    public void 
shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
+        setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
         assertThrows(NullPointerException.class, () -> stream.aggregate(
             null,
             MockAggregator.TOSTRING_ADDER,
@@ -348,10 +358,9 @@ public class SessionWindowedKStreamImplTest {
             Materialized.as("store")));
     }
 
-    @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void 
shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull(final 
EmitStrategy.StrategyType inputType) {
-        setup(inputType);
+    @Test
+    public void 
shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
+        setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
         assertThrows(NullPointerException.class, () -> stream.aggregate(
             MockInitializer.STRING_INIT,
             null,
@@ -359,10 +368,9 @@ public class SessionWindowedKStreamImplTest {
             Materialized.as("store")));
     }
 
-    @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void 
shouldThrowNullPointerOnMaterializedAggregateIfMergerIsNull(final 
EmitStrategy.StrategyType inputType) {
-        setup(inputType);
+    @Test
+    public void shouldThrowNullPointerOnMaterializedAggregateIfMergerIsNull() {
+        setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
         assertThrows(NullPointerException.class, () -> stream.aggregate(
             MockInitializer.STRING_INIT,
             MockAggregator.TOSTRING_ADDER,
@@ -371,10 +379,9 @@ public class SessionWindowedKStreamImplTest {
     }
 
     @SuppressWarnings("unchecked")
-    @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void 
shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull(final 
EmitStrategy.StrategyType inputType) {
-        setup(inputType);
+    @Test
+    public void 
shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
+        setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
         assertThrows(NullPointerException.class, () -> stream.aggregate(
             MockInitializer.STRING_INIT,
             MockAggregator.TOSTRING_ADDER,
@@ -382,39 +389,35 @@ public class SessionWindowedKStreamImplTest {
             (Materialized) null));
     }
 
-    @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void 
shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull(final 
EmitStrategy.StrategyType inputType) {
-        setup(inputType);
+    @Test
+    public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
+        setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
         assertThrows(NullPointerException.class, () -> stream.reduce(null, 
Materialized.as("store")));
     }
 
-    @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
     @SuppressWarnings("unchecked")
-    public void 
shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull(final 
EmitStrategy.StrategyType inputType) {
-        setup(inputType);
+    @Test
+    public void 
shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
+        setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
         assertThrows(NullPointerException.class, () -> 
stream.reduce(MockReducer.STRING_ADDER, (Materialized) null));
     }
 
-    @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull(final 
EmitStrategy.StrategyType inputType) {
-        setup(inputType);
+    @Test
+    public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull() {
+        setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
         assertThrows(NullPointerException.class, () -> 
stream.reduce(MockReducer.STRING_ADDER, (Named) null));
     }
 
-    @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void shouldThrowNullPointerOnCountIfMaterializedIsNull(final 
EmitStrategy.StrategyType inputType) {
-        setup(inputType);
+    @Test
+    public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
+        setup(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false);
         assertThrows(NullPointerException.class, () -> 
stream.count((Materialized<String, Long, SessionStore<Bytes, byte[]>>) null));
     }
 
     @ParameterizedTest
-    @EnumSource(EmitStrategy.StrategyType.class)
-    public void shouldNotEnableCachingWithEmitFinal(final 
EmitStrategy.StrategyType inputType) {
-        setup(inputType);
+    @MethodSource("emitStrategyAndHeaders")
+    public void shouldNotEnableCachingWithEmitFinal(final 
EmitStrategy.StrategyType inputType, final boolean withHeaders) {
+        setup(inputType, withHeaders);
         if (!emitFinal)
             return;
 
@@ -428,8 +431,11 @@ public class SessionWindowedKStreamImplTest {
             final StateStore store = 
driver.getAllStateStores().get("aggregated");
             final WrappedStateStore changeLogging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
             assertThat(store, instanceOf(MeteredSessionStore.class));
-            assertThat(changeLogging, 
instanceOf(ChangeLoggingSessionBytesStore.class));
-            assertThat(changeLogging.wrapped(), 
instanceOf(SessionToHeadersStoreAdapter.class));
+            if (withHeaders) {
+                assertThat(changeLogging, 
instanceOf(ChangeLoggingSessionBytesStoreWithHeaders.class));
+            } else {
+                assertThat(changeLogging, 
instanceOf(ChangeLoggingSessionBytesStore.class));
+            }
         }
     }
 

Reply via email to