This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e7c17a0fb5b KAFKA-20222: Enable SessionStore with headers in DSL
(#21600)
e7c17a0fb5b is described below
commit e7c17a0fb5b3bc7dddbef75921650395254a194d
Author: Bill Bejeck <[email protected]>
AuthorDate: Sat Mar 14 14:51:13 2026 -0400
KAFKA-20222: Enable SessionStore with headers in DSL (#21600)
Adds support for DSL integration with header-aware session stores
(SessionStoreWithHeaders), in support of KIP-1285.
Also adds a ReadOnlySessionStoreFacade that wraps
SessionStoreWithHeaders and strips the AggregationWithHeaders wrapper
when queried through Interactive Queries, mirroring the existing pattern
for ReadOnlyKeyValueStoreFacade and ReadOnlyWindowStoreFacade.
Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../integration/IQv2StoreIntegrationTest.java | 25 +--
.../KStreamAggregationIntegrationTest.java | 161 +++++++++--------
.../internals/KStreamSessionWindowAggregate.java | 37 ++--
...va => SessionCacheFlushListenerWithHeader.java} | 24 ++-
.../internals/SessionStoreMaterializer.java | 18 +-
.../streams/state/BuiltInDslStoreSuppliers.java | 6 +-
.../kafka/streams/state/DslSessionParams.java | 21 ++-
.../org/apache/kafka/streams/state/Stores.java | 10 +-
.../state/internals/GlobalStateStoreProvider.java | 3 +
.../state/internals/MeteredSessionStore.java | 7 +-
.../internals/MeteredSessionStoreWithHeaders.java | 12 ++
.../internals/ReadOnlySessionStoreFacade.java | 88 ++++++++++
.../internals/StreamThreadStateStoreProvider.java | 3 +
...KStreamSessionWindowAggregateProcessorTest.java | 42 +++--
...> SessionCacheFlushListenerWithHeaderTest.java} | 10 +-
.../internals/SessionWindowedKStreamImplTest.java | 25 ++-
.../internals/ReadOnlySessionStoreFacadeTest.java | 194 +++++++++++++++++++++
.../state/internals/SessionStoreFetchTest.java | 8 +-
.../apache/kafka/streams/TopologyTestDriver.java | 117 +++++++++++++
19 files changed, 654 insertions(+), 157 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 649799c7976..973196cfb11 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -108,6 +108,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@@ -1498,14 +1499,10 @@ public class IQv2StoreIntegrationTest {
throw new AssertionError(queryResult.toString());
}
assertThat(partitionResult.getFailureReason(),
is(FailureReason.UNKNOWN_QUERY_TYPE));
- assertThat(partitionResult.getFailureMessage(), is(
- "This store"
- + " (class
org.apache.kafka.streams.state.internals.MeteredSessionStore)"
- + " doesn't know how to execute the given query"
- + " (WindowRangeQuery{key=Optional.empty,
timeFrom=Optional[1970-01-01T00:00:00Z],
timeTo=Optional[1970-01-01T00:00:00Z]})"
- + " because SessionStores only support
WindowRangeQuery.withKey."
- + " Contact the store maintainer if you need support
for a new query type."
- ));
+ assertThat(partitionResult.getFailureMessage(),
+ containsString("doesn't know how to execute the given
query"));
+ assertThat(partitionResult.getFailureMessage(),
+ containsString("because SessionStores only support
WindowRangeQuery.withKey."));
}
}
}
@@ -1571,14 +1568,10 @@ public class IQv2StoreIntegrationTest {
throw new AssertionError(queryResult.toString());
}
assertThat(partitionResult.getFailureReason(),
is(FailureReason.UNKNOWN_QUERY_TYPE));
- assertThat(partitionResult.getFailureMessage(), is(
- "This store"
- + " (class
org.apache.kafka.streams.state.internals.MeteredSessionStore)"
- + " doesn't know how to execute the given query"
- + " (WindowRangeQuery{key=Optional.empty,
timeFrom=Optional[1970-01-01T00:00:00Z],
timeTo=Optional[1970-01-01T00:00:00Z]})"
- + " because SessionStores only support
WindowRangeQuery.withKey."
- + " Contact the store maintainer if you need support
for a new query type."
- ));
+ assertThat(partitionResult.getFailureMessage(),
+ containsString("doesn't know how to execute the given
query"));
+ assertThat(partitionResult.getFailureMessage(),
+ containsString("because SessionStores only support
WindowRangeQuery.withKey."));
}
}
}
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index aa283726134..8654ca75f5e 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -17,6 +17,9 @@
package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -71,11 +74,15 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -718,8 +725,9 @@ public class KStreamAggregationIntegrationTest {
}
}
- @Test
- public void shouldCountSessionWindows() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldCountSessionWindows(final boolean withHeaders) throws
Exception {
final long sessionGap = 5 * 60 * 1000L;
final List<KeyValue<String, String>> t1Messages = Arrays.asList(
new KeyValue<>("bob", "start"),
@@ -797,6 +805,10 @@ public class KStreamAggregationIntegrationTest {
final Map<Windowed<String>, KeyValue<Long, Long>> results = new
HashMap<>();
final CountDownLatch latch = new CountDownLatch(13);
+ if (withHeaders) {
+ streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
+
builder.stream(userSessionsStream, Consumed.with(Serdes.String(),
Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(sessionGap)))
@@ -819,88 +831,37 @@ public class KStreamAggregationIntegrationTest {
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3,
t3))), equalTo(KeyValue.pair(1L, t3)));
}
- @Test
- public void shouldReduceSessionWindows() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldReduceSessionWindows(final boolean withHeaders) throws
Exception {
final long sessionGap = 1000L; // something to do with time
- final List<KeyValue<String, String>> t1Messages = Arrays.asList(
- new KeyValue<>("bob", "start"),
- new KeyValue<>("penny", "start"),
- new KeyValue<>("jo", "pause"),
- new KeyValue<>("emily", "pause")
- );
+
+ final Properties producerConfig = TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties());
final long t1 = mockTime.milliseconds();
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- userSessionsStream,
- t1Messages,
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
- t1
- );
final long t2 = t1 + (sessionGap / 2);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- userSessionsStream,
- Collections.singletonList(
- new KeyValue<>("emily", "resume")
- ),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
- t2
- );
final long t3 = t1 + sessionGap + 1;
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- userSessionsStream,
- Arrays.asList(
- new KeyValue<>("bob", "pause"),
- new KeyValue<>("penny", "stop")
- ),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
- t3
- );
final long t4 = t3 + (sessionGap / 2);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- userSessionsStream,
- Arrays.asList(
- new KeyValue<>("bob", "resume"), // bobs session continues
- new KeyValue<>("jo", "resume") // jo's starts new session
- ),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
- t4
- );
final long t5 = t4 - 1;
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- userSessionsStream,
- Collections.singletonList(
- new KeyValue<>("jo", "late") // jo has late arrival
- ),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
- t5
- );
+
+ produceSessionWindowData(producerConfig, withHeaders, t1, t2, t3, t4,
t5, sessionGap);
final Map<Windowed<String>, KeyValue<String, Long>> results = new
HashMap<>();
final CountDownLatch latch = new CountDownLatch(13);
final String userSessionsStore = "UserSessionsStore";
+ if (withHeaders) {
+ streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
+
builder.stream(userSessionsStream, Consumed.with(Serdes.String(),
Serdes.String()))
- .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(sessionGap),
ofMinutes(1))) .reduce((value1, value2) -> value1 + ":" + value2,
Materialized.as(userSessionsStore))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(sessionGap),
ofMinutes(1)))
+ .reduce((value1, value2) -> value1 + ":" + value2,
Materialized.as(userSessionsStore))
.toStream()
.process(() -> (Processor<Windowed<String>, String, Object,
Object>) record -> {
results.put(record.key(), KeyValue.pair(record.value(),
record.timestamp()));
@@ -919,9 +880,61 @@ public class KStreamAggregationIntegrationTest {
assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3,
t4))), equalTo(KeyValue.pair("pause:resume", t4)));
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3,
t3))), equalTo(KeyValue.pair("stop", t3)));
- // verify can query data via IQ
+ verifySessionStore(userSessionsStore, t1, t3, t4);
+
+ }
+
+ private void produceSessionWindowData(final Properties producerConfig,
+ final boolean withHeaders,
+ final long t1, final long t2, final
long t3,
+ final long t4, final long t5,
+ final long sessionGap) throws
Exception {
+ final List<KeyValue<String, String>> t1Messages = Arrays.asList(
+ new KeyValue<>("bob", "start"),
+ new KeyValue<>("penny", "start"),
+ new KeyValue<>("jo", "pause"),
+ new KeyValue<>("emily", "pause")
+ );
+
+ produceWithOptionalHeaders(t1Messages, producerConfig, withHeaders,
"t1", t1);
+ produceWithOptionalHeaders(
+ Collections.singletonList(new KeyValue<>("emily", "resume")),
+ producerConfig, withHeaders, "t2", t2);
+ produceWithOptionalHeaders(
+ Arrays.asList(new KeyValue<>("bob", "pause"), new
KeyValue<>("penny", "stop")),
+ producerConfig, withHeaders, "t3", t3);
+ produceWithOptionalHeaders(
+ Arrays.asList(
+ new KeyValue<>("bob", "resume"), // bobs session continues
+ new KeyValue<>("jo", "resume")), // jo's starts new session
+ producerConfig, withHeaders, "t4", t4);
+ produceWithOptionalHeaders(
+ Collections.singletonList(new KeyValue<>("jo", "late")), // jo
has late arrival
+ producerConfig, withHeaders, "t5", t5);
+ }
+
+ private void produceWithOptionalHeaders(final Collection<KeyValue<String,
String>> records,
+ final Properties producerConfig,
+ final boolean withHeaders,
+ final String batchId,
+ final long timestamp) throws
Exception {
+ if (withHeaders) {
+ final Headers headers = new RecordHeaders(Arrays.asList(
+ new RecordHeader("batch",
batchId.getBytes(StandardCharsets.UTF_8)),
+ new RecordHeader("source",
"test".getBytes(StandardCharsets.UTF_8))
+ ));
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ userSessionsStream, records, producerConfig, headers,
timestamp, false);
+ } else {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ userSessionsStream, records, producerConfig, timestamp);
+ }
+ }
+
+ private void verifySessionStore(final String storeName,
+ final long t1, final long t3, final long
t4) throws Exception {
final ReadOnlySessionStore<String, String> sessionStore =
- IntegrationTestUtils.getStore(userSessionsStore, kafkaStreams,
QueryableStoreTypes.sessionStore());
+ IntegrationTestUtils.getStore(storeName, kafkaStreams,
QueryableStoreTypes.sessionStore());
try (final KeyValueIterator<Windowed<String>, String> bob =
sessionStore.fetch("bob")) {
assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob",
new SessionWindow(t1, t1)), "start")));
@@ -929,7 +942,7 @@ public class KStreamAggregationIntegrationTest {
assertFalse(bob.hasNext());
}
}
-
+
@Test
public void shouldCountUnlimitedWindows() throws Exception {
final long startTime = mockTime.milliseconds() -
TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS) + 1;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index f3ca9e6740a..67730e82c89 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
@@ -36,6 +37,7 @@ import
org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -105,7 +107,7 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg>
implements KStreamAgg
private class KStreamSessionWindowAggregateProcessor extends
ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
- private SessionStore<KIn, VAgg> store;
+ private SessionStore<KIn, AggregationWithHeaders<VAgg>> store;
private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
private Sensor droppedRecordsSensor;
private Sensor emittedRecordsSensor;
@@ -147,7 +149,7 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg>
implements KStreamAgg
tupleForwarder = new TimestampedTupleForwarder<>(
store,
context,
- new SessionCacheFlushListener<>(context),
+ new SessionCacheFlushListenerWithHeader<>(context),
sendOldValues);
}
}
@@ -165,22 +167,22 @@ public class KStreamSessionWindowAggregate<KIn, VIn,
VAgg> implements KStreamAgg
observedStreamTime = Math.max(observedStreamTime, timestamp);
final long windowCloseTime = observedStreamTime -
windows.gracePeriodMs() - windows.inactivityGap();
- final List<KeyValue<Windowed<KIn>, VAgg>> merged = new
ArrayList<>();
+ final List<KeyValue<Windowed<KIn>, AggregationWithHeaders<VAgg>>>
merged = new ArrayList<>();
final SessionWindow newSessionWindow = new
SessionWindow(timestamp, timestamp);
SessionWindow mergedWindow = newSessionWindow;
VAgg agg = initializer.apply();
try (
- final KeyValueIterator<Windowed<KIn>, VAgg> iterator =
store.findSessions(
+ final KeyValueIterator<Windowed<KIn>,
AggregationWithHeaders<VAgg>> iterator = store.findSessions(
record.key(),
timestamp - windows.inactivityGap(),
timestamp + windows.inactivityGap()
)
) {
while (iterator.hasNext()) {
- final KeyValue<Windowed<KIn>, VAgg> next = iterator.next();
+ final KeyValue<Windowed<KIn>,
AggregationWithHeaders<VAgg>> next = iterator.next();
merged.add(next);
- agg = sessionMerger.apply(record.key(), agg, next.value);
+ agg = sessionMerger.apply(record.key(), agg,
AggregationWithHeaders.getAggregationOrNull(next.value));
mergedWindow = mergeSessionWindow(mergedWindow,
(SessionWindow) next.key.window());
}
}
@@ -189,16 +191,15 @@ public class KStreamSessionWindowAggregate<KIn, VIn,
VAgg> implements KStreamAgg
logSkippedRecordForExpiredWindow(timestamp, windowCloseTime,
mergedWindow);
} else {
if (!mergedWindow.equals(newSessionWindow)) {
- for (final KeyValue<Windowed<KIn>, VAgg> session : merged)
{
+ for (final KeyValue<Windowed<KIn>,
AggregationWithHeaders<VAgg>> session : merged) {
store.remove(session.key);
-
- maybeForwardUpdate(session.key, session.value, null);
+ maybeForwardUpdate(session.key,
AggregationWithHeaders.getAggregationOrNull(session.value), null);
}
}
agg = aggregator.apply(record.key(), record.value(), agg);
final Windowed<KIn> sessionKey = new Windowed<>(record.key(),
mergedWindow);
- store.put(sessionKey, agg);
+ store.put(sessionKey, AggregationWithHeaders.make(agg, new
RecordHeaders()));
maybeForwardUpdate(sessionKey, null, agg);
}
@@ -281,16 +282,16 @@ public class KStreamSessionWindowAggregate<KIn, VIn,
VAgg> implements KStreamAgg
// Only time ordered (indexed) session store should have
implemented
// this function, otherwise a not-supported exception would throw
- try (final KeyValueIterator<Windowed<KIn>, VAgg> windowToEmit =
store
- .findSessions(emitRangeLowerBound, emitRangeUpperBound)) {
+ try (final KeyValueIterator<Windowed<KIn>,
AggregationWithHeaders<VAgg>> windowToEmit =
+ store.findSessions(emitRangeLowerBound,
emitRangeUpperBound)) {
while (windowToEmit.hasNext()) {
emittedCount++;
- final KeyValue<Windowed<KIn>, VAgg> kv =
windowToEmit.next();
+ final KeyValue<Windowed<KIn>,
AggregationWithHeaders<VAgg>> kv = windowToEmit.next();
tupleForwarder.maybeForward(
record.withKey(kv.key)
- .withValue(new Change<>(kv.value, null))
+ .withValue(new
Change<>(AggregationWithHeaders.getAggregationOrNull(kv.value), null))
// set the timestamp as the window end timestamp
.withTimestamp(kv.key.window().end())
.withHeaders(record.headers()));
@@ -381,8 +382,8 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg>
implements KStreamAgg
private class KTableSessionWindowValueGetter implements
KTableValueGetter<Windowed<KIn>, VAgg> {
- private SessionStore<KIn, VAgg> store;
-
+ private SessionStore<KIn, AggregationWithHeaders<VAgg>> store;
+
@Override
public void init(final ProcessorContext<?, ?> context) {
store = context.getStateStore(storeName);
@@ -390,8 +391,10 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg>
implements KStreamAgg
@Override
public ValueAndTimestamp<VAgg> get(final Windowed<KIn> key) {
+ final AggregationWithHeaders<VAgg> result =
+ store.fetchSession(key.key(), key.window().start(),
key.window().end());
return ValueAndTimestamp.make(
- store.fetchSession(key.key(), key.window().start(),
key.window().end()),
+ AggregationWithHeaders.getAggregationOrNull(result),
key.window().end());
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerWithHeader.java
similarity index 61%
rename from
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
rename to
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerWithHeader.java
index af69dbab679..bc0f2a99153 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerWithHeader.java
@@ -16,30 +16,46 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
-class SessionCacheFlushListener<KOut, VOut> implements
CacheFlushListener<Windowed<KOut>, VOut> {
+class SessionCacheFlushListenerWithHeader<KOut, VOut>
+ implements CacheFlushListener<Windowed<KOut>,
AggregationWithHeaders<VOut>> {
+
private final InternalProcessorContext<Windowed<KOut>, Change<VOut>>
context;
@SuppressWarnings("rawtypes")
private final ProcessorNode myNode;
- SessionCacheFlushListener(final ProcessorContext<Windowed<KOut>,
Change<VOut>> context) {
+ SessionCacheFlushListenerWithHeader(final ProcessorContext<Windowed<KOut>,
Change<VOut>> context) {
this.context = (InternalProcessorContext<Windowed<KOut>,
Change<VOut>>) context;
myNode = this.context.currentNode();
}
@Override
- public void apply(final Record<Windowed<KOut>, Change<VOut>> record) {
+ public void apply(final Record<Windowed<KOut>,
Change<AggregationWithHeaders<VOut>>> record) {
@SuppressWarnings("rawtypes") final ProcessorNode prev =
context.currentNode();
context.setCurrentNode(myNode);
try {
- context.forward(record.withTimestamp(record.key().window().end()));
+ final VOut newValue =
AggregationWithHeaders.getAggregationOrNull(record.value().newValue);
+ final VOut oldValue =
AggregationWithHeaders.getAggregationOrNull(record.value().oldValue);
+
+ final Headers headers = record.value().newValue != null
+ ? record.value().newValue.headers()
+ : new RecordHeaders();
+
+ context.forward(
+ record
+ .withValue(new Change<>(newValue, oldValue,
record.value().isLatest))
+ .withTimestamp(record.key().window().end())
+ .withHeaders(headers));
} finally {
context.setCurrentNode(prev);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
index a5317f48880..5f1f9e409e7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
@@ -17,11 +17,13 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.DslStoreFormat;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.state.DslSessionParams;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@@ -57,19 +59,21 @@ public class SessionStoreMaterializer<K, V> extends
MaterializedStoreFactory<K,
}
@Override
- public StoreBuilder<?> builder() {
+ public StoreBuilder<SessionStoreWithHeaders<K, V>> builder() {
+ final DslStoreFormat storeFormat = dslStoreFormat() == null ?
DslStoreFormat.PLAIN : DslStoreFormat.HEADERS;
final SessionBytesStoreSupplier supplier =
materialized.storeSupplier() == null
? dslStoreSuppliers().sessionStore(new DslSessionParams(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
- emitStrategy))
+ emitStrategy,
+ storeFormat))
: (SessionBytesStoreSupplier) materialized.storeSupplier();
- final StoreBuilder<SessionStore<K, V>> builder =
Stores.sessionStoreBuilder(
- supplier,
- materialized.keySerde(),
- materialized.valueSerde()
- );
+ final StoreBuilder<SessionStoreWithHeaders<K, V>> builder =
Stores.sessionStoreBuilderWithHeaders(
+ supplier,
+ materialized.keySerde(),
+ materialized.valueSerde()
+ );
if (materialized.loggingEnabled()) {
builder.withLoggingEnabled(materialized.logConfig());
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
index ef81c869379..3b23b9e1f6e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
@@ -98,9 +98,13 @@ public class BuiltInDslStoreSuppliers {
return new RocksDbTimeOrderedSessionBytesStoreSupplier(
params.name(),
params.retentionPeriod().toMillis(),
- true);
+ true,
+ params.storeFormat() == DslStoreFormat.HEADERS);
}
+ if (params.storeFormat() == DslStoreFormat.HEADERS) {
+ return Stores.persistentSessionStoreWithHeaders(params.name(),
params.retentionPeriod());
+ }
return Stores.persistentSessionStore(params.name(),
params.retentionPeriod());
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/DslSessionParams.java
b/streams/src/main/java/org/apache/kafka/streams/state/DslSessionParams.java
index 97193de6884..16c8c523ec7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/DslSessionParams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/DslSessionParams.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state;
+import org.apache.kafka.streams.DslStoreFormat;
import org.apache.kafka.streams.kstream.EmitStrategy;
import java.time.Duration;
@@ -30,6 +31,7 @@ public class DslSessionParams {
private final String name;
private final Duration retentionPeriod;
private final EmitStrategy emitStrategy;
+ private final DslStoreFormat storeFormat;
/**
* @param name name of the store (cannot be {@code null})
@@ -38,15 +40,24 @@ public class DslSessionParams {
* contain the inactivity gap of the session and
the entire grace period.)
* @param emitStrategy defines how to emit results
*/
+ @Deprecated
public DslSessionParams(
final String name,
final Duration retentionPeriod,
final EmitStrategy emitStrategy
) {
+ this(name, retentionPeriod, emitStrategy, DslStoreFormat.PLAIN);
+ }
+
+ public DslSessionParams(final String name,
+ final Duration retentionPeriod,
+ final EmitStrategy emitStrategy,
+ final DslStoreFormat storeFormat) {
Objects.requireNonNull(name);
this.name = name;
this.retentionPeriod = retentionPeriod;
this.emitStrategy = emitStrategy;
+ this.storeFormat = storeFormat;
}
public String name() {
@@ -61,6 +72,10 @@ public class DslSessionParams {
return emitStrategy;
}
+ public DslStoreFormat storeFormat() {
+ return storeFormat;
+ }
+
@Override
public boolean equals(final Object o) {
if (this == o) {
@@ -72,12 +87,13 @@ public class DslSessionParams {
final DslSessionParams that = (DslSessionParams) o;
return Objects.equals(name, that.name)
&& Objects.equals(retentionPeriod, that.retentionPeriod)
- && Objects.equals(emitStrategy, that.emitStrategy);
+ && Objects.equals(emitStrategy, that.emitStrategy)
+ && Objects.equals(storeFormat, that.storeFormat);
}
@Override
public int hashCode() {
- return Objects.hash(name, retentionPeriod, emitStrategy);
+ return Objects.hash(name, retentionPeriod, emitStrategy, storeFormat);
}
@Override
@@ -86,6 +102,7 @@ public class DslSessionParams {
"name='" + name + '\'' +
", retentionPeriod=" + retentionPeriod +
", emitStrategy=" + emitStrategy +
+ ", storeFormat=" + storeFormat +
'}';
}
}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 53bf38cd777..db39656b83b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -672,14 +672,16 @@ public final class Stores {
*
* @param supplier a {@link SessionBytesStoreSupplier} (cannot be
{@code null})
* @param keySerde the key serde to use
- * @param valueSerde the value serde to use
+ * @param valueSerde the value serde to use; if the serialized bytes is
{@code null} for put operations,
+ * it is treated as delete
* @param <K> key type
* @param <V> value type
* @return an instance of {@link StoreBuilder} than can build a {@link
SessionStoreWithHeaders}
*/
- public static <K, V> StoreBuilder<SessionStoreWithHeaders<K, V>>
sessionStoreBuilderWithHeaders(final SessionBytesStoreSupplier supplier,
-
final Serde<K> keySerde,
-
final Serde<V> valueSerde) {
+ public static <K, V> StoreBuilder<SessionStoreWithHeaders<K, V>>
sessionStoreBuilderWithHeaders(
+ final SessionBytesStoreSupplier supplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde) {
Objects.requireNonNull(supplier, "supplier cannot be null");
return new SessionStoreBuilderWithHeaders<>(supplier, keySerde,
valueSerde, Time.SYSTEM);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
index 8ae592e4b4b..f15c21be5c7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
@@ -20,6 +20,7 @@ import
org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -48,6 +49,8 @@ public class GlobalStateStoreProvider implements
StateStoreProvider {
return (List<T>) Collections.singletonList(new
ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>)
store));
} else if (store instanceof TimestampedWindowStore &&
queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
return (List<T>) Collections.singletonList(new
ReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) store));
+ } else if (store instanceof SessionStoreWithHeaders &&
queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
+ return (List<T>) Collections.singletonList(new
ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders<Object, Object>) store));
}
return (List<T>) Collections.singletonList(store);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index cf5e00d403b..9b4f05e44cf 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.PositionBound;
@@ -163,11 +164,15 @@ public class MeteredSessionStore<K, V>
restoreSensor.record(restoreTimeNs);
}
+ protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde,
final SerdeGetter getter) {
+ return WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
+ }
+
private void initStoreSerde(final StateStoreContext context) {
final String storeName = name();
final String changelogTopic =
ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
serdes = StoreSerdeInitializer.prepareStoreSerde(
- context, storeName, changelogTopic, keySerde, valueSerde,
WrappingNullableUtils::prepareValueSerde);
+ context, storeName, changelogTopic, keySerde, valueSerde,
this::prepareValueSerdeForStore);
}
@SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
index 7f0b8d35cf7..221cf461ba4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
@@ -53,6 +54,17 @@ public class MeteredSessionStoreWithHeaders<K, AGG>
super(inner, metricsScope, keySerde, aggSerde, time);
}
+ @SuppressWarnings("unchecked")
+ @Override
+ protected Serde<AggregationWithHeaders<AGG>> prepareValueSerdeForStore(
+ final Serde<AggregationWithHeaders<AGG>> valueSerde,
+ final SerdeGetter getter) {
+ if (valueSerde == null) {
+ return new AggregationWithHeadersSerde<>((Serde<AGG>)
getter.valueSerde());
+ }
+ return super.prepareValueSerdeForStore(valueSerde, getter);
+ }
+
@Override
public void put(final Windowed<K> sessionKey, final
AggregationWithHeaders<AGG> aggregate) {
Objects.requireNonNull(sessionKey, "sessionKey can't be null");
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlySessionStoreFacade.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlySessionStoreFacade.java
new file mode 100644
index 00000000000..9e08ba95190
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlySessionStoreFacade.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
+
+public class ReadOnlySessionStoreFacade<K, V> implements
ReadOnlySessionStore<K, V> {
+ protected final SessionStoreWithHeaders<K, V> inner;
+
+ protected ReadOnlySessionStoreFacade(final SessionStoreWithHeaders<K, V>
store) {
+ inner = store;
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> findSessions(final K key,
+ final long
earliestSessionEndTime,
+ final long
latestSessionStartTime) {
+ return new SessionStoreIteratorFacade<>(inner.findSessions(key,
earliestSessionEndTime, latestSessionStartTime));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K key,
+ final long
earliestSessionEndTime,
+ final long
latestSessionStartTime) {
+ return new
SessionStoreIteratorFacade<>(inner.backwardFindSessions(key,
earliestSessionEndTime, latestSessionStartTime));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> findSessions(final K keyFrom,
+ final K keyTo,
+ final long
earliestSessionEndTime,
+ final long
latestSessionStartTime) {
+ return new SessionStoreIteratorFacade<>(inner.findSessions(keyFrom,
keyTo, earliestSessionEndTime, latestSessionStartTime));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K
keyFrom,
+ final K
keyTo,
+ final long
earliestSessionEndTime,
+ final long
latestSessionStartTime) {
+ return new
SessionStoreIteratorFacade<>(inner.backwardFindSessions(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime));
+ }
+
+ @Override
+ public V fetchSession(final K key,
+ final long sessionStartTime,
+ final long sessionEndTime) {
+ return
AggregationWithHeaders.getAggregationOrNull(inner.fetchSession(key,
sessionStartTime, sessionEndTime));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
+ return new SessionStoreIteratorFacade<>(inner.fetch(key));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> backwardFetch(final K key) {
+ return new SessionStoreIteratorFacade<>(inner.backwardFetch(key));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom, final K
keyTo) {
+ return new SessionStoreIteratorFacade<>(inner.fetch(keyFrom, keyTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
final K keyTo) {
+ return new SessionStoreIteratorFacade<>(inner.backwardFetch(keyFrom,
keyTo));
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index 275123e3225..f797ff0aadb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.processor.internals.Task;
import
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -107,6 +108,8 @@ public class StreamThreadStateStoreProvider {
return (T) new
ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) store);
} else if (store instanceof TimestampedWindowStore &&
queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
return (T) new
ReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) store);
+ } else if (store instanceof SessionStoreWithHeaders &&
queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
+ return (T) new
ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders<Object, Object>) store);
} else {
return (T) store;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index d6b94c7e7f9..5c18ef5662c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -38,9 +38,10 @@ import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
-import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import
org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
@@ -91,7 +92,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
private InternalMockProcessorContext<Windowed<String>, Change<Long>>
mockContext;
private KStreamSessionWindowAggregate<String, String, Long>
sessionAggregator;
private Processor<String, String, Windowed<String>, Change<Long>>
processor;
- private SessionStore<String, Long> sessionStore;
+ private SessionStoreWithHeaders<String, Long> sessionStore;
public EmitStrategy.StrategyType type;
@@ -152,8 +153,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
new RocksDbTimeOrderedSessionBytesStoreSupplier(STORE_NAME, GAP_MS
* 3, true) :
Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3));
- final StoreBuilder<SessionStore<String, Long>> storeBuilder =
Stores.sessionStoreBuilder(supplier, Serdes.String(), Serdes.Long())
- .withLoggingDisabled();
+ final StoreBuilder<SessionStoreWithHeaders<String, Long>> storeBuilder
=
+ Stores.sessionStoreBuilderWithHeaders(supplier, Serdes.String(),
Serdes.Long())
+ .withLoggingDisabled();
if (enableCaching && emitStrategy.type() !=
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
storeBuilder.withCachingEnabled();
@@ -179,10 +181,10 @@ public class KStreamSessionWindowAggregateProcessorTest {
processor.process(new Record<>("john", "first", 0L));
processor.process(new Record<>("john", "second", 500L));
- try (final KeyValueIterator<Windowed<String>, Long> values =
+ try (final KeyValueIterator<Windowed<String>,
AggregationWithHeaders<Long>> values =
sessionStore.findSessions("john", 0, 2000)) {
assertTrue(values.hasNext());
- assertEquals(Long.valueOf(2), values.next().value);
+ assertEquals(Long.valueOf(2),
AggregationWithHeaders.getAggregationOrNull(values.next().value));
}
}
@@ -192,27 +194,27 @@ public class KStreamSessionWindowAggregateProcessorTest {
setup(inputType, true);
final String sessionId = "mel";
processor.process(new Record<>(sessionId, "first", 0L));
- try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.findSessions(sessionId, 0, 0)) {
+ try (final KeyValueIterator<Windowed<String>,
AggregationWithHeaders<Long>> iterator = sessionStore.findSessions(sessionId,
0, 0)) {
assertTrue(iterator.hasNext());
}
// move time beyond gap
processor.process(new Record<>(sessionId, "second", GAP_MS + 1));
- try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.findSessions(sessionId, GAP_MS + 1, GAP_MS + 1)) {
+ try (final KeyValueIterator<Windowed<String>,
AggregationWithHeaders<Long>> iterator = sessionStore.findSessions(sessionId,
GAP_MS + 1, GAP_MS + 1)) {
assertTrue(iterator.hasNext());
}
// should still exist as not within gap
- try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.findSessions(sessionId, 0, 0)) {
+ try (final KeyValueIterator<Windowed<String>,
AggregationWithHeaders<Long>> iterator = sessionStore.findSessions(sessionId,
0, 0)) {
assertTrue(iterator.hasNext());
}
// move time back
processor.process(new Record<>(sessionId, "third", GAP_MS / 2));
- try (final KeyValueIterator<Windowed<String>, Long> iterator =
+ try (final KeyValueIterator<Windowed<String>,
AggregationWithHeaders<Long>> iterator =
sessionStore.findSessions(sessionId, 0, GAP_MS + 1)) {
- final KeyValue<Windowed<String>, Long> kv = iterator.next();
+ final KeyValue<Windowed<String>, AggregationWithHeaders<Long>> kv
= iterator.next();
- assertEquals(Long.valueOf(3), kv.value);
+ assertEquals(Long.valueOf(3),
AggregationWithHeaders.getAggregationOrNull(kv.value));
assertFalse(iterator.hasNext());
}
}
@@ -223,9 +225,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
setup(inputType, true);
processor.process(new Record<>("mel", "first", 0L));
processor.process(new Record<>("mel", "second", 0L));
- try (final KeyValueIterator<Windowed<String>, Long> iterator =
+ try (final KeyValueIterator<Windowed<String>,
AggregationWithHeaders<Long>> iterator =
sessionStore.findSessions("mel", 0, 0)) {
- assertEquals(Long.valueOf(2L), iterator.next().value);
+ assertEquals(Long.valueOf(2L),
AggregationWithHeaders.getAggregationOrNull(iterator.next().value));
assertFalse(iterator.hasNext());
}
}
@@ -289,18 +291,22 @@ public class KStreamSessionWindowAggregateProcessorTest {
processor.process(new Record<>("a", "1", 0L));
// first ensure it is in the store
- try (final KeyValueIterator<Windowed<String>, Long> a1 =
+ try (final KeyValueIterator<Windowed<String>,
AggregationWithHeaders<Long>> a1 =
sessionStore.findSessions("a", 0, 0)) {
- assertEquals(KeyValue.pair(new Windowed<>("a", new
SessionWindow(0, 0)), 1L), a1.next());
+ final KeyValue<Windowed<String>, AggregationWithHeaders<Long>>
next = a1.next();
+ assertEquals(new Windowed<>("a", new SessionWindow(0, 0)),
next.key);
+ assertEquals(1L,
AggregationWithHeaders.getAggregationOrNull(next.value));
}
processor.process(new Record<>("a", "2", 100L));
// a1 from above should have been removed
// should have merged session in store
- try (final KeyValueIterator<Windowed<String>, Long> a2 =
+ try (final KeyValueIterator<Windowed<String>,
AggregationWithHeaders<Long>> a2 =
sessionStore.findSessions("a", 0, 100)) {
- assertEquals(KeyValue.pair(new Windowed<>("a", new
SessionWindow(0, 100)), 2L), a2.next());
+ final KeyValue<Windowed<String>, AggregationWithHeaders<Long>>
next = a2.next();
+ assertEquals(new Windowed<>("a", new SessionWindow(0, 100)),
next.key);
+ assertEquals(2L,
AggregationWithHeaders.getAggregationOrNull(next.value));
assertFalse(a2.hasNext());
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerWithHeaderTest.java
similarity index 83%
rename from streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
rename to streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerWithHeaderTest.java
index 4d837f68fea..33487507a07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerWithHeaderTest.java
@@ -16,9 +16,11 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -33,7 +35,7 @@ import static org.mockito.Mockito.verify;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
-public class SessionCacheFlushListenerTest {
+public class SessionCacheFlushListenerWithHeaderTest {
@Test
public void shouldForwardKeyNewValueOldValueAndTimestamp() {
@SuppressWarnings("unchecked")
@@ -44,10 +46,12 @@ public class SessionCacheFlushListenerTest {
new Change<>("newValue", "oldValue"),
73L));
- new SessionCacheFlushListener<>(context).apply(
+ new SessionCacheFlushListenerWithHeader<>(context).apply(
new Record<>(
new Windowed<>("key", new SessionWindow(21L, 73L)),
- new Change<>("newValue", "oldValue"),
+ new Change<>(
+ AggregationWithHeaders.make("newValue", new
RecordHeaders()),
+ AggregationWithHeaders.make("oldValue", new
RecordHeaders())),
42L));
verify(context, times(2)).setCurrentNode(null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 6dbb9b8d1db..a78818ca028 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -36,10 +36,12 @@ import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore;
import org.apache.kafka.streams.state.internals.MeteredSessionStore;
-import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedSessionStore;
+import org.apache.kafka.streams.state.internals.SessionToHeadersStoreAdapter;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockApiProcessorSupplier;
@@ -227,7 +229,7 @@ public class SessionWindowedKStreamImplTest {
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
processData(driver);
final SessionStore<String, Long> store =
driver.getSessionStore("count-store");
- final List<KeyValue<Windowed<String>, Long>> data =
StreamsTestUtils.toListAndCloseIterator(store.fetch("1", "2"));
+ final List<KeyValue<Windowed<String>, Long>> data =
unwrapAggregations(store.fetch("1", "2"));
if (!emitFinal) {
assertThat(
data,
@@ -255,7 +257,7 @@ public class SessionWindowedKStreamImplTest {
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
processData(driver);
final SessionStore<String, String> sessionStore =
driver.getSessionStore("reduced");
- final List<KeyValue<Windowed<String>, String>> data =
StreamsTestUtils.toListAndCloseIterator(sessionStore.fetch("1", "2"));
+ final List<KeyValue<Windowed<String>, String>> data =
unwrapAggregations(sessionStore.fetch("1", "2"));
if (!emitFinal) {
assertThat(
@@ -288,7 +290,7 @@ public class SessionWindowedKStreamImplTest {
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
processData(driver);
final SessionStore<String, String> sessionStore =
driver.getSessionStore("aggregated");
- final List<KeyValue<Windowed<String>, String>> data =
StreamsTestUtils.toListAndCloseIterator(sessionStore.fetch("1", "2"));
+ final List<KeyValue<Windowed<String>, String>> data =
unwrapAggregations(sessionStore.fetch("1", "2"));
if (!emitFinal) {
assertThat(
data,
@@ -423,11 +425,11 @@ public class SessionWindowedKStreamImplTest {
Materialized.<String, String, SessionStore<Bytes,
byte[]>>as("aggregated").withValueSerde(Serdes.String()));
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
- final SessionStore<String, String> store =
driver.getSessionStore("aggregated");
+ final StateStore store =
driver.getAllStateStores().get("aggregated");
final WrappedStateStore changeLogging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
assertThat(store, instanceOf(MeteredSessionStore.class));
assertThat(changeLogging,
instanceOf(ChangeLoggingSessionBytesStore.class));
- assertThat(changeLogging.wrapped(),
instanceOf(RocksDBTimeOrderedSessionStore.class));
+ assertThat(changeLogging.wrapped(),
instanceOf(SessionToHeadersStoreAdapter.class));
}
}
@@ -440,4 +442,15 @@ public class SessionWindowedKStreamImplTest {
inputTopic.pipeInput("2", "1", 600);
inputTopic.pipeInput("2", "2", 599);
}
+
+ private <V> List<KeyValue<Windowed<String>, V>> unwrapAggregations(
+ final KeyValueIterator<Windowed<String>, V> iterator) {
+ final List<KeyValue<Windowed<String>, V>> result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>, V> next = iterator.next();
+ result.add(KeyValue.pair(next.key, next.value));
+ }
+ iterator.close();
+ return result;
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlySessionStoreFacadeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlySessionStoreFacadeTest.java
new file mode 100644
index 00000000000..f0efd857f71
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlySessionStoreFacadeTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class ReadOnlySessionStoreFacadeTest {
+ @Mock
+ private SessionStoreWithHeaders<String, String>
mockedSessionStoreWithHeaders;
+ @Mock
+ private KeyValueIterator<Windowed<String>, AggregationWithHeaders<String>>
mockedIterator;
+
+ private ReadOnlySessionStoreFacade<String, String>
readOnlySessionStoreFacade;
+
+ @BeforeEach
+ public void setup() {
+ readOnlySessionStoreFacade = new
ReadOnlySessionStoreFacade<>(mockedSessionStoreWithHeaders);
+ }
+
+ @Test
+ public void shouldReturnPlainValueOnFetchSession() {
+ when(mockedSessionStoreWithHeaders.fetchSession("key", 10L, 20L))
+ .thenReturn(AggregationWithHeaders.make("value", new
RecordHeaders()));
+
+ assertThat(readOnlySessionStoreFacade.fetchSession("key", 10L, 20L),
is("value"));
+ }
+
+ @Test
+ public void shouldReturnNullOnFetchSessionWhenNull() {
+ when(mockedSessionStoreWithHeaders.fetchSession("unknownKey", 10L,
20L))
+ .thenReturn(null);
+
+ assertNull(readOnlySessionStoreFacade.fetchSession("unknownKey", 10L,
20L));
+ }
+
+ @Test
+ public void shouldReturnStrippedKeyValuePairsOnFindSessions() {
+ when(mockedIterator.next())
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key1", new SessionWindow(10L, 20L)),
+ AggregationWithHeaders.make("value1", new RecordHeaders())))
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key2", new SessionWindow(30L, 40L)),
+ AggregationWithHeaders.make("value2", new RecordHeaders())));
+ when(mockedSessionStoreWithHeaders.findSessions("key1", 10L, 40L))
+ .thenReturn(mockedIterator);
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
+ readOnlySessionStoreFacade.findSessions("key1", 10L, 40L);
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1",
new SessionWindow(10L, 20L)), "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2",
new SessionWindow(30L, 40L)), "value2")));
+ }
+
+ @Test
+ public void shouldReturnStrippedKeyValuePairsOnBackwardFindSessions() {
+ when(mockedIterator.next())
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key1", new SessionWindow(30L, 40L)),
+ AggregationWithHeaders.make("value1", new RecordHeaders())));
+ when(mockedSessionStoreWithHeaders.backwardFindSessions("key1", 10L,
40L))
+ .thenReturn(mockedIterator);
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
+ readOnlySessionStoreFacade.backwardFindSessions("key1", 10L, 40L);
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1",
new SessionWindow(30L, 40L)), "value1")));
+ }
+
+ @Test
+ public void shouldReturnStrippedKeyValuePairsOnFindSessionsWithKeyRange() {
+ when(mockedIterator.next())
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key1", new SessionWindow(10L, 20L)),
+ AggregationWithHeaders.make("value1", new RecordHeaders())));
+ when(mockedSessionStoreWithHeaders.findSessions("key1", "key2", 10L,
40L))
+ .thenReturn(mockedIterator);
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
+ readOnlySessionStoreFacade.findSessions("key1", "key2", 10L, 40L);
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1",
new SessionWindow(10L, 20L)), "value1")));
+ }
+
+ @Test
+ public void
shouldReturnStrippedKeyValuePairsOnBackwardFindSessionsWithKeyRange() {
+ when(mockedIterator.next())
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key2", new SessionWindow(30L, 40L)),
+ AggregationWithHeaders.make("value2", new RecordHeaders())));
+ when(mockedSessionStoreWithHeaders.backwardFindSessions("key1",
"key2", 10L, 40L))
+ .thenReturn(mockedIterator);
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
+ readOnlySessionStoreFacade.backwardFindSessions("key1", "key2",
10L, 40L);
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2",
new SessionWindow(30L, 40L)), "value2")));
+ }
+
+ @Test
+ public void shouldReturnStrippedKeyValuePairsOnFetch() {
+ when(mockedIterator.next())
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key1", new SessionWindow(10L, 20L)),
+ AggregationWithHeaders.make("value1", new RecordHeaders())))
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key1", new SessionWindow(30L, 40L)),
+ AggregationWithHeaders.make("value2", new RecordHeaders())));
+
when(mockedSessionStoreWithHeaders.fetch("key1")).thenReturn(mockedIterator);
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
+ readOnlySessionStoreFacade.fetch("key1");
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1",
new SessionWindow(10L, 20L)), "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1",
new SessionWindow(30L, 40L)), "value2")));
+ }
+
+ @Test
+ public void shouldReturnStrippedKeyValuePairsOnBackwardFetch() {
+ when(mockedIterator.next())
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key1", new SessionWindow(30L, 40L)),
+ AggregationWithHeaders.make("value1", new RecordHeaders())));
+
when(mockedSessionStoreWithHeaders.backwardFetch("key1")).thenReturn(mockedIterator);
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
+ readOnlySessionStoreFacade.backwardFetch("key1");
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1",
new SessionWindow(30L, 40L)), "value1")));
+ }
+
+ @Test
+ public void shouldReturnStrippedKeyValuePairsOnFetchWithKeyRange() {
+ when(mockedIterator.next())
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key1", new SessionWindow(10L, 20L)),
+ AggregationWithHeaders.make("value1", new RecordHeaders())));
+ when(mockedSessionStoreWithHeaders.fetch("key1",
"key2")).thenReturn(mockedIterator);
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
+ readOnlySessionStoreFacade.fetch("key1", "key2");
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1",
new SessionWindow(10L, 20L)), "value1")));
+ }
+
+ @Test
+ public void shouldReturnStrippedKeyValuePairsOnBackwardFetchWithKeyRange()
{
+ when(mockedIterator.next())
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key2", new SessionWindow(30L, 40L)),
+ AggregationWithHeaders.make("value2", new RecordHeaders())));
+ when(mockedSessionStoreWithHeaders.backwardFetch("key1",
"key2")).thenReturn(mockedIterator);
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
+ readOnlySessionStoreFacade.backwardFetch("key1", "key2");
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2",
new SessionWindow(30L, 40L)), "value2")));
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java
index c306bab040c..87770940cf8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java
@@ -161,7 +161,7 @@ public class SessionStoreFetchTest {
expectedRecords.iterator() :
expectedRecords.descendingIterator();
- TestUtils.checkEquals(scanIterator, dataIterator);
+ TestUtils.checkEquals(dataIterator, scanIterator);
}
try (final KeyValueIterator<Windowed<String>, Long> scanIterator =
forward ?
@@ -172,7 +172,7 @@ public class SessionStoreFetchTest {
expectedRecords.iterator() :
expectedRecords.descendingIterator();
- TestUtils.checkEquals(scanIterator, dataIterator);
+ TestUtils.checkEquals(dataIterator, scanIterator);
}
}
@@ -185,7 +185,7 @@ public class SessionStoreFetchTest {
expectedRecords.iterator() :
expectedRecords.descendingIterator();
- TestUtils.checkEquals(scanIterator, dataIterator);
+ TestUtils.checkEquals(dataIterator, scanIterator);
}
try (final KeyValueIterator<Windowed<String>, Long> scanIterator =
forward ?
@@ -196,7 +196,7 @@ public class SessionStoreFetchTest {
expectedRecords.iterator() :
expectedRecords.descendingIterator();
- TestUtils.checkEquals(scanIterator, dataIterator);
+ TestUtils.checkEquals(dataIterator, scanIterator);
}
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 20d81be52f9..4fa9117cee7 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -76,12 +76,14 @@ import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -93,6 +95,7 @@ import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.ReadOnlyKeyValueStoreFacade;
import org.apache.kafka.streams.state.internals.ReadOnlyWindowStoreFacade;
+import org.apache.kafka.streams.state.internals.SessionStoreIteratorFacade;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.test.TestRecord;
@@ -1195,6 +1198,9 @@ public class TopologyTestDriver implements Closeable {
@SuppressWarnings("unchecked")
public <K, V> SessionStore<K, V> getSessionStore(final String name) {
final StateStore store = getStateStore(name, false);
+ if (store instanceof SessionStoreWithHeaders) { // header-aware store: return it wrapped in SessionStoreFacade so the caller gets a plain SessionStore<K, V>
+ return new SessionStoreFacade<>((SessionStoreWithHeaders<K, V>)
store);
+ }
return store instanceof SessionStore ? (SessionStore<K, V>) store :
null; // null whenever the looked-up store is not a SessionStore
}
@@ -1436,4 +1442,115 @@ public class TopologyTestDriver implements Closeable {
}
}
+ static class SessionStoreFacade<K, V> implements SessionStore<K, V> { // Presents a SessionStoreWithHeaders<K, V> through the plain SessionStore<K, V> interface.
+ private final SessionStoreWithHeaders<K, V> inner; // the wrapped store; every method below delegates to it
+
+ SessionStoreFacade(final SessionStoreWithHeaders<K, V> inner) {
+ this.inner = inner;
+ }
+
+ @Override // Query methods wrap inner's iterator in SessionStoreIteratorFacade -- presumably to unwrap AggregationWithHeaders values to V; confirm in that class.
+ public KeyValueIterator<Windowed<K>, V> findSessions(final K key,
+ final long
earliestSessionEndTime,
+ final long
latestSessionStartTime) {
+ return new SessionStoreIteratorFacade<>(inner.findSessions(key,
earliestSessionEndTime, latestSessionStartTime));
+ }
+
+ @Override // single-key variant delegating to inner.backwardFindSessions
+ public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K
key,
+ final
long earliestSessionEndTime,
+ final
long latestSessionStartTime) {
+ return new
SessionStoreIteratorFacade<>(inner.backwardFindSessions(key,
earliestSessionEndTime, latestSessionStartTime));
+ }
+
+ @Override // key-range (keyFrom..keyTo) variant delegating to inner.findSessions
+ public KeyValueIterator<Windowed<K>, V> findSessions(final K keyFrom,
+ final K keyTo,
+ final long
earliestSessionEndTime,
+ final long
latestSessionStartTime) {
+ return new
SessionStoreIteratorFacade<>(inner.findSessions(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime));
+ }
+
+ @Override // key-range variant delegating to inner.backwardFindSessions
+ public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K
keyFrom,
+ final K
keyTo,
+ final
long earliestSessionEndTime,
+ final
long latestSessionStartTime) {
+ return new
SessionStoreIteratorFacade<>(inner.backwardFindSessions(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime));
+ }
+
+ @Override // Point lookup: the inner result goes through AggregationWithHeaders.getAggregationOrNull (by its name, null-tolerant -- verify).
+ public V fetchSession(final K key,
+ final long sessionStartTime,
+ final long sessionEndTime) {
+ return
AggregationWithHeaders.getAggregationOrNull(inner.fetchSession(key,
sessionStartTime, sessionEndTime));
+ }
+
+ @Override // time-only variant (no key argument) delegating to inner.findSessions(long, long)
+ public KeyValueIterator<Windowed<K>, V> findSessions(final long
earliestSessionEndTime,
+ final long
latestSessionEndTime) {
+ return new
SessionStoreIteratorFacade<>(inner.findSessions(earliestSessionEndTime,
latestSessionEndTime));
+ }
+
+ @Override // single-key fetch, delegating to inner.fetch
+ public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
+ return new SessionStoreIteratorFacade<>(inner.fetch(key));
+ }
+
+ @Override // single-key fetch, delegating to inner.backwardFetch
+ public KeyValueIterator<Windowed<K>, V> backwardFetch(final K key) {
+ return new SessionStoreIteratorFacade<>(inner.backwardFetch(key));
+ }
+
+ @Override // key-range fetch, delegating to inner.fetch
+ public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom, final K
keyTo) {
+ return new SessionStoreIteratorFacade<>(inner.fetch(keyFrom,
keyTo));
+ }
+
+ @Override // key-range fetch, delegating to inner.backwardFetch
+ public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
final K keyTo) {
+ return new
SessionStoreIteratorFacade<>(inner.backwardFetch(keyFrom, keyTo));
+ }
+
+ @Override // removal needs no value conversion; passed straight through
+ public void remove(final Windowed<K> sessionKey) {
+ inner.remove(sessionKey);
+ }
+
+ @Override // Write path: the plain aggregate is paired with a newly created RecordHeaders, since this interface carries no headers.
+ public void put(final Windowed<K> sessionKey, final V aggregate) {
+ inner.put(sessionKey, AggregationWithHeaders.make(aggregate, new
RecordHeaders()));
+ }
+
+ @Override // the remaining StateStore methods are direct pass-throughs to inner
+ public String name() {
+ return inner.name();
+ }
+
+ @Override
+ public void init(final StateStoreContext stateStoreContext, final
StateStore root) {
+ inner.init(stateStoreContext, root);
+ }
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+
+ @Override
+ public boolean persistent() {
+ return inner.persistent();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return inner.isOpen();
+ }
+
+ @Override
+ public Position getPosition() {
+ return inner.getPosition();
+ }
+ }
+
}