This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new ad7fbefc208 KAFKA-20194: Ensure backward compatibility for Sliding
Windows (#21949)
ad7fbefc208 is described below
commit ad7fbefc2089fa65865d074e8cde7078d73b7920
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Apr 6 15:06:01 2026 -0700
KAFKA-20194: Ensure backward compatibility for Sliding Windows (#21949)
By default, the DSL should expose its state-stores as ts-stores, as
long as header format is not enabled; otherwise, it would be a backward
incompatible change.
This PR ensures that the builders are creating the correct state stores,
depending on the format, and we insert an adapter to allow the DSL
Processors to work only against the headers-store interface.
Reviewers: Bill Bejeck <[email protected]>
---
.../integration/PapiDslIntegrationTest.java | 59 +++++++++
.../internals/KStreamSlidingWindowAggregate.java | 24 +++-
.../internals/SlidingWindowStoreMaterializer.java | 14 +-
.../SlidingWindowStoreMaterializerTest.java | 145 ++++++++++++++++-----
.../internals/WindowStoreMaterializerTest.java | 2 +-
5 files changed, 206 insertions(+), 38 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
index 60446ded82e..48d9c597ee4 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.Windowed;
@@ -432,6 +433,64 @@ public class PapiDslIntegrationTest {
);
}
+ @Test
+ public void
processorShouldAccessKStreamSlidingReducedKTableStoreAsTimestampedStore() {
+ verifyWindow(builder
+ .stream("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)))
+ .reduce(
+ (value, aggregate) -> value,
+ Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ )
+ );
+ }
+
+ @Test
+ public void
processorShouldAccessKStreamSlidingReducedOnWindowCloseKTableStoreAsTimestampedStore()
{
+ verifyWindow(builder
+ .stream("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)))
+ .emitStrategy(EmitStrategy.onWindowClose())
+ .reduce(
+ (value, aggregate) -> value,
+ Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ ),
+ true
+ );
+ }
+
+ @Test
+ public void
processorShouldAccessKStreamSlidingAggregatedKTableStoreAsTimestampedStore() {
+ verifyWindow(builder
+ .stream("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)))
+ .aggregate(
+ () -> "",
+ (key, value, aggregate) -> value,
+ Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ )
+ );
+ }
+
+ @Test
+ public void
processorShouldAccessKStreamSlidingAggregatedOnWindowCloseKTableStoreAsTimestampedStore()
{
+ verifyWindow(builder
+ .stream("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)))
+ .emitStrategy(EmitStrategy.onWindowClose())
+ .aggregate(
+ () -> "",
+ (key, value, aggregate) -> value,
+ Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ ),
+ true
+ );
+ }
+
@Test
public void processorShouldAccessKTableStoreAsHeadersStoreViaConfig() {
final StreamsBuilder builder = new StreamsBuilder();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index 1e187454a2c..aa6d718802a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -19,12 +19,14 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
@@ -36,6 +38,7 @@ import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.streams.state.WindowStoreIterator;
+import
org.apache.kafka.streams.state.internals.WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -484,7 +487,7 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg>
implements KStreamAgg
@Override
public KTableValueGetterSupplier<Windowed<KIn>, VAgg> view() {
- return new KTableValueGetterSupplier<Windowed<KIn>, VAgg>() {
+ return new KTableValueGetterSupplier<>() {
public KTableValueGetter<Windowed<KIn>, VAgg> get() {
return new KStreamWindowAggregateValueGetter();
@@ -502,7 +505,21 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg>
implements KStreamAgg
@Override
public void init(final ProcessorContext<?, ?> context) {
- windowStore = context.getStateStore(storeName);
+ try {
+ windowStore = new
WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter<>(context.getStateStore(storeName));
+ } catch (final ClassCastException swallow) {
+ // not timestamped store
+
+ // Try headers-aware timestamped store
+ try {
+ windowStore = context.getStateStore(storeName);
+ } catch (final ClassCastException fatal) {
+ final StateStore store = context.getStateStore(storeName);
+ final String storeType = store == null ? "null" :
store.getClass().getName();
+ throw new InvalidStateStoreException("Sliding-KTable state
store must implement either "
+ + "TimestampedWindowStore, or
TimestampedWindowStoreWithHeaders. Got: " + storeType);
+ }
+ }
}
@Override
@@ -511,9 +528,6 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg>
implements KStreamAgg
return windowStore.fetch(key, windowedKey.window().start());
}
- @Override
- public void close() {}
-
@Override
public boolean isVersioned() {
return false;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
index d9a0f3d4e50..ad5607215fd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
@@ -21,9 +21,9 @@ import org.apache.kafka.streams.DslStoreFormat;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.state.DslWindowParams;
+import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
@@ -71,12 +71,20 @@ public class SlidingWindowStoreMaterializer<K, V> extends
MaterializedStoreFacto
))
: (WindowBytesStoreSupplier) materialized.storeSupplier();
- final StoreBuilder<TimestampedWindowStoreWithHeaders<K, V>> builder =
Stores
- .timestampedWindowStoreWithHeadersBuilder(
+ final StoreBuilder<?> builder;
+ if (supplier instanceof HeadersBytesStoreSupplier) {
+ builder = Stores.timestampedWindowStoreWithHeadersBuilder(
supplier,
materialized.keySerde(),
materialized.valueSerde()
);
+ } else {
+ builder = Stores.timestampedWindowStoreBuilder(
+ supplier,
+ materialized.keySerde(),
+ materialized.valueSerde()
+ );
+ }
if (materialized.loggingEnabled()) {
builder.withLoggingEnabled(materialized.logConfig());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SlidingWindowStoreMaterializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SlidingWindowStoreMaterializerTest.java
index 1f44a9cd71b..f0404b00add 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SlidingWindowStoreMaterializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SlidingWindowStoreMaterializerTest.java
@@ -28,12 +28,16 @@ import
org.apache.kafka.streams.kstream.internals.SlidingWindowStoreMaterializer
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.state.DslStoreSuppliers;
+import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.CachingWindowStore;
+import
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedWindowBytesStore;
import
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedWindowBytesStoreWithHeaders;
import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
+import org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStore;
import
org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
@@ -53,7 +57,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.lenient;
-import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -66,8 +69,6 @@ public class SlidingWindowStoreMaterializerTest {
@Mock
private InternalNameProvider nameProvider;
@Mock
- private WindowBytesStoreSupplier windowStoreSupplier;
- @Mock
private StreamsConfig streamsConfig;
private final WindowStore<Bytes, byte[]> innerWindowStore =
@@ -93,69 +94,102 @@ public class SlidingWindowStoreMaterializerTest {
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
}
- private void mockWindowStoreSupplier() {
- when(windowStoreSupplier.get()).thenReturn(innerWindowStore);
- when(windowStoreSupplier.name()).thenReturn(STORE_NAME);
- when(windowStoreSupplier.metricsScope()).thenReturn("metricScope");
+ private final class HeadersStoreSupplier implements
WindowBytesStoreSupplier, HeadersBytesStoreSupplier {
+ @Override
+ public String name() {
+ return STORE_NAME;
+ }
+
+ @Override
+ public WindowStore<Bytes, byte[]> get() {
+ return innerWindowStore;
+ }
+
+ @Override
+ public String metricsScope() {
+ return "metricScope";
+ }
+
+ @Override
+ public long segmentIntervalMs() {
+ return 0;
+ }
+
+ @Override
+ public long windowSize() {
+ return 0;
+ }
+
+ @Override
+ public boolean retainDuplicates() {
+ return false;
+ }
+
+ @Override
+ public long retentionPeriod() {
+ return 0;
+ }
}
@Test
- public void
shouldCreateHeadersBuilderWithCachingAndLoggingEnabledByDefault() {
+ public void
shouldCreateTimestampedBuilderWithCachingAndLoggingEnabledByDefault() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.as("store"), nameProvider,
STORE_PREFIX);
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
+ final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
final StateStore logging = caching.wrapped();
- assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
+ assertInstanceOf(MeteredTimestampedWindowStore.class, store);
assertInstanceOf(CachingWindowStore.class, caching);
-
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
+ assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
}
@Test
- public void shouldCreateHeadersBuilderWithCachingDisabled() {
+ public void shouldCreateTimestampedBuilderWithCachingDisabled() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withCachingDisabled(), nameProvider, STORE_PREFIX
);
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
+ final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
-
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
+ assertInstanceOf(MeteredTimestampedWindowStore.class, store);
+ assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
}
@Test
- public void shouldCreateHeadersBuilderWithLoggingDisabled() {
+ public void shouldCreateTimestampedBuilderWithLoggingDisabled() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
);
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
+ final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
+ assertInstanceOf(MeteredTimestampedWindowStore.class, store);
assertInstanceOf(CachingWindowStore.class, caching);
- assertFalse(caching.wrapped() instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
+ assertFalse(caching.wrapped() instanceof
ChangeLoggingTimestampedWindowBytesStore);
}
@Test
- public void shouldCreateHeadersBuilderWithCachingAndLoggingDisabled() {
+ public void shouldCreateTimestampedBuilderWithCachingAndLoggingDisabled() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider,
STORE_PREFIX
);
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
+ final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
final StateStore wrapped = ((WrappedStateStore<?, ?, ?>)
store).wrapped();
+ assertInstanceOf(MeteredTimestampedWindowStore.class, store);
assertFalse(wrapped instanceof CachingWindowStore);
- assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
+ assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStore);
}
@Test
public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingEnabledByDefault()
{
- mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.as(windowStoreSupplier),
nameProvider, STORE_PREFIX);
+ new MaterializedInternal<>(Materialized.as(new
HeadersStoreSupplier()), nameProvider, STORE_PREFIX);
final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
@@ -169,53 +203,82 @@ public class SlidingWindowStoreMaterializerTest {
@Test
public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingDisabled() {
- mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.<String,
String>as(windowStoreSupplier).withCachingDisabled(), nameProvider,
STORE_PREFIX);
+ new MaterializedInternal<>(Materialized.<String, String>as(new
HeadersStoreSupplier()).withCachingDisabled(), nameProvider, STORE_PREFIX);
final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
+ assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
assertEquals(innerWindowStore.name(), store.name());
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
@Test
public void
shouldCreateHeadersStoreWithProvidedSupplierAndLoggingDisabled() {
- mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.<String,
String>as(windowStoreSupplier).withLoggingDisabled(), nameProvider,
STORE_PREFIX);
+ new MaterializedInternal<>(Materialized.<String, String>as(new
HeadersStoreSupplier()).withLoggingDisabled(), nameProvider, STORE_PREFIX);
final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
assertEquals(innerWindowStore.name(), store.name());
+ assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
assertInstanceOf(CachingWindowStore.class, caching);
assertFalse(caching.wrapped() instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
}
@Test
public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingDisabled() {
- mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.<String,
String>as(windowStoreSupplier).withCachingDisabled().withLoggingDisabled(),
nameProvider, STORE_PREFIX);
+ new MaterializedInternal<>(Materialized.<String, String>as(new
HeadersStoreSupplier()).withCachingDisabled().withLoggingDisabled(),
nameProvider, STORE_PREFIX);
final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final StateStore wrapped = ((WrappedStateStore<?, ?, ?>)
store).wrapped();
assertEquals(innerWindowStore.name(), store.name());
+ assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
assertFalse(wrapped instanceof CachingWindowStore);
assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
}
@Test
- public void shouldCreateHeadersStoreWithOnWindowClose() {
+ public void shouldCreateTimestampedStoreWithOnWindowCloseByDefault() {
emitStrategy = EmitStrategy.onWindowClose();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.<String, String,
WindowStore<Bytes, byte[]>>as("store")
.withCachingDisabled(), nameProvider, STORE_PREFIX);
+ final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+
+ final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
+ assertInstanceOf(MeteredTimestampedWindowStore.class, store);
+ assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
+ }
+
+ @Test
+ public void
shouldCreateTimestampedStoreWithOnWindowCloseAndCachingDisabled() {
+ emitStrategy = EmitStrategy.onWindowClose();
+
+ final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
+ new MaterializedInternal<>(Materialized.as("store"), nameProvider,
STORE_PREFIX);
+
+ final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+
+ final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
+ assertInstanceOf(MeteredTimestampedWindowStore.class, store);
+ assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
+ }
+
+ @Test
+ public void shouldCreateHeadersStoreWithProvidedSupplierOnWindowClose() {
+ emitStrategy = EmitStrategy.onWindowClose();
+
+ final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
+ new MaterializedInternal<>(Materialized.<String, String>as(new
HeadersStoreSupplier())
+ .withCachingDisabled(), nameProvider, STORE_PREFIX);
+
final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
@@ -223,6 +286,30 @@ public class SlidingWindowStoreMaterializerTest {
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
+ @Test
+ public void
shouldCreateHeadersStoreWithProvidedSupplierOnWindowCloseAndCachingDisabled() {
+ emitStrategy = EmitStrategy.onWindowClose();
+
+ final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
+ new MaterializedInternal<>(Materialized.as(new
HeadersStoreSupplier()),
+ nameProvider, STORE_PREFIX);
+
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
+
+ final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
+ assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
+ }
+
+ @SuppressWarnings("unchecked")
+ private TimestampedWindowStore<String, String> getTimestampedStore(
+ final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized) {
+ final SlidingWindowStoreMaterializer<String, String> materializer =
+ new SlidingWindowStoreMaterializer<>(materialized, windows,
emitStrategy);
+ materializer.configure(streamsConfig);
+ return (TimestampedWindowStore<String, String>)
materializer.builder().build();
+ }
+
@SuppressWarnings("unchecked")
private TimestampedWindowStoreWithHeaders<String, String> getHeadersStore(
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
index cd88bc6be75..041f9f529c6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
@@ -240,7 +240,7 @@ public class WindowStoreMaterializerTest {
}
@Test
- public void shouldCreateTimestampedStoreWithOnWindowCloseByDefault() {
+ public void
shouldCreateTimestampedStoreWithOnWindowCloseByDefaultWithCachingDisabled() {
emitStrategy = EmitStrategy.onWindowClose();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =