This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1dacfd29a30 KAFKA-20194: Ensure backward compatibility for Windowed 
Store (#21922)
1dacfd29a30 is described below

commit 1dacfd29a30a19f74e22737ac224a2404c837211
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Apr 2 14:42:55 2026 -0700

    KAFKA-20194: Ensure backward compatibility for Windowed Store (#21922)
    
    By default, the DSL should expose it's state-stores as ts-stores, as
    long as header format is not enabled; otherwise, it would be a backward
    incompatible change.
    
    This PR ensures that the builders are creating the correct state stores,
    depending on the format, and we insert an adaptor to allow the DSL
    Processors to work only against headers-store interface.
    
    Reviewers: Bill Bejeck <[email protected]>, Alieh Saeedi
    <[email protected]>
---
 .../integration/PapiDslIntegrationTest.java        | 102 +++++++-
 ...bstractKStreamTimeWindowAggregateProcessor.java |  26 ++-
 .../kstream/internals/KStreamWindowAggregate.java  |  19 +-
 .../kstream/internals/WindowStoreMaterializer.java |  46 ++--
 .../streams/state/BuiltInDslStoreSuppliers.java    |  17 +-
 .../streams/state/HeadersBytesStoreSupplier.java   |   3 +-
 ...IndexedTimeOrderedWindowBytesStoreSupplier.java |  19 +-
 ...OrderedWindowBytesStoreWithHeadersSupplier.java | 100 ++++++++
 ...dersStoreToWindowedTimestampedStoreAdapter.java | 256 +++++++++++++++++++++
 .../internals/KStreamWindowAggregateTest.java      |   1 -
 .../internals/KeyValueStoreMaterializerTest.java   |  36 +--
 .../internals/WindowStoreMaterializerTest.java     | 150 ++++++++----
 ...xedTimeOrderedWindowBytesStoreSupplierTest.java |  14 +-
 ...imeOrderedCachingPersistentWindowStoreTest.java |   3 +-
 .../internals/TimeOrderedWindowStoreTest.java      |   3 +-
 15 files changed, 689 insertions(+), 106 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
index 891f1d527fd..da6be9afff8 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
@@ -27,12 +27,15 @@ import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.Record;
@@ -41,12 +44,15 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStore;
 
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.util.Properties;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -107,7 +113,7 @@ public class PapiDslIntegrationTest {
     public void 
processorShouldAccessTransformedKTableStoreAsTimestampedStore() {
         verify(builder
             .table("input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
-            .transformValues(() -> new ValueTransformerWithKey<String, String, 
String>() {
+            .transformValues(() -> new ValueTransformerWithKey<>() {
                 @Override
                 public void init(final ProcessorContext context) { }
 
@@ -228,6 +234,100 @@ public class PapiDslIntegrationTest {
         );
     }
 
+    private void verifyWindow(final KTable<Windowed<String>, String> table) {
+        verifyWindow(table, false);
+    }
+
+    private void verifyWindow(final KTable<Windowed<String>, String> table, 
final boolean requiresFlush) {
+        table.toStream((windowedKey, value) -> windowedKey.key())
+            .process(() -> new ContextualProcessor<String, String, String, 
String>() {
+                @Override
+                public void process(final Record<String, String> record) {
+                    final TimestampedWindowStore<String, String> store = 
context().getStateStore("table-store");
+
+                    try (final KeyValueIterator<Windowed<String>, 
ValueAndTimestamp<String>> it = store.all()) {
+                        while (it.hasNext()) {
+                            final KeyValue<Windowed<String>, 
ValueAndTimestamp<String>> row = it.next();
+                            context().forward(new Record<>(row.key.key(), 
row.value.value(), row.value.timestamp()));
+                        }
+                    }
+                }
+            }, "table-store")
+            .to("output-topic", Produced.with(Serdes.String(), 
Serdes.String()));
+
+        try (final TopologyTestDriver testDriver = new 
TopologyTestDriver(builder.build())) {
+            final TestInputTopic<String, String> inputTopic = 
testDriver.createInputTopic("input-topic", new StringSerializer(), new 
StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = 
testDriver.createOutputTopic("output-topic", new StringDeserializer(), new 
StringDeserializer());
+
+            inputTopic.pipeInput("key1", "value1");
+
+            if (requiresFlush) {
+                inputTopic.advanceTime(Duration.ofHours(2));
+                inputTopic.pipeInput("flush", "flush");
+            }
+
+            assertEquals(KeyValue.pair("key1", "value1"), 
outputTopic.readKeyValue());
+        }
+    }
+
+    @Test
+    public void 
processorShouldAccessKStreamWindowReducedKTableStoreAsTimestampedStore() {
+        verifyWindow(builder
+            .stream("input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .groupByKey()
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1L)))
+            .reduce(
+                (value, aggregate) -> value,
+                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            )
+        );
+    }
+
+    @Test
+    public void 
processorShouldAccessKStreamWindowReducedOnWindowCloseKTableStoreAsTimestampedStore()
 {
+        verifyWindow(builder
+            .stream("input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .groupByKey()
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1L)))
+            .emitStrategy(EmitStrategy.onWindowClose())
+            .reduce(
+                (value, aggregate) -> value,
+                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            ),
+            true
+        );
+    }
+
+    @Test
+    public void 
processorShouldAccessKStreamWindowAggregatedKTableStoreAsTimestampedStore() {
+        verifyWindow(builder
+            .stream("input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .groupByKey()
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1L)))
+            .aggregate(
+                () -> "",
+                (key, value, aggregate) -> value,
+                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            )
+        );
+    }
+
+    @Test
+    public void 
processorShouldAccessKStreamWindowAggregatedOnWindowCloseKTableStoreAsTimestampedStore()
 {
+        verifyWindow(builder
+            .stream("input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .groupByKey()
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1L)))
+            .emitStrategy(EmitStrategy.onWindowClose())
+            .aggregate(
+                () -> "",
+                (key, value, aggregate) -> value,
+                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            ),
+            true
+        );
+    }
+
     @Test
     public void processorShouldAccessKTableStoreAsHeadersStoreViaConfig() {
         final StreamsBuilder builder = new StreamsBuilder();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
index dd379003f7f..381b66dccd7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
@@ -21,11 +21,13 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
@@ -35,6 +37,7 @@ import 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import 
org.apache.kafka.streams.state.internals.WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter;
 
 import org.slf4j.Logger;
 
@@ -78,7 +81,26 @@ public abstract class 
AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg
         droppedRecordsSensor = droppedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
         emittedRecordsSensor = emittedRecordsSensor(threadId, 
context.taskId().toString(), processorName, metrics);
         emitFinalLatencySensor = emitFinalLatencySensor(threadId, 
context.taskId().toString(), processorName, metrics);
-        windowStore = context.getStateStore(storeName);
+
+        boolean isHeadersStore = false;
+        // Try timestamped store
+        try {
+            windowStore = new 
WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter<>(context.getStateStore(storeName));
+        } catch (final ClassCastException swallow) {
+            // not timestamped store
+
+            // Try headers-aware timestamped store
+            try {
+                windowStore = context.getStateStore(storeName);
+                isHeadersStore = true;
+            } catch (final ClassCastException fatal) {
+                final StateStore store = context.getStateStore(storeName);
+                final String storeType = store == null ? "null" : 
store.getClass().getName();
+                throw new InvalidStateStoreException("Windowed-KTable state 
store must implement either "
+                    + "TimestampedWindowStore, or 
TimestampedWindowStoreWithHeaders. Got: " + storeType);
+            }
+        }
+
 
         if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
             // Restore last emit close time for ON_WINDOW_CLOSE strategy
@@ -98,7 +120,7 @@ public abstract class 
AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg
             tupleForwarder = new TimestampedTupleForwarder<>(
                 windowStore,
                 context,
-                new TimestampedCacheFlushListenerWithHeaders<>(context),
+                isHeadersStore ? new 
TimestampedCacheFlushListenerWithHeaders<>(context) : new 
TimestampedCacheFlushListener<>(context),
                 sendOldValues);
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 4470bff214c..9f5c0269624 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
@@ -26,6 +27,7 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
@@ -35,6 +37,7 @@ import 
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrapping
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import 
org.apache.kafka.streams.state.internals.WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -215,7 +218,21 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W 
extends Window> implements
 
         @Override
         public void init(final ProcessorContext<?, ?> context) {
-            windowStore = context.getStateStore(storeName);
+            try {
+                windowStore = new 
WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter<>(context.getStateStore(storeName));
+            } catch (final ClassCastException swallow) {
+                // not timestamped store
+
+                // Try headers-aware timestamped store
+                try {
+                    windowStore = context.getStateStore(storeName);
+                } catch (final ClassCastException fatal) {
+                    final StateStore store = context.getStateStore(storeName);
+                    final String storeType = store == null ? "null" : 
store.getClass().getName();
+                    throw new InvalidStateStoreException("Windowed-KTable 
state store must implement either "
+                        + "TimestampedWindowStore, or 
TimestampedWindowStoreWithHeaders. Got: " + storeType);
+                }
+            }
         }
 
         @SuppressWarnings("unchecked")
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
index c15c4e8c804..d0df1de7fcc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
@@ -21,9 +21,9 @@ import org.apache.kafka.streams.DslStoreFormat;
 import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.state.DslWindowParams;
+import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 
@@ -57,23 +57,33 @@ public class WindowStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K, V
 
     @Override
     public StoreBuilder<?> builder() {
-        final WindowBytesStoreSupplier supplier = materialized.storeSupplier() 
== null
-            ? dslStoreSuppliers().windowStore(new DslWindowParams(
-            materialized.storeName(),
-            Duration.ofMillis(retentionPeriod),
-            Duration.ofMillis(windows.size()),
-            false,
-            emitStrategy,
-            false,
-            dslStoreFormat()
-        ))
-            : (WindowBytesStoreSupplier) materialized.storeSupplier();
-
-        final StoreBuilder<TimestampedWindowStoreWithHeaders<K, V>> builder = 
Stores.timestampedWindowStoreWithHeadersBuilder(
-            supplier,
-            materialized.keySerde(),
-            materialized.valueSerde()
-        );
+        final WindowBytesStoreSupplier supplier =
+            materialized.storeSupplier() == null
+                ? dslStoreSuppliers().windowStore(new DslWindowParams(
+                    materialized.storeName(),
+                    Duration.ofMillis(retentionPeriod),
+                    Duration.ofMillis(windows.size()),
+                    false,
+                    emitStrategy,
+                    false,
+                    dslStoreFormat()
+                ))
+                : (WindowBytesStoreSupplier) materialized.storeSupplier();
+
+        final StoreBuilder<?>  builder;
+        if (supplier instanceof HeadersBytesStoreSupplier) {
+            builder = Stores.timestampedWindowStoreWithHeadersBuilder(
+                supplier,
+                materialized.keySerde(),
+                materialized.valueSerde()
+            );
+        } else {
+            builder = Stores.timestampedWindowStoreBuilder(
+                supplier,
+                materialized.keySerde(),
+                materialized.valueSerde()
+            );
+        }
 
         if (materialized.loggingEnabled()) {
             builder.withLoggingEnabled(materialized.logConfig());
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
index 3b23b9e1f6e..b74fce37ea0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state;
 import org.apache.kafka.streams.DslStoreFormat;
 import org.apache.kafka.streams.kstream.EmitStrategy;
 import 
org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
+import 
org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier;
 import 
org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
 
 /**
@@ -56,13 +57,23 @@ public class BuiltInDslStoreSuppliers {
             final DslStoreFormat storeFormat = params.dslStoreFormat();
             if (params.emitStrategy().type() == 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
                 final boolean withHeaders = (storeFormat == 
DslStoreFormat.HEADERS);
-                return 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
+                if (!withHeaders) {
+                    return 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
                         params.name(),
                         params.retentionPeriod(),
                         params.windowSize(),
                         params.retainDuplicates(),
-                        params.isSlidingWindow(),
-                        withHeaders);
+                        params.isSlidingWindow()
+                    );
+                } else {
+                    return 
RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier.create(
+                        params.name(),
+                        params.retentionPeriod(),
+                        params.windowSize(),
+                        params.retainDuplicates(),
+                        params.isSlidingWindow()
+                    );
+                }
             }
 
             final DslStoreFormat format = (storeFormat == null) ? 
DslStoreFormat.TIMESTAMPED : storeFormat;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStoreSupplier.java
index f27c1711b0a..411f4bf787d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStoreSupplier.java
@@ -20,5 +20,4 @@ package org.apache.kafka.streams.state;
  * A store supplier that can be used to create one or more "header" stores,
  * specifically, {@link HeadersBytesStore} instances.
  */
-public interface HeadersBytesStoreSupplier extends KeyValueBytesStoreSupplier {
-}
\ No newline at end of file
+public interface HeadersBytesStoreSupplier { }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
index 4e372242f8d..eca24e0f737 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
@@ -33,20 +33,19 @@ public class 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window
         INDEXED_WINDOW_STORE_WITH_HEADERS
     }
 
-    private final String name;
-    private final long retentionPeriod;
-    private final long segmentInterval;
-    private final long windowSize;
-    private final boolean retainDuplicates;
-    private final WindowStoreTypes windowStoreType;
+    final String name;
+    final long retentionPeriod;
+    final long segmentInterval;
+    final long windowSize;
+    final boolean retainDuplicates;
+    final WindowStoreTypes windowStoreType;
 
     public static RocksDbIndexedTimeOrderedWindowBytesStoreSupplier create(
         final String name,
         final Duration retentionPeriod,
         final Duration windowSize,
         final boolean retainDuplicates,
-        final boolean hasIndex,
-        final boolean withHeaders
+        final boolean hasIndex
     ) {
         Objects.requireNonNull(name, "name cannot be null");
         final String rpMsgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
@@ -75,7 +74,7 @@ public class 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window
             windowSizeMs,
             retainDuplicates,
             hasIndex,
-            withHeaders
+            false
         );
     }
 
@@ -109,7 +108,7 @@ public class 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window
         }
     }
 
-    private RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+    RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
         final String name,
         final long retentionPeriod,
         final long segmentInterval,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier.java
new file mode 100644
index 00000000000..e0bc7704e76
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
+
+import java.time.Duration;
+import java.util.Objects;
+
+import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
+
+public class RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier
+    extends RocksDbIndexedTimeOrderedWindowBytesStoreSupplier
+    implements HeadersBytesStoreSupplier {
+
+    public static RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier 
create(
+        final String name,
+        final Duration retentionPeriod,
+        final Duration windowSize,
+        final boolean retainDuplicates,
+        final boolean hasIndex
+    ) {
+        Objects.requireNonNull(name, "name cannot be null");
+        final String rpMsgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+        final long retentionMs = validateMillisecondDuration(retentionPeriod, 
rpMsgPrefix);
+        final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, 
"windowSize");
+        final long windowSizeMs = validateMillisecondDuration(windowSize, 
wsMsgPrefix);
+
+        final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L);
+
+        if (retentionMs < 0L) {
+            throw new IllegalArgumentException("retentionPeriod cannot be 
negative");
+        }
+        if (windowSizeMs < 0L) {
+            throw new IllegalArgumentException("windowSize cannot be 
negative");
+        }
+        if (windowSizeMs > retentionMs) {
+            throw new IllegalArgumentException("The retention period of the 
window store "
+                + name + " must be no smaller than its window size. Got size=["
+                + windowSizeMs + "], retention=[" + retentionMs + "]");
+        }
+
+        return new 
RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier(
+            name,
+            retentionMs,
+            defaultSegmentInterval,
+            windowSizeMs,
+            retainDuplicates,
+            hasIndex,
+            true
+        );
+    }
+
+    private RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier(
+        final String name,
+        final long retentionPeriod,
+        final long segmentInterval,
+        final long windowSize,
+        final boolean retainDuplicates,
+        final boolean withIndex,
+        final boolean withHeaders
+    ) {
+        super(
+            name,
+            retentionPeriod,
+            segmentInterval,
+            windowSize,
+            retainDuplicates,
+            withIndex,
+            withHeaders
+        );
+    }
+
+    @Override
+    public String toString() {
+        return "RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier{" 
+
+                   "name='" + name + '\'' +
+                   ", retentionPeriod=" + retentionPeriod +
+                   ", segmentInterval=" + segmentInterval +
+                   ", windowSize=" + windowSize +
+                   ", retainDuplicates=" + retainDuplicates +
+                   ", windowStoreType=" + windowStoreType +
+                   '}';
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter.java
new file mode 100644
index 00000000000..ffc25839278
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.time.Instant;
+import java.util.Map;
+
+/**
+ * Adaptor store for the Kafka Streams DSL to bridge between "headers" store 
and "ts-store".
+ *
+ * <p> With KIP-1285 we did rewrite the DLS Processor code to work against 
"header store" interface to allow users
+ * to plugin "header stores", but by default the underlying store is still a 
"ts-store". To avoid "if-then-else"
+ * code across the entire DSL Processor code base, we use this adaptor to wrap 
a "ts-store" and make it look like
+ * a "header store".
+ *
+ * <p> On any write operation, provided {@link 
org.apache.kafka.common.header.Headers} will just be dropped,
+ * and {@link ValueTimestampHeaders} type is translated into {@link 
ValueAndTimestamp} type. Similarly for
+ * any read operation, the underlying {@link ValueAndTimestamp} type is 
translated into a {@link ValueTimestampHeaders}
+ * type with an empty {@link org.apache.kafka.common.header.Headers} object.
+ */
+public class 
WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter<K, V>
+    extends WrappedStateStore<TimestampedWindowStore<K, V>, K, V>
+    implements TimestampedWindowStoreWithHeaders<K, V> {
+
+    public 
WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter(final 
TimestampedWindowStore<K, V> timestampedWindowStore) {
+        super(timestampedWindowStore);
+    }
+
+    @Override
+    public String name() {
+        return wrapped().name();
+    }
+
+    @Override
+    public void init(final StateStoreContext stateStoreContext, final 
StateStore root) {
+        wrapped().init(stateStoreContext, root);
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public void flush() {
+        wrapped().flush();
+    }
+
+    @Override
+    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+        wrapped().commit(changelogOffsets);
+    }
+
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        return wrapped().committedOffset(partition);
+    }
+
+    @Override
+    public void close() {
+        wrapped().close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return wrapped().persistent();
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public boolean managesOffsets() {
+        return wrapped().managesOffsets();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return wrapped().isOpen();
+    }
+
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query, final PositionBound 
positionBound, final QueryConfig config) {
+        return wrapped().query(query, positionBound, config);
+    }
+
+    @Override
+    public Position getPosition() {
+        return wrapped().getPosition();
+    }
+
+    @Override
+    public void put(final K key, final ValueTimestampHeaders<V> 
valueTimestampHeaders, final long windowStartTimestamp) {
+        wrapped().put(key, valueTimestampHeaders == null ? null : 
ValueAndTimestamp.make(valueTimestampHeaders.value(), 
valueTimestampHeaders.timestamp()), windowStartTimestamp);
+    }
+
+    @Override
+    public WindowStoreIterator<ValueTimestampHeaders<V>> fetch(final K key, 
final long timeFrom, final long timeTo) {
+        return new WindowStoreIteratorAdapter(wrapped().fetch(key, timeFrom, 
timeTo));
+    }
+
+    @Override
+    public ValueTimestampHeaders<V> fetch(final K key, final long time) {
+        final ValueAndTimestamp<V> valueAndTimestamp = wrapped().fetch(key, 
time);
+        return valueAndTimestamp == null ? null : 
ValueTimestampHeaders.make(valueAndTimestamp.value(), 
valueAndTimestamp.timestamp(), new RecordHeaders());
+    }
+
+    @Override
+    public WindowStoreIterator<ValueTimestampHeaders<V>> fetch(final K key, 
final Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {
+        return new WindowStoreIteratorAdapter(wrapped().fetch(key, timeFrom, 
timeTo));
+    }
+
+    @Override
+    public WindowStoreIterator<ValueTimestampHeaders<V>> backwardFetch(final K 
key, final long timeFrom, final long timeTo) {
+        return new WindowStoreIteratorAdapter(wrapped().backwardFetch(key, 
timeFrom, timeTo));
+    }
+
+    @Override
+    public WindowStoreIterator<ValueTimestampHeaders<V>> backwardFetch(final K 
key, final Instant timeFrom, final Instant timeTo) throws 
IllegalArgumentException {
+        return new WindowStoreIteratorAdapter(wrapped().backwardFetch(key, 
timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> fetch(final 
K keyFrom, final K keyTo, final long timeFrom, final long timeTo) {
+        return new KeyValueIteratorAdapter(wrapped().fetch(keyFrom, keyTo, 
timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> fetch(final 
K keyFrom, final K keyTo, final Instant timeFrom, final Instant timeTo) throws 
IllegalArgumentException {
+        return new KeyValueIteratorAdapter(wrapped().fetch(keyFrom, keyTo, 
timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> 
backwardFetch(final K keyFrom, final K keyTo, final long timeFrom, final long 
timeTo) {
+        return new KeyValueIteratorAdapter(wrapped().backwardFetch(keyFrom, 
keyTo, timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> 
backwardFetch(final K keyFrom, final K keyTo, final Instant timeFrom, final 
Instant timeTo) throws IllegalArgumentException {
+        return new KeyValueIteratorAdapter(wrapped().backwardFetch(keyFrom, 
keyTo, timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> all() {
+        return new KeyValueIteratorAdapter(wrapped().all());
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> 
backwardAll() {
+        return new KeyValueIteratorAdapter(wrapped().backwardAll());
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> 
fetchAll(final long timeFrom, final long timeTo) {
+        return new KeyValueIteratorAdapter(wrapped().fetchAll(timeFrom, 
timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> 
fetchAll(final Instant timeFrom, final Instant timeTo) throws 
IllegalArgumentException {
+        return new KeyValueIteratorAdapter(wrapped().fetchAll(timeFrom, 
timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> 
backwardFetchAll(final long timeFrom, final long timeTo) {
+        return new 
KeyValueIteratorAdapter(wrapped().backwardFetchAll(timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> 
backwardFetchAll(final Instant timeFrom, final Instant timeTo) throws 
IllegalArgumentException {
+        return new 
KeyValueIteratorAdapter(wrapped().backwardFetchAll(timeFrom, timeTo));
+    }
+
+    private final class WindowStoreIteratorAdapter implements 
WindowStoreIterator<ValueTimestampHeaders<V>> {
+        private final WindowStoreIterator<ValueAndTimestamp<V>> innerIterator;
+
+        private WindowStoreIteratorAdapter(final 
WindowStoreIterator<ValueAndTimestamp<V>> innerIterator) {
+            this.innerIterator = innerIterator;
+        }
+
+        @Override
+        public void close() {
+            innerIterator.close();
+        }
+
+        @Override
+        public Long peekNextKey() {
+            return innerIterator.peekNextKey();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return innerIterator.hasNext();
+        }
+
+        @Override
+        public KeyValue<Long, ValueTimestampHeaders<V>> next() {
+            final KeyValue<Long, ValueAndTimestamp<V>> next = 
innerIterator.next();
+            return KeyValue.pair(next.key, 
ValueTimestampHeaders.make(next.value.value(), next.value.timestamp(), new 
RecordHeaders()));
+        }
+    }
+
+    private final class KeyValueIteratorAdapter implements 
KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> {
+        private final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> 
innerIterator;
+
+        private KeyValueIteratorAdapter(final KeyValueIterator<Windowed<K>, 
ValueAndTimestamp<V>> innerIterator) {
+            this.innerIterator = innerIterator;
+        }
+
+        @Override
+        public void close() {
+            innerIterator.close();
+        }
+
+        @Override
+        public Windowed<K> peekNextKey() {
+            return innerIterator.peekNextKey();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return innerIterator.hasNext();
+        }
+
+        @Override
+        public KeyValue<Windowed<K>, ValueTimestampHeaders<V>> next() {
+            final KeyValue<Windowed<K>, ValueAndTimestamp<V>> next = 
innerIterator.next();
+            return KeyValue.pair(next.key, 
ValueTimestampHeaders.make(next.value.value(), next.value.timestamp(), new 
RecordHeaders()));
+        }
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index af39d256621..7d4ea159c83 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -1011,7 +1011,6 @@ public class KStreamWindowAggregateTest {
                 Duration.ofDays(1),
                 Duration.ofMillis(windowSize),
                 false,
-                false,
                 false
             );
         } else {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
index 6e6e35e7512..f3099d381e8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
 import org.apache.kafka.streams.state.DslStoreSuppliers;
 import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
@@ -72,8 +73,6 @@ public class KeyValueStoreMaterializerTest {
     @Mock
     private InternalNameProvider nameProvider;
     @Mock
-    private HeadersBytesStoreSupplier headersStoreSupplier;
-    @Mock
     private VersionedBytesStoreSupplier versionedStoreSupplier;
     private final KeyValueStore<Bytes, byte[]> innerKeyValueStore = new 
InMemoryKeyValueStore(STORE_NAME);
     @Mock
@@ -102,10 +101,21 @@ public class KeyValueStoreMaterializerTest {
         when(versionedStoreSupplier.metricsScope()).thenReturn(METRICS_SCOPE);
     }
 
-    private void mockHeadersStoreSupplier() {
-        when(headersStoreSupplier.get()).thenReturn(innerKeyValueStore);
-        when(headersStoreSupplier.name()).thenReturn(STORE_NAME);
-        when(headersStoreSupplier.metricsScope()).thenReturn(METRICS_SCOPE);
+    private final class HeadersStoreSupplier implements 
KeyValueBytesStoreSupplier, HeadersBytesStoreSupplier {
+        @Override
+        public String name() {
+            return STORE_NAME;
+        }
+
+        @Override
+        public KeyValueStore<Bytes, byte[]> get() {
+            return innerKeyValueStore;
+        }
+
+        @Override
+        public String metricsScope() {
+            return METRICS_SCOPE;
+        }
     }
 
     @Test
@@ -162,9 +172,8 @@ public class KeyValueStoreMaterializerTest {
 
     @Test
     public void 
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingEnabledByDefault()
 {
-        mockHeadersStoreSupplier();
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized =
-            new MaterializedInternal<>(Materialized.as(headersStoreSupplier), 
nameProvider, STORE_PREFIX);
+            new MaterializedInternal<>(Materialized.as(new 
HeadersStoreSupplier()), nameProvider, STORE_PREFIX);
 
         final TimestampedKeyValueStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
@@ -178,9 +187,8 @@ public class KeyValueStoreMaterializerTest {
 
     @Test
     public void 
shouldCreateHeadersStoreWithProvidedSupplierAndCachingDisabled() {
-        mockHeadersStoreSupplier();
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized =
-            new MaterializedInternal<>(Materialized.<String, 
String>as(headersStoreSupplier).withCachingDisabled(), nameProvider, 
STORE_PREFIX);
+            new MaterializedInternal<>(Materialized.<String, String>as(new 
HeadersStoreSupplier()).withCachingDisabled(), nameProvider, STORE_PREFIX);
 
         final TimestampedKeyValueStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
@@ -191,9 +199,8 @@ public class KeyValueStoreMaterializerTest {
 
     @Test
     public void 
shouldCreateHeadersStoreWithProvidedSupplierAndLoggingDisabled() {
-        mockHeadersStoreSupplier();
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized =
-            new MaterializedInternal<>(Materialized.<String, 
String>as(headersStoreSupplier).withLoggingDisabled(), nameProvider, 
STORE_PREFIX);
+            new MaterializedInternal<>(Materialized.<String, String>as(new 
HeadersStoreSupplier()).withLoggingDisabled(), nameProvider, STORE_PREFIX);
 
         final TimestampedKeyValueStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
@@ -205,9 +212,8 @@ public class KeyValueStoreMaterializerTest {
 
     @Test
     public void 
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingDisabled() {
-        mockHeadersStoreSupplier();
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized =
-            new MaterializedInternal<>(Materialized.<String, 
String>as(headersStoreSupplier).withCachingDisabled().withLoggingDisabled(), 
nameProvider, STORE_PREFIX);
+            new MaterializedInternal<>(Materialized.<String, String>as(new 
HeadersStoreSupplier()).withCachingDisabled().withLoggingDisabled(), 
nameProvider, STORE_PREFIX);
 
         final TimestampedKeyValueStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
@@ -286,4 +292,4 @@ public class KeyValueStoreMaterializerTest {
         materializer.configure(streamsConfig);
         return (VersionedKeyValueStore<String, String>) 
materializer.builder().build();
     }
-}
\ No newline at end of file
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
index 1abe4beb6af..cd88bc6be75 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
@@ -29,13 +29,18 @@ import 
org.apache.kafka.streams.kstream.internals.WindowStoreMaterializer;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
 import org.apache.kafka.streams.state.DslStoreSuppliers;
+import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.CachingWindowStore;
+import 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedWindowBytesStore;
 import 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedWindowBytesStoreWithHeaders;
 import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
+import org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStore;
 import 
org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStoreWithHeaders;
+import 
org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier;
 import org.apache.kafka.streams.state.internals.TimeOrderedCachingWindowStore;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
@@ -55,7 +60,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.lenient;
-import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -68,8 +72,6 @@ public class WindowStoreMaterializerTest {
     @Mock
     private InternalNameProvider nameProvider;
     @Mock
-    private WindowBytesStoreSupplier windowStoreSupplier;
-    @Mock
     private StreamsConfig streamsConfig;
 
     private final WindowStore<Bytes, byte[]> innerWindowStore =
@@ -95,74 +97,103 @@ public class WindowStoreMaterializerTest {
                 
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
     }
 
-    private void mockWindowStoreSupplier() {
-        when(windowStoreSupplier.get()).thenReturn(innerWindowStore);
-        when(windowStoreSupplier.name()).thenReturn(STORE_NAME);
-        when(windowStoreSupplier.metricsScope()).thenReturn("metricScope");
+    private final class HeadersStoreSupplier implements 
WindowBytesStoreSupplier, HeadersBytesStoreSupplier {
+        @Override
+        public String name() {
+            return STORE_NAME;
+        }
+
+        @Override
+        public WindowStore<Bytes, byte[]> get() {
+            return innerWindowStore;
+        }
+
+        @Override
+        public String metricsScope() {
+            return "metricScope";
+        }
+
+        @Override
+        public long segmentIntervalMs() {
+            return 0;
+        }
+
+        @Override
+        public long windowSize() {
+            return 0;
+        }
+
+        @Override
+        public boolean retainDuplicates() {
+            return false;
+        }
+
+        @Override
+        public long retentionPeriod() {
+            return 0;
+        }
     }
 
     @Test
-    public void 
shouldCreateHeadersBuilderWithCachingAndLoggingEnabledByDefault() {
+    public void 
shouldCreateTimestampedBuilderWithCachingAndLoggingEnabledByDefault() {
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
             new MaterializedInternal<>(Materialized.as("store"), nameProvider, 
STORE_PREFIX);
 
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
-        final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
+        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+        final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
         final StateStore logging = caching.wrapped();
 
-        assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, 
store);
+        assertInstanceOf(MeteredTimestampedWindowStore.class, store);
         assertInstanceOf(CachingWindowStore.class, caching);
-        
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
+        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class, 
logging);
     }
 
     @Test
-    public void shouldCreateHeadersBuilderWithCachingDisabled() {
+    public void shouldCreateTimestampedBuilderWithCachingDisabled() {
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized = new MaterializedInternal<>(
             Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("store").withCachingDisabled(), nameProvider, STORE_PREFIX
         );
 
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
+        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
 
-        final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
-        
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
+        final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
+        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class, 
logging);
     }
 
     @Test
-    public void shouldCreateHeadersBuilderWithLoggingDisabled() {
+    public void shouldCreateTimestampedBuilderWithLoggingDisabled() {
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized = new MaterializedInternal<>(
             Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
         );
 
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
+        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
 
-        final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
+        final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
         assertInstanceOf(CachingWindowStore.class, caching);
-        assertFalse(caching.wrapped() instanceof 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
+        assertFalse(caching.wrapped() instanceof 
ChangeLoggingTimestampedWindowBytesStore);
     }
 
     @Test
-    public void shouldCreateHeadersBuilderWithCachingAndLoggingDisabled() {
+    public void shouldCreateTimestampedBuilderWithCachingAndLoggingDisabled() {
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized = new MaterializedInternal<>(
             Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider, 
STORE_PREFIX
         );
 
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
+        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
 
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        final StateStore wrapped = ((WrappedStateStore<?, ?, ?>) 
store).wrapped();
         assertFalse(wrapped instanceof CachingWindowStore);
-        assertFalse(wrapped instanceof 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
+        assertFalse(wrapped instanceof 
ChangeLoggingTimestampedWindowBytesStore);
     }
 
     @Test
     public void 
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingEnabledByDefault()
 {
-        mockWindowStoreSupplier();
-
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.as(windowStoreSupplier), 
nameProvider, STORE_PREFIX);
+            new MaterializedInternal<>(Materialized.as(new 
HeadersStoreSupplier()), nameProvider, STORE_PREFIX);
 
         final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
-        final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
+        final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
         final StateStore logging = caching.wrapped();
         assertEquals(innerWindowStore.name(), store.name());
         assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, 
store);
@@ -172,26 +203,24 @@ public class WindowStoreMaterializerTest {
 
     @Test
     public void 
shouldCreateHeadersStoreWithProvidedSupplierAndCachingDisabled() {
-        mockWindowStoreSupplier();
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.<String, 
String>as(windowStoreSupplier).withCachingDisabled(), nameProvider, 
STORE_PREFIX);
+            new MaterializedInternal<>(Materialized.<String, String>as(new 
HeadersStoreSupplier()).withCachingDisabled(), nameProvider, STORE_PREFIX);
 
         final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
-        final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
+        final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
         assertEquals(innerWindowStore.name(), store.name());
         
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
     }
 
     @Test
     public void 
shouldCreateHeadersStoreWithProvidedSupplierAndLoggingDisabled() {
-        mockWindowStoreSupplier();
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.<String, 
String>as(windowStoreSupplier).withLoggingDisabled(), nameProvider, 
STORE_PREFIX);
+            new MaterializedInternal<>(Materialized.<String, String>as(new 
HeadersStoreSupplier()).withLoggingDisabled(), nameProvider, STORE_PREFIX);
 
         final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
-        final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
+        final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
         assertEquals(innerWindowStore.name(), store.name());
         assertInstanceOf(CachingWindowStore.class, caching);
         assertFalse(caching.wrapped() instanceof 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
@@ -199,46 +228,83 @@ public class WindowStoreMaterializerTest {
 
     @Test
     public void 
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingDisabled() {
-        mockWindowStoreSupplier();
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.<String, 
String>as(windowStoreSupplier).withCachingDisabled().withLoggingDisabled(), 
nameProvider, STORE_PREFIX);
+            new MaterializedInternal<>(Materialized.<String, String>as(new 
HeadersStoreSupplier()).withCachingDisabled().withLoggingDisabled(), 
nameProvider, STORE_PREFIX);
 
         final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        final StateStore wrapped = ((WrappedStateStore<?, ?, ?>) 
store).wrapped();
         assertEquals(innerWindowStore.name(), store.name());
         assertFalse(wrapped instanceof CachingWindowStore);
         assertFalse(wrapped instanceof 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
     }
 
     @Test
-    public void shouldCreateHeadersStoreWithOnWindowClose() {
+    public void shouldCreateTimestampedStoreWithOnWindowCloseByDefault() {
         emitStrategy = EmitStrategy.onWindowClose();
 
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
             new MaterializedInternal<>(Materialized.<String, String, 
WindowStore<Bytes, byte[]>>as("store")
                 .withCachingDisabled(), nameProvider, STORE_PREFIX);
 
+        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+
+        final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
+        assertInstanceOf(MeteredTimestampedWindowStore.class, store);
+        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class, 
logging);
+    }
+
+    @Test
+    public void 
shouldCreateTimestampedStoreWithOnWindowCloseAndCachingEnabled() {
+        emitStrategy = EmitStrategy.onWindowClose();
+
+        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
+            new MaterializedInternal<>(Materialized.as("store"), nameProvider, 
STORE_PREFIX);
+
+        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+
+        final StateStore wrapped = ((WrappedStateStore<?, ?, ?>) 
store).wrapped();
+        assertInstanceOf(TimeOrderedCachingWindowStore.class, wrapped);
+    }
+
+    @Test
+    public void shouldCreateHeadersStoreWithProvidedSupplierOnWindowClose() {
+        emitStrategy = EmitStrategy.onWindowClose();
+
+        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
+            new MaterializedInternal<>(Materialized.<String, String>as(new 
HeadersStoreSupplier())
+                .withCachingDisabled(), nameProvider, STORE_PREFIX);
+
         final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
-        final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
+        final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
         assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, 
store);
         
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
     }
 
     @Test
-    public void shouldCreateHeadersStoreWithOnWindowCloseAndCachingEnabled() {
+    public void 
shouldCreateHeadersStoreWithProvidedSupplierOnWindowCloseAndCachingEnabled() {
         emitStrategy = EmitStrategy.onWindowClose();
 
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.as("store"), nameProvider, 
STORE_PREFIX);
+            new 
MaterializedInternal<>(Materialized.as(RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier.create(
+                STORE_NAME, Duration.ofMillis(60000L), 
Duration.ofMillis(WINDOW_SIZE_MS), false, true
+            )), nameProvider, STORE_PREFIX);
 
         final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        final StateStore wrapped = ((WrappedStateStore<?, ?, ?>) 
store).wrapped();
         assertInstanceOf(TimeOrderedCachingWindowStore.class, wrapped);
     }
 
+    @SuppressWarnings("unchecked")
+    private TimestampedWindowStore<String, String> getTimestampedStore(
+        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized) {
+        final WindowStoreMaterializer<String, String> materializer =
+            new WindowStoreMaterializer<>(materialized, windows, emitStrategy);
+        materializer.configure(streamsConfig);
+        return (TimestampedWindowStore<String, String>) 
materializer.builder().build();
+    }
 
     @SuppressWarnings("unchecked")
     private TimestampedWindowStoreWithHeaders<String, String> getHeadersStore(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
index d16df93141d..3c2cd850eb5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest.java
@@ -34,31 +34,31 @@ public class 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest {
 
     @Test
     public void shouldThrowIfStoreNameIsNull() {
-        final Exception e = assertThrows(NullPointerException.class, () -> 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(null, ZERO, ZERO, 
false, false, false));
+        final Exception e = assertThrows(NullPointerException.class, () -> 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(null, ZERO, ZERO, 
false, false));
         assertEquals("name cannot be null", e.getMessage());
     }
 
     @Test
     public void shouldThrowIfRetentionPeriodIsNegative() {
-        final Exception e = assertThrows(IllegalArgumentException.class, () -> 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("anyName", 
ofMillis(-1L), ZERO, false, false, false));
+        final Exception e = assertThrows(IllegalArgumentException.class, () -> 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("anyName", 
ofMillis(-1L), ZERO, false, false));
         assertEquals("retentionPeriod cannot be negative", e.getMessage());
     }
 
     @Test
     public void shouldThrowIfWindowSizeIsNegative() {
-        final Exception e = assertThrows(IllegalArgumentException.class, () -> 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("anyName", 
ofMillis(0L), ofMillis(-1L), false, false, false));
+        final Exception e = assertThrows(IllegalArgumentException.class, () -> 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("anyName", 
ofMillis(0L), ofMillis(-1L), false, false));
         assertEquals("windowSize cannot be negative", e.getMessage());
     }
 
     @Test
     public void shouldThrowIfWindowSizeIsLargerThanRetention() {
-        final Exception e = assertThrows(IllegalArgumentException.class, () -> 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("anyName", 
ofMillis(1L), ofMillis(2L), false, false, false));
+        final Exception e = assertThrows(IllegalArgumentException.class, () -> 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("anyName", 
ofMillis(1L), ofMillis(2L), false, false));
         assertEquals("The retention period of the window store anyName must be 
no smaller than its window size. Got size=[2], retention=[1]", e.getMessage());
     }
 
     @Test
     public void shouldCreateRocksDbTimeOrderedWindowStoreWithIndex() {
-        final WindowStore<?, ?> store = 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L), 
ofMillis(1L), false, true, false).get();
+        final WindowStore<?, ?> store = 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L), 
ofMillis(1L), false, true).get();
         final StateStore wrapped = ((WrappedStateStore<?, ?, ?>) 
store).wrapped();
         assertThat(store, instanceOf(RocksDBTimeOrderedWindowStore.class));
         assertThat(wrapped, 
instanceOf(RocksDBTimeOrderedWindowSegmentedBytesStore.class));
@@ -67,7 +67,7 @@ public class 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest {
 
     @Test
     public void shouldCreateRocksDbTimeOrderedWindowStoreWithoutIndex() {
-        final WindowStore<?, ?> store = 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L), 
ofMillis(1L), false, false, false).get();
+        final WindowStore<?, ?> store = 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L), 
ofMillis(1L), false, false).get();
         final StateStore wrapped = ((WrappedStateStore<?, ?, ?>) 
store).wrapped();
         assertThat(store, instanceOf(RocksDBTimeOrderedWindowStore.class));
         assertThat(wrapped, 
instanceOf(RocksDBTimeOrderedWindowSegmentedBytesStore.class));
@@ -76,7 +76,7 @@ public class 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplierTest {
 
     @Test
     public void shouldCreateRocksDbTimeOrderedWindowStoreWithHeaders() {
-        final WindowStore<?, ?> store = 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create("store", ofMillis(1L), 
ofMillis(1L), false, true, true).get();
+        final WindowStore<?, ?> store = 
RocksDbIndexedTimeOrderedWindowBytesStoreWithHeadersSupplier.create("store", 
ofMillis(1L), ofMillis(1L), false, true).get();
         final StateStore wrapped = ((WrappedStateStore<?, ?, ?>) 
store).wrapped();
         assertThat(store, 
instanceOf(RocksDBTimeOrderedWindowStoreWithHeaders.class));
         assertThat(wrapped, 
instanceOf(RocksDBTimeOrderedWindowSegmentedBytesStore.class));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
index e6bfa0b27c9..8a0c95279b5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
@@ -171,8 +171,7 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
                 ofHours(1L),
                 ofMinutes(1),
                 false,
-                hasIndex,
-                false
+                hasIndex
             ), Serdes.String(), Serdes.String())
             .withCachingEnabled();
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
index 4b77ce64439..472e55105cf 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
@@ -173,8 +173,7 @@ public class TimeOrderedWindowStoreTest {
                 ofHours(1L),
                 ofMinutes(1),
                 false,
-                hasIndex,
-                false
+                hasIndex
             ), Serdes.String(), Serdes.String())
             .withCachingEnabled();
 

Reply via email to