This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1ad463d9cc2 KAFKA-20318: Add time ordered session store support to the 
DSL (#21794)
1ad463d9cc2 is described below

commit 1ad463d9cc2ea53c3daaef79f7b694530a83bd31
Author: Bill Bejeck <[email protected]>
AuthorDate: Thu Mar 19 14:28:43 2026 -0400

    KAFKA-20318: Add time ordered session store support to the DSL (#21794)
    
    This PR fixes a bug where
    `RocksDbTimeOrderedSessionBytesStoreSupplier.get` with
    `withHeaders=true` created segments using `KeyValueSegment` (default-CF
    only) instead of `SessionSegmentWithHeaders` (dual-CF with lazy
    migration), which would cause the upgrade path from non-headers to
    headers format to fail. The fix introduces
    `RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders`, widens
    `RocksDBTimeOrderedSessionStore` to accept either segment type, and
    updates the supplier to create the correct bytes store. Upgrade tests
    validate lazy migration at both the store layer and DSL supplier path,
    and a `TopologyTestDriver` end-to-end test exercises ON_WINDOW_CLOSE
    with both PLAIN and HEADERS formats.
    
    Reviewers: Matthias J. Sax <[email protected]>, TengYao Chi
    <[email protected]>
    
    Co-authored-by: Matthias J. Sax <[email protected]>
---
 ...tractRocksDBTimeOrderedSegmentedBytesStore.java | 106 ++++--
 .../streams/state/internals/AbstractSegments.java  |   4 +-
 .../streams/state/internals/HasNextCondition.java  |   2 +-
 .../streams/state/internals/KeyValueSegment.java   |   2 +-
 .../state/internals/LogicalKeyValueSegment.java    |   4 +-
 .../state/internals/LogicalKeyValueSegments.java   |   5 +-
 .../internals/RocksDBSessionStoreWithHeaders.java  |   3 +-
 .../RocksDBTimeOrderedKeyValueBytesStore.java      |  41 +--
 ...cksDBTimeOrderedSessionSegmentedBytesStore.java |  73 +---
 ...eredSessionSegmentedBytesStoreWithHeaders.java} |  51 ++-
 .../internals/RocksDBTimeOrderedSessionStore.java  |   4 +-
 .../RocksDBTimeOrderedSessionStoreWithHeaders.java |   5 +-
 ...ocksDBTimeOrderedWindowSegmentedBytesStore.java |  81 ++--
 .../internals/RocksDBTimeOrderedWindowStore.java   |   6 +-
 .../RocksDBTimeOrderedWindowStoreWithHeaders.java  |   5 +-
 ...IndexedTimeOrderedWindowBytesStoreSupplier.java |   4 +-
 ...ocksDbTimeOrderedSessionBytesStoreSupplier.java |  23 +-
 .../streams/state/internals/SegmentIterator.java   |   2 +-
 .../kafka/streams/state/internals/Segments.java    |   2 +-
 .../internals/TimeOrderedCachingWindowStore.java   |   6 +-
 .../TimestampedWindowStoreWithHeadersBuilder.java  |   3 -
 ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java |  46 ++-
 .../internals/AbstractSessionBytesStoreTest.java   |  18 +-
 .../internals/LogicalKeyValueSegmentsTest.java     |   6 +-
 ...ksDBTimeOrderedSessionStoreWithHeadersTest.java |   2 +-
 ...xedTimeOrderedWindowBytesStoreSupplierTest.java |  16 +-
 ...imeOrderedCachingPersistentWindowStoreTest.java |   7 +-
 .../TimeOrderedSessionStoreUpgradeTest.java        | 410 +++++++++++++++++++++
 .../internals/TimeOrderedWindowStoreTest.java      |   8 +-
 .../TimestampedWindowStoreBuilderTest.java         |   3 +-
 30 files changed, 679 insertions(+), 269 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
index e87403b4289..bec19c91abc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
@@ -16,18 +16,26 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.kstream.Windowed;
+import 
org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema;
-import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
 
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
+import java.util.function.Function;
 
 /**
  * RocksDB store backed by two SegmentedBytesStores which can optimize scan by 
time as well as window
@@ -59,9 +67,11 @@ import java.util.Optional;
  * @see RocksDBTimeOrderedWindowSegmentedBytesStore
  */
 public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore<S extends 
Segment> extends AbstractDualSchemaRocksDBSegmentedBytesStore<S> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractRocksDBTimeOrderedSegmentedBytesStore.class);
 
-    abstract class IndexToBaseStoreIterator implements KeyValueIterator<Bytes, 
byte[]> {
+    private long minTimestamp;
+
+    public abstract class IndexToBaseStoreIterator implements 
KeyValueIterator<Bytes, byte[]> {
         private final KeyValueIterator<Bytes, byte[]> indexIterator;
         private byte[] cachedValue;
 
@@ -115,32 +125,64 @@ public abstract class 
AbstractRocksDBTimeOrderedSegmentedBytesStore<S extends Se
         protected abstract Bytes getBaseKey(final Bytes indexKey);
     }
 
-    /**
-     * Concrete implementation of IndexToBaseStoreIterator for window key 
schema.
-     * Converts index keys (key-first schema) to base store keys (time-first 
schema).
-     * <p>
-     * This can be reused by both window store implementations (with and 
without headers).
-     */
-    class WindowKeySchemaIndexToBaseStoreIterator extends 
IndexToBaseStoreIterator {
-        WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, 
byte[]> indexIterator) {
-            super(indexIterator);
-        }
-
-        @Override
-        protected Bytes getBaseKey(final Bytes indexKey) {
-            final byte[] keyBytes = 
KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
-            final long timestamp = 
KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
-            final int seqnum = 
KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
-            return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes, 
timestamp, seqnum);
-        }
-    }
-
     AbstractRocksDBTimeOrderedSegmentedBytesStore(final String name,
                                                   final long retention,
                                                   final KeySchema 
baseKeySchema,
                                                   final Optional<KeySchema> 
indexKeySchema,
                                                   final AbstractSegments<S> 
segments) {
         super(name, baseKeySchema, indexKeySchema, segments, retention);
+
+        minTimestamp = Long.MAX_VALUE;
+    }
+
+    Map<S, WriteBatch> getWriteBatches(
+        final Collection<ConsumerRecord<byte[], byte[]>> records,
+        final Function<byte[], Long> timestampExtractor,
+        final Function<byte[], byte[]> indexedKeyExtractor,
+        final Function<byte[], byte[]> baseKeyExtractor
+    ) {
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            final long timestamp = timestampExtractor.apply(record.key());
+            minTimestamp = Math.min(minTimestamp, timestamp);
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+        }
+
+        final Map<S, WriteBatch> writeBatchMap = new HashMap<>();
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            final long timestamp = timestampExtractor.apply(record.key());
+            final long segmentId = segments.segmentId(timestamp);
+            final S segment = segments.getOrCreateSegmentIfLive(segmentId, 
internalProcessorContext, observedStreamTime);
+            if (segment != null) {
+                
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
+                    record,
+                    consistencyEnabled,
+                    position
+                );
+                try {
+                    final WriteBatch batch = 
writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
+
+                    // Assuming changelog record is serialized using 
SessionKeySchema
+                    // from ChangeLoggingSessionBytesStore. Reconstruct 
key/value to restore
+                    if (hasIndex()) {
+                        final byte[] indexKey = 
indexedKeyExtractor.apply(record.key());
+                        // Take care of tombstone
+                        final byte[] value = record.value() == null ? null : 
new byte[0];
+                        segment.addToBatch(new KeyValue<>(indexKey, value), 
batch);
+                    }
+
+                    final byte[] baseKey = 
baseKeyExtractor.apply(record.key());
+                    segment.addToBatch(new KeyValue<>(baseKey, 
record.value()), batch);
+                } catch (final RocksDBException e) {
+                    throw new ProcessorStateException("Error restoring batch 
to store " + name(), e);
+                }
+            }
+        }
+        return writeBatchMap;
+    }
+
+    protected long minTimestamp() {
+        return minTimestamp;
     }
 
     @Override
@@ -167,6 +209,22 @@ public abstract class 
AbstractRocksDBTimeOrderedSegmentedBytesStore<S extends Se
         throw new UnsupportedOperationException("This store does not support 
fetch with timestamp and seqnum");
     }
 
+    public byte[] fetchSession(final Bytes key, final long sessionStartTime, 
final long sessionEndTime) {
+        throw new UnsupportedOperationException("This store does not support 
fetchSession");
+    }
+
+    public KeyValueIterator<Bytes, byte[]> fetchSessions(final long 
earliestSessionEndTime, final long latestSessionEndTime) {
+        throw new UnsupportedOperationException("This store does not support 
fetchSessions");
+    }
+
+    public void remove(final Windowed<Bytes> key) {
+        throw new UnsupportedOperationException("This store does not support 
remove with Windowed key");
+    }
+
+    public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
+        throw new UnsupportedOperationException("This store does not support 
put with Windowed key");
+    }
+
     KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
                                           final long from,
                                           final long to,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
index ae292a44f8b..f386b24c18f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
@@ -38,7 +38,7 @@ import java.util.NavigableMap;
 import java.util.SimpleTimeZone;
 import java.util.TreeMap;
 
-abstract class AbstractSegments<S extends Segment> implements Segments<S> {
+public abstract class AbstractSegments<S extends Segment> implements 
Segments<S> {
     private static final Logger log = 
LoggerFactory.getLogger(AbstractSegments.class);
 
     final TreeMap<Long, S> segments = new TreeMap<>();
@@ -184,7 +184,7 @@ abstract class AbstractSegments<S extends Segment> 
implements Segments<S> {
         }
     }
 
-    @SuppressWarnings("deprecation")
+    @Deprecated
     @Override
     public boolean managesOffsets() {
         return true;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/HasNextCondition.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/HasNextCondition.java
index 8784dbad1bc..0b60cd13f34 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/HasNextCondition.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/HasNextCondition.java
@@ -19,6 +19,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
-interface HasNextCondition {
+public interface HasNextCondition {
     boolean hasNext(final KeyValueIterator<Bytes, ?> iterator);
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
index 9935b80abf1..69400f497ff 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
@@ -26,7 +26,7 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Objects;
 
-class KeyValueSegment extends RocksDBStore implements Segment {
+public class KeyValueSegment extends RocksDBStore implements Segment {
     private final long id;
 
     KeyValueSegment(final String segmentName,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
index 1adbac447b0..ba6215ecca0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
@@ -53,7 +53,7 @@ import static 
org.apache.kafka.streams.state.internals.RocksDBStore.incrementWit
  * stores a key into a shared physical store by prepending the key with a 
prefix (unique to
  * the specific logical segment), and storing the combined key into the 
physical store.
  */
-class LogicalKeyValueSegment implements Segment, VersionedStoreSegment {
+public class LogicalKeyValueSegment implements Segment, VersionedStoreSegment {
     private static final Logger log = 
LoggerFactory.getLogger(LogicalKeyValueSegment.class);
 
     private final long id;
@@ -156,7 +156,7 @@ class LogicalKeyValueSegment implements Segment, 
VersionedStoreSegment {
             iterators = new HashSet<>(openIterators);
             openIterators.clear();
         }
-        if (iterators.size() != 0) {
+        if (!iterators.isEmpty()) {
             log.warn("Closing {} open iterators for store {}", 
iterators.size(), name);
             for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
                 iterator.close();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
index fbf7722629c..01c91efd602 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
@@ -86,8 +86,8 @@ public class LogicalKeyValueSegments extends 
AbstractSegments<LogicalKeyValueSeg
     }
 
     // VisibleForTesting
-    LogicalKeyValueSegment getReservedSegment(final long segmentId) {
-        return reservedSegments.get(segmentId);
+    LogicalKeyValueSegment getReservedSegment() {
+        return reservedSegments.get(-1L);
     }
 
     @Override
@@ -108,6 +108,7 @@ public class LogicalKeyValueSegments extends 
AbstractSegments<LogicalKeyValueSeg
     }
 
     @SuppressWarnings("deprecation")
+    @Deprecated
     @Override
     public boolean managesOffsets() {
         return physicalStore.managesOffsets();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
index 77fb98fabff..7e171ba952a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
@@ -27,7 +28,7 @@ import org.apache.kafka.streams.state.HeadersBytesStore;
  * RocksDB-backed session store with support for record headers.
  * <p>
  * This store extends {@link RocksDBSessionStore} and returns
- * {@link QueryResult#forUnknownQueryType(Query, Object)} for all queries,
+ * {@link QueryResult#forUnknownQueryType(Query, StateStore)} for all queries,
  * as IQv2 query handling is done at the metered layer.
  * <p>
  * The storage format for values is: 
[headersSize(varint)][headersBytes][aggregationBytes]
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java
index 48ea130ab31..37c6dce156b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java
@@ -19,15 +19,11 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.ProcessorStateException;
-import 
org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
 import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
 
-import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteBatch;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
@@ -36,8 +32,6 @@ import java.util.Optional;
  */
 public class RocksDBTimeOrderedKeyValueBytesStore extends 
AbstractRocksDBTimeOrderedSegmentedBytesStore<KeyValueSegment> {
 
-    private long minTimestamp;
-
     RocksDBTimeOrderedKeyValueBytesStore(final String name,
                                          final String metricsScope) {
         super(name,
@@ -45,7 +39,6 @@ public class RocksDBTimeOrderedKeyValueBytesStore extends 
AbstractRocksDBTimeOrd
             new TimeFirstWindowKeySchema(),
             Optional.empty(),
             new KeyValueSegments(name, metricsScope, Long.MAX_VALUE, 
Long.MAX_VALUE));
-        minTimestamp = Long.MAX_VALUE;
     }
 
     @Override
@@ -55,31 +48,12 @@ public class RocksDBTimeOrderedKeyValueBytesStore extends 
AbstractRocksDBTimeOrd
 
     @Override
     Map<KeyValueSegment, WriteBatch> getWriteBatches(final 
Collection<ConsumerRecord<byte[], byte[]>> records) {
-        final Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>();
-        for (final ConsumerRecord<byte[], byte[]> record : records) {
-            final long timestamp = 
WindowKeySchema.extractStoreTimestamp(record.key());
-            observedStreamTime = Math.max(observedStreamTime, timestamp);
-            minTimestamp = Math.min(minTimestamp, timestamp);
-            final long segmentId = segments.segmentId(timestamp);
-            final KeyValueSegment segment = 
segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext, 
observedStreamTime);
-            if (segment != null) {
-                //null segment is if it has expired, so  we don't want those 
records
-                
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
-                    record,
-                    consistencyEnabled,
-                    position
-                );
-                try {
-                    final WriteBatch batch = 
writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
-
-                    final byte[] baseKey = 
TimeFirstWindowKeySchema.fromNonPrefixWindowKey(record.key());
-                    segment.addToBatch(new KeyValue<>(baseKey, 
record.value()), batch);
-                } catch (final RocksDBException e) {
-                    throw new ProcessorStateException("Error restoring batch 
to store " + name(), e);
-                }
-            }
-        }
-        return writeBatchMap;
+        return getWriteBatches(
+            records,
+            WindowKeySchema::extractStoreTimestamp,
+            null, // never an indexed store -- not needed
+            TimeFirstWindowKeySchema::fromNonPrefixWindowKey
+        );
     }
 
     @Override
@@ -87,7 +61,4 @@ public class RocksDBTimeOrderedKeyValueBytesStore extends 
AbstractRocksDBTimeOrd
         throw new UnsupportedOperationException("Do not use for 
TimeOrderedKeyValueStore");
     }
 
-    protected long minTimestamp() {
-        return minTimestamp;
-    }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
index 3e1f4c5ea5c..dbb7713ab35 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
@@ -19,19 +19,15 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
-import 
org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import 
org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.KeyFirstSessionKeySchema;
 import 
org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.TimeFirstSessionKeySchema;
 
-import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteBatch;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -39,7 +35,7 @@ import java.util.Optional;
 /**
  * A RocksDB backed time-ordered segmented bytes store for session key schema.
  */
-public class RocksDBTimeOrderedSessionSegmentedBytesStore extends 
AbstractRocksDBTimeOrderedSegmentedBytesStore<KeyValueSegment> {
+public class RocksDBTimeOrderedSessionSegmentedBytesStore<S extends Segment> 
extends AbstractRocksDBTimeOrderedSegmentedBytesStore<S> {
 
     private class SessionKeySchemaIndexToBaseStoreIterator extends 
IndexToBaseStoreIterator {
         SessionKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, 
byte[]> indexIterator) {
@@ -50,23 +46,24 @@ public class RocksDBTimeOrderedSessionSegmentedBytesStore 
extends AbstractRocksD
         protected Bytes getBaseKey(final Bytes indexKey) {
             final Window window = 
KeyFirstSessionKeySchema.extractWindow(indexKey.get());
             final byte[] key = 
KeyFirstSessionKeySchema.extractKeyBytes(indexKey.get());
-
             return TimeFirstSessionKeySchema.toBinary(Bytes.wrap(key), 
window.start(), window.end());
         }
     }
 
     RocksDBTimeOrderedSessionSegmentedBytesStore(final String name,
-                                                 final String metricsScope,
                                                  final long retention,
-                                                 final long segmentInterval,
-                                                 final boolean withIndex) {
-        super(name,
+                                                 final boolean withIndex,
+                                                 final AbstractSegments<S> 
segments) {
+        super(
+            name,
             retention,
             new TimeFirstSessionKeySchema(),
             Optional.ofNullable(withIndex ? new KeyFirstSessionKeySchema() : 
null),
-            new KeyValueSegments(name, metricsScope, retention, 
segmentInterval));
+            segments
+        );
     }
 
+    @Override
     public byte[] fetchSession(final Bytes key,
                                final long sessionStartTime,
                                final long sessionEndTime) {
@@ -77,9 +74,10 @@ public class RocksDBTimeOrderedSessionSegmentedBytesStore 
extends AbstractRocksD
         ));
     }
 
+    @Override
     public KeyValueIterator<Bytes, byte[]> fetchSessions(final long 
earliestSessionEndTime,
                                                          final long 
latestSessionEndTime) {
-        final List<KeyValueSegment> searchSpace = 
segments.segments(earliestSessionEndTime, latestSessionEndTime, true);
+        final List<S> searchSpace = segments.segments(earliestSessionEndTime, 
latestSessionEndTime, true);
 
         // here we want [0, latestSE, FF] as the upper bound to cover any 
possible keys,
         // but since we can only get upper bound based on timestamps, we use a 
slight larger upper bound as [0, latestSE+1]
@@ -107,10 +105,12 @@ public class RocksDBTimeOrderedSessionSegmentedBytesStore 
extends AbstractRocksD
                 true);
     }
 
+    @Override
     public void remove(final Windowed<Bytes> key) {
         remove(TimeFirstSessionKeySchema.toBinary(key));
     }
 
+    @Override
     public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
         put(TimeFirstSessionKeySchema.toBinary(sessionKey), aggregate);
     }
@@ -123,50 +123,17 @@ public class RocksDBTimeOrderedSessionSegmentedBytesStore 
extends AbstractRocksD
     }
 
     @Override
-    Map<KeyValueSegment, WriteBatch> getWriteBatches(
-        final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        // advance stream time to the max timestamp in the batch
-        for (final ConsumerRecord<byte[], byte[]> record : records) {
-            final long timestamp = 
SessionKeySchema.extractEndTimestamp(record.key());
-            observedStreamTime = Math.max(observedStreamTime, timestamp);
-        }
-
-        final Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>();
-        for (final ConsumerRecord<byte[], byte[]> record : records) {
-            final long timestamp = 
SessionKeySchema.extractEndTimestamp(record.key());
-            final long segmentId = segments.segmentId(timestamp);
-            final KeyValueSegment segment = 
segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext, 
observedStreamTime);
-            if (segment != null) {
-                
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
-                    record,
-                    consistencyEnabled,
-                    position
-                );
-                try {
-                    final WriteBatch batch = 
writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
-
-                    // Assuming changelog record is serialized using 
SessionKeySchema
-                    // from ChangeLoggingSessionBytesStore. Reconstruct 
key/value to restore
-                    if (hasIndex()) {
-                        final byte[] indexKey = 
KeyFirstSessionKeySchema.prefixNonPrefixSessionKey(record.key());
-                        // Take care of tombstone
-                        final byte[] value = record.value() == null ? null : 
new byte[0];
-                        segment.addToBatch(new KeyValue<>(indexKey, value), 
batch);
-                    }
-
-                    final byte[] baseKey = 
TimeFirstSessionKeySchema.extractWindowBytesFromNonPrefixSessionKey(record.key());
-                    segment.addToBatch(new KeyValue<>(baseKey, 
record.value()), batch);
-                } catch (final RocksDBException e) {
-                    throw new ProcessorStateException("Error restoring batch 
to store " + name(), e);
-                }
-            }
-        }
-        return writeBatchMap;
+    Map<S, WriteBatch> getWriteBatches(final Collection<ConsumerRecord<byte[], 
byte[]>> records) {
+        return getWriteBatches(
+            records,
+            SessionKeySchema::extractEndTimestamp,
+            KeyFirstSessionKeySchema::prefixNonPrefixSessionKey,
+            
TimeFirstSessionKeySchema::extractWindowBytesFromNonPrefixSessionKey
+        );
     }
 
     @Override
-    protected IndexToBaseStoreIterator getIndexToBaseStoreIterator(
-        final SegmentIterator<KeyValueSegment> segmentIterator) {
+    protected IndexToBaseStoreIterator getIndexToBaseStoreIterator(final 
SegmentIterator<S> segmentIterator) {
         return new SessionKeySchemaIndexToBaseStoreIterator(segmentIterator);
     }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders.java
similarity index 51%
copy from 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
copy to 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders.java
index aa513138d9f..4ac63da1110 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders.java
@@ -21,29 +21,56 @@ import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryConfig;
 import org.apache.kafka.streams.query.QueryResult;
-import org.apache.kafka.streams.state.HeadersBytesStore;
 
 /**
- * RocksDB-backed time-ordered session store with support for record headers.
+ * A RocksDB-backed time-ordered segmented bytes store with headers support 
for session key schema.
  * <p>
- * This store extends {@link RocksDBTimeOrderedSessionStore} and returns
- * {@link QueryResult#forUnknownQueryType(Query, Object)} for all queries,
- * as IQv2 query handling is done at the metered layer.
+ * This store extends {@link AbstractRocksDBTimeOrderedSegmentedBytesStore} 
and uses
+ * {@link SessionSegmentsWithHeaders} to manage segments with full header 
support,
+ * including Column Family management and lazy migration from legacy formats.
  * <p>
- * The storage format for values is: 
[headersSize(varint)][headersBytes][aggregationBytes]
+ * The store maintains a dual-schema architecture:
+ * <ul>
+ *   <li>Base store: Time-first session key schema for efficient time-range 
queries</li>
+ *   <li>Index store (optional): Key-first session key schema for efficient 
key-based queries</li>
+ * </ul>
+ * <p>
+ * Headers are managed at the segment level by {@link 
SessionSegmentWithHeaders}.
+ * <p>
+ * Value format (timestamps are in the key, not in the value):
+ * <ul>
+ *   <li>Old format: {@code [aggregationBytes]}</li>
+ *   <li>New format: {@code 
[headersSize(varint)][headersBytes][aggregationBytes]}</li>
+ * </ul>
  *
  * @see RocksDBTimeOrderedSessionStore
+ * @see SessionSegmentsWithHeaders
+ * @see SessionSegmentWithHeaders
  */
-class RocksDBTimeOrderedSessionStoreWithHeaders extends 
RocksDBTimeOrderedSessionStore implements HeadersBytesStore {
+class RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders
+    extends 
RocksDBTimeOrderedSessionSegmentedBytesStore<SessionSegmentWithHeaders> {
 
-    RocksDBTimeOrderedSessionStoreWithHeaders(final 
RocksDBTimeOrderedSessionSegmentedBytesStore store) {
-        super(store);
+    RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders(
+        final String name,
+        final String metricsScope,
+        final long retention,
+        final long segmentInterval,
+        final boolean withIndex
+    ) {
+        super(
+            name,
+            retention,
+            withIndex,
+            new SessionSegmentsWithHeaders(name, metricsScope, retention, 
segmentInterval)
+        );
     }
 
     @Override
-    public <R> QueryResult<R> query(final Query<R> query,
-                                    final PositionBound positionBound,
-                                    final QueryConfig config) {
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
         final QueryResult<R> result;
         final Position position = getPosition();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
index cf46e2d5f2d..9e2caca3699 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
@@ -31,12 +31,12 @@ import 
org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.TimeFi
 import java.util.Objects;
 
 public class RocksDBTimeOrderedSessionStore
-    extends WrappedStateStore<RocksDBTimeOrderedSessionSegmentedBytesStore, 
Object, Object>
+    extends WrappedStateStore<AbstractRocksDBTimeOrderedSegmentedBytesStore<? 
extends Segment>, Object, Object>
     implements SessionStore<Bytes, byte[]> {
 
     private StateStoreContext stateStoreContext;
 
-    RocksDBTimeOrderedSessionStore(final 
RocksDBTimeOrderedSessionSegmentedBytesStore store) {
+    RocksDBTimeOrderedSessionStore(final 
AbstractRocksDBTimeOrderedSegmentedBytesStore<? extends Segment> store) {
         super(store);
         Objects.requireNonNull(store, "store is null");
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
index aa513138d9f..e4e517dc1e2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
@@ -27,7 +28,7 @@ import org.apache.kafka.streams.state.HeadersBytesStore;
  * RocksDB-backed time-ordered session store with support for record headers.
  * <p>
  * This store extends {@link RocksDBTimeOrderedSessionStore} and returns
- * {@link QueryResult#forUnknownQueryType(Query, Object)} for all queries,
+ * {@link QueryResult#forUnknownQueryType(Query, StateStore)} for all queries,
  * as IQv2 query handling is done at the metered layer.
  * <p>
  * The storage format for values is: 
[headersSize(varint)][headersBytes][aggregationBytes]
@@ -36,7 +37,7 @@ import org.apache.kafka.streams.state.HeadersBytesStore;
  */
 class RocksDBTimeOrderedSessionStoreWithHeaders extends 
RocksDBTimeOrderedSessionStore implements HeadersBytesStore {
 
-    RocksDBTimeOrderedSessionStoreWithHeaders(final 
RocksDBTimeOrderedSessionSegmentedBytesStore store) {
+    RocksDBTimeOrderedSessionStoreWithHeaders(final 
RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders store) {
         super(store);
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java
index 8829c7fb063..249933df1ff 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java
@@ -19,16 +19,13 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.ProcessorStateException;
-import 
org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema;
 import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
 
-import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteBatch;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
@@ -43,15 +40,37 @@ import java.util.Optional;
  */
 public class RocksDBTimeOrderedWindowSegmentedBytesStore<S extends Segment> 
extends AbstractRocksDBTimeOrderedSegmentedBytesStore<S> {
 
+    /**
+     * Concrete implementation of IndexToBaseStoreIterator for window key 
schema.
+     * Converts index keys (key-first schema) to base store keys (time-first 
schema).
+     * <p>
+     * This can be reused by both window store implementations (with and 
without headers).
+     */
+    class WindowKeySchemaIndexToBaseStoreIterator extends 
IndexToBaseStoreIterator {
+        WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, 
byte[]> indexIterator) {
+            super(indexIterator);
+        }
+
+        @Override
+        protected Bytes getBaseKey(final Bytes indexKey) {
+            final byte[] keyBytes = 
KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
+            final long timestamp = 
KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
+            final int seqnum = 
KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
+            return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes, 
timestamp, seqnum);
+        }
+    }
+
     RocksDBTimeOrderedWindowSegmentedBytesStore(final String name,
                                                 final long retention,
                                                 final boolean withIndex,
                                                 final AbstractSegments<S> 
segments) {
-        super(name,
+        super(
+            name,
             retention,
             new TimeFirstWindowKeySchema(),
             Optional.ofNullable(withIndex ? new KeyFirstWindowKeySchema() : 
null),
-            segments);
+            segments
+        );
     }
 
     @Override
@@ -70,55 +89,21 @@ public class RocksDBTimeOrderedWindowSegmentedBytesStore<S 
extends Segment> exte
         final byte[] key = 
TimeFirstWindowKeySchema.extractStoreKeyBytes(baseKey.get());
         final long timestamp = 
TimeFirstWindowKeySchema.extractStoreTimestamp(baseKey.get());
         final int seqnum = 
TimeFirstWindowKeySchema.extractStoreSequence(baseKey.get());
-
         return KeyValue.pair(KeyFirstWindowKeySchema.toStoreKeyBinary(key, 
timestamp, seqnum), new byte[0]);
     }
 
     @Override
-    Map<S, WriteBatch> getWriteBatches(
-        final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        // advance stream time to the max timestamp in the batch
-        for (final ConsumerRecord<byte[], byte[]> record : records) {
-            final long timestamp = 
WindowKeySchema.extractStoreTimestamp(record.key());
-            observedStreamTime = Math.max(observedStreamTime, timestamp);
-        }
-
-        final Map<S, WriteBatch> writeBatchMap = new HashMap<>();
-        for (final ConsumerRecord<byte[], byte[]> record : records) {
-            final long timestamp = 
WindowKeySchema.extractStoreTimestamp(record.key());
-            final long segmentId = segments.segmentId(timestamp);
-            final S segment = segments.getOrCreateSegmentIfLive(segmentId, 
internalProcessorContext, observedStreamTime);
-            if (segment != null) {
-                
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
-                    record,
-                    consistencyEnabled,
-                    position
-                );
-                try {
-                    final WriteBatch batch = 
writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
-
-                    // Assuming changelog record is serialized using 
WindowKeySchema
-                    // from ChangeLoggingTimestampedWindowBytesStore. 
Reconstruct key/value to restore
-                    if (hasIndex()) {
-                        final byte[] indexKey = 
KeyFirstWindowKeySchema.fromNonPrefixWindowKey(record.key());
-                        // Take care of tombstone
-                        final byte[] value = record.value() == null ? null : 
new byte[0];
-                        segment.addToBatch(new KeyValue<>(indexKey, value), 
batch);
-                    }
-
-                    final byte[] baseKey = 
TimeFirstWindowKeySchema.fromNonPrefixWindowKey(record.key());
-                    segment.addToBatch(new KeyValue<>(baseKey, 
record.value()), batch);
-                } catch (final RocksDBException e) {
-                    throw new ProcessorStateException("Error restoring batch 
to store " + name(), e);
-                }
-            }
-        }
-        return writeBatchMap;
+    Map<S, WriteBatch> getWriteBatches(final Collection<ConsumerRecord<byte[], 
byte[]>> records) {
+        return getWriteBatches(
+            records,
+            WindowKeySchema::extractStoreTimestamp,
+            KeyFirstWindowKeySchema::fromNonPrefixWindowKey,
+            TimeFirstWindowKeySchema::fromNonPrefixWindowKey
+        );
     }
 
     @Override
-    protected IndexToBaseStoreIterator getIndexToBaseStoreIterator(
-        final SegmentIterator<S> segmentIterator) {
+    protected IndexToBaseStoreIterator getIndexToBaseStoreIterator(final 
SegmentIterator<S> segmentIterator) {
         return new WindowKeySchemaIndexToBaseStoreIterator(segmentIterator);
     }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
index 79e3541af58..ebe1f4c5614 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
@@ -35,8 +35,8 @@ import java.util.Map;
 import java.util.Objects;
 
 
-public class RocksDBTimeOrderedWindowStore
-    extends WrappedStateStore<AbstractRocksDBTimeOrderedSegmentedBytesStore<? 
extends Segment>, Object, Object>
+public class RocksDBTimeOrderedWindowStore<S extends Segment>
+    extends 
WrappedStateStore<AbstractRocksDBTimeOrderedSegmentedBytesStore<S>, Object, 
Object>
     implements WindowStore<Bytes, byte[]>, TimestampedBytesStore {
 
     private final boolean retainDuplicates;
@@ -46,7 +46,7 @@ public class RocksDBTimeOrderedWindowStore
     private StateStoreContext stateStoreContext;
 
     RocksDBTimeOrderedWindowStore(
-        final AbstractRocksDBTimeOrderedSegmentedBytesStore<? extends Segment> 
store,
+        final AbstractRocksDBTimeOrderedSegmentedBytesStore<S> store,
         final boolean retainDuplicates,
         final long windowSize
     ) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeaders.java
index f258cf57710..8d4d5a23b44 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeaders.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
@@ -31,7 +32,7 @@ import org.apache.kafka.streams.state.TimestampedBytesStore;
  * {@link TimestampedBytesStore} (for timestamp support) and {@link 
HeadersBytesStore}
  * (for header support) marker interfaces.
  * <p>
- * This store returns {@link QueryResult#forUnknownQueryType(Query, Object)} 
for all queries,
+ * This store returns {@link QueryResult#forUnknownQueryType(Query, 
StateStore)} for all queries,
  * as IQv2 query handling is done at the metered layer.
  * <p>
  * The storage format for values is: 
[headersSize(varint)][headersBytes][timestamp(8)][value]
@@ -40,7 +41,7 @@ import org.apache.kafka.streams.state.TimestampedBytesStore;
  * @see HeadersBytesStore
  * @see TimestampedBytesStore
  */
-class RocksDBTimeOrderedWindowStoreWithHeaders extends 
RocksDBTimeOrderedWindowStore implements TimestampedBytesStore, 
HeadersBytesStore {
+class RocksDBTimeOrderedWindowStoreWithHeaders extends 
RocksDBTimeOrderedWindowStore<WindowSegmentWithHeaders> implements 
TimestampedBytesStore, HeadersBytesStore {
 
     RocksDBTimeOrderedWindowStoreWithHeaders(final 
RocksDBTimeOrderedWindowSegmentedBytesStore<WindowSegmentWithHeaders> store,
                                              final boolean retainDuplicates,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
index 332d2d6973c..968821eb5c4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
@@ -117,7 +117,7 @@ public class 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window
     public WindowStore<Bytes, byte[]> get() {
         switch (windowStoreType) {
             case DEFAULT_WINDOW_STORE:
-                return new RocksDBTimeOrderedWindowStore(
+                return new RocksDBTimeOrderedWindowStore<>(
                     new RocksDBTimeOrderedWindowSegmentedBytesStore<>(
                         name,
                         retentionPeriod,
@@ -126,7 +126,7 @@ public class 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window
                     retainDuplicates,
                     windowSize);
             case INDEXED_WINDOW_STORE:
-                return new RocksDBTimeOrderedWindowStore(
+                return new RocksDBTimeOrderedWindowStore<>(
                     new RocksDBTimeOrderedWindowSegmentedBytesStore<>(
                         name,
                         retentionPeriod,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java
index 25473fb97d7..6212e235604 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java
@@ -49,17 +49,24 @@ public class RocksDbTimeOrderedSessionBytesStoreSupplier 
implements SessionBytes
 
     @Override
     public SessionStore<Bytes, byte[]> get() {
-        final RocksDBTimeOrderedSessionSegmentedBytesStore bytesStore =
-            new RocksDBTimeOrderedSessionSegmentedBytesStore(
+        if (withHeaders) {
+            final RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders 
bytesStore =
+                new RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders(
+                    name,
+                    metricsScope(),
+                    retentionPeriod,
+                    segmentIntervalMs(),
+                    withIndex
+                );
+            return new RocksDBTimeOrderedSessionStore(bytesStore);
+        }
+        final RocksDBTimeOrderedSessionSegmentedBytesStore<KeyValueSegment> 
bytesStore =
+            new RocksDBTimeOrderedSessionSegmentedBytesStore<>(
                 name,
-                metricsScope(),
                 retentionPeriod,
-                segmentIntervalMs(),
-                withIndex
+                withIndex,
+                new KeyValueSegments(name, metricsScope(), retentionPeriod, 
segmentIntervalMs())
             );
-        if (withHeaders) {
-            return new RocksDBTimeOrderedSessionStoreWithHeaders(bytesStore);
-        }
         return new RocksDBTimeOrderedSessionStore(bytesStore);
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
index 9aabc787c89..a0c52db873b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
@@ -27,7 +27,7 @@ import java.util.NoSuchElementException;
 /**
  * Iterate over multiple KeyValueSegments
  */
-class SegmentIterator<S extends Segment> implements KeyValueIterator<Bytes, 
byte[]> {
+public class SegmentIterator<S extends Segment> implements 
KeyValueIterator<Bytes, byte[]> {
 
     private final Bytes from;
     private final Bytes to;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 9c902da34a1..dbaea7dc302 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -42,7 +42,7 @@ interface Segments<S extends Segment> {
 
     void commit(final Map<TopicPartition, Long> changelogOffsets);
 
-    @SuppressWarnings("deprecation")
+    @Deprecated
     boolean managesOffsets();
 
     Long committedOffset(final TopicPartition partition);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
index 4ee73bdbd0b..4644f166cc1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
@@ -91,7 +91,7 @@ public class TimeOrderedCachingWindowStore
     }
 
     private void enforceWrappedStore(final WindowStore<Bytes, byte[]> 
underlying) {
-        final RocksDBTimeOrderedWindowStore timeOrderedWindowStore = 
getWrappedStore(underlying);
+        final RocksDBTimeOrderedWindowStore<?> timeOrderedWindowStore = 
getWrappedStore(underlying);
         if (timeOrderedWindowStore == null) {
             throw new IllegalArgumentException("TimeOrderedCachingWindowStore 
only supports RocksDBTimeOrderedWindowStore backed store");
         }
@@ -100,9 +100,9 @@ public class TimeOrderedCachingWindowStore
     }
 
     @SuppressWarnings("unchecked")
-    private RocksDBTimeOrderedWindowStore getWrappedStore(final StateStore 
wrapped) {
+    private RocksDBTimeOrderedWindowStore<?> getWrappedStore(final StateStore 
wrapped) {
         if (wrapped instanceof RocksDBTimeOrderedWindowStore) {
-            return (RocksDBTimeOrderedWindowStore) wrapped;
+            return (RocksDBTimeOrderedWindowStore<?>) wrapped;
         }
         if (wrapped instanceof WrappedStateStore) {
             return getWrappedStore(((WrappedStateStore<?, Bytes, byte[]>) 
wrapped).wrapped());
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
index 5689ad229f3..3714643d43b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
@@ -115,9 +115,6 @@ public class TimestampedWindowStoreWithHeadersBuilder<K, V>
         if (stateStore instanceof RocksDBTimeOrderedWindowStore) {
             return true;
         }
-        if (stateStore instanceof RocksDBTimeOrderedWindowStoreWithHeaders) {
-            return true;
-        }
         if (stateStore instanceof WrappedStateStore) {
             return isTimeOrderedStore(((WrappedStateStore<?, ?, ?>) 
stateStore).wrapped());
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index c4d1ef855f7..3b2265959fe 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -183,20 +183,18 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                         new KeyValueSegments(storeName, METRICS_SCOPE, 
retention, segmentInterval)
                 );
             case SessionSchemaWithIndex:
-                return new RocksDBTimeOrderedSessionSegmentedBytesStore(
-                        storeName,
-                        METRICS_SCOPE,
-                        retention,
-                        segmentInterval,
-                        true
+                return new RocksDBTimeOrderedSessionSegmentedBytesStore<>(
+                    storeName,
+                    retention,
+                    true,
+                    new KeyValueSegments(storeName, METRICS_SCOPE, retention, 
segmentInterval)
                 );
             case SessionSchemaWithoutIndex:
-                return new RocksDBTimeOrderedSessionSegmentedBytesStore(
-                        storeName,
-                        METRICS_SCOPE,
-                        retention,
-                        segmentInterval,
-                        false
+                return new RocksDBTimeOrderedSessionSegmentedBytesStore<>(
+                    storeName,
+                    retention,
+                    false,
+                    new KeyValueSegments(storeName, METRICS_SCOPE, retention, 
segmentInterval)
                 );
             default:
                 throw new IllegalStateException("Unknown SchemaType: " + 
schemaType());
@@ -852,28 +850,28 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
         bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), 
expectedValue4);
 
         // Record expired as timestampFromRawKey = 1000 while 
observedStreamTime = 60,000 and retention = 1000.
-        final byte[] value1 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) 
bytesStore).fetchSession(
+        final byte[] value1 = 
((RocksDBTimeOrderedSessionSegmentedBytesStore<?>) bytesStore).fetchSession(
             key1, windows[0].start(), windows[0].end());
         assertNull(value1);
 
         // Record expired as timestampFromRawKey = 1000 while 
observedStreamTime = 60,000 and retention = 1000.
-        final byte[] value2 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) 
bytesStore).fetchSession(
+        final byte[] value2 = 
((RocksDBTimeOrderedSessionSegmentedBytesStore<?>) bytesStore).fetchSession(
             key1, windows[1].start(), windows[1].end());
         assertNull(value2);
 
         // expired record
         // timestampFromRawKey = 1500 while observedStreamTime = 60,000 and 
retention = 1000.
-        final byte[] value3 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) 
bytesStore).fetchSession(
+        final byte[] value3 = 
((RocksDBTimeOrderedSessionSegmentedBytesStore<?>) bytesStore).fetchSession(
             key2, windows[2].start(), windows[2].end());
         assertNull(value3);
 
         // only non-expired record
         // timestampFromRawKey = 60,000 while observedStreamTime = 60,000 and 
retention = 1000.
-        final byte[] value4 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) 
bytesStore).fetchSession(
+        final byte[] value4 = 
((RocksDBTimeOrderedSessionSegmentedBytesStore<?>) bytesStore).fetchSession(
             key3, windows[3].start(), windows[3].end());
         assertEquals(Bytes.wrap(value4), Bytes.wrap(expectedValue4));
 
-        final byte[] noValue = ((RocksDBTimeOrderedSessionSegmentedBytesStore) 
bytesStore).fetchSession(
+        final byte[] noValue = 
((RocksDBTimeOrderedSessionSegmentedBytesStore<?>) bytesStore).fetchSession(
             key3, 2000, 3000);
         assertNull(noValue);
     }
@@ -900,7 +898,7 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
         // Fetch point
         assertEquals(
             Collections.singletonList(KeyValue.pair(new Windowed<>(keyA, 
sessionWindows[0]), 10L)),
-            
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore) 
bytesStore).fetchSessions(100L, 100L))
+            
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore<?>) 
bytesStore).fetchSessions(100L, 100L))
         );
 
         // Fetch partial boundary
@@ -909,7 +907,7 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                 KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L),
                 KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L)
             ),
-            
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore) 
bytesStore).fetchSessions(100L, 200L))
+            
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore<?>) 
bytesStore).fetchSessions(100L, 200L))
         );
 
         // Fetch partial
@@ -918,11 +916,11 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                 KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L),
                 KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L)
             ),
-            
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore) 
bytesStore).fetchSessions(99L, 201L))
+            
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore<?>) 
bytesStore).fetchSessions(99L, 201L))
         );
 
         // Fetch partial
-        try (final KeyValueIterator<Bytes, byte[]> values = 
((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(101L, 
199L)) {
+        try (final KeyValueIterator<Bytes, byte[]> values = 
((RocksDBTimeOrderedSessionSegmentedBytesStore<?>) 
bytesStore).fetchSessions(101L, 199L)) {
             assertTrue(toListAndCloseIterator(values).isEmpty());
         }
 
@@ -933,7 +931,7 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                 KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L),
                 KeyValue.pair(new Windowed<>(keyC, sessionWindows[2]), 200L)
             ),
-            
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore) 
bytesStore).fetchSessions(100L, 300L))
+            
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore<?>) 
bytesStore).fetchSessions(100L, 300L))
         );
 
         // Fetch all
@@ -943,13 +941,13 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                 KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L),
                 KeyValue.pair(new Windowed<>(keyC, sessionWindows[2]), 200L)
             ),
-            
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore) 
bytesStore).fetchSessions(99L, 301L))
+            
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore<?>) 
bytesStore).fetchSessions(99L, 301L))
         );
 
         // Fetch all
         assertEquals(
             Collections.singletonList(KeyValue.pair(new Windowed<>(keyB, 
sessionWindows[1]), 100L)),
-            
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore) 
bytesStore).fetchSessions(101L, 299L))
+            
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore<?>) 
bytesStore).fetchSessions(101L, 299L))
         );
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
index d70bcc964db..a3855dd41de 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
@@ -150,28 +150,14 @@ public abstract class AbstractSessionBytesStoreTest {
             }
             case RocksDBTimeOrderedSessionStoreWithHeadersWithIndex: {
                 return Stores.sessionStoreBuilder(
-                        new 
RocksDbTimeOrderedSessionBytesStoreSupplier(ROCK_DB_STORE_NAME, 
retentionPeriod, true) {
-                            @Override
-                            public SessionStore<Bytes, byte[]> get() {
-                                return new 
RocksDBTimeOrderedSessionStoreWithHeaders(
-                                    new 
RocksDBTimeOrderedSessionSegmentedBytesStore(
-                                        name(), metricsScope(), 
retentionPeriod(), segmentIntervalMs(), true));
-                            }
-                        },
+                        new 
RocksDbTimeOrderedSessionBytesStoreSupplier(ROCK_DB_STORE_NAME, 
retentionPeriod, true, true),
                         keySerde,
                         valueSerde
                 ).build();
             }
             case RocksDBTimeOrderedSessionStoreWithHeadersWithoutIndex: {
                 return Stores.sessionStoreBuilder(
-                        new 
RocksDbTimeOrderedSessionBytesStoreSupplier(ROCK_DB_STORE_NAME, 
retentionPeriod, false) {
-                            @Override
-                            public SessionStore<Bytes, byte[]> get() {
-                                return new 
RocksDBTimeOrderedSessionStoreWithHeaders(
-                                    new 
RocksDBTimeOrderedSessionSegmentedBytesStore(
-                                        name(), metricsScope(), 
retentionPeriod(), segmentIntervalMs(), false));
-                            }
-                        },
+                        new 
RocksDbTimeOrderedSessionBytesStoreSupplier(ROCK_DB_STORE_NAME, 
retentionPeriod, false, true),
                         keySerde,
                         valueSerde
                 ).build();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
index 4e053da9b7c..2fbbfe4e947 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
@@ -51,7 +51,7 @@ public class LogicalKeyValueSegmentsTest {
     private static final String METRICS_SCOPE = "metrics-scope";
     private static final String DB_FILE_DIR = "rocksdb";
 
-    private InternalMockProcessorContext context;
+    private InternalMockProcessorContext<?, ?> context;
 
     private LogicalKeyValueSegments segments;
 
@@ -157,7 +157,7 @@ public class LogicalKeyValueSegmentsTest {
         final List<LogicalKeyValueSegment> allSegments = 
segments.allSegments(true);
         assertEquals(1, allSegments.size());
         assertEquals(segment2, allSegments.get(0));
-        assertEquals(reservedSegment, segments.getReservedSegment(-1));
+        assertEquals(reservedSegment, segments.getReservedSegment());
     }
 
     @Test
@@ -225,7 +225,7 @@ public class LogicalKeyValueSegmentsTest {
         segments.close();
 
         assertThat(segments.segmentForTimestamp(0), is(nullValue()));
-        assertThat(segments.getReservedSegment(-1), is(nullValue()));
+        assertThat(segments.getReservedSegment(), is(nullValue()));
         // verify iterators closed as well
         assertThrows(InvalidStateStoreException.class, all1::hasNext);
         assertThrows(InvalidStateStoreException.class, all2::hasNext);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
index 3f9c77bf30a..d9647494a66 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
@@ -64,7 +64,7 @@ public class RocksDBTimeOrderedSessionStoreWithHeadersTest {
         );
 
         sessionStore = new RocksDBTimeOrderedSessionStoreWithHeaders(
-            new RocksDBTimeOrderedSessionSegmentedBytesStore(
+            new RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders(
                 STORE_NAME,
                 "test-metrics-scope",
                 RETENTION_PERIOD,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
index ea7c2d56acf..d16df93141d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
@@ -58,26 +58,26 @@ public class 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest {
 
     @Test
     public void shouldCreateRocksDbTimeOrderedWindowStoreWithIndex() {
-        final WindowStore store = 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L), 
ofMillis(1L), false, true, false).get();
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        final WindowStore<?, ?> store = 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L), 
ofMillis(1L), false, true, false).get();
+        final StateStore wrapped = ((WrappedStateStore<?, ?, ?>) 
store).wrapped();
         assertThat(store, instanceOf(RocksDBTimeOrderedWindowStore.class));
         assertThat(wrapped, 
instanceOf(RocksDBTimeOrderedWindowSegmentedBytesStore.class));
-        assertTrue(((RocksDBTimeOrderedWindowSegmentedBytesStore) 
wrapped).hasIndex());
+        assertTrue(((RocksDBTimeOrderedWindowSegmentedBytesStore<?>) 
wrapped).hasIndex());
     }
 
     @Test
     public void shouldCreateRocksDbTimeOrderedWindowStoreWithoutIndex() {
-        final WindowStore store = 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L), 
ofMillis(1L), false, false, false).get();
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        final WindowStore<?, ?> store = 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L), 
ofMillis(1L), false, false, false).get();
+        final StateStore wrapped = ((WrappedStateStore<?, ?, ?>) 
store).wrapped();
         assertThat(store, instanceOf(RocksDBTimeOrderedWindowStore.class));
         assertThat(wrapped, 
instanceOf(RocksDBTimeOrderedWindowSegmentedBytesStore.class));
-        assertFalse(((RocksDBTimeOrderedWindowSegmentedBytesStore) 
wrapped).hasIndex());
+        assertFalse(((RocksDBTimeOrderedWindowSegmentedBytesStore<?>) 
wrapped).hasIndex());
     }
 
     @Test
     public void shouldCreateRocksDbTimeOrderedWindowStoreWithHeaders() {
-        final WindowStore store = 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L), 
ofMillis(1L), false, true, true).get();
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        final WindowStore<?, ?> store = 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L), 
ofMillis(1L), false, true, true).get();
+        final StateStore wrapped = ((WrappedStateStore<?, ?, ?>) 
store).wrapped();
         assertThat(store, 
instanceOf(RocksDBTimeOrderedWindowStoreWithHeaders.class));
         assertThat(wrapped, 
instanceOf(RocksDBTimeOrderedWindowSegmentedBytesStore.class));
         assertTrue(((RocksDBTimeOrderedWindowSegmentedBytesStore<?>) 
wrapped).hasIndex());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
index e73230428f5..e6bfa0b27c9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
@@ -115,7 +115,7 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
         baseKeySchema = new TimeFirstWindowKeySchema();
         bytesStore = new RocksDBTimeOrderedWindowSegmentedBytesStore<>("test", 
100, hasIndex,
             new KeyValueSegments("test", "metrics-scope", 100, 
SEGMENT_INTERVAL));
-        underlyingStore = new RocksDBTimeOrderedWindowStore(bytesStore, false, 
WINDOW_SIZE);
+        underlyingStore = new RocksDBTimeOrderedWindowStore<>(bytesStore, 
false, WINDOW_SIZE);
         final TimeWindowedDeserializer<String> keyDeserializer = new 
TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE);
         keyDeserializer.setIsChangelogTopic(true);
         cacheListener = new CacheFlushListenerStub<>(keyDeserializer, new 
StringDeserializer());
@@ -136,7 +136,7 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
     @ValueSource(booleans = {true, false})
     public void shouldDelegateInit(final boolean hasIndex) {
         setUp(hasIndex);
-        final RocksDBTimeOrderedWindowStore inner = 
mock(RocksDBTimeOrderedWindowStore.class);
+        final RocksDBTimeOrderedWindowStore<?> inner = 
mock(RocksDBTimeOrderedWindowStore.class);
         when(inner.hasIndex()).thenReturn(hasIndex);
 
         final TimeOrderedCachingWindowStore outer = new 
TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);
@@ -154,7 +154,7 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
         assertThat(e.getMessage(),
             containsString("TimeOrderedCachingWindowStore only supports 
RocksDBTimeOrderedWindowStore backed store"));
 
-        final RocksDBTimeOrderedWindowStore inner = 
mock(RocksDBTimeOrderedWindowStore.class);
+        final RocksDBTimeOrderedWindowStore<?> inner = 
mock(RocksDBTimeOrderedWindowStore.class);
         // Nothing happens
         new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, 
SEGMENT_INTERVAL);
     }
@@ -1257,6 +1257,7 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
         verifyAndTearDownCloseTests();
     }
 
+    @SuppressWarnings("unchecked")
     private void setUpCloseTests() {
         underlyingStore = mock(RocksDBTimeOrderedWindowStore.class);
         when(underlyingStore.name()).thenReturn("store-name");
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
new file mode 100644
index 00000000000..d9edd2cacdc
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.DslStoreFormat;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
+import org.apache.kafka.streams.state.DslSessionParams;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.common.utils.Utils.delete;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests upgrade path from time-ordered session store (without headers) to
+ * time-ordered session store with headers using direct supplier creation.
+ * <p>
+ * This test validates lazy migration from DEFAULT CF to headers CF and ensures
+ * the store can read old data after upgrade.
+ */
+public class TimeOrderedSessionStoreUpgradeTest {
+
+    private static final String STORE_NAME = "time-ordered-session-store";
+    private static final long RETENTION_MS = Duration.ofMinutes(5).toMillis();
+
+    private InternalMockProcessorContext<Bytes, byte[]> context;
+    private File baseDir;
+
+    /**
+     * Prepares the per-test fixture: obtains a directory from
+     * {@code TestUtils.tempDirectory()} and builds a mock processor context on top
+     * of it with {@code Bytes} / {@code byte[]} serdes and the default test
+     * streams config.
+     */
+    @BeforeEach
+    public void setUp() {
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        baseDir = TestUtils.tempDirectory();
+        context = new InternalMockProcessorContext<>(
+            baseDir,
+            Serdes.Bytes(),
+            Serdes.ByteArray(),
+            new StreamsConfig(props)
+        );
+    }
+
+    /**
+     * Deletes the directory created in {@code setUp}, if there is one.
+     */
+    @AfterEach
+    public void tearDown() {
+        if (baseDir != null) {
+            try {
+                delete(baseDir);
+            } catch (final Exception e) {
+                // Deliberately swallowed: cleanup is best-effort, and a failed delete
+                // must not turn an otherwise passing test into a failure.
+            }
+        }
+    }
+
+    /**
+     * Builds the bytes the tests write into a headers-aware store: a varint
+     * holding the length of the serialized headers, the serialized headers
+     * themselves, and then the raw value bytes.
+     *
+     * @param value   raw value bytes; {@code null} is passed through as {@code null}
+     * @param headers headers to serialize in front of the value
+     * @return the concatenated bytes, or {@code null} if {@code value} is {@code null}
+     * @throws RuntimeException wrapping any {@link IOException} raised while writing
+     */
+    private static byte[] serializeValueWithHeaders(final byte[] value, final 
Headers headers) {
+        if (value == null) {
+            return null;
+        }
+        final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = 
HeadersSerializer.prepareSerialization(headers);
+        final byte[] rawHeaders = HeadersSerializer
+            .serialize(preSerializedHeaders, 
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders))
+            .array();
+
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             final DataOutputStream out = new DataOutputStream(baos)) {
+            ByteUtils.writeVarint(rawHeaders.length, out);
+            out.write(rawHeaders);
+            out.write(value);
+            return baos.toByteArray();
+        } catch (final IOException e) {
+            throw new RuntimeException("Failed to serialize value with 
headers", e);
+        }
+    }
+
+    /**
+     * Writes three sessions through a supplier created with the last two flags
+     * {@code (true, false)}, closes that store, then reopens the same store name
+     * in the same context with {@code (true, true)} and checks that every session
+     * comes back with its original value and zero headers.
+     */
+    @Test
+    public void shouldMigrateFromWithoutHeadersToWithHeaders() {
+        final RocksDbTimeOrderedSessionBytesStoreSupplier oldSupplier =
+            new RocksDbTimeOrderedSessionBytesStoreSupplier(
+                STORE_NAME, RETENTION_MS, true, false);
+
+        final SessionStore<Bytes, byte[]> oldStore = oldSupplier.get();
+        oldStore.init(context, oldStore);
+
+        final Bytes key1 = Bytes.wrap("key1".getBytes());
+        final Bytes key2 = Bytes.wrap("key2".getBytes());
+        final Bytes key3 = Bytes.wrap("key3".getBytes());
+
+        oldStore.put(new Windowed<>(key1, new SessionWindow(100, 200)), 
"value1".getBytes());
+        oldStore.put(new Windowed<>(key2, new SessionWindow(150, 250)), 
"value2".getBytes());
+        oldStore.put(new Windowed<>(key3, new SessionWindow(200, 300)), 
"value3".getBytes());
+
+        // Verify old data
+        assertEquals("value1", new String(oldStore.fetchSession(key1, 100, 
200)));
+        assertEquals("value2", new String(oldStore.fetchSession(key2, 150, 
250)));
+        assertEquals("value3", new String(oldStore.fetchSession(key3, 200, 
300)));
+
+        // The old store is closed before the new one is initialized on the same name
+        oldStore.close();
+
+        // Reopen with headers
+        final RocksDbTimeOrderedSessionBytesStoreSupplier newSupplier =
+            new RocksDbTimeOrderedSessionBytesStoreSupplier(
+                STORE_NAME, RETENTION_MS, true, true);
+
+        final SessionStore<Bytes, byte[]> newStore = newSupplier.get();
+        newStore.init(context, newStore);
+
+        // Verify old data readable with empty headers via lazy migration
+        byte[] fetch = newStore.fetchSession(key1, 100, 200);
+        assertNotNull(fetch);
+        assertEquals("value1", new 
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
+        assertEquals(0, 
AggregationWithHeadersDeserializer.headers(fetch).toArray().length, "Old data 
should have empty headers after migration");
+
+        fetch = newStore.fetchSession(key2, 150, 250);
+        assertNotNull(fetch);
+        assertEquals("value2", new 
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
+        assertEquals(0, 
AggregationWithHeadersDeserializer.headers(fetch).toArray().length, "Old data 
should have empty headers after migration");
+
+        fetch = newStore.fetchSession(key3, 200, 300);
+        assertNotNull(fetch);
+        assertEquals("value3", new 
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
+        assertEquals(0, 
AggregationWithHeadersDeserializer.headers(fetch).toArray().length, "Old data 
should have empty headers after migration");
+
+        newStore.close();
+    }
+
+    /**
+     * Writes one session through a supplier created with the last two flags
+     * {@code (true, false)}, then reopens the same store name with
+     * {@code (true, true)} and checks the value is still returned.
+     * <p>
+     * NOTE(review): these are the same supplier flags as
+     * {@code shouldMigrateFromWithoutHeadersToWithHeaders}; an upgrade from
+     * {@code (false, false)} to {@code (false, true)} does not appear to be
+     * exercised in this class. Confirm whether that gap is intentional.
+     */
+    @Test
+    public void shouldMigrateFromWithIndexToWithIndexAndHeaders() {
+        final RocksDbTimeOrderedSessionBytesStoreSupplier oldSupplier =
+            new RocksDbTimeOrderedSessionBytesStoreSupplier(
+                STORE_NAME, RETENTION_MS, true, false);
+
+        final SessionStore<Bytes, byte[]> oldStore = oldSupplier.get();
+        oldStore.init(context, oldStore);
+
+        final Bytes key1 = Bytes.wrap("key1".getBytes());
+        oldStore.put(new Windowed<>(key1, new SessionWindow(100, 200)), 
"value1".getBytes());
+        oldStore.close();
+
+        // Upgrade to headers
+        final RocksDbTimeOrderedSessionBytesStoreSupplier newSupplier =
+            new RocksDbTimeOrderedSessionBytesStoreSupplier(
+                STORE_NAME, RETENTION_MS, true, true);
+
+        final SessionStore<Bytes, byte[]> newStore = newSupplier.get();
+        newStore.init(context, newStore);
+
+        // Verify old data still accessible
+        final byte[] fetch = newStore.fetchSession(key1, 100, 200);
+        assertNotNull(fetch);
+        assertEquals("value1", new 
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
+
+        newStore.close();
+    }
+
+    /**
+     * Writes one session through a supplier created with the last two flags
+     * {@code (false, false)}, then reopens the same store name with both flags
+     * switched on, {@code (true, true)}, and checks that {@code fetchSession}
+     * still returns the value. Headers are not asserted here.
+     */
+    @Test
+    public void shouldMigrateFromWithoutIndexToWithIndexAndHeaders() {
+        final RocksDbTimeOrderedSessionBytesStoreSupplier oldSupplier =
+            new RocksDbTimeOrderedSessionBytesStoreSupplier(
+                STORE_NAME, RETENTION_MS, false, false);
+
+        final SessionStore<Bytes, byte[]> oldStore = oldSupplier.get();
+        oldStore.init(context, oldStore);
+
+        final Bytes key1 = Bytes.wrap("key1".getBytes());
+        oldStore.put(new Windowed<>(key1, new SessionWindow(100, 200)), 
"value1".getBytes());
+        oldStore.close();
+
+        // Upgrade to both index and headers
+        final RocksDbTimeOrderedSessionBytesStoreSupplier newSupplier =
+            new RocksDbTimeOrderedSessionBytesStoreSupplier(
+                STORE_NAME, RETENTION_MS, true, true);
+
+        final SessionStore<Bytes, byte[]> newStore = newSupplier.get();
+        newStore.init(context, newStore);
+
+        // Verify old data still accessible
+        final byte[] fetch = newStore.fetchSession(key1, 100, 200);
+        assertNotNull(fetch);
+        assertEquals("value1", new 
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
+
+        newStore.close();
+    }
+
+    /**
+     * Exercises a store that is created with headers from the start (no upgrade):
+     * writes one session with empty headers and one with two headers, then checks
+     * that values and headers read back unchanged and that {@code findSessions}
+     * over [100, 250] returns both sessions.
+     */
+    @Test
+    public void shouldWriteAndReadWithHeaders() {
+        // Start fresh with headers
+        final RocksDbTimeOrderedSessionBytesStoreSupplier supplier =
+            new RocksDbTimeOrderedSessionBytesStoreSupplier(
+                STORE_NAME, RETENTION_MS, true, true);
+
+        final SessionStore<Bytes, byte[]> store = supplier.get();
+        store.init(context, store);
+
+        final Bytes key1 = Bytes.wrap("key1".getBytes());
+        final Bytes key2 = Bytes.wrap("key2".getBytes());
+
+        // Write with empty headers
+        store.put(new Windowed<>(key1, new SessionWindow(100, 200)),
+            serializeValueWithHeaders("value1".getBytes(), new 
RecordHeaders()));
+
+        // Write with actual headers
+        final RecordHeaders headersWithData = new RecordHeaders();
+        headersWithData.add("header-key-1", "header-value-1".getBytes());
+        headersWithData.add("header-key-2", "header-value-2".getBytes());
+        store.put(new Windowed<>(key2, new SessionWindow(150, 250)),
+            serializeValueWithHeaders("value2".getBytes(), headersWithData));
+
+        // Verify values
+        assertEquals("value1", new 
String(AggregationWithHeadersDeserializer.rawAggregation(store.fetchSession(key1,
 100, 200))));
+        assertEquals("value2", new 
String(AggregationWithHeadersDeserializer.rawAggregation(store.fetchSession(key2,
 150, 250))));
+
+        // Verify headers for key1 (empty)
+        final Headers key1Headers = 
AggregationWithHeadersDeserializer.headers(store.fetchSession(key1, 100, 200));
+        assertEquals(0, key1Headers.toArray().length);
+
+        // Verify headers for key2 (with data)
+        final Headers key2Headers = 
AggregationWithHeadersDeserializer.headers(store.fetchSession(key2, 150, 250));
+        assertEquals(2, key2Headers.toArray().length);
+        assertEquals("header-value-1", new 
String(key2Headers.lastHeader("header-key-1").value()));
+        assertEquals("header-value-2", new 
String(key2Headers.lastHeader("header-key-2").value()));
+
+        // Verify findSessions works
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iter = 
store.findSessions(100, 250)) {
+            int count = 0;
+            while (iter.hasNext()) {
+                iter.next();
+                count++;
+            }
+            assertEquals(2, count);
+        }
+
+        store.close();
+    }
+
+    // --- DSL-level supplier resolution tests ---
+
+    /**
+     * Uses {@link 
BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers#sessionStore(DslSessionParams)}
+     * to create stores — the same code path the DSL materializer uses when
+     * {@code ON_WINDOW_CLOSE} is set. Validates the upgrade from PLAIN to 
HEADERS format.
+     * <p>
+     * Both phases use the same {@code STORE_NAME}, retention and processor
+     * context; only the {@code DslStoreFormat} passed to the supplier differs.
+     */
+    @Test
+    public void shouldMigrateViaDslSupplierPath() {
+        final BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers dslSuppliers =
+            new BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers();
+
+        // Phase 1: create store via DSL supplier with PLAIN format
+        final SessionBytesStoreSupplier oldSupplier = 
dslSuppliers.sessionStore(
+            new DslSessionParams(STORE_NAME, Duration.ofMillis(RETENTION_MS),
+                EmitStrategy.onWindowClose(), DslStoreFormat.PLAIN));
+
+        final SessionStore<Bytes, byte[]> oldStore = oldSupplier.get();
+        oldStore.init(context, oldStore);
+
+        final Bytes key1 = Bytes.wrap("key1".getBytes());
+        oldStore.put(new Windowed<>(key1, new SessionWindow(100, 200)), 
"value1".getBytes());
+
+        assertEquals("value1", new String(oldStore.fetchSession(key1, 100, 
200)));
+        oldStore.close();
+
+        // Phase 2: create store via DSL supplier with HEADERS format
+        final SessionBytesStoreSupplier newSupplier = 
dslSuppliers.sessionStore(
+            new DslSessionParams(STORE_NAME, Duration.ofMillis(RETENTION_MS),
+                EmitStrategy.onWindowClose(), DslStoreFormat.HEADERS));
+
+        final SessionStore<Bytes, byte[]> newStore = newSupplier.get();
+        newStore.init(context, newStore);
+
+        // Verify old data readable with empty headers via lazy migration
+        final byte[] fetch = newStore.fetchSession(key1, 100, 200);
+        assertNotNull(fetch, "Old data should be readable after upgrade via 
DSL supplier path");
+        assertEquals("value1", new 
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
+        assertEquals(0, 
AggregationWithHeadersDeserializer.headers(fetch).toArray().length,
+            "Old data should have empty headers after migration");
+
+        newStore.close();
+    }
+
+    // --- DSL end-to-end tests via TopologyTestDriver ---
+
+    private static final String INPUT_TOPIC = "input-topic";
+    private static final String MATERIALIZED_STORE = "session-count-store";
+
+    /**
+     * Parameter source for {@code shouldAggregateSessionsViaDslWithOnWindowClose}:
+     * supplies {@code false} and then {@code true}, one argument set each, for
+     * the {@code withHeaders} flag.
+     */
+    static Stream<Arguments> dslStoreFormats() {
+        return Stream.of(false, true).map(Arguments::of);
+    }
+
+    /**
+     * Builds a real DSL session aggregation topology with {@code 
ON_WINDOW_CLOSE} and verifies
+     * the full chain from DSL → materializer → supplier → store works 
correctly with both
+     * PLAIN and HEADERS formats.
+     *
+     * @param withHeaders when {@code true}, {@code DSL_STORE_FORMAT_CONFIG} is set to
+     *                    {@code DSL_STORE_FORMAT_HEADERS}; when {@code false}, the
+     *                    config is left untouched
+     */
+    @ParameterizedTest
+    @MethodSource("dslStoreFormats")
+    public void shouldAggregateSessionsViaDslWithOnWindowClose(final boolean 
withHeaders) {
+        final Properties dslProps = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+        if (withHeaders) {
+            dslProps.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+        }
+
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        streamsBuilder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500)))
+            .emitStrategy(EmitStrategy.onWindowClose())
+            .count(Materialized.<String, Long, SessionStore<Bytes, 
byte[]>>as(MATERIALIZED_STORE)
+                .withRetention(Duration.ofMinutes(5)));
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(streamsBuilder.build(), dslProps)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(INPUT_TOPIC, new StringSerializer(), 
new StringSerializer());
+
+            // Send records within the same session window (gap = 500ms)
+            inputTopic.pipeInput("A", "v1", 100);
+            inputTopic.pipeInput("A", "v2", 200);
+            inputTopic.pipeInput("B", "v3", 150);
+
+            // Advance time past the session gap to close windows
+            inputTopic.pipeInput("A", "v4", 2000);
+
+            // Query the materialized store
+            final SessionStore<String, Long> store = 
driver.getSessionStore(MATERIALIZED_STORE);
+            assertNotNull(store, "Session store should be materialized");
+
+            final List<KeyValue<Windowed<String>, Long>> results = 
toList(store.fetch("A", "B"));
+            // Expect: A's session [100,200] with count=2, B's session 
[150,150] with count=1
+            // A's new session [2000,2000] may or may not be closed yet
+            // Flags rather than an exact list comparison, so extra sessions in the
+            // result (such as A's [2000,2000]) do not fail the test.
+            boolean foundASession = false;
+            boolean foundBSession = false;
+            for (final KeyValue<Windowed<String>, Long> kv : results) {
+                if (kv.key.key().equals("A") && kv.key.window().start() == 100 
&& kv.key.window().end() == 200) {
+                    assertEquals(2L, kv.value);
+                    foundASession = true;
+                }
+                if (kv.key.key().equals("B") && kv.key.window().start() == 150 
&& kv.key.window().end() == 150) {
+                    assertEquals(1L, kv.value);
+                    foundBSession = true;
+                }
+            }
+            assertTrue(foundASession, "Should find A's session [100,200]");
+            assertTrue(foundBSession, "Should find B's session [150,150]");
+        }
+    }
+
+    /**
+     * Drains {@code iterator} into a list, preserving iteration order, and closes it.
+     *
+     * @param iterator the iterator to consume; it is always closed before this method
+     *                 returns, including when iteration throws
+     * @return a new mutable list holding every remaining element of {@code iterator}
+     */
+    private static <K, V> List<KeyValue<K, V>> toList(final KeyValueIterator<K, V> iterator) {
+        final List<KeyValue<K, V>> result = new ArrayList<>();
+        // try-with-resources: the original closed the iterator only on the success
+        // path, leaking it if hasNext()/next() threw.
+        try (final KeyValueIterator<K, V> it = iterator) {
+            while (it.hasNext()) {
+                result.add(it.next());
+            }
+        }
+        return result;
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
index 42ab3946a0e..4b77ce64439 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
@@ -104,7 +104,7 @@ public class TimeOrderedWindowStoreTest {
 
     private InternalMockProcessorContext<?, ?> context;
     private RocksDBTimeOrderedWindowSegmentedBytesStore<KeyValueSegment> 
bytesStore;
-    private RocksDBTimeOrderedWindowStore underlyingStore;
+    private RocksDBTimeOrderedWindowStore<?> underlyingStore;
     private TimeOrderedCachingWindowStore cachingStore;
     private CacheFlushListenerStub<Windowed<String>, String> cacheListener;
     private ThreadCache cache;
@@ -114,7 +114,7 @@ public class TimeOrderedWindowStoreTest {
         baseKeySchema = new TimeFirstWindowKeySchema();
         bytesStore = new RocksDBTimeOrderedWindowSegmentedBytesStore<>("test", 
100, hasIndex,
             new KeyValueSegments("test", "metrics-scope", 100, 
SEGMENT_INTERVAL));
-        underlyingStore = new RocksDBTimeOrderedWindowStore(bytesStore, false, 
WINDOW_SIZE);
+        underlyingStore = new RocksDBTimeOrderedWindowStore<>(bytesStore, 
false, WINDOW_SIZE);
         final TimeWindowedDeserializer<String> keyDeserializer = new 
TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE);
         keyDeserializer.setIsChangelogTopic(true);
         cacheListener = new CacheFlushListenerStub<>(keyDeserializer, new 
StringDeserializer());
@@ -135,7 +135,7 @@ public class TimeOrderedWindowStoreTest {
     @ValueSource(booleans = {true, false})
     public void shouldDelegateInit(final boolean hasIndex) {
         setUp(hasIndex);
-        final RocksDBTimeOrderedWindowStore inner = 
mock(RocksDBTimeOrderedWindowStore.class);
+        final RocksDBTimeOrderedWindowStore<?> inner = 
mock(RocksDBTimeOrderedWindowStore.class);
         when(inner.hasIndex()).thenReturn(hasIndex);
         final TimeOrderedCachingWindowStore outer = new 
TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);
 
@@ -156,7 +156,7 @@ public class TimeOrderedWindowStoreTest {
         assertThat(e.getMessage(),
             containsString("TimeOrderedCachingWindowStore only supports 
RocksDBTimeOrderedWindowStore backed store"));
 
-        final RocksDBTimeOrderedWindowStore inner = 
mock(RocksDBTimeOrderedWindowStore.class);
+        final RocksDBTimeOrderedWindowStore<?> inner = 
mock(RocksDBTimeOrderedWindowStore.class);
         // Nothing happens
         new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, 
SEGMENT_INTERVAL);
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
index 8ddab06aac3..c403614e2b7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
@@ -58,7 +58,7 @@ public class TimestampedWindowStoreBuilderTest {
     @Mock
     private RocksDBTimestampedWindowStore timestampedStore;
     @Mock
-    private RocksDBTimeOrderedWindowStore timeOrderedStore;
+    private RocksDBTimeOrderedWindowStore<?> timeOrderedStore;
     private TimestampedWindowStoreBuilder<String, String> builder;
     private boolean isTimeOrderedStore;
     private WindowStore inner;
@@ -199,7 +199,6 @@ public class TimestampedWindowStoreBuilderTest {
     }
 
     @ValueSource(strings = {TIMESTAMP_STORE_NAME, TIMEORDERED_STORE_NAME})
-    @SuppressWarnings("unchecked")
     @ParameterizedTest
     public void shouldDisableCachingWithRetainDuplicates(final String 
storeName) {
         setUpWithoutInner(storeName);


Reply via email to