This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a6fd96c36f1 KAFKA-20317: Enable time-ordered header window store in 
DSL (#21780)
a6fd96c36f1 is described below

commit a6fd96c36f1cb24df44b739cbad9fd240d56d353
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Mar 18 23:37:18 2026 +0000

    KAFKA-20317: Enable time-ordered header window store in DSL (#21780)
    
    This PR enables time-ordered window store with headers. Part of
    KIP-1285.
    
    Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../kafka/streams/state/HeadersBytesStore.java     |  27 +-
 ...tractRocksDBTimeOrderedSegmentedBytesStore.java |  54 +++-
 .../RocksDBMigratingWindowStoreWithHeaders.java    |  97 +++++++
 .../RocksDBTimeOrderedKeyValueBytesStore.java      |   8 +-
 ...cksDBTimeOrderedSessionSegmentedBytesStore.java |   9 +-
 ...ocksDBTimeOrderedWindowSegmentedBytesStore.java |  47 ++--
 .../internals/RocksDBTimeOrderedWindowStore.java   |   4 +-
 .../RocksDBTimeOrderedWindowStoreWithHeaders.java  |  39 ++-
 ...IndexedTimeOrderedWindowBytesStoreSupplier.java |  21 +-
 .../TimestampedWindowStoreWithHeadersBuilder.java  |   3 +
 .../state/internals/WindowSegmentWithHeaders.java  |  95 +++++++
 .../state/internals/WindowSegmentsWithHeaders.java |  58 ++++
 .../internals/WindowStoreMaterializerTest.java     |   4 +-
 ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java |  14 +-
 .../internals/AbstractRocksDBWindowStoreTest.java  |  24 +-
 ...cksDBTimeOrderedWindowStoreWithHeadersTest.java | 146 ++++++++++
 ...OrderedWindowStoreWithHeadersWithIndexTest.java |  25 ++
 ...eredWindowStoreWithHeadersWithoutIndexTest.java |  25 ++
 ...xedTimeOrderedWindowBytesStoreSupplierTest.java |   2 +-
 ...imeOrderedCachingPersistentWindowStoreTest.java |   5 +-
 .../internals/TimeOrderedWindowStoreTest.java      |   5 +-
 .../TimeOrderedWindowStoreUpgradeTest.java         | 301 +++++++++++++++++++++
 22 files changed, 919 insertions(+), 94 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java 
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
index 67c47fbfd80..8dd74fd98ee 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
@@ -29,20 +29,23 @@ package org.apache.kafka.streams.state;
 public interface HeadersBytesStore {
 
     /**
-     * Converts a legacy timestamped value (ValueAndTimestamp format, without 
headers) to the header-embedded format.
+     * Converts a legacy value (without headers) to the header-embedded format 
by adding empty headers prefix.
      * <p>
-     * For timestamped stores, the legacy format is: [timestamp(8)][value]
-     * The new format is: 
[headersSize(varint)][headersBytes][timestamp(8)][value]
+     * This is a general-purpose method that prepends an empty headers prefix 
to any byte array.
+     * The header-embedded format is: 
[headersSize(varint)][headersBytes][payload]
+     * where empty headers are represented as headersSize=0 (encoded as 
[0x00]).
      * <p>
-     * This method adds empty headers to the existing timestamped value format.
-     * <p>
-     * Empty headers are represented as 0 bytes (headersSize=0, no 
headersBytes),
+     * Usage examples:
+     * <ul>
+     *   <li>Timestamped stores: [timestamp(8)][value] → 
[0x00][timestamp(8)][value]</li>
+     *   <li>Window stores: [value] → [0x00][value]</li>
+     * </ul>
      *
-     * @param valueAndTimestamp the legacy timestamped value bytes
-     * @return the value in header-embedded format with empty headers
+     * @param value the legacy value bytes (may include timestamp for 
timestamped stores, or plain value for window stores)
+     * @return the value in header-embedded format with empty headers prefix 
[0x00][value]
      */
-    static byte[] convertToHeaderFormat(final byte[] valueAndTimestamp) {
-        if (valueAndTimestamp == null) {
+    static byte[] convertToHeaderFormat(final byte[] value) {
+        if (value == null) {
             return null;
         }
 
@@ -51,9 +54,9 @@ public interface HeadersBytesStore {
         //   headersSize = varint(0) = [0x00]
         //   headersBytes = [] (empty, 0 bytes)
         // Result: [0x00][payload]
-        final byte[] res = new byte[1 + valueAndTimestamp.length];
+        final byte[] res = new byte[1 + value.length];
         // res[0] is initialized to 0x00 per Java Specification
-        System.arraycopy(valueAndTimestamp, 0, res, 1, 
valueAndTimestamp.length);
+        System.arraycopy(value, 0, res, 1, value.length);
         return res;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
index 5ec5e011a2b..e87403b4289 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema;
+import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +58,7 @@ import java.util.Optional;
  * @see RocksDBTimeOrderedSessionSegmentedBytesStore
  * @see RocksDBTimeOrderedWindowSegmentedBytesStore
  */
-public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends 
AbstractDualSchemaRocksDBSegmentedBytesStore<KeyValueSegment> {
+public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore<S extends 
Segment> extends AbstractDualSchemaRocksDBSegmentedBytesStore<S> {
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
 
     abstract class IndexToBaseStoreIterator implements KeyValueIterator<Bytes, 
byte[]> {
@@ -113,14 +115,32 @@ public abstract class 
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
         protected abstract Bytes getBaseKey(final Bytes indexKey);
     }
 
+    /**
+     * Concrete implementation of IndexToBaseStoreIterator for window key 
schema.
+     * Converts index keys (key-first schema) to base store keys (time-first 
schema).
+     * <p>
+     * This can be reused by both window store implementations (with and 
without headers).
+     */
+    class WindowKeySchemaIndexToBaseStoreIterator extends 
IndexToBaseStoreIterator {
+        WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, 
byte[]> indexIterator) {
+            super(indexIterator);
+        }
+
+        @Override
+        protected Bytes getBaseKey(final Bytes indexKey) {
+            final byte[] keyBytes = 
KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
+            final long timestamp = 
KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
+            final int seqnum = 
KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
+            return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes, 
timestamp, seqnum);
+        }
+    }
+
     AbstractRocksDBTimeOrderedSegmentedBytesStore(final String name,
-                                                  final String metricsScope,
                                                   final long retention,
-                                                  final long segmentInterval,
                                                   final KeySchema 
baseKeySchema,
-                                                  final Optional<KeySchema> 
indexKeySchema) {
-        super(name, baseKeySchema, indexKeySchema,
-            new KeyValueSegments(name, metricsScope, retention, 
segmentInterval), retention);
+                                                  final Optional<KeySchema> 
indexKeySchema,
+                                                  final AbstractSegments<S> 
segments) {
+        super(name, baseKeySchema, indexKeySchema, segments, retention);
     }
 
     @Override
@@ -137,7 +157,15 @@ public abstract class 
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
         return fetch(key, from, to, false);
     }
 
-    protected abstract IndexToBaseStoreIterator 
getIndexToBaseStoreIterator(final SegmentIterator<KeyValueSegment> 
segmentIterator);
+    protected abstract IndexToBaseStoreIterator 
getIndexToBaseStoreIterator(final SegmentIterator<S> segmentIterator);
+
+    public void put(final Bytes key, final long timestamp, final int seqnum, 
final byte[] value) {
+        throw new UnsupportedOperationException("This store does not support 
put with timestamp and seqnum");
+    }
+
+    public byte[] fetch(final Bytes key, final long timestamp, final int 
seqnum) {
+        throw new UnsupportedOperationException("This store does not support 
fetch with timestamp and seqnum");
+    }
 
     KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
                                           final long from,
@@ -151,7 +179,7 @@ public abstract class 
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
         }
 
         if (indexKeySchema.isPresent()) {
-            final List<KeyValueSegment> searchSpace = 
indexKeySchema.get().segmentsToSearch(segments, actualFrom, to,
+            final List<S> searchSpace = 
indexKeySchema.get().segmentsToSearch(segments, actualFrom, to,
                 forward);
 
             final Bytes binaryFrom = 
indexKeySchema.get().lowerRangeFixedSize(key, actualFrom);
@@ -166,7 +194,7 @@ public abstract class 
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
         }
 
 
-        final List<KeyValueSegment> searchSpace = 
baseKeySchema.segmentsToSearch(segments, actualFrom, to,
+        final List<S> searchSpace = baseKeySchema.segmentsToSearch(segments, 
actualFrom, to,
             forward);
 
         final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(key, 
actualFrom);
@@ -216,7 +244,7 @@ public abstract class 
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
         }
 
         if (indexKeySchema.isPresent()) {
-            final List<KeyValueSegment> searchSpace = 
indexKeySchema.get().segmentsToSearch(segments, actualFrom, to,
+            final List<S> searchSpace = 
indexKeySchema.get().segmentsToSearch(segments, actualFrom, to,
                 forward);
 
             final Bytes binaryFrom = indexKeySchema.get().lowerRange(keyFrom, 
actualFrom);
@@ -230,7 +258,7 @@ public abstract class 
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
                 forward));
         }
 
-        final List<KeyValueSegment> searchSpace = 
baseKeySchema.segmentsToSearch(segments, actualFrom, to,
+        final List<S> searchSpace = baseKeySchema.segmentsToSearch(segments, 
actualFrom, to,
             forward);
 
         final Bytes binaryFrom = baseKeySchema.lowerRange(keyFrom, actualFrom);
@@ -254,7 +282,7 @@ public abstract class 
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
             return KeyValueIterators.emptyIterator();
         }
 
-        final List<KeyValueSegment> searchSpace = 
segments.segments(actualFrom, timeTo, true);
+        final List<S> searchSpace = segments.segments(actualFrom, timeTo, 
true);
         final Bytes binaryFrom = baseKeySchema.lowerRange(null, actualFrom);
         final Bytes binaryTo = baseKeySchema.upperRange(null, timeTo);
 
@@ -276,7 +304,7 @@ public abstract class 
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
             return KeyValueIterators.emptyIterator();
         }
 
-        final List<KeyValueSegment> searchSpace = 
segments.segments(actualFrom, timeTo, false);
+        final List<S> searchSpace = segments.segments(actualFrom, timeTo, 
false);
         final Bytes binaryFrom = baseKeySchema.lowerRange(null, actualFrom);
         final Bytes binaryTo = baseKeySchema.upperRange(null, timeTo);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
new file mode 100644
index 00000000000..e0a25e6bbc7
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * A persistent key-value store with headers support based on RocksDB for 
window stores.
+ * <p>
+ * This store provides a migration path from plain {@link RocksDBStore} 
(DEFAULT column family)
+ * to a headers-aware column family ({@code windowKeyValueWithHeaders}). It 
uses
+ * {@link DualColumnFamilyAccessor} for lazy migration when legacy data exists 
in the DEFAULT CF.
+ * <p>
+ * Value format:
+ * <ul>
+ *   <li>Old format (DEFAULT CF): {@code [value]}</li>
+ *   <li>New format (HEADERS CF): {@code 
[headersSize(varint)][headersBytes][value]}</li>
+ * </ul>
+ * <p>
+ * This is similar to {@link RocksDBMigratingSessionStoreWithHeaders} but for 
window stores.
+ * Unlike timestamped stores, window stores don't include timestamp in the 
value (it's in the key).
+ */
+public class RocksDBMigratingWindowStoreWithHeaders extends RocksDBStore 
implements HeadersBytesStore {
+    private static final Logger log = 
LoggerFactory.getLogger(RocksDBMigratingWindowStoreWithHeaders.class);
+
+    static final byte[] WINDOW_STORE_HEADERS_VALUES_COLUMN_FAMILY_NAME =
+        "windowKeyValueWithHeaders".getBytes(StandardCharsets.UTF_8);
+
+    RocksDBMigratingWindowStoreWithHeaders(final String name,
+                                            final String parentDir,
+                                            final RocksDBMetricsRecorder 
metricsRecorder) {
+        super(name, parentDir, metricsRecorder);
+    }
+
+    @Override
+    void openRocksDB(final DBOptions dbOptions,
+                     final ColumnFamilyOptions columnFamilyOptions) {
+        final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+            dbOptions,
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions),
+            new 
ColumnFamilyDescriptor(WINDOW_STORE_HEADERS_VALUES_COLUMN_FAMILY_NAME, 
columnFamilyOptions),
+            new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, 
columnFamilyOptions)
+        );
+        final ColumnFamilyHandle noHeadersColumnFamily = columnFamilies.get(0);
+        final ColumnFamilyHandle withHeadersColumnFamily = 
columnFamilies.get(1);
+        final ColumnFamilyHandle offsetsCf = columnFamilies.get(2);
+
+        // Check if DEFAULT CF has data (upgrade from old format without 
headers)
+        try (final RocksIterator noHeadersIter = 
db.newIterator(noHeadersColumnFamily)) {
+            noHeadersIter.seekToFirst();
+            if (noHeadersIter.isValid()) {
+                log.info("Opening window store {} in upgrade mode from plain 
value format", name);
+                // Migrate from [value] to [headers][value]
+                // Add empty headers prefix [0x00] to plain value
+                cfAccessor = new DualColumnFamilyAccessor(
+                    offsetsCf,
+                    noHeadersColumnFamily,
+                    withHeadersColumnFamily,
+                    HeadersBytesStore::convertToHeaderFormat,
+                    this,
+                    open
+                );
+            } else {
+                log.info("Opening window store {} in regular headers-aware 
mode", name);
+                cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, 
withHeadersColumnFamily);
+                noHeadersColumnFamily.close();
+            }
+        }
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java
index e48cb6061fc..48ea130ab31 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java
@@ -34,13 +34,17 @@ import java.util.Optional;
 /**
  * A RocksDB backed time-ordered segmented bytes store for window key schema.
  */
-public class RocksDBTimeOrderedKeyValueBytesStore extends 
AbstractRocksDBTimeOrderedSegmentedBytesStore {
+public class RocksDBTimeOrderedKeyValueBytesStore extends 
AbstractRocksDBTimeOrderedSegmentedBytesStore<KeyValueSegment> {
 
     private long minTimestamp;
 
     RocksDBTimeOrderedKeyValueBytesStore(final String name,
                                          final String metricsScope) {
-        super(name, metricsScope, Long.MAX_VALUE, Long.MAX_VALUE, new 
TimeFirstWindowKeySchema(), Optional.empty());
+        super(name,
+            Long.MAX_VALUE,
+            new TimeFirstWindowKeySchema(),
+            Optional.empty(),
+            new KeyValueSegments(name, metricsScope, Long.MAX_VALUE, 
Long.MAX_VALUE));
         minTimestamp = Long.MAX_VALUE;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
index bef0410dae6..3e1f4c5ea5c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
@@ -39,7 +39,7 @@ import java.util.Optional;
 /**
  * A RocksDB backed time-ordered segmented bytes store for session key schema.
  */
-public class RocksDBTimeOrderedSessionSegmentedBytesStore extends 
AbstractRocksDBTimeOrderedSegmentedBytesStore {
+public class RocksDBTimeOrderedSessionSegmentedBytesStore extends 
AbstractRocksDBTimeOrderedSegmentedBytesStore<KeyValueSegment> {
 
     private class SessionKeySchemaIndexToBaseStoreIterator extends 
IndexToBaseStoreIterator {
         SessionKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, 
byte[]> indexIterator) {
@@ -60,8 +60,11 @@ public class RocksDBTimeOrderedSessionSegmentedBytesStore 
extends AbstractRocksD
                                                  final long retention,
                                                  final long segmentInterval,
                                                  final boolean withIndex) {
-        super(name, metricsScope, retention, segmentInterval, new 
TimeFirstSessionKeySchema(),
-            Optional.ofNullable(withIndex ? new KeyFirstSessionKeySchema() : 
null));
+        super(name,
+            retention,
+            new TimeFirstSessionKeySchema(),
+            Optional.ofNullable(withIndex ? new KeyFirstSessionKeySchema() : 
null),
+            new KeyValueSegments(name, metricsScope, retention, 
segmentInterval));
     }
 
     public byte[] fetchSession(final Bytes key,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java
index 269f273590c..8829c7fb063 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import 
org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
-import org.apache.kafka.streams.state.KeyValueIterator;
 import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema;
 import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
 
@@ -35,38 +34,34 @@ import java.util.Optional;
 
 /**
  * A RocksDB backed time-ordered segmented bytes store for window key schema.
+ * <p>
+ * This class supports different segment implementations via the generic type 
parameter.
+ * It can be used with {@link KeyValueSegment} for regular stores or {@link 
WindowSegmentWithHeaders}
+ * for stores with headers support.
+ *
+ * @param <S> the segment type
  */
-public class RocksDBTimeOrderedWindowSegmentedBytesStore extends 
AbstractRocksDBTimeOrderedSegmentedBytesStore {
-
-    private class WindowKeySchemaIndexToBaseStoreIterator  extends 
IndexToBaseStoreIterator {
-        WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, 
byte[]> indexIterator) {
-            super(indexIterator);
-        }
-
-        @Override
-        protected Bytes getBaseKey(final Bytes indexKey) {
-            final byte[] keyBytes = 
KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
-            final long timestamp = 
KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
-            final int seqnum = 
KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
-            return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes, 
timestamp, seqnum);
-        }
-    }
+public class RocksDBTimeOrderedWindowSegmentedBytesStore<S extends Segment> 
extends AbstractRocksDBTimeOrderedSegmentedBytesStore<S> {
 
     RocksDBTimeOrderedWindowSegmentedBytesStore(final String name,
-                                                final String metricsScope,
                                                 final long retention,
-                                                final long segmentInterval,
-                                                final boolean withIndex) {
-        super(name, metricsScope, retention, segmentInterval, new 
TimeFirstWindowKeySchema(),
-            Optional.ofNullable(withIndex ? new KeyFirstWindowKeySchema() : 
null));
+                                                final boolean withIndex,
+                                                final AbstractSegments<S> 
segments) {
+        super(name,
+            retention,
+            new TimeFirstWindowKeySchema(),
+            Optional.ofNullable(withIndex ? new KeyFirstWindowKeySchema() : 
null),
+            segments);
     }
 
+    @Override
     public void put(final Bytes key, final long timestamp, final int seqnum, 
final byte[] value) {
         final Bytes baseKey = TimeFirstWindowKeySchema.toStoreKeyBinary(key, 
timestamp, seqnum);
         put(baseKey, value);
     }
 
-    byte[] fetch(final Bytes key, final long timestamp, final int seqnum) {
+    @Override
+    public byte[] fetch(final Bytes key, final long timestamp, final int 
seqnum) {
         return get(TimeFirstWindowKeySchema.toStoreKeyBinary(key, timestamp, 
seqnum));
     }
 
@@ -80,7 +75,7 @@ public class RocksDBTimeOrderedWindowSegmentedBytesStore 
extends AbstractRocksDB
     }
 
     @Override
-    Map<KeyValueSegment, WriteBatch> getWriteBatches(
+    Map<S, WriteBatch> getWriteBatches(
         final Collection<ConsumerRecord<byte[], byte[]>> records) {
         // advance stream time to the max timestamp in the batch
         for (final ConsumerRecord<byte[], byte[]> record : records) {
@@ -88,11 +83,11 @@ public class RocksDBTimeOrderedWindowSegmentedBytesStore 
extends AbstractRocksDB
             observedStreamTime = Math.max(observedStreamTime, timestamp);
         }
 
-        final Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>();
+        final Map<S, WriteBatch> writeBatchMap = new HashMap<>();
         for (final ConsumerRecord<byte[], byte[]> record : records) {
             final long timestamp = 
WindowKeySchema.extractStoreTimestamp(record.key());
             final long segmentId = segments.segmentId(timestamp);
-            final KeyValueSegment segment = 
segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext, 
observedStreamTime);
+            final S segment = segments.getOrCreateSegmentIfLive(segmentId, 
internalProcessorContext, observedStreamTime);
             if (segment != null) {
                 
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
                     record,
@@ -123,7 +118,7 @@ public class RocksDBTimeOrderedWindowSegmentedBytesStore 
extends AbstractRocksDB
 
     @Override
     protected IndexToBaseStoreIterator getIndexToBaseStoreIterator(
-        final SegmentIterator<KeyValueSegment> segmentIterator) {
+        final SegmentIterator<S> segmentIterator) {
         return new WindowKeySchemaIndexToBaseStoreIterator(segmentIterator);
     }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
index 41708ed7b42..79e3541af58 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
@@ -36,7 +36,7 @@ import java.util.Objects;
 
 
 public class RocksDBTimeOrderedWindowStore
-    extends WrappedStateStore<RocksDBTimeOrderedWindowSegmentedBytesStore, 
Object, Object>
+    extends WrappedStateStore<AbstractRocksDBTimeOrderedSegmentedBytesStore<? 
extends Segment>, Object, Object>
     implements WindowStore<Bytes, byte[]>, TimestampedBytesStore {
 
     private final boolean retainDuplicates;
@@ -46,7 +46,7 @@ public class RocksDBTimeOrderedWindowStore
     private StateStoreContext stateStoreContext;
 
     RocksDBTimeOrderedWindowStore(
-        final RocksDBTimeOrderedWindowSegmentedBytesStore store,
+        final AbstractRocksDBTimeOrderedSegmentedBytesStore<? extends Segment> 
store,
         final boolean retainDuplicates,
         final long windowSize
     ) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeaders.java
index 7c6c300617d..f258cf57710 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeaders.java
@@ -16,6 +16,11 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.HeadersBytesStore;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 
@@ -26,15 +31,10 @@ import org.apache.kafka.streams.state.TimestampedBytesStore;
  * {@link TimestampedBytesStore} (for timestamp support) and {@link 
HeadersBytesStore}
  * (for header support) marker interfaces.
  * <p>
- * The storage format for values is: 
[headersSize(varint)][headersBytes][timestamp(8)][value]
+ * This store returns {@link QueryResult#forUnknownQueryType(Query, Object)} 
for all queries,
+ * as IQv2 query handling is done at the metered layer.
  * <p>
- * This implementation uses segment-level versioning for backward 
compatibility:
- * <ul>
- * <li>Old segments continue to use the legacy format without headers</li>
- * <li>New segments use the header-embedded format</li>
- * <li>Legacy values are served with empty headers on read</li>
- * <li>All new writes use the new format</li>
- * </ul>
+ * The storage format for values is: 
[headersSize(varint)][headersBytes][timestamp(8)][value]
  *
  * @see RocksDBTimeOrderedWindowStore
  * @see HeadersBytesStore
@@ -42,9 +42,30 @@ import org.apache.kafka.streams.state.TimestampedBytesStore;
  */
 class RocksDBTimeOrderedWindowStoreWithHeaders extends 
RocksDBTimeOrderedWindowStore implements TimestampedBytesStore, 
HeadersBytesStore {
 
-    RocksDBTimeOrderedWindowStoreWithHeaders(final 
RocksDBTimeOrderedWindowSegmentedBytesStore store,
+    RocksDBTimeOrderedWindowStoreWithHeaders(final 
RocksDBTimeOrderedWindowSegmentedBytesStore<WindowSegmentWithHeaders> store,
                                              final boolean retainDuplicates,
                                              final long windowSize) {
         super(store, retainDuplicates, windowSize);
     }
+
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
+        final QueryResult<R> result;
+        final Position position = getPosition();
+
+        synchronized (position) {
+            result = QueryResult.forUnknownQueryType(query, this);
+
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + this.getClass() + " in " + 
(System.nanoTime() - start) + "ns"
+                );
+            }
+            result.setPosition(position.copy());
+        }
+        return result;
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
index c9c9a015c83..332d2d6973c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
@@ -118,32 +118,29 @@ public class 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window
         switch (windowStoreType) {
             case DEFAULT_WINDOW_STORE:
                 return new RocksDBTimeOrderedWindowStore(
-                    new RocksDBTimeOrderedWindowSegmentedBytesStore(
+                    new RocksDBTimeOrderedWindowSegmentedBytesStore<>(
                         name,
-                        metricsScope(),
                         retentionPeriod,
-                        segmentInterval,
-                        false),
+                        false,
+                        new KeyValueSegments(name, metricsScope(), 
retentionPeriod, segmentInterval)),
                     retainDuplicates,
                     windowSize);
             case INDEXED_WINDOW_STORE:
                 return new RocksDBTimeOrderedWindowStore(
-                    new RocksDBTimeOrderedWindowSegmentedBytesStore(
+                    new RocksDBTimeOrderedWindowSegmentedBytesStore<>(
                         name,
-                        metricsScope(),
                         retentionPeriod,
-                        segmentInterval,
-                        true),
+                        true,
+                        new KeyValueSegments(name, metricsScope(), 
retentionPeriod, segmentInterval)),
                     retainDuplicates,
                     windowSize);
             case INDEXED_WINDOW_STORE_WITH_HEADERS:
                 return new RocksDBTimeOrderedWindowStoreWithHeaders(
-                    new RocksDBTimeOrderedWindowSegmentedBytesStore(
+                    new RocksDBTimeOrderedWindowSegmentedBytesStore<>(
                         name,
-                        metricsScope(),
                         retentionPeriod,
-                        segmentInterval,
-                        true),
+                        true,
+                        new WindowSegmentsWithHeaders(name, metricsScope(), 
retentionPeriod, segmentInterval)),
                     retainDuplicates,
                     windowSize);
             default:
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
index 3714643d43b..5689ad229f3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
@@ -115,6 +115,9 @@ public class TimestampedWindowStoreWithHeadersBuilder<K, V>
         if (stateStore instanceof RocksDBTimeOrderedWindowStore) {
             return true;
         }
+        if (stateStore instanceof RocksDBTimeOrderedWindowStoreWithHeaders) {
+            return true;
+        }
         if (stateStore instanceof WrappedStateStore) {
             return isTimeOrderedStore(((WrappedStateStore<?, ?, ?>) 
stateStore).wrapped());
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentWithHeaders.java
new file mode 100644
index 00000000000..468afd827a4
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentWithHeaders.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A segment that stores window key-value pairs with headers support.
+ * <p>
+ * This segment extends {@link RocksDBMigratingWindowStoreWithHeaders} to 
provide
+ * header-aware storage with dual-column-family migration support from
+ * plain value format to headers format.
+ * <p>
+ * Value format:
+ * <ul>
+ *   <li>Old format (DEFAULT CF): {@code [value]}</li>
+ *   <li>New format (HEADERS CF): {@code 
[headersSize(varint)][headersBytes][value]}</li>
+ * </ul>
+ */
+class WindowSegmentWithHeaders extends RocksDBMigratingWindowStoreWithHeaders 
implements Segment {
+
+    private final long id;
+
+    WindowSegmentWithHeaders(final String segmentName,
+                             final String windowName,
+                             final long id,
+                             final Position position,
+                             final RocksDBMetricsRecorder metricsRecorder) {
+        super(segmentName, windowName, metricsRecorder);
+        this.id = id;
+        this.position = position;
+    }
+
+    @Override
+    public long id() {
+        return id;
+    }
+
+    @Override
+    public void destroy() throws IOException {
+        Utils.delete(dbDir);
+    }
+
+    @Override
+    public void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void openDB(final Map<String, Object> configs, final File stateDir) 
{
+        super.openDB(configs, stateDir);
+        // skip the registering step
+    }
+
+    @Override
+    public String toString() {
+        return "WindowSegmentWithHeaders(id=" + id + ", name=" + name() + ")";
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final WindowSegmentWithHeaders segment = (WindowSegmentWithHeaders) 
obj;
+        return id == segment.id;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id);
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentsWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentsWithHeaders.java
new file mode 100644
index 00000000000..c6fc08f467e
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentsWithHeaders.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+/**
+ * Manages segments for time-ordered window stores with headers support.
+ * <p>
+ * Creates {@link WindowSegmentWithHeaders} instances which store values in 
the format
+ * {@code [headers][value]} (without timestamp in value, as timestamp is in 
the key).
+ *
+ * @see WindowSegmentWithHeaders
+ */
+class WindowSegmentsWithHeaders extends 
AbstractSegments<WindowSegmentWithHeaders> {
+
+    private final RocksDBMetricsRecorder metricsRecorder;
+
+    WindowSegmentsWithHeaders(final String name,
+                              final String metricsScope,
+                              final long retentionPeriod,
+                              final long segmentInterval) {
+        super(name, retentionPeriod, segmentInterval);
+        metricsRecorder = new RocksDBMetricsRecorder(metricsScope, name);
+    }
+
+    @Override
+    protected WindowSegmentWithHeaders createSegment(final long segmentId, 
final String segmentName) {
+        return new WindowSegmentWithHeaders(segmentName, name, segmentId, 
position, metricsRecorder);
+    }
+
+    @Override
+    protected void openSegmentDB(final WindowSegmentWithHeaders segment, final 
StateStoreContext context) {
+        segment.openDB(context.appConfigs(), context.stateDir());
+    }
+
+    @Override
+    public void openExisting(final StateStoreContext context, final long 
streamTime) {
+        metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), 
context.taskId());
+        super.openExisting(context, streamTime);
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
index 394a5620eac..b118b0b2c31 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
@@ -36,6 +36,7 @@ import 
org.apache.kafka.streams.state.internals.CachingWindowStore;
 import 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedWindowBytesStoreWithHeaders;
 import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
 import 
org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.internals.TimeOrderedCachingWindowStore;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -227,6 +228,7 @@ public class WindowStoreMaterializerTest {
 
     @Test
     public void shouldCreateHeadersStoreWithOnWindowCloseAndCachingEnabled() {
+        
doReturn("headers").when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
         emitStrategy = EmitStrategy.onWindowClose();
 
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
@@ -235,7 +237,7 @@ public class WindowStoreMaterializerTest {
         final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final StateStore wrapped = ((WrappedStateStore) store).wrapped();
-        assertInstanceOf(CachingWindowStore.class, wrapped);
+        assertInstanceOf(TimeOrderedCachingWindowStore.class, wrapped);
     }
 
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 2cb1f410c7f..c4d1ef855f7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -169,20 +169,18 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
     AbstractDualSchemaRocksDBSegmentedBytesStore<KeyValueSegment> 
getBytesStore() {
         switch (schemaType()) {
             case WindowSchemaWithIndex:
-                return new RocksDBTimeOrderedWindowSegmentedBytesStore(
+                return new RocksDBTimeOrderedWindowSegmentedBytesStore<>(
                         storeName,
-                        METRICS_SCOPE,
                         retention,
-                        segmentInterval,
-                        true
+                        true,
+                        new KeyValueSegments(storeName, METRICS_SCOPE, 
retention, segmentInterval)
                 );
             case WindowSchemaWithoutIndex:
-                return new RocksDBTimeOrderedWindowSegmentedBytesStore(
+                return new RocksDBTimeOrderedWindowSegmentedBytesStore<>(
                         storeName,
-                        METRICS_SCOPE,
                         retention,
-                        segmentInterval,
-                        false
+                        false,
+                        new KeyValueSegments(storeName, METRICS_SCOPE, 
retention, segmentInterval)
                 );
             case SessionSchemaWithIndex:
                 return new RocksDBTimeOrderedSessionSegmentedBytesStore(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java
index fd8569c18ef..0fbae858ae8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java
@@ -58,7 +58,9 @@ public abstract class AbstractRocksDBWindowStoreTest extends 
AbstractWindowBytes
     enum StoreType {
         RocksDBWindowStore,
         RocksDBTimeOrderedWindowStoreWithIndex,
-        RocksDBTimeOrderedWindowStoreWithoutIndex
+        RocksDBTimeOrderedWindowStoreWithoutIndex,
+        RocksDBTimeOrderedWindowStoreWithHeadersWithIndex,
+        RocksDBTimeOrderedWindowStoreWithHeadersWithoutIndex
     }
 
     abstract StoreType storeType();
@@ -102,6 +104,26 @@ public abstract class AbstractRocksDBWindowStoreTest 
extends AbstractWindowBytes
                         valueSerde
                 ).build();
             }
+            case RocksDBTimeOrderedWindowStoreWithHeadersWithIndex: {
+                final long defaultSegmentInterval = Math.max(retentionPeriod / 
2, 60_000L);
+                return Stores.windowStoreBuilder(
+                        new 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(STORE_NAME,
+                                retentionPeriod, defaultSegmentInterval, 
windowSize, retainDuplicates,
+                                true, true),
+                        keySerde,
+                        valueSerde
+                ).build();
+            }
+            case RocksDBTimeOrderedWindowStoreWithHeadersWithoutIndex: {
+                final long defaultSegmentInterval = Math.max(retentionPeriod / 
2, 60_000L);
+                return Stores.windowStoreBuilder(
+                        new 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(STORE_NAME,
+                                retentionPeriod, defaultSegmentInterval, 
windowSize, retainDuplicates,
+                                false, true),
+                        keySerde,
+                        valueSerde
+                ).build();
+            }
             default:
                 throw new IllegalStateException("Unknown StoreType: " + 
storeType());
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeadersTest.java
new file mode 100644
index 00000000000..13994dd36af
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeadersTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.time.Instant;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RocksDBTimeOrderedWindowStoreWithHeadersTest {
+
+    private static final String STORE_NAME = "test-time-ordered-window-store";
+    private static final long WINDOW_SIZE = 10_000L;
+    private static final long RETENTION_PERIOD = 60_000L;
+    private static final long SEGMENT_INTERVAL = 30_000L;
+
+    private RocksDBTimeOrderedWindowStoreWithHeaders windowStore;
+    private InternalMockProcessorContext<String, String> context;
+    private File baseDir;
+
+    @BeforeEach
+    public void setUp() {
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        baseDir = TestUtils.tempDirectory();
+        context = new InternalMockProcessorContext<>(
+            baseDir,
+            Serdes.String(),
+            Serdes.String(),
+            new StreamsConfig(props)
+        );
+
+        windowStore = new RocksDBTimeOrderedWindowStoreWithHeaders(
+            new RocksDBTimeOrderedWindowSegmentedBytesStore<>(
+                STORE_NAME,
+                RETENTION_PERIOD,
+                true,
+                new WindowSegmentsWithHeaders(STORE_NAME, 
"test-metrics-scope", RETENTION_PERIOD, SEGMENT_INTERVAL)
+            ),
+            false,
+            WINDOW_SIZE
+        );
+        windowStore.init(context, windowStore);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (windowStore != null) {
+            windowStore.close();
+        }
+    }
+
+    @Test
+    public void shouldReturnUnknownQueryTypeForWindowKeyQuery() {
+        final WindowKeyQuery<Bytes, byte[]> query = 
WindowKeyQuery.withKeyAndWindowStartRange(
+            new Bytes("test-key".getBytes()),
+            Instant.ofEpochMilli(0),
+            Instant.ofEpochMilli(Long.MAX_VALUE)
+        );
+        final QueryResult<WindowStoreIterator<byte[]>> result =
+            windowStore.query(query, PositionBound.unbounded(), new 
QueryConfig(false));
+
+        assertFalse(result.isSuccess());
+        assertEquals(FailureReason.UNKNOWN_QUERY_TYPE, 
result.getFailureReason());
+        assertNotNull(result.getPosition());
+    }
+
+    @Test
+    public void shouldReturnUnknownQueryTypeForWindowRangeQuery() {
+        final WindowRangeQuery<Bytes, byte[]> query = 
WindowRangeQuery.withWindowStartRange(
+            Instant.ofEpochMilli(0),
+            Instant.ofEpochMilli(Long.MAX_VALUE)
+        );
+        final 
QueryResult<KeyValueIterator<org.apache.kafka.streams.kstream.Windowed<Bytes>, 
byte[]>> result =
+            windowStore.query(query, PositionBound.unbounded(), new 
QueryConfig(false));
+
+        assertFalse(result.isSuccess());
+        assertEquals(FailureReason.UNKNOWN_QUERY_TYPE, 
result.getFailureReason());
+        assertNotNull(result.getPosition());
+    }
+
+    @Test
+    public void shouldCollectExecutionInfoWhenRequested() {
+        final WindowKeyQuery<Bytes, byte[]> query = 
WindowKeyQuery.withKeyAndWindowStartRange(
+            new Bytes("test-key".getBytes()),
+            Instant.ofEpochMilli(0),
+            Instant.ofEpochMilli(Long.MAX_VALUE)
+        );
+        final QueryResult<WindowStoreIterator<byte[]>> result =
+            windowStore.query(query, PositionBound.unbounded(), new 
QueryConfig(true));
+
+        assertFalse(result.getExecutionInfo().isEmpty());
+        assertTrue(result.getExecutionInfo().get(0).contains("Handled in"));
+        assertTrue(result.getExecutionInfo().get(0).contains(
+            RocksDBTimeOrderedWindowStoreWithHeaders.class.getName()));
+    }
+
+    @Test
+    public void shouldNotCollectExecutionInfoWhenNotRequested() {
+        final WindowKeyQuery<Bytes, byte[]> query = 
WindowKeyQuery.withKeyAndWindowStartRange(
+            new Bytes("test-key".getBytes()),
+            Instant.ofEpochMilli(0),
+            Instant.ofEpochMilli(Long.MAX_VALUE)
+        );
+        final QueryResult<WindowStoreIterator<byte[]>> result =
+            windowStore.query(query, PositionBound.unbounded(), new 
QueryConfig(false));
+
+        assertTrue(result.getExecutionInfo().isEmpty());
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeadersWithIndexTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeadersWithIndexTest.java
new file mode 100644
index 00000000000..cc992b063e8
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeadersWithIndexTest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+public class RocksDBTimeOrderedWindowStoreWithHeadersWithIndexTest extends 
AbstractRocksDBWindowStoreTest {
+
+    @Override
+    StoreType storeType() {
+        return StoreType.RocksDBTimeOrderedWindowStoreWithHeadersWithIndex;
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeadersWithoutIndexTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeadersWithoutIndexTest.java
new file mode 100644
index 00000000000..1ea609e8ee9
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeadersWithoutIndexTest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+public class RocksDBTimeOrderedWindowStoreWithHeadersWithoutIndexTest extends 
AbstractRocksDBWindowStoreTest {
+
+    @Override
+    StoreType storeType() {
+        return StoreType.RocksDBTimeOrderedWindowStoreWithHeadersWithoutIndex;
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
index dbe1bcf81ef..ea7c2d56acf 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
@@ -80,6 +80,6 @@ public class 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest {
         final StateStore wrapped = ((WrappedStateStore) store).wrapped();
         assertThat(store, 
instanceOf(RocksDBTimeOrderedWindowStoreWithHeaders.class));
         assertThat(wrapped, 
instanceOf(RocksDBTimeOrderedWindowSegmentedBytesStore.class));
-        assertTrue(((RocksDBTimeOrderedWindowSegmentedBytesStore) 
wrapped).hasIndex());
+        assertTrue(((RocksDBTimeOrderedWindowSegmentedBytesStore<?>) 
wrapped).hasIndex());
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
index e718fc53c54..e73230428f5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
@@ -108,12 +108,13 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
     private TimeFirstWindowKeySchema baseKeySchema;
     private WindowStore<Bytes, byte[]> underlyingStore;
     private TimeOrderedCachingWindowStore cachingStore;
-    private RocksDBTimeOrderedWindowSegmentedBytesStore bytesStore;
+    private RocksDBTimeOrderedWindowSegmentedBytesStore<KeyValueSegment> 
bytesStore;
     private CacheFlushListenerStub<Windowed<String>, String> cacheListener;
 
     private void setUp(final boolean hasIndex) {
         baseKeySchema = new TimeFirstWindowKeySchema();
-        bytesStore = new RocksDBTimeOrderedWindowSegmentedBytesStore("test", 
"metrics-scope", 100, SEGMENT_INTERVAL, hasIndex);
+        bytesStore = new RocksDBTimeOrderedWindowSegmentedBytesStore<>("test", 
100, hasIndex,
+            new KeyValueSegments("test", "metrics-scope", 100, 
SEGMENT_INTERVAL));
         underlyingStore = new RocksDBTimeOrderedWindowStore(bytesStore, false, 
WINDOW_SIZE);
         final TimeWindowedDeserializer<String> keyDeserializer = new 
TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE);
         keyDeserializer.setIsChangelogTopic(true);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
index c1e9a346ff0..42ab3946a0e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
@@ -103,7 +103,7 @@ public class TimeOrderedWindowStoreTest {
     private static final String CACHE_NAMESPACE = "0_0-store-name";
 
     private InternalMockProcessorContext<?, ?> context;
-    private RocksDBTimeOrderedWindowSegmentedBytesStore bytesStore;
+    private RocksDBTimeOrderedWindowSegmentedBytesStore<KeyValueSegment> 
bytesStore;
     private RocksDBTimeOrderedWindowStore underlyingStore;
     private TimeOrderedCachingWindowStore cachingStore;
     private CacheFlushListenerStub<Windowed<String>, String> cacheListener;
@@ -112,7 +112,8 @@ public class TimeOrderedWindowStoreTest {
 
     public void setUp(final boolean hasIndex) {
         baseKeySchema = new TimeFirstWindowKeySchema();
-        bytesStore = new RocksDBTimeOrderedWindowSegmentedBytesStore("test", 
"metrics-scope", 100, SEGMENT_INTERVAL, hasIndex);
+        bytesStore = new RocksDBTimeOrderedWindowSegmentedBytesStore<>("test", 
100, hasIndex,
+            new KeyValueSegments("test", "metrics-scope", 100, 
SEGMENT_INTERVAL));
         underlyingStore = new RocksDBTimeOrderedWindowStore(bytesStore, false, 
WINDOW_SIZE);
         final TimeWindowedDeserializer<String> keyDeserializer = new 
TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE);
         keyDeserializer.setIsChangelogTopic(true);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java
new file mode 100644
index 00000000000..df278a35972
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.delete;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests upgrade path from time-ordered window store (without headers) to
+ * time-ordered window store with headers using direct supplier creation.
+ * <p>
+ * This test validates lazy migration from ts-CF to headers-CF and ensures
+ * the store can read old data after upgrade.
+ */
+public class TimeOrderedWindowStoreUpgradeTest {
+
+    private static final String STORE_NAME = "time-ordered-window-store";
+    private static final long WINDOW_SIZE_MS = 1000L;
+    private static final long RETENTION_MS = Duration.ofMinutes(5).toMillis();
+    private static final long SEGMENT_INTERVAL_MS = 
Duration.ofMinutes(1).toMillis();
+
+    private InternalMockProcessorContext<Bytes, byte[]> context;
+    private File baseDir;
+
+    @BeforeEach
+    public void setUp() {
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        baseDir = TestUtils.tempDirectory();
+        context = new InternalMockProcessorContext<>(
+            baseDir,
+            Serdes.Bytes(),
+            Serdes.ByteArray(),
+            new StreamsConfig(props)
+        );
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (baseDir != null) {
+            try {
+                delete(baseDir);
+            } catch (final Exception e) {
+                // Ignore
+            }
+        }
+    }
+
+    /**
+     * Helper method to serialize value with headers in [headers][value] 
format.
+     * This is used for window stores with headers where the format is:
+     * [headersSize(varint)][headersBytes][value]
+     */
+    private static byte[] serializeValueWithHeaders(final byte[] value, final 
Headers headers) {
+        if (value == null) {
+            return null;
+        }
+        final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = 
HeadersSerializer.prepareSerialization(headers);
+        final byte[] rawHeaders = HeadersSerializer.serialize(
+            preSerializedHeaders,
+            
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+        ).array();
+
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             final DataOutputStream out = new DataOutputStream(baos)) {
+            ByteUtils.writeVarint(rawHeaders.length, out);
+            out.write(rawHeaders);
+            out.write(value);
+            return baos.toByteArray();
+        } catch (final IOException e) {
+            throw new RuntimeException("Failed to serialize value with 
headers", e);
+        }
+    }
+
+    /**
+     * Helper method to extract raw value from [headers][value] format.
+     * This is used for window stores with headers where the format is:
+     * [headersSize(varint)][headersBytes][value]
+     */
+    private static byte[] extractRawValue(final byte[] valueWithHeaders) {
+        if (valueWithHeaders == null) {
+            return null;
+        }
+        final ByteBuffer buffer = ByteBuffer.wrap(valueWithHeaders);
+        final int headersSize = ByteUtils.readVarint(buffer);
+        buffer.position(buffer.position() + headersSize);
+        final byte[] rawValue = new byte[buffer.remaining()];
+        buffer.get(rawValue);
+        return rawValue;
+    }
+
+    /**
+     * Helper method to extract headers from [headers][value] format.
+     * This is used for window stores with headers where the format is:
+     * [headersSize(varint)][headersBytes][value]
+     */
+    private static Headers extractHeaders(final byte[] valueWithHeaders) {
+        if (valueWithHeaders == null) {
+            return null;
+        }
+        final ByteBuffer buffer = ByteBuffer.wrap(valueWithHeaders);
+        final int headersSize = ByteUtils.readVarint(buffer);
+        final byte[] headerBytes = new byte[headersSize];
+        buffer.get(headerBytes);
+        return HeadersDeserializer.deserialize(headerBytes);
+    }
+
+    @Test
+    public void shouldMigrateFromWithoutHeadersToWithHeaders() {
+        final RocksDbIndexedTimeOrderedWindowBytesStoreSupplier oldSupplier =
+            new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+                STORE_NAME,
+                RETENTION_MS,
+                SEGMENT_INTERVAL_MS,
+                WINDOW_SIZE_MS,
+                false,
+                true,
+                false   // withHeaders = FALSE (old format)
+            );
+
+        final WindowStore<Bytes, byte[]> oldStore = oldSupplier.get();
+        oldStore.init(context, oldStore);
+
+        // Write data in old format
+        final long baseTime = System.currentTimeMillis();
+        final Bytes key1 = Bytes.wrap("key1".getBytes());
+        final Bytes key2 = Bytes.wrap("key2".getBytes());
+        final Bytes key3 = Bytes.wrap("key3".getBytes());
+
+        oldStore.put(key1, "value1".getBytes(), baseTime + 100);
+        oldStore.put(key2, "value2".getBytes(), baseTime + 200);
+        oldStore.put(key3, "value3".getBytes(), baseTime + 300);
+
+        // Verify old data
+        assertEquals("value1", new String(oldStore.fetch(key1, baseTime + 
100)));
+        assertEquals("value2", new String(oldStore.fetch(key2, baseTime + 
200)));
+        assertEquals("value3", new String(oldStore.fetch(key3, baseTime + 
300)));
+
+        oldStore.close();
+
+        final RocksDbIndexedTimeOrderedWindowBytesStoreSupplier newSupplier =
+            new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+                STORE_NAME,
+                RETENTION_MS,
+                SEGMENT_INTERVAL_MS,
+                WINDOW_SIZE_MS,
+                false,  // retainDuplicates
+                true,   // withIndex
+                true    // withHeaders = TRUE (new format with headers support)
+            );
+
+        final WindowStore<Bytes, byte[]> newStore = newSupplier.get();
+        newStore.init(context, newStore);
+
+        // Verify we can read old data (lazy migration)
+        byte[] fetch = newStore.fetch(key1, baseTime + 100);
+        assertEquals("value1", new String(extractRawValue(fetch)));
+        assertEquals(0, extractHeaders(fetch).toArray().length, "Old data 
should have empty headers after migration");
+        fetch = newStore.fetch(key2, baseTime + 200);
+        assertEquals("value2", new String(extractRawValue(fetch)));
+        assertEquals(0, extractHeaders(fetch).toArray().length, "Old data 
should have empty headers after migration");
+        fetch = newStore.fetch(key3, baseTime + 300);
+        assertEquals("value3", new String(extractRawValue(fetch)));
+        assertEquals(0, extractHeaders(fetch).toArray().length, "Old data 
should have empty headers after migration");
+
+
+        // Write new data (should use headers-CF)
+        final Bytes key4 = Bytes.wrap("key4".getBytes());
+
+        // Write key3 with empty headers
+        newStore.put(key3, 
serializeValueWithHeaders("value3-updated".getBytes(), new RecordHeaders()), 
baseTime + 350);
+
+        // Write key4 with actual headers
+        final RecordHeaders headersWithData = new RecordHeaders();
+        headersWithData.add("header-key-1", "header-value-1".getBytes());
+        headersWithData.add("header-key-2", "header-value-2".getBytes());
+        newStore.put(key4, serializeValueWithHeaders("value4".getBytes(), 
headersWithData), baseTime + 400);
+
+        // Verify new data - raw values
+        assertEquals("value3-updated", new 
String(extractRawValue(newStore.fetch(key3, baseTime + 350))));
+        assertEquals("value4", new String(extractRawValue(newStore.fetch(key4, 
baseTime + 400))));
+
+        // Verify headers for key3 (empty headers)
+        final byte[] fetchedKey3Value = newStore.fetch(key3, baseTime + 350);
+        final Headers retrievedKey3Headers = extractHeaders(fetchedKey3Value);
+        assertEquals(0, retrievedKey3Headers.toArray().length);
+
+        // Verify headers for key4 (headers with data)
+        final byte[] fetchedKey4Value = newStore.fetch(key4, baseTime + 400);
+        final Headers retrievedKey4Headers = extractHeaders(fetchedKey4Value);
+        assertEquals(2, retrievedKey4Headers.toArray().length);
+        assertEquals("header-value-1", new 
String(retrievedKey4Headers.lastHeader("header-key-1").value()));
+        assertEquals("header-value-2", new 
String(retrievedKey4Headers.lastHeader("header-key-2").value()));
+
+        newStore.close();
+    }
+
+    @Test
+    public void shouldMigrateFromWithIndexToWithIndexAndHeaders() {
+        // Test: withIndex=true, withHeaders=false → withIndex=true, 
withHeaders=true
+        final RocksDbIndexedTimeOrderedWindowBytesStoreSupplier oldSupplier =
+            new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+                STORE_NAME, RETENTION_MS, SEGMENT_INTERVAL_MS, WINDOW_SIZE_MS,
+                false, true, false  // withIndex=true, withHeaders=false
+            );
+
+        final WindowStore<Bytes, byte[]> oldStore = oldSupplier.get();
+        oldStore.init(context, oldStore);
+
+        final long baseTime = System.currentTimeMillis();
+        final Bytes key1 = Bytes.wrap("key1".getBytes());
+
+        oldStore.put(key1, "value1".getBytes(), baseTime + 100);
+        oldStore.close();
+
+        // Upgrade to headers
+        final RocksDbIndexedTimeOrderedWindowBytesStoreSupplier newSupplier =
+            new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+                STORE_NAME, RETENTION_MS, SEGMENT_INTERVAL_MS, WINDOW_SIZE_MS,
+                false, true, true  // withIndex=true, withHeaders=true
+            );
+
+        final WindowStore<Bytes, byte[]> newStore = newSupplier.get();
+        newStore.init(context, newStore);
+
+        // Verify old data still accessible
+        assertEquals("value1", new String(extractRawValue(newStore.fetch(key1, 
baseTime + 100))));
+
+        newStore.close();
+    }
+
+    @Test
+    public void shouldMigrateFromWithoutIndexToWithIndexAndHeaders() {
+        // Test: withIndex=false, withHeaders=false → withIndex=true, 
withHeaders=true
+        final RocksDbIndexedTimeOrderedWindowBytesStoreSupplier oldSupplier =
+            new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+                STORE_NAME, RETENTION_MS, SEGMENT_INTERVAL_MS, WINDOW_SIZE_MS,
+                false, false, false  // withIndex=false, withHeaders=false
+            );
+
+        final WindowStore<Bytes, byte[]> oldStore = oldSupplier.get();
+        oldStore.init(context, oldStore);
+
+        final long baseTime = System.currentTimeMillis();
+        final Bytes key1 = Bytes.wrap("key1".getBytes());
+
+        oldStore.put(key1, "value1".getBytes(), baseTime + 100);
+        oldStore.close();
+
+        // Upgrade to both index and headers
+        final RocksDbIndexedTimeOrderedWindowBytesStoreSupplier newSupplier =
+            new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+                STORE_NAME, RETENTION_MS, SEGMENT_INTERVAL_MS, WINDOW_SIZE_MS,
+                false, true, true  // withIndex=true, withHeaders=true
+            );
+
+        final WindowStore<Bytes, byte[]> newStore = newSupplier.get();
+        newStore.init(context, newStore);
+
+        // Verify old data still accessible
+        assertEquals("value1", new String(extractRawValue(newStore.fetch(key1, 
baseTime + 100))));
+
+        newStore.close();
+    }
+}

Reply via email to