This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 1ea96d10290 KAFKA-20422: Add DSL integration tests for header stores with explicit suppliers (#22002)
1ea96d10290 is described below

commit 1ea96d102909f7eb3b6b500b4496871a2a1d1e3d
Author: Alieh Saeedi <[email protected]>
AuthorDate: Tue Apr 21 03:51:55 2026 +0200

    KAFKA-20422: Add DSL integration tests for header stores with explicit suppliers (#22002)
    
    Add integration tests for DSL operations with header stores via
    Materialized.as().
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../integration/PapiDslIntegrationTest.java        | 246 +++++++++++++++++++++
 1 file changed, 246 insertions(+)

diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
index 48d9c597ee4..4b8f25d9e4c 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -35,15 +36,19 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
@@ -556,4 +561,245 @@ public class PapiDslIntegrationTest {
             assertEquals(KeyValue.pair("key1", "value1"), 
outputTopic.readKeyValue());
         }
     }
+
+    // Materializes an aggregate() result through an explicit
+    // persistentTimestampedKeyValueStoreWithHeaders supplier and checks that a downstream
+    // processor can open "table-store" as a TimestampedKeyValueStoreWithHeaders and re-emit its rows.
+    @Test
+    public void processorShouldAccessKStreamAggregatedKTableStoreAsHeadersStoreViaSupplier() {
+        final String storeName = "table-store";
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> storeConfig =
+            Materialized.as(Stores.persistentTimestampedKeyValueStoreWithHeaders(storeName));
+
+        streamsBuilder
+            .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey()
+            .aggregate(
+                () -> "",
+                (aggKey, newValue, oldAggregate) -> newValue,
+                storeConfig.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            )
+            .toStream()
+            .process(() -> new ContextualProcessor<String, String, String, String>() {
+                @Override
+                public void process(final Record<String, String> record) {
+                    final TimestampedKeyValueStoreWithHeaders<String, String> headersStore =
+                        context().getStateStore(storeName);
+
+                    // Forward every stored entry using the value and timestamp held in the store.
+                    try (final KeyValueIterator<String, ValueTimestampHeaders<String>> entries = headersStore.all()) {
+                        entries.forEachRemaining(entry ->
+                            context().forward(new Record<>(entry.key, entry.value.value(), entry.value.timestamp())));
+                    }
+                }
+            }, storeName)
+            .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build())) {
+            final TestInputTopic<String, String> input =
+                driver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> output =
+                driver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer());
+
+            input.pipeInput("key1", "value1");
+
+            final KeyValue<String, String> expected = KeyValue.pair("key1", "value1");
+            assertEquals(expected, output.readKeyValue());
+        }
+    }
+
+    // Same scenario as the aggregate() variant, but the KTable is produced by reduce():
+    // the store comes from an explicit headers-store supplier and is read back by a
+    // downstream processor as a TimestampedKeyValueStoreWithHeaders.
+    @Test
+    public void processorShouldAccessKStreamReducedKTableStoreAsHeadersStoreViaSupplier() {
+        final String storeName = "table-store";
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> storeConfig =
+            Materialized.as(Stores.persistentTimestampedKeyValueStoreWithHeaders(storeName));
+
+        streamsBuilder
+            .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey()
+            .reduce(
+                (first, second) -> first,
+                storeConfig.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            )
+            .toStream()
+            .process(() -> new ContextualProcessor<String, String, String, String>() {
+                @Override
+                public void process(final Record<String, String> record) {
+                    final TimestampedKeyValueStoreWithHeaders<String, String> headersStore =
+                        context().getStateStore(storeName);
+
+                    // Forward every stored entry using the value and timestamp held in the store.
+                    try (final KeyValueIterator<String, ValueTimestampHeaders<String>> entries = headersStore.all()) {
+                        entries.forEachRemaining(entry ->
+                            context().forward(new Record<>(entry.key, entry.value.value(), entry.value.timestamp())));
+                    }
+                }
+            }, storeName)
+            .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build())) {
+            final TestInputTopic<String, String> input =
+                driver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> output =
+                driver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer());
+
+            input.pipeInput("key1", "value1");
+
+            final KeyValue<String, String> expected = KeyValue.pair("key1", "value1");
+            assertEquals(expected, output.readKeyValue());
+        }
+    }
+
+    // count() materialized through an explicit headers-store supplier: a downstream processor
+    // opens "table-store" as a TimestampedKeyValueStoreWithHeaders<String, Long> and forwards
+    // the stored counts; one input record is expected to yield a count of 1.
+    @Test
+    public void processorShouldAccessKStreamCountKTableStoreAsHeadersStoreViaSupplier() {
+        final String storeName = "table-store";
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey()
+            .count(Materialized.as(Stores.persistentTimestampedKeyValueStoreWithHeaders(storeName)))
+            .toStream()
+            .process(() -> new ContextualProcessor<String, Long, String, Long>() {
+                @Override
+                public void process(final Record<String, Long> record) {
+                    final TimestampedKeyValueStoreWithHeaders<String, Long> headersStore =
+                        context().getStateStore(storeName);
+
+                    // Forward every stored count together with the timestamp held in the store.
+                    try (final KeyValueIterator<String, ValueTimestampHeaders<Long>> entries = headersStore.all()) {
+                        entries.forEachRemaining(entry ->
+                            context().forward(new Record<>(entry.key, entry.value.value(), entry.value.timestamp())));
+                    }
+                }
+            }, storeName)
+            .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build())) {
+            final TestInputTopic<String, String> input =
+                driver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, Long> output =
+                driver.createOutputTopic("output-topic", new StringDeserializer(), new LongDeserializer());
+
+            input.pipeInput("key1", "value1");
+
+            final KeyValue<String, Long> expected = KeyValue.pair("key1", 1L);
+            assertEquals(expected, output.readKeyValue());
+        }
+    }
+
+    // Windowed aggregate() materialized through an explicit
+    // persistentTimestampedWindowStoreWithHeaders supplier (24h retention, 1h windows,
+    // retainDuplicates = false). A downstream processor iterates the window store and
+    // forwards each entry; only the forwarded value is asserted.
+    @Test
+    public void processorShouldBuildTopologyWithWindowStoreWithHeadersViaSupplier() {
+        final String storeName = "table-store";
+        final Duration retention = Duration.ofHours(24L);
+        final Duration windowSize = Duration.ofHours(1L);
+        final long windowSizeMs = windowSize.toMillis();
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        final Materialized<String, String, WindowStore<Bytes, byte[]>> storeConfig =
+            Materialized.as(Stores.persistentTimestampedWindowStoreWithHeaders(storeName, retention, windowSize, false));
+
+        streamsBuilder
+            .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey()
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(windowSize))
+            .aggregate(
+                () -> "",
+                (aggKey, newValue, oldAggregate) -> newValue,
+                storeConfig.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            )
+            .toStream()
+            .process(() -> new ContextualProcessor<Windowed<String>, String, Windowed<String>, String>() {
+                @Override
+                public void process(final Record<Windowed<String>, String> record) {
+                    final WindowStore<String, ValueTimestampHeaders<String>> windowStore =
+                        context().getStateStore(storeName);
+
+                    // Forward every windowed entry using the value and timestamp held in the store.
+                    try (final KeyValueIterator<Windowed<String>, ValueTimestampHeaders<String>> entries = windowStore.all()) {
+                        entries.forEachRemaining(entry ->
+                            context().forward(new Record<>(entry.key, entry.value.value(), entry.value.timestamp())));
+                    }
+                }
+            }, storeName)
+            .to("output-topic", Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class, windowSizeMs), Serdes.String()));
+
+        // Verify topology can be built and run with window headers store supplier
+        try (final TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build())) {
+            final TestInputTopic<String, String> input =
+                driver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<Windowed<String>, String> output = driver.createOutputTopic(
+                "output-topic",
+                new TimeWindowedDeserializer<>(new StringDeserializer(), windowSizeMs),
+                new StringDeserializer());
+
+            input.pipeInput("key1", "value1");
+
+            final KeyValue<Windowed<String>, String> result = output.readKeyValue();
+            assertEquals("value1", result.value);
+        }
+    }
+
+    // Session-windowed aggregate() materialized through an explicit
+    // persistentSessionStoreWithHeaders supplier. A downstream processor opens "table-store"
+    // as a SessionStoreWithHeaders, looks up the sessions for "key1" and forwards each
+    // aggregation with the timestamp of the record being processed.
+    @Test
+    public void processorShouldAccessKStreamSessionAggregatedKTableStoreAsHeadersStoreViaSupplier() {
+        final String storeName = "table-store";
+        final Duration oneHour = Duration.ofHours(1L);
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        final Materialized<String, String, SessionStore<Bytes, byte[]>> storeConfig =
+            Materialized.as(Stores.persistentSessionStoreWithHeaders(storeName, oneHour));
+
+        streamsBuilder
+            .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey()
+            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(oneHour))
+            .aggregate(
+                () -> "",
+                (aggKey, newValue, oldAggregate) -> newValue,
+                (mergeKey, leftAggregate, rightAggregate) -> leftAggregate,
+                storeConfig.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            )
+            .toStream((sessionKey, ignoredValue) -> sessionKey.key())
+            .process(() -> new ContextualProcessor<String, String, String, String>() {
+                @Override
+                public void process(final Record<String, String> record) {
+                    final SessionStoreWithHeaders<String, String> sessionStore =
+                        context().getStateStore(storeName);
+
+                    // Emit one record per session found for "key1", keyed by the plain (unwindowed) key.
+                    try (final KeyValueIterator<Windowed<String>, AggregationWithHeaders<String>> sessions =
+                             sessionStore.findSessions("key1", 0L, Long.MAX_VALUE)) {
+                        sessions.forEachRemaining(session ->
+                            context().forward(new Record<>(session.key.key(), session.value.aggregation(), record.timestamp())));
+                    }
+                }
+            }, storeName)
+            .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build())) {
+            final TestInputTopic<String, String> input =
+                driver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> output =
+                driver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer());
+
+            input.pipeInput("key1", "value1");
+
+            final KeyValue<String, String> expected = KeyValue.pair("key1", "value1");
+            assertEquals(expected, output.readKeyValue());
+        }
+    }
+
+    // Session-windowed reduce() materialized through an explicit
+    // persistentSessionStoreWithHeaders supplier. A downstream processor opens "table-store"
+    // as a SessionStoreWithHeaders, looks up the sessions for "key1" and forwards each
+    // aggregation with the timestamp of the record being processed.
+    @Test
+    public void processorShouldAccessKStreamSessionReducedKTableStoreAsHeadersStoreViaSupplier() {
+        final String storeName = "table-store";
+        final Duration oneHour = Duration.ofHours(1L);
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        final Materialized<String, String, SessionStore<Bytes, byte[]>> storeConfig =
+            Materialized.as(Stores.persistentSessionStoreWithHeaders(storeName, oneHour));
+
+        streamsBuilder
+            .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey()
+            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(oneHour))
+            .reduce(
+                (first, second) -> first,
+                storeConfig.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            )
+            .toStream((sessionKey, ignoredValue) -> sessionKey.key())
+            .process(() -> new ContextualProcessor<String, String, String, String>() {
+                @Override
+                public void process(final Record<String, String> record) {
+                    final SessionStoreWithHeaders<String, String> sessionStore =
+                        context().getStateStore(storeName);
+
+                    // Emit one record per session found for "key1", keyed by the plain (unwindowed) key.
+                    try (final KeyValueIterator<Windowed<String>, AggregationWithHeaders<String>> sessions =
+                             sessionStore.findSessions("key1", 0L, Long.MAX_VALUE)) {
+                        sessions.forEachRemaining(session ->
+                            context().forward(new Record<>(session.key.key(), session.value.aggregation(), record.timestamp())));
+                    }
+                }
+            }, storeName)
+            .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build())) {
+            final TestInputTopic<String, String> input =
+                driver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> output =
+                driver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer());
+
+            input.pipeInput("key1", "value1");
+
+            final KeyValue<String, String> expected = KeyValue.pair("key1", "value1");
+            assertEquals(expected, output.readKeyValue());
+        }
+    }
 }

Reply via email to