This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a6fd96c36f1 KAFKA-20317: Enable time-ordered header window store in
DSL (#21780)
a6fd96c36f1 is described below
commit a6fd96c36f1cb24df44b739cbad9fd240d56d353
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Mar 18 23:37:18 2026 +0000
KAFKA-20317: Enable time-ordered header window store in DSL (#21780)
This PR enables the time-ordered window store with headers in the DSL. Part of
KIP-1285.
Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../kafka/streams/state/HeadersBytesStore.java | 27 +-
...tractRocksDBTimeOrderedSegmentedBytesStore.java | 54 +++-
.../RocksDBMigratingWindowStoreWithHeaders.java | 97 +++++++
.../RocksDBTimeOrderedKeyValueBytesStore.java | 8 +-
...cksDBTimeOrderedSessionSegmentedBytesStore.java | 9 +-
...ocksDBTimeOrderedWindowSegmentedBytesStore.java | 47 ++--
.../internals/RocksDBTimeOrderedWindowStore.java | 4 +-
.../RocksDBTimeOrderedWindowStoreWithHeaders.java | 39 ++-
...IndexedTimeOrderedWindowBytesStoreSupplier.java | 21 +-
.../TimestampedWindowStoreWithHeadersBuilder.java | 3 +
.../state/internals/WindowSegmentWithHeaders.java | 95 +++++++
.../state/internals/WindowSegmentsWithHeaders.java | 58 ++++
.../internals/WindowStoreMaterializerTest.java | 4 +-
...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 14 +-
.../internals/AbstractRocksDBWindowStoreTest.java | 24 +-
...cksDBTimeOrderedWindowStoreWithHeadersTest.java | 146 ++++++++++
...OrderedWindowStoreWithHeadersWithIndexTest.java | 25 ++
...eredWindowStoreWithHeadersWithoutIndexTest.java | 25 ++
...xedTimeOrderedWindowBytesStoreSupplierTest.java | 2 +-
...imeOrderedCachingPersistentWindowStoreTest.java | 5 +-
.../internals/TimeOrderedWindowStoreTest.java | 5 +-
.../TimeOrderedWindowStoreUpgradeTest.java | 301 +++++++++++++++++++++
22 files changed, 919 insertions(+), 94 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
index 67c47fbfd80..8dd74fd98ee 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
@@ -29,20 +29,23 @@ package org.apache.kafka.streams.state;
public interface HeadersBytesStore {
/**
- * Converts a legacy timestamped value (ValueAndTimestamp format, without
headers) to the header-embedded format.
+ * Converts a legacy value (without headers) to the header-embedded format
by adding an empty headers prefix.
* <p>
- * For timestamped stores, the legacy format is: [timestamp(8)][value]
- * The new format is:
[headersSize(varint)][headersBytes][timestamp(8)][value]
+ * This is a general-purpose method that prepends an empty headers prefix
to any byte array.
+ * The header-embedded format is:
[headersSize(varint)][headersBytes][payload]
+ * where empty headers are represented as headersSize=0 (encoded as
[0x00]).
* <p>
- * This method adds empty headers to the existing timestamped value format.
- * <p>
- * Empty headers are represented as 0 bytes (headersSize=0, no
headersBytes),
+ * Usage examples:
+ * <ul>
+ * <li>Timestamped stores: [timestamp(8)][value] →
[0x00][timestamp(8)][value]</li>
+ * <li>Window stores: [value] → [0x00][value]</li>
+ * </ul>
*
- * @param valueAndTimestamp the legacy timestamped value bytes
- * @return the value in header-embedded format with empty headers
+ * @param value the legacy value bytes (may include timestamp for
timestamped stores, or plain value for window stores)
+ * @return the value in header-embedded format with empty headers prefix
[0x00][value]
*/
- static byte[] convertToHeaderFormat(final byte[] valueAndTimestamp) {
- if (valueAndTimestamp == null) {
+ static byte[] convertToHeaderFormat(final byte[] value) {
+ if (value == null) {
return null;
}
@@ -51,9 +54,9 @@ public interface HeadersBytesStore {
// headersSize = varint(0) = [0x00]
// headersBytes = [] (empty, 0 bytes)
// Result: [0x00][payload]
- final byte[] res = new byte[1 + valueAndTimestamp.length];
+ final byte[] res = new byte[1 + value.length];
// res[0] is initialized to 0x00 per Java Specification
- System.arraycopy(valueAndTimestamp, 0, res, 1,
valueAndTimestamp.length);
+ System.arraycopy(value, 0, res, 1, value.length);
return res;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
index 5ec5e011a2b..e87403b4289 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
+import
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema;
+import
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,7 +58,7 @@ import java.util.Optional;
* @see RocksDBTimeOrderedSessionSegmentedBytesStore
* @see RocksDBTimeOrderedWindowSegmentedBytesStore
*/
-public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends
AbstractDualSchemaRocksDBSegmentedBytesStore<KeyValueSegment> {
+public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore<S extends
Segment> extends AbstractDualSchemaRocksDBSegmentedBytesStore<S> {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
abstract class IndexToBaseStoreIterator implements KeyValueIterator<Bytes,
byte[]> {
@@ -113,14 +115,32 @@ public abstract class
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
protected abstract Bytes getBaseKey(final Bytes indexKey);
}
+ /**
+ * Concrete implementation of IndexToBaseStoreIterator for window key
schema.
+ * Converts index keys (key-first schema) to base store keys (time-first
schema).
+ * <p>
+ * This can be reused by both window store implementations (with and
without headers).
+ */
+ class WindowKeySchemaIndexToBaseStoreIterator extends
IndexToBaseStoreIterator {
+ WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes,
byte[]> indexIterator) {
+ super(indexIterator);
+ }
+
+ @Override
+ protected Bytes getBaseKey(final Bytes indexKey) {
+ final byte[] keyBytes =
KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
+ final long timestamp =
KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
+ final int seqnum =
KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
+ return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes,
timestamp, seqnum);
+ }
+ }
+
AbstractRocksDBTimeOrderedSegmentedBytesStore(final String name,
- final String metricsScope,
final long retention,
- final long segmentInterval,
final KeySchema
baseKeySchema,
- final Optional<KeySchema>
indexKeySchema) {
- super(name, baseKeySchema, indexKeySchema,
- new KeyValueSegments(name, metricsScope, retention,
segmentInterval), retention);
+ final Optional<KeySchema>
indexKeySchema,
+ final AbstractSegments<S>
segments) {
+ super(name, baseKeySchema, indexKeySchema, segments, retention);
}
@Override
@@ -137,7 +157,15 @@ public abstract class
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
return fetch(key, from, to, false);
}
- protected abstract IndexToBaseStoreIterator
getIndexToBaseStoreIterator(final SegmentIterator<KeyValueSegment>
segmentIterator);
+ protected abstract IndexToBaseStoreIterator
getIndexToBaseStoreIterator(final SegmentIterator<S> segmentIterator);
+
+ public void put(final Bytes key, final long timestamp, final int seqnum,
final byte[] value) {
+ throw new UnsupportedOperationException("This store does not support
put with timestamp and seqnum");
+ }
+
+ public byte[] fetch(final Bytes key, final long timestamp, final int
seqnum) {
+ throw new UnsupportedOperationException("This store does not support
fetch with timestamp and seqnum");
+ }
KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
final long from,
@@ -151,7 +179,7 @@ public abstract class
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
}
if (indexKeySchema.isPresent()) {
- final List<KeyValueSegment> searchSpace =
indexKeySchema.get().segmentsToSearch(segments, actualFrom, to,
+ final List<S> searchSpace =
indexKeySchema.get().segmentsToSearch(segments, actualFrom, to,
forward);
final Bytes binaryFrom =
indexKeySchema.get().lowerRangeFixedSize(key, actualFrom);
@@ -166,7 +194,7 @@ public abstract class
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
}
- final List<KeyValueSegment> searchSpace =
baseKeySchema.segmentsToSearch(segments, actualFrom, to,
+ final List<S> searchSpace = baseKeySchema.segmentsToSearch(segments,
actualFrom, to,
forward);
final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(key,
actualFrom);
@@ -216,7 +244,7 @@ public abstract class
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
}
if (indexKeySchema.isPresent()) {
- final List<KeyValueSegment> searchSpace =
indexKeySchema.get().segmentsToSearch(segments, actualFrom, to,
+ final List<S> searchSpace =
indexKeySchema.get().segmentsToSearch(segments, actualFrom, to,
forward);
final Bytes binaryFrom = indexKeySchema.get().lowerRange(keyFrom,
actualFrom);
@@ -230,7 +258,7 @@ public abstract class
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
forward));
}
- final List<KeyValueSegment> searchSpace =
baseKeySchema.segmentsToSearch(segments, actualFrom, to,
+ final List<S> searchSpace = baseKeySchema.segmentsToSearch(segments,
actualFrom, to,
forward);
final Bytes binaryFrom = baseKeySchema.lowerRange(keyFrom, actualFrom);
@@ -254,7 +282,7 @@ public abstract class
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
return KeyValueIterators.emptyIterator();
}
- final List<KeyValueSegment> searchSpace =
segments.segments(actualFrom, timeTo, true);
+ final List<S> searchSpace = segments.segments(actualFrom, timeTo,
true);
final Bytes binaryFrom = baseKeySchema.lowerRange(null, actualFrom);
final Bytes binaryTo = baseKeySchema.upperRange(null, timeTo);
@@ -276,7 +304,7 @@ public abstract class
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
return KeyValueIterators.emptyIterator();
}
- final List<KeyValueSegment> searchSpace =
segments.segments(actualFrom, timeTo, false);
+ final List<S> searchSpace = segments.segments(actualFrom, timeTo,
false);
final Bytes binaryFrom = baseKeySchema.lowerRange(null, actualFrom);
final Bytes binaryTo = baseKeySchema.upperRange(null, timeTo);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
new file mode 100644
index 00000000000..e0a25e6bbc7
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * A persistent key-value store with headers support based on RocksDB for
window stores.
+ * <p>
+ * This store provides a migration path from plain {@link RocksDBStore}
(DEFAULT column family)
+ * to a headers-aware column family ({@code windowKeyValueWithHeaders}). It
uses
+ * {@link DualColumnFamilyAccessor} for lazy migration when legacy data exists
in the DEFAULT CF.
+ * <p>
+ * Value format:
+ * <ul>
+ * <li>Old format (DEFAULT CF): {@code [value]}</li>
+ * <li>New format (HEADERS CF): {@code
[headersSize(varint)][headersBytes][value]}</li>
+ * </ul>
+ * <p>
+ * This is similar to {@link RocksDBMigratingSessionStoreWithHeaders} but for
window stores.
+ * Unlike timestamped stores, window stores don't include the timestamp in the
value (it's in the key).
+ */
+public class RocksDBMigratingWindowStoreWithHeaders extends RocksDBStore
implements HeadersBytesStore {
+ private static final Logger log =
LoggerFactory.getLogger(RocksDBMigratingWindowStoreWithHeaders.class);
+
+ static final byte[] WINDOW_STORE_HEADERS_VALUES_COLUMN_FAMILY_NAME =
+ "windowKeyValueWithHeaders".getBytes(StandardCharsets.UTF_8);
+
+ RocksDBMigratingWindowStoreWithHeaders(final String name,
+ final String parentDir,
+ final RocksDBMetricsRecorder
metricsRecorder) {
+ super(name, parentDir, metricsRecorder);
+ }
+
+ @Override
+ void openRocksDB(final DBOptions dbOptions,
+ final ColumnFamilyOptions columnFamilyOptions) {
+ final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+ dbOptions,
+ new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions),
+ new
ColumnFamilyDescriptor(WINDOW_STORE_HEADERS_VALUES_COLUMN_FAMILY_NAME,
columnFamilyOptions),
+ new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME,
columnFamilyOptions)
+ );
+ final ColumnFamilyHandle noHeadersColumnFamily = columnFamilies.get(0);
+ final ColumnFamilyHandle withHeadersColumnFamily =
columnFamilies.get(1);
+ final ColumnFamilyHandle offsetsCf = columnFamilies.get(2);
+
+ // Check if DEFAULT CF has data (upgrade from old format without
headers)
+ try (final RocksIterator noHeadersIter =
db.newIterator(noHeadersColumnFamily)) {
+ noHeadersIter.seekToFirst();
+ if (noHeadersIter.isValid()) {
+ log.info("Opening window store {} in upgrade mode from plain
value format", name);
+ // Migrate from [value] to [headers][value]
+ // Add empty headers prefix [0x00] to plain value
+ cfAccessor = new DualColumnFamilyAccessor(
+ offsetsCf,
+ noHeadersColumnFamily,
+ withHeadersColumnFamily,
+ HeadersBytesStore::convertToHeaderFormat,
+ this,
+ open
+ );
+ } else {
+ log.info("Opening window store {} in regular headers-aware
mode", name);
+ cfAccessor = new SingleColumnFamilyAccessor(offsetsCf,
withHeadersColumnFamily);
+ noHeadersColumnFamily.close();
+ }
+ }
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java
index e48cb6061fc..48ea130ab31 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java
@@ -34,13 +34,17 @@ import java.util.Optional;
/**
* A RocksDB backed time-ordered segmented bytes store for window key schema.
*/
-public class RocksDBTimeOrderedKeyValueBytesStore extends
AbstractRocksDBTimeOrderedSegmentedBytesStore {
+public class RocksDBTimeOrderedKeyValueBytesStore extends
AbstractRocksDBTimeOrderedSegmentedBytesStore<KeyValueSegment> {
private long minTimestamp;
RocksDBTimeOrderedKeyValueBytesStore(final String name,
final String metricsScope) {
- super(name, metricsScope, Long.MAX_VALUE, Long.MAX_VALUE, new
TimeFirstWindowKeySchema(), Optional.empty());
+ super(name,
+ Long.MAX_VALUE,
+ new TimeFirstWindowKeySchema(),
+ Optional.empty(),
+ new KeyValueSegments(name, metricsScope, Long.MAX_VALUE,
Long.MAX_VALUE));
minTimestamp = Long.MAX_VALUE;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
index bef0410dae6..3e1f4c5ea5c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
@@ -39,7 +39,7 @@ import java.util.Optional;
/**
* A RocksDB backed time-ordered segmented bytes store for session key schema.
*/
-public class RocksDBTimeOrderedSessionSegmentedBytesStore extends
AbstractRocksDBTimeOrderedSegmentedBytesStore {
+public class RocksDBTimeOrderedSessionSegmentedBytesStore extends
AbstractRocksDBTimeOrderedSegmentedBytesStore<KeyValueSegment> {
private class SessionKeySchemaIndexToBaseStoreIterator extends
IndexToBaseStoreIterator {
SessionKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes,
byte[]> indexIterator) {
@@ -60,8 +60,11 @@ public class RocksDBTimeOrderedSessionSegmentedBytesStore
extends AbstractRocksD
final long retention,
final long segmentInterval,
final boolean withIndex) {
- super(name, metricsScope, retention, segmentInterval, new
TimeFirstSessionKeySchema(),
- Optional.ofNullable(withIndex ? new KeyFirstSessionKeySchema() :
null));
+ super(name,
+ retention,
+ new TimeFirstSessionKeySchema(),
+ Optional.ofNullable(withIndex ? new KeyFirstSessionKeySchema() :
null),
+ new KeyValueSegments(name, metricsScope, retention,
segmentInterval));
}
public byte[] fetchSession(final Bytes key,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java
index 269f273590c..8829c7fb063 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import
org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
-import org.apache.kafka.streams.state.KeyValueIterator;
import
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema;
import
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
@@ -35,38 +34,34 @@ import java.util.Optional;
/**
* A RocksDB backed time-ordered segmented bytes store for window key schema.
+ * <p>
+ * This class supports different segment implementations via the generic type
parameter.
+ * It can be used with {@link KeyValueSegment} for regular stores or {@link
WindowSegmentWithHeaders}
+ * for stores with headers support.
+ *
+ * @param <S> the segment type
*/
-public class RocksDBTimeOrderedWindowSegmentedBytesStore extends
AbstractRocksDBTimeOrderedSegmentedBytesStore {
-
- private class WindowKeySchemaIndexToBaseStoreIterator extends
IndexToBaseStoreIterator {
- WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes,
byte[]> indexIterator) {
- super(indexIterator);
- }
-
- @Override
- protected Bytes getBaseKey(final Bytes indexKey) {
- final byte[] keyBytes =
KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
- final long timestamp =
KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
- final int seqnum =
KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
- return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes,
timestamp, seqnum);
- }
- }
+public class RocksDBTimeOrderedWindowSegmentedBytesStore<S extends Segment>
extends AbstractRocksDBTimeOrderedSegmentedBytesStore<S> {
RocksDBTimeOrderedWindowSegmentedBytesStore(final String name,
- final String metricsScope,
final long retention,
- final long segmentInterval,
- final boolean withIndex) {
- super(name, metricsScope, retention, segmentInterval, new
TimeFirstWindowKeySchema(),
- Optional.ofNullable(withIndex ? new KeyFirstWindowKeySchema() :
null));
+ final boolean withIndex,
+ final AbstractSegments<S>
segments) {
+ super(name,
+ retention,
+ new TimeFirstWindowKeySchema(),
+ Optional.ofNullable(withIndex ? new KeyFirstWindowKeySchema() :
null),
+ segments);
}
+ @Override
public void put(final Bytes key, final long timestamp, final int seqnum,
final byte[] value) {
final Bytes baseKey = TimeFirstWindowKeySchema.toStoreKeyBinary(key,
timestamp, seqnum);
put(baseKey, value);
}
- byte[] fetch(final Bytes key, final long timestamp, final int seqnum) {
+ @Override
+ public byte[] fetch(final Bytes key, final long timestamp, final int
seqnum) {
return get(TimeFirstWindowKeySchema.toStoreKeyBinary(key, timestamp,
seqnum));
}
@@ -80,7 +75,7 @@ public class RocksDBTimeOrderedWindowSegmentedBytesStore
extends AbstractRocksDB
}
@Override
- Map<KeyValueSegment, WriteBatch> getWriteBatches(
+ Map<S, WriteBatch> getWriteBatches(
final Collection<ConsumerRecord<byte[], byte[]>> records) {
// advance stream time to the max timestamp in the batch
for (final ConsumerRecord<byte[], byte[]> record : records) {
@@ -88,11 +83,11 @@ public class RocksDBTimeOrderedWindowSegmentedBytesStore
extends AbstractRocksDB
observedStreamTime = Math.max(observedStreamTime, timestamp);
}
- final Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>();
+ final Map<S, WriteBatch> writeBatchMap = new HashMap<>();
for (final ConsumerRecord<byte[], byte[]> record : records) {
final long timestamp =
WindowKeySchema.extractStoreTimestamp(record.key());
final long segmentId = segments.segmentId(timestamp);
- final KeyValueSegment segment =
segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext,
observedStreamTime);
+ final S segment = segments.getOrCreateSegmentIfLive(segmentId,
internalProcessorContext, observedStreamTime);
if (segment != null) {
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
record,
@@ -123,7 +118,7 @@ public class RocksDBTimeOrderedWindowSegmentedBytesStore
extends AbstractRocksDB
@Override
protected IndexToBaseStoreIterator getIndexToBaseStoreIterator(
- final SegmentIterator<KeyValueSegment> segmentIterator) {
+ final SegmentIterator<S> segmentIterator) {
return new WindowKeySchemaIndexToBaseStoreIterator(segmentIterator);
}
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
index 41708ed7b42..79e3541af58 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
@@ -36,7 +36,7 @@ import java.util.Objects;
public class RocksDBTimeOrderedWindowStore
- extends WrappedStateStore<RocksDBTimeOrderedWindowSegmentedBytesStore,
Object, Object>
+ extends WrappedStateStore<AbstractRocksDBTimeOrderedSegmentedBytesStore<?
extends Segment>, Object, Object>
implements WindowStore<Bytes, byte[]>, TimestampedBytesStore {
private final boolean retainDuplicates;
@@ -46,7 +46,7 @@ public class RocksDBTimeOrderedWindowStore
private StateStoreContext stateStoreContext;
RocksDBTimeOrderedWindowStore(
- final RocksDBTimeOrderedWindowSegmentedBytesStore store,
+ final AbstractRocksDBTimeOrderedSegmentedBytesStore<? extends Segment>
store,
final boolean retainDuplicates,
final long windowSize
) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeaders.java
index 7c6c300617d..f258cf57710 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeaders.java
@@ -16,6 +16,11 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.HeadersBytesStore;
import org.apache.kafka.streams.state.TimestampedBytesStore;
@@ -26,15 +31,10 @@ import org.apache.kafka.streams.state.TimestampedBytesStore;
* {@link TimestampedBytesStore} (for timestamp support) and {@link
HeadersBytesStore}
* (for header support) marker interfaces.
* <p>
- * The storage format for values is:
[headersSize(varint)][headersBytes][timestamp(8)][value]
+ * This store returns {@link QueryResult#forUnknownQueryType(Query, Object)}
for all queries,
+ * as IQv2 query handling is done at the metered layer.
* <p>
- * This implementation uses segment-level versioning for backward
compatibility:
- * <ul>
- * <li>Old segments continue to use the legacy format without headers</li>
- * <li>New segments use the header-embedded format</li>
- * <li>Legacy values are served with empty headers on read</li>
- * <li>All new writes use the new format</li>
- * </ul>
+ * The storage format for values is:
[headersSize(varint)][headersBytes][timestamp(8)][value]
*
* @see RocksDBTimeOrderedWindowStore
* @see HeadersBytesStore
@@ -42,9 +42,30 @@ import org.apache.kafka.streams.state.TimestampedBytesStore;
*/
class RocksDBTimeOrderedWindowStoreWithHeaders extends
RocksDBTimeOrderedWindowStore implements TimestampedBytesStore,
HeadersBytesStore {
- RocksDBTimeOrderedWindowStoreWithHeaders(final
RocksDBTimeOrderedWindowSegmentedBytesStore store,
+ RocksDBTimeOrderedWindowStoreWithHeaders(final
RocksDBTimeOrderedWindowSegmentedBytesStore<WindowSegmentWithHeaders> store,
final boolean retainDuplicates,
final long windowSize) {
super(store, retainDuplicates, windowSize);
}
+
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final long start = config.isCollectExecutionInfo() ? System.nanoTime()
: -1L;
+ final QueryResult<R> result;
+ final Position position = getPosition();
+
+ synchronized (position) {
+ result = QueryResult.forUnknownQueryType(query, this);
+
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + this.getClass() + " in " +
(System.nanoTime() - start) + "ns"
+ );
+ }
+ result.setPosition(position.copy());
+ }
+ return result;
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
index c9c9a015c83..332d2d6973c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
@@ -118,32 +118,29 @@ public class
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window
switch (windowStoreType) {
case DEFAULT_WINDOW_STORE:
return new RocksDBTimeOrderedWindowStore(
- new RocksDBTimeOrderedWindowSegmentedBytesStore(
+ new RocksDBTimeOrderedWindowSegmentedBytesStore<>(
name,
- metricsScope(),
retentionPeriod,
- segmentInterval,
- false),
+ false,
+ new KeyValueSegments(name, metricsScope(),
retentionPeriod, segmentInterval)),
retainDuplicates,
windowSize);
case INDEXED_WINDOW_STORE:
return new RocksDBTimeOrderedWindowStore(
- new RocksDBTimeOrderedWindowSegmentedBytesStore(
+ new RocksDBTimeOrderedWindowSegmentedBytesStore<>(
name,
- metricsScope(),
retentionPeriod,
- segmentInterval,
- true),
+ true,
+ new KeyValueSegments(name, metricsScope(),
retentionPeriod, segmentInterval)),
retainDuplicates,
windowSize);
case INDEXED_WINDOW_STORE_WITH_HEADERS:
return new RocksDBTimeOrderedWindowStoreWithHeaders(
- new RocksDBTimeOrderedWindowSegmentedBytesStore(
+ new RocksDBTimeOrderedWindowSegmentedBytesStore<>(
name,
- metricsScope(),
retentionPeriod,
- segmentInterval,
- true),
+ true,
+ new WindowSegmentsWithHeaders(name, metricsScope(),
retentionPeriod, segmentInterval)),
retainDuplicates,
windowSize);
default:
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
index 3714643d43b..5689ad229f3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
@@ -115,6 +115,9 @@ public class TimestampedWindowStoreWithHeadersBuilder<K, V>
if (stateStore instanceof RocksDBTimeOrderedWindowStore) {
return true;
}
+ if (stateStore instanceof RocksDBTimeOrderedWindowStoreWithHeaders) {
+ return true;
+ }
if (stateStore instanceof WrappedStateStore) {
return isTimeOrderedStore(((WrappedStateStore<?, ?, ?>)
stateStore).wrapped());
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentWithHeaders.java
new file mode 100644
index 00000000000..468afd827a4
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentWithHeaders.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * One segment of a segmented window store whose values may carry record headers.
+ * <p>
+ * The class builds on {@link RocksDBMigratingWindowStoreWithHeaders} and adds what is
+ * declared here for the {@link Segment} role: a numeric segment id, removal of the
+ * segment directory via {@link #destroy()}, and equality based on the id alone.
+ * <p>
+ * Value layout per column family:
+ * <ul>
+ *   <li>DEFAULT CF (old format): {@code [value]}</li>
+ *   <li>HEADERS CF (new format): {@code [headersSize(varint)][headersBytes][value]}</li>
+ * </ul>
+ */
+class WindowSegmentWithHeaders extends RocksDBMigratingWindowStoreWithHeaders implements Segment {
+
+    private final long id;
+
+    /**
+     * @param segmentName     name handed to the parent store for this segment
+     * @param windowName      name of the window store this segment belongs to
+     * @param id              numeric id of this segment
+     * @param position        position object assigned to this segment's {@code position} field
+     * @param metricsRecorder recorder handed to the parent store
+     */
+    WindowSegmentWithHeaders(final String segmentName,
+                             final String windowName,
+                             final long id,
+                             final Position position,
+                             final RocksDBMetricsRecorder metricsRecorder) {
+        super(segmentName, windowName, metricsRecorder);
+        this.position = position;
+        this.id = id;
+    }
+
+    /** Returns the id this segment was created with. */
+    @Override
+    public long id() {
+        return id;
+    }
+
+    /** Deletes this segment's database directory from disk. */
+    @Override
+    public void destroy() throws IOException {
+        Utils.delete(dbDir);
+    }
+
+    /** Range deletion is not supported by this segment type. */
+    @Override
+    public void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Opens the underlying database by delegating to the parent implementation. */
+    @Override
+    public void openDB(final Map<String, Object> configs, final File stateDir) {
+        // skip the registering step
+        super.openDB(configs, stateDir);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("WindowSegmentWithHeaders(id=");
+        text.append(id).append(", name=").append(name()).append(")");
+        return text.toString();
+    }
+
+    /** Two segments are equal when they have exactly the same class and the same id. */
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj.getClass() != getClass()) {
+            return false;
+        }
+        final WindowSegmentWithHeaders other = (WindowSegmentWithHeaders) obj;
+        return other.id == id;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id);
+    }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentsWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentsWithHeaders.java
new file mode 100644
index 00000000000..c6fc08f467e
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentsWithHeaders.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+/**
+ * Segment container for time-ordered window stores with headers support.
+ * <p>
+ * Every segment it creates is a {@link WindowSegmentWithHeaders}, whose values are laid out as
+ * {@code [headers][value]}; the value carries no timestamp because the timestamp is part of the key.
+ * A single {@link RocksDBMetricsRecorder} is created here and passed to every segment.
+ *
+ * @see WindowSegmentWithHeaders
+ */
+class WindowSegmentsWithHeaders extends AbstractSegments<WindowSegmentWithHeaders> {
+
+    private final RocksDBMetricsRecorder metricsRecorder;
+
+    /**
+     * @param name            store name, also used to label the metrics recorder
+     * @param metricsScope    metrics scope passed to the metrics recorder
+     * @param retentionPeriod retention period forwarded to {@link AbstractSegments}
+     * @param segmentInterval segment interval forwarded to {@link AbstractSegments}
+     */
+    WindowSegmentsWithHeaders(final String name,
+                              final String metricsScope,
+                              final long retentionPeriod,
+                              final long segmentInterval) {
+        super(name, retentionPeriod, segmentInterval);
+        this.metricsRecorder = new RocksDBMetricsRecorder(metricsScope, name);
+    }
+
+    /** Builds a new headers-aware segment that shares this container's position and metrics recorder. */
+    @Override
+    protected WindowSegmentWithHeaders createSegment(final long segmentId, final String segmentName) {
+        return new WindowSegmentWithHeaders(segmentName, name, segmentId, position, metricsRecorder);
+    }
+
+    /** Opens the segment's database using the application configs and state directory of the context. */
+    @Override
+    protected void openSegmentDB(final WindowSegmentWithHeaders segment, final StateStoreContext context) {
+        segment.openDB(context.appConfigs(), context.stateDir());
+    }
+
+    /** Initializes the metrics recorder for the context's task, then opens the existing segments. */
+    @Override
+    public void openExisting(final StateStoreContext context, final long streamTime) {
+        // The recorder is initialized before the parent opens any segment.
+        metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
+        super.openExisting(context, streamTime);
+    }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
index 394a5620eac..b118b0b2c31 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
@@ -36,6 +36,7 @@ import
org.apache.kafka.streams.state.internals.CachingWindowStore;
import
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedWindowBytesStoreWithHeaders;
import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
import
org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.internals.TimeOrderedCachingWindowStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.junit.jupiter.api.BeforeEach;
@@ -227,6 +228,7 @@ public class WindowStoreMaterializerTest {
@Test
public void shouldCreateHeadersStoreWithOnWindowCloseAndCachingEnabled() {
+
doReturn("headers").when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
emitStrategy = EmitStrategy.onWindowClose();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
@@ -235,7 +237,7 @@ public class WindowStoreMaterializerTest {
final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertInstanceOf(CachingWindowStore.class, wrapped);
+ assertInstanceOf(TimeOrderedCachingWindowStore.class, wrapped);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 2cb1f410c7f..c4d1ef855f7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -169,20 +169,18 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
AbstractDualSchemaRocksDBSegmentedBytesStore<KeyValueSegment>
getBytesStore() {
switch (schemaType()) {
case WindowSchemaWithIndex:
- return new RocksDBTimeOrderedWindowSegmentedBytesStore(
+ return new RocksDBTimeOrderedWindowSegmentedBytesStore<>(
storeName,
- METRICS_SCOPE,
retention,
- segmentInterval,
- true
+ true,
+ new KeyValueSegments(storeName, METRICS_SCOPE,
retention, segmentInterval)
);
case WindowSchemaWithoutIndex:
- return new RocksDBTimeOrderedWindowSegmentedBytesStore(
+ return new RocksDBTimeOrderedWindowSegmentedBytesStore<>(
storeName,
- METRICS_SCOPE,
retention,
- segmentInterval,
- false
+ false,
+ new KeyValueSegments(storeName, METRICS_SCOPE,
retention, segmentInterval)
);
case SessionSchemaWithIndex:
return new RocksDBTimeOrderedSessionSegmentedBytesStore(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java
index fd8569c18ef..0fbae858ae8 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java
@@ -58,7 +58,9 @@ public abstract class AbstractRocksDBWindowStoreTest extends
AbstractWindowBytes
enum StoreType {
RocksDBWindowStore,
RocksDBTimeOrderedWindowStoreWithIndex,
- RocksDBTimeOrderedWindowStoreWithoutIndex
+ RocksDBTimeOrderedWindowStoreWithoutIndex,
+ RocksDBTimeOrderedWindowStoreWithHeadersWithIndex,
+ RocksDBTimeOrderedWindowStoreWithHeadersWithoutIndex
}
abstract StoreType storeType();
@@ -102,6 +104,26 @@ public abstract class AbstractRocksDBWindowStoreTest
extends AbstractWindowBytes
valueSerde
).build();
}
+ case RocksDBTimeOrderedWindowStoreWithHeadersWithIndex: {
+ final long defaultSegmentInterval = Math.max(retentionPeriod /
2, 60_000L);
+ return Stores.windowStoreBuilder(
+ new
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(STORE_NAME,
+ retentionPeriod, defaultSegmentInterval,
windowSize, retainDuplicates,
+ true, true),
+ keySerde,
+ valueSerde
+ ).build();
+ }
+ case RocksDBTimeOrderedWindowStoreWithHeadersWithoutIndex: {
+ final long defaultSegmentInterval = Math.max(retentionPeriod /
2, 60_000L);
+ return Stores.windowStoreBuilder(
+ new
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(STORE_NAME,
+ retentionPeriod, defaultSegmentInterval,
windowSize, retainDuplicates,
+ false, true),
+ keySerde,
+ valueSerde
+ ).build();
+ }
default:
throw new IllegalStateException("Unknown StoreType: " +
storeType());
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeadersTest.java
new file mode 100644
index 00000000000..13994dd36af
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeadersTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.time.Instant;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Checks how {@link RocksDBTimeOrderedWindowStoreWithHeaders} responds to interactive queries:
+ * window key and window range queries are expected to fail with
+ * {@link FailureReason#UNKNOWN_QUERY_TYPE}, and execution info is expected only when requested.
+ */
+public class RocksDBTimeOrderedWindowStoreWithHeadersTest {
+
+    private static final String STORE_NAME = "test-time-ordered-window-store";
+    private static final long WINDOW_SIZE = 10_000L;
+    private static final long RETENTION_PERIOD = 60_000L;
+    private static final long SEGMENT_INTERVAL = 30_000L;
+
+    private RocksDBTimeOrderedWindowStoreWithHeaders windowStore;
+    private InternalMockProcessorContext<String, String> context;
+    private File baseDir;
+
+    @BeforeEach
+    public void setUp() {
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        baseDir = TestUtils.tempDirectory();
+        context = new InternalMockProcessorContext<>(baseDir, Serdes.String(), Serdes.String(), new StreamsConfig(props));
+
+        // Indexed (hasIndex = true) bytes store backed by headers-aware segments; duplicates are not retained.
+        final WindowSegmentsWithHeaders segments =
+            new WindowSegmentsWithHeaders(STORE_NAME, "test-metrics-scope", RETENTION_PERIOD, SEGMENT_INTERVAL);
+        windowStore = new RocksDBTimeOrderedWindowStoreWithHeaders(
+            new RocksDBTimeOrderedWindowSegmentedBytesStore<>(STORE_NAME, RETENTION_PERIOD, true, segments),
+            false,
+            WINDOW_SIZE
+        );
+        windowStore.init(context, windowStore);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (windowStore == null) {
+            return;
+        }
+        windowStore.close();
+    }
+
+    @Test
+    public void shouldReturnUnknownQueryTypeForWindowKeyQuery() {
+        assertUnknownQueryType(runKeyQuery(false));
+    }
+
+    @Test
+    public void shouldReturnUnknownQueryTypeForWindowRangeQuery() {
+        final WindowRangeQuery<Bytes, byte[]> query = WindowRangeQuery.withWindowStartRange(
+            Instant.ofEpochMilli(0),
+            Instant.ofEpochMilli(Long.MAX_VALUE)
+        );
+        final QueryResult<KeyValueIterator<org.apache.kafka.streams.kstream.Windowed<Bytes>, byte[]>> result =
+            windowStore.query(query, PositionBound.unbounded(), new QueryConfig(false));
+
+        assertUnknownQueryType(result);
+    }
+
+    @Test
+    public void shouldCollectExecutionInfoWhenRequested() {
+        final QueryResult<WindowStoreIterator<byte[]>> result = runKeyQuery(true);
+
+        assertFalse(result.getExecutionInfo().isEmpty());
+        final String firstEntry = result.getExecutionInfo().get(0);
+        assertTrue(firstEntry.contains("Handled in"));
+        assertTrue(firstEntry.contains(RocksDBTimeOrderedWindowStoreWithHeaders.class.getName()));
+    }
+
+    @Test
+    public void shouldNotCollectExecutionInfoWhenNotRequested() {
+        final QueryResult<WindowStoreIterator<byte[]>> result = runKeyQuery(false);
+
+        assertTrue(result.getExecutionInfo().isEmpty());
+    }
+
+    /**
+     * Runs a window key query for "test-key" over window starts from epoch 0 to
+     * {@code Long.MAX_VALUE} milliseconds against the store under test.
+     *
+     * @param collectExecutionInfo value passed to {@link QueryConfig}
+     */
+    private QueryResult<WindowStoreIterator<byte[]>> runKeyQuery(final boolean collectExecutionInfo) {
+        final WindowKeyQuery<Bytes, byte[]> query = WindowKeyQuery.withKeyAndWindowStartRange(
+            new Bytes("test-key".getBytes()),
+            Instant.ofEpochMilli(0),
+            Instant.ofEpochMilli(Long.MAX_VALUE)
+        );
+        return windowStore.query(query, PositionBound.unbounded(), new QueryConfig(collectExecutionInfo));
+    }
+
+    /** Asserts that the query failed as an unknown query type and still reports a position. */
+    private static void assertUnknownQueryType(final QueryResult<?> result) {
+        assertFalse(result.isSuccess());
+        assertEquals(FailureReason.UNKNOWN_QUERY_TYPE, result.getFailureReason());
+        assertNotNull(result.getPosition());
+    }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeadersWithIndexTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeadersWithIndexTest.java
new file mode 100644
index 00000000000..cc992b063e8
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeadersWithIndexTest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+/**
+ * Runs the tests inherited from {@link AbstractRocksDBWindowStoreTest} with the store type
+ * {@code RocksDBTimeOrderedWindowStoreWithHeadersWithIndex}, i.e. the time-ordered window
+ * store with headers and with the index enabled.
+ */
+public class RocksDBTimeOrderedWindowStoreWithHeadersWithIndexTest extends AbstractRocksDBWindowStoreTest {
+
+    /** Selects the headers-enabled, indexed store variant for the inherited tests. */
+    @Override
+    StoreType storeType() {
+        return StoreType.RocksDBTimeOrderedWindowStoreWithHeadersWithIndex;
+    }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeadersWithoutIndexTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeadersWithoutIndexTest.java
new file mode 100644
index 00000000000..1ea609e8ee9
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeadersWithoutIndexTest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+/**
+ * Runs the tests inherited from {@link AbstractRocksDBWindowStoreTest} with the store type
+ * {@code RocksDBTimeOrderedWindowStoreWithHeadersWithoutIndex}, i.e. the time-ordered window
+ * store with headers and with the index disabled.
+ */
+public class RocksDBTimeOrderedWindowStoreWithHeadersWithoutIndexTest extends AbstractRocksDBWindowStoreTest {
+
+    /** Selects the headers-enabled, non-indexed store variant for the inherited tests. */
+    @Override
+    StoreType storeType() {
+        return StoreType.RocksDBTimeOrderedWindowStoreWithHeadersWithoutIndex;
+    }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
index dbe1bcf81ef..ea7c2d56acf 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
@@ -80,6 +80,6 @@ public class
RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest {
final StateStore wrapped = ((WrappedStateStore) store).wrapped();
assertThat(store,
instanceOf(RocksDBTimeOrderedWindowStoreWithHeaders.class));
assertThat(wrapped,
instanceOf(RocksDBTimeOrderedWindowSegmentedBytesStore.class));
- assertTrue(((RocksDBTimeOrderedWindowSegmentedBytesStore)
wrapped).hasIndex());
+ assertTrue(((RocksDBTimeOrderedWindowSegmentedBytesStore<?>)
wrapped).hasIndex());
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
index e718fc53c54..e73230428f5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
@@ -108,12 +108,13 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
private TimeFirstWindowKeySchema baseKeySchema;
private WindowStore<Bytes, byte[]> underlyingStore;
private TimeOrderedCachingWindowStore cachingStore;
- private RocksDBTimeOrderedWindowSegmentedBytesStore bytesStore;
+ private RocksDBTimeOrderedWindowSegmentedBytesStore<KeyValueSegment>
bytesStore;
private CacheFlushListenerStub<Windowed<String>, String> cacheListener;
private void setUp(final boolean hasIndex) {
baseKeySchema = new TimeFirstWindowKeySchema();
- bytesStore = new RocksDBTimeOrderedWindowSegmentedBytesStore("test",
"metrics-scope", 100, SEGMENT_INTERVAL, hasIndex);
+ bytesStore = new RocksDBTimeOrderedWindowSegmentedBytesStore<>("test",
100, hasIndex,
+ new KeyValueSegments("test", "metrics-scope", 100,
SEGMENT_INTERVAL));
underlyingStore = new RocksDBTimeOrderedWindowStore(bytesStore, false,
WINDOW_SIZE);
final TimeWindowedDeserializer<String> keyDeserializer = new
TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE);
keyDeserializer.setIsChangelogTopic(true);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
index c1e9a346ff0..42ab3946a0e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
@@ -103,7 +103,7 @@ public class TimeOrderedWindowStoreTest {
private static final String CACHE_NAMESPACE = "0_0-store-name";
private InternalMockProcessorContext<?, ?> context;
- private RocksDBTimeOrderedWindowSegmentedBytesStore bytesStore;
+ private RocksDBTimeOrderedWindowSegmentedBytesStore<KeyValueSegment>
bytesStore;
private RocksDBTimeOrderedWindowStore underlyingStore;
private TimeOrderedCachingWindowStore cachingStore;
private CacheFlushListenerStub<Windowed<String>, String> cacheListener;
@@ -112,7 +112,8 @@ public class TimeOrderedWindowStoreTest {
public void setUp(final boolean hasIndex) {
baseKeySchema = new TimeFirstWindowKeySchema();
- bytesStore = new RocksDBTimeOrderedWindowSegmentedBytesStore("test",
"metrics-scope", 100, SEGMENT_INTERVAL, hasIndex);
+ bytesStore = new RocksDBTimeOrderedWindowSegmentedBytesStore<>("test",
100, hasIndex,
+ new KeyValueSegments("test", "metrics-scope", 100,
SEGMENT_INTERVAL));
underlyingStore = new RocksDBTimeOrderedWindowStore(bytesStore, false,
WINDOW_SIZE);
final TimeWindowedDeserializer<String> keyDeserializer = new
TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE);
keyDeserializer.setIsChangelogTopic(true);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java
new file mode 100644
index 00000000000..df278a35972
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.delete;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests upgrade path from time-ordered window store (without headers) to
+ * time-ordered window store with headers using direct supplier creation.
+ * <p>
+ * This test validates lazy migration from the default CF (plain values) to the
+ * headers CF and ensures the store can read old data after upgrade.
+ */
+public class TimeOrderedWindowStoreUpgradeTest {
+
+ private static final String STORE_NAME = "time-ordered-window-store";
+ private static final long WINDOW_SIZE_MS = 1000L;
+ private static final long RETENTION_MS = Duration.ofMinutes(5).toMillis();
+ private static final long SEGMENT_INTERVAL_MS =
Duration.ofMinutes(1).toMillis();
+
+ private InternalMockProcessorContext<Bytes, byte[]> context;
+ private File baseDir;
+
+    /** Creates a fresh temp state directory and a mock processor context with bytes serdes. */
+    @BeforeEach
+    public void setUp() {
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        baseDir = TestUtils.tempDirectory();
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        context = new InternalMockProcessorContext<>(baseDir, Serdes.Bytes(), Serdes.ByteArray(), streamsConfig);
+    }
+
+    /** Removes the temp state directory created in {@code setUp}, if there is one. */
+    @AfterEach
+    public void tearDown() {
+        if (baseDir == null) {
+            return;
+        }
+        try {
+            delete(baseDir);
+        } catch (final Exception ignored) {
+            // Best-effort cleanup: failing to remove the temp directory is deliberately not reported.
+        }
+    }
+
+    /**
+     * Encodes a value together with its headers in the layout used by window stores with headers:
+     * {@code [headersSize(varint)][headersBytes][value]}.
+     *
+     * @param value   raw value bytes; {@code null} is passed through
+     * @param headers headers to place in front of the value
+     * @return the combined bytes, or {@code null} if {@code value} is {@code null}
+     */
+    private static byte[] serializeValueWithHeaders(final byte[] value, final Headers headers) {
+        if (value == null) {
+            return null;
+        }
+        final HeadersSerializer.PreSerializedHeaders prepared = HeadersSerializer.prepareSerialization(headers);
+        final ByteBuffer headersBuffer = ByteBuffer.allocate(prepared.requiredBufferSizeForHeaders);
+        final byte[] headerBytes = HeadersSerializer.serialize(prepared, headersBuffer).array();
+
+        try (final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+             final DataOutputStream sink = new DataOutputStream(bytes)) {
+            // Length prefix first, then the headers, then the untouched value bytes.
+            ByteUtils.writeVarint(headerBytes.length, sink);
+            sink.write(headerBytes);
+            sink.write(value);
+            return bytes.toByteArray();
+        } catch (final IOException e) {
+            throw new RuntimeException("Failed to serialize value with headers", e);
+        }
+    }
+
+    /**
+     * Returns the trailing value part of a {@code [headersSize(varint)][headersBytes][value]} array.
+     *
+     * @param valueWithHeaders combined bytes as read from a window store with headers; may be {@code null}
+     * @return the bytes after the headers section, or {@code null} if the input is {@code null}
+     */
+    private static byte[] extractRawValue(final byte[] valueWithHeaders) {
+        if (valueWithHeaders == null) {
+            return null;
+        }
+        final ByteBuffer reader = ByteBuffer.wrap(valueWithHeaders);
+        // Skip the headers section: the varint length prefix, then that many header bytes.
+        final int headersLength = ByteUtils.readVarint(reader);
+        final int valueStart = reader.position() + headersLength;
+        reader.position(valueStart);
+        final byte[] value = new byte[reader.remaining()];
+        reader.get(value);
+        return value;
+    }
+
+    /**
+     * Decodes the headers part of a {@code [headersSize(varint)][headersBytes][value]} array.
+     *
+     * @param valueWithHeaders combined bytes as read from a window store with headers; may be {@code null}
+     * @return the deserialized headers, or {@code null} if the input is {@code null}
+     */
+    private static Headers extractHeaders(final byte[] valueWithHeaders) {
+        if (valueWithHeaders == null) {
+            return null;
+        }
+        final ByteBuffer reader = ByteBuffer.wrap(valueWithHeaders);
+        // The varint prefix says how many bytes of serialized headers follow it.
+        final byte[] serializedHeaders = new byte[ByteUtils.readVarint(reader)];
+        reader.get(serializedHeaders);
+        return HeadersDeserializer.deserialize(serializedHeaders);
+    }
+
+    /**
+     * Writes three records through a store created without headers, then opens a store created
+     * with headers under the same name and context, and checks that the old records are readable
+     * (with empty headers) and that newly written records round-trip their headers.
+     * <p>
+     * Each store is closed in a {@code finally} block so that a failed assertion does not leave
+     * the store open while the state directory is deleted in {@code tearDown}.
+     */
+    @Test
+    public void shouldMigrateFromWithoutHeadersToWithHeaders() {
+        final RocksDbIndexedTimeOrderedWindowBytesStoreSupplier oldSupplier =
+            new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+                STORE_NAME,
+                RETENTION_MS,
+                SEGMENT_INTERVAL_MS,
+                WINDOW_SIZE_MS,
+                false, // retainDuplicates
+                true,  // withIndex
+                false  // withHeaders = FALSE (old format)
+            );
+
+        final long baseTime = System.currentTimeMillis();
+        final Bytes key1 = Bytes.wrap("key1".getBytes());
+        final Bytes key2 = Bytes.wrap("key2".getBytes());
+        final Bytes key3 = Bytes.wrap("key3".getBytes());
+
+        final WindowStore<Bytes, byte[]> oldStore = oldSupplier.get();
+        oldStore.init(context, oldStore);
+        try {
+            // Write data in old format
+            oldStore.put(key1, "value1".getBytes(), baseTime + 100);
+            oldStore.put(key2, "value2".getBytes(), baseTime + 200);
+            oldStore.put(key3, "value3".getBytes(), baseTime + 300);
+
+            // Verify old data
+            assertEquals("value1", new String(oldStore.fetch(key1, baseTime + 100)));
+            assertEquals("value2", new String(oldStore.fetch(key2, baseTime + 200)));
+            assertEquals("value3", new String(oldStore.fetch(key3, baseTime + 300)));
+        } finally {
+            // Close even when an assertion fails, so the new store never opens next to a still-open old one.
+            oldStore.close();
+        }
+
+        final RocksDbIndexedTimeOrderedWindowBytesStoreSupplier newSupplier =
+            new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+                STORE_NAME,
+                RETENTION_MS,
+                SEGMENT_INTERVAL_MS,
+                WINDOW_SIZE_MS,
+                false, // retainDuplicates
+                true,  // withIndex
+                true   // withHeaders = TRUE (new format with headers support)
+            );
+
+        final WindowStore<Bytes, byte[]> newStore = newSupplier.get();
+        newStore.init(context, newStore);
+        try {
+            // Verify we can read old data (lazy migration)
+            byte[] fetch = newStore.fetch(key1, baseTime + 100);
+            assertEquals("value1", new String(extractRawValue(fetch)));
+            assertEquals(0, extractHeaders(fetch).toArray().length, "Old data should have empty headers after migration");
+            fetch = newStore.fetch(key2, baseTime + 200);
+            assertEquals("value2", new String(extractRawValue(fetch)));
+            assertEquals(0, extractHeaders(fetch).toArray().length, "Old data should have empty headers after migration");
+            fetch = newStore.fetch(key3, baseTime + 300);
+            assertEquals("value3", new String(extractRawValue(fetch)));
+            assertEquals(0, extractHeaders(fetch).toArray().length, "Old data should have empty headers after migration");
+
+            // Write new data (should use headers-CF)
+            final Bytes key4 = Bytes.wrap("key4".getBytes());
+
+            // Write key3 with empty headers
+            newStore.put(key3, serializeValueWithHeaders("value3-updated".getBytes(), new RecordHeaders()), baseTime + 350);
+
+            // Write key4 with actual headers
+            final RecordHeaders headersWithData = new RecordHeaders();
+            headersWithData.add("header-key-1", "header-value-1".getBytes());
+            headersWithData.add("header-key-2", "header-value-2".getBytes());
+            newStore.put(key4, serializeValueWithHeaders("value4".getBytes(), headersWithData), baseTime + 400);
+
+            // Verify new data - raw values
+            assertEquals("value3-updated", new String(extractRawValue(newStore.fetch(key3, baseTime + 350))));
+            assertEquals("value4", new String(extractRawValue(newStore.fetch(key4, baseTime + 400))));
+
+            // Verify headers for key3 (empty headers)
+            final byte[] fetchedKey3Value = newStore.fetch(key3, baseTime + 350);
+            final Headers retrievedKey3Headers = extractHeaders(fetchedKey3Value);
+            assertEquals(0, retrievedKey3Headers.toArray().length);
+
+            // Verify headers for key4 (headers with data)
+            final byte[] fetchedKey4Value = newStore.fetch(key4, baseTime + 400);
+            final Headers retrievedKey4Headers = extractHeaders(fetchedKey4Value);
+            assertEquals(2, retrievedKey4Headers.toArray().length);
+            assertEquals("header-value-1", new String(retrievedKey4Headers.lastHeader("header-key-1").value()));
+            assertEquals("header-value-2", new String(retrievedKey4Headers.lastHeader("header-key-2").value()));
+        } finally {
+            // Close before tearDown deletes the state directory, also on assertion failure.
+            newStore.close();
+        }
+    }
+
+ /**
+  * Verifies the upgrade path withIndex=true, withHeaders=false -> withIndex=true, withHeaders=true:
+  * a record written through the header-less store must still be fetchable after the same
+  * store name is reopened through a headers-enabled supplier, and must carry no headers.
+  */
+ @Test
+ public void shouldMigrateFromWithIndexToWithIndexAndHeaders() {
+     // Old format: retainDuplicates=false, withIndex=true, withHeaders=false
+     final RocksDbIndexedTimeOrderedWindowBytesStoreSupplier oldSupplier =
+         new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+             STORE_NAME, RETENTION_MS, SEGMENT_INTERVAL_MS, WINDOW_SIZE_MS,
+             false, true, false
+         );
+
+     final WindowStore<Bytes, byte[]> oldStore = oldSupplier.get();
+     oldStore.init(context, oldStore);
+
+     final long baseTime = System.currentTimeMillis();
+     final Bytes key1 = Bytes.wrap("key1".getBytes());
+
+     try {
+         oldStore.put(key1, "value1".getBytes(), baseTime + 100);
+     } finally {
+         // Close even if the write fails so the store is never left open.
+         oldStore.close();
+     }
+
+     // Upgrade to headers: retainDuplicates=false, withIndex=true, withHeaders=true
+     final RocksDbIndexedTimeOrderedWindowBytesStoreSupplier newSupplier =
+         new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+             STORE_NAME, RETENTION_MS, SEGMENT_INTERVAL_MS, WINDOW_SIZE_MS,
+             false, true, true
+         );
+
+     final WindowStore<Bytes, byte[]> newStore = newSupplier.get();
+     newStore.init(context, newStore);
+
+     try {
+         // Old data must still be accessible through the headers-enabled store.
+         final byte[] migrated = newStore.fetch(key1, baseTime + 100);
+         assertEquals("value1", new String(extractRawValue(migrated)));
+         // Same expectation the multi-key migration test in this class applies to old data.
+         assertEquals(0, extractHeaders(migrated).toArray().length,
+             "Old data should have empty headers after migration");
+     } finally {
+         // Close even if an assertion fails so the store is never left open.
+         newStore.close();
+     }
+ }
+
+ /**
+  * Verifies the upgrade path withIndex=false, withHeaders=false -> withIndex=true, withHeaders=true:
+  * a record written through a store with neither index nor headers must still be fetchable
+  * after the same store name is reopened with both features enabled, and must carry no headers.
+  */
+ @Test
+ public void shouldMigrateFromWithoutIndexToWithIndexAndHeaders() {
+     // Old format: retainDuplicates=false, withIndex=false, withHeaders=false
+     final RocksDbIndexedTimeOrderedWindowBytesStoreSupplier oldSupplier =
+         new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+             STORE_NAME, RETENTION_MS, SEGMENT_INTERVAL_MS, WINDOW_SIZE_MS,
+             false, false, false
+         );
+
+     final WindowStore<Bytes, byte[]> oldStore = oldSupplier.get();
+     oldStore.init(context, oldStore);
+
+     final long baseTime = System.currentTimeMillis();
+     final Bytes key1 = Bytes.wrap("key1".getBytes());
+
+     try {
+         oldStore.put(key1, "value1".getBytes(), baseTime + 100);
+     } finally {
+         // Close even if the write fails so the store is never left open.
+         oldStore.close();
+     }
+
+     // Upgrade to both index and headers: retainDuplicates=false, withIndex=true, withHeaders=true
+     final RocksDbIndexedTimeOrderedWindowBytesStoreSupplier newSupplier =
+         new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+             STORE_NAME, RETENTION_MS, SEGMENT_INTERVAL_MS, WINDOW_SIZE_MS,
+             false, true, true
+         );
+
+     final WindowStore<Bytes, byte[]> newStore = newSupplier.get();
+     newStore.init(context, newStore);
+
+     try {
+         // Old data must still be accessible through the index- and headers-enabled store.
+         final byte[] migrated = newStore.fetch(key1, baseTime + 100);
+         assertEquals("value1", new String(extractRawValue(migrated)));
+         // Same expectation the multi-key migration test in this class applies to old data.
+         assertEquals(0, extractHeaders(migrated).toArray().length,
+             "Old data should have empty headers after migration");
+     } finally {
+         // Close even if an assertion fails so the store is never left open.
+         newStore.close();
+     }
+ }
+}