This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 2b586f0da61 KAFKA-20194: Ensure backward compatibility (#21904)
2b586f0da61 is described below

commit 2b586f0da61b02fdd42b43c0f25279b4b891c786
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Apr 1 14:37:21 2026 -0700

    KAFKA-20194: Ensure backward compatibility (#21904)
    
    By default, the DSL should expose it's state-stores as ts-stores, as
    long as header format is not enabled; otherwise, it would be a backward
    incompatible change.
    
    This PR ensures that the builders are creating the correct state stores,
    depending on the format, and we insert an adaptor to allow the DSL
    Processors to work only against headers-store interface.
    
    Reviewers: Bill Bejeck <[email protected]>, TengYao Chi
     <[email protected]>
---
 .../integration/PapiDslIntegrationTest.java        | 296 +++++++++++++++++++++
 .../integration/QueryableStateIntegrationTest.java |  34 +--
 .../kstream/internals/KStreamAggregate.java        |   2 +-
 .../streams/kstream/internals/KStreamReduce.java   |   2 +-
 .../streams/kstream/internals/KTableAggregate.java |   2 +-
 .../streams/kstream/internals/KTableFilter.java    |   2 +-
 .../kstream/internals/KTableKTableJoinMerger.java  |   2 +-
 .../streams/kstream/internals/KTableMapValues.java |   2 +-
 .../streams/kstream/internals/KTableReduce.java    |   2 +-
 .../streams/kstream/internals/KTableSource.java    |   2 +-
 .../kstream/internals/KTableTransformValues.java   |   2 +-
 .../internals/KeyValueStoreMaterializer.java       |  14 +-
 .../internals/TimestampedCacheFlushListener.java   |  62 +++++
 .../streams/state/HeadersBytesStoreSupplier.java   |  24 ++
 .../org/apache/kafka/streams/state/Stores.java     |   7 +-
 .../state/internals/KeyValueStoreWrapper.java      |  13 +
 ...HeaderStoreToKeyValueTimestampStoreAdapter.java | 225 ++++++++++++++++
 .../RocksDBKeyValueBytesStoreSupplier.java         |  12 +-
 ... RocksDBKeyValueHeadersBytesStoreSupplier.java} |  23 +-
 .../internals/KeyValueStoreMaterializerTest.java   |  84 +++---
 20 files changed, 712 insertions(+), 100 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
new file mode 100644
index 00000000000..891f1d527fd
--- /dev/null
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class PapiDslIntegrationTest {
+    final StreamsBuilder builder = new StreamsBuilder();
+
+    private void verify(final KTable<String, String> table) {
+        table.toStream()
+            .process(() -> new ContextualProcessor<String, String, String, 
String>() {
+                @Override
+                public void process(final Record<String, String> record) {
+                    final TimestampedKeyValueStore<String, String> store = 
context().getStateStore("table-store");
+
+                    try (final KeyValueIterator<String, 
ValueAndTimestamp<String>> it = store.all()) {
+                        while (it.hasNext()) {
+                            final KeyValue<String, ValueAndTimestamp<String>> 
row = it.next();
+                            context().forward(new Record<>(row.key, 
row.value.value(), row.value.timestamp()));
+                        }
+                    }
+                }
+            }, "table-store")
+            .to("output-topic", Produced.with(Serdes.String(), 
Serdes.String()));
+
+        try (final TopologyTestDriver testDriver = new 
TopologyTestDriver(builder.build())) {
+            final TestInputTopic<String, String> inputTopic = 
testDriver.createInputTopic("input-topic", new StringSerializer(), new 
StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = 
testDriver.createOutputTopic("output-topic", new StringDeserializer(), new 
StringDeserializer());
+
+            inputTopic.pipeInput("key1", "value1");
+
+            assertEquals(KeyValue.pair("key1", "value1"), 
outputTopic.readKeyValue());
+        }
+    }
+
+    @Test
+    public void processorShouldAccessSourceKTableStoreAsTimestampedStore() {
+        verify(builder.table("input-topic", Materialized.<String, String, 
KeyValueStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())));
+    }
+
+    @Test
+    public void processorShouldAccessFilteredKTableStoreAsTimestampedStore() {
+        verify(builder
+            .table("input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .filter((k, v) -> true, Materialized.<String, String, 
KeyValueStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
+        );
+    }
+
+    @Test
+    public void processorShouldAccessMappedKTableStoreAsTimestampedStore() {
+        verify(builder
+            .table("input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .mapValues(v -> v, Materialized.<String, String, 
KeyValueStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
+        );
+    }
+
+    @Test
+    public void 
processorShouldAccessTransformedKTableStoreAsTimestampedStore() {
+        verify(builder
+            .table("input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .transformValues(() -> new ValueTransformerWithKey<String, String, 
String>() {
+                @Override
+                public void init(final ProcessorContext context) { }
+
+                @Override
+                public String transform(final String readOnlyKey, final String 
value) {
+                    return value;
+                }
+
+                @Override
+                public void close() { }
+            }, Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
+        );
+    }
+
+    @Test
+    public void processorShouldAccessReducedKTableStoreAsTimestampedStore() {
+        verify(builder
+            .table("input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .groupBy((KeyValueMapper<String, String, KeyValue<String, 
String>>) KeyValue::pair, Grouped.with(Serdes.String(), Serdes.String()))
+            .reduce(
+                (value, aggregate) -> value,
+                (value, aggregate) -> aggregate,
+                Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            )
+        );
+    }
+
+    @Test
+    public void processorShouldAccessAggregatedKTableStoreAsTimestampedStore() 
{
+        verify(builder
+            .table("input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .groupBy((KeyValueMapper<String, String, KeyValue<String, 
String>>) KeyValue::pair, Grouped.with(Serdes.String(), Serdes.String()))
+            .aggregate(
+                () -> "",
+                (key, value, aggregate) -> value,
+                (key, value, aggregate) -> aggregate,
+                Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            )
+        );
+    }
+
+    private void verifyJoin(final KTable<String, String> table) {
+        table.toStream()
+            .process(() -> new ContextualProcessor<String, String, String, 
String>() {
+                @Override
+                public void process(final Record<String, String> record) {
+                    final TimestampedKeyValueStore<String, String> store = 
context().getStateStore("table-store");
+
+                    try (final KeyValueIterator<String, 
ValueAndTimestamp<String>> it = store.all()) {
+                        while (it.hasNext()) {
+                            final KeyValue<String, ValueAndTimestamp<String>> 
row = it.next();
+                            context().forward(new Record<>(row.key, 
row.value.value(), row.value.timestamp()));
+                        }
+                    }
+                }
+            }, "table-store")
+            .to("output-topic", Produced.with(Serdes.String(), 
Serdes.String()));
+
+        try (final TopologyTestDriver testDriver = new 
TopologyTestDriver(builder.build())) {
+            final TestInputTopic<String, String> leftInputTopic = 
testDriver.createInputTopic("left-input-topic", new StringSerializer(), new 
StringSerializer());
+            final TestInputTopic<String, String> rightInputTopic = 
testDriver.createInputTopic("right-input-topic", new StringSerializer(), new 
StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = 
testDriver.createOutputTopic("output-topic", new StringDeserializer(), new 
StringDeserializer());
+
+            leftInputTopic.pipeInput("key1", "left");
+            rightInputTopic.pipeInput("key1", "right");
+
+            assertEquals(KeyValue.pair("key1", "left-right"), 
outputTopic.readKeyValue());
+        }
+    }
+
+    @Test
+    public void processorShouldAccessJoinedKTableStoreAsTimestampedStore() {
+        verifyJoin(builder
+            .table("left-input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .join(
+                builder.table("right-input-topic", 
Consumed.with(Serdes.String(), Serdes.String())),
+                (left, right) -> left + "-" + right,
+                Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            )
+        );
+    }
+
+    @Test
+    public void processorShouldAccessFKJoinedKTableStoreAsTimestampedStore() {
+        verifyJoin(builder
+            .table("left-input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .join(
+                builder.table("right-input-topic", 
Consumed.with(Serdes.String(), Serdes.String())),
+                (key, value) -> key,
+                (left, right) -> left + "-" + right,
+                Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            )
+        );
+    }
+
+    @Test
+    public void 
processorShouldAccessKStreamReducedKTableStoreAsTimestampedStore() {
+        verify(builder
+            .stream("input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .groupByKey()
+            .reduce(
+                (value, aggregate) -> value,
+                Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            )
+        );
+    }
+
+    @Test
+    public void 
processorShouldAccessKStreamAggregatedKTableStoreAsTimestampedStore() {
+        verify(builder
+            .stream("input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .groupByKey()
+            .aggregate(
+                () -> "",
+                (key, value, aggregate) -> value,
+                Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            )
+        );
+    }
+
+    @Test
+    public void processorShouldAccessKTableStoreAsHeadersStoreViaConfig() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.table("input-topic", Materialized.<String, String, 
KeyValueStore<Bytes, 
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
+            .toStream()
+            .process(() -> new ContextualProcessor<String, String, String, 
String>() {
+                @Override
+                public void process(final Record<String, String> record) {
+                    final TimestampedKeyValueStoreWithHeaders<String, String> 
store = context().getStateStore("table-store");
+
+                    try (final KeyValueIterator<String, 
ValueTimestampHeaders<String>> it = store.all()) {
+                        while (it.hasNext()) {
+                            final KeyValue<String, 
ValueTimestampHeaders<String>> row = it.next();
+                            context().forward(new Record<>(row.key, 
row.value.value(), row.value.timestamp()));
+                        }
+                    }
+                }
+            }, "table-store")
+            .to("output-topic", Produced.with(Serdes.String(), 
Serdes.String()));
+
+        final Properties props = new Properties();
+        props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, 
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+
+        try (final TopologyTestDriver testDriver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic = 
testDriver.createInputTopic("input-topic", new StringSerializer(), new 
StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = 
testDriver.createOutputTopic("output-topic", new StringDeserializer(), new 
StringDeserializer());
+
+            inputTopic.pipeInput("key1", "value1");
+
+            assertEquals(KeyValue.pair("key1", "value1"), 
outputTopic.readKeyValue());
+        }
+    }
+
+    @Test
+    public void processorShouldAccessKTableStoreAsHeadersStoreViaSupplier() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> 
materialized = 
Materialized.as(Stores.persistentTimestampedKeyValueStoreWithHeaders("table-store"));
+        builder.table("input-topic", 
materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
+            .toStream()
+            .process(() -> new ContextualProcessor<String, String, String, 
String>() {
+                @Override
+                public void process(final Record<String, String> record) {
+                    final TimestampedKeyValueStoreWithHeaders<String, String> 
store = context().getStateStore("table-store");
+
+                    try (final KeyValueIterator<String, 
ValueTimestampHeaders<String>> it = store.all()) {
+                        while (it.hasNext()) {
+                            final KeyValue<String, 
ValueTimestampHeaders<String>> row = it.next();
+                            context().forward(new Record<>(row.key, 
row.value.value(), row.value.timestamp()));
+                        }
+                    }
+                }
+            }, "table-store")
+            .to("output-topic", Produced.with(Serdes.String(), 
Serdes.String()));
+
+        try (final TopologyTestDriver testDriver = new 
TopologyTestDriver(builder.build())) {
+            final TestInputTopic<String, String> inputTopic = 
testDriver.createInputTopic("input-topic", new StringSerializer(), new 
StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = 
testDriver.createOutputTopic("output-topic", new StringDeserializer(), new 
StringDeserializer());
+
+            inputTopic.pipeInput("key1", "value1");
+
+            assertEquals(KeyValue.pair("key1", "value1"), 
outputTopic.readKeyValue());
+        }
+    }
+}
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 9859e3ed24f..ae0e6226ba2 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -220,8 +220,8 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
-        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
100L);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
10000);
@@ -442,7 +442,7 @@ public class QueryableStateIntegrationTest {
     }
 
     @Test
-    public void shouldRejectNonExistentStoreName(final TestInfo testInfo) 
throws InterruptedException {
+    public void shouldRejectNonExistentStoreName(final TestInfo testInfo) {
         final String uniqueTestName = safeUniqueTestName(testInfo);
         final String input = uniqueTestName + "-input";
         final String storeName = uniqueTestName + "-input-table";
@@ -480,7 +480,7 @@ public class QueryableStateIntegrationTest {
     }
 
     @Test
-    public void shouldRejectWronglyTypedStore(final TestInfo testInfo) throws 
InterruptedException {
+    public void shouldRejectWronglyTypedStore(final TestInfo testInfo) {
         final String uniqueTestName = safeUniqueTestName(testInfo);
         final String input = uniqueTestName + "-input";
         final String storeName = uniqueTestName + "-input-table";
@@ -521,7 +521,7 @@ public class QueryableStateIntegrationTest {
                     "Cannot get state store " + storeName + " because the 
queryable store type" +
                         " [class 
org.apache.kafka.streams.state.QueryableStoreTypes$SessionStoreType]" +
                         " does not accept the actual store type" +
-                        " [class 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStoreWithHeaders]."
+                        " [class 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore]."
                 )
             );
         }
@@ -734,8 +734,8 @@ public class QueryableStateIntegrationTest {
 
     @Test
     public void shouldBeAbleToQueryFilterState() throws Exception {
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
-        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.LongSerde.class);
         final StreamsBuilder builder = new StreamsBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
         final Set<KeyValue<String, Long>> batch1 = new HashSet<>(
@@ -800,8 +800,8 @@ public class QueryableStateIntegrationTest {
 
     @Test
     public void shouldBeAbleToQueryMapValuesState() throws Exception {
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
-        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
         final StreamsBuilder builder = new StreamsBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
         final Set<KeyValue<String, String>> batch1 = new HashSet<>(
@@ -852,8 +852,8 @@ public class QueryableStateIntegrationTest {
 
     @Test
     public void shouldBeAbleToQueryKeysWithGivenPrefix() throws Exception {
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
-        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
         final StreamsBuilder builder = new StreamsBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
         final Set<KeyValue<String, String>> batch1 = new HashSet<>(
@@ -897,7 +897,7 @@ public class QueryableStateIntegrationTest {
             IntegrationTestUtils.getStore("queryMapValues", kafkaStreams, 
keyValueStore());
 
         int index = 0;
-        try (final KeyValueIterator<String, Long> range = 
myMapStore.prefixScan("go", Serdes.String().serializer())) {
+        try (final KeyValueIterator<String, Long> range = 
myMapStore.prefixScan("go", new StringSerializer())) {
             while (range.hasNext()) {
                 assertEquals(expectedPrefixScanResult.get(index++), 
range.next());
             }
@@ -906,8 +906,8 @@ public class QueryableStateIntegrationTest {
 
     @Test
     public void shouldBeAbleToQueryMapValuesAfterFilterState() throws 
Exception {
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
-        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
         final StreamsBuilder builder = new StreamsBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
         final Set<KeyValue<String, String>> batch1 = new HashSet<>(
@@ -1220,7 +1220,6 @@ public class QueryableStateIntegrationTest {
 
     private void waitUntilAtLeastNumRecordProcessed(final String topic,
                                                     final int numRecs) throws 
Exception {
-        final long timeout = DEFAULT_TIMEOUT_MS;
         final Properties config = new Properties();
         config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"queryable-state-consumer");
@@ -1231,7 +1230,8 @@ public class QueryableStateIntegrationTest {
             config,
             topic,
             numRecs,
-            timeout);
+            DEFAULT_TIMEOUT_MS
+        );
     }
 
     private Set<KeyValue<String, Long>> fetch(final 
ReadOnlyWindowStore<String, Long> store,
@@ -1250,7 +1250,7 @@ public class QueryableStateIntegrationTest {
     /**
      * A class that periodically produces records in a separate thread
      */
-    private class ProducerRunnable implements Runnable {
+    private static class ProducerRunnable implements Runnable {
         private final String topic;
         private final List<String> inputValues;
         private final int numIterations;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 166c41f1d62..4cc4736cd21 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -95,7 +95,7 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements 
KStreamAggProcessorSupp
             tupleForwarder = new TimestampedTupleForwarder<>(
                 store.store(),
                 context,
-                new TimestampedCacheFlushListenerWithHeaders<>(context),
+                store.isHeadersStore() ? new 
TimestampedCacheFlushListenerWithHeaders<>(context) : new 
TimestampedCacheFlushListener<>(context),
                 sendOldValues);
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 0f240fde857..5a6a17a74c9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -92,7 +92,7 @@ public class KStreamReduce<K, V> implements 
KStreamAggProcessorSupplier<K, V, K,
             tupleForwarder = new TimestampedTupleForwarder<>(
                 store.store(),
                 context,
-                new TimestampedCacheFlushListenerWithHeaders<>(context),
+                store.isHeadersStore() ? new 
TimestampedCacheFlushListenerWithHeaders<>(context) : new 
TimestampedCacheFlushListener<>(context),
                 sendOldValues);
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index eaea79a21ce..e6e0cf91298 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -88,7 +88,7 @@ public class KTableAggregate<KIn, VIn, VAgg> implements
             tupleForwarder = new TimestampedTupleForwarder<>(
                 store.store(),
                 context,
-                new TimestampedCacheFlushListenerWithHeaders<>(context),
+                store.isHeadersStore() ? new 
TimestampedCacheFlushListenerWithHeaders<>(context) : new 
TimestampedCacheFlushListener<>(context),
                 sendOldValues);
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index f9ed84f66b1..f1a48e2620f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -128,7 +128,7 @@ public class KTableFilter<KIn, VIn> implements 
KTableProcessorSupplier<KIn, VIn,
                 tupleForwarder = new TimestampedTupleForwarder<>(
                     store.store(),
                     context,
-                    new TimestampedCacheFlushListenerWithHeaders<>(context),
+                    store.isHeadersStore() ? new 
TimestampedCacheFlushListenerWithHeaders<>(context) : new 
TimestampedCacheFlushListener<>(context),
                     sendOldValues);
             }
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 7008620aa07..7fe6798e503 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -126,7 +126,7 @@ public class KTableKTableJoinMerger<K, V> implements 
KTableProcessorSupplier<K,
                 tupleForwarder = new TimestampedTupleForwarder<>(
                     store.store(),
                     context,
-                    new TimestampedCacheFlushListenerWithHeaders<>(context),
+                    store.isHeadersStore() ? new 
TimestampedCacheFlushListenerWithHeaders<>(context) : new 
TimestampedCacheFlushListener<>(context),
                     sendOldValues);
             }
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 2c6f7abb01e..e71dd84cf7c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -139,7 +139,7 @@ class KTableMapValues<KIn, VIn, VOut> implements 
KTableProcessorSupplier<KIn, VI
                 tupleForwarder = new TimestampedTupleForwarder<>(
                     store.store(),
                     context,
-                    new TimestampedCacheFlushListenerWithHeaders<>(context),
+                    store.isHeadersStore() ? new 
TimestampedCacheFlushListenerWithHeaders<>(context) : new 
TimestampedCacheFlushListener<>(context),
                     sendOldValues);
             }
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 04d2f65fbe6..8969a6bc0bc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -84,7 +84,7 @@ public class KTableReduce<K, V> implements 
KTableProcessorSupplier<K, V, K, V> {
             tupleForwarder = new TimestampedTupleForwarder<>(
                 store.store(),
                 context,
-                new TimestampedCacheFlushListenerWithHeaders<>(context),
+                store.isHeadersStore() ? new 
TimestampedCacheFlushListenerWithHeaders<>(context) : new 
TimestampedCacheFlushListener<>(context),
                 sendOldValues);
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 0263aa76356..9bb54befeb4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -113,7 +113,7 @@ public class KTableSource<KIn, VIn> implements 
ProcessorSupplier<KIn, VIn, KIn,
                 tupleForwarder = new TimestampedTupleForwarder<>(
                     store.store(),
                     context,
-                    new TimestampedCacheFlushListenerWithHeaders<>(context),
+                    store.isHeadersStore() ? new 
TimestampedCacheFlushListenerWithHeaders<>(context) : new 
TimestampedCacheFlushListener<>(context),
                     sendOldValues);
             }
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index 3d92c5e5b08..75b8289132d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -109,7 +109,7 @@ class KTableTransformValues<K, V, VOut> implements 
KTableProcessorSupplier<K, V,
                 tupleForwarder = new TimestampedTupleForwarder<>(
                     store.store(),
                     context,
-                    new TimestampedCacheFlushListenerWithHeaders<>(context),
+                    store.isHeadersStore() ? new 
TimestampedCacheFlushListenerWithHeaders<>(context) : new 
TimestampedCacheFlushListener<>(context),
                     sendOldValues);
             }
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index 2a80f21a6ee..b5f8cdfbe14 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.DslStoreFormat;
 import org.apache.kafka.streams.state.DslKeyValueParams;
+import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -55,11 +56,16 @@ public class KeyValueStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K,
                     (VersionedBytesStoreSupplier) supplier,
                     materialized.keySerde(),
                     materialized.valueSerde());
-        } else {
+        } else if (supplier instanceof HeadersBytesStoreSupplier) {
             builder = Stores.timestampedKeyValueStoreBuilderWithHeaders(
-                    supplier,
-                    materialized.keySerde(),
-                    materialized.valueSerde());
+                supplier,
+                materialized.keySerde(),
+                materialized.valueSerde());
+        } else {
+            builder = Stores.timestampedKeyValueStoreBuilder(
+                supplier,
+                materialized.keySerde(),
+                materialized.valueSerde());
         }
 
         if (materialized.loggingEnabled()) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
new file mode 100644
index 00000000000..1cef42dbff2
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.CacheFlushListener;
+
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+class TimestampedCacheFlushListener<KOut, VOut> implements 
CacheFlushListener<KOut, ValueAndTimestamp<VOut>> {
+
+    private final InternalProcessorContext<KOut, Change<VOut>> context;
+
+    @SuppressWarnings("rawtypes")
+    private final ProcessorNode myNode;
+
+    TimestampedCacheFlushListener(final ProcessorContext<KOut, Change<VOut>> 
context) {
+        this.context = (InternalProcessorContext<KOut, Change<VOut>>) context;
+        myNode = this.context.currentNode();
+    }
+
+    @Override
+    public void apply(final Record<KOut, Change<ValueAndTimestamp<VOut>>> 
record) {
+        @SuppressWarnings("rawtypes") final ProcessorNode prev = 
context.currentNode();
+        context.setCurrentNode(myNode);
+        try {
+            final VOut newValue = getValueOrNull(record.value().newValue);
+            final VOut oldValue = getValueOrNull(record.value().oldValue);
+            final long timestamp = record.value().newValue != null ? 
record.value().newValue.timestamp() : record.timestamp();
+            final Headers headers = record.headers() != null ? 
record.headers() : new RecordHeaders();
+
+            context.forward(
+                record
+                    .withValue(new Change<>(newValue, oldValue, 
record.value().isLatest))
+                    .withTimestamp(timestamp)
+                    .withHeaders(headers)
+            );
+        } finally {
+            context.setCurrentNode(prev);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStoreSupplier.java
new file mode 100644
index 00000000000..f27c1711b0a
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStoreSupplier.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+/**
+ * A store supplier that can be used to create one or more "header" stores,
+ * specifically, {@link HeadersBytesStore} instances.
+ */
+public interface HeadersBytesStoreSupplier extends KeyValueBytesStoreSupplier {
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java 
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index c541eca32a0..48f982f5e67 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -25,6 +25,7 @@ import 
org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
 import 
org.apache.kafka.streams.state.internals.RocksDBKeyValueBytesStoreSupplier;
+import 
org.apache.kafka.streams.state.internals.RocksDBKeyValueHeadersBytesStoreSupplier;
 import 
org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier;
 import 
org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
 import 
org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
@@ -97,7 +98,7 @@ public final class Stores {
      */
     public static KeyValueBytesStoreSupplier persistentKeyValueStore(final 
String name) {
         Objects.requireNonNull(name, "name cannot be null");
-        return new RocksDBKeyValueBytesStoreSupplier(name, false, false);
+        return new RocksDBKeyValueBytesStoreSupplier(name, false);
     }
 
     /**
@@ -116,7 +117,7 @@ public final class Stores {
      */
     public static KeyValueBytesStoreSupplier 
persistentTimestampedKeyValueStore(final String name) {
         Objects.requireNonNull(name, "name cannot be null");
-        return new RocksDBKeyValueBytesStoreSupplier(name, true, false);
+        return new RocksDBKeyValueBytesStoreSupplier(name, true);
     }
 
     /**
@@ -142,7 +143,7 @@ public final class Stores {
      */
     public static KeyValueBytesStoreSupplier 
persistentTimestampedKeyValueStoreWithHeaders(final String name) {
         Objects.requireNonNull(name, "name cannot be null");
-        return new RocksDBKeyValueBytesStoreSupplier(name, true, true);
+        return new RocksDBKeyValueHeadersBytesStoreSupplier(name);
     }
 
     /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
index dc9fb21046b..62d939562ca 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
@@ -57,6 +57,15 @@ public class KeyValueStoreWrapper<K, V> implements 
StateStore {
     private StateStore store;
 
     public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final 
String storeName) {
+        // Try timestamped store
+        try {
+            headersStore = new 
KeyValueTimestampedHeaderStoreToKeyValueTimestampStoreAdapter<>(context.getStateStore(storeName));
+            store = headersStore;
+            return;
+        } catch (final ClassCastException e) {
+            // not timestamped store, try headers
+        }
+
         // Try headers-aware timestamped store
         try {
             headersStore = context.getStateStore(storeName);
@@ -119,6 +128,10 @@ public class KeyValueStoreWrapper<K, V> implements 
StateStore {
         return store;
     }
 
+    public boolean isHeadersStore() {
+        return !(headersStore instanceof 
KeyValueTimestampedHeaderStoreToKeyValueTimestampStoreAdapter);
+    }
+
     public boolean isVersionedStore() {
         return versionedStore != null;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueTimestampedHeaderStoreToKeyValueTimestampStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueTimestampedHeaderStoreToKeyValueTimestampStoreAdapter.java
new file mode 100644
index 00000000000..a85f96877ac
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueTimestampedHeaderStoreToKeyValueTimestampStoreAdapter.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Adaptor store for the Kafka Streams DSL to bridge between "headers" store 
and "ts-store".
+ *
+ * <p> With KIP-1285 we did rewrite the DLS Processor code to work against 
"header store" interface to allow users
+ * to plugin "header stores", but by default the underlying store is still a 
"ts-store". To avoid "if-then-else"
+ * code across the entire DSL Processor code base, we use this adaptor to wrap 
a "ts-store" and make it look like
+ * a "header store".
+ *
+ * <p> On any write operation, provided {@link 
org.apache.kafka.common.header.Headers} will just be dropped,
+ * and {@link ValueTimestampHeaders} type is translated into {@link 
ValueAndTimestamp} type. Similarly for
+ * any read operation, the underlying {@link ValueAndTimestamp} type is 
translated into a {@link ValueTimestampHeaders}
+ * type with an empty {@link org.apache.kafka.common.header.Headers} object.
+ */
+public class KeyValueTimestampedHeaderStoreToKeyValueTimestampStoreAdapter<K, 
V>
+    extends WrappedStateStore<TimestampedKeyValueStore<K, V>, K, V>
+    implements TimestampedKeyValueStoreWithHeaders<K, V> {
+
+    public KeyValueTimestampedHeaderStoreToKeyValueTimestampStoreAdapter(final 
TimestampedKeyValueStore<K, V> timestampedKeyValueStore) {
+        super(timestampedKeyValueStore);
+    }
+
+    @Override
+    public String name() {
+        return wrapped().name();
+    }
+
+    @Override
+    public void init(final StateStoreContext stateStoreContext, final 
StateStore root) {
+        wrapped().init(stateStoreContext, root);
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public void flush() {
+        wrapped().flush();
+    }
+
+    @Override
+    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+        wrapped().commit(changelogOffsets);
+    }
+
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        return wrapped().committedOffset(partition);
+    }
+
+    @Override
+    public void close() {
+        wrapped().close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return wrapped().persistent();
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public boolean managesOffsets() {
+        return wrapped().managesOffsets();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return wrapped().isOpen();
+    }
+
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query, final PositionBound 
positionBound, final QueryConfig config) {
+        return wrapped().query(query, positionBound, config);
+    }
+
+    @Override
+    public Position getPosition() {
+        return wrapped().getPosition();
+    }
+
+    @Override
+    public void put(final K key, final ValueTimestampHeaders<V> value) {
+        wrapped().put(
+            key,
+            value == null ? null : ValueAndTimestamp.make(value.value(), 
value.timestamp())
+        );
+    }
+
+    @Override
+    public ValueTimestampHeaders<V> putIfAbsent(final K key, final 
ValueTimestampHeaders<V> value) {
+        final ValueAndTimestamp<V> oldValueAndTimestamp = 
wrapped().putIfAbsent(
+            key,
+            value == null ? null : ValueAndTimestamp.make(value.value(), 
value.timestamp())
+        );
+
+        return oldValueAndTimestamp == null
+            ? null
+            : ValueTimestampHeaders.make(oldValueAndTimestamp.value(), 
oldValueAndTimestamp.timestamp(), new RecordHeaders());
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<K, ValueTimestampHeaders<V>>> 
entries) {
+        wrapped().putAll(
+            entries.stream().map(keyValuePair -> KeyValue.pair(
+                keyValuePair.key,
+                ValueAndTimestamp.make(keyValuePair.value.value(), 
keyValuePair.value.timestamp()))
+            )
+            .collect(Collectors.toList())
+        );
+    }
+
+    @Override
+    public ValueTimestampHeaders<V> delete(final K key) {
+        final ValueAndTimestamp<V> oldValueAndTimestamp = 
wrapped().delete(key);
+
+        return oldValueAndTimestamp == null
+            ? null
+            : ValueTimestampHeaders.make(oldValueAndTimestamp.value(), 
oldValueAndTimestamp.timestamp(), new RecordHeaders());
+    }
+
+    @Override
+    public ValueTimestampHeaders<V> get(final K key) {
+        final ValueAndTimestamp<V> valueAndTimestamp = wrapped().get(key);
+
+        return valueAndTimestamp == null
+            ? null
+            : ValueTimestampHeaders.make(valueAndTimestamp.value(), 
valueAndTimestamp.timestamp(), new RecordHeaders());
+    }
+
+    @Override
+    public KeyValueIterator<K, ValueTimestampHeaders<V>> range(final K from, 
final K to) {
+        return new KeyValueIteratorAdapter(wrapped().range(from, to));
+    }
+
+    @Override
+    public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseRange(final K 
from, final K to) {
+        return new KeyValueIteratorAdapter(wrapped().reverseRange(from, to));
+    }
+
+    @Override
+    public KeyValueIterator<K, ValueTimestampHeaders<V>> all() {
+        return new KeyValueIteratorAdapter(wrapped().all());
+    }
+
+    @Override
+    public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseAll() {
+        return new KeyValueIteratorAdapter(wrapped().reverseAll());
+    }
+
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<K, 
ValueTimestampHeaders<V>> prefixScan(final P prefix, final PS 
prefixKeySerializer) {
+        return new KeyValueIteratorAdapter(wrapped().prefixScan(prefix, 
prefixKeySerializer));
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return wrapped().approximateNumEntries();
+    }
+
+    private final class KeyValueIteratorAdapter implements KeyValueIterator<K, 
ValueTimestampHeaders<V>> {
+        private final KeyValueIterator<K, ValueAndTimestamp<V>> innerIterator;
+
+        private KeyValueIteratorAdapter(final KeyValueIterator<K, 
ValueAndTimestamp<V>> innerIterator) {
+            this.innerIterator = innerIterator;
+        }
+
+        @Override
+        public void close() {
+            innerIterator.close();
+        }
+
+        @Override
+        public K peekNextKey() {
+            return innerIterator.peekNextKey();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return innerIterator.hasNext();
+        }
+
+        @Override
+        public KeyValue<K, ValueTimestampHeaders<V>> next() {
+            final KeyValue<K, ValueAndTimestamp<V>> next = 
innerIterator.next();
+            return KeyValue.pair(next.key, 
ValueTimestampHeaders.make(next.value.value(), next.value.timestamp(), new 
RecordHeaders()));
+        }
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
index a2d3426c6c7..400074f7154 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
@@ -24,18 +24,11 @@ public class RocksDBKeyValueBytesStoreSupplier implements 
KeyValueBytesStoreSupp
 
     private final String name;
     private final boolean returnTimestampedStore;
-    private final boolean returnHeadersStore;
 
     public RocksDBKeyValueBytesStoreSupplier(final String name,
-                                             final boolean 
returnTimestampedStore,
-                                             final boolean returnHeadersStore) 
{
+                                             final boolean 
returnTimestampedStore) {
         this.name = name;
         this.returnTimestampedStore = returnTimestampedStore;
-        this.returnHeadersStore = returnHeadersStore;
-        if (returnHeadersStore && !returnTimestampedStore) {
-            throw new IllegalStateException(
-                "RocksDBKeyValueBytesStoreSupplier cannot return a headers 
store without also returning a timestamped store!");
-        }
     }
 
     @Override
@@ -45,9 +38,6 @@ public class RocksDBKeyValueBytesStoreSupplier implements 
KeyValueBytesStoreSupp
 
     @Override
     public KeyValueStore<Bytes, byte[]> get() {
-        if (returnHeadersStore && returnTimestampedStore) {
-            return new RocksDBTimestampedStoreWithHeaders(name, 
metricsScope());
-        }
         if (returnTimestampedStore) {
             return new RocksDBTimestampedStore(name, metricsScope());
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueHeadersBytesStoreSupplier.java
similarity index 54%
copy from 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
copy to 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueHeadersBytesStoreSupplier.java
index a2d3426c6c7..87608668e39 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueHeadersBytesStoreSupplier.java
@@ -17,25 +17,16 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 
-public class RocksDBKeyValueBytesStoreSupplier implements 
KeyValueBytesStoreSupplier {
+public class RocksDBKeyValueHeadersBytesStoreSupplier implements 
KeyValueBytesStoreSupplier, HeadersBytesStoreSupplier {
 
     private final String name;
-    private final boolean returnTimestampedStore;
-    private final boolean returnHeadersStore;
 
-    public RocksDBKeyValueBytesStoreSupplier(final String name,
-                                             final boolean 
returnTimestampedStore,
-                                             final boolean returnHeadersStore) 
{
+    public RocksDBKeyValueHeadersBytesStoreSupplier(final String name) {
         this.name = name;
-        this.returnTimestampedStore = returnTimestampedStore;
-        this.returnHeadersStore = returnHeadersStore;
-        if (returnHeadersStore && !returnTimestampedStore) {
-            throw new IllegalStateException(
-                "RocksDBKeyValueBytesStoreSupplier cannot return a headers 
store without also returning a timestamped store!");
-        }
     }
 
     @Override
@@ -45,13 +36,7 @@ public class RocksDBKeyValueBytesStoreSupplier implements 
KeyValueBytesStoreSupp
 
     @Override
     public KeyValueStore<Bytes, byte[]> get() {
-        if (returnHeadersStore && returnTimestampedStore) {
-            return new RocksDBTimestampedStoreWithHeaders(name, 
metricsScope());
-        }
-        if (returnTimestampedStore) {
-            return new RocksDBTimestampedStore(name, metricsScope());
-        }
-        return new RocksDBStore(name, metricsScope());
+        return new RocksDBTimestampedStoreWithHeaders(name, metricsScope());
     }
 
     @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
index 59a207b988b..6e6e35e7512 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -26,17 +26,20 @@ import 
org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
 import org.apache.kafka.streams.state.DslStoreSuppliers;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.VersionedBytesStore;
 import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
 import org.apache.kafka.streams.state.VersionedKeyValueStore;
 import org.apache.kafka.streams.state.internals.CachingKeyValueStore;
 import 
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore;
+import 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore;
 import 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders;
 import 
org.apache.kafka.streams.state.internals.ChangeLoggingVersionedKeyValueBytesStore;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
 import 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.internals.MeteredVersionedKeyValueStore;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
@@ -69,7 +72,7 @@ public class KeyValueStoreMaterializerTest {
     @Mock
     private InternalNameProvider nameProvider;
     @Mock
-    private KeyValueBytesStoreSupplier keyValueStoreSupplier;
+    private HeadersBytesStoreSupplier headersStoreSupplier;
     @Mock
     private VersionedBytesStoreSupplier versionedStoreSupplier;
     private final KeyValueStore<Bytes, byte[]> innerKeyValueStore = new 
InMemoryKeyValueStore(STORE_NAME);
@@ -99,74 +102,73 @@ public class KeyValueStoreMaterializerTest {
         when(versionedStoreSupplier.metricsScope()).thenReturn(METRICS_SCOPE);
     }
 
-    private void mockKeyValueStoreSupplier() {
-        when(keyValueStoreSupplier.get()).thenReturn(innerKeyValueStore);
-        when(keyValueStoreSupplier.name()).thenReturn(STORE_NAME);
-        when(keyValueStoreSupplier.metricsScope()).thenReturn(METRICS_SCOPE);
+    private void mockHeadersStoreSupplier() {
+        when(headersStoreSupplier.get()).thenReturn(innerKeyValueStore);
+        when(headersStoreSupplier.name()).thenReturn(STORE_NAME);
+        when(headersStoreSupplier.metricsScope()).thenReturn(METRICS_SCOPE);
     }
 
     @Test
-    public void 
shouldCreateHeadersBuilderWithCachingAndLoggingEnabledByDefault() {
+    public void 
shouldCreateTimestampedBuilderWithCachingAndLoggingEnabledByDefault() {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized =
             new MaterializedInternal<>(Materialized.as("store"), nameProvider, 
STORE_PREFIX);
 
-        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
+        final TimestampedKeyValueStore<String, String> store = 
getTimestampedStore(materialized);
 
-        final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
+        final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
         final StateStore logging = caching.wrapped();
-        assertThat(store, 
instanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class));
+        assertThat(store, instanceOf(MeteredTimestampedKeyValueStore.class));
         assertThat(caching, instanceOf(CachingKeyValueStore.class));
-        assertThat(logging, 
instanceOf(ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.class));
+        assertThat(logging, 
instanceOf(ChangeLoggingTimestampedKeyValueBytesStore.class));
     }
 
     @Test
-    public void shouldCreateDefaultHeadersBuilderWithCachingDisabled() {
+    public void shouldCreateTimestampedBuilderWithCachingDisabled() {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized = new MaterializedInternal<>(
             Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("store").withCachingDisabled(), nameProvider, STORE_PREFIX
         );
 
-        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
+        final TimestampedKeyValueStore<String, String> store = 
getTimestampedStore(materialized);
 
-        final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
-        assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
+        final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
+        assertThat(logging, 
instanceOf(ChangeLoggingTimestampedKeyValueBytesStore.class));
     }
 
     @Test
-    public void shouldCreateDefaultHeadersBuilderWithLoggingDisabled() {
+    public void shouldCreateTimestampedBuilderWithLoggingDisabled() {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized = new MaterializedInternal<>(
             Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
         );
 
-        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
+        final TimestampedKeyValueStore<String, String> store = 
getTimestampedStore(materialized);
 
-        final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
+        final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
         assertThat(caching, instanceOf(CachingKeyValueStore.class));
         assertThat(caching.wrapped(), 
not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
     }
 
     @Test
-    public void 
shouldCreateDefaultHeadersBuilderWithCachingAndLoggingDisabled() {
+    public void shouldCreateTimestampedBuilderWithCachingAndLoggingDisabled() {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized = new MaterializedInternal<>(
             Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider, 
STORE_PREFIX
         );
 
-        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
+        final TimestampedKeyValueStore<String, String> store = 
getTimestampedStore(materialized);
 
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        final StateStore wrapped = ((WrappedStateStore<?, ?, ?>) 
store).wrapped();
         assertThat(wrapped, not(instanceOf(CachingKeyValueStore.class)));
         assertThat(wrapped, 
not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
     }
 
     @Test
     public void 
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingEnabledByDefault()
 {
-        mockKeyValueStoreSupplier();
-
+        mockHeadersStoreSupplier();
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized =
-            new MaterializedInternal<>(Materialized.as(keyValueStoreSupplier), 
nameProvider, STORE_PREFIX);
+            new MaterializedInternal<>(Materialized.as(headersStoreSupplier), 
nameProvider, STORE_PREFIX);
 
         final TimestampedKeyValueStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
-        final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
+        final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
         final StateStore logging = caching.wrapped();
         assertThat(innerKeyValueStore.name(), equalTo(store.name()));
         assertThat(store, 
instanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class));
@@ -176,26 +178,26 @@ public class KeyValueStoreMaterializerTest {
 
     @Test
     public void 
shouldCreateHeadersStoreWithProvidedSupplierAndCachingDisabled() {
-        mockKeyValueStoreSupplier();
+        mockHeadersStoreSupplier();
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized =
-            new MaterializedInternal<>(Materialized.<String, 
String>as(keyValueStoreSupplier).withCachingDisabled(), nameProvider, 
STORE_PREFIX);
+            new MaterializedInternal<>(Materialized.<String, 
String>as(headersStoreSupplier).withCachingDisabled(), nameProvider, 
STORE_PREFIX);
 
         final TimestampedKeyValueStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
-        final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
+        final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
         assertThat(innerKeyValueStore.name(), equalTo(store.name()));
         assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
     }
 
     @Test
     public void 
shouldCreateHeadersStoreWithProvidedSupplierAndLoggingDisabled() {
-        mockKeyValueStoreSupplier();
+        mockHeadersStoreSupplier();
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized =
-            new MaterializedInternal<>(Materialized.<String, 
String>as(keyValueStoreSupplier).withLoggingDisabled(), nameProvider, 
STORE_PREFIX);
+            new MaterializedInternal<>(Materialized.<String, 
String>as(headersStoreSupplier).withLoggingDisabled(), nameProvider, 
STORE_PREFIX);
 
         final TimestampedKeyValueStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
-        final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
+        final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
         assertThat(innerKeyValueStore.name(), equalTo(store.name()));
         assertThat(caching, instanceOf(CachingKeyValueStore.class));
         assertThat(caching.wrapped(), 
not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
@@ -203,13 +205,13 @@ public class KeyValueStoreMaterializerTest {
 
     @Test
     public void 
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingDisabled() {
-        mockKeyValueStoreSupplier();
+        mockHeadersStoreSupplier();
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized =
-            new MaterializedInternal<>(Materialized.<String, 
String>as(keyValueStoreSupplier).withCachingDisabled().withLoggingDisabled(), 
nameProvider, STORE_PREFIX);
+            new MaterializedInternal<>(Materialized.<String, 
String>as(headersStoreSupplier).withCachingDisabled().withLoggingDisabled(), 
nameProvider, STORE_PREFIX);
 
         final TimestampedKeyValueStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        final StateStore wrapped = ((WrappedStateStore<?, ?, ?>) 
store).wrapped();
         assertThat(innerKeyValueStore.name(), equalTo(store.name()));
         assertThat(wrapped, not(instanceOf(CachingKeyValueStore.class)));
         assertThat(wrapped, 
not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
@@ -223,7 +225,7 @@ public class KeyValueStoreMaterializerTest {
 
         final VersionedKeyValueStore<String, String> store = 
getVersionedStore(materialized);
 
-        final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
+        final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
         final StateStore inner = logging.wrapped();
         assertThat(innerVersionedStore.name(), equalTo(store.name()));
         assertThat(store, instanceOf(MeteredVersionedKeyValueStore.class));
@@ -239,7 +241,7 @@ public class KeyValueStoreMaterializerTest {
 
         final VersionedKeyValueStore<String, String> store = 
getVersionedStore(materialized);
 
-        final StateStore inner = ((WrappedStateStore) store).wrapped();
+        final StateStore inner = ((WrappedStateStore<?, ?, ?>) 
store).wrapped();
         assertThat(innerVersionedStore.name(), equalTo(store.name()));
         assertThat(store, instanceOf(MeteredVersionedKeyValueStore.class));
         assertThat(innerVersionedStore, equalTo(inner));
@@ -253,7 +255,7 @@ public class KeyValueStoreMaterializerTest {
 
         final VersionedKeyValueStore<String, String> store = 
getVersionedStore(materialized);
 
-        final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
+        final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?, 
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
         final StateStore inner = logging.wrapped();
         assertThat(innerVersionedStore.name(), equalTo(store.name()));
         assertThat(store, instanceOf(MeteredVersionedKeyValueStore.class));
@@ -261,6 +263,14 @@ public class KeyValueStoreMaterializerTest {
         assertThat(innerVersionedStore, equalTo(inner));
     }
 
+    @SuppressWarnings("unchecked")
+    private TimestampedKeyValueStore<String, String> getTimestampedStore(
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized) {
+        final KeyValueStoreMaterializer<String, String> materializer = new 
KeyValueStoreMaterializer<>(materialized);
+        materializer.configure(streamsConfig);
+        return (TimestampedKeyValueStore<String, String>) 
materializer.builder().build();
+    }
+
     @SuppressWarnings("unchecked")
     private TimestampedKeyValueStoreWithHeaders<String, String> 
getHeadersStore(
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized) {

Reply via email to