This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 1ea96d10290 KAFKA-20422: Add DSL integration tests for header stores
with explicit suppliers (#22002)
1ea96d10290 is described below
commit 1ea96d102909f7eb3b6b500b4496871a2a1d1e3d
Author: Alieh Saeedi <[email protected]>
AuthorDate: Tue Apr 21 03:51:55 2026 +0200
KAFKA-20422: Add DSL integration tests for header stores with explicit
suppliers (#22002)
Add integration tests for DSL operations with header stores via
Materialized.as().
Reviewers: Matthias J. Sax <[email protected]>
---
.../integration/PapiDslIntegrationTest.java | 246 +++++++++++++++++++++
1 file changed, 246 insertions(+)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
index 48d9c597ee4..4b8f25d9e4c 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.integration;
+import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -35,15 +36,19 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
@@ -556,4 +561,245 @@ public class PapiDslIntegrationTest {
assertEquals(KeyValue.pair("key1", "value1"),
outputTopic.readKeyValue());
}
}
+ /**
+  * Aggregates "input-topic" by key into a store built from
+  * Stores.persistentTimestampedKeyValueStoreWithHeaders("table-store") and passed in via
+  * Materialized.as(), then runs a processor that names "table-store" in its process() call,
+  * obtains it as a TimestampedKeyValueStoreWithHeaders and forwards every entry to
+  * "output-topic". Pipes ("key1", "value1") and asserts the same pair is read back.
+  */
+ @Test
+ public void
processorShouldAccessKStreamAggregatedKTableStoreAsHeadersStoreViaSupplier() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ // The store is handed to the DSL as an explicit supplier rather than just a store name.
+ final Materialized<String, String, KeyValueStore<Bytes, byte[]>>
materialized =
+
Materialized.as(Stores.persistentTimestampedKeyValueStoreWithHeaders("table-store"));
+ // Initializer yields "", and the aggregator lambda returns its `value` argument.
+ builder
+ .stream("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+ .aggregate(
+ () -> "",
+ (key, value, aggregate) -> value,
+
materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ )
+ .toStream()
+ .process(() -> new ContextualProcessor<String, String, String,
String>() {
+ @Override
+ public void process(final Record<String, String> record) {
+ final TimestampedKeyValueStoreWithHeaders<String, String>
store = context().getStateStore("table-store");
+ // Ignore the incoming record's payload: scan the whole store and forward each entry's unwrapped value and timestamp.
+ try (final KeyValueIterator<String,
ValueTimestampHeaders<String>> it = store.all()) {
+ while (it.hasNext()) {
+ final KeyValue<String,
ValueTimestampHeaders<String>> row = it.next();
+ context().forward(new Record<>(row.key,
row.value.value(), row.value.timestamp()));
+ }
+ }
+ }
+ }, "table-store")
+ .to("output-topic", Produced.with(Serdes.String(),
Serdes.String()));
+ // Drive the topology with TopologyTestDriver, using String (de)serializers on both test topics.
+ try (final TopologyTestDriver testDriver = new
TopologyTestDriver(builder.build())) {
+ final TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic("input-topic", new StringSerializer(), new
StringSerializer());
+ final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic("output-topic", new StringDeserializer(), new
StringDeserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ // Only the first output record is checked.
+ assertEquals(KeyValue.pair("key1", "value1"),
outputTopic.readKeyValue());
+ }
+ }
+ /**
+  * Same shape as the aggregate variant, but uses groupByKey().reduce(): the result is
+  * materialized into a store built from
+  * Stores.persistentTimestampedKeyValueStoreWithHeaders("table-store"), and a processor that
+  * names "table-store" in its process() call reads it as a
+  * TimestampedKeyValueStoreWithHeaders and forwards every entry to "output-topic".
+  * Pipes ("key1", "value1") and asserts the same pair is read back.
+  */
+ @Test
+ public void
processorShouldAccessKStreamReducedKTableStoreAsHeadersStoreViaSupplier() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ // The store is handed to the DSL as an explicit supplier rather than just a store name.
+ final Materialized<String, String, KeyValueStore<Bytes, byte[]>>
materialized =
+
Materialized.as(Stores.persistentTimestampedKeyValueStoreWithHeaders("table-store"));
+ // NOTE(review): the reducer below returns its FIRST argument; Reducer#apply presumably receives the current aggregate first, so the (value, aggregate) names look swapped - confirm. Only one record is piped, so the assertion is unaffected.
+ builder
+ .stream("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+ .reduce(
+ (value, aggregate) -> value,
+
materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ )
+ .toStream()
+ .process(() -> new ContextualProcessor<String, String, String,
String>() {
+ @Override
+ public void process(final Record<String, String> record) {
+ final TimestampedKeyValueStoreWithHeaders<String, String>
store = context().getStateStore("table-store");
+ // Scan the whole store and forward each entry's unwrapped value and timestamp.
+ try (final KeyValueIterator<String,
ValueTimestampHeaders<String>> it = store.all()) {
+ while (it.hasNext()) {
+ final KeyValue<String,
ValueTimestampHeaders<String>> row = it.next();
+ context().forward(new Record<>(row.key,
row.value.value(), row.value.timestamp()));
+ }
+ }
+ }
+ }, "table-store")
+ .to("output-topic", Produced.with(Serdes.String(),
Serdes.String()));
+ // Drive the topology with TopologyTestDriver, using String (de)serializers on both test topics.
+ try (final TopologyTestDriver testDriver = new
TopologyTestDriver(builder.build())) {
+ final TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic("input-topic", new StringSerializer(), new
StringSerializer());
+ final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic("output-topic", new StringDeserializer(), new
StringDeserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ // Only the first output record is checked.
+ assertEquals(KeyValue.pair("key1", "value1"),
outputTopic.readKeyValue());
+ }
+ }
+ /**
+  * Counts records of "input-topic" per key into a store built from
+  * Stores.persistentTimestampedKeyValueStoreWithHeaders("table-store"), then runs a processor
+  * that names "table-store" in its process() call, reads it as a
+  * TimestampedKeyValueStoreWithHeaders with Long values and forwards every entry to
+  * "output-topic". Pipes one record for "key1" and asserts ("key1", 1L) is read back.
+  */
+ @Test
+ public void
processorShouldAccessKStreamCountKTableStoreAsHeadersStoreViaSupplier() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ // Unlike the aggregate/reduce variants, no key or value serde is set on the Materialized passed to count().
+ builder.stream("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+
.count(Materialized.as(Stores.persistentTimestampedKeyValueStoreWithHeaders("table-store")))
+ .toStream()
+ .process(() -> new ContextualProcessor<String, Long, String,
Long>() {
+ @Override
+ public void process(final Record<String, Long> record) {
+ final TimestampedKeyValueStoreWithHeaders<String, Long>
store = context().getStateStore("table-store");
+ // Scan the whole store and forward each entry's unwrapped count and timestamp.
+ try (final KeyValueIterator<String,
ValueTimestampHeaders<Long>> it = store.all()) {
+ while (it.hasNext()) {
+ final KeyValue<String,
ValueTimestampHeaders<Long>> row = it.next();
+ context().forward(new Record<>(row.key,
row.value.value(), row.value.timestamp()));
+ }
+ }
+ }
+ }, "table-store")
+ .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
+ // Input values are Strings; output values are read back with a LongDeserializer.
+ try (final TopologyTestDriver testDriver = new
TopologyTestDriver(builder.build())) {
+ final TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic("input-topic", new StringSerializer(), new
StringSerializer());
+ final TestOutputTopic<String, Long> outputTopic =
testDriver.createOutputTopic("output-topic", new StringDeserializer(), new
LongDeserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ // Only the first output record is checked.
+ assertEquals(KeyValue.pair("key1", 1L),
outputTopic.readKeyValue());
+ }
+ }
+ /**
+  * Runs a one-hour time-windowed aggregate over "input-topic" materialized into a store built
+  * from Stores.persistentTimestampedWindowStoreWithHeaders("table-store", ...), then runs a
+  * processor that names "table-store" in its process() call, reads it as a
+  * WindowStore of ValueTimestampHeaders and forwards every entry to "output-topic".
+  * Pipes ("key1", "value1") and asserts only that the output value is "value1"; the windowed
+  * key is not checked.
+  */
+ @Test
+ public void
processorShouldBuildTopologyWithWindowStoreWithHeadersViaSupplier() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ // NOTE(review): the 24h / 1h / false arguments are presumably retention period, window size and retainDuplicates, as in the other Stores window-store factories - confirm.
+ final Materialized<String, String, WindowStore<Bytes, byte[]>>
materialized =
+
Materialized.as(Stores.persistentTimestampedWindowStoreWithHeaders("table-store",
Duration.ofHours(24L), Duration.ofHours(1L), false));
+ // Initializer yields "", and the aggregator lambda returns its `value` argument.
+ builder
+ .stream("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1L)))
+ .aggregate(
+ () -> "",
+ (key, value, aggregate) -> value,
+
materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ )
+ .toStream()
+ .process(() -> new ContextualProcessor<Windowed<String>, String,
Windowed<String>, String>() {
+ @Override
+ public void process(final Record<Windowed<String>, String>
record) {
+ final WindowStore<String, ValueTimestampHeaders<String>>
store = context().getStateStore("table-store");
+ // Scan the whole store and forward each windowed entry's unwrapped value and timestamp.
+ try (final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> it = store.all()) {
+ while (it.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueTimestampHeaders<String>> row = it.next();
+ context().forward(new Record<>(row.key,
row.value.value(), row.value.timestamp()));
+ }
+ }
+ }
+ }, "table-store")
+ .to("output-topic",
Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class,
Duration.ofHours(1L).toMillis()), Serdes.String()));
+
+ // Verify topology can be built and run with window headers store
supplier
+ try (final TopologyTestDriver testDriver = new
TopologyTestDriver(builder.build())) {
+ final TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic("input-topic", new StringSerializer(), new
StringSerializer());
+ final TestOutputTopic<Windowed<String>, String> outputTopic =
testDriver.createOutputTopic("output-topic", new TimeWindowedDeserializer<>(new
StringDeserializer(), Duration.ofHours(1L).toMillis()), new
StringDeserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ // Only the value of the first output record is compared.
+ assertEquals("value1", outputTopic.readKeyValue().value);
+ }
+ }
+ /**
+  * Runs a session-windowed aggregate (one-hour inactivity gap) over "input-topic" materialized
+  * into a store built from Stores.persistentSessionStoreWithHeaders("table-store", 1h), then
+  * runs a processor that names "table-store" in its process() call, reads it as a
+  * SessionStoreWithHeaders and forwards the sessions it finds to "output-topic".
+  * Pipes ("key1", "value1") and asserts the same pair is read back.
+  */
+ @Test
+ public void
processorShouldAccessKStreamSessionAggregatedKTableStoreAsHeadersStoreViaSupplier()
{
+ final StreamsBuilder builder = new StreamsBuilder();
+ // The session store is handed to the DSL as an explicit supplier rather than just a store name.
+ final Materialized<String, String, SessionStore<Bytes, byte[]>>
materialized =
+
Materialized.as(Stores.persistentSessionStoreWithHeaders("table-store",
Duration.ofHours(1L)));
+ // Initializer yields "", the aggregator lambda returns `value`, and the third lambda returns `left`; toStream() then re-keys by the plain (un-windowed) key.
+ builder
+ .stream("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofHours(1L)))
+ .aggregate(
+ () -> "",
+ (key, value, aggregate) -> value,
+ (key, left, right) -> left,
+
materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ )
+ .toStream((windowedKey, value) -> windowedKey.key())
+ .process(() -> new ContextualProcessor<String, String, String,
String>() {
+ @Override
+ public void process(final Record<String, String> record) {
+ final SessionStoreWithHeaders<String, String> store =
context().getStateStore("table-store");
+ // Looks up sessions for the literal key "key1" (not record.key()) over [0, Long.MAX_VALUE]; forwarded records carry the incoming record's timestamp.
+ try (final KeyValueIterator<Windowed<String>,
AggregationWithHeaders<String>> it = store.findSessions("key1", 0L,
Long.MAX_VALUE)) {
+ while (it.hasNext()) {
+ final KeyValue<Windowed<String>,
AggregationWithHeaders<String>> row = it.next();
+ context().forward(new Record<>(row.key.key(),
row.value.aggregation(), record.timestamp()));
+ }
+ }
+ }
+ }, "table-store")
+ .to("output-topic", Produced.with(Serdes.String(),
Serdes.String()));
+ // Drive the topology with TopologyTestDriver, using String (de)serializers on both test topics.
+ try (final TopologyTestDriver testDriver = new
TopologyTestDriver(builder.build())) {
+ final TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic("input-topic", new StringSerializer(), new
StringSerializer());
+ final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic("output-topic", new StringDeserializer(), new
StringDeserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ // Only the first output record is checked.
+ assertEquals(KeyValue.pair("key1", "value1"),
outputTopic.readKeyValue());
+ }
+ }
+ /**
+  * Same shape as the session aggregate variant, but uses reduce(): the session-windowed result
+  * (one-hour inactivity gap) is materialized into a store built from
+  * Stores.persistentSessionStoreWithHeaders("table-store", 1h), and a processor that names
+  * "table-store" in its process() call reads it as a SessionStoreWithHeaders and forwards the
+  * sessions it finds to "output-topic". Pipes ("key1", "value1") and asserts the same pair is
+  * read back.
+  */
+ @Test
+ public void
processorShouldAccessKStreamSessionReducedKTableStoreAsHeadersStoreViaSupplier()
{
+ final StreamsBuilder builder = new StreamsBuilder();
+ // The session store is handed to the DSL as an explicit supplier rather than just a store name.
+ final Materialized<String, String, SessionStore<Bytes, byte[]>>
materialized =
+
Materialized.as(Stores.persistentSessionStoreWithHeaders("table-store",
Duration.ofHours(1L)));
+ // NOTE(review): the reducer below returns its FIRST argument; Reducer#apply presumably receives the current aggregate first, so the (value, aggregate) names look swapped - confirm. Only one record is piped, so the assertion is unaffected.
+ builder
+ .stream("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofHours(1L)))
+ .reduce(
+ (value, aggregate) -> value,
+
materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ )
+ .toStream((windowedKey, value) -> windowedKey.key())
+ .process(() -> new ContextualProcessor<String, String, String,
String>() {
+ @Override
+ public void process(final Record<String, String> record) {
+ final SessionStoreWithHeaders<String, String> store =
context().getStateStore("table-store");
+ // Looks up sessions for the literal key "key1" (not record.key()) over [0, Long.MAX_VALUE]; forwarded records carry the incoming record's timestamp.
+ try (final KeyValueIterator<Windowed<String>,
AggregationWithHeaders<String>> it = store.findSessions("key1", 0L,
Long.MAX_VALUE)) {
+ while (it.hasNext()) {
+ final KeyValue<Windowed<String>,
AggregationWithHeaders<String>> row = it.next();
+ context().forward(new Record<>(row.key.key(),
row.value.aggregation(), record.timestamp()));
+ }
+ }
+ }
+ }, "table-store")
+ .to("output-topic", Produced.with(Serdes.String(),
Serdes.String()));
+ // Drive the topology with TopologyTestDriver, using String (de)serializers on both test topics.
+ try (final TopologyTestDriver testDriver = new
TopologyTestDriver(builder.build())) {
+ final TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic("input-topic", new StringSerializer(), new
StringSerializer());
+ final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic("output-topic", new StringDeserializer(), new
StringDeserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+ // Only the first output record is checked.
+ assertEquals(KeyValue.pair("key1", "value1"),
outputTopic.readKeyValue());
+ }
+ }
}