This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fb36f97d960 KAFKA-20304: Implement all required ReadOnly*Facade 
classes for headers stores (#21743)
fb36f97d960 is described below

commit fb36f97d960fca677778fbb4258e71cc35c8a030
Author: Alieh Saeedi <[email protected]>
AuthorDate: Sun Mar 15 01:28:42 2026 +0100

    KAFKA-20304: Implement all required ReadOnly*Facade classes for headers 
stores (#21743)
    
    This PR:
    - adds all `ReadOnly*Facade` classes required to wrap headers stores for
    backward compatibility
    - refactors `ReadOnly*Facade` classes to use generic infrastructure with
    converter functions
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 ...ade.java => GenericKeyValueIteratorFacade.java} |  38 ++-
 .../GenericReadOnlyKeyValueStoreFacade.java        |  78 ++++++
 .../GenericReadOnlyWindowStoreFacade.java          | 101 +++++++
 ....java => GenericWindowStoreIteratorFacade.java} |  40 +--
 .../state/internals/GlobalStateStoreProvider.java  |   4 +-
 .../internals/ReadOnlyKeyValueStoreFacade.java     |  70 -----
 .../state/internals/ReadOnlyWindowStoreFacade.java | 124 ---------
 .../internals/StreamThreadStateStoreProvider.java  |   4 +-
 .../streams/state/internals/ValueConverters.java   |  65 +++++
 ...java => GenericKeyValueIteratorFacadeTest.java} |  57 ++--
 .../GenericReadOnlyKeyValueStoreFacadeTest.java    | 209 +++++++++++++++
 .../GenericReadOnlyWindowStoreFacadeTest.java      | 296 +++++++++++++++++++++
 ...a => GenericWindowStoreIteratorFacadeTest.java} |  67 ++---
 .../internals/ReadOnlyKeyValueStoreFacadeTest.java | 107 --------
 .../internals/ReadOnlyWindowStoreFacadeTest.java   | 192 -------------
 .../state/internals/ValueConvertersTest.java       |  98 +++++++
 .../apache/kafka/streams/TopologyTestDriver.java   |  84 +++---
 .../kafka/streams/KeyValueStoreFacadeTest.java     |  10 +
 .../kafka/streams/WindowStoreFacadeTest.java       |  70 +++++
 19 files changed, 1098 insertions(+), 616 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacade.java
similarity index 56%
copy from 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
copy to 
streams/src/main/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacade.java
index f79b6f352a2..2ab15313b61 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacade.java
@@ -18,20 +18,30 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
 
-import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+import java.util.function.Function;
 
-public class KeyValueIteratorFacade<K, V> implements KeyValueIterator<K, V> {
-    private final KeyValueIterator<K, ValueAndTimestamp<V>> innerIterator;
-
-    public KeyValueIteratorFacade(final KeyValueIterator<K, 
ValueAndTimestamp<V>> iterator) {
-        innerIterator = iterator;
+/**
+ * Generic iterator facade that wraps a {@link KeyValueIterator} and converts 
values
+ * using a provided converter function.
+ *
+ * @param <K> key type
+ * @param <InV> input value type (from inner iterator)
+ * @param <OutV> output value type (exposed by this facade)
+ */
+class GenericKeyValueIteratorFacade<K, InV, OutV> implements 
KeyValueIterator<K, OutV> {
+    private final KeyValueIterator<K, InV> innerIterator;
+    private final Function<InV, OutV> valueConverter;
+
+    GenericKeyValueIteratorFacade(final KeyValueIterator<K, InV> innerIterator,
+                                  final Function<InV, OutV> valueConverter) {
+        this.innerIterator = innerIterator;
+        this.valueConverter = valueConverter;
     }
 
     @Override
-    public boolean hasNext() {
-        return innerIterator.hasNext();
+    public void close() {
+        innerIterator.close();
     }
 
     @Override
@@ -40,13 +50,13 @@ public class KeyValueIteratorFacade<K, V> implements 
KeyValueIterator<K, V> {
     }
 
     @Override
-    public KeyValue<K, V> next() {
-        final KeyValue<K, ValueAndTimestamp<V>> innerKeyValue = 
innerIterator.next();
-        return KeyValue.pair(innerKeyValue.key, 
getValueOrNull(innerKeyValue.value));
+    public boolean hasNext() {
+        return innerIterator.hasNext();
     }
 
     @Override
-    public void close() {
-        innerIterator.close();
+    public KeyValue<K, OutV> next() {
+        final KeyValue<K, InV> innerKeyValue = innerIterator.next();
+        return KeyValue.pair(innerKeyValue.key, 
valueConverter.apply(innerKeyValue.value));
     }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacade.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacade.java
new file mode 100644
index 00000000000..a35db0ebbd1
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacade.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+
+import java.util.function.Function;
+
+/**
+ * Generic facade that wraps a {@link ReadOnlyKeyValueStore} and converts 
values
+ * using a provided converter function.
+ *
+ * @param <K> key type
+ * @param <InV> input value type (from inner store)
+ * @param <OutV> output value type (exposed by this facade)
+ */
+public class GenericReadOnlyKeyValueStoreFacade<K, InV, OutV> implements 
ReadOnlyKeyValueStore<K, OutV> {
+    private final ReadOnlyKeyValueStore<K, InV> inner;
+    private final Function<InV, OutV> valueConverter;
+
+    public GenericReadOnlyKeyValueStoreFacade(final ReadOnlyKeyValueStore<K, 
InV> inner,
+                                              final Function<InV, OutV> 
valueConverter) {
+        this.inner = inner;
+        this.valueConverter = valueConverter;
+    }
+
+    @Override
+    public OutV get(final K key) {
+        return valueConverter.apply(inner.get(key));
+    }
+
+    @Override
+    public KeyValueIterator<K, OutV> range(final K from, final K to) {
+        return new GenericKeyValueIteratorFacade<>(inner.range(from, to), 
valueConverter);
+    }
+
+    @Override
+    public KeyValueIterator<K, OutV> reverseRange(final K from, final K to) {
+        return new GenericKeyValueIteratorFacade<>(inner.reverseRange(from, 
to), valueConverter);
+    }
+
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<K, OutV> 
prefixScan(final P prefix,
+                                                                               
final PS prefixKeySerializer) {
+        return new GenericKeyValueIteratorFacade<>(inner.prefixScan(prefix, 
prefixKeySerializer), valueConverter);
+    }
+
+    @Override
+    public KeyValueIterator<K, OutV> all() {
+        return new GenericKeyValueIteratorFacade<>(inner.all(), 
valueConverter);
+    }
+
+    @Override
+    public KeyValueIterator<K, OutV> reverseAll() {
+        return new GenericKeyValueIteratorFacade<>(inner.reverseAll(), 
valueConverter);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return inner.approximateNumEntries();
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacade.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacade.java
new file mode 100644
index 00000000000..93f4d0b5245
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacade.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.time.Instant;
+import java.util.function.Function;
+
+/**
+ * Generic facade that wraps a {@link ReadOnlyWindowStore} and converts values
+ * using a provided converter function.
+ *
+ * @param <K> key type
+ * @param <InV> input value type (from inner store)
+ * @param <OutV> output value type (exposed by this facade)
+ */
+public class GenericReadOnlyWindowStoreFacade<K, InV, OutV> implements 
ReadOnlyWindowStore<K, OutV> {
+    private final ReadOnlyWindowStore<K, InV> inner;
+    private final Function<InV, OutV> valueConverter;
+
+    public GenericReadOnlyWindowStoreFacade(final ReadOnlyWindowStore<K, InV> 
inner,
+                                            final Function<InV, OutV> 
valueConverter) {
+        this.inner = inner;
+        this.valueConverter = valueConverter;
+    }
+
+    @Override
+    public OutV fetch(final K key, final long time) {
+        return valueConverter.apply(inner.fetch(key, time));
+    }
+
+    @Override
+    public WindowStoreIterator<OutV> fetch(final K key,
+                                           final Instant timeFrom,
+                                           final Instant timeTo) throws 
IllegalArgumentException {
+        return new GenericWindowStoreIteratorFacade<>(inner.fetch(key, 
timeFrom, timeTo), valueConverter);
+    }
+
+    @Override
+    public WindowStoreIterator<OutV> backwardFetch(final K key,
+                                                   final Instant timeFrom,
+                                                   final Instant timeTo) 
throws IllegalArgumentException {
+        return new GenericWindowStoreIteratorFacade<>(inner.backwardFetch(key, 
timeFrom, timeTo), valueConverter);
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, OutV> fetch(final K keyFrom,
+                                                     final K keyTo,
+                                                     final Instant timeFrom,
+                                                     final Instant timeTo) 
throws IllegalArgumentException {
+        return new GenericKeyValueIteratorFacade<>(inner.fetch(keyFrom, keyTo, 
timeFrom, timeTo), valueConverter);
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, OutV> backwardFetch(final K keyFrom,
+                                                             final K keyTo,
+                                                             final Instant 
timeFrom,
+                                                             final Instant 
timeTo) throws IllegalArgumentException {
+        return new 
GenericKeyValueIteratorFacade<>(inner.backwardFetch(keyFrom, keyTo, timeFrom, 
timeTo), valueConverter);
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, OutV> fetchAll(final Instant timeFrom,
+                                                        final Instant timeTo) 
throws IllegalArgumentException {
+        return new GenericKeyValueIteratorFacade<>(inner.fetchAll(timeFrom, 
timeTo), valueConverter);
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, OutV> backwardFetchAll(final Instant 
timeFrom,
+                                                                final Instant 
timeTo) throws IllegalArgumentException {
+        return new 
GenericKeyValueIteratorFacade<>(inner.backwardFetchAll(timeFrom, timeTo), 
valueConverter);
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, OutV> all() {
+        return new GenericKeyValueIteratorFacade<>(inner.all(), 
valueConverter);
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, OutV> backwardAll() {
+        return new GenericKeyValueIteratorFacade<>(inner.backwardAll(), 
valueConverter);
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacade.java
similarity index 52%
rename from 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
rename to 
streams/src/main/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacade.java
index f79b6f352a2..a2e858b05dc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacade.java
@@ -18,35 +18,45 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStoreIterator;
 
-import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+import java.util.function.Function;
 
-public class KeyValueIteratorFacade<K, V> implements KeyValueIterator<K, V> {
-    private final KeyValueIterator<K, ValueAndTimestamp<V>> innerIterator;
-
-    public KeyValueIteratorFacade(final KeyValueIterator<K, 
ValueAndTimestamp<V>> iterator) {
-        innerIterator = iterator;
+/**
+ * Generic iterator facade that wraps a {@link KeyValueIterator} with Long 
keys and converts values
+ * using a provided converter function to implement {@link 
WindowStoreIterator}.
+ *
+ * @param <InV> input value type (from inner iterator)
+ * @param <OutV> output value type (exposed by this facade)
+ */
+class GenericWindowStoreIteratorFacade<InV, OutV> implements 
WindowStoreIterator<OutV> {
+    private final KeyValueIterator<Long, InV> innerIterator;
+    private final Function<InV, OutV> valueConverter;
+
+    GenericWindowStoreIteratorFacade(final KeyValueIterator<Long, InV> 
innerIterator,
+                                     final Function<InV, OutV> valueConverter) 
{
+        this.innerIterator = innerIterator;
+        this.valueConverter = valueConverter;
     }
 
     @Override
-    public boolean hasNext() {
-        return innerIterator.hasNext();
+    public void close() {
+        innerIterator.close();
     }
 
     @Override
-    public K peekNextKey() {
+    public Long peekNextKey() {
         return innerIterator.peekNextKey();
     }
 
     @Override
-    public KeyValue<K, V> next() {
-        final KeyValue<K, ValueAndTimestamp<V>> innerKeyValue = 
innerIterator.next();
-        return KeyValue.pair(innerKeyValue.key, 
getValueOrNull(innerKeyValue.value));
+    public boolean hasNext() {
+        return innerIterator.hasNext();
     }
 
     @Override
-    public void close() {
-        innerIterator.close();
+    public KeyValue<Long, OutV> next() {
+        final KeyValue<Long, InV> innerKeyValue = innerIterator.next();
+        return KeyValue.pair(innerKeyValue.key, 
valueConverter.apply(innerKeyValue.value));
     }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
index f15c21be5c7..6620401cbcf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
@@ -46,9 +46,9 @@ public class GlobalStateStoreProvider implements 
StateStoreProvider {
             throw new InvalidStateStoreException("the state store, " + 
storeName + ", is not open.");
         }
         if (store instanceof TimestampedKeyValueStore && queryableStoreType 
instanceof QueryableStoreTypes.KeyValueStoreType) {
-            return (List<T>) Collections.singletonList(new 
ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) 
store));
+            return (List<T>) Collections.singletonList(new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) 
store, ValueConverters.extractValue()));
         } else if (store instanceof TimestampedWindowStore && 
queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
-            return (List<T>) Collections.singletonList(new 
ReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) store));
+            return (List<T>) Collections.singletonList(new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) 
store, ValueConverters.extractValue()));
         } else if (store instanceof SessionStoreWithHeaders && 
queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
             return (List<T>) Collections.singletonList(new 
ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders<Object, Object>) store));
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
deleted file mode 100644
index 7a03f7270e8..00000000000
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-
-import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
-
-public class ReadOnlyKeyValueStoreFacade<K, V> implements 
ReadOnlyKeyValueStore<K, V> {
-    protected final TimestampedKeyValueStore<K, V> inner;
-
-    protected ReadOnlyKeyValueStoreFacade(final TimestampedKeyValueStore<K, V> 
store) {
-        inner = store;
-    }
-
-    @Override
-    public V get(final K key) {
-        return getValueOrNull(inner.get(key));
-    }
-
-    @Override
-    public KeyValueIterator<K, V> range(final K from,
-                                        final K to) {
-        return new KeyValueIteratorFacade<>(inner.range(from, to));
-    }
-
-    @Override
-    public KeyValueIterator<K, V> reverseRange(final K from,
-                                               final K to) {
-        return new KeyValueIteratorFacade<>(inner.reverseRange(from, to));
-    }
-
-    @Override
-    public <PS extends Serializer<P>, P> KeyValueIterator<K, V> 
prefixScan(final P prefix,
-                                                                           
final PS prefixKeySerializer) {
-        return new KeyValueIteratorFacade<>(inner.prefixScan(prefix, 
prefixKeySerializer));
-    }
-
-    @Override
-    public KeyValueIterator<K, V> all() {
-        return new KeyValueIteratorFacade<>(inner.all());
-    }
-
-    @Override
-    public KeyValueIterator<K, V> reverseAll() {
-        return new KeyValueIteratorFacade<>(inner.reverseAll());
-    }
-
-    @Override
-    public long approximateNumEntries() {
-        return inner.approximateNumEntries();
-    }
-}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacade.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacade.java
deleted file mode 100644
index 281a1c26757..00000000000
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacade.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.ReadOnlyWindowStore;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.state.WindowStoreIterator;
-
-import java.time.Instant;
-
-import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
-
-public class ReadOnlyWindowStoreFacade<K, V> implements ReadOnlyWindowStore<K, 
V> {
-    protected final TimestampedWindowStore<K, V> inner;
-
-    protected ReadOnlyWindowStoreFacade(final TimestampedWindowStore<K, V> 
store) {
-        inner = store;
-    }
-
-    @Override
-    public V fetch(final K key,
-                   final long time) {
-        return getValueOrNull(inner.fetch(key, time));
-    }
-
-    @Override
-    public WindowStoreIterator<V> fetch(final K key,
-                                        final Instant timeFrom,
-                                        final Instant timeTo) throws 
IllegalArgumentException {
-        return new WindowStoreIteratorFacade<>(inner.fetch(key, timeFrom, 
timeTo));
-    }
-
-    @Override
-    public WindowStoreIterator<V> backwardFetch(final K key,
-                                                final Instant timeFrom,
-                                                final Instant timeTo) throws 
IllegalArgumentException {
-        return new WindowStoreIteratorFacade<>(inner.backwardFetch(key, 
timeFrom, timeTo));
-    }
-
-    @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
-                                                  final K keyTo,
-                                                  final Instant timeFrom,
-                                                  final Instant timeTo) throws 
IllegalArgumentException {
-        return new KeyValueIteratorFacade<>(inner.fetch(keyFrom, keyTo, 
timeFrom, timeTo));
-    }
-
-    @Override
-    public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
-                                                          final K keyTo,
-                                                          final Instant 
timeFrom,
-                                                          final Instant 
timeTo) throws IllegalArgumentException {
-        return new KeyValueIteratorFacade<>(inner.backwardFetch(keyFrom, 
keyTo, timeFrom, timeTo));
-    }
-
-    @Override
-    public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant timeFrom,
-                                                     final Instant timeTo) 
throws IllegalArgumentException {
-        return new KeyValueIteratorFacade<>(inner.fetchAll(timeFrom, timeTo));
-    }
-
-    @Override
-    public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final Instant 
timeFrom,
-                                                             final Instant 
timeTo) throws IllegalArgumentException {
-        return new KeyValueIteratorFacade<>(inner.backwardFetchAll(timeFrom, 
timeTo));
-    }
-
-    @Override
-    public KeyValueIterator<Windowed<K>, V> all() {
-        return new KeyValueIteratorFacade<>(inner.all());
-    }
-
-    @Override
-    public KeyValueIterator<Windowed<K>, V> backwardAll() {
-        return new KeyValueIteratorFacade<>(inner.backwardAll());
-    }
-
-    private static class WindowStoreIteratorFacade<V> implements 
WindowStoreIterator<V> {
-        final KeyValueIterator<Long, ValueAndTimestamp<V>> innerIterator;
-
-        WindowStoreIteratorFacade(final KeyValueIterator<Long, 
ValueAndTimestamp<V>> iterator) {
-            innerIterator = iterator;
-        }
-
-        @Override
-        public void close() {
-            innerIterator.close();
-        }
-
-        @Override
-        public Long peekNextKey() {
-            return innerIterator.peekNextKey();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return innerIterator.hasNext();
-        }
-
-        @Override
-        public KeyValue<Long, V> next() {
-            final KeyValue<Long, ValueAndTimestamp<V>> innerKeyValue = 
innerIterator.next();
-            return KeyValue.pair(innerKeyValue.key, 
getValueOrNull(innerKeyValue.value));
-        }
-    }
-}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index f797ff0aadb..8a7418fec8a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -105,9 +105,9 @@ public class StreamThreadStateStoreProvider {
                             "The state store may have migrated to another 
instance.");
             }
             if (store instanceof TimestampedKeyValueStore && 
queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
-                return (T) new 
ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) store);
+                return (T) new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) 
store, ValueConverters.extractValue());
             } else if (store instanceof TimestampedWindowStore && 
queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
-                return (T) new 
ReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) store);
+                return (T) new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) 
store, ValueConverters.extractValue());
             } else if (store instanceof SessionStoreWithHeaders && 
queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
                 return (T) new 
ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders<Object, Object>) store);
             } else {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueConverters.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueConverters.java
new file mode 100644
index 00000000000..3a3ceec2d42
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueConverters.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.util.function.Function;
+
+/**
+ * Utility class providing common value converter functions for facade 
patterns.
+ * These converters are used to transform between different value 
representations
+ * (e.g., ValueAndTimestamp, ValueTimestampHeaders, plain values).
+ */
+public final class ValueConverters {
+
+    private ValueConverters() {
+        // Utility class - prevent instantiation
+    }
+
+    /**
+     * Converts {@link ValueAndTimestamp} to plain value, discarding timestamp.
+     *
+     * @param <V> value type
+     * @return converter function that extracts the value or returns null
+     */
+    public static <V> Function<ValueAndTimestamp<V>, V> extractValue() {
+        return ValueAndTimestamp::getValueOrNull;
+    }
+
+    /**
+     * Converts {@link ValueTimestampHeaders} to plain value, discarding 
timestamp and headers.
+     *
+     * @param <V> value type
+     * @return converter function that extracts the value or returns null
+     */
+    public static <V> Function<ValueTimestampHeaders<V>, V> 
extractValueFromHeaders() {
+        return vth -> vth == null ? null : vth.value();
+    }
+
+    /**
+     * Converts {@link ValueTimestampHeaders} to {@link ValueAndTimestamp}, 
discarding headers.
+     *
+     * @param <V> value type
+     * @return converter function that creates ValueAndTimestamp or returns 
null
+     */
+    public static <V> Function<ValueTimestampHeaders<V>, ValueAndTimestamp<V>> 
headersToValueAndTimestamp() {
+        return vth -> vth == null ? null :
+            ValueAndTimestamp.make(vth.value(), vth.timestamp());
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacadeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacadeTest.java
similarity index 50%
rename from 
streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacadeTest.java
rename to 
streams/src/test/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacadeTest.java
index 2f2458a3e39..3983ed6057d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacadeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacadeTest.java
@@ -28,53 +28,70 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
+import java.util.function.Function;
+
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
-public class KeyValueIteratorFacadeTest {
+public class GenericKeyValueIteratorFacadeTest {
+
     @Mock
-    private KeyValueIterator<String, ValueAndTimestamp<String>> 
mockedKeyValueIterator;
+    private KeyValueIterator<String, ValueAndTimestamp<String>> 
mockedInnerIterator;
 
-    private KeyValueIteratorFacade<String, String> keyValueIteratorFacade;
+    private GenericKeyValueIteratorFacade<String, ValueAndTimestamp<String>, 
String> facade;
 
     @BeforeEach
     public void setup() {
-        keyValueIteratorFacade = new 
KeyValueIteratorFacade<>(mockedKeyValueIterator);
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        facade = new GenericKeyValueIteratorFacade<>(mockedInnerIterator, 
converter);
     }
 
     @Test
-    public void shouldForwardHasNext() {
-        
when(mockedKeyValueIterator.hasNext()).thenReturn(true).thenReturn(false);
+    public void shouldConvertValues() {
+        when(mockedInnerIterator.next())
+            .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 
42L)))
+            .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 
84L)));
 
-        assertTrue(keyValueIteratorFacade.hasNext());
-        assertFalse(keyValueIteratorFacade.hasNext());
+        assertThat(facade.next(), is(KeyValue.pair("key1", "value1")));
+        assertThat(facade.next(), is(KeyValue.pair("key2", "value2")));
     }
 
     @Test
-    public void shouldForwardPeekNextKey() {
-        when(mockedKeyValueIterator.peekNextKey()).thenReturn("key");
+    public void shouldHandleNullValues() {
+        when(mockedInnerIterator.next())
+            .thenReturn(KeyValue.pair("key1", null))
+            .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 
42L)));
 
-        assertThat(keyValueIteratorFacade.peekNextKey(), is("key"));
+        assertThat(facade.next(), is(KeyValue.pair("key1", null)));
+        assertThat(facade.next(), is(KeyValue.pair("key2", "value2")));
     }
 
     @Test
-    public void shouldReturnPlainKeyValuePairOnGet() {
-        when(mockedKeyValueIterator.next()).thenReturn(
-            new KeyValue<>("key", ValueAndTimestamp.make("value", 42L)));
+    public void shouldDelegateHasNext() {
+        when(mockedInnerIterator.hasNext()).thenReturn(true, false);
 
-        assertThat(keyValueIteratorFacade.next(), is(KeyValue.pair("key", 
"value")));
+        assertTrue(facade.hasNext());
+        assertFalse(facade.hasNext());
     }
 
     @Test
-    public void shouldCloseInnerIterator() {
-        doNothing().when(mockedKeyValueIterator).close();
+    public void shouldDelegatePeekNextKey() {
+        when(mockedInnerIterator.peekNextKey()).thenReturn("peekedKey", null);
 
-        keyValueIteratorFacade.close();
+        assertThat(facade.peekNextKey(), is("peekedKey"));
+        assertNull(facade.peekNextKey());
+    }
+
+    @Test
+    public void shouldDelegateClose() {
+        facade.close();
+        verify(mockedInnerIterator).close();
     }
-}
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacadeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacadeTest.java
new file mode 100644
index 00000000000..c569ca20eaa
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacadeTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.function.Function;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class GenericReadOnlyKeyValueStoreFacadeTest {
+
+    @Mock
+    private ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> 
mockedTimestampedStore;
+    @Mock
+    private ReadOnlyKeyValueStore<String, ValueTimestampHeaders<String>> 
mockedHeadersStore;
+    @Mock
+    private KeyValueIterator<String, ValueAndTimestamp<String>> 
mockedTimestampedIterator;
+    @Mock
+    private KeyValueIterator<String, ValueTimestampHeaders<String>> 
mockedHeadersIterator;
+
+    @Test
+    public void shouldConvertValueWithExtractValueConverter() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        final GenericReadOnlyKeyValueStoreFacade<String, 
ValueAndTimestamp<String>, String> facade =
+            new GenericReadOnlyKeyValueStoreFacade<>(mockedTimestampedStore, 
converter);
+
+        when(mockedTimestampedStore.get("key"))
+            .thenReturn(ValueAndTimestamp.make("value", 42L));
+        when(mockedTimestampedStore.get("unknownKey"))
+            .thenReturn(null);
+
+        assertThat(facade.get("key"), is("value"));
+        assertNull(facade.get("unknownKey"));
+    }
+
+    @Test
+    public void shouldConvertValueWithHeadersConverter() {
+        final Function<ValueTimestampHeaders<String>, String> converter = 
ValueConverters.extractValueFromHeaders();
+        final GenericReadOnlyKeyValueStoreFacade<String, 
ValueTimestampHeaders<String>, String> facade =
+            new GenericReadOnlyKeyValueStoreFacade<>(mockedHeadersStore, 
converter);
+
+        when(mockedHeadersStore.get("key"))
+            .thenReturn(ValueTimestampHeaders.make("value", 42L, new 
RecordHeaders()));
+        when(mockedHeadersStore.get("unknownKey"))
+            .thenReturn(null);
+
+        assertThat(facade.get("key"), is("value"));
+        assertNull(facade.get("unknownKey"));
+    }
+
+    @Test
+    public void shouldConvertValueToValueAndTimestamp() {
+        final Function<ValueTimestampHeaders<String>, 
ValueAndTimestamp<String>> converter =
+            ValueConverters.headersToValueAndTimestamp();
+        final GenericReadOnlyKeyValueStoreFacade<String, 
ValueTimestampHeaders<String>, ValueAndTimestamp<String>> facade =
+            new GenericReadOnlyKeyValueStoreFacade<>(mockedHeadersStore, 
converter);
+
+        when(mockedHeadersStore.get("key"))
+            .thenReturn(ValueTimestampHeaders.make("value", 42L, new 
RecordHeaders()));
+        when(mockedHeadersStore.get("unknownKey"))
+            .thenReturn(null);
+
+        final ValueAndTimestamp<String> result = facade.get("key");
+        assertThat(result.value(), is("value"));
+        assertThat(result.timestamp(), is(42L));
+        assertNull(facade.get("unknownKey"));
+    }
+
+    @Test
+    public void shouldConvertIteratorValues() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        final GenericReadOnlyKeyValueStoreFacade<String, 
ValueAndTimestamp<String>, String> facade =
+            new GenericReadOnlyKeyValueStoreFacade<>(mockedTimestampedStore, 
converter);
+
+        when(mockedTimestampedIterator.next())
+            .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 
21L)))
+            .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 
42L)));
+        when(mockedTimestampedStore.range("key1", 
"key2")).thenReturn(mockedTimestampedIterator);
+
+        final KeyValueIterator<String, String> iterator = facade.range("key1", 
"key2");
+        assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
+        assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
+    }
+
+    @Test
+    public void shouldConvertReverseRangeIteratorValues() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        final GenericReadOnlyKeyValueStoreFacade<String, 
ValueAndTimestamp<String>, String> facade =
+            new GenericReadOnlyKeyValueStoreFacade<>(mockedTimestampedStore, 
converter);
+
+        when(mockedTimestampedIterator.next())
+            .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 
42L)))
+            .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 
21L)));
+        when(mockedTimestampedStore.reverseRange("key1", 
"key2")).thenReturn(mockedTimestampedIterator);
+
+        final KeyValueIterator<String, String> iterator = 
facade.reverseRange("key1", "key2");
+        assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
+        assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
+    }
+
+    @Test
+    public void shouldConvertPrefixScanIteratorValues() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        final GenericReadOnlyKeyValueStoreFacade<String, 
ValueAndTimestamp<String>, String> facade =
+            new GenericReadOnlyKeyValueStoreFacade<>(mockedTimestampedStore, 
converter);
+
+        final StringSerializer stringSerializer = new StringSerializer();
+        when(mockedTimestampedIterator.next())
+            .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 
21L)))
+            .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 
42L)));
+        when(mockedTimestampedStore.prefixScan("key", 
stringSerializer)).thenReturn(mockedTimestampedIterator);
+
+        final KeyValueIterator<String, String> iterator = 
facade.prefixScan("key", stringSerializer);
+        assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
+        assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
+    }
+
+    @Test
+    public void shouldConvertAllIteratorValues() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        final GenericReadOnlyKeyValueStoreFacade<String, 
ValueAndTimestamp<String>, String> facade =
+            new GenericReadOnlyKeyValueStoreFacade<>(mockedTimestampedStore, 
converter);
+
+        when(mockedTimestampedIterator.next())
+            .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 
21L)))
+            .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 
42L)));
+        
when(mockedTimestampedStore.all()).thenReturn(mockedTimestampedIterator);
+
+        final KeyValueIterator<String, String> iterator = facade.all();
+        assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
+        assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
+    }
+
+    @Test
+    public void shouldConvertReverseAllIteratorValues() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        final GenericReadOnlyKeyValueStoreFacade<String, 
ValueAndTimestamp<String>, String> facade =
+            new GenericReadOnlyKeyValueStoreFacade<>(mockedTimestampedStore, 
converter);
+
+        when(mockedTimestampedIterator.next())
+            .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 
42L)))
+            .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 
21L)));
+        
when(mockedTimestampedStore.reverseAll()).thenReturn(mockedTimestampedIterator);
+
+        final KeyValueIterator<String, String> iterator = facade.reverseAll();
+        assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
+        assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
+    }
+
+    @Test
+    public void shouldForwardApproximateNumEntries() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        final GenericReadOnlyKeyValueStoreFacade<String, 
ValueAndTimestamp<String>, String> facade =
+            new GenericReadOnlyKeyValueStoreFacade<>(mockedTimestampedStore, 
converter);
+
+        when(mockedTimestampedStore.approximateNumEntries()).thenReturn(42L);
+
+        assertThat(facade.approximateNumEntries(), is(42L));
+    }
+
+    @Test
+    public void shouldHandleNullValuesInIterator() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        final GenericReadOnlyKeyValueStoreFacade<String, 
ValueAndTimestamp<String>, String> facade =
+            new GenericReadOnlyKeyValueStoreFacade<>(mockedTimestampedStore, 
converter);
+
+        when(mockedTimestampedIterator.next())
+            .thenReturn(KeyValue.pair("key1", null))
+            .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 
42L)));
+        
when(mockedTimestampedStore.all()).thenReturn(mockedTimestampedIterator);
+
+        final KeyValueIterator<String, String> iterator = facade.all();
+        assertThat(iterator.next(), is(KeyValue.pair("key1", null)));
+        assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacadeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacadeTest.java
new file mode 100644
index 00000000000..86a4f3e5543
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacadeTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.time.Instant;
+import java.util.function.Function;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class GenericReadOnlyWindowStoreFacadeTest {
+
+    @Mock
+    private ReadOnlyWindowStore<String, ValueAndTimestamp<String>> 
mockedTimestampedStore;
+    @Mock
+    private ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> 
mockedHeadersStore;
+    @Mock
+    private WindowStoreIterator<ValueAndTimestamp<String>> 
mockedTimestampedWindowIterator;
+    @Mock
+    private WindowStoreIterator<ValueTimestampHeaders<String>> 
mockedHeadersWindowIterator;
+    @Mock
+    private KeyValueIterator<Windowed<String>, ValueAndTimestamp<String>> 
mockedTimestampedKeyValueIterator;
+    @Mock
+    private KeyValueIterator<Windowed<String>, ValueTimestampHeaders<String>> 
mockedHeadersKeyValueIterator;
+
+    @Test
+    public void shouldConvertSingleValueFetch() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        final GenericReadOnlyWindowStoreFacade<String, 
ValueAndTimestamp<String>, String> facade =
+            new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore, 
converter);
+
+        when(mockedTimestampedStore.fetch("key1", 21L))
+            .thenReturn(ValueAndTimestamp.make("value1", 42L));
+        when(mockedTimestampedStore.fetch("unknownKey", 21L))
+            .thenReturn(null);
+
+        assertThat(facade.fetch("key1", 21L), is("value1"));
+        assertNull(facade.fetch("unknownKey", 21L));
+    }
+
+    @Test
+    public void shouldConvertSingleValueFetchWithHeadersConverter() {
+        final Function<ValueTimestampHeaders<String>, String> converter = 
ValueConverters.extractValueFromHeaders();
+        final GenericReadOnlyWindowStoreFacade<String, 
ValueTimestampHeaders<String>, String> facade =
+            new GenericReadOnlyWindowStoreFacade<>(mockedHeadersStore, 
converter);
+
+        when(mockedHeadersStore.fetch("key1", 21L))
+            .thenReturn(ValueTimestampHeaders.make("value1", 42L, new 
RecordHeaders()));
+        when(mockedHeadersStore.fetch("unknownKey", 21L))
+            .thenReturn(null);
+
+        assertThat(facade.fetch("key1", 21L), is("value1"));
+        assertNull(facade.fetch("unknownKey", 21L));
+    }
+
+    @Test
+    public void shouldConvertToValueAndTimestamp() {
+        final Function<ValueTimestampHeaders<String>, 
ValueAndTimestamp<String>> converter =
+            ValueConverters.headersToValueAndTimestamp();
+        final GenericReadOnlyWindowStoreFacade<String, 
ValueTimestampHeaders<String>, ValueAndTimestamp<String>> facade =
+            new GenericReadOnlyWindowStoreFacade<>(mockedHeadersStore, 
converter);
+
+        when(mockedHeadersStore.fetch("key1", 21L))
+            .thenReturn(ValueTimestampHeaders.make("value1", 42L, new 
RecordHeaders()));
+
+        final ValueAndTimestamp<String> result = facade.fetch("key1", 21L);
+        assertThat(result.value(), is("value1"));
+        assertThat(result.timestamp(), is(42L));
+    }
+
+    @Test
+    public void shouldConvertWindowStoreIterator() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        final GenericReadOnlyWindowStoreFacade<String, 
ValueAndTimestamp<String>, String> facade =
+            new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore, 
converter);
+
+        when(mockedTimestampedWindowIterator.next())
+            .thenReturn(KeyValue.pair(21L, ValueAndTimestamp.make("value1", 
22L)))
+            .thenReturn(KeyValue.pair(42L, ValueAndTimestamp.make("value2", 
23L)));
+        when(mockedTimestampedStore.fetch("key1", Instant.ofEpochMilli(21L), 
Instant.ofEpochMilli(42L)))
+            .thenReturn(mockedTimestampedWindowIterator);
+
+        final WindowStoreIterator<String> iterator =
+            facade.fetch("key1", Instant.ofEpochMilli(21L), 
Instant.ofEpochMilli(42L));
+
+        assertThat(iterator.next(), is(KeyValue.pair(21L, "value1")));
+        assertThat(iterator.next(), is(KeyValue.pair(42L, "value2")));
+    }
+
+    @Test
+    public void shouldConvertBackwardFetchWindowStoreIterator() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        final GenericReadOnlyWindowStoreFacade<String, 
ValueAndTimestamp<String>, String> facade =
+            new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore, 
converter);
+
+        when(mockedTimestampedWindowIterator.next())
+            .thenReturn(KeyValue.pair(42L, ValueAndTimestamp.make("value2", 
23L)))
+            .thenReturn(KeyValue.pair(21L, ValueAndTimestamp.make("value1", 
22L)));
+        when(mockedTimestampedStore.backwardFetch("key1", 
Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
+            .thenReturn(mockedTimestampedWindowIterator);
+
+        final WindowStoreIterator<String> iterator =
+            facade.backwardFetch("key1", Instant.ofEpochMilli(21L), 
Instant.ofEpochMilli(42L));
+
+        assertThat(iterator.next(), is(KeyValue.pair(42L, "value2")));
+        assertThat(iterator.next(), is(KeyValue.pair(21L, "value1")));
+    }
+
+    @Test
+    public void shouldConvertKeyRangeFetchIterator() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        final GenericReadOnlyWindowStoreFacade<String, 
ValueAndTimestamp<String>, String> facade =
+            new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore, 
converter);
+
+        when(mockedTimestampedKeyValueIterator.next())
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key1", new TimeWindow(21L, 22L)),
+                ValueAndTimestamp.make("value1", 22L)))
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key2", new TimeWindow(42L, 43L)),
+                ValueAndTimestamp.make("value2", 100L)));
+        when(mockedTimestampedStore.fetch("key1", "key2", 
Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
+            .thenReturn(mockedTimestampedKeyValueIterator);
+
+        final KeyValueIterator<Windowed<String>, String> iterator =
+            facade.fetch("key1", "key2", Instant.ofEpochMilli(21L), 
Instant.ofEpochMilli(42L));
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", 
new TimeWindow(21L, 22L)), "value1")));
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", 
new TimeWindow(42L, 43L)), "value2")));
+    }
+
+    @Test
+    public void shouldConvertBackwardFetchKeyRangeIterator() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        final GenericReadOnlyWindowStoreFacade<String, 
ValueAndTimestamp<String>, String> facade =
+            new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore, 
converter);
+
+        when(mockedTimestampedKeyValueIterator.next())
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key2", new TimeWindow(42L, 43L)),
+                ValueAndTimestamp.make("value2", 100L)))
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key1", new TimeWindow(21L, 22L)),
+                ValueAndTimestamp.make("value1", 22L)));
+        when(mockedTimestampedStore.backwardFetch("key1", "key2", 
Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
+            .thenReturn(mockedTimestampedKeyValueIterator);
+
+        final KeyValueIterator<Windowed<String>, String> iterator =
+            facade.backwardFetch("key1", "key2", Instant.ofEpochMilli(21L), 
Instant.ofEpochMilli(42L));
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", 
new TimeWindow(42L, 43L)), "value2")));
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", 
new TimeWindow(21L, 22L)), "value1")));
+    }
+
+    @Test
+    public void shouldConvertFetchAllIterator() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        final GenericReadOnlyWindowStoreFacade<String, 
ValueAndTimestamp<String>, String> facade =
+            new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore, 
converter);
+
+        when(mockedTimestampedKeyValueIterator.next())
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key1", new TimeWindow(21L, 22L)),
+                ValueAndTimestamp.make("value1", 22L)))
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key2", new TimeWindow(42L, 43L)),
+                ValueAndTimestamp.make("value2", 100L)));
+        when(mockedTimestampedStore.fetchAll(Instant.ofEpochMilli(21L), 
Instant.ofEpochMilli(42L)))
+            .thenReturn(mockedTimestampedKeyValueIterator);
+
+        final KeyValueIterator<Windowed<String>, String> iterator =
+            facade.fetchAll(Instant.ofEpochMilli(21L), 
Instant.ofEpochMilli(42L));
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", 
new TimeWindow(21L, 22L)), "value1")));
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", 
new TimeWindow(42L, 43L)), "value2")));
+    }
+
+    @Test
+    public void shouldConvertBackwardFetchAllIterator() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        final GenericReadOnlyWindowStoreFacade<String, 
ValueAndTimestamp<String>, String> facade =
+            new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore, 
converter);
+
+        when(mockedTimestampedKeyValueIterator.next())
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key2", new TimeWindow(42L, 43L)),
+                ValueAndTimestamp.make("value2", 100L)))
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key1", new TimeWindow(21L, 22L)),
+                ValueAndTimestamp.make("value1", 22L)));
+        
when(mockedTimestampedStore.backwardFetchAll(Instant.ofEpochMilli(21L), 
Instant.ofEpochMilli(42L)))
+            .thenReturn(mockedTimestampedKeyValueIterator);
+
+        final KeyValueIterator<Windowed<String>, String> iterator =
+            facade.backwardFetchAll(Instant.ofEpochMilli(21L), 
Instant.ofEpochMilli(42L));
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", 
new TimeWindow(42L, 43L)), "value2")));
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", 
new TimeWindow(21L, 22L)), "value1")));
+    }
+
+    @Test
+    public void shouldConvertAllIterator() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        final GenericReadOnlyWindowStoreFacade<String, 
ValueAndTimestamp<String>, String> facade =
+            new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore, 
converter);
+
+        when(mockedTimestampedKeyValueIterator.next())
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key1", new TimeWindow(21L, 22L)),
+                ValueAndTimestamp.make("value1", 22L)))
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key2", new TimeWindow(42L, 43L)),
+                ValueAndTimestamp.make("value2", 100L)));
+        
when(mockedTimestampedStore.all()).thenReturn(mockedTimestampedKeyValueIterator);
+
+        final KeyValueIterator<Windowed<String>, String> iterator = 
facade.all();
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", 
new TimeWindow(21L, 22L)), "value1")));
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", 
new TimeWindow(42L, 43L)), "value2")));
+    }
+
+    @Test
+    public void shouldConvertBackwardAllIterator() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        final GenericReadOnlyWindowStoreFacade<String, 
ValueAndTimestamp<String>, String> facade =
+            new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore, 
converter);
+
+        when(mockedTimestampedKeyValueIterator.next())
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key2", new TimeWindow(42L, 43L)),
+                ValueAndTimestamp.make("value2", 100L)))
+            .thenReturn(KeyValue.pair(
+                new Windowed<>("key1", new TimeWindow(21L, 22L)),
+                ValueAndTimestamp.make("value1", 22L)));
+        
when(mockedTimestampedStore.backwardAll()).thenReturn(mockedTimestampedKeyValueIterator);
+
+        final KeyValueIterator<Windowed<String>, String> iterator = 
facade.backwardAll();
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", 
new TimeWindow(42L, 43L)), "value2")));
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", 
new TimeWindow(21L, 22L)), "value1")));
+    }
+
+    @Test
+    public void shouldHandleNullValuesInWindowIterator() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        final GenericReadOnlyWindowStoreFacade<String, 
ValueAndTimestamp<String>, String> facade =
+            new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore, 
converter);
+
+        when(mockedTimestampedWindowIterator.next())
+            .thenReturn(KeyValue.pair(21L, null))
+            .thenReturn(KeyValue.pair(42L, ValueAndTimestamp.make("value2", 
23L)));
+        when(mockedTimestampedStore.fetch("key1", Instant.ofEpochMilli(21L), 
Instant.ofEpochMilli(42L)))
+            .thenReturn(mockedTimestampedWindowIterator);
+
+        final WindowStoreIterator<String> iterator =
+            facade.fetch("key1", Instant.ofEpochMilli(21L), 
Instant.ofEpochMilli(42L));
+
+        assertThat(iterator.next(), is(KeyValue.pair(21L, null)));
+        assertThat(iterator.next(), is(KeyValue.pair(42L, "value2")));
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreIteratorFacadeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacadeTest.java
similarity index 50%
rename from streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreIteratorFacadeTest.java
rename to streams/src/test/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacadeTest.java
index c1f35083051..9bbab36e6ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreIteratorFacadeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacadeTest.java
@@ -16,10 +16,10 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.AggregationWithHeaders;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStoreIterator;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -29,63 +29,70 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
+import java.util.function.Function;
+
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
-public class SessionStoreIteratorFacadeTest {
+public class GenericWindowStoreIteratorFacadeTest {
+
     @Mock
-    private KeyValueIterator<String, AggregationWithHeaders<String>> 
mockedInnerIterator;
+    private KeyValueIterator<Long, ValueAndTimestamp<String>> 
mockedInnerIterator;
 
-    private SessionStoreIteratorFacade<String, String> facade;
+    private WindowStoreIterator<String> facade;
 
     @BeforeEach
     public void setup() {
-        facade = new SessionStoreIteratorFacade<>(mockedInnerIterator);
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+        facade = new GenericWindowStoreIteratorFacade<>(mockedInnerIterator, 
converter);
     }
 
     @Test
-    public void shouldForwardHasNext() {
-        when(mockedInnerIterator.hasNext()).thenReturn(true);
-        assertTrue(facade.hasNext());
-    }
+    public void shouldConvertValues() {
+        when(mockedInnerIterator.next())
+            .thenReturn(KeyValue.pair(100L, ValueAndTimestamp.make("value1", 
42L)))
+            .thenReturn(KeyValue.pair(200L, ValueAndTimestamp.make("value2", 
84L)));
 
-    @Test
-    public void shouldForwardPeekNextKey() {
-        when(mockedInnerIterator.peekNextKey()).thenReturn("key");
-        assertThat(facade.peekNextKey(), is("key"));
+        assertThat(facade.next(), is(KeyValue.pair(100L, "value1")));
+        assertThat(facade.next(), is(KeyValue.pair(200L, "value2")));
     }
 
     @Test
-    public void shouldReturnPlainKeyValuePairOnNext() {
-        final AggregationWithHeaders<String> aggregation =
-            AggregationWithHeaders.make("value", new RecordHeaders());
-        when(mockedInnerIterator.next()).thenReturn(new KeyValue<>("key", 
aggregation));
-        assertThat(facade.next(), is(KeyValue.pair("key", "value")));
+    public void shouldHandleNullValues() {
+        when(mockedInnerIterator.next())
+            .thenReturn(KeyValue.pair(100L, null))
+            .thenReturn(KeyValue.pair(200L, ValueAndTimestamp.make("value2", 
42L)));
+
+        assertThat(facade.next(), is(KeyValue.pair(100L, null)));
+        assertThat(facade.next(), is(KeyValue.pair(200L, "value2")));
     }
 
     @Test
-    public void shouldReturnNullValueWhenAggregationWithHeadersIsNull() {
-        when(mockedInnerIterator.next()).thenReturn(new KeyValue<>("key", 
null));
-        final KeyValue<String, String> result = facade.next();
-        assertThat(result.key, is("key"));
-        assertThat(result.value, is(nullValue()));
+    public void shouldDelegateHasNext() {
+        when(mockedInnerIterator.hasNext()).thenReturn(true, false);
+
+        assertTrue(facade.hasNext());
+        assertFalse(facade.hasNext());
     }
 
     @Test
-    public void shouldReturnNullWhenInnerNextReturnsNull() {
-        when(mockedInnerIterator.next()).thenReturn(null);
-        assertThat(facade.next(), is(nullValue()));
+    public void shouldDelegatePeekNextKey() {
+        when(mockedInnerIterator.peekNextKey()).thenReturn(100L, null);
+
+        assertThat(facade.peekNextKey(), is(100L));
+        assertNull(facade.peekNextKey());
     }
 
     @Test
-    public void shouldCloseInnerIterator() {
+    public void shouldDelegateClose() {
         facade.close();
         verify(mockedInnerIterator).close();
     }
-}
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
deleted file mode 100644
index 6d978c80b0f..00000000000
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.STRICT_STUBS)
-public class ReadOnlyKeyValueStoreFacadeTest {
-    @Mock
-    private TimestampedKeyValueStore<String, String> 
mockedKeyValueTimestampStore;
-    @Mock
-    private KeyValueIterator<String, ValueAndTimestamp<String>> 
mockedKeyValueTimestampIterator;
-
-    private ReadOnlyKeyValueStoreFacade<String, String> 
readOnlyKeyValueStoreFacade;
-
-    @BeforeEach
-    public void setup() {
-        readOnlyKeyValueStoreFacade = new 
ReadOnlyKeyValueStoreFacade<>(mockedKeyValueTimestampStore);
-    }
-
-    @Test
-    public void shouldReturnPlainValueOnGet() {
-        when(mockedKeyValueTimestampStore.get("key"))
-            .thenReturn(ValueAndTimestamp.make("value", 42L));
-        when(mockedKeyValueTimestampStore.get("unknownKey"))
-            .thenReturn(null);
-
-        assertThat(readOnlyKeyValueStoreFacade.get("key"), is("value"));
-        assertNull(readOnlyKeyValueStoreFacade.get("unknownKey"));
-    }
-
-    @Test
-    public void shouldReturnPlainKeyValuePairsForRangeIterator() {
-        when(mockedKeyValueTimestampIterator.next())
-            .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 
21L)))
-            .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 
42L)));
-        when(mockedKeyValueTimestampStore.range("key1", 
"key2")).thenReturn(mockedKeyValueTimestampIterator);
-
-        final KeyValueIterator<String, String> iterator = 
readOnlyKeyValueStoreFacade.range("key1", "key2");
-        assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
-        assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
-    }
-
-    @Test
-    public void shouldReturnPlainKeyValuePairsForPrefixScan() {
-        final StringSerializer stringSerializer = new StringSerializer();
-        when(mockedKeyValueTimestampIterator.next())
-            .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 
21L)))
-            .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 
42L)));
-        when(mockedKeyValueTimestampStore.prefixScan("key", 
stringSerializer)).thenReturn(mockedKeyValueTimestampIterator);
-
-        final KeyValueIterator<String, String> iterator = 
readOnlyKeyValueStoreFacade.prefixScan("key", stringSerializer);
-        assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
-        assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
-    }
-
-    @Test
-    public void shouldReturnPlainKeyValuePairsForAllIterator() {
-        when(mockedKeyValueTimestampIterator.next())
-            .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 
21L)))
-            .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 
42L)));
-        
when(mockedKeyValueTimestampStore.all()).thenReturn(mockedKeyValueTimestampIterator);
-
-        final KeyValueIterator<String, String> iterator = 
readOnlyKeyValueStoreFacade.all();
-        assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
-        assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
-    }
-
-    @Test
-    public void shouldForwardApproximateNumEntries() {
-        
when(mockedKeyValueTimestampStore.approximateNumEntries()).thenReturn(42L);
-
-        assertThat(readOnlyKeyValueStoreFacade.approximateNumEntries(), 
is(42L));
-    }
-}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacadeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacadeTest.java
deleted file mode 100644
index b0d65aa10c2..00000000000
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacadeTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.state.WindowStoreIterator;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;
-
-import java.time.Instant;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.STRICT_STUBS)
-public class ReadOnlyWindowStoreFacadeTest {
-    @Mock
-    private TimestampedWindowStore<String, String> mockedWindowTimestampStore;
-    @Mock
-    private WindowStoreIterator<ValueAndTimestamp<String>> 
mockedWindowTimestampIterator;
-    @Mock
-    private KeyValueIterator<Windowed<String>, ValueAndTimestamp<String>> 
mockedKeyValueWindowTimestampIterator;
-
-    private ReadOnlyWindowStoreFacade<String, String> 
readOnlyWindowStoreFacade;
-
-    @BeforeEach
-    public void setup() {
-        readOnlyWindowStoreFacade = new 
ReadOnlyWindowStoreFacade<>(mockedWindowTimestampStore);
-    }
-
-    @Test
-    public void shouldReturnPlainKeyValuePairsOnSingleKeyFetch() {
-        when(mockedWindowTimestampStore.fetch("key1", 21L))
-            .thenReturn(ValueAndTimestamp.make("value1", 42L));
-        when(mockedWindowTimestampStore.fetch("unknownKey", 21L))
-            .thenReturn(null);
-
-        assertThat(readOnlyWindowStoreFacade.fetch("key1", 21L), is("value1"));
-        assertNull(readOnlyWindowStoreFacade.fetch("unknownKey", 21L));
-    }
-
-    @Test
-    public void shouldReturnPlainKeyValuePairsOnSingleKeyFetchLongParameters() 
{
-        when(mockedWindowTimestampIterator.next())
-            .thenReturn(KeyValue.pair(21L, ValueAndTimestamp.make("value1", 
22L)))
-            .thenReturn(KeyValue.pair(42L, ValueAndTimestamp.make("value2", 
23L)));
-        when(mockedWindowTimestampStore.fetch("key1", 
Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
-            .thenReturn(mockedWindowTimestampIterator);
-
-        final WindowStoreIterator<String> iterator =
-            readOnlyWindowStoreFacade.fetch("key1", Instant.ofEpochMilli(21L), 
Instant.ofEpochMilli(42L));
-
-        assertThat(iterator.next(), is(KeyValue.pair(21L, "value1")));
-        assertThat(iterator.next(), is(KeyValue.pair(42L, "value2")));
-    }
-
-    @Test
-    public void 
shouldReturnPlainKeyValuePairsOnSingleKeyFetchInstantParameters() {
-        when(mockedWindowTimestampIterator.next())
-            .thenReturn(KeyValue.pair(21L, ValueAndTimestamp.make("value1", 
22L)))
-            .thenReturn(KeyValue.pair(42L, ValueAndTimestamp.make("value2", 
23L)));
-        when(mockedWindowTimestampStore.fetch("key1", 
Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
-            .thenReturn(mockedWindowTimestampIterator);
-
-        final WindowStoreIterator<String> iterator =
-            readOnlyWindowStoreFacade.fetch("key1", Instant.ofEpochMilli(21L), 
Instant.ofEpochMilli(42L));
-
-        assertThat(iterator.next(), is(KeyValue.pair(21L, "value1")));
-        assertThat(iterator.next(), is(KeyValue.pair(42L, "value2")));
-    }
-
-    @Test
-    public void shouldReturnPlainKeyValuePairsOnRangeFetchLongParameters() {
-        when(mockedKeyValueWindowTimestampIterator.next())
-            .thenReturn(KeyValue.pair(
-                new Windowed<>("key1", new TimeWindow(21L, 22L)),
-                ValueAndTimestamp.make("value1", 22L)))
-            .thenReturn(KeyValue.pair(
-                new Windowed<>("key2", new TimeWindow(42L, 43L)),
-                ValueAndTimestamp.make("value2", 100L)));
-        when(mockedWindowTimestampStore.fetch("key1", "key2", 
Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
-            .thenReturn(mockedKeyValueWindowTimestampIterator);
-
-        final KeyValueIterator<Windowed<String>, String> iterator =
-            readOnlyWindowStoreFacade.fetch("key1", "key2", 
Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L));
-
-        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", 
new TimeWindow(21L, 22L)), "value1")));
-        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", 
new TimeWindow(42L, 43L)), "value2")));
-    }
-
-    @Test
-    public void shouldReturnPlainKeyValuePairsOnRangeFetchInstantParameters() {
-        when(mockedKeyValueWindowTimestampIterator.next())
-            .thenReturn(KeyValue.pair(
-                new Windowed<>("key1", new TimeWindow(21L, 22L)),
-                ValueAndTimestamp.make("value1", 22L)))
-            .thenReturn(KeyValue.pair(
-                new Windowed<>("key2", new TimeWindow(42L, 43L)),
-                ValueAndTimestamp.make("value2", 100L)));
-        when(mockedWindowTimestampStore.fetch("key1", "key2", 
Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
-            .thenReturn(mockedKeyValueWindowTimestampIterator);
-
-        final KeyValueIterator<Windowed<String>, String> iterator =
-            readOnlyWindowStoreFacade.fetch("key1", "key2", 
Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L));
-
-        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", 
new TimeWindow(21L, 22L)), "value1")));
-        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", 
new TimeWindow(42L, 43L)), "value2")));
-    }
-
-    @Test
-    public void shouldReturnPlainKeyValuePairsOnFetchAllLongParameters() {
-        when(mockedKeyValueWindowTimestampIterator.next())
-            .thenReturn(KeyValue.pair(
-                new Windowed<>("key1", new TimeWindow(21L, 22L)),
-                ValueAndTimestamp.make("value1", 22L)))
-            .thenReturn(KeyValue.pair(
-                new Windowed<>("key2", new TimeWindow(42L, 43L)),
-                ValueAndTimestamp.make("value2", 100L)));
-        when(mockedWindowTimestampStore.fetchAll(Instant.ofEpochMilli(21L), 
Instant.ofEpochMilli(42L)))
-            .thenReturn(mockedKeyValueWindowTimestampIterator);
-
-        final KeyValueIterator<Windowed<String>, String> iterator =
-            readOnlyWindowStoreFacade.fetchAll(Instant.ofEpochMilli(21L), 
Instant.ofEpochMilli(42L));
-
-        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", 
new TimeWindow(21L, 22L)), "value1")));
-        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", 
new TimeWindow(42L, 43L)), "value2")));
-    }
-
-    @Test
-    public void shouldReturnPlainKeyValuePairsOnFetchAllInstantParameters() {
-        when(mockedKeyValueWindowTimestampIterator.next())
-            .thenReturn(KeyValue.pair(
-                new Windowed<>("key1", new TimeWindow(21L, 22L)),
-                ValueAndTimestamp.make("value1", 22L)))
-            .thenReturn(KeyValue.pair(
-                new Windowed<>("key2", new TimeWindow(42L, 43L)),
-                ValueAndTimestamp.make("value2", 100L)));
-        when(mockedWindowTimestampStore.fetchAll(Instant.ofEpochMilli(21L), 
Instant.ofEpochMilli(42L)))
-            .thenReturn(mockedKeyValueWindowTimestampIterator);
-
-        final KeyValueIterator<Windowed<String>, String> iterator =
-            readOnlyWindowStoreFacade.fetchAll(Instant.ofEpochMilli(21L), 
Instant.ofEpochMilli(42L));
-
-        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", 
new TimeWindow(21L, 22L)), "value1")));
-        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", 
new TimeWindow(42L, 43L)), "value2")));
-    }
-
-    @Test
-    public void shouldReturnPlainKeyValuePairsOnAll() {
-        when(mockedKeyValueWindowTimestampIterator.next())
-            .thenReturn(KeyValue.pair(
-                new Windowed<>("key1", new TimeWindow(21L, 22L)),
-                ValueAndTimestamp.make("value1", 22L)))
-            .thenReturn(KeyValue.pair(
-                new Windowed<>("key2", new TimeWindow(42L, 43L)),
-                ValueAndTimestamp.make("value2", 100L)));
-        
when(mockedWindowTimestampStore.all()).thenReturn(mockedKeyValueWindowTimestampIterator);
-
-        final KeyValueIterator<Windowed<String>, String> iterator = 
readOnlyWindowStoreFacade.all();
-
-        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", 
new TimeWindow(21L, 22L)), "value1")));
-        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", 
new TimeWindow(42L, 43L)), "value2")));
-    }
-}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueConvertersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueConvertersTest.java
new file mode 100644
index 00000000000..fd8093f2933
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueConvertersTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Function;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class ValueConvertersTest {
+
+    @Test
+    public void extractValueShouldReturnPlainValue() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+
+        final ValueAndTimestamp<String> valueAndTimestamp = 
ValueAndTimestamp.make("value", 42L);
+        assertThat(converter.apply(valueAndTimestamp), is("value"));
+    }
+
+    @Test
+    public void extractValueShouldReturnNullWhenInputIsNull() {
+        final Function<ValueAndTimestamp<String>, String> converter = 
ValueConverters.extractValue();
+
+        assertNull(converter.apply(null));
+    }
+
+    @Test
+    public void extractValueFromHeadersShouldReturnPlainValue() {
+        final Function<ValueTimestampHeaders<String>, String> converter = 
ValueConverters.extractValueFromHeaders();
+
+        final ValueTimestampHeaders<String> vth = 
ValueTimestampHeaders.make("value", 42L, new RecordHeaders());
+        assertThat(converter.apply(vth), is("value"));
+    }
+
+    @Test
+    public void extractValueFromHeadersShouldReturnNullWhenInputIsNull() {
+        final Function<ValueTimestampHeaders<String>, String> converter = 
ValueConverters.extractValueFromHeaders();
+
+        assertNull(converter.apply(null));
+    }
+
+    @Test
+    public void headersToValueAndTimestampShouldConvertCorrectly() {
+        final Function<ValueTimestampHeaders<String>, 
ValueAndTimestamp<String>> converter =
+            ValueConverters.headersToValueAndTimestamp();
+
+        final ValueTimestampHeaders<String> vth = 
ValueTimestampHeaders.make("value", 42L, new RecordHeaders());
+        final ValueAndTimestamp<String> result = converter.apply(vth);
+
+        assertThat(result.value(), is("value"));
+        assertThat(result.timestamp(), is(42L));
+    }
+
+    @Test
+    public void headersToValueAndTimestampShouldReturnNullWhenInputIsNull() {
+        final Function<ValueTimestampHeaders<String>, 
ValueAndTimestamp<String>> converter =
+            ValueConverters.headersToValueAndTimestamp();
+
+        assertNull(converter.apply(null));
+    }
+
+    @Test
+    public void headersToValueAndTimestampShouldDiscardHeaders() {
+        final Function<ValueTimestampHeaders<String>, 
ValueAndTimestamp<String>> converter =
+            ValueConverters.headersToValueAndTimestamp();
+
+        final RecordHeaders headers = new RecordHeaders();
+        headers.add("key1", "value1".getBytes());
+        final ValueTimestampHeaders<String> vth = 
ValueTimestampHeaders.make("value", 42L, headers);
+
+        final ValueAndTimestamp<String> result = converter.apply(vth);
+
+        // Should only have value and timestamp, headers are discarded
+        assertThat(result.value(), is("value"));
+        assertThat(result.timestamp(), is(42L));
+    }
+}
\ No newline at end of file
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 4fa9117cee7..beae91966fc 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -93,10 +93,11 @@ import org.apache.kafka.streams.state.ValueTimestampHeaders;
 import org.apache.kafka.streams.state.VersionedKeyValueStore;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.streams.state.internals.ReadOnlyKeyValueStoreFacade;
-import org.apache.kafka.streams.state.internals.ReadOnlyWindowStoreFacade;
+import 
org.apache.kafka.streams.state.internals.GenericReadOnlyKeyValueStoreFacade;
+import 
org.apache.kafka.streams.state.internals.GenericReadOnlyWindowStoreFacade;
 import org.apache.kafka.streams.state.internals.SessionStoreIteratorFacade;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.ValueConverters;
 import org.apache.kafka.streams.test.TestRecord;
 
 import org.slf4j.Logger;
@@ -1286,10 +1287,12 @@ public class TopologyTestDriver implements Closeable {
         }
     }
 
-    static class KeyValueStoreFacade<K, V> extends 
ReadOnlyKeyValueStoreFacade<K, V> implements KeyValueStore<K, V> {
+    static class KeyValueStoreFacade<K, V> extends 
GenericReadOnlyKeyValueStoreFacade<K, ValueAndTimestamp<V>, V> implements 
KeyValueStore<K, V> {
+        private final TimestampedKeyValueStore<K, V> inner;
 
-        public KeyValueStoreFacade(final TimestampedKeyValueStore<K, V> inner) 
{
-            super(inner);
+        public KeyValueStoreFacade(final TimestampedKeyValueStore<K, V> store) 
{
+            super(store, ValueConverters.extractValue());
+            this.inner = store;
         }
 
         @Override
@@ -1350,10 +1353,12 @@ public class TopologyTestDriver implements Closeable {
         }
     }
 
-    static class WindowStoreFacade<K, V> extends ReadOnlyWindowStoreFacade<K, 
V> implements WindowStore<K, V> {
+    static class WindowStoreFacade<K, V> extends 
GenericReadOnlyWindowStoreFacade<K, ValueAndTimestamp<V>, V> implements 
WindowStore<K, V> {
+        private final TimestampedWindowStore<K, V> inner;
 
         public WindowStoreFacade(final TimestampedWindowStore<K, V> store) {
-            super(store);
+            super(store, ValueConverters.extractValue());
+            this.inner = store;
         }
 
         @Override
@@ -1361,13 +1366,6 @@ public class TopologyTestDriver implements Closeable {
             inner.init(stateStoreContext, root);
         }
 
-        @Override
-        public void put(final K key,
-                        final V value,
-                        final long windowStartTimestamp) {
-            inner.put(key, ValueAndTimestamp.make(value, 
ConsumerRecord.NO_TIMESTAMP), windowStartTimestamp);
-        }
-
         @Override
         public WindowStoreIterator<V> fetch(final K key,
                                             final long timeFrom,
@@ -1376,10 +1374,9 @@ public class TopologyTestDriver implements Closeable {
         }
 
         @Override
-        public WindowStoreIterator<V> backwardFetch(final K key,
-                                                    final long timeFrom,
-                                                    final long timeTo) {
-            return backwardFetch(key, Instant.ofEpochMilli(timeFrom), 
Instant.ofEpochMilli(timeTo));
+        public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
+                                                         final long timeTo) {
+            return fetchAll(Instant.ofEpochMilli(timeFrom), 
Instant.ofEpochMilli(timeTo));
         }
 
         @Override
@@ -1387,8 +1384,20 @@ public class TopologyTestDriver implements Closeable {
                                                       final K keyTo,
                                                       final long timeFrom,
                                                       final long timeTo) {
-            return fetch(keyFrom, keyTo, Instant.ofEpochMilli(timeFrom),
-                Instant.ofEpochMilli(timeTo));
+            return fetch(keyFrom, keyTo, Instant.ofEpochMilli(timeFrom), 
Instant.ofEpochMilli(timeTo));
+        }
+
+        @Override
+        public WindowStoreIterator<V> backwardFetch(final K key,
+                                                     final long timeFrom,
+                                                     final long timeTo) {
+            return backwardFetch(key, Instant.ofEpochMilli(timeFrom), 
Instant.ofEpochMilli(timeTo));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long 
timeFrom,
+                                                                 final long 
timeTo) {
+            return backwardFetchAll(Instant.ofEpochMilli(timeFrom), 
Instant.ofEpochMilli(timeTo));
         }
 
         @Override
@@ -1400,15 +1409,10 @@ public class TopologyTestDriver implements Closeable {
         }
 
         @Override
-        public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
-                                                         final long timeTo) {
-            return fetchAll(Instant.ofEpochMilli(timeFrom), 
Instant.ofEpochMilli(timeTo));
-        }
-
-        @Override
-        public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long 
timeFrom,
-                                                                 final long 
timeTo) {
-            return backwardFetchAll(Instant.ofEpochMilli(timeFrom), 
Instant.ofEpochMilli(timeTo));
+        public void put(final K key,
+                        final V value,
+                        final long windowStartTimestamp) {
+            inner.put(key, ValueAndTimestamp.make(value, 
ConsumerRecord.NO_TIMESTAMP), windowStartTimestamp);
         }
 
         @Override
@@ -1451,31 +1455,31 @@ public class TopologyTestDriver implements Closeable {
 
         @Override
         public KeyValueIterator<Windowed<K>, V> findSessions(final K key,
-                                                              final long earliestSessionEndTime,
-                                                              final long latestSessionStartTime) {
+                                                             final long earliestSessionEndTime,
+                                                             final long latestSessionStartTime) {
             return new SessionStoreIteratorFacade<>(inner.findSessions(key, earliestSessionEndTime, latestSessionStartTime));
         }
 
         @Override
         public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K key,
-                                                                      final long earliestSessionEndTime,
-                                                                      final long latestSessionStartTime) {
+                                                                     final long earliestSessionEndTime,
+                                                                     final long latestSessionStartTime) {
             return new SessionStoreIteratorFacade<>(inner.backwardFindSessions(key, earliestSessionEndTime, latestSessionStartTime));
         }
 
         @Override
         public KeyValueIterator<Windowed<K>, V> findSessions(final K keyFrom,
-                                                              final K keyTo,
-                                                              final long earliestSessionEndTime,
-                                                              final long latestSessionStartTime) {
+                                                             final K keyTo,
+                                                             final long earliestSessionEndTime,
+                                                             final long latestSessionStartTime) {
             return new SessionStoreIteratorFacade<>(inner.findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime));
         }
 
         @Override
         public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K keyFrom,
-                                                                      final K keyTo,
-                                                                      final long earliestSessionEndTime,
-                                                                      final long latestSessionStartTime) {
+                                                                     final K keyTo,
+                                                                     final long earliestSessionEndTime,
+                                                                     final long latestSessionStartTime) {
             return new SessionStoreIteratorFacade<>(inner.backwardFindSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime));
         }
 
@@ -1488,7 +1492,7 @@ public class TopologyTestDriver implements Closeable {
 
         @Override
         public KeyValueIterator<Windowed<K>, V> findSessions(final long earliestSessionEndTime,
-                                                              final long latestSessionEndTime) {
+                                                             final long latestSessionEndTime) {
             return new SessionStoreIteratorFacade<>(inner.findSessions(earliestSessionEndTime, latestSessionEndTime));
         }
 
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java
index 5942322f103..3fb7aa7d99b 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.streams.TopologyTestDriver.KeyValueStoreFacade;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
@@ -138,4 +139,13 @@ public class KeyValueStoreFacadeTest {
         assertThat(keyValueStoreFacade.isOpen(), is(false));
         verify(mockedKeyValueTimestampStore, times(2)).isOpen();
     }
+
+    @Test
+    public void shouldReturnPosition() {
+        when(mockedKeyValueTimestampStore.getPosition())
+            .thenReturn(Position.emptyPosition());
+
+        assertThat(keyValueStoreFacade.getPosition(), is(Position.emptyPosition()));
+        verify(mockedKeyValueTimestampStore, times(1)).getPosition();
+    }
 }
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
index 7b64dc6e374..9964e5cf6bb 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
@@ -18,14 +18,20 @@ package org.apache.kafka.streams;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.streams.TopologyTestDriver.WindowStoreFacade;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStoreIterator;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.time.Instant;
 import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -103,4 +109,68 @@ public class WindowStoreFacadeTest {
         verify(mockedWindowTimestampStore, times(2)).isOpen();
     }
 
+    @Test
+    public void shouldReturnPosition() {
+        when(mockedWindowTimestampStore.getPosition())
+            .thenReturn(Position.emptyPosition());
+
+        assertThat(windowStoreFacade.getPosition(), is(Position.emptyPosition()));
+        verify(mockedWindowTimestampStore, times(1)).getPosition();
+    }
+
+    @Test
+    public void shouldFetchTimeRangeAndConvertValues() {
+        @SuppressWarnings("unchecked")
+        final WindowStoreIterator<ValueAndTimestamp<String>> mockIterator = mock(WindowStoreIterator.class);
+        final long from = 100L;
+        final long to = 200L;
+
+        when(mockedWindowTimestampStore.fetch("key", Instant.ofEpochMilli(from), Instant.ofEpochMilli(to))).thenReturn(mockIterator);
+        when(mockIterator.hasNext()).thenReturn(true, true, false);
+        when(mockIterator.next())
+            .thenReturn(KeyValue.pair(100L, ValueAndTimestamp.make("value1", 10L)))
+            .thenReturn(KeyValue.pair(150L, ValueAndTimestamp.make("value2", 20L)));
+
+        try (final WindowStoreIterator<String> iterator = windowStoreFacade.fetch("key", from, to)) {
+            assertThat(iterator.next(), is(KeyValue.pair(100L, "value1")));
+            assertThat(iterator.next(), is(KeyValue.pair(150L, "value2")));
+        }
+    }
+
+    @Test
+    public void shouldFetchAllTimeRangeAndConvertValues() {
+        @SuppressWarnings("unchecked")
+        final KeyValueIterator<Windowed<String>, ValueAndTimestamp<String>> mockIterator = mock(KeyValueIterator.class);
+        final long from = 100L;
+        final long to = 200L;
+        final Windowed<String> windowedKey = new Windowed<>("key", new TimeWindow(100L, 200L));
+
+        when(mockedWindowTimestampStore.fetchAll(Instant.ofEpochMilli(from), Instant.ofEpochMilli(to))).thenReturn(mockIterator);
+        when(mockIterator.hasNext()).thenReturn(true, false);
+        when(mockIterator.next())
+            .thenReturn(KeyValue.pair(windowedKey, ValueAndTimestamp.make("value", 10L)));
+
+        try (final KeyValueIterator<Windowed<String>, String> iterator = windowStoreFacade.fetchAll(from, to)) {
+            assertThat(iterator.next(), is(KeyValue.pair(windowedKey, "value")));
+        }
+    }
+
+    @Test
+    public void shouldFetchKeyRangeTimeRangeAndConvertValues() {
+        @SuppressWarnings("unchecked")
+        final KeyValueIterator<Windowed<String>, ValueAndTimestamp<String>> mockIterator = mock(KeyValueIterator.class);
+        final long from = 100L;
+        final long to = 200L;
+        final Windowed<String> windowedKey = new Windowed<>("key", new TimeWindow(100L, 200L));
+
+        when(mockedWindowTimestampStore.fetch("key", "key", Instant.ofEpochMilli(from), Instant.ofEpochMilli(to))).thenReturn(mockIterator);
+        when(mockIterator.hasNext()).thenReturn(true, false);
+        when(mockIterator.next())
+            .thenReturn(KeyValue.pair(windowedKey, ValueAndTimestamp.make("value", 10L)));
+
+        try (final KeyValueIterator<Windowed<String>, String> iterator = windowStoreFacade.fetch("key", "key", from, to)) {
+            assertThat(iterator.next(), is(KeyValue.pair(windowedKey, "value")));
+        }
+    }
+
 }

Reply via email to