This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 686fbedc96c KAFKA-20194: Ensure backward compatibility for Windowed
Store (#21922)
686fbedc96c is described below
commit 686fbedc96c258be8b52e60c9c9df2bbd52c7d85
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Apr 2 14:42:55 2026 -0700
KAFKA-20194: Ensure backward compatibility for Windowed Store (#21922)
By default, the DSL should expose its state stores as ts-stores, as
long as header format is not enabled; otherwise, it would be a
backward-incompatible change.
This PR ensures that the builders create the correct state stores,
depending on the format, and inserts an adaptor to allow the DSL
Processors to work only against the headers-store interface.
Reviewers: Bill Bejeck <[email protected]>, Alieh Saeedi
<[email protected]>
---
.../integration/PapiDslIntegrationTest.java | 102 +++++++-
...bstractKStreamTimeWindowAggregateProcessor.java | 26 ++-
.../kstream/internals/KStreamWindowAggregate.java | 19 +-
.../kstream/internals/WindowStoreMaterializer.java | 46 ++--
.../streams/state/BuiltInDslStoreSuppliers.java | 17 +-
.../streams/state/HeadersBytesStoreSupplier.java | 3 +-
...IndexedTimeOrderedWindowBytesStoreSupplier.java | 19 +-
...OrderedWindowBytesStoreWithHeadersSupplier.java | 100 ++++++++
...dersStoreToWindowedTimestampedStoreAdapter.java | 256 +++++++++++++++++++++
.../internals/KStreamWindowAggregateTest.java | 1 -
.../internals/KeyValueStoreMaterializerTest.java | 36 +--
.../internals/WindowStoreMaterializerTest.java | 150 ++++++++----
...xedTimeOrderedWindowBytesStoreSupplierTest.java | 14 +-
...imeOrderedCachingPersistentWindowStoreTest.java | 3 +-
.../internals/TimeOrderedWindowStoreTest.java | 3 +-
15 files changed, 689 insertions(+), 106 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
index 891f1d527fd..da6be9afff8 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
@@ -27,12 +27,15 @@ import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Record;
@@ -41,12 +44,15 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStore;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
+import java.time.Duration;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -107,7 +113,7 @@ public class PapiDslIntegrationTest {
public void
processorShouldAccessTransformedKTableStoreAsTimestampedStore() {
verify(builder
.table("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
- .transformValues(() -> new ValueTransformerWithKey<String, String,
String>() {
+ .transformValues(() -> new ValueTransformerWithKey<>() {
@Override
public void init(final ProcessorContext context) { }
@@ -228,6 +234,100 @@ public class PapiDslIntegrationTest {
);
}
+ private void verifyWindow(final KTable<Windowed<String>, String> table) {
+ verifyWindow(table, false);
+ }
+
+ private void verifyWindow(final KTable<Windowed<String>, String> table,
final boolean requiresFlush) {
+ table.toStream((windowedKey, value) -> windowedKey.key())
+ .process(() -> new ContextualProcessor<String, String, String,
String>() {
+ @Override
+ public void process(final Record<String, String> record) {
+ final TimestampedWindowStore<String, String> store =
context().getStateStore("table-store");
+
+ try (final KeyValueIterator<Windowed<String>,
ValueAndTimestamp<String>> it = store.all()) {
+ while (it.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueAndTimestamp<String>> row = it.next();
+ context().forward(new Record<>(row.key.key(),
row.value.value(), row.value.timestamp()));
+ }
+ }
+ }
+ }, "table-store")
+ .to("output-topic", Produced.with(Serdes.String(),
Serdes.String()));
+
+ try (final TopologyTestDriver testDriver = new
TopologyTestDriver(builder.build())) {
+ final TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic("input-topic", new StringSerializer(), new
StringSerializer());
+ final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic("output-topic", new StringDeserializer(), new
StringDeserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+
+ if (requiresFlush) {
+ inputTopic.advanceTime(Duration.ofHours(2));
+ inputTopic.pipeInput("flush", "flush");
+ }
+
+ assertEquals(KeyValue.pair("key1", "value1"),
outputTopic.readKeyValue());
+ }
+ }
+
+ @Test
+ public void
processorShouldAccessKStreamWindowReducedKTableStoreAsTimestampedStore() {
+ verifyWindow(builder
+ .stream("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1L)))
+ .reduce(
+ (value, aggregate) -> value,
+ Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ )
+ );
+ }
+
+ @Test
+ public void
processorShouldAccessKStreamWindowReducedOnWindowCloseKTableStoreAsTimestampedStore()
{
+ verifyWindow(builder
+ .stream("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1L)))
+ .emitStrategy(EmitStrategy.onWindowClose())
+ .reduce(
+ (value, aggregate) -> value,
+ Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ ),
+ true
+ );
+ }
+
+ @Test
+ public void
processorShouldAccessKStreamWindowAggregatedKTableStoreAsTimestampedStore() {
+ verifyWindow(builder
+ .stream("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1L)))
+ .aggregate(
+ () -> "",
+ (key, value, aggregate) -> value,
+ Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ )
+ );
+ }
+
+ @Test
+ public void
processorShouldAccessKStreamWindowAggregatedOnWindowCloseKTableStoreAsTimestampedStore()
{
+ verifyWindow(builder
+ .stream("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1L)))
+ .emitStrategy(EmitStrategy.onWindowClose())
+ .aggregate(
+ () -> "",
+ (key, value, aggregate) -> value,
+ Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ ),
+ true
+ );
+ }
+
@Test
public void processorShouldAccessKTableStoreAsHeadersStoreViaConfig() {
final StreamsBuilder builder = new StreamsBuilder();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
index dd379003f7f..381b66dccd7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
@@ -21,11 +21,13 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
@@ -35,6 +37,7 @@ import
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import
org.apache.kafka.streams.state.internals.WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter;
import org.slf4j.Logger;
@@ -78,7 +81,26 @@ public abstract class
AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg
droppedRecordsSensor = droppedRecordsSensor(threadId,
context.taskId().toString(), metrics);
emittedRecordsSensor = emittedRecordsSensor(threadId,
context.taskId().toString(), processorName, metrics);
emitFinalLatencySensor = emitFinalLatencySensor(threadId,
context.taskId().toString(), processorName, metrics);
- windowStore = context.getStateStore(storeName);
+
+ boolean isHeadersStore = false;
+ // Try timestamped store
+ try {
+ windowStore = new
WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter<>(context.getStateStore(storeName));
+ } catch (final ClassCastException swallow) {
+ // not timestamped store
+
+ // Try headers-aware timestamped store
+ try {
+ windowStore = context.getStateStore(storeName);
+ isHeadersStore = true;
+ } catch (final ClassCastException fatal) {
+ final StateStore store = context.getStateStore(storeName);
+ final String storeType = store == null ? "null" :
store.getClass().getName();
+ throw new InvalidStateStoreException("Windowed-KTable state
store must implement either "
+ + "TimestampedWindowStore, or
TimestampedWindowStoreWithHeaders. Got: " + storeType);
+ }
+ }
+
if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
// Restore last emit close time for ON_WINDOW_CLOSE strategy
@@ -98,7 +120,7 @@ public abstract class
AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg
tupleForwarder = new TimestampedTupleForwarder<>(
windowStore,
context,
- new TimestampedCacheFlushListenerWithHeaders<>(context),
+ isHeadersStore ? new
TimestampedCacheFlushListenerWithHeaders<>(context) : new
TimestampedCacheFlushListener<>(context),
sendOldValues);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 4470bff214c..9f5c0269624 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
@@ -26,6 +27,7 @@ import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
@@ -35,6 +37,7 @@ import
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrapping
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import
org.apache.kafka.streams.state.internals.WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -215,7 +218,21 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W
extends Window> implements
@Override
public void init(final ProcessorContext<?, ?> context) {
- windowStore = context.getStateStore(storeName);
+ try {
+ windowStore = new
WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter<>(context.getStateStore(storeName));
+ } catch (final ClassCastException swallow) {
+ // not timestamped store
+
+ // Try headers-aware timestamped store
+ try {
+ windowStore = context.getStateStore(storeName);
+ } catch (final ClassCastException fatal) {
+ final StateStore store = context.getStateStore(storeName);
+ final String storeType = store == null ? "null" :
store.getClass().getName();
+ throw new InvalidStateStoreException("Windowed-KTable
state store must implement either "
+ + "TimestampedWindowStore, or
TimestampedWindowStoreWithHeaders. Got: " + storeType);
+ }
+ }
}
@SuppressWarnings("unchecked")
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
index c15c4e8c804..d0df1de7fcc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
@@ -21,9 +21,9 @@ import org.apache.kafka.streams.DslStoreFormat;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.state.DslWindowParams;
+import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
@@ -57,23 +57,33 @@ public class WindowStoreMaterializer<K, V> extends
MaterializedStoreFactory<K, V
@Override
public StoreBuilder<?> builder() {
- final WindowBytesStoreSupplier supplier = materialized.storeSupplier()
== null
- ? dslStoreSuppliers().windowStore(new DslWindowParams(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod),
- Duration.ofMillis(windows.size()),
- false,
- emitStrategy,
- false,
- dslStoreFormat()
- ))
- : (WindowBytesStoreSupplier) materialized.storeSupplier();
-
- final StoreBuilder<TimestampedWindowStoreWithHeaders<K, V>> builder =
Stores.timestampedWindowStoreWithHeadersBuilder(
- supplier,
- materialized.keySerde(),
- materialized.valueSerde()
- );
+ final WindowBytesStoreSupplier supplier =
+ materialized.storeSupplier() == null
+ ? dslStoreSuppliers().windowStore(new DslWindowParams(
+ materialized.storeName(),
+ Duration.ofMillis(retentionPeriod),
+ Duration.ofMillis(windows.size()),
+ false,
+ emitStrategy,
+ false,
+ dslStoreFormat()
+ ))
+ : (WindowBytesStoreSupplier) materialized.storeSupplier();
+
+ final StoreBuilder<?> builder;
+ if (supplier instanceof HeadersBytesStoreSupplier) {
+ builder = Stores.timestampedWindowStoreWithHeadersBuilder(
+ supplier,
+ materialized.keySerde(),
+ materialized.valueSerde()
+ );
+ } else {
+ builder = Stores.timestampedWindowStoreBuilder(
+ supplier,
+ materialized.keySerde(),
+ materialized.valueSerde()
+ );
+ }
if (materialized.loggingEnabled()) {
builder.withLoggingEnabled(materialized.logConfig());
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
index 3b23b9e1f6e..b74fce37ea0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.streams.DslStoreFormat;
import org.apache.kafka.streams.kstream.EmitStrategy;
import
org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
+import
org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier;
import
org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
/**
@@ -56,13 +57,23 @@ public class BuiltInDslStoreSuppliers {
final DslStoreFormat storeFormat = params.dslStoreFormat();
if (params.emitStrategy().type() ==
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
final boolean withHeaders = (storeFormat ==
DslStoreFormat.HEADERS);
- return
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
+ if (!withHeaders) {
+ return
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
params.name(),
params.retentionPeriod(),
params.windowSize(),
params.retainDuplicates(),
- params.isSlidingWindow(),
- withHeaders);
+ params.isSlidingWindow()
+ );
+ } else {
+ return
RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier.create(
+ params.name(),
+ params.retentionPeriod(),
+ params.windowSize(),
+ params.retainDuplicates(),
+ params.isSlidingWindow()
+ );
+ }
}
final DslStoreFormat format = (storeFormat == null) ?
DslStoreFormat.TIMESTAMPED : storeFormat;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStoreSupplier.java
index f27c1711b0a..411f4bf787d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStoreSupplier.java
@@ -20,5 +20,4 @@ package org.apache.kafka.streams.state;
* A store supplier that can be used to create one or more "header" stores,
* specifically, {@link HeadersBytesStore} instances.
*/
-public interface HeadersBytesStoreSupplier extends KeyValueBytesStoreSupplier {
-}
\ No newline at end of file
+public interface HeadersBytesStoreSupplier { }
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
index 4e372242f8d..eca24e0f737 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
@@ -33,20 +33,19 @@ public class
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window
INDEXED_WINDOW_STORE_WITH_HEADERS
}
- private final String name;
- private final long retentionPeriod;
- private final long segmentInterval;
- private final long windowSize;
- private final boolean retainDuplicates;
- private final WindowStoreTypes windowStoreType;
+ final String name;
+ final long retentionPeriod;
+ final long segmentInterval;
+ final long windowSize;
+ final boolean retainDuplicates;
+ final WindowStoreTypes windowStoreType;
public static RocksDbIndexedTimeOrderedWindowBytesStoreSupplier create(
final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates,
- final boolean hasIndex,
- final boolean withHeaders
+ final boolean hasIndex
) {
Objects.requireNonNull(name, "name cannot be null");
final String rpMsgPrefix =
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
@@ -75,7 +74,7 @@ public class
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window
windowSizeMs,
retainDuplicates,
hasIndex,
- withHeaders
+ false
);
}
@@ -109,7 +108,7 @@ public class
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window
}
}
- private RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+ RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
final String name,
final long retentionPeriod,
final long segmentInterval,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier.java
new file mode 100644
index 00000000000..e0bc7704e76
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
+
+import java.time.Duration;
+import java.util.Objects;
+
+import static
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
+
+public class RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier
+ extends RocksDbIndexedTimeOrderedWindowBytesStoreSupplier
+ implements HeadersBytesStoreSupplier {
+
+ public static RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier
create(
+ final String name,
+ final Duration retentionPeriod,
+ final Duration windowSize,
+ final boolean retainDuplicates,
+ final boolean hasIndex
+ ) {
+ Objects.requireNonNull(name, "name cannot be null");
+ final String rpMsgPrefix =
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+ final long retentionMs = validateMillisecondDuration(retentionPeriod,
rpMsgPrefix);
+ final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize,
"windowSize");
+ final long windowSizeMs = validateMillisecondDuration(windowSize,
wsMsgPrefix);
+
+ final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L);
+
+ if (retentionMs < 0L) {
+ throw new IllegalArgumentException("retentionPeriod cannot be
negative");
+ }
+ if (windowSizeMs < 0L) {
+ throw new IllegalArgumentException("windowSize cannot be
negative");
+ }
+ if (windowSizeMs > retentionMs) {
+ throw new IllegalArgumentException("The retention period of the
window store "
+ + name + " must be no smaller than its window size. Got size=["
+ + windowSizeMs + "], retention=[" + retentionMs + "]");
+ }
+
+ return new
RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier(
+ name,
+ retentionMs,
+ defaultSegmentInterval,
+ windowSizeMs,
+ retainDuplicates,
+ hasIndex,
+ true
+ );
+ }
+
+ private RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier(
+ final String name,
+ final long retentionPeriod,
+ final long segmentInterval,
+ final long windowSize,
+ final boolean retainDuplicates,
+ final boolean withIndex,
+ final boolean withHeaders
+ ) {
+ super(
+ name,
+ retentionPeriod,
+ segmentInterval,
+ windowSize,
+ retainDuplicates,
+ withIndex,
+ withHeaders
+ );
+ }
+
+ @Override
+ public String toString() {
+ return "RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier{"
+
+ "name='" + name + '\'' +
+ ", retentionPeriod=" + retentionPeriod +
+ ", segmentInterval=" + segmentInterval +
+ ", windowSize=" + windowSize +
+ ", retainDuplicates=" + retainDuplicates +
+ ", windowStoreType=" + windowStoreType +
+ '}';
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter.java
new file mode 100644
index 00000000000..ffc25839278
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.time.Instant;
+import java.util.Map;
+
+/**
+ * Adaptor store for the Kafka Streams DSL to bridge between "headers" store
and "ts-store".
+ *
+ * <p> With KIP-1285 we rewrote the DSL Processor code to work against the
"header store" interface to allow users
+ * to plug in "header stores", but by default the underlying store is still a
"ts-store". To avoid "if-then-else"
+ * code across the entire DSL Processor code base, we use this adaptor to wrap
a "ts-store" and make it look like
+ * a "header store".
+ *
+ * <p> On any write operation, provided {@link
org.apache.kafka.common.header.Headers} will just be dropped,
+ * and {@link ValueTimestampHeaders} type is translated into {@link
ValueAndTimestamp} type. Similarly for
+ * any read operation, the underlying {@link ValueAndTimestamp} type is
translated into a {@link ValueTimestampHeaders}
+ * type with an empty {@link org.apache.kafka.common.header.Headers} object.
+ */
+public class
WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter<K, V>
+ extends WrappedStateStore<TimestampedWindowStore<K, V>, K, V>
+ implements TimestampedWindowStoreWithHeaders<K, V> {
+
+ public
WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter(final
TimestampedWindowStore<K, V> timestampedWindowStore) {
+ super(timestampedWindowStore);
+ }
+
+ @Override
+ public String name() {
+ return wrapped().name();
+ }
+
+ @Override
+ public void init(final StateStoreContext stateStoreContext, final
StateStore root) {
+ wrapped().init(stateStoreContext, root);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public void flush() {
+ wrapped().flush();
+ }
+
+ @Override
+ public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+ wrapped().commit(changelogOffsets);
+ }
+
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ return wrapped().committedOffset(partition);
+ }
+
+ @Override
+ public void close() {
+ wrapped().close();
+ }
+
+ @Override
+ public boolean persistent() {
+ return wrapped().persistent();
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public boolean managesOffsets() {
+ return wrapped().managesOffsets();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return wrapped().isOpen();
+ }
+
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query, final PositionBound
positionBound, final QueryConfig config) {
+ return wrapped().query(query, positionBound, config);
+ }
+
+ @Override
+ public Position getPosition() {
+ return wrapped().getPosition();
+ }
+
+ @Override
+ public void put(final K key, final ValueTimestampHeaders<V>
valueTimestampHeaders, final long windowStartTimestamp) {
+ wrapped().put(key, valueTimestampHeaders == null ? null :
ValueAndTimestamp.make(valueTimestampHeaders.value(),
valueTimestampHeaders.timestamp()), windowStartTimestamp);
+ }
+
+ @Override
+ public WindowStoreIterator<ValueTimestampHeaders<V>> fetch(final K key,
final long timeFrom, final long timeTo) {
+ return new WindowStoreIteratorAdapter(wrapped().fetch(key, timeFrom,
timeTo));
+ }
+
+ @Override
+ public ValueTimestampHeaders<V> fetch(final K key, final long time) {
+ final ValueAndTimestamp<V> valueAndTimestamp = wrapped().fetch(key,
time);
+ return valueAndTimestamp == null ? null :
ValueTimestampHeaders.make(valueAndTimestamp.value(),
valueAndTimestamp.timestamp(), new RecordHeaders());
+ }
+
+ @Override
+ public WindowStoreIterator<ValueTimestampHeaders<V>> fetch(final K key,
final Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {
+ return new WindowStoreIteratorAdapter(wrapped().fetch(key, timeFrom,
timeTo));
+ }
+
+ @Override
+ public WindowStoreIterator<ValueTimestampHeaders<V>> backwardFetch(final K
key, final long timeFrom, final long timeTo) {
+ return new WindowStoreIteratorAdapter(wrapped().backwardFetch(key,
timeFrom, timeTo));
+ }
+
+ @Override
+ public WindowStoreIterator<ValueTimestampHeaders<V>> backwardFetch(final K
key, final Instant timeFrom, final Instant timeTo) throws
IllegalArgumentException {
+ return new WindowStoreIteratorAdapter(wrapped().backwardFetch(key,
timeFrom, timeTo));
+ }
+
+    /**
+     * Key-range fetch: delegates to the wrapped store and exposes the result through a
+     * {@code KeyValueIteratorAdapter}.
+     */
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> fetch(final K keyFrom, final K keyTo, final long timeFrom, final long timeTo) {
+        final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> timestamped = wrapped().fetch(keyFrom, keyTo, timeFrom, timeTo);
+        return new KeyValueIteratorAdapter(timestamped);
+    }
+
+    /**
+     * {@code Instant}-based variant of the key-range fetch: delegates to the wrapped store and
+     * exposes the result through a {@code KeyValueIteratorAdapter}.
+     */
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> fetch(final K keyFrom, final K keyTo, final Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {
+        final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> timestamped = wrapped().fetch(keyFrom, keyTo, timeFrom, timeTo);
+        return new KeyValueIteratorAdapter(timestamped);
+    }
+
+    /**
+     * Key-range {@code backwardFetch}: delegates to the wrapped store and exposes the result
+     * through a {@code KeyValueIteratorAdapter}.
+     */
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> backwardFetch(final K keyFrom, final K keyTo, final long timeFrom, final long timeTo) {
+        final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> timestamped = wrapped().backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
+        return new KeyValueIteratorAdapter(timestamped);
+    }
+
+    /**
+     * {@code Instant}-based variant of the key-range {@code backwardFetch}: delegates to the
+     * wrapped store and exposes the result through a {@code KeyValueIteratorAdapter}.
+     */
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> backwardFetch(final K keyFrom, final K keyTo, final Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {
+        final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> timestamped = wrapped().backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
+        return new KeyValueIteratorAdapter(timestamped);
+    }
+
+    /**
+     * Calls {@code all()} on the wrapped store and exposes the result through a
+     * {@code KeyValueIteratorAdapter}.
+     */
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> all() {
+        final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> timestamped = wrapped().all();
+        return new KeyValueIteratorAdapter(timestamped);
+    }
+
+    /**
+     * Calls {@code backwardAll()} on the wrapped store and exposes the result through a
+     * {@code KeyValueIteratorAdapter}.
+     */
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> backwardAll() {
+        final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> timestamped = wrapped().backwardAll();
+        return new KeyValueIteratorAdapter(timestamped);
+    }
+
+    /**
+     * Calls {@code fetchAll} on the wrapped store for the given time range and exposes the
+     * result through a {@code KeyValueIteratorAdapter}.
+     */
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> fetchAll(final long timeFrom, final long timeTo) {
+        final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> timestamped = wrapped().fetchAll(timeFrom, timeTo);
+        return new KeyValueIteratorAdapter(timestamped);
+    }
+
+    /**
+     * {@code Instant}-based variant of {@code fetchAll}: delegates to the wrapped store and
+     * exposes the result through a {@code KeyValueIteratorAdapter}.
+     */
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> fetchAll(final Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {
+        final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> timestamped = wrapped().fetchAll(timeFrom, timeTo);
+        return new KeyValueIteratorAdapter(timestamped);
+    }
+
+    /**
+     * Calls {@code backwardFetchAll} on the wrapped store for the given time range and exposes
+     * the result through a {@code KeyValueIteratorAdapter}.
+     */
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> backwardFetchAll(final long timeFrom, final long timeTo) {
+        final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> timestamped = wrapped().backwardFetchAll(timeFrom, timeTo);
+        return new KeyValueIteratorAdapter(timestamped);
+    }
+
+    /**
+     * {@code Instant}-based variant of {@code backwardFetchAll}: delegates to the wrapped store
+     * and exposes the result through a {@code KeyValueIteratorAdapter}.
+     */
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> backwardFetchAll(final Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {
+        final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> timestamped = wrapped().backwardFetchAll(timeFrom, timeTo);
+        return new KeyValueIteratorAdapter(timestamped);
+    }
+
+    /**
+     * Presents a {@code WindowStoreIterator} over {@code ValueAndTimestamp} values as one over
+     * {@code ValueTimestampHeaders} values. Keys are passed through unchanged; each value is
+     * re-wrapped together with a newly created, empty {@code RecordHeaders}.
+     */
+    private final class WindowStoreIteratorAdapter implements WindowStoreIterator<ValueTimestampHeaders<V>> {
+        private final WindowStoreIterator<ValueAndTimestamp<V>> delegate;
+
+        private WindowStoreIteratorAdapter(final WindowStoreIterator<ValueAndTimestamp<V>> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return delegate.hasNext();
+        }
+
+        @Override
+        public KeyValue<Long, ValueTimestampHeaders<V>> next() {
+            final KeyValue<Long, ValueAndTimestamp<V>> entry = delegate.next();
+            final ValueAndTimestamp<V> stored = entry.value;
+            final ValueTimestampHeaders<V> converted =
+                ValueTimestampHeaders.make(stored.value(), stored.timestamp(), new RecordHeaders());
+            return KeyValue.pair(entry.key, converted);
+        }
+
+        @Override
+        public Long peekNextKey() {
+            return delegate.peekNextKey();
+        }
+
+        @Override
+        public void close() {
+            delegate.close();
+        }
+    }
+
+    /**
+     * Presents a {@code KeyValueIterator} over {@code ValueAndTimestamp} values as one over
+     * {@code ValueTimestampHeaders} values. The {@code Windowed} keys are passed through
+     * unchanged; each value is re-wrapped together with a newly created, empty
+     * {@code RecordHeaders}.
+     */
+    private final class KeyValueIteratorAdapter implements KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> {
+        private final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> delegate;
+
+        private KeyValueIteratorAdapter(final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return delegate.hasNext();
+        }
+
+        @Override
+        public KeyValue<Windowed<K>, ValueTimestampHeaders<V>> next() {
+            final KeyValue<Windowed<K>, ValueAndTimestamp<V>> entry = delegate.next();
+            final ValueAndTimestamp<V> stored = entry.value;
+            final ValueTimestampHeaders<V> converted =
+                ValueTimestampHeaders.make(stored.value(), stored.timestamp(), new RecordHeaders());
+            return KeyValue.pair(entry.key, converted);
+        }
+
+        @Override
+        public Windowed<K> peekNextKey() {
+            return delegate.peekNextKey();
+        }
+
+        @Override
+        public void close() {
+            delegate.close();
+        }
+    }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index af39d256621..7d4ea159c83 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -1011,7 +1011,6 @@ public class KStreamWindowAggregateTest {
Duration.ofDays(1),
Duration.ofMillis(windowSize),
false,
- false,
false
);
} else {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
index 6e6e35e7512..f3099d381e8 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.state.DslStoreSuppliers;
import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
@@ -72,8 +73,6 @@ public class KeyValueStoreMaterializerTest {
@Mock
private InternalNameProvider nameProvider;
@Mock
- private HeadersBytesStoreSupplier headersStoreSupplier;
- @Mock
private VersionedBytesStoreSupplier versionedStoreSupplier;
private final KeyValueStore<Bytes, byte[]> innerKeyValueStore = new
InMemoryKeyValueStore(STORE_NAME);
@Mock
@@ -102,10 +101,21 @@ public class KeyValueStoreMaterializerTest {
when(versionedStoreSupplier.metricsScope()).thenReturn(METRICS_SCOPE);
}
- private void mockHeadersStoreSupplier() {
- when(headersStoreSupplier.get()).thenReturn(innerKeyValueStore);
- when(headersStoreSupplier.name()).thenReturn(STORE_NAME);
- when(headersStoreSupplier.metricsScope()).thenReturn(METRICS_SCOPE);
+    /**
+     * Hand-written supplier that implements both {@code KeyValueBytesStoreSupplier} and
+     * {@code HeadersBytesStoreSupplier}; every accessor returns one of this test's fixtures.
+     */
+    private final class HeadersStoreSupplier implements KeyValueBytesStoreSupplier, HeadersBytesStoreSupplier {
+        @Override
+        public KeyValueStore<Bytes, byte[]> get() {
+            // hands out the test's single inner store on every call
+            return innerKeyValueStore;
+        }
+
+        @Override
+        public String name() {
+            return STORE_NAME;
+        }
+
+        @Override
+        public String metricsScope() {
+            return METRICS_SCOPE;
+        }
     }
@Test
@@ -162,9 +172,8 @@ public class KeyValueStoreMaterializerTest {
@Test
public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingEnabledByDefault()
{
- mockHeadersStoreSupplier();
final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized =
- new MaterializedInternal<>(Materialized.as(headersStoreSupplier),
nameProvider, STORE_PREFIX);
+ new MaterializedInternal<>(Materialized.as(new
HeadersStoreSupplier()), nameProvider, STORE_PREFIX);
final TimestampedKeyValueStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
@@ -178,9 +187,8 @@ public class KeyValueStoreMaterializerTest {
@Test
public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingDisabled() {
- mockHeadersStoreSupplier();
final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized =
- new MaterializedInternal<>(Materialized.<String,
String>as(headersStoreSupplier).withCachingDisabled(), nameProvider,
STORE_PREFIX);
+ new MaterializedInternal<>(Materialized.<String, String>as(new
HeadersStoreSupplier()).withCachingDisabled(), nameProvider, STORE_PREFIX);
final TimestampedKeyValueStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
@@ -191,9 +199,8 @@ public class KeyValueStoreMaterializerTest {
@Test
public void
shouldCreateHeadersStoreWithProvidedSupplierAndLoggingDisabled() {
- mockHeadersStoreSupplier();
final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized =
- new MaterializedInternal<>(Materialized.<String,
String>as(headersStoreSupplier).withLoggingDisabled(), nameProvider,
STORE_PREFIX);
+ new MaterializedInternal<>(Materialized.<String, String>as(new
HeadersStoreSupplier()).withLoggingDisabled(), nameProvider, STORE_PREFIX);
final TimestampedKeyValueStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
@@ -205,9 +212,8 @@ public class KeyValueStoreMaterializerTest {
@Test
public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingDisabled() {
- mockHeadersStoreSupplier();
final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized =
- new MaterializedInternal<>(Materialized.<String,
String>as(headersStoreSupplier).withCachingDisabled().withLoggingDisabled(),
nameProvider, STORE_PREFIX);
+ new MaterializedInternal<>(Materialized.<String, String>as(new
HeadersStoreSupplier()).withCachingDisabled().withLoggingDisabled(),
nameProvider, STORE_PREFIX);
final TimestampedKeyValueStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
@@ -286,4 +292,4 @@ public class KeyValueStoreMaterializerTest {
materializer.configure(streamsConfig);
return (VersionedKeyValueStore<String, String>)
materializer.builder().build();
}
-}
\ No newline at end of file
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
index 1abe4beb6af..cd88bc6be75 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
@@ -29,13 +29,18 @@ import
org.apache.kafka.streams.kstream.internals.WindowStoreMaterializer;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.state.DslStoreSuppliers;
+import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.CachingWindowStore;
+import
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedWindowBytesStore;
import
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedWindowBytesStoreWithHeaders;
import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
+import org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStore;
import
org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStoreWithHeaders;
+import
org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier;
import org.apache.kafka.streams.state.internals.TimeOrderedCachingWindowStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
@@ -55,7 +60,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.lenient;
-import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -68,8 +72,6 @@ public class WindowStoreMaterializerTest {
@Mock
private InternalNameProvider nameProvider;
@Mock
- private WindowBytesStoreSupplier windowStoreSupplier;
- @Mock
private StreamsConfig streamsConfig;
private final WindowStore<Bytes, byte[]> innerWindowStore =
@@ -95,74 +97,103 @@ public class WindowStoreMaterializerTest {
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
}
- private void mockWindowStoreSupplier() {
- when(windowStoreSupplier.get()).thenReturn(innerWindowStore);
- when(windowStoreSupplier.name()).thenReturn(STORE_NAME);
- when(windowStoreSupplier.metricsScope()).thenReturn("metricScope");
+    /**
+     * Hand-written supplier that implements both {@code WindowBytesStoreSupplier} and
+     * {@code HeadersBytesStoreSupplier}. It returns this test's inner window store and fixed
+     * placeholder values for all sizing accessors.
+     */
+    private final class HeadersStoreSupplier implements WindowBytesStoreSupplier, HeadersBytesStoreSupplier {
+        @Override
+        public WindowStore<Bytes, byte[]> get() {
+            // hands out the test's single inner store on every call
+            return innerWindowStore;
+        }
+
+        @Override
+        public String name() {
+            return STORE_NAME;
+        }
+
+        @Override
+        public String metricsScope() {
+            return "metricScope";
+        }
+
+        @Override
+        public long windowSize() {
+            return 0;
+        }
+
+        @Override
+        public long retentionPeriod() {
+            return 0;
+        }
+
+        @Override
+        public long segmentIntervalMs() {
+            return 0;
+        }
+
+        @Override
+        public boolean retainDuplicates() {
+            return false;
+        }
     }
@Test
- public void
shouldCreateHeadersBuilderWithCachingAndLoggingEnabledByDefault() {
+ public void
shouldCreateTimestampedBuilderWithCachingAndLoggingEnabledByDefault() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.as("store"), nameProvider,
STORE_PREFIX);
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
- final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
+ final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
final StateStore logging = caching.wrapped();
- assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
+ assertInstanceOf(MeteredTimestampedWindowStore.class, store);
assertInstanceOf(CachingWindowStore.class, caching);
-
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
+ assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
}
@Test
- public void shouldCreateHeadersBuilderWithCachingDisabled() {
+ public void shouldCreateTimestampedBuilderWithCachingDisabled() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withCachingDisabled(), nameProvider, STORE_PREFIX
);
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
+ final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
- final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
-
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
+ final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
+ assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
}
@Test
- public void shouldCreateHeadersBuilderWithLoggingDisabled() {
+ public void shouldCreateTimestampedBuilderWithLoggingDisabled() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
);
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
+ final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
- final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
+ final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
assertInstanceOf(CachingWindowStore.class, caching);
- assertFalse(caching.wrapped() instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
+ assertFalse(caching.wrapped() instanceof
ChangeLoggingTimestampedWindowBytesStore);
}
@Test
- public void shouldCreateHeadersBuilderWithCachingAndLoggingDisabled() {
+ public void shouldCreateTimestampedBuilderWithCachingAndLoggingDisabled() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider,
STORE_PREFIX
);
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
+ final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ final StateStore wrapped = ((WrappedStateStore<?, ?, ?>)
store).wrapped();
assertFalse(wrapped instanceof CachingWindowStore);
- assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
+ assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStore);
}
@Test
public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingEnabledByDefault()
{
- mockWindowStoreSupplier();
-
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.as(windowStoreSupplier),
nameProvider, STORE_PREFIX);
+ new MaterializedInternal<>(Materialized.as(new
HeadersStoreSupplier()), nameProvider, STORE_PREFIX);
final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
- final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
+ final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
final StateStore logging = caching.wrapped();
assertEquals(innerWindowStore.name(), store.name());
assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
@@ -172,26 +203,24 @@ public class WindowStoreMaterializerTest {
@Test
public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingDisabled() {
- mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.<String,
String>as(windowStoreSupplier).withCachingDisabled(), nameProvider,
STORE_PREFIX);
+ new MaterializedInternal<>(Materialized.<String, String>as(new
HeadersStoreSupplier()).withCachingDisabled(), nameProvider, STORE_PREFIX);
final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
- final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
+ final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
assertEquals(innerWindowStore.name(), store.name());
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
@Test
public void
shouldCreateHeadersStoreWithProvidedSupplierAndLoggingDisabled() {
- mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.<String,
String>as(windowStoreSupplier).withLoggingDisabled(), nameProvider,
STORE_PREFIX);
+ new MaterializedInternal<>(Materialized.<String, String>as(new
HeadersStoreSupplier()).withLoggingDisabled(), nameProvider, STORE_PREFIX);
final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
- final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
+ final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
assertEquals(innerWindowStore.name(), store.name());
assertInstanceOf(CachingWindowStore.class, caching);
assertFalse(caching.wrapped() instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
@@ -199,46 +228,83 @@ public class WindowStoreMaterializerTest {
@Test
public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingDisabled() {
- mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.<String,
String>as(windowStoreSupplier).withCachingDisabled().withLoggingDisabled(),
nameProvider, STORE_PREFIX);
+ new MaterializedInternal<>(Materialized.<String, String>as(new
HeadersStoreSupplier()).withCachingDisabled().withLoggingDisabled(),
nameProvider, STORE_PREFIX);
final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ final StateStore wrapped = ((WrappedStateStore<?, ?, ?>)
store).wrapped();
assertEquals(innerWindowStore.name(), store.name());
assertFalse(wrapped instanceof CachingWindowStore);
assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
}
@Test
- public void shouldCreateHeadersStoreWithOnWindowClose() {
+ public void shouldCreateTimestampedStoreWithOnWindowCloseByDefault() {
emitStrategy = EmitStrategy.onWindowClose();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.<String, String,
WindowStore<Bytes, byte[]>>as("store")
.withCachingDisabled(), nameProvider, STORE_PREFIX);
+ final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+
+ final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
+ assertInstanceOf(MeteredTimestampedWindowStore.class, store);
+ assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
+ }
+
+    /**
+     * With the on-window-close emit strategy and a name-only {@code Materialized} (caching not
+     * disabled), the store directly beneath the built store must be a
+     * {@code TimeOrderedCachingWindowStore}.
+     */
+    @Test
+    public void shouldCreateTimestampedStoreWithOnWindowCloseAndCachingEnabled() {
+        emitStrategy = EmitStrategy.onWindowClose();
+
+        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> materialized =
+            new MaterializedInternal<>(Materialized.as("store"), nameProvider, STORE_PREFIX);
+        final TimestampedWindowStore<String, String> builtStore = getTimestampedStore(materialized);
+
+        // unwrap exactly one layer below the built store
+        assertInstanceOf(
+            TimeOrderedCachingWindowStore.class,
+            ((WrappedStateStore<?, ?, ?>) builtStore).wrapped()
+        );
+    }
+
+ @Test
+ public void shouldCreateHeadersStoreWithProvidedSupplierOnWindowClose() {
+ emitStrategy = EmitStrategy.onWindowClose();
+
+ final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
+ new MaterializedInternal<>(Materialized.<String, String>as(new
HeadersStoreSupplier())
+ .withCachingDisabled(), nameProvider, STORE_PREFIX);
+
final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
- final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
+ final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
@Test
- public void shouldCreateHeadersStoreWithOnWindowCloseAndCachingEnabled() {
+ public void
shouldCreateHeadersStoreWithProvidedSupplierOnWindowCloseAndCachingEnabled() {
emitStrategy = EmitStrategy.onWindowClose();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.as("store"), nameProvider,
STORE_PREFIX);
+ new
MaterializedInternal<>(Materialized.as(RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier.create(
+ STORE_NAME, Duration.ofMillis(60000L),
Duration.ofMillis(WINDOW_SIZE_MS), false, true
+ )), nameProvider, STORE_PREFIX);
final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ final StateStore wrapped = ((WrappedStateStore<?, ?, ?>)
store).wrapped();
assertInstanceOf(TimeOrderedCachingWindowStore.class, wrapped);
}
+    /**
+     * Builds a store from {@code materialized} through a {@code WindowStoreMaterializer} that is
+     * created with this test's {@code windows} and {@code emitStrategy} and configured with
+     * {@code streamsConfig}, then casts the built store to {@code TimestampedWindowStore}.
+     */
+    @SuppressWarnings("unchecked")
+    private TimestampedWindowStore<String, String> getTimestampedStore(
+        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> materialized) {
+        final WindowStoreMaterializer<String, String> storeMaterializer =
+            new WindowStoreMaterializer<>(materialized, windows, emitStrategy);
+        storeMaterializer.configure(streamsConfig);
+        final TimestampedWindowStore<String, String> builtStore =
+            (TimestampedWindowStore<String, String>) storeMaterializer.builder().build();
+        return builtStore;
+    }
@SuppressWarnings("unchecked")
private TimestampedWindowStoreWithHeaders<String, String> getHeadersStore(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
index d16df93141d..3c2cd850eb5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
@@ -34,31 +34,31 @@ public class
RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest {
@Test
public void shouldThrowIfStoreNameIsNull() {
- final Exception e = assertThrows(NullPointerException.class, () ->
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(null, ZERO, ZERO,
false, false, false));
+ final Exception e = assertThrows(NullPointerException.class, () ->
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(null, ZERO, ZERO,
false, false));
assertEquals("name cannot be null", e.getMessage());
}
@Test
public void shouldThrowIfRetentionPeriodIsNegative() {
- final Exception e = assertThrows(IllegalArgumentException.class, () ->
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("anyName",
ofMillis(-1L), ZERO, false, false, false));
+ final Exception e = assertThrows(IllegalArgumentException.class, () ->
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("anyName",
ofMillis(-1L), ZERO, false, false));
assertEquals("retentionPeriod cannot be negative", e.getMessage());
}
@Test
public void shouldThrowIfWindowSizeIsNegative() {
- final Exception e = assertThrows(IllegalArgumentException.class, () ->
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("anyName",
ofMillis(0L), ofMillis(-1L), false, false, false));
+ final Exception e = assertThrows(IllegalArgumentException.class, () ->
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("anyName",
ofMillis(0L), ofMillis(-1L), false, false));
assertEquals("windowSize cannot be negative", e.getMessage());
}
@Test
public void shouldThrowIfWindowSizeIsLargerThanRetention() {
- final Exception e = assertThrows(IllegalArgumentException.class, () ->
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("anyName",
ofMillis(1L), ofMillis(2L), false, false, false));
+ final Exception e = assertThrows(IllegalArgumentException.class, () ->
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("anyName",
ofMillis(1L), ofMillis(2L), false, false));
assertEquals("The retention period of the window store anyName must be
no smaller than its window size. Got size=[2], retention=[1]", e.getMessage());
}
@Test
public void shouldCreateRocksDbTimeOrderedWindowStoreWithIndex() {
- final WindowStore<?, ?> store =
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L),
ofMillis(1L), false, true, false).get();
+ final WindowStore<?, ?> store =
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L),
ofMillis(1L), false, true).get();
final StateStore wrapped = ((WrappedStateStore<?, ?, ?>)
store).wrapped();
assertThat(store, instanceOf(RocksDBTimeOrderedWindowStore.class));
assertThat(wrapped,
instanceOf(RocksDBTimeOrderedWindowSegmentedBytesStore.class));
@@ -67,7 +67,7 @@ public class
RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest {
@Test
public void shouldCreateRocksDbTimeOrderedWindowStoreWithoutIndex() {
- final WindowStore<?, ?> store =
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L),
ofMillis(1L), false, false, false).get();
+ final WindowStore<?, ?> store =
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L),
ofMillis(1L), false, false).get();
final StateStore wrapped = ((WrappedStateStore<?, ?, ?>)
store).wrapped();
assertThat(store, instanceOf(RocksDBTimeOrderedWindowStore.class));
assertThat(wrapped,
instanceOf(RocksDBTimeOrderedWindowSegmentedBytesStore.class));
@@ -76,7 +76,7 @@ public class
RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest {
@Test
public void shouldCreateRocksDbTimeOrderedWindowStoreWithHeaders() {
- final WindowStore<?, ?> store =
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L),
ofMillis(1L), false, true, true).get();
+ final WindowStore<?, ?> store =
RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier.create("store",
ofMillis(1L), ofMillis(1L), false, true).get();
final StateStore wrapped = ((WrappedStateStore<?, ?, ?>)
store).wrapped();
assertThat(store,
instanceOf(RocksDBTimeOrderedWindowStoreWithHeaders.class));
assertThat(wrapped,
instanceOf(RocksDBTimeOrderedWindowSegmentedBytesStore.class));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
index e6bfa0b27c9..8a0c95279b5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
@@ -171,8 +171,7 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
ofHours(1L),
ofMinutes(1),
false,
- hasIndex,
- false
+ hasIndex
), Serdes.String(), Serdes.String())
.withCachingEnabled();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
index 4b77ce64439..472e55105cf 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
@@ -173,8 +173,7 @@ public class TimeOrderedWindowStoreTest {
ofHours(1L),
ofMinutes(1),
false,
- hasIndex,
- false
+ hasIndex
), Serdes.String(), Serdes.String())
.withCachingEnabled();