This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 0470baa97a3 KAFKA-20318: Add time ordered session store support to the
DSL (#21794)
0470baa97a3 is described below
commit 0470baa97a33be32dd3fd91acd2ecace03633055
Author: Bill Bejeck <[email protected]>
AuthorDate: Thu Mar 19 14:28:43 2026 -0400
KAFKA-20318: Add time ordered session store support to the DSL (#21794)
This PR fixes a bug where
`RocksDbTimeOrderedSessionBytesStoreSupplier.get` with
`withHeaders=true` created segments using `KeyValueSegment` (default-CF
only) instead of `SessionSegmentWithHeaders` (dual-CF with lazy
migration), which would cause the upgrade path from non-headers to
headers format to fail. The fix introduces
`RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders`, widens
`RocksDBTimeOrderedSessionStore` to accept either segment type, and
updates the supplier to create the correct bytes store. Upgrade tests
validate lazy migration at both the store layer and DSL supplier path,
plus a `TopologyTestDriver` end-to-end test exercises ON_WINDOW_CLOSE
with both PLAIN and HEADERS formats.
Reviewers: Matthias J. Sax <[email protected]>, TengYao Chi
<[email protected]>
Co-authored-by: Matthias J. Sax <[email protected]>
---
...tractRocksDBTimeOrderedSegmentedBytesStore.java | 106 ++++--
.../streams/state/internals/AbstractSegments.java | 4 +-
.../streams/state/internals/HasNextCondition.java | 2 +-
.../streams/state/internals/KeyValueSegment.java | 2 +-
.../state/internals/LogicalKeyValueSegment.java | 4 +-
.../state/internals/LogicalKeyValueSegments.java | 5 +-
.../internals/RocksDBSessionStoreWithHeaders.java | 3 +-
.../RocksDBTimeOrderedKeyValueBytesStore.java | 41 +--
...cksDBTimeOrderedSessionSegmentedBytesStore.java | 73 +---
...eredSessionSegmentedBytesStoreWithHeaders.java} | 51 ++-
.../internals/RocksDBTimeOrderedSessionStore.java | 4 +-
.../RocksDBTimeOrderedSessionStoreWithHeaders.java | 5 +-
...ocksDBTimeOrderedWindowSegmentedBytesStore.java | 81 ++--
.../internals/RocksDBTimeOrderedWindowStore.java | 6 +-
.../RocksDBTimeOrderedWindowStoreWithHeaders.java | 5 +-
...IndexedTimeOrderedWindowBytesStoreSupplier.java | 4 +-
...ocksDbTimeOrderedSessionBytesStoreSupplier.java | 23 +-
.../streams/state/internals/SegmentIterator.java | 2 +-
.../kafka/streams/state/internals/Segments.java | 2 +-
.../internals/TimeOrderedCachingWindowStore.java | 6 +-
.../TimestampedWindowStoreWithHeadersBuilder.java | 3 -
...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 46 ++-
.../internals/AbstractSessionBytesStoreTest.java | 18 +-
.../internals/LogicalKeyValueSegmentsTest.java | 6 +-
...ksDBTimeOrderedSessionStoreWithHeadersTest.java | 2 +-
...xedTimeOrderedWindowBytesStoreSupplierTest.java | 16 +-
...imeOrderedCachingPersistentWindowStoreTest.java | 7 +-
.../TimeOrderedSessionStoreUpgradeTest.java | 410 +++++++++++++++++++++
.../internals/TimeOrderedWindowStoreTest.java | 8 +-
.../TimestampedWindowStoreBuilderTest.java | 3 +-
30 files changed, 679 insertions(+), 269 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
index e87403b4289..bec19c91abc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
@@ -16,18 +16,26 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.kstream.Windowed;
+import
org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.state.KeyValueIterator;
-import
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema;
-import
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
+import java.util.function.Function;
/**
* RocksDB store backed by two SegmentedBytesStores which can optimize scan by
time as well as window
@@ -59,9 +67,11 @@ import java.util.Optional;
* @see RocksDBTimeOrderedWindowSegmentedBytesStore
*/
public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore<S extends
Segment> extends AbstractDualSchemaRocksDBSegmentedBytesStore<S> {
- private static final Logger LOG =
LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(AbstractRocksDBTimeOrderedSegmentedBytesStore.class);
- abstract class IndexToBaseStoreIterator implements KeyValueIterator<Bytes,
byte[]> {
+ private long minTimestamp;
+
+ public abstract class IndexToBaseStoreIterator implements
KeyValueIterator<Bytes, byte[]> {
private final KeyValueIterator<Bytes, byte[]> indexIterator;
private byte[] cachedValue;
@@ -115,32 +125,64 @@ public abstract class
AbstractRocksDBTimeOrderedSegmentedBytesStore<S extends Se
protected abstract Bytes getBaseKey(final Bytes indexKey);
}
- /**
- * Concrete implementation of IndexToBaseStoreIterator for window key
schema.
- * Converts index keys (key-first schema) to base store keys (time-first
schema).
- * <p>
- * This can be reused by both window store implementations (with and
without headers).
- */
- class WindowKeySchemaIndexToBaseStoreIterator extends
IndexToBaseStoreIterator {
- WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes,
byte[]> indexIterator) {
- super(indexIterator);
- }
-
- @Override
- protected Bytes getBaseKey(final Bytes indexKey) {
- final byte[] keyBytes =
KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
- final long timestamp =
KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
- final int seqnum =
KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
- return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes,
timestamp, seqnum);
- }
- }
-
AbstractRocksDBTimeOrderedSegmentedBytesStore(final String name,
final long retention,
final KeySchema
baseKeySchema,
final Optional<KeySchema>
indexKeySchema,
final AbstractSegments<S>
segments) {
super(name, baseKeySchema, indexKeySchema, segments, retention);
+
+ minTimestamp = Long.MAX_VALUE;
+ }
+
+ Map<S, WriteBatch> getWriteBatches(
+ final Collection<ConsumerRecord<byte[], byte[]>> records,
+ final Function<byte[], Long> timestampExtractor,
+ final Function<byte[], byte[]> indexedKeyExtractor,
+ final Function<byte[], byte[]> baseKeyExtractor
+ ) {
+ // advance stream time to the max timestamp in the batch
+ for (final ConsumerRecord<byte[], byte[]> record : records) {
+ final long timestamp = timestampExtractor.apply(record.key());
+ minTimestamp = Math.min(minTimestamp, timestamp);
+ observedStreamTime = Math.max(observedStreamTime, timestamp);
+ }
+
+ final Map<S, WriteBatch> writeBatchMap = new HashMap<>();
+ for (final ConsumerRecord<byte[], byte[]> record : records) {
+ final long timestamp = timestampExtractor.apply(record.key());
+ final long segmentId = segments.segmentId(timestamp);
+ final S segment = segments.getOrCreateSegmentIfLive(segmentId,
internalProcessorContext, observedStreamTime);
+ if (segment != null) {
+
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
+ record,
+ consistencyEnabled,
+ position
+ );
+ try {
+ final WriteBatch batch =
writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
+
+ // Assuming changelog record is serialized using
SessionKeySchema
+ // from ChangeLoggingSessionBytesStore. Reconstruct
key/value to restore
+ if (hasIndex()) {
+ final byte[] indexKey =
indexedKeyExtractor.apply(record.key());
+ // Take care of tombstone
+ final byte[] value = record.value() == null ? null :
new byte[0];
+ segment.addToBatch(new KeyValue<>(indexKey, value),
batch);
+ }
+
+ final byte[] baseKey =
baseKeyExtractor.apply(record.key());
+ segment.addToBatch(new KeyValue<>(baseKey,
record.value()), batch);
+ } catch (final RocksDBException e) {
+ throw new ProcessorStateException("Error restoring batch
to store " + name(), e);
+ }
+ }
+ }
+ return writeBatchMap;
+ }
+
+ protected long minTimestamp() {
+ return minTimestamp;
}
@Override
@@ -167,6 +209,22 @@ public abstract class
AbstractRocksDBTimeOrderedSegmentedBytesStore<S extends Se
throw new UnsupportedOperationException("This store does not support
fetch with timestamp and seqnum");
}
+ public byte[] fetchSession(final Bytes key, final long sessionStartTime,
final long sessionEndTime) {
+ throw new UnsupportedOperationException("This store does not support
fetchSession");
+ }
+
+ public KeyValueIterator<Bytes, byte[]> fetchSessions(final long
earliestSessionEndTime, final long latestSessionEndTime) {
+ throw new UnsupportedOperationException("This store does not support
fetchSessions");
+ }
+
+ public void remove(final Windowed<Bytes> key) {
+ throw new UnsupportedOperationException("This store does not support
remove with Windowed key");
+ }
+
+ public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
+ throw new UnsupportedOperationException("This store does not support
put with Windowed key");
+ }
+
KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
final long from,
final long to,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
index ae292a44f8b..f386b24c18f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
@@ -38,7 +38,7 @@ import java.util.NavigableMap;
import java.util.SimpleTimeZone;
import java.util.TreeMap;
-abstract class AbstractSegments<S extends Segment> implements Segments<S> {
+public abstract class AbstractSegments<S extends Segment> implements
Segments<S> {
private static final Logger log =
LoggerFactory.getLogger(AbstractSegments.class);
final TreeMap<Long, S> segments = new TreeMap<>();
@@ -184,7 +184,7 @@ abstract class AbstractSegments<S extends Segment>
implements Segments<S> {
}
}
- @SuppressWarnings("deprecation")
+ @Deprecated
@Override
public boolean managesOffsets() {
return true;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/HasNextCondition.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/HasNextCondition.java
index 8784dbad1bc..0b60cd13f34 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/HasNextCondition.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/HasNextCondition.java
@@ -19,6 +19,6 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;
-interface HasNextCondition {
+public interface HasNextCondition {
boolean hasNext(final KeyValueIterator<Bytes, ?> iterator);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
index 9935b80abf1..69400f497ff 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
@@ -26,7 +26,7 @@ import java.io.IOException;
import java.util.Map;
import java.util.Objects;
-class KeyValueSegment extends RocksDBStore implements Segment {
+public class KeyValueSegment extends RocksDBStore implements Segment {
private final long id;
KeyValueSegment(final String segmentName,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
index 1adbac447b0..ba6215ecca0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
@@ -53,7 +53,7 @@ import static
org.apache.kafka.streams.state.internals.RocksDBStore.incrementWit
* stores a key into a shared physical store by prepending the key with a
prefix (unique to
* the specific logical segment), and storing the combined key into the
physical store.
*/
-class LogicalKeyValueSegment implements Segment, VersionedStoreSegment {
+public class LogicalKeyValueSegment implements Segment, VersionedStoreSegment {
private static final Logger log =
LoggerFactory.getLogger(LogicalKeyValueSegment.class);
private final long id;
@@ -156,7 +156,7 @@ class LogicalKeyValueSegment implements Segment,
VersionedStoreSegment {
iterators = new HashSet<>(openIterators);
openIterators.clear();
}
- if (iterators.size() != 0) {
+ if (!iterators.isEmpty()) {
log.warn("Closing {} open iterators for store {}",
iterators.size(), name);
for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
iterator.close();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
index fbf7722629c..01c91efd602 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
@@ -86,8 +86,8 @@ public class LogicalKeyValueSegments extends
AbstractSegments<LogicalKeyValueSeg
}
// VisibleForTesting
- LogicalKeyValueSegment getReservedSegment(final long segmentId) {
- return reservedSegments.get(segmentId);
+ LogicalKeyValueSegment getReservedSegment() {
+ return reservedSegments.get(-1L);
}
@Override
@@ -108,6 +108,7 @@ public class LogicalKeyValueSegments extends
AbstractSegments<LogicalKeyValueSeg
}
@SuppressWarnings("deprecation")
+ @Deprecated
@Override
public boolean managesOffsets() {
return physicalStore.managesOffsets();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
index 77fb98fabff..7e171ba952a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
@@ -27,7 +28,7 @@ import org.apache.kafka.streams.state.HeadersBytesStore;
* RocksDB-backed session store with support for record headers.
* <p>
* This store extends {@link RocksDBSessionStore} and returns
- * {@link QueryResult#forUnknownQueryType(Query, Object)} for all queries,
+ * {@link QueryResult#forUnknownQueryType(Query, StateStore)} for all queries,
* as IQv2 query handling is done at the metered layer.
* <p>
* The storage format for values is:
[headersSize(varint)][headersBytes][aggregationBytes]
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java
index 48ea130ab31..37c6dce156b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java
@@ -19,15 +19,11 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.ProcessorStateException;
-import
org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
-import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -36,8 +32,6 @@ import java.util.Optional;
*/
public class RocksDBTimeOrderedKeyValueBytesStore extends
AbstractRocksDBTimeOrderedSegmentedBytesStore<KeyValueSegment> {
- private long minTimestamp;
-
RocksDBTimeOrderedKeyValueBytesStore(final String name,
final String metricsScope) {
super(name,
@@ -45,7 +39,6 @@ public class RocksDBTimeOrderedKeyValueBytesStore extends
AbstractRocksDBTimeOrd
new TimeFirstWindowKeySchema(),
Optional.empty(),
new KeyValueSegments(name, metricsScope, Long.MAX_VALUE,
Long.MAX_VALUE));
- minTimestamp = Long.MAX_VALUE;
}
@Override
@@ -55,31 +48,12 @@ public class RocksDBTimeOrderedKeyValueBytesStore extends
AbstractRocksDBTimeOrd
@Override
Map<KeyValueSegment, WriteBatch> getWriteBatches(final
Collection<ConsumerRecord<byte[], byte[]>> records) {
- final Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>();
- for (final ConsumerRecord<byte[], byte[]> record : records) {
- final long timestamp =
WindowKeySchema.extractStoreTimestamp(record.key());
- observedStreamTime = Math.max(observedStreamTime, timestamp);
- minTimestamp = Math.min(minTimestamp, timestamp);
- final long segmentId = segments.segmentId(timestamp);
- final KeyValueSegment segment =
segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext,
observedStreamTime);
- if (segment != null) {
- //null segment is if it has expired, so we don't want those
records
-
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
- record,
- consistencyEnabled,
- position
- );
- try {
- final WriteBatch batch =
writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
-
- final byte[] baseKey =
TimeFirstWindowKeySchema.fromNonPrefixWindowKey(record.key());
- segment.addToBatch(new KeyValue<>(baseKey,
record.value()), batch);
- } catch (final RocksDBException e) {
- throw new ProcessorStateException("Error restoring batch
to store " + name(), e);
- }
- }
- }
- return writeBatchMap;
+ return getWriteBatches(
+ records,
+ WindowKeySchema::extractStoreTimestamp,
+ null, // never an indexed store -- not needed
+ TimeFirstWindowKeySchema::fromNonPrefixWindowKey
+ );
}
@Override
@@ -87,7 +61,4 @@ public class RocksDBTimeOrderedKeyValueBytesStore extends
AbstractRocksDBTimeOrd
throw new UnsupportedOperationException("Do not use for
TimeOrderedKeyValueStore");
}
- protected long minTimestamp() {
- return minTimestamp;
- }
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
index 3e1f4c5ea5c..dbb7713ab35 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
@@ -19,19 +19,15 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
-import
org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.state.KeyValueIterator;
import
org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.KeyFirstSessionKeySchema;
import
org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.TimeFirstSessionKeySchema;
-import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -39,7 +35,7 @@ import java.util.Optional;
/**
* A RocksDB backed time-ordered segmented bytes store for session key schema.
*/
-public class RocksDBTimeOrderedSessionSegmentedBytesStore extends
AbstractRocksDBTimeOrderedSegmentedBytesStore<KeyValueSegment> {
+public class RocksDBTimeOrderedSessionSegmentedBytesStore<S extends Segment>
extends AbstractRocksDBTimeOrderedSegmentedBytesStore<S> {
private class SessionKeySchemaIndexToBaseStoreIterator extends
IndexToBaseStoreIterator {
SessionKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes,
byte[]> indexIterator) {
@@ -50,23 +46,24 @@ public class RocksDBTimeOrderedSessionSegmentedBytesStore
extends AbstractRocksD
protected Bytes getBaseKey(final Bytes indexKey) {
final Window window =
KeyFirstSessionKeySchema.extractWindow(indexKey.get());
final byte[] key =
KeyFirstSessionKeySchema.extractKeyBytes(indexKey.get());
-
return TimeFirstSessionKeySchema.toBinary(Bytes.wrap(key),
window.start(), window.end());
}
}
RocksDBTimeOrderedSessionSegmentedBytesStore(final String name,
- final String metricsScope,
final long retention,
- final long segmentInterval,
- final boolean withIndex) {
- super(name,
+ final boolean withIndex,
+ final AbstractSegments<S>
segments) {
+ super(
+ name,
retention,
new TimeFirstSessionKeySchema(),
Optional.ofNullable(withIndex ? new KeyFirstSessionKeySchema() :
null),
- new KeyValueSegments(name, metricsScope, retention,
segmentInterval));
+ segments
+ );
}
+ @Override
public byte[] fetchSession(final Bytes key,
final long sessionStartTime,
final long sessionEndTime) {
@@ -77,9 +74,10 @@ public class RocksDBTimeOrderedSessionSegmentedBytesStore
extends AbstractRocksD
));
}
+ @Override
public KeyValueIterator<Bytes, byte[]> fetchSessions(final long
earliestSessionEndTime,
final long
latestSessionEndTime) {
- final List<KeyValueSegment> searchSpace =
segments.segments(earliestSessionEndTime, latestSessionEndTime, true);
+ final List<S> searchSpace = segments.segments(earliestSessionEndTime,
latestSessionEndTime, true);
// here we want [0, latestSE, FF] as the upper bound to cover any
possible keys,
// but since we can only get upper bound based on timestamps, we use a
slight larger upper bound as [0, latestSE+1]
@@ -107,10 +105,12 @@ public class RocksDBTimeOrderedSessionSegmentedBytesStore
extends AbstractRocksD
true);
}
+ @Override
public void remove(final Windowed<Bytes> key) {
remove(TimeFirstSessionKeySchema.toBinary(key));
}
+ @Override
public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
put(TimeFirstSessionKeySchema.toBinary(sessionKey), aggregate);
}
@@ -123,50 +123,17 @@ public class RocksDBTimeOrderedSessionSegmentedBytesStore
extends AbstractRocksD
}
@Override
- Map<KeyValueSegment, WriteBatch> getWriteBatches(
- final Collection<ConsumerRecord<byte[], byte[]>> records) {
- // advance stream time to the max timestamp in the batch
- for (final ConsumerRecord<byte[], byte[]> record : records) {
- final long timestamp =
SessionKeySchema.extractEndTimestamp(record.key());
- observedStreamTime = Math.max(observedStreamTime, timestamp);
- }
-
- final Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>();
- for (final ConsumerRecord<byte[], byte[]> record : records) {
- final long timestamp =
SessionKeySchema.extractEndTimestamp(record.key());
- final long segmentId = segments.segmentId(timestamp);
- final KeyValueSegment segment =
segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext,
observedStreamTime);
- if (segment != null) {
-
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
- record,
- consistencyEnabled,
- position
- );
- try {
- final WriteBatch batch =
writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
-
- // Assuming changelog record is serialized using
SessionKeySchema
- // from ChangeLoggingSessionBytesStore. Reconstruct
key/value to restore
- if (hasIndex()) {
- final byte[] indexKey =
KeyFirstSessionKeySchema.prefixNonPrefixSessionKey(record.key());
- // Take care of tombstone
- final byte[] value = record.value() == null ? null :
new byte[0];
- segment.addToBatch(new KeyValue<>(indexKey, value),
batch);
- }
-
- final byte[] baseKey =
TimeFirstSessionKeySchema.extractWindowBytesFromNonPrefixSessionKey(record.key());
- segment.addToBatch(new KeyValue<>(baseKey,
record.value()), batch);
- } catch (final RocksDBException e) {
- throw new ProcessorStateException("Error restoring batch
to store " + name(), e);
- }
- }
- }
- return writeBatchMap;
+ Map<S, WriteBatch> getWriteBatches(final Collection<ConsumerRecord<byte[],
byte[]>> records) {
+ return getWriteBatches(
+ records,
+ SessionKeySchema::extractEndTimestamp,
+ KeyFirstSessionKeySchema::prefixNonPrefixSessionKey,
+
TimeFirstSessionKeySchema::extractWindowBytesFromNonPrefixSessionKey
+ );
}
@Override
- protected IndexToBaseStoreIterator getIndexToBaseStoreIterator(
- final SegmentIterator<KeyValueSegment> segmentIterator) {
+ protected IndexToBaseStoreIterator getIndexToBaseStoreIterator(final
SegmentIterator<S> segmentIterator) {
return new SessionKeySchemaIndexToBaseStoreIterator(segmentIterator);
}
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders.java
similarity index 51%
copy from
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
copy to
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders.java
index aa513138d9f..4ac63da1110 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders.java
@@ -21,29 +21,56 @@ import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
-import org.apache.kafka.streams.state.HeadersBytesStore;
/**
- * RocksDB-backed time-ordered session store with support for record headers.
+ * A RocksDB-backed time-ordered segmented bytes store with headers support
for session key schema.
* <p>
- * This store extends {@link RocksDBTimeOrderedSessionStore} and returns
- * {@link QueryResult#forUnknownQueryType(Query, Object)} for all queries,
- * as IQv2 query handling is done at the metered layer.
+ * This store extends {@link AbstractRocksDBTimeOrderedSegmentedBytesStore}
and uses
+ * {@link SessionSegmentsWithHeaders} to manage segments with full header
support,
+ * including Column Family management and lazy migration from legacy formats.
* <p>
- * The storage format for values is:
[headersSize(varint)][headersBytes][aggregationBytes]
+ * The store maintains a dual-schema architecture:
+ * <ul>
+ * <li>Base store: Time-first session key schema for efficient time-range
queries</li>
+ * <li>Index store (optional): Key-first session key schema for efficient
key-based queries</li>
+ * </ul>
+ * <p>
+ * Headers are managed at the segment level by {@link
SessionSegmentWithHeaders}.
+ * <p>
+ * Value format (timestamps are in the key, not in the value):
+ * <ul>
+ * <li>Old format: {@code [aggregationBytes]}</li>
+ * <li>New format: {@code
[headersSize(varint)][headersBytes][aggregationBytes]}</li>
+ * </ul>
*
* @see RocksDBTimeOrderedSessionStore
+ * @see SessionSegmentsWithHeaders
+ * @see SessionSegmentWithHeaders
*/
-class RocksDBTimeOrderedSessionStoreWithHeaders extends
RocksDBTimeOrderedSessionStore implements HeadersBytesStore {
+class RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders
+ extends
RocksDBTimeOrderedSessionSegmentedBytesStore<SessionSegmentWithHeaders> {
- RocksDBTimeOrderedSessionStoreWithHeaders(final
RocksDBTimeOrderedSessionSegmentedBytesStore store) {
- super(store);
+ RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders(
+ final String name,
+ final String metricsScope,
+ final long retention,
+ final long segmentInterval,
+ final boolean withIndex
+ ) {
+ super(
+ name,
+ retention,
+ withIndex,
+ new SessionSegmentsWithHeaders(name, metricsScope, retention,
segmentInterval)
+ );
}
@Override
- public <R> QueryResult<R> query(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final long start = config.isCollectExecutionInfo() ? System.nanoTime()
: -1L;
final QueryResult<R> result;
final Position position = getPosition();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
index cf46e2d5f2d..9e2caca3699 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
@@ -31,12 +31,12 @@ import
org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.TimeFi
import java.util.Objects;
public class RocksDBTimeOrderedSessionStore
- extends WrappedStateStore<RocksDBTimeOrderedSessionSegmentedBytesStore,
Object, Object>
+ extends WrappedStateStore<AbstractRocksDBTimeOrderedSegmentedBytesStore<?
extends Segment>, Object, Object>
implements SessionStore<Bytes, byte[]> {
private StateStoreContext stateStoreContext;
- RocksDBTimeOrderedSessionStore(final
RocksDBTimeOrderedSessionSegmentedBytesStore store) {
+ RocksDBTimeOrderedSessionStore(final
AbstractRocksDBTimeOrderedSegmentedBytesStore<? extends Segment> store) {
super(store);
Objects.requireNonNull(store, "store is null");
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
index aa513138d9f..e4e517dc1e2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
@@ -27,7 +28,7 @@ import org.apache.kafka.streams.state.HeadersBytesStore;
* RocksDB-backed time-ordered session store with support for record headers.
* <p>
* This store extends {@link RocksDBTimeOrderedSessionStore} and returns
- * {@link QueryResult#forUnknownQueryType(Query, Object)} for all queries,
+ * {@link QueryResult#forUnknownQueryType(Query, StateStore)} for all queries,
* as IQv2 query handling is done at the metered layer.
* <p>
* The storage format for values is:
[headersSize(varint)][headersBytes][aggregationBytes]
@@ -36,7 +37,7 @@ import org.apache.kafka.streams.state.HeadersBytesStore;
*/
class RocksDBTimeOrderedSessionStoreWithHeaders extends
RocksDBTimeOrderedSessionStore implements HeadersBytesStore {
- RocksDBTimeOrderedSessionStoreWithHeaders(final
RocksDBTimeOrderedSessionSegmentedBytesStore store) {
+ RocksDBTimeOrderedSessionStoreWithHeaders(final
RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders store) {
super(store);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java
index 8829c7fb063..249933df1ff 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowSegmentedBytesStore.java
@@ -19,16 +19,13 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.ProcessorStateException;
-import
org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
+import org.apache.kafka.streams.state.KeyValueIterator;
import
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema;
import
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
-import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -43,15 +40,37 @@ import java.util.Optional;
*/
public class RocksDBTimeOrderedWindowSegmentedBytesStore<S extends Segment>
extends AbstractRocksDBTimeOrderedSegmentedBytesStore<S> {
+ /**
+ * Concrete implementation of IndexToBaseStoreIterator for window key
schema.
+ * Converts index keys (key-first schema) to base store keys (time-first
schema).
+ * <p>
+ * This can be reused by both window store implementations (with and
without headers).
+ */
+ class WindowKeySchemaIndexToBaseStoreIterator extends
IndexToBaseStoreIterator {
+ WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes,
byte[]> indexIterator) {
+ super(indexIterator);
+ }
+
+ @Override
+ protected Bytes getBaseKey(final Bytes indexKey) {
+ final byte[] keyBytes =
KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
+ final long timestamp =
KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
+ final int seqnum =
KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
+ return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes,
timestamp, seqnum);
+ }
+ }
+
RocksDBTimeOrderedWindowSegmentedBytesStore(final String name,
final long retention,
final boolean withIndex,
final AbstractSegments<S>
segments) {
- super(name,
+ super(
+ name,
retention,
new TimeFirstWindowKeySchema(),
Optional.ofNullable(withIndex ? new KeyFirstWindowKeySchema() :
null),
- segments);
+ segments
+ );
}
@Override
@@ -70,55 +89,21 @@ public class RocksDBTimeOrderedWindowSegmentedBytesStore<S
extends Segment> exte
final byte[] key =
TimeFirstWindowKeySchema.extractStoreKeyBytes(baseKey.get());
final long timestamp =
TimeFirstWindowKeySchema.extractStoreTimestamp(baseKey.get());
final int seqnum =
TimeFirstWindowKeySchema.extractStoreSequence(baseKey.get());
-
return KeyValue.pair(KeyFirstWindowKeySchema.toStoreKeyBinary(key,
timestamp, seqnum), new byte[0]);
}
@Override
- Map<S, WriteBatch> getWriteBatches(
- final Collection<ConsumerRecord<byte[], byte[]>> records) {
- // advance stream time to the max timestamp in the batch
- for (final ConsumerRecord<byte[], byte[]> record : records) {
- final long timestamp =
WindowKeySchema.extractStoreTimestamp(record.key());
- observedStreamTime = Math.max(observedStreamTime, timestamp);
- }
-
- final Map<S, WriteBatch> writeBatchMap = new HashMap<>();
- for (final ConsumerRecord<byte[], byte[]> record : records) {
- final long timestamp =
WindowKeySchema.extractStoreTimestamp(record.key());
- final long segmentId = segments.segmentId(timestamp);
- final S segment = segments.getOrCreateSegmentIfLive(segmentId,
internalProcessorContext, observedStreamTime);
- if (segment != null) {
-
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
- record,
- consistencyEnabled,
- position
- );
- try {
- final WriteBatch batch =
writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
-
- // Assuming changelog record is serialized using
WindowKeySchema
- // from ChangeLoggingTimestampedWindowBytesStore.
Reconstruct key/value to restore
- if (hasIndex()) {
- final byte[] indexKey =
KeyFirstWindowKeySchema.fromNonPrefixWindowKey(record.key());
- // Take care of tombstone
- final byte[] value = record.value() == null ? null :
new byte[0];
- segment.addToBatch(new KeyValue<>(indexKey, value),
batch);
- }
-
- final byte[] baseKey =
TimeFirstWindowKeySchema.fromNonPrefixWindowKey(record.key());
- segment.addToBatch(new KeyValue<>(baseKey,
record.value()), batch);
- } catch (final RocksDBException e) {
- throw new ProcessorStateException("Error restoring batch
to store " + name(), e);
- }
- }
- }
- return writeBatchMap;
+ Map<S, WriteBatch> getWriteBatches(final Collection<ConsumerRecord<byte[],
byte[]>> records) {
+ return getWriteBatches(
+ records,
+ WindowKeySchema::extractStoreTimestamp,
+ KeyFirstWindowKeySchema::fromNonPrefixWindowKey,
+ TimeFirstWindowKeySchema::fromNonPrefixWindowKey
+ );
}
@Override
- protected IndexToBaseStoreIterator getIndexToBaseStoreIterator(
- final SegmentIterator<S> segmentIterator) {
+ protected IndexToBaseStoreIterator getIndexToBaseStoreIterator(final
SegmentIterator<S> segmentIterator) {
return new WindowKeySchemaIndexToBaseStoreIterator(segmentIterator);
}
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
index 79e3541af58..ebe1f4c5614 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
@@ -35,8 +35,8 @@ import java.util.Map;
import java.util.Objects;
-public class RocksDBTimeOrderedWindowStore
- extends WrappedStateStore<AbstractRocksDBTimeOrderedSegmentedBytesStore<?
extends Segment>, Object, Object>
+public class RocksDBTimeOrderedWindowStore<S extends Segment>
+ extends
WrappedStateStore<AbstractRocksDBTimeOrderedSegmentedBytesStore<S>, Object,
Object>
implements WindowStore<Bytes, byte[]>, TimestampedBytesStore {
private final boolean retainDuplicates;
@@ -46,7 +46,7 @@ public class RocksDBTimeOrderedWindowStore
private StateStoreContext stateStoreContext;
RocksDBTimeOrderedWindowStore(
- final AbstractRocksDBTimeOrderedSegmentedBytesStore<? extends Segment>
store,
+ final AbstractRocksDBTimeOrderedSegmentedBytesStore<S> store,
final boolean retainDuplicates,
final long windowSize
) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeaders.java
index f258cf57710..8d4d5a23b44 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreWithHeaders.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
@@ -31,7 +32,7 @@ import org.apache.kafka.streams.state.TimestampedBytesStore;
* {@link TimestampedBytesStore} (for timestamp support) and {@link
HeadersBytesStore}
* (for header support) marker interfaces.
* <p>
- * This store returns {@link QueryResult#forUnknownQueryType(Query, Object)}
for all queries,
+ * This store returns {@link QueryResult#forUnknownQueryType(Query,
StateStore)} for all queries,
* as IQv2 query handling is done at the metered layer.
* <p>
* The storage format for values is:
[headersSize(varint)][headersBytes][timestamp(8)][value]
@@ -40,7 +41,7 @@ import org.apache.kafka.streams.state.TimestampedBytesStore;
* @see HeadersBytesStore
* @see TimestampedBytesStore
*/
-class RocksDBTimeOrderedWindowStoreWithHeaders extends
RocksDBTimeOrderedWindowStore implements TimestampedBytesStore,
HeadersBytesStore {
+class RocksDBTimeOrderedWindowStoreWithHeaders extends
RocksDBTimeOrderedWindowStore<WindowSegmentWithHeaders> implements
TimestampedBytesStore, HeadersBytesStore {
RocksDBTimeOrderedWindowStoreWithHeaders(final
RocksDBTimeOrderedWindowSegmentedBytesStore<WindowSegmentWithHeaders> store,
final boolean retainDuplicates,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
index 332d2d6973c..968821eb5c4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
@@ -117,7 +117,7 @@ public class
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window
public WindowStore<Bytes, byte[]> get() {
switch (windowStoreType) {
case DEFAULT_WINDOW_STORE:
- return new RocksDBTimeOrderedWindowStore(
+ return new RocksDBTimeOrderedWindowStore<>(
new RocksDBTimeOrderedWindowSegmentedBytesStore<>(
name,
retentionPeriod,
@@ -126,7 +126,7 @@ public class
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window
retainDuplicates,
windowSize);
case INDEXED_WINDOW_STORE:
- return new RocksDBTimeOrderedWindowStore(
+ return new RocksDBTimeOrderedWindowStore<>(
new RocksDBTimeOrderedWindowSegmentedBytesStore<>(
name,
retentionPeriod,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java
index 25473fb97d7..6212e235604 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java
@@ -49,17 +49,24 @@ public class RocksDbTimeOrderedSessionBytesStoreSupplier
implements SessionBytes
@Override
public SessionStore<Bytes, byte[]> get() {
- final RocksDBTimeOrderedSessionSegmentedBytesStore bytesStore =
- new RocksDBTimeOrderedSessionSegmentedBytesStore(
+ if (withHeaders) {
+ final RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders
bytesStore =
+ new RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders(
+ name,
+ metricsScope(),
+ retentionPeriod,
+ segmentIntervalMs(),
+ withIndex
+ );
+ return new RocksDBTimeOrderedSessionStore(bytesStore);
+ }
+ final RocksDBTimeOrderedSessionSegmentedBytesStore<KeyValueSegment>
bytesStore =
+ new RocksDBTimeOrderedSessionSegmentedBytesStore<>(
name,
- metricsScope(),
retentionPeriod,
- segmentIntervalMs(),
- withIndex
+ withIndex,
+ new KeyValueSegments(name, metricsScope(), retentionPeriod,
segmentIntervalMs())
);
- if (withHeaders) {
- return new RocksDBTimeOrderedSessionStoreWithHeaders(bytesStore);
- }
return new RocksDBTimeOrderedSessionStore(bytesStore);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
index 9aabc787c89..a0c52db873b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
@@ -27,7 +27,7 @@ import java.util.NoSuchElementException;
/**
* Iterate over multiple KeyValueSegments
*/
-class SegmentIterator<S extends Segment> implements KeyValueIterator<Bytes,
byte[]> {
+public class SegmentIterator<S extends Segment> implements
KeyValueIterator<Bytes, byte[]> {
private final Bytes from;
private final Bytes to;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 9c902da34a1..dbaea7dc302 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -42,7 +42,7 @@ interface Segments<S extends Segment> {
void commit(final Map<TopicPartition, Long> changelogOffsets);
- @SuppressWarnings("deprecation")
+ @Deprecated
boolean managesOffsets();
Long committedOffset(final TopicPartition partition);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
index 4ee73bdbd0b..4644f166cc1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
@@ -91,7 +91,7 @@ public class TimeOrderedCachingWindowStore
}
private void enforceWrappedStore(final WindowStore<Bytes, byte[]>
underlying) {
- final RocksDBTimeOrderedWindowStore timeOrderedWindowStore =
getWrappedStore(underlying);
+ final RocksDBTimeOrderedWindowStore<?> timeOrderedWindowStore =
getWrappedStore(underlying);
if (timeOrderedWindowStore == null) {
throw new IllegalArgumentException("TimeOrderedCachingWindowStore
only supports RocksDBTimeOrderedWindowStore backed store");
}
@@ -100,9 +100,9 @@ public class TimeOrderedCachingWindowStore
}
@SuppressWarnings("unchecked")
- private RocksDBTimeOrderedWindowStore getWrappedStore(final StateStore
wrapped) {
+ private RocksDBTimeOrderedWindowStore<?> getWrappedStore(final StateStore
wrapped) {
if (wrapped instanceof RocksDBTimeOrderedWindowStore) {
- return (RocksDBTimeOrderedWindowStore) wrapped;
+ return (RocksDBTimeOrderedWindowStore<?>) wrapped;
}
if (wrapped instanceof WrappedStateStore) {
return getWrappedStore(((WrappedStateStore<?, Bytes, byte[]>)
wrapped).wrapped());
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
index 5689ad229f3..3714643d43b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
@@ -115,9 +115,6 @@ public class TimestampedWindowStoreWithHeadersBuilder<K, V>
if (stateStore instanceof RocksDBTimeOrderedWindowStore) {
return true;
}
- if (stateStore instanceof RocksDBTimeOrderedWindowStoreWithHeaders) {
- return true;
- }
if (stateStore instanceof WrappedStateStore) {
return isTimeOrderedStore(((WrappedStateStore<?, ?, ?>)
stateStore).wrapped());
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index c4d1ef855f7..3b2265959fe 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -183,20 +183,18 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
new KeyValueSegments(storeName, METRICS_SCOPE,
retention, segmentInterval)
);
case SessionSchemaWithIndex:
- return new RocksDBTimeOrderedSessionSegmentedBytesStore(
- storeName,
- METRICS_SCOPE,
- retention,
- segmentInterval,
- true
+ return new RocksDBTimeOrderedSessionSegmentedBytesStore<>(
+ storeName,
+ retention,
+ true,
+ new KeyValueSegments(storeName, METRICS_SCOPE, retention,
segmentInterval)
);
case SessionSchemaWithoutIndex:
- return new RocksDBTimeOrderedSessionSegmentedBytesStore(
- storeName,
- METRICS_SCOPE,
- retention,
- segmentInterval,
- false
+ return new RocksDBTimeOrderedSessionSegmentedBytesStore<>(
+ storeName,
+ retention,
+ false,
+ new KeyValueSegments(storeName, METRICS_SCOPE, retention,
segmentInterval)
);
default:
throw new IllegalStateException("Unknown SchemaType: " +
schemaType());
@@ -852,28 +850,28 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])),
expectedValue4);
// Record expired as timestampFromRawKey = 1000 while
observedStreamTime = 60,000 and retention = 1000.
- final byte[] value1 = ((RocksDBTimeOrderedSessionSegmentedBytesStore)
bytesStore).fetchSession(
+ final byte[] value1 =
((RocksDBTimeOrderedSessionSegmentedBytesStore<?>) bytesStore).fetchSession(
key1, windows[0].start(), windows[0].end());
assertNull(value1);
// Record expired as timestampFromRawKey = 1000 while
observedStreamTime = 60,000 and retention = 1000.
- final byte[] value2 = ((RocksDBTimeOrderedSessionSegmentedBytesStore)
bytesStore).fetchSession(
+ final byte[] value2 =
((RocksDBTimeOrderedSessionSegmentedBytesStore<?>) bytesStore).fetchSession(
key1, windows[1].start(), windows[1].end());
assertNull(value2);
// expired record
// timestampFromRawKey = 1500 while observedStreamTime = 60,000 and
retention = 1000.
- final byte[] value3 = ((RocksDBTimeOrderedSessionSegmentedBytesStore)
bytesStore).fetchSession(
+ final byte[] value3 =
((RocksDBTimeOrderedSessionSegmentedBytesStore<?>) bytesStore).fetchSession(
key2, windows[2].start(), windows[2].end());
assertNull(value3);
// only non-expired record
// timestampFromRawKey = 60,000 while observedStreamTime = 60,000 and
retention = 1000.
- final byte[] value4 = ((RocksDBTimeOrderedSessionSegmentedBytesStore)
bytesStore).fetchSession(
+ final byte[] value4 =
((RocksDBTimeOrderedSessionSegmentedBytesStore<?>) bytesStore).fetchSession(
key3, windows[3].start(), windows[3].end());
assertEquals(Bytes.wrap(value4), Bytes.wrap(expectedValue4));
- final byte[] noValue = ((RocksDBTimeOrderedSessionSegmentedBytesStore)
bytesStore).fetchSession(
+ final byte[] noValue =
((RocksDBTimeOrderedSessionSegmentedBytesStore<?>) bytesStore).fetchSession(
key3, 2000, 3000);
assertNull(noValue);
}
@@ -900,7 +898,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
// Fetch point
assertEquals(
Collections.singletonList(KeyValue.pair(new Windowed<>(keyA,
sessionWindows[0]), 10L)),
-
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore)
bytesStore).fetchSessions(100L, 100L))
+
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore<?>)
bytesStore).fetchSessions(100L, 100L))
);
// Fetch partial boundary
@@ -909,7 +907,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L),
KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L)
),
-
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore)
bytesStore).fetchSessions(100L, 200L))
+
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore<?>)
bytesStore).fetchSessions(100L, 200L))
);
// Fetch partial
@@ -918,11 +916,11 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L),
KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L)
),
-
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore)
bytesStore).fetchSessions(99L, 201L))
+
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore<?>)
bytesStore).fetchSessions(99L, 201L))
);
// Fetch partial
- try (final KeyValueIterator<Bytes, byte[]> values =
((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(101L,
199L)) {
+ try (final KeyValueIterator<Bytes, byte[]> values =
((RocksDBTimeOrderedSessionSegmentedBytesStore<?>)
bytesStore).fetchSessions(101L, 199L)) {
assertTrue(toListAndCloseIterator(values).isEmpty());
}
@@ -933,7 +931,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L),
KeyValue.pair(new Windowed<>(keyC, sessionWindows[2]), 200L)
),
-
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore)
bytesStore).fetchSessions(100L, 300L))
+
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore<?>)
bytesStore).fetchSessions(100L, 300L))
);
// Fetch all
@@ -943,13 +941,13 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L),
KeyValue.pair(new Windowed<>(keyC, sessionWindows[2]), 200L)
),
-
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore)
bytesStore).fetchSessions(99L, 301L))
+
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore<?>)
bytesStore).fetchSessions(99L, 301L))
);
// Fetch all
assertEquals(
Collections.singletonList(KeyValue.pair(new Windowed<>(keyB,
sessionWindows[1]), 100L)),
-
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore)
bytesStore).fetchSessions(101L, 299L))
+
toListAndCloseIterator(((RocksDBTimeOrderedSessionSegmentedBytesStore<?>)
bytesStore).fetchSessions(101L, 299L))
);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
index d70bcc964db..a3855dd41de 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
@@ -150,28 +150,14 @@ public abstract class AbstractSessionBytesStoreTest {
}
case RocksDBTimeOrderedSessionStoreWithHeadersWithIndex: {
return Stores.sessionStoreBuilder(
- new
RocksDbTimeOrderedSessionBytesStoreSupplier(ROCK_DB_STORE_NAME,
retentionPeriod, true) {
- @Override
- public SessionStore<Bytes, byte[]> get() {
- return new
RocksDBTimeOrderedSessionStoreWithHeaders(
- new
RocksDBTimeOrderedSessionSegmentedBytesStore(
- name(), metricsScope(),
retentionPeriod(), segmentIntervalMs(), true));
- }
- },
+ new
RocksDbTimeOrderedSessionBytesStoreSupplier(ROCK_DB_STORE_NAME,
retentionPeriod, true, true),
keySerde,
valueSerde
).build();
}
case RocksDBTimeOrderedSessionStoreWithHeadersWithoutIndex: {
return Stores.sessionStoreBuilder(
- new
RocksDbTimeOrderedSessionBytesStoreSupplier(ROCK_DB_STORE_NAME,
retentionPeriod, false) {
- @Override
- public SessionStore<Bytes, byte[]> get() {
- return new
RocksDBTimeOrderedSessionStoreWithHeaders(
- new
RocksDBTimeOrderedSessionSegmentedBytesStore(
- name(), metricsScope(),
retentionPeriod(), segmentIntervalMs(), false));
- }
- },
+ new
RocksDbTimeOrderedSessionBytesStoreSupplier(ROCK_DB_STORE_NAME,
retentionPeriod, false, true),
keySerde,
valueSerde
).build();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
index 4e053da9b7c..2fbbfe4e947 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
@@ -51,7 +51,7 @@ public class LogicalKeyValueSegmentsTest {
private static final String METRICS_SCOPE = "metrics-scope";
private static final String DB_FILE_DIR = "rocksdb";
- private InternalMockProcessorContext context;
+ private InternalMockProcessorContext<?, ?> context;
private LogicalKeyValueSegments segments;
@@ -157,7 +157,7 @@ public class LogicalKeyValueSegmentsTest {
final List<LogicalKeyValueSegment> allSegments =
segments.allSegments(true);
assertEquals(1, allSegments.size());
assertEquals(segment2, allSegments.get(0));
- assertEquals(reservedSegment, segments.getReservedSegment(-1));
+ assertEquals(reservedSegment, segments.getReservedSegment());
}
@Test
@@ -225,7 +225,7 @@ public class LogicalKeyValueSegmentsTest {
segments.close();
assertThat(segments.segmentForTimestamp(0), is(nullValue()));
- assertThat(segments.getReservedSegment(-1), is(nullValue()));
+ assertThat(segments.getReservedSegment(), is(nullValue()));
// verify iterators closed as well
assertThrows(InvalidStateStoreException.class, all1::hasNext);
assertThrows(InvalidStateStoreException.class, all2::hasNext);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
index 3f9c77bf30a..d9647494a66 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
@@ -64,7 +64,7 @@ public class RocksDBTimeOrderedSessionStoreWithHeadersTest {
);
sessionStore = new RocksDBTimeOrderedSessionStoreWithHeaders(
- new RocksDBTimeOrderedSessionSegmentedBytesStore(
+ new RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders(
STORE_NAME,
"test-metrics-scope",
RETENTION_PERIOD,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
index ea7c2d56acf..d16df93141d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
@@ -58,26 +58,26 @@ public class
RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest {
@Test
public void shouldCreateRocksDbTimeOrderedWindowStoreWithIndex() {
- final WindowStore store =
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L),
ofMillis(1L), false, true, false).get();
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ final WindowStore<?, ?> store =
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L),
ofMillis(1L), false, true, false).get();
+ final StateStore wrapped = ((WrappedStateStore<?, ?, ?>)
store).wrapped();
assertThat(store, instanceOf(RocksDBTimeOrderedWindowStore.class));
assertThat(wrapped,
instanceOf(RocksDBTimeOrderedWindowSegmentedBytesStore.class));
- assertTrue(((RocksDBTimeOrderedWindowSegmentedBytesStore)
wrapped).hasIndex());
+ assertTrue(((RocksDBTimeOrderedWindowSegmentedBytesStore<?>)
wrapped).hasIndex());
}
@Test
public void shouldCreateRocksDbTimeOrderedWindowStoreWithoutIndex() {
- final WindowStore store =
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L),
ofMillis(1L), false, false, false).get();
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ final WindowStore<?, ?> store =
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L),
ofMillis(1L), false, false, false).get();
+ final StateStore wrapped = ((WrappedStateStore<?, ?, ?>)
store).wrapped();
assertThat(store, instanceOf(RocksDBTimeOrderedWindowStore.class));
assertThat(wrapped,
instanceOf(RocksDBTimeOrderedWindowSegmentedBytesStore.class));
- assertFalse(((RocksDBTimeOrderedWindowSegmentedBytesStore)
wrapped).hasIndex());
+ assertFalse(((RocksDBTimeOrderedWindowSegmentedBytesStore<?>)
wrapped).hasIndex());
}
@Test
public void shouldCreateRocksDbTimeOrderedWindowStoreWithHeaders() {
- final WindowStore store =
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L),
ofMillis(1L), false, true, true).get();
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ final WindowStore<?, ?> store =
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L),
ofMillis(1L), false, true, true).get();
+ final StateStore wrapped = ((WrappedStateStore<?, ?, ?>)
store).wrapped();
assertThat(store,
instanceOf(RocksDBTimeOrderedWindowStoreWithHeaders.class));
assertThat(wrapped,
instanceOf(RocksDBTimeOrderedWindowSegmentedBytesStore.class));
assertTrue(((RocksDBTimeOrderedWindowSegmentedBytesStore<?>)
wrapped).hasIndex());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
index e73230428f5..e6bfa0b27c9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
@@ -115,7 +115,7 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
baseKeySchema = new TimeFirstWindowKeySchema();
bytesStore = new RocksDBTimeOrderedWindowSegmentedBytesStore<>("test",
100, hasIndex,
new KeyValueSegments("test", "metrics-scope", 100,
SEGMENT_INTERVAL));
- underlyingStore = new RocksDBTimeOrderedWindowStore(bytesStore, false,
WINDOW_SIZE);
+ underlyingStore = new RocksDBTimeOrderedWindowStore<>(bytesStore,
false, WINDOW_SIZE);
final TimeWindowedDeserializer<String> keyDeserializer = new
TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE);
keyDeserializer.setIsChangelogTopic(true);
cacheListener = new CacheFlushListenerStub<>(keyDeserializer, new
StringDeserializer());
@@ -136,7 +136,7 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
@ValueSource(booleans = {true, false})
public void shouldDelegateInit(final boolean hasIndex) {
setUp(hasIndex);
- final RocksDBTimeOrderedWindowStore inner =
mock(RocksDBTimeOrderedWindowStore.class);
+ final RocksDBTimeOrderedWindowStore<?> inner =
mock(RocksDBTimeOrderedWindowStore.class);
when(inner.hasIndex()).thenReturn(hasIndex);
final TimeOrderedCachingWindowStore outer = new
TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);
@@ -154,7 +154,7 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
assertThat(e.getMessage(),
containsString("TimeOrderedCachingWindowStore only supports
RocksDBTimeOrderedWindowStore backed store"));
- final RocksDBTimeOrderedWindowStore inner =
mock(RocksDBTimeOrderedWindowStore.class);
+ final RocksDBTimeOrderedWindowStore<?> inner =
mock(RocksDBTimeOrderedWindowStore.class);
// Nothing happens
new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE,
SEGMENT_INTERVAL);
}
@@ -1257,6 +1257,7 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
verifyAndTearDownCloseTests();
}
+ @SuppressWarnings("unchecked")
private void setUpCloseTests() {
underlyingStore = mock(RocksDBTimeOrderedWindowStore.class);
when(underlyingStore.name()).thenReturn("store-name");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
new file mode 100644
index 00000000000..d9edd2cacdc
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.DslStoreFormat;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
+import org.apache.kafka.streams.state.DslSessionParams;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.common.utils.Utils.delete;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests upgrade path from time-ordered session store (without headers) to
+ * time-ordered session store with headers using direct supplier creation.
+ * <p>
+ * This test validates lazy migration from DEFAULT CF to headers CF and ensures
+ * the store can read old data after upgrade.
+ */
+public class TimeOrderedSessionStoreUpgradeTest {
+
+ private static final String STORE_NAME = "time-ordered-session-store";
+ private static final long RETENTION_MS = Duration.ofMinutes(5).toMillis();
+
+ private InternalMockProcessorContext<Bytes, byte[]> context;
+ private File baseDir;
+
+ @BeforeEach
+ public void setUp() {
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ baseDir = TestUtils.tempDirectory();
+ context = new InternalMockProcessorContext<>(
+ baseDir,
+ Serdes.Bytes(),
+ Serdes.ByteArray(),
+ new StreamsConfig(props)
+ );
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (baseDir != null) {
+ try {
+ delete(baseDir);
+ } catch (final Exception e) {
+ // Ignore
+ }
+ }
+ }
+
+ private static byte[] serializeValueWithHeaders(final byte[] value, final
Headers headers) {
+ if (value == null) {
+ return null;
+ }
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(headers);
+ final byte[] rawHeaders = HeadersSerializer
+ .serialize(preSerializedHeaders,
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders))
+ .array();
+
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final DataOutputStream out = new DataOutputStream(baos)) {
+ ByteUtils.writeVarint(rawHeaders.length, out);
+ out.write(rawHeaders);
+ out.write(value);
+ return baos.toByteArray();
+ } catch (final IOException e) {
+ throw new RuntimeException("Failed to serialize value with
headers", e);
+ }
+ }
+
+ @Test
+ public void shouldMigrateFromWithoutHeadersToWithHeaders() {
+ final RocksDbTimeOrderedSessionBytesStoreSupplier oldSupplier =
+ new RocksDbTimeOrderedSessionBytesStoreSupplier(
+ STORE_NAME, RETENTION_MS, true, false);
+
+ final SessionStore<Bytes, byte[]> oldStore = oldSupplier.get();
+ oldStore.init(context, oldStore);
+
+ final Bytes key1 = Bytes.wrap("key1".getBytes());
+ final Bytes key2 = Bytes.wrap("key2".getBytes());
+ final Bytes key3 = Bytes.wrap("key3".getBytes());
+
+ oldStore.put(new Windowed<>(key1, new SessionWindow(100, 200)),
"value1".getBytes());
+ oldStore.put(new Windowed<>(key2, new SessionWindow(150, 250)),
"value2".getBytes());
+ oldStore.put(new Windowed<>(key3, new SessionWindow(200, 300)),
"value3".getBytes());
+
+ // Verify old data
+ assertEquals("value1", new String(oldStore.fetchSession(key1, 100,
200)));
+ assertEquals("value2", new String(oldStore.fetchSession(key2, 150,
250)));
+ assertEquals("value3", new String(oldStore.fetchSession(key3, 200,
300)));
+
+ oldStore.close();
+
+ // Reopen with headers
+ final RocksDbTimeOrderedSessionBytesStoreSupplier newSupplier =
+ new RocksDbTimeOrderedSessionBytesStoreSupplier(
+ STORE_NAME, RETENTION_MS, true, true);
+
+ final SessionStore<Bytes, byte[]> newStore = newSupplier.get();
+ newStore.init(context, newStore);
+
+ // Verify old data readable with empty headers via lazy migration
+ byte[] fetch = newStore.fetchSession(key1, 100, 200);
+ assertNotNull(fetch);
+ assertEquals("value1", new
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
+ assertEquals(0,
AggregationWithHeadersDeserializer.headers(fetch).toArray().length, "Old data
should have empty headers after migration");
+
+ fetch = newStore.fetchSession(key2, 150, 250);
+ assertNotNull(fetch);
+ assertEquals("value2", new
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
+ assertEquals(0,
AggregationWithHeadersDeserializer.headers(fetch).toArray().length, "Old data
should have empty headers after migration");
+
+ fetch = newStore.fetchSession(key3, 200, 300);
+ assertNotNull(fetch);
+ assertEquals("value3", new
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
+ assertEquals(0,
AggregationWithHeadersDeserializer.headers(fetch).toArray().length, "Old data
should have empty headers after migration");
+
+ newStore.close();
+ }
+
+ @Test
+ public void shouldMigrateFromWithIndexToWithIndexAndHeaders() {
+ final RocksDbTimeOrderedSessionBytesStoreSupplier oldSupplier =
+ new RocksDbTimeOrderedSessionBytesStoreSupplier(
+ STORE_NAME, RETENTION_MS, true, false);
+
+ final SessionStore<Bytes, byte[]> oldStore = oldSupplier.get();
+ oldStore.init(context, oldStore);
+
+ final Bytes key1 = Bytes.wrap("key1".getBytes());
+ oldStore.put(new Windowed<>(key1, new SessionWindow(100, 200)),
"value1".getBytes());
+ oldStore.close();
+
+ // Upgrade to headers
+ final RocksDbTimeOrderedSessionBytesStoreSupplier newSupplier =
+ new RocksDbTimeOrderedSessionBytesStoreSupplier(
+ STORE_NAME, RETENTION_MS, true, true);
+
+ final SessionStore<Bytes, byte[]> newStore = newSupplier.get();
+ newStore.init(context, newStore);
+
+ // Verify old data still accessible
+ final byte[] fetch = newStore.fetchSession(key1, 100, 200);
+ assertNotNull(fetch);
+ assertEquals("value1", new
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
+
+ newStore.close();
+ }
+
+ @Test
+ public void shouldMigrateFromWithoutIndexToWithIndexAndHeaders() {
+ final RocksDbTimeOrderedSessionBytesStoreSupplier oldSupplier =
+ new RocksDbTimeOrderedSessionBytesStoreSupplier(
+ STORE_NAME, RETENTION_MS, false, false);
+
+ final SessionStore<Bytes, byte[]> oldStore = oldSupplier.get();
+ oldStore.init(context, oldStore);
+
+ final Bytes key1 = Bytes.wrap("key1".getBytes());
+ oldStore.put(new Windowed<>(key1, new SessionWindow(100, 200)),
"value1".getBytes());
+ oldStore.close();
+
+ // Upgrade to both index and headers
+ final RocksDbTimeOrderedSessionBytesStoreSupplier newSupplier =
+ new RocksDbTimeOrderedSessionBytesStoreSupplier(
+ STORE_NAME, RETENTION_MS, true, true);
+
+ final SessionStore<Bytes, byte[]> newStore = newSupplier.get();
+ newStore.init(context, newStore);
+
+ // Verify old data still accessible
+ final byte[] fetch = newStore.fetchSession(key1, 100, 200);
+ assertNotNull(fetch);
+ assertEquals("value1", new
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
+
+ newStore.close();
+ }
+
+ @Test
+ public void shouldWriteAndReadWithHeaders() {
+ // Start fresh with headers
+ final RocksDbTimeOrderedSessionBytesStoreSupplier supplier =
+ new RocksDbTimeOrderedSessionBytesStoreSupplier(
+ STORE_NAME, RETENTION_MS, true, true);
+
+ final SessionStore<Bytes, byte[]> store = supplier.get();
+ store.init(context, store);
+
+ final Bytes key1 = Bytes.wrap("key1".getBytes());
+ final Bytes key2 = Bytes.wrap("key2".getBytes());
+
+ // Write with empty headers
+ store.put(new Windowed<>(key1, new SessionWindow(100, 200)),
+ serializeValueWithHeaders("value1".getBytes(), new
RecordHeaders()));
+
+ // Write with actual headers
+ final RecordHeaders headersWithData = new RecordHeaders();
+ headersWithData.add("header-key-1", "header-value-1".getBytes());
+ headersWithData.add("header-key-2", "header-value-2".getBytes());
+ store.put(new Windowed<>(key2, new SessionWindow(150, 250)),
+ serializeValueWithHeaders("value2".getBytes(), headersWithData));
+
+ // Verify values
+ assertEquals("value1", new
String(AggregationWithHeadersDeserializer.rawAggregation(store.fetchSession(key1,
100, 200))));
+ assertEquals("value2", new
String(AggregationWithHeadersDeserializer.rawAggregation(store.fetchSession(key2,
150, 250))));
+
+ // Verify headers for key1 (empty)
+ final Headers key1Headers =
AggregationWithHeadersDeserializer.headers(store.fetchSession(key1, 100, 200));
+ assertEquals(0, key1Headers.toArray().length);
+
+ // Verify headers for key2 (with data)
+ final Headers key2Headers =
AggregationWithHeadersDeserializer.headers(store.fetchSession(key2, 150, 250));
+ assertEquals(2, key2Headers.toArray().length);
+ assertEquals("header-value-1", new
String(key2Headers.lastHeader("header-key-1").value()));
+ assertEquals("header-value-2", new
String(key2Headers.lastHeader("header-key-2").value()));
+
+ // Verify findSessions works
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> iter =
store.findSessions(100, 250)) {
+ int count = 0;
+ while (iter.hasNext()) {
+ iter.next();
+ count++;
+ }
+ assertEquals(2, count);
+ }
+
+ store.close();
+ }
+
+ // --- DSL-level supplier resolution tests ---
+
+ /**
+ * Uses {@link
BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers#sessionStore(DslSessionParams)}
+ * to create stores — the same code path the DSL materializer uses when
+ * {@code ON_WINDOW_CLOSE} is set. Validates the upgrade from PLAIN to
HEADERS format.
+ */
+ @Test
+ public void shouldMigrateViaDslSupplierPath() {
+ final BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers dslSuppliers =
+ new BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers();
+
+ // Phase 1: create store via DSL supplier with PLAIN format
+ final SessionBytesStoreSupplier oldSupplier =
dslSuppliers.sessionStore(
+ new DslSessionParams(STORE_NAME, Duration.ofMillis(RETENTION_MS),
+ EmitStrategy.onWindowClose(), DslStoreFormat.PLAIN));
+
+ final SessionStore<Bytes, byte[]> oldStore = oldSupplier.get();
+ oldStore.init(context, oldStore);
+
+ final Bytes key1 = Bytes.wrap("key1".getBytes());
+ oldStore.put(new Windowed<>(key1, new SessionWindow(100, 200)),
"value1".getBytes());
+
+ assertEquals("value1", new String(oldStore.fetchSession(key1, 100,
200)));
+ oldStore.close();
+
+ // Phase 2: create store via DSL supplier with HEADERS format
+ final SessionBytesStoreSupplier newSupplier =
dslSuppliers.sessionStore(
+ new DslSessionParams(STORE_NAME, Duration.ofMillis(RETENTION_MS),
+ EmitStrategy.onWindowClose(), DslStoreFormat.HEADERS));
+
+ final SessionStore<Bytes, byte[]> newStore = newSupplier.get();
+ newStore.init(context, newStore);
+
+ // Verify old data readable with empty headers via lazy migration
+ final byte[] fetch = newStore.fetchSession(key1, 100, 200);
+ assertNotNull(fetch, "Old data should be readable after upgrade via
DSL supplier path");
+ assertEquals("value1", new
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
+ assertEquals(0,
AggregationWithHeadersDeserializer.headers(fetch).toArray().length,
+ "Old data should have empty headers after migration");
+
+ newStore.close();
+ }
+
+ // --- DSL end-to-end tests via TopologyTestDriver ---
+
+ private static final String INPUT_TOPIC = "input-topic";
+ private static final String MATERIALIZED_STORE = "session-count-store";
+
+ static Stream<Arguments> dslStoreFormats() {
+ return Stream.of(
+ Arguments.of(false),
+ Arguments.of(true)
+ );
+ }
+
+ /**
+ * Builds a real DSL session aggregation topology with {@code
ON_WINDOW_CLOSE} and verifies
+ * the full chain from DSL → materializer → supplier → store works
correctly with both
+ * PLAIN and HEADERS formats.
+ */
+ @ParameterizedTest
+ @MethodSource("dslStoreFormats")
+ public void shouldAggregateSessionsViaDslWithOnWindowClose(final boolean
withHeaders) {
+ final Properties dslProps =
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+ if (withHeaders) {
+ dslProps.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+ }
+
+ final StreamsBuilder streamsBuilder = new StreamsBuilder();
+ streamsBuilder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500)))
+ .emitStrategy(EmitStrategy.onWindowClose())
+ .count(Materialized.<String, Long, SessionStore<Bytes,
byte[]>>as(MATERIALIZED_STORE)
+ .withRetention(Duration.ofMinutes(5)));
+
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(streamsBuilder.build(), dslProps)) {
+ final TestInputTopic<String, String> inputTopic =
+ driver.createInputTopic(INPUT_TOPIC, new StringSerializer(),
new StringSerializer());
+
+ // Send records within the same session window (gap = 500ms)
+ inputTopic.pipeInput("A", "v1", 100);
+ inputTopic.pipeInput("A", "v2", 200);
+ inputTopic.pipeInput("B", "v3", 150);
+
+ // Advance time past the session gap to close windows
+ inputTopic.pipeInput("A", "v4", 2000);
+
+ // Query the materialized store
+ final SessionStore<String, Long> store =
driver.getSessionStore(MATERIALIZED_STORE);
+ assertNotNull(store, "Session store should be materialized");
+
+ final List<KeyValue<Windowed<String>, Long>> results =
toList(store.fetch("A", "B"));
+ // Expect: A's session [100,200] with count=2, B's session
[150,150] with count=1
+ // A's new session [2000,2000] may or may not be closed yet
+ boolean foundASession = false;
+ boolean foundBSession = false;
+ for (final KeyValue<Windowed<String>, Long> kv : results) {
+ if (kv.key.key().equals("A") && kv.key.window().start() == 100
&& kv.key.window().end() == 200) {
+ assertEquals(2L, kv.value);
+ foundASession = true;
+ }
+ if (kv.key.key().equals("B") && kv.key.window().start() == 150
&& kv.key.window().end() == 150) {
+ assertEquals(1L, kv.value);
+ foundBSession = true;
+ }
+ }
+ assertTrue(foundASession, "Should find A's session [100,200]");
+ assertTrue(foundBSession, "Should find B's session [150,150]");
+ }
+ }
+
+ private static <K, V> List<KeyValue<K, V>> toList(final
KeyValueIterator<K, V> iterator) {
+ final List<KeyValue<K, V>> result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+ iterator.close();
+ return result;
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
index 42ab3946a0e..4b77ce64439 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
@@ -104,7 +104,7 @@ public class TimeOrderedWindowStoreTest {
private InternalMockProcessorContext<?, ?> context;
private RocksDBTimeOrderedWindowSegmentedBytesStore<KeyValueSegment>
bytesStore;
- private RocksDBTimeOrderedWindowStore underlyingStore;
+ private RocksDBTimeOrderedWindowStore<?> underlyingStore;
private TimeOrderedCachingWindowStore cachingStore;
private CacheFlushListenerStub<Windowed<String>, String> cacheListener;
private ThreadCache cache;
@@ -114,7 +114,7 @@ public class TimeOrderedWindowStoreTest {
baseKeySchema = new TimeFirstWindowKeySchema();
bytesStore = new RocksDBTimeOrderedWindowSegmentedBytesStore<>("test",
100, hasIndex,
new KeyValueSegments("test", "metrics-scope", 100,
SEGMENT_INTERVAL));
- underlyingStore = new RocksDBTimeOrderedWindowStore(bytesStore, false,
WINDOW_SIZE);
+ underlyingStore = new RocksDBTimeOrderedWindowStore<>(bytesStore,
false, WINDOW_SIZE);
final TimeWindowedDeserializer<String> keyDeserializer = new
TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE);
keyDeserializer.setIsChangelogTopic(true);
cacheListener = new CacheFlushListenerStub<>(keyDeserializer, new
StringDeserializer());
@@ -135,7 +135,7 @@ public class TimeOrderedWindowStoreTest {
@ValueSource(booleans = {true, false})
public void shouldDelegateInit(final boolean hasIndex) {
setUp(hasIndex);
- final RocksDBTimeOrderedWindowStore inner =
mock(RocksDBTimeOrderedWindowStore.class);
+ final RocksDBTimeOrderedWindowStore<?> inner =
mock(RocksDBTimeOrderedWindowStore.class);
when(inner.hasIndex()).thenReturn(hasIndex);
final TimeOrderedCachingWindowStore outer = new
TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);
@@ -156,7 +156,7 @@ public class TimeOrderedWindowStoreTest {
assertThat(e.getMessage(),
containsString("TimeOrderedCachingWindowStore only supports
RocksDBTimeOrderedWindowStore backed store"));
- final RocksDBTimeOrderedWindowStore inner =
mock(RocksDBTimeOrderedWindowStore.class);
+ final RocksDBTimeOrderedWindowStore<?> inner =
mock(RocksDBTimeOrderedWindowStore.class);
// Nothing happens
new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE,
SEGMENT_INTERVAL);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
index 8ddab06aac3..c403614e2b7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
@@ -58,7 +58,7 @@ public class TimestampedWindowStoreBuilderTest {
@Mock
private RocksDBTimestampedWindowStore timestampedStore;
@Mock
- private RocksDBTimeOrderedWindowStore timeOrderedStore;
+ private RocksDBTimeOrderedWindowStore<?> timeOrderedStore;
private TimestampedWindowStoreBuilder<String, String> builder;
private boolean isTimeOrderedStore;
private WindowStore inner;
@@ -199,7 +199,6 @@ public class TimestampedWindowStoreBuilderTest {
}
@ValueSource(strings = {TIMESTAMP_STORE_NAME, TIMEORDERED_STORE_NAME})
- @SuppressWarnings("unchecked")
@ParameterizedTest
public void shouldDisableCachingWithRetainDuplicates(final String
storeName) {
setUpWithoutInner(storeName);