This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e7c17a0fb5b KAFKA-20222: Enable SessionStore with headers in DSL  
(#21600)
e7c17a0fb5b is described below

commit e7c17a0fb5b3bc7dddbef75921650395254a194d
Author: Bill Bejeck <[email protected]>
AuthorDate: Sat Mar 14 14:51:13 2026 -0400

    KAFKA-20222: Enable SessionStore with headers in DSL  (#21600)
    
    Adding support for DSL integration with SessionStores with header in
    support of KIP-1285.
    
    Also adds a ReadOnlySessionStoreFacade that wraps
    SessionStoreWithHeaders and strips the AggregationWithHeaders wrapper
    when queried through Interactive Queries, mirroring the existing pattern
    for ReadOnlyKeyValueStoreFacade and ReadOnlyWindowStoreFacade.
    
    Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../integration/IQv2StoreIntegrationTest.java      |  25 +--
 .../KStreamAggregationIntegrationTest.java         | 161 +++++++++--------
 .../internals/KStreamSessionWindowAggregate.java   |  37 ++--
 ...va => SessionCacheFlushListenerWithHeader.java} |  24 ++-
 .../internals/SessionStoreMaterializer.java        |  18 +-
 .../streams/state/BuiltInDslStoreSuppliers.java    |   6 +-
 .../kafka/streams/state/DslSessionParams.java      |  21 ++-
 .../org/apache/kafka/streams/state/Stores.java     |  10 +-
 .../state/internals/GlobalStateStoreProvider.java  |   3 +
 .../state/internals/MeteredSessionStore.java       |   7 +-
 .../internals/MeteredSessionStoreWithHeaders.java  |  12 ++
 .../internals/ReadOnlySessionStoreFacade.java      |  88 ++++++++++
 .../internals/StreamThreadStateStoreProvider.java  |   3 +
 ...KStreamSessionWindowAggregateProcessorTest.java |  42 +++--
 ...> SessionCacheFlushListenerWithHeaderTest.java} |  10 +-
 .../internals/SessionWindowedKStreamImplTest.java  |  25 ++-
 .../internals/ReadOnlySessionStoreFacadeTest.java  | 194 +++++++++++++++++++++
 .../state/internals/SessionStoreFetchTest.java     |   8 +-
 .../apache/kafka/streams/TopologyTestDriver.java   | 117 +++++++++++++
 19 files changed, 654 insertions(+), 157 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 649799c7976..973196cfb11 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -108,6 +108,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -1498,14 +1499,10 @@ public class IQv2StoreIntegrationTest {
                     throw new AssertionError(queryResult.toString());
                 }
                 assertThat(partitionResult.getFailureReason(), 
is(FailureReason.UNKNOWN_QUERY_TYPE));
-                assertThat(partitionResult.getFailureMessage(), is(
-                    "This store"
-                        + " (class 
org.apache.kafka.streams.state.internals.MeteredSessionStore)"
-                        + " doesn't know how to execute the given query"
-                        + " (WindowRangeQuery{key=Optional.empty, 
timeFrom=Optional[1970-01-01T00:00:00Z], 
timeTo=Optional[1970-01-01T00:00:00Z]})"
-                        + " because SessionStores only support 
WindowRangeQuery.withKey."
-                        + " Contact the store maintainer if you need support 
for a new query type."
-                ));
+                assertThat(partitionResult.getFailureMessage(),
+                    containsString("doesn't know how to execute the given 
query"));
+                assertThat(partitionResult.getFailureMessage(),
+                    containsString("because SessionStores only support 
WindowRangeQuery.withKey."));
             }
         }
     }
@@ -1571,14 +1568,10 @@ public class IQv2StoreIntegrationTest {
                     throw new AssertionError(queryResult.toString());
                 }
                 assertThat(partitionResult.getFailureReason(), 
is(FailureReason.UNKNOWN_QUERY_TYPE));
-                assertThat(partitionResult.getFailureMessage(), is(
-                    "This store"
-                        + " (class 
org.apache.kafka.streams.state.internals.MeteredSessionStore)"
-                        + " doesn't know how to execute the given query"
-                        + " (WindowRangeQuery{key=Optional.empty, 
timeFrom=Optional[1970-01-01T00:00:00Z], 
timeTo=Optional[1970-01-01T00:00:00Z]})"
-                        + " because SessionStores only support 
WindowRangeQuery.withKey."
-                        + " Contact the store maintainer if you need support 
for a new query type."
-                ));
+                assertThat(partitionResult.getFailureMessage(),
+                    containsString("doesn't know how to execute the given 
query"));
+                assertThat(partitionResult.getFailureMessage(),
+                    containsString("because SessionStores only support 
WindowRangeQuery.withKey."));
             }
         }
     }
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index aa283726134..8654ca75f5e 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -17,6 +17,9 @@
 package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -71,11 +74,15 @@ import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -718,8 +725,9 @@ public class KStreamAggregationIntegrationTest {
         }
     }
 
-    @Test
-    public void shouldCountSessionWindows() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldCountSessionWindows(final boolean withHeaders) throws 
Exception {
         final long sessionGap = 5 * 60 * 1000L;
         final List<KeyValue<String, String>> t1Messages = Arrays.asList(
             new KeyValue<>("bob", "start"),
@@ -797,6 +805,10 @@ public class KStreamAggregationIntegrationTest {
         final Map<Windowed<String>, KeyValue<Long, Long>> results = new 
HashMap<>();
         final CountDownLatch latch = new CountDownLatch(13);
 
+        if (withHeaders) {
+            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
+
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), 
Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(sessionGap)))
@@ -819,88 +831,37 @@ public class KStreamAggregationIntegrationTest {
         assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, 
t3))), equalTo(KeyValue.pair(1L, t3)));
     }
 
-    @Test
-    public void shouldReduceSessionWindows() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldReduceSessionWindows(final boolean withHeaders) throws 
Exception {
         final long sessionGap = 1000L; // something to do with time
-        final List<KeyValue<String, String>> t1Messages = Arrays.asList(
-            new KeyValue<>("bob", "start"),
-            new KeyValue<>("penny", "start"),
-            new KeyValue<>("jo", "pause"),
-            new KeyValue<>("emily", "pause")
-        );
+
+        final Properties producerConfig = TestUtils.producerConfig(
+            CLUSTER.bootstrapServers(),
+            StringSerializer.class,
+            StringSerializer.class,
+            new Properties());
 
         final long t1 = mockTime.milliseconds();
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-            userSessionsStream,
-            t1Messages,
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                StringSerializer.class,
-                StringSerializer.class,
-                new Properties()),
-            t1
-        );
         final long t2 = t1 + (sessionGap / 2);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-            userSessionsStream,
-            Collections.singletonList(
-                new KeyValue<>("emily", "resume")
-            ),
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                StringSerializer.class,
-                StringSerializer.class,
-                new Properties()),
-            t2
-        );
         final long t3 = t1 + sessionGap + 1;
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-            userSessionsStream,
-            Arrays.asList(
-                new KeyValue<>("bob", "pause"),
-                new KeyValue<>("penny", "stop")
-            ),
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                StringSerializer.class,
-                StringSerializer.class,
-                new Properties()),
-            t3
-        );
         final long t4 = t3 + (sessionGap / 2);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-            userSessionsStream,
-            Arrays.asList(
-                new KeyValue<>("bob", "resume"), // bobs session continues
-                new KeyValue<>("jo", "resume")   // jo's starts new session
-            ),
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                StringSerializer.class,
-                StringSerializer.class,
-                new Properties()),
-            t4
-        );
         final long t5 = t4 - 1;
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-            userSessionsStream,
-            Collections.singletonList(
-                new KeyValue<>("jo", "late")   // jo has late arrival
-            ),
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                StringSerializer.class,
-                StringSerializer.class,
-                new Properties()),
-            t5
-        );
+
+        produceSessionWindowData(producerConfig, withHeaders, t1, t2, t3, t4, 
t5, sessionGap);
 
         final Map<Windowed<String>, KeyValue<String, Long>> results = new 
HashMap<>();
         final CountDownLatch latch = new CountDownLatch(13);
         final String userSessionsStore = "UserSessionsStore";
 
+        if (withHeaders) {
+            streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
+
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), 
Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) 
.windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(sessionGap), 
ofMinutes(1))) .reduce((value1, value2) -> value1 + ":" + value2, 
Materialized.as(userSessionsStore))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            
.windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(sessionGap), 
ofMinutes(1)))
+            .reduce((value1, value2) -> value1 + ":" + value2, 
Materialized.as(userSessionsStore))
             .toStream()
             .process(() -> (Processor<Windowed<String>, String, Object, 
Object>) record -> {
                 results.put(record.key(), KeyValue.pair(record.value(), 
record.timestamp()));
@@ -919,9 +880,61 @@ public class KStreamAggregationIntegrationTest {
         assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, 
t4))), equalTo(KeyValue.pair("pause:resume", t4)));
         assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, 
t3))), equalTo(KeyValue.pair("stop", t3)));
 
-        // verify can query data via IQ
+        verifySessionStore(userSessionsStore, t1, t3, t4);
+
+    }
+
+    private void produceSessionWindowData(final Properties producerConfig,
+                                           final boolean withHeaders,
+                                           final long t1, final long t2, final 
long t3,
+                                           final long t4, final long t5,
+                                           final long sessionGap) throws 
Exception {
+        final List<KeyValue<String, String>> t1Messages = Arrays.asList(
+            new KeyValue<>("bob", "start"),
+            new KeyValue<>("penny", "start"),
+            new KeyValue<>("jo", "pause"),
+            new KeyValue<>("emily", "pause")
+        );
+
+        produceWithOptionalHeaders(t1Messages, producerConfig, withHeaders, 
"t1", t1);
+        produceWithOptionalHeaders(
+            Collections.singletonList(new KeyValue<>("emily", "resume")),
+            producerConfig, withHeaders, "t2", t2);
+        produceWithOptionalHeaders(
+            Arrays.asList(new KeyValue<>("bob", "pause"), new 
KeyValue<>("penny", "stop")),
+            producerConfig, withHeaders, "t3", t3);
+        produceWithOptionalHeaders(
+            Arrays.asList(
+                new KeyValue<>("bob", "resume"),  // bobs session continues
+                new KeyValue<>("jo", "resume")),  // jo's starts new session
+            producerConfig, withHeaders, "t4", t4);
+        produceWithOptionalHeaders(
+            Collections.singletonList(new KeyValue<>("jo", "late")),  // jo 
has late arrival
+            producerConfig, withHeaders, "t5", t5);
+    }
+
+    private void produceWithOptionalHeaders(final Collection<KeyValue<String, 
String>> records,
+                                             final Properties producerConfig,
+                                             final boolean withHeaders,
+                                             final String batchId,
+                                             final long timestamp) throws 
Exception {
+        if (withHeaders) {
+            final Headers headers = new RecordHeaders(Arrays.asList(
+                new RecordHeader("batch", 
batchId.getBytes(StandardCharsets.UTF_8)),
+                new RecordHeader("source", 
"test".getBytes(StandardCharsets.UTF_8))
+            ));
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                userSessionsStream, records, producerConfig, headers, 
timestamp, false);
+        } else {
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                userSessionsStream, records, producerConfig, timestamp);
+        }
+    }
+
+    private void verifySessionStore(final String storeName,
+                                    final long t1, final long t3, final long 
t4) throws Exception {
         final ReadOnlySessionStore<String, String> sessionStore =
-            IntegrationTestUtils.getStore(userSessionsStore, kafkaStreams, 
QueryableStoreTypes.sessionStore());
+            IntegrationTestUtils.getStore(storeName, kafkaStreams, 
QueryableStoreTypes.sessionStore());
 
         try (final KeyValueIterator<Windowed<String>, String> bob = 
sessionStore.fetch("bob")) {
             assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", 
new SessionWindow(t1, t1)), "start")));
@@ -929,7 +942,7 @@ public class KStreamAggregationIntegrationTest {
             assertFalse(bob.hasNext());
         }
     }
-
+    
     @Test
     public void shouldCountUnlimitedWindows() throws Exception {
         final long startTime = mockTime.milliseconds() - 
TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS) + 1;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index f3ca9e6740a..67730e82c89 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
@@ -36,6 +37,7 @@ import 
org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
 import 
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -105,7 +107,7 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
     private class KStreamSessionWindowAggregateProcessor extends
         ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
 
-        private SessionStore<KIn, VAgg> store;
+        private SessionStore<KIn, AggregationWithHeaders<VAgg>> store;
         private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
         private Sensor droppedRecordsSensor;
         private Sensor emittedRecordsSensor;
@@ -147,7 +149,7 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
                 tupleForwarder = new TimestampedTupleForwarder<>(
                     store,
                     context,
-                    new SessionCacheFlushListener<>(context),
+                    new SessionCacheFlushListenerWithHeader<>(context),
                     sendOldValues);
             }
         }
@@ -165,22 +167,22 @@ public class KStreamSessionWindowAggregate<KIn, VIn, 
VAgg> implements KStreamAgg
             observedStreamTime = Math.max(observedStreamTime, timestamp);
             final long windowCloseTime = observedStreamTime - 
windows.gracePeriodMs() - windows.inactivityGap();
 
-            final List<KeyValue<Windowed<KIn>, VAgg>> merged = new 
ArrayList<>();
+            final List<KeyValue<Windowed<KIn>, AggregationWithHeaders<VAgg>>> 
merged = new ArrayList<>();
             final SessionWindow newSessionWindow = new 
SessionWindow(timestamp, timestamp);
             SessionWindow mergedWindow = newSessionWindow;
             VAgg agg = initializer.apply();
 
             try (
-                final KeyValueIterator<Windowed<KIn>, VAgg> iterator = 
store.findSessions(
+                final KeyValueIterator<Windowed<KIn>, 
AggregationWithHeaders<VAgg>> iterator = store.findSessions(
                     record.key(),
                     timestamp - windows.inactivityGap(),
                     timestamp + windows.inactivityGap()
                 )
             ) {
                 while (iterator.hasNext()) {
-                    final KeyValue<Windowed<KIn>, VAgg> next = iterator.next();
+                    final KeyValue<Windowed<KIn>, 
AggregationWithHeaders<VAgg>> next = iterator.next();
                     merged.add(next);
-                    agg = sessionMerger.apply(record.key(), agg, next.value);
+                    agg = sessionMerger.apply(record.key(), agg, 
AggregationWithHeaders.getAggregationOrNull(next.value));
                     mergedWindow = mergeSessionWindow(mergedWindow, 
(SessionWindow) next.key.window());
                 }
             }
@@ -189,16 +191,15 @@ public class KStreamSessionWindowAggregate<KIn, VIn, 
VAgg> implements KStreamAgg
                 logSkippedRecordForExpiredWindow(timestamp, windowCloseTime, 
mergedWindow);
             } else {
                 if (!mergedWindow.equals(newSessionWindow)) {
-                    for (final KeyValue<Windowed<KIn>, VAgg> session : merged) 
{
+                    for (final KeyValue<Windowed<KIn>, 
AggregationWithHeaders<VAgg>> session : merged) {
                         store.remove(session.key);
-
-                        maybeForwardUpdate(session.key, session.value, null);
+                        maybeForwardUpdate(session.key, 
AggregationWithHeaders.getAggregationOrNull(session.value), null);
                     }
                 }
 
                 agg = aggregator.apply(record.key(), record.value(), agg);
                 final Windowed<KIn> sessionKey = new Windowed<>(record.key(), 
mergedWindow);
-                store.put(sessionKey, agg);
+                store.put(sessionKey, AggregationWithHeaders.make(agg, new 
RecordHeaders()));
 
                 maybeForwardUpdate(sessionKey, null, agg);
             }
@@ -281,16 +282,16 @@ public class KStreamSessionWindowAggregate<KIn, VIn, 
VAgg> implements KStreamAgg
 
             // Only time ordered (indexed) session store should have 
implemented
             // this function, otherwise a not-supported exception would throw
-            try (final KeyValueIterator<Windowed<KIn>, VAgg> windowToEmit = 
store
-                    .findSessions(emitRangeLowerBound, emitRangeUpperBound)) {
+            try (final KeyValueIterator<Windowed<KIn>, 
AggregationWithHeaders<VAgg>> windowToEmit =
+                     store.findSessions(emitRangeLowerBound, 
emitRangeUpperBound)) {
 
                 while (windowToEmit.hasNext()) {
                     emittedCount++;
-                    final KeyValue<Windowed<KIn>, VAgg> kv = 
windowToEmit.next();
+                    final KeyValue<Windowed<KIn>, 
AggregationWithHeaders<VAgg>> kv = windowToEmit.next();
 
                     tupleForwarder.maybeForward(
                         record.withKey(kv.key)
-                            .withValue(new Change<>(kv.value, null))
+                            .withValue(new 
Change<>(AggregationWithHeaders.getAggregationOrNull(kv.value), null))
                             // set the timestamp as the window end timestamp
                             .withTimestamp(kv.key.window().end())
                             .withHeaders(record.headers()));
@@ -381,8 +382,8 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
 
     private class KTableSessionWindowValueGetter implements 
KTableValueGetter<Windowed<KIn>, VAgg> {
 
-        private SessionStore<KIn, VAgg> store;
-
+        private SessionStore<KIn, AggregationWithHeaders<VAgg>> store;
+        
         @Override
         public void init(final ProcessorContext<?, ?> context) {
             store = context.getStateStore(storeName);
@@ -390,8 +391,10 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
 
         @Override
         public ValueAndTimestamp<VAgg> get(final Windowed<KIn> key) {
+            final AggregationWithHeaders<VAgg> result =
+                store.fetchSession(key.key(), key.window().start(), 
key.window().end());
             return ValueAndTimestamp.make(
-                store.fetchSession(key.key(), key.window().start(), 
key.window().end()),
+                AggregationWithHeaders.getAggregationOrNull(result),
                 key.window().end());
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerWithHeader.java
similarity index 61%
rename from 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
rename to 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerWithHeader.java
index af69dbab679..bc0f2a99153 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerWithHeader.java
@@ -16,30 +16,46 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 
-class SessionCacheFlushListener<KOut, VOut> implements 
CacheFlushListener<Windowed<KOut>, VOut> {
+class SessionCacheFlushListenerWithHeader<KOut, VOut>
+    implements CacheFlushListener<Windowed<KOut>, 
AggregationWithHeaders<VOut>> {
+
     private final InternalProcessorContext<Windowed<KOut>, Change<VOut>> 
context;
 
     @SuppressWarnings("rawtypes")
     private final ProcessorNode myNode;
 
-    SessionCacheFlushListener(final ProcessorContext<Windowed<KOut>, 
Change<VOut>> context) {
+    SessionCacheFlushListenerWithHeader(final ProcessorContext<Windowed<KOut>, 
Change<VOut>> context) {
         this.context = (InternalProcessorContext<Windowed<KOut>, 
Change<VOut>>) context;
         myNode = this.context.currentNode();
     }
 
     @Override
-    public void apply(final Record<Windowed<KOut>, Change<VOut>> record) {
+    public void apply(final Record<Windowed<KOut>, 
Change<AggregationWithHeaders<VOut>>> record) {
         @SuppressWarnings("rawtypes") final ProcessorNode prev = 
context.currentNode();
         context.setCurrentNode(myNode);
         try {
-            context.forward(record.withTimestamp(record.key().window().end()));
+            final VOut newValue = 
AggregationWithHeaders.getAggregationOrNull(record.value().newValue);
+            final VOut oldValue = 
AggregationWithHeaders.getAggregationOrNull(record.value().oldValue);
+
+            final Headers headers = record.value().newValue != null
+                ? record.value().newValue.headers()
+                : new RecordHeaders();
+
+            context.forward(
+                record
+                    .withValue(new Change<>(newValue, oldValue, 
record.value().isLatest))
+                    .withTimestamp(record.key().window().end())
+                    .withHeaders(headers));
         } finally {
             context.setCurrentNode(prev);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
index a5317f48880..5f1f9e409e7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
@@ -17,11 +17,13 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.DslStoreFormat;
 import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.state.DslSessionParams;
 import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 
@@ -57,19 +59,21 @@ public class SessionStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K,
     }
 
     @Override
-    public StoreBuilder<?> builder() {
+    public  StoreBuilder<SessionStoreWithHeaders<K, V>> builder() {
+        final DslStoreFormat storeFormat = dslStoreFormat() == null ? 
DslStoreFormat.PLAIN : DslStoreFormat.HEADERS;
         final SessionBytesStoreSupplier supplier = 
materialized.storeSupplier() == null
                 ? dslStoreSuppliers().sessionStore(new DslSessionParams(
                         materialized.storeName(),
                         Duration.ofMillis(retentionPeriod),
-                        emitStrategy))
+                        emitStrategy,
+                        storeFormat))
                 : (SessionBytesStoreSupplier) materialized.storeSupplier();
 
-        final StoreBuilder<SessionStore<K, V>> builder = 
Stores.sessionStoreBuilder(
-                supplier,
-                materialized.keySerde(),
-                materialized.valueSerde()
-        );
+        final StoreBuilder<SessionStoreWithHeaders<K, V>> builder = 
Stores.sessionStoreBuilderWithHeaders(
+                    supplier,
+                    materialized.keySerde(),
+                    materialized.valueSerde()
+            );
 
         if (materialized.loggingEnabled()) {
             builder.withLoggingEnabled(materialized.logConfig());
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
index ef81c869379..3b23b9e1f6e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
@@ -98,9 +98,13 @@ public class BuiltInDslStoreSuppliers {
                 return new RocksDbTimeOrderedSessionBytesStoreSupplier(
                         params.name(),
                         params.retentionPeriod().toMillis(),
-                        true);
+                        true,
+                        params.storeFormat() == DslStoreFormat.HEADERS);
             }
 
+            if (params.storeFormat() == DslStoreFormat.HEADERS) {
+                return Stores.persistentSessionStoreWithHeaders(params.name(), 
params.retentionPeriod());
+            }
             return Stores.persistentSessionStore(params.name(), 
params.retentionPeriod());
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/DslSessionParams.java 
b/streams/src/main/java/org/apache/kafka/streams/state/DslSessionParams.java
index 97193de6884..16c8c523ec7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/DslSessionParams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/DslSessionParams.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.streams.DslStoreFormat;
 import org.apache.kafka.streams.kstream.EmitStrategy;
 
 import java.time.Duration;
@@ -30,6 +31,7 @@ public class DslSessionParams {
     private final String name;
     private final Duration retentionPeriod;
     private final EmitStrategy emitStrategy;
+    private final DslStoreFormat storeFormat;
 
     /**
      * @param name              name of the store (cannot be {@code null})
@@ -38,15 +40,24 @@ public class DslSessionParams {
      *                          contain the inactivity gap of the session and 
the entire grace period.)
      * @param emitStrategy      defines how to emit results
      */
+    @Deprecated
     public DslSessionParams(
             final String name,
             final Duration retentionPeriod,
             final EmitStrategy emitStrategy
     ) {
+        this(name, retentionPeriod, emitStrategy, DslStoreFormat.PLAIN);
+    }
+
+    public DslSessionParams(final String name,
+                            final Duration retentionPeriod,
+                            final EmitStrategy emitStrategy,
+                            final DslStoreFormat storeFormat) {
         Objects.requireNonNull(name);
         this.name = name;
         this.retentionPeriod = retentionPeriod;
         this.emitStrategy = emitStrategy;
+        this.storeFormat = storeFormat;
     }
 
     public String name() {
@@ -61,6 +72,10 @@ public class DslSessionParams {
         return emitStrategy;
     }
 
+    public DslStoreFormat storeFormat() {
+        return storeFormat;
+    }
+
     @Override
     public boolean equals(final Object o) {
         if (this == o) {
@@ -72,12 +87,13 @@ public class DslSessionParams {
         final DslSessionParams that = (DslSessionParams) o;
         return Objects.equals(name, that.name)
                 && Objects.equals(retentionPeriod, that.retentionPeriod)
-                && Objects.equals(emitStrategy, that.emitStrategy);
+                && Objects.equals(emitStrategy, that.emitStrategy)
+                && Objects.equals(storeFormat, that.storeFormat);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, retentionPeriod, emitStrategy);
+        return Objects.hash(name, retentionPeriod, emitStrategy, storeFormat);
     }
 
     @Override
@@ -86,6 +102,7 @@ public class DslSessionParams {
                 "name='" + name + '\'' +
                 ", retentionPeriod=" + retentionPeriod +
                 ", emitStrategy=" + emitStrategy +
+                ", storeFormat=" + storeFormat +
                 '}';
     }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java 
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 53bf38cd777..db39656b83b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -672,14 +672,16 @@ public final class Stores {
      *
      * @param supplier      a {@link SessionBytesStoreSupplier} (cannot be 
{@code null})
      * @param keySerde      the key serde to use
-     * @param valueSerde    the value serde to use
+     * @param valueSerde    the value serde to use; if the serialized bytes is 
{@code null} for put operations,
+     *                      it is treated as delete
      * @param <K>           key type
      * @param <V>           value type
      * @return an instance of {@link StoreBuilder} than can build a {@link 
SessionStoreWithHeaders}
      */
-    public static <K, V> StoreBuilder<SessionStoreWithHeaders<K, V>> 
sessionStoreBuilderWithHeaders(final SessionBytesStoreSupplier supplier,
-                                                                               
                      final Serde<K> keySerde,
-                                                                               
                      final Serde<V> valueSerde) {
+    public static <K, V> StoreBuilder<SessionStoreWithHeaders<K, V>> 
sessionStoreBuilderWithHeaders(
+            final SessionBytesStoreSupplier supplier,
+            final Serde<K> keySerde,
+            final Serde<V> valueSerde) {
         Objects.requireNonNull(supplier, "supplier cannot be null");
         return new SessionStoreBuilderWithHeaders<>(supplier, keySerde, 
valueSerde, Time.SYSTEM);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
index 8ae592e4b4b..f15c21be5c7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
@@ -20,6 +20,7 @@ import 
org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 
@@ -48,6 +49,8 @@ public class GlobalStateStoreProvider implements 
StateStoreProvider {
             return (List<T>) Collections.singletonList(new 
ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) 
store));
         } else if (store instanceof TimestampedWindowStore && 
queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
             return (List<T>) Collections.singletonList(new 
ReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) store));
+        } else if (store instanceof SessionStoreWithHeaders && 
queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
+            return (List<T>) Collections.singletonList(new 
ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders<Object, Object>) store));
         }
         return (List<T>) Collections.singletonList(store);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index cf5e00d403b..9b4f05e44cf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.query.FailureReason;
 import org.apache.kafka.streams.query.PositionBound;
@@ -163,11 +164,15 @@ public class MeteredSessionStore<K, V>
         restoreSensor.record(restoreTimeNs);
     }
 
+    protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, 
final SerdeGetter getter) {
+        return WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
+    }
+
     private void initStoreSerde(final StateStoreContext context) {
         final String storeName = name();
         final String changelogTopic = 
ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
         serdes = StoreSerdeInitializer.prepareStoreSerde(
-            context, storeName, changelogTopic, keySerde, valueSerde, 
WrappingNullableUtils::prepareValueSerde);
+            context, storeName, changelogTopic, keySerde, valueSerde, 
this::prepareValueSerdeForStore);
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
index 7f0b8d35cf7..221cf461ba4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.query.FailureReason;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
@@ -53,6 +54,17 @@ public class MeteredSessionStoreWithHeaders<K, AGG>
         super(inner, metricsScope, keySerde, aggSerde, time);
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Serde<AggregationWithHeaders<AGG>> prepareValueSerdeForStore(
+            final Serde<AggregationWithHeaders<AGG>> valueSerde,
+            final SerdeGetter getter) {
+        if (valueSerde == null) {
+            return new AggregationWithHeadersSerde<>((Serde<AGG>) 
getter.valueSerde());
+        }
+        return super.prepareValueSerdeForStore(valueSerde, getter);
+    }
+
     @Override
     public void put(final Windowed<K> sessionKey, final 
AggregationWithHeaders<AGG> aggregate) {
         Objects.requireNonNull(sessionKey, "sessionKey can't be null");
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlySessionStoreFacade.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlySessionStoreFacade.java
new file mode 100644
index 00000000000..9e08ba95190
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlySessionStoreFacade.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
+
+public class ReadOnlySessionStoreFacade<K, V> implements 
ReadOnlySessionStore<K, V> {
+    protected final SessionStoreWithHeaders<K, V> inner;
+
+    protected ReadOnlySessionStoreFacade(final SessionStoreWithHeaders<K, V> 
store) {
+        inner = store;
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, V> findSessions(final K key,
+                                                          final long 
earliestSessionEndTime,
+                                                          final long 
latestSessionStartTime) {
+        return new SessionStoreIteratorFacade<>(inner.findSessions(key, 
earliestSessionEndTime, latestSessionStartTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K key,
+                                                                  final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionStartTime) {
+        return new 
SessionStoreIteratorFacade<>(inner.backwardFindSessions(key, 
earliestSessionEndTime, latestSessionStartTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, V> findSessions(final K keyFrom,
+                                                          final K keyTo,
+                                                          final long 
earliestSessionEndTime,
+                                                          final long 
latestSessionStartTime) {
+        return new SessionStoreIteratorFacade<>(inner.findSessions(keyFrom, 
keyTo, earliestSessionEndTime, latestSessionStartTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K 
keyFrom,
+                                                                  final K 
keyTo,
+                                                                  final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionStartTime) {
+        return new 
SessionStoreIteratorFacade<>(inner.backwardFindSessions(keyFrom, keyTo, 
earliestSessionEndTime, latestSessionStartTime));
+    }
+
+    @Override
+    public V fetchSession(final K key,
+                          final long sessionStartTime,
+                          final long sessionEndTime) {
+        return 
AggregationWithHeaders.getAggregationOrNull(inner.fetchSession(key, 
sessionStartTime, sessionEndTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
+        return new SessionStoreIteratorFacade<>(inner.fetch(key));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, V> backwardFetch(final K key) {
+        return new SessionStoreIteratorFacade<>(inner.backwardFetch(key));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom, final K 
keyTo) {
+        return new SessionStoreIteratorFacade<>(inner.fetch(keyFrom, keyTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom, 
final K keyTo) {
+        return new SessionStoreIteratorFacade<>(inner.backwardFetch(keyFrom, 
keyTo));
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index 275123e3225..f797ff0aadb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.processor.internals.Task;
 import 
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStoreQueryParameters;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 
@@ -107,6 +108,8 @@ public class StreamThreadStateStoreProvider {
                 return (T) new 
ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) store);
             } else if (store instanceof TimestampedWindowStore && 
queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
                 return (T) new 
ReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) store);
+            } else if (store instanceof SessionStoreWithHeaders && 
queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
+                return (T) new 
ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders<Object, Object>) store);
             } else {
                 return (T) store;
             }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index d6b94c7e7f9..5c18ef5662c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -38,9 +38,10 @@ import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
-import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import 
org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
@@ -91,7 +92,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
     private InternalMockProcessorContext<Windowed<String>, Change<Long>> 
mockContext;
     private KStreamSessionWindowAggregate<String, String, Long> 
sessionAggregator;
     private Processor<String, String, Windowed<String>, Change<Long>> 
processor;
-    private SessionStore<String, Long> sessionStore;
+    private SessionStoreWithHeaders<String, Long> sessionStore;
     
     public EmitStrategy.StrategyType type;
 
@@ -152,8 +153,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
             new RocksDbTimeOrderedSessionBytesStoreSupplier(STORE_NAME, GAP_MS 
* 3, true) :
             Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3));
 
-        final StoreBuilder<SessionStore<String, Long>> storeBuilder = 
Stores.sessionStoreBuilder(supplier, Serdes.String(), Serdes.Long())
-            .withLoggingDisabled();
+        final StoreBuilder<SessionStoreWithHeaders<String, Long>> storeBuilder 
=
+            Stores.sessionStoreBuilderWithHeaders(supplier, Serdes.String(), 
Serdes.Long())
+                .withLoggingDisabled();
 
         if (enableCaching && emitStrategy.type() != 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
             storeBuilder.withCachingEnabled();
@@ -179,10 +181,10 @@ public class KStreamSessionWindowAggregateProcessorTest {
         processor.process(new Record<>("john", "first", 0L));
         processor.process(new Record<>("john", "second", 500L));
 
-        try (final KeyValueIterator<Windowed<String>, Long> values =
+        try (final KeyValueIterator<Windowed<String>, 
AggregationWithHeaders<Long>> values =
                  sessionStore.findSessions("john", 0, 2000)) {
             assertTrue(values.hasNext());
-            assertEquals(Long.valueOf(2), values.next().value);
+            assertEquals(Long.valueOf(2), 
AggregationWithHeaders.getAggregationOrNull(values.next().value));
         }
     }
 
@@ -192,27 +194,27 @@ public class KStreamSessionWindowAggregateProcessorTest {
         setup(inputType, true);
         final String sessionId = "mel";
         processor.process(new Record<>(sessionId, "first", 0L));
-        try (final KeyValueIterator<Windowed<String>, Long> iterator = 
sessionStore.findSessions(sessionId, 0, 0)) {
+        try (final KeyValueIterator<Windowed<String>, 
AggregationWithHeaders<Long>> iterator = sessionStore.findSessions(sessionId, 
0, 0)) {
             assertTrue(iterator.hasNext());
         }
 
         // move time beyond gap
         processor.process(new Record<>(sessionId, "second", GAP_MS + 1));
-        try (final KeyValueIterator<Windowed<String>, Long> iterator = 
sessionStore.findSessions(sessionId, GAP_MS + 1, GAP_MS + 1)) {
+        try (final KeyValueIterator<Windowed<String>, 
AggregationWithHeaders<Long>> iterator = sessionStore.findSessions(sessionId, 
GAP_MS + 1, GAP_MS + 1)) {
             assertTrue(iterator.hasNext());
         }
         // should still exist as not within gap
-        try (final KeyValueIterator<Windowed<String>, Long> iterator = 
sessionStore.findSessions(sessionId, 0, 0)) {
+        try (final KeyValueIterator<Windowed<String>, 
AggregationWithHeaders<Long>> iterator = sessionStore.findSessions(sessionId, 
0, 0)) {
             assertTrue(iterator.hasNext());
         }
         // move time back
         processor.process(new Record<>(sessionId, "third", GAP_MS / 2));
 
-        try (final KeyValueIterator<Windowed<String>, Long> iterator =
+        try (final KeyValueIterator<Windowed<String>, 
AggregationWithHeaders<Long>> iterator =
                  sessionStore.findSessions(sessionId, 0, GAP_MS + 1)) {
-            final KeyValue<Windowed<String>, Long> kv = iterator.next();
+            final KeyValue<Windowed<String>, AggregationWithHeaders<Long>> kv 
= iterator.next();
 
-            assertEquals(Long.valueOf(3), kv.value);
+            assertEquals(Long.valueOf(3), 
AggregationWithHeaders.getAggregationOrNull(kv.value));
             assertFalse(iterator.hasNext());
         }
     }
@@ -223,9 +225,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
         setup(inputType, true);
         processor.process(new Record<>("mel", "first", 0L));
         processor.process(new Record<>("mel", "second", 0L));
-        try (final KeyValueIterator<Windowed<String>, Long> iterator =
+        try (final KeyValueIterator<Windowed<String>, 
AggregationWithHeaders<Long>> iterator =
                  sessionStore.findSessions("mel", 0, 0)) {
-            assertEquals(Long.valueOf(2L), iterator.next().value);
+            assertEquals(Long.valueOf(2L), 
AggregationWithHeaders.getAggregationOrNull(iterator.next().value));
             assertFalse(iterator.hasNext());
         }
     }
@@ -289,18 +291,22 @@ public class KStreamSessionWindowAggregateProcessorTest {
         processor.process(new Record<>("a", "1", 0L));
 
         // first ensure it is in the store
-        try (final KeyValueIterator<Windowed<String>, Long> a1 =
+        try (final KeyValueIterator<Windowed<String>, 
AggregationWithHeaders<Long>> a1 =
                  sessionStore.findSessions("a", 0, 0)) {
-            assertEquals(KeyValue.pair(new Windowed<>("a", new 
SessionWindow(0, 0)), 1L), a1.next());
+            final KeyValue<Windowed<String>, AggregationWithHeaders<Long>> 
next = a1.next();
+            assertEquals(new Windowed<>("a", new SessionWindow(0, 0)), 
next.key);
+            assertEquals(1L, 
AggregationWithHeaders.getAggregationOrNull(next.value));
         }
 
 
         processor.process(new Record<>("a", "2", 100L));
         // a1 from above should have been removed
         // should have merged session in store
-        try (final KeyValueIterator<Windowed<String>, Long> a2 =
+        try (final KeyValueIterator<Windowed<String>, 
AggregationWithHeaders<Long>> a2 =
                  sessionStore.findSessions("a", 0, 100)) {
-            assertEquals(KeyValue.pair(new Windowed<>("a", new 
SessionWindow(0, 100)), 2L), a2.next());
+            final KeyValue<Windowed<String>, AggregationWithHeaders<Long>> 
next = a2.next();
+            assertEquals(new Windowed<>("a", new SessionWindow(0, 100)), 
next.key);
+            assertEquals(2L, 
AggregationWithHeaders.getAggregationOrNull(next.value));
             assertFalse(a2.hasNext());
         }
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerWithHeaderTest.java
similarity index 83%
rename from 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
rename to 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerWithHeaderTest.java
index 4d837f68fea..33487507a07 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerWithHeaderTest.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -33,7 +35,7 @@ import static org.mockito.Mockito.verify;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
-public class SessionCacheFlushListenerTest {
+public class SessionCacheFlushListenerWithHeaderTest {
     @Test
     public void shouldForwardKeyNewValueOldValueAndTimestamp() {
         @SuppressWarnings("unchecked")
@@ -44,10 +46,12 @@ public class SessionCacheFlushListenerTest {
                 new Change<>("newValue", "oldValue"),
                 73L));
 
-        new SessionCacheFlushListener<>(context).apply(
+        new SessionCacheFlushListenerWithHeader<>(context).apply(
             new Record<>(
                 new Windowed<>("key", new SessionWindow(21L, 73L)),
-                new Change<>("newValue", "oldValue"),
+                new Change<>(
+                    AggregationWithHeaders.make("newValue", new 
RecordHeaders()),
+                    AggregationWithHeaders.make("oldValue", new 
RecordHeaders())),
                 42L));
 
         verify(context, times(2)).setCurrentNode(null);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 6dbb9b8d1db..a78818ca028 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -36,10 +36,12 @@ import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.SessionWindowedKStream;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore;
 import org.apache.kafka.streams.state.internals.MeteredSessionStore;
-import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedSessionStore;
+import org.apache.kafka.streams.state.internals.SessionToHeadersStoreAdapter;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockApiProcessorSupplier;
@@ -227,7 +229,7 @@ public class SessionWindowedKStreamImplTest {
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
             processData(driver);
             final SessionStore<String, Long> store = 
driver.getSessionStore("count-store");
-            final List<KeyValue<Windowed<String>, Long>> data = 
StreamsTestUtils.toListAndCloseIterator(store.fetch("1", "2"));
+            final List<KeyValue<Windowed<String>, Long>> data = 
unwrapAggregations(store.fetch("1", "2"));
             if (!emitFinal) {
                 assertThat(
                         data,
@@ -255,7 +257,7 @@ public class SessionWindowedKStreamImplTest {
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
             processData(driver);
             final SessionStore<String, String> sessionStore = 
driver.getSessionStore("reduced");
-            final List<KeyValue<Windowed<String>, String>> data = 
StreamsTestUtils.toListAndCloseIterator(sessionStore.fetch("1", "2"));
+            final List<KeyValue<Windowed<String>, String>> data = 
unwrapAggregations(sessionStore.fetch("1", "2"));
 
             if (!emitFinal) {
                 assertThat(
@@ -288,7 +290,7 @@ public class SessionWindowedKStreamImplTest {
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
             processData(driver);
             final SessionStore<String, String> sessionStore = 
driver.getSessionStore("aggregated");
-            final List<KeyValue<Windowed<String>, String>> data = 
StreamsTestUtils.toListAndCloseIterator(sessionStore.fetch("1", "2"));
+            final List<KeyValue<Windowed<String>, String>> data = 
unwrapAggregations(sessionStore.fetch("1", "2"));
             if (!emitFinal) {
                 assertThat(
                         data,
@@ -423,11 +425,11 @@ public class SessionWindowedKStreamImplTest {
                 Materialized.<String, String, SessionStore<Bytes, 
byte[]>>as("aggregated").withValueSerde(Serdes.String()));
 
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
-            final SessionStore<String, String> store = 
driver.getSessionStore("aggregated");
+            final StateStore store = 
driver.getAllStateStores().get("aggregated");
             final WrappedStateStore changeLogging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
             assertThat(store, instanceOf(MeteredSessionStore.class));
             assertThat(changeLogging, 
instanceOf(ChangeLoggingSessionBytesStore.class));
-            assertThat(changeLogging.wrapped(), 
instanceOf(RocksDBTimeOrderedSessionStore.class));
+            assertThat(changeLogging.wrapped(), 
instanceOf(SessionToHeadersStoreAdapter.class));
         }
     }
 
@@ -440,4 +442,15 @@ public class SessionWindowedKStreamImplTest {
         inputTopic.pipeInput("2", "1", 600);
         inputTopic.pipeInput("2", "2", 599);
     }
+
+    private <V> List<KeyValue<Windowed<String>, V>> unwrapAggregations(
+            final KeyValueIterator<Windowed<String>, V> iterator) {
+        final List<KeyValue<Windowed<String>, V>> result = new ArrayList<>();
+        while (iterator.hasNext()) {
+            final KeyValue<Windowed<String>, V> next = iterator.next();
+            result.add(KeyValue.pair(next.key, next.value));
+        }
+        iterator.close();
+        return result;
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlySessionStoreFacadeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlySessionStoreFacadeTest.java
new file mode 100644
index 00000000000..f0efd857f71
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlySessionStoreFacadeTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class ReadOnlySessionStoreFacadeTest {
+    @Mock
+    private SessionStoreWithHeaders<String, String> 
mockedSessionStoreWithHeaders;
+    @Mock
+    private KeyValueIterator<Windowed<String>, AggregationWithHeaders<String>> 
mockedIterator;
+
+    private ReadOnlySessionStoreFacade<String, String> 
readOnlySessionStoreFacade;
+
+    @BeforeEach
+    public void setup() {
+        readOnlySessionStoreFacade = new 
ReadOnlySessionStoreFacade<>(mockedSessionStoreWithHeaders);
+    }
+
+    @Test
+    public void shouldReturnPlainValueOnFetchSession() {
+        when(mockedSessionStoreWithHeaders.fetchSession("key", 10L, 20L))
+            .thenReturn(AggregationWithHeaders.make("value", new 
RecordHeaders()));
+
+        assertThat(readOnlySessionStoreFacade.fetchSession("key", 10L, 20L), 
is("value"));
+    }
+
+    @Test
+    public void shouldReturnNullOnFetchSessionWhenNull() {
+        when(mockedSessionStoreWithHeaders.fetchSession("unknownKey", 10L, 
20L))
+            .thenReturn(null);
+
+        assertNull(readOnlySessionStoreFacade.fetchSession("unknownKey", 10L, 
20L));
+    }
+
+    @Test
+    public void shouldReturnStrippedKeyValuePairsOnFindSessions() {
+        when(mockedIterator.next())
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key1", new SessionWindow(10L, 20L)),
+                AggregationWithHeaders.make("value1", new RecordHeaders())))
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key2", new SessionWindow(30L, 40L)),
+                AggregationWithHeaders.make("value2", new RecordHeaders())));
+        when(mockedSessionStoreWithHeaders.findSessions("key1", 10L, 40L))
+            .thenReturn(mockedIterator);
+
+        final KeyValueIterator<Windowed<String>, String> iterator =
+            readOnlySessionStoreFacade.findSessions("key1", 10L, 40L);
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", 
new SessionWindow(10L, 20L)), "value1")));
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", 
new SessionWindow(30L, 40L)), "value2")));
+    }
+
+    @Test
+    public void shouldReturnStrippedKeyValuePairsOnBackwardFindSessions() {
+        when(mockedIterator.next())
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key1", new SessionWindow(30L, 40L)),
+                AggregationWithHeaders.make("value1", new RecordHeaders())));
+        when(mockedSessionStoreWithHeaders.backwardFindSessions("key1", 10L, 
40L))
+            .thenReturn(mockedIterator);
+
+        final KeyValueIterator<Windowed<String>, String> iterator =
+            readOnlySessionStoreFacade.backwardFindSessions("key1", 10L, 40L);
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", 
new SessionWindow(30L, 40L)), "value1")));
+    }
+
+    @Test
+    public void shouldReturnStrippedKeyValuePairsOnFindSessionsWithKeyRange() {
+        when(mockedIterator.next())
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key1", new SessionWindow(10L, 20L)),
+                AggregationWithHeaders.make("value1", new RecordHeaders())));
+        when(mockedSessionStoreWithHeaders.findSessions("key1", "key2", 10L, 
40L))
+            .thenReturn(mockedIterator);
+
+        final KeyValueIterator<Windowed<String>, String> iterator =
+            readOnlySessionStoreFacade.findSessions("key1", "key2", 10L, 40L);
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", 
new SessionWindow(10L, 20L)), "value1")));
+    }
+
+    @Test
+    public void 
shouldReturnStrippedKeyValuePairsOnBackwardFindSessionsWithKeyRange() {
+        when(mockedIterator.next())
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key2", new SessionWindow(30L, 40L)),
+                AggregationWithHeaders.make("value2", new RecordHeaders())));
+        when(mockedSessionStoreWithHeaders.backwardFindSessions("key1", 
"key2", 10L, 40L))
+            .thenReturn(mockedIterator);
+
+        final KeyValueIterator<Windowed<String>, String> iterator =
+            readOnlySessionStoreFacade.backwardFindSessions("key1", "key2", 
10L, 40L);
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", 
new SessionWindow(30L, 40L)), "value2")));
+    }
+
+    @Test
+    public void shouldReturnStrippedKeyValuePairsOnFetch() {
+        when(mockedIterator.next())
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key1", new SessionWindow(10L, 20L)),
+                AggregationWithHeaders.make("value1", new RecordHeaders())))
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key1", new SessionWindow(30L, 40L)),
+                AggregationWithHeaders.make("value2", new RecordHeaders())));
+        
when(mockedSessionStoreWithHeaders.fetch("key1")).thenReturn(mockedIterator);
+
+        final KeyValueIterator<Windowed<String>, String> iterator =
+            readOnlySessionStoreFacade.fetch("key1");
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", 
new SessionWindow(10L, 20L)), "value1")));
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", 
new SessionWindow(30L, 40L)), "value2")));
+    }
+
+    @Test
+    public void shouldReturnStrippedKeyValuePairsOnBackwardFetch() {
+        when(mockedIterator.next())
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key1", new SessionWindow(30L, 40L)),
+                AggregationWithHeaders.make("value1", new RecordHeaders())));
+        
when(mockedSessionStoreWithHeaders.backwardFetch("key1")).thenReturn(mockedIterator);
+
+        final KeyValueIterator<Windowed<String>, String> iterator =
+            readOnlySessionStoreFacade.backwardFetch("key1");
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", 
new SessionWindow(30L, 40L)), "value1")));
+    }
+
+    @Test
+    public void shouldReturnStrippedKeyValuePairsOnFetchWithKeyRange() {
+        when(mockedIterator.next())
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key1", new SessionWindow(10L, 20L)),
+                AggregationWithHeaders.make("value1", new RecordHeaders())));
+        when(mockedSessionStoreWithHeaders.fetch("key1", 
"key2")).thenReturn(mockedIterator);
+
+        final KeyValueIterator<Windowed<String>, String> iterator =
+            readOnlySessionStoreFacade.fetch("key1", "key2");
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", 
new SessionWindow(10L, 20L)), "value1")));
+    }
+
+    @Test
+    public void shouldReturnStrippedKeyValuePairsOnBackwardFetchWithKeyRange() 
{
+        when(mockedIterator.next())
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key2", new SessionWindow(30L, 40L)),
+                AggregationWithHeaders.make("value2", new RecordHeaders())));
+        when(mockedSessionStoreWithHeaders.backwardFetch("key1", 
"key2")).thenReturn(mockedIterator);
+
+        final KeyValueIterator<Windowed<String>, String> iterator =
+            readOnlySessionStoreFacade.backwardFetch("key1", "key2");
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", 
new SessionWindow(30L, 40L)), "value2")));
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java
index c306bab040c..87770940cf8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java
@@ -161,7 +161,7 @@ public class SessionStoreFetchTest {
                 expectedRecords.iterator() :
                 expectedRecords.descendingIterator();
 
-            TestUtils.checkEquals(scanIterator, dataIterator);
+            TestUtils.checkEquals(dataIterator, scanIterator);
         }
 
         try (final KeyValueIterator<Windowed<String>, Long> scanIterator = 
forward ?
@@ -172,7 +172,7 @@ public class SessionStoreFetchTest {
                 expectedRecords.iterator() :
                 expectedRecords.descendingIterator();
 
-            TestUtils.checkEquals(scanIterator, dataIterator);
+            TestUtils.checkEquals(dataIterator, scanIterator);
         }
     }
 
@@ -185,7 +185,7 @@ public class SessionStoreFetchTest {
                 expectedRecords.iterator() :
                 expectedRecords.descendingIterator();
 
-            TestUtils.checkEquals(scanIterator, dataIterator);
+            TestUtils.checkEquals(dataIterator, scanIterator);
         }
 
         try (final KeyValueIterator<Windowed<String>, Long> scanIterator = 
forward ?
@@ -196,7 +196,7 @@ public class SessionStoreFetchTest {
                 expectedRecords.iterator() :
                 expectedRecords.descendingIterator();
 
-            TestUtils.checkEquals(scanIterator, dataIterator);
+            TestUtils.checkEquals(dataIterator, scanIterator);
         }
     }
 
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 20d81be52f9..4fa9117cee7 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -76,12 +76,14 @@ import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -93,6 +95,7 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.ReadOnlyKeyValueStoreFacade;
 import org.apache.kafka.streams.state.internals.ReadOnlyWindowStoreFacade;
+import org.apache.kafka.streams.state.internals.SessionStoreIteratorFacade;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.streams.test.TestRecord;
 
@@ -1195,6 +1198,9 @@ public class TopologyTestDriver implements Closeable {
     @SuppressWarnings("unchecked")
     public <K, V> SessionStore<K, V> getSessionStore(final String name) {
         final StateStore store = getStateStore(name, false);
+        if (store instanceof SessionStoreWithHeaders) {
+            return new SessionStoreFacade<>((SessionStoreWithHeaders<K, V>) 
store);
+        }
         return store instanceof SessionStore ? (SessionStore<K, V>) store : 
null;
     }
 
@@ -1436,4 +1442,115 @@ public class TopologyTestDriver implements Closeable {
         }
     }
 
+    static class SessionStoreFacade<K, V> implements SessionStore<K, V> {
+        private final SessionStoreWithHeaders<K, V> inner;
+
+        SessionStoreFacade(final SessionStoreWithHeaders<K, V> inner) {
+            this.inner = inner;
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> findSessions(final K key,
+                                                              final long 
earliestSessionEndTime,
+                                                              final long 
latestSessionStartTime) {
+            return new SessionStoreIteratorFacade<>(inner.findSessions(key, 
earliestSessionEndTime, latestSessionStartTime));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K 
key,
+                                                                      final 
long earliestSessionEndTime,
+                                                                      final 
long latestSessionStartTime) {
+            return new 
SessionStoreIteratorFacade<>(inner.backwardFindSessions(key, 
earliestSessionEndTime, latestSessionStartTime));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> findSessions(final K keyFrom,
+                                                              final K keyTo,
+                                                              final long 
earliestSessionEndTime,
+                                                              final long 
latestSessionStartTime) {
+            return new 
SessionStoreIteratorFacade<>(inner.findSessions(keyFrom, keyTo, 
earliestSessionEndTime, latestSessionStartTime));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K 
keyFrom,
+                                                                      final K 
keyTo,
+                                                                      final 
long earliestSessionEndTime,
+                                                                      final 
long latestSessionStartTime) {
+            return new 
SessionStoreIteratorFacade<>(inner.backwardFindSessions(keyFrom, keyTo, 
earliestSessionEndTime, latestSessionStartTime));
+        }
+
+        @Override
+        public V fetchSession(final K key,
+                              final long sessionStartTime,
+                              final long sessionEndTime) {
+            return 
AggregationWithHeaders.getAggregationOrNull(inner.fetchSession(key, 
sessionStartTime, sessionEndTime));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> findSessions(final long 
earliestSessionEndTime,
+                                                              final long 
latestSessionEndTime) {
+            return new 
SessionStoreIteratorFacade<>(inner.findSessions(earliestSessionEndTime, 
latestSessionEndTime));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
+            return new SessionStoreIteratorFacade<>(inner.fetch(key));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> backwardFetch(final K key) {
+            return new SessionStoreIteratorFacade<>(inner.backwardFetch(key));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom, final K 
keyTo) {
+            return new SessionStoreIteratorFacade<>(inner.fetch(keyFrom, 
keyTo));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom, 
final K keyTo) {
+            return new 
SessionStoreIteratorFacade<>(inner.backwardFetch(keyFrom, keyTo));
+        }
+
+        @Override
+        public void remove(final Windowed<K> sessionKey) {
+            inner.remove(sessionKey);
+        }
+
+        @Override
+        public void put(final Windowed<K> sessionKey, final V aggregate) {
+            inner.put(sessionKey, AggregationWithHeaders.make(aggregate, new 
RecordHeaders()));
+        }
+
+        @Override
+        public String name() {
+            return inner.name();
+        }
+
+        @Override
+        public void init(final StateStoreContext stateStoreContext, final 
StateStore root) {
+            inner.init(stateStoreContext, root);
+        }
+
+        @Override
+        public void close() {
+            inner.close();
+        }
+
+        @Override
+        public boolean persistent() {
+            return inner.persistent();
+        }
+
+        @Override
+        public boolean isOpen() {
+            return inner.isOpen();
+        }
+
+        @Override
+        public Position getPosition() {
+            return inner.getPosition();
+        }
+    }
+
 }

Reply via email to