This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7b8549f3c4c KAFKA-20194: Ensure backward compatibility for Sliding 
Windows (#21949)
7b8549f3c4c is described below

commit 7b8549f3c4cc26fd2153ef024c2fb743cfe83461
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Apr 6 15:06:01 2026 -0700

    KAFKA-20194: Ensure backward compatibility for Sliding Windows (#21949)
    
    By default, the DSL should expose its state-stores as ts-stores, as
    long as header format is not enabled; otherwise, it would be a backward
    incompatible change.
    
    This PR ensures that the builders create the correct state stores,
    depending on the format, and inserts an adapter to allow the DSL
    Processors to work only against the headers-store interface.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../integration/PapiDslIntegrationTest.java        |  59 +++++++++
 .../internals/KStreamSlidingWindowAggregate.java   |  24 +++-
 .../internals/SlidingWindowStoreMaterializer.java  |  14 +-
 .../SlidingWindowStoreMaterializerTest.java        | 145 ++++++++++++++++-----
 .../internals/WindowStoreMaterializerTest.java     |   2 +-
 5 files changed, 206 insertions(+), 38 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
index 60446ded82e..48d9c597ee4 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -432,6 +433,64 @@ public class PapiDslIntegrationTest {
         );
     }
 
+    @Test
+    public void 
processorShouldAccessKStreamSlidingReducedKTableStoreAsTimestampedStore() {
+        verifyWindow(builder
+            .stream("input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .groupByKey()
+            
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)))
+            .reduce(
+                (value, aggregate) -> value,
+                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            )
+        );
+    }
+
+    @Test
+    public void 
processorShouldAccessKStreamSlidingReducedOnWindowCloseKTableStoreAsTimestampedStore()
 {
+        verifyWindow(builder
+                .stream("input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
+                .groupByKey()
+                
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)))
+                .emitStrategy(EmitStrategy.onWindowClose())
+                .reduce(
+                    (value, aggregate) -> value,
+                    Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+                ),
+            true
+        );
+    }
+
+    @Test
+    public void 
processorShouldAccessKStreamSlidingAggregatedKTableStoreAsTimestampedStore() {
+        verifyWindow(builder
+            .stream("input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .groupByKey()
+            
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)))
+            .aggregate(
+                () -> "",
+                (key, value, aggregate) -> value,
+                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            )
+        );
+    }
+
+    @Test
+    public void 
processorShouldAccessKStreamSlidingAggregatedOnWindowCloseKTableStoreAsTimestampedStore()
 {
+        verifyWindow(builder
+                .stream("input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
+                .groupByKey()
+                
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)))
+                .emitStrategy(EmitStrategy.onWindowClose())
+                .aggregate(
+                    () -> "",
+                    (key, value, aggregate) -> value,
+                    Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+                ),
+            true
+        );
+    }
+
     @Test
     public void processorShouldAccessKTableStoreAsHeadersStoreViaConfig() {
         final StreamsBuilder builder = new StreamsBuilder();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index 1e187454a2c..aa6d718802a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -19,12 +19,14 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
@@ -36,6 +38,7 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
 import org.apache.kafka.streams.state.WindowStoreIterator;
+import 
org.apache.kafka.streams.state.internals.WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -484,7 +487,7 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
 
     @Override
     public KTableValueGetterSupplier<Windowed<KIn>, VAgg> view() {
-        return new KTableValueGetterSupplier<Windowed<KIn>, VAgg>() {
+        return new KTableValueGetterSupplier<>() {
 
             public KTableValueGetter<Windowed<KIn>, VAgg> get() {
                 return new KStreamWindowAggregateValueGetter();
@@ -502,7 +505,21 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
 
         @Override
         public void init(final ProcessorContext<?, ?> context) {
-            windowStore = context.getStateStore(storeName);
+            try {
+                windowStore = new 
WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter<>(context.getStateStore(storeName));
+            } catch (final ClassCastException swallow) {
+                // not timestamped store
+
+                // Try headers-aware timestamped store
+                try {
+                    windowStore = context.getStateStore(storeName);
+                } catch (final ClassCastException fatal) {
+                    final StateStore store = context.getStateStore(storeName);
+                    final String storeType = store == null ? "null" : 
store.getClass().getName();
+                    throw new InvalidStateStoreException("Sliding-KTable state 
store must implement either "
+                        + "TimestampedWindowStore, or 
TimestampedWindowStoreWithHeaders. Got: " + storeType);
+                }
+            }
         }
 
         @Override
@@ -511,9 +528,6 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
             return windowStore.fetch(key, windowedKey.window().start());
         }
 
-        @Override
-        public void close() {}
-
         @Override
         public boolean isVersioned() {
             return false;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
index d9a0f3d4e50..ad5607215fd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
@@ -21,9 +21,9 @@ import org.apache.kafka.streams.DslStoreFormat;
 import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.state.DslWindowParams;
+import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 
@@ -71,12 +71,20 @@ public class SlidingWindowStoreMaterializer<K, V> extends 
MaterializedStoreFacto
             ))
             : (WindowBytesStoreSupplier) materialized.storeSupplier();
 
-        final StoreBuilder<TimestampedWindowStoreWithHeaders<K, V>> builder = 
Stores
-            .timestampedWindowStoreWithHeadersBuilder(
+        final StoreBuilder<?> builder;
+        if (supplier instanceof HeadersBytesStoreSupplier) {
+            builder = Stores.timestampedWindowStoreWithHeadersBuilder(
                 supplier,
                 materialized.keySerde(),
                 materialized.valueSerde()
             );
+        } else {
+            builder = Stores.timestampedWindowStoreBuilder(
+                supplier,
+                materialized.keySerde(),
+                materialized.valueSerde()
+            );
+        }
 
         if (materialized.loggingEnabled()) {
             builder.withLoggingEnabled(materialized.logConfig());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SlidingWindowStoreMaterializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SlidingWindowStoreMaterializerTest.java
index 1f44a9cd71b..f0404b00add 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SlidingWindowStoreMaterializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SlidingWindowStoreMaterializerTest.java
@@ -28,12 +28,16 @@ import 
org.apache.kafka.streams.kstream.internals.SlidingWindowStoreMaterializer
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
 import org.apache.kafka.streams.state.DslStoreSuppliers;
+import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.CachingWindowStore;
+import 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedWindowBytesStore;
 import 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedWindowBytesStoreWithHeaders;
 import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
+import org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStore;
 import 
org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStoreWithHeaders;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
@@ -53,7 +57,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.lenient;
-import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -66,8 +69,6 @@ public class SlidingWindowStoreMaterializerTest {
     @Mock
     private InternalNameProvider nameProvider;
     @Mock
-    private WindowBytesStoreSupplier windowStoreSupplier;
-    @Mock
     private StreamsConfig streamsConfig;
 
     private final WindowStore<Bytes, byte[]> innerWindowStore =
@@ -93,69 +94,102 @@ public class SlidingWindowStoreMaterializerTest {
                 
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
     }
 
-    private void mockWindowStoreSupplier() {
-        when(windowStoreSupplier.get()).thenReturn(innerWindowStore);
-        when(windowStoreSupplier.name()).thenReturn(STORE_NAME);
-        when(windowStoreSupplier.metricsScope()).thenReturn("metricScope");
+    private final class HeadersStoreSupplier implements 
WindowBytesStoreSupplier, HeadersBytesStoreSupplier {
+        @Override
+        public String name() {
+            return STORE_NAME;
+        }
+
+        @Override
+        public WindowStore<Bytes, byte[]> get() {
+            return innerWindowStore;
+        }
+
+        @Override
+        public String metricsScope() {
+            return "metricScope";
+        }
+
+        @Override
+        public long segmentIntervalMs() {
+            return 0;
+        }
+
+        @Override
+        public long windowSize() {
+            return 0;
+        }
+
+        @Override
+        public boolean retainDuplicates() {
+            return false;
+        }
+
+        @Override
+        public long retentionPeriod() {
+            return 0;
+        }
     }
 
     @Test
-    public void 
shouldCreateHeadersBuilderWithCachingAndLoggingEnabledByDefault() {
+    public void 
shouldCreateTimestampedBuilderWithCachingAndLoggingEnabledByDefault() {
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
             new MaterializedInternal<>(Materialized.as("store"), nameProvider, 
STORE_PREFIX);
 
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
+        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
         final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
         final StateStore logging = caching.wrapped();
 
-        assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, 
store);
+        assertInstanceOf(MeteredTimestampedWindowStore.class, store);
         assertInstanceOf(CachingWindowStore.class, caching);
-        
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
+        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class, 
logging);
     }
 
     @Test
-    public void shouldCreateHeadersBuilderWithCachingDisabled() {
+    public void shouldCreateTimestampedBuilderWithCachingDisabled() {
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized = new MaterializedInternal<>(
             Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("store").withCachingDisabled(), nameProvider, STORE_PREFIX
         );
 
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
+        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
 
         final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
-        
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
+        assertInstanceOf(MeteredTimestampedWindowStore.class, store);
+        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class, 
logging);
     }
 
     @Test
-    public void shouldCreateHeadersBuilderWithLoggingDisabled() {
+    public void shouldCreateTimestampedBuilderWithLoggingDisabled() {
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized = new MaterializedInternal<>(
             Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
         );
 
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
+        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
 
         final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
+        assertInstanceOf(MeteredTimestampedWindowStore.class, store);
         assertInstanceOf(CachingWindowStore.class, caching);
-        assertFalse(caching.wrapped() instanceof 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
+        assertFalse(caching.wrapped() instanceof 
ChangeLoggingTimestampedWindowBytesStore);
     }
 
     @Test
-    public void shouldCreateHeadersBuilderWithCachingAndLoggingDisabled() {
+    public void shouldCreateTimestampedBuilderWithCachingAndLoggingDisabled() {
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized = new MaterializedInternal<>(
             Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider, 
STORE_PREFIX
         );
 
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
+        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
 
         final StateStore wrapped = ((WrappedStateStore<?, ?, ?>) 
store).wrapped();
+        assertInstanceOf(MeteredTimestampedWindowStore.class, store);
         assertFalse(wrapped instanceof CachingWindowStore);
-        assertFalse(wrapped instanceof 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
+        assertFalse(wrapped instanceof 
ChangeLoggingTimestampedWindowBytesStore);
     }
 
     @Test
     public void 
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingEnabledByDefault()
 {
-        mockWindowStoreSupplier();
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.as(windowStoreSupplier), 
nameProvider, STORE_PREFIX);
+            new MaterializedInternal<>(Materialized.as(new 
HeadersStoreSupplier()), nameProvider, STORE_PREFIX);
 
         final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
@@ -169,53 +203,82 @@ public class SlidingWindowStoreMaterializerTest {
 
     @Test
     public void 
shouldCreateHeadersStoreWithProvidedSupplierAndCachingDisabled() {
-        mockWindowStoreSupplier();
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.<String, 
String>as(windowStoreSupplier).withCachingDisabled(), nameProvider, 
STORE_PREFIX);
+            new MaterializedInternal<>(Materialized.<String, String>as(new 
HeadersStoreSupplier()).withCachingDisabled(), nameProvider, STORE_PREFIX);
 
         final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
+        assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, 
store);
         assertEquals(innerWindowStore.name(), store.name());
         
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
     }
 
     @Test
     public void 
shouldCreateHeadersStoreWithProvidedSupplierAndLoggingDisabled() {
-        mockWindowStoreSupplier();
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.<String, 
String>as(windowStoreSupplier).withLoggingDisabled(), nameProvider, 
STORE_PREFIX);
+            new MaterializedInternal<>(Materialized.<String, String>as(new 
HeadersStoreSupplier()).withLoggingDisabled(), nameProvider, STORE_PREFIX);
 
         final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
         assertEquals(innerWindowStore.name(), store.name());
+        assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, 
store);
         assertInstanceOf(CachingWindowStore.class, caching);
         assertFalse(caching.wrapped() instanceof 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
     }
 
     @Test
     public void 
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingDisabled() {
-        mockWindowStoreSupplier();
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.<String, 
String>as(windowStoreSupplier).withCachingDisabled().withLoggingDisabled(), 
nameProvider, STORE_PREFIX);
+            new MaterializedInternal<>(Materialized.<String, String>as(new 
HeadersStoreSupplier()).withCachingDisabled().withLoggingDisabled(), 
nameProvider, STORE_PREFIX);
 
         final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final StateStore wrapped = ((WrappedStateStore<?, ?, ?>) 
store).wrapped();
         assertEquals(innerWindowStore.name(), store.name());
+        assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, 
store);
         assertFalse(wrapped instanceof CachingWindowStore);
         assertFalse(wrapped instanceof 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
     }
 
     @Test
-    public void shouldCreateHeadersStoreWithOnWindowClose() {
+    public void shouldCreateTimestampedStoreWithOnWindowCloseByDefault() {
         emitStrategy = EmitStrategy.onWindowClose();
 
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
             new MaterializedInternal<>(Materialized.<String, String, 
WindowStore<Bytes, byte[]>>as("store")
                 .withCachingDisabled(), nameProvider, STORE_PREFIX);
 
+        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+
+        final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
+        assertInstanceOf(MeteredTimestampedWindowStore.class, store);
+        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class, 
logging);
+    }
+
+    @Test
+    public void 
shouldCreateTimestampedStoreWithOnWindowCloseAndCachingDisabled() {
+        emitStrategy = EmitStrategy.onWindowClose();
+
+        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
+            new MaterializedInternal<>(Materialized.as("store"), nameProvider, 
STORE_PREFIX);
+
+        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+
+        final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
+        assertInstanceOf(MeteredTimestampedWindowStore.class, store);
+        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class, 
logging);
+    }
+
+    @Test
+    public void shouldCreateHeadersStoreWithProvidedSupplierOnWindowClose() {
+        emitStrategy = EmitStrategy.onWindowClose();
+
+        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
+            new MaterializedInternal<>(Materialized.<String, String>as(new 
HeadersStoreSupplier())
+                .withCachingDisabled(), nameProvider, STORE_PREFIX);
+
         final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
@@ -223,6 +286,30 @@ public class SlidingWindowStoreMaterializerTest {
         
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
     }
 
+    @Test
+    public void 
shouldCreateHeadersStoreWithProvidedSupplierOnWindowCloseAndCachingDisabled() {
+        emitStrategy = EmitStrategy.onWindowClose();
+
+        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
+            new MaterializedInternal<>(Materialized.as(new 
HeadersStoreSupplier()),
+                nameProvider, STORE_PREFIX);
+
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
+
+        final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
+        assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, 
store);
+        
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
+    }
+
+    @SuppressWarnings("unchecked")
+    private TimestampedWindowStore<String, String> getTimestampedStore(
+        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized) {
+        final SlidingWindowStoreMaterializer<String, String> materializer =
+            new SlidingWindowStoreMaterializer<>(materialized, windows, 
emitStrategy);
+        materializer.configure(streamsConfig);
+        return (TimestampedWindowStore<String, String>) 
materializer.builder().build();
+    }
+
     @SuppressWarnings("unchecked")
     private TimestampedWindowStoreWithHeaders<String, String> getHeadersStore(
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
index cd88bc6be75..041f9f529c6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
@@ -240,7 +240,7 @@ public class WindowStoreMaterializerTest {
     }
 
     @Test
-    public void shouldCreateTimestampedStoreWithOnWindowCloseByDefault() {
+    public void 
shouldCreateTimestampedStoreWithOnWindowCloseByDefaultWithCachingDisabled() {
         emitStrategy = EmitStrategy.onWindowClose();
 
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =

Reply via email to