This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 826755a9b4f MINOR: improve generic type safety for IQv2 in 
metered-store layer (#21683)
826755a9b4f is described below

commit 826755a9b4f303021f4141977e49f12ccfde1ef8
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Mar 20 14:07:04 2026 -0700

    MINOR: improve generic type safety for IQv2 in metered-store layer (#21683)
    
    Reviewers: TengYao Chi <[email protected]>
---
 .../state/internals/MeteredKeyValueStore.java      |  23 +--
 .../state/internals/MeteredSessionStore.java       |  42 ++--
 .../internals/MeteredTimestampedKeyValueStore.java | 131 +++++++------
 ...MeteredTimestampedKeyValueStoreWithHeaders.java | 139 +++++++-------
 .../MeteredTimestampedWindowStoreWithHeaders.java  | 213 +++++++++++----------
 .../internals/MeteredVersionedKeyValueStore.java   | 103 ++++++----
 .../state/internals/MeteredWindowStore.java        |  59 +++---
 .../streams/state/internals/StoreQueryUtils.java   |  82 ++++----
 ...angeLoggingTimestampedWindowBytesStoreTest.java |   8 +-
 .../ChangeLoggingWindowBytesStoreTest.java         |   8 +-
 10 files changed, 416 insertions(+), 392 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index d94b8fd5253..5586b71d41b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -103,8 +103,7 @@ public class MeteredKeyValueStore<K, V>
     protected NavigableSet<MeteredIterator> openIterators = new 
ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
 
 
-    @SuppressWarnings("rawtypes")
-    private final Map<Class, QueryHandler> queryHandlers =
+    private final Map<Class<?>, QueryHandler<?>> queryHandlers =
         mkMap(
             mkEntry(
                 RangeQuery.class,
@@ -230,31 +229,29 @@ public class MeteredKeyValueStore<K, V>
 
     @SuppressWarnings("unchecked")
     @Override
-    public <R> QueryResult<R> query(final Query<R> query,
-                                    final PositionBound positionBound,
-                                    final QueryConfig config) {
-
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final long start = time.nanoseconds();
         final QueryResult<R> result;
 
-        final QueryHandler handler = queryHandlers.get(query.getClass());
+        final QueryHandler<?> handler = queryHandlers.get(query.getClass());
         if (handler == null) {
             result = wrapped().query(query, positionBound, config);
             if (config.isCollectExecutionInfo()) {
-                result.addExecutionInfo(
-                    "Handled in " + getClass() + " in " + (time.nanoseconds() 
- start) + "ns");
+                result.addExecutionInfo("Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
             }
         } else {
-            result = (QueryResult<R>) handler.apply(
+            result = ((QueryHandler<R>) handler).apply(
                 query,
                 positionBound,
                 config,
                 this
             );
             if (config.isCollectExecutionInfo()) {
-                result.addExecutionInfo(
-                    "Handled in " + getClass() + " with serdes "
-                        + serdes + " in " + (time.nanoseconds() - start) + 
"ns");
+                result.addExecutionInfo("Handled in " + getClass() + " with 
serdes " + serdes + " in " + (time.nanoseconds() - start) + "ns");
             }
         }
         return result;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 9b4f05e44cf..5d678cbfd25 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -84,14 +84,13 @@ public class MeteredSessionStore<K, V>
     protected final LongAdder numOpenIterators = new LongAdder();
     protected final NavigableSet<MeteredIterator> openIterators = new 
ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
 
-    @SuppressWarnings("rawtypes")
-    private final Map<Class, QueryHandler> queryHandlers =
-            mkMap(
-                    mkEntry(
-                            WindowRangeQuery.class,
-                            (query, positionBound, config, store) -> 
runRangeQuery(query, positionBound, config)
-                    )
-            );
+    private final Map<Class<?>, QueryHandler<?>> queryHandlers =
+        mkMap(
+            mkEntry(
+                WindowRangeQuery.class,
+                (query, positionBound, config, store) -> runRangeQuery(query, 
positionBound, config)
+            )
+        );
 
 
     MeteredSessionStore(final SessionStore<Bytes, byte[]> inner,
@@ -445,39 +444,40 @@ public class MeteredSessionStore<K, V>
 
     @SuppressWarnings("unchecked")
     @Override
-    public <R> QueryResult<R> query(final Query<R> query,
-                                    final PositionBound positionBound,
-                                    final QueryConfig config) {
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final long start = time.nanoseconds();
         final QueryResult<R> result;
 
-        final QueryHandler handler = queryHandlers.get(query.getClass());
+        final QueryHandler<?> handler = queryHandlers.get(query.getClass());
         if (handler == null) {
             result = wrapped().query(query, positionBound, config);
             if (config.isCollectExecutionInfo()) {
-                result.addExecutionInfo(
-                    "Handled in " + getClass() + " in " + (time.nanoseconds() 
- start) + "ns");
+                result.addExecutionInfo("Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
             }
         } else {
-            result = (QueryResult<R>) handler.apply(
+            result = ((QueryHandler<R>) handler).apply(
                 query,
                 positionBound,
                 config,
                 this
             );
             if (config.isCollectExecutionInfo()) {
-                result.addExecutionInfo(
-                    "Handled in " + getClass() + " with serdes "
-                        + serdes + " in " + (time.nanoseconds() - start) + 
"ns");
+                result.addExecutionInfo("Handled in " + getClass() + " with 
serdes " + serdes + " in " + (time.nanoseconds() - start) + "ns");
             }
         }
         return result;
     }
 
     @SuppressWarnings("unchecked")
-    private <R> QueryResult<R> runRangeQuery(final Query<R> query,
-                                             final PositionBound positionBound,
-                                             final QueryConfig config) {
+    private <R> QueryResult<R> runRangeQuery(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final QueryResult<R> result;
         final WindowRangeQuery<K, V> typedQuery = (WindowRangeQuery<K, V>) 
query;
         if (typedQuery.getKey().isPresent()) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index bbc7214a842..72caa56d88e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
 
 import java.util.Map;
 import java.util.function.Function;
@@ -67,26 +68,25 @@ public class MeteredTimestampedKeyValueStore<K, V>
         super(inner, metricScope, time, keySerde, valueSerde);
     }
 
-    @SuppressWarnings("rawtypes")
-    private final Map<Class, StoreQueryUtils.QueryHandler> queryHandlers =
-            mkMap(
-                    mkEntry(
-                            RangeQuery.class,
-                            (query, positionBound, config, store) -> 
runRangeQuery(query, positionBound, config)
-                    ),
-                    mkEntry(
-                            TimestampedRangeQuery.class,
-                            (query, positionBound, config, store) -> 
runTimestampedRangeQuery(query, positionBound, config)
-                    ),
-                    mkEntry(
-                            KeyQuery.class,
-                            (query, positionBound, config, store) -> 
runKeyQuery(query, positionBound, config)
-                    ),
-                    mkEntry(
-                            TimestampedKeyQuery.class,
-                            (query, positionBound, config, store) -> 
runTimestampedKeyQuery(query, positionBound, config)
-                    )
-            );
+    private final Map<Class<?>, QueryHandler<?>> queryHandlers =
+        mkMap(
+            mkEntry(
+                RangeQuery.class,
+                (query, positionBound, config, store) -> runRangeQuery(query, 
positionBound, config)
+            ),
+            mkEntry(
+                TimestampedRangeQuery.class,
+                (query, positionBound, config, store) -> 
runTimestampedRangeQuery(query, positionBound, config)
+            ),
+            mkEntry(
+                KeyQuery.class,
+                (query, positionBound, config, store) -> runKeyQuery(query, 
positionBound, config)
+            ),
+            mkEntry(
+                TimestampedKeyQuery.class,
+                (query, positionBound, config, store) -> 
runTimestampedKeyQuery(query, positionBound, config)
+            )
+        );
 
     @SuppressWarnings("unchecked")
     @Override
@@ -134,40 +134,40 @@ public class MeteredTimestampedKeyValueStore<K, V>
 
     @SuppressWarnings("unchecked")
     @Override
-    public <R> QueryResult<R> query(final Query<R> query,
-                                    final PositionBound positionBound,
-                                    final QueryConfig config) {
-
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final long start = time.nanoseconds();
         final QueryResult<R> result;
 
-        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        final QueryHandler<?> handler = queryHandlers.get(query.getClass());
         if (handler == null) {
             result = wrapped().query(query, positionBound, config);
             if (config.isCollectExecutionInfo()) {
-                result.addExecutionInfo(
-                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+                result.addExecutionInfo("Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
             }
         } else {
-            result = (QueryResult<R>) handler.apply(
-                    query,
-                    positionBound,
-                    config,
-                    this
+            result = ((QueryHandler<R>) handler).apply(
+                query,
+                positionBound,
+                config,
+                this
             );
             if (config.isCollectExecutionInfo()) {
-                result.addExecutionInfo(
-                        "Handled in " + getClass() + " with serdes "
-                                + serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+                result.addExecutionInfo("Handled in " + getClass() + " with 
serdes " + serdes + " in " + (time.nanoseconds() - start) + "ns");
             }
         }
         return result;
     }
 
     @SuppressWarnings("unchecked")
-    private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
-                                                      final PositionBound 
positionBound,
-                                                      final QueryConfig 
config) {
+    private <R> QueryResult<R> runTimestampedKeyQuery(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final QueryResult<R> result;
         final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
         final KeyQuery<Bytes, byte[]> rawKeyQuery = 
KeyQuery.withKey(serializeKey(typedKeyQuery.key()));
@@ -176,7 +176,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
             final Function<byte[], ValueAndTimestamp<V>> deserializer = 
StoreQueryUtils.deserializeValue(serdes, wrapped());
             final ValueAndTimestamp<V> valueAndTimestamp = 
deserializer.apply(rawResult.getResult());
             final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
-                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
valueAndTimestamp);
+                
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
valueAndTimestamp);
             result = (QueryResult<R>) typedQueryResult;
         } else {
             // the generic type doesn't matter, since failed queries have no 
result set.
@@ -186,10 +186,11 @@ public class MeteredTimestampedKeyValueStore<K, V>
     }
 
     @SuppressWarnings("unchecked")
-    private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
-                                                        final PositionBound 
positionBound,
-                                                        final QueryConfig 
config) {
-
+    private <R> QueryResult<R> runTimestampedRangeQuery(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final QueryResult<R> result;
         final TimestampedRangeQuery<K, V> typedQuery = 
(TimestampedRangeQuery<K, V>) query;
         RangeQuery<Bytes, byte[]> rawRangeQuery;
@@ -205,20 +206,21 @@ public class MeteredTimestampedKeyValueStore<K, V>
             rawRangeQuery = rawRangeQuery.withAscendingKeys();
         }
         final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
-                wrapped().query(rawRangeQuery, positionBound, config);
+            wrapped().query(rawRangeQuery, positionBound, config);
         if (rawResult.isSuccess()) {
             final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
-            final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator = 
(KeyValueIterator<K, ValueAndTimestamp<V>>) new 
MeteredTimestampedKeyValueStoreIterator(
+            final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator =
+                (KeyValueIterator<K, ValueAndTimestamp<V>>) new 
MeteredTimestampedKeyValueStoreIterator(
                     iterator,
                     getSensor,
                     StoreQueryUtils.deserializeValue(serdes, wrapped()),
                     false
-            );
+                );
             final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>> 
typedQueryResult =
-                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
-                            rawResult,
-                            resultIterator
-                    );
+                InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                    rawResult,
+                    resultIterator
+                );
             result = (QueryResult<R>) typedQueryResult;
         } else {
             // the generic type doesn't matter, since failed queries have no 
result set.
@@ -228,9 +230,11 @@ public class MeteredTimestampedKeyValueStore<K, V>
     }
 
     @SuppressWarnings("unchecked")
-    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
-                                             final PositionBound positionBound,
-                                             final QueryConfig config) {
+    private <R> QueryResult<R> runKeyQuery(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final QueryResult<R> result;
         final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
         final KeyQuery<Bytes, byte[]> rawKeyQuery = 
KeyQuery.withKey(serializeKey(typedKeyQuery.getKey()));
@@ -241,7 +245,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
             final ValueAndTimestamp<V> valueAndTimestamp = 
deserializer.apply(rawResult.getResult());
             final V plainValue = valueAndTimestamp == null ? null : 
valueAndTimestamp.value();
             final QueryResult<V> typedQueryResult =
-                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
plainValue);
+                
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
plainValue);
             result = (QueryResult<R>) typedQueryResult;
         } else {
             // the generic type doesn't matter, since failed queries have no 
result set.
@@ -251,10 +255,11 @@ public class MeteredTimestampedKeyValueStore<K, V>
     }
 
     @SuppressWarnings("unchecked")
-    private  <R> QueryResult<R> runRangeQuery(final Query<R> query,
-                                               final PositionBound 
positionBound,
-                                               final QueryConfig config) {
-
+    private  <R> QueryResult<R> runRangeQuery(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final QueryResult<R> result;
         final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
         RangeQuery<Bytes, byte[]> rawRangeQuery;
@@ -270,7 +275,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
             rawRangeQuery = rawRangeQuery.withAscendingKeys();
         }
         final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
-                wrapped().query(rawRangeQuery, positionBound, config);
+            wrapped().query(rawRangeQuery, positionBound, config);
         if (rawResult.isSuccess()) {
             final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
             final KeyValueIterator<K, V> resultIterator = new 
MeteredTimestampedKeyValueStoreIterator(
@@ -280,10 +285,10 @@ public class MeteredTimestampedKeyValueStore<K, V>
                 true
             );
             final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
-                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
-                            rawResult,
-                            resultIterator
-                    );
+                InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                    rawResult,
+                    resultIterator
+                );
             result = (QueryResult<R>) typedQueryResult;
         } else {
             // the generic type doesn't matter, since failed queries have no 
result set.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
index f77471ad905..9d8ef5f3fd4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -42,6 +42,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -69,16 +70,17 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
     extends MeteredKeyValueStore<K, ValueTimestampHeaders<V>>
     implements TimestampedKeyValueStoreWithHeaders<K, V> {
 
-    MeteredTimestampedKeyValueStoreWithHeaders(final KeyValueStore<Bytes, 
byte[]> inner,
-                                               final String metricScope,
-                                               final Time time,
-                                               final Serde<K> keySerde,
-                                               final 
Serde<ValueTimestampHeaders<V>> valueSerde) {
+    MeteredTimestampedKeyValueStoreWithHeaders(
+        final KeyValueStore<Bytes, byte[]> inner,
+        final String metricScope,
+        final Time time,
+        final Serde<K> keySerde,
+        final Serde<ValueTimestampHeaders<V>> valueSerde
+    ) {
         super(inner, metricScope, time, keySerde, valueSerde);
     }
 
-    @SuppressWarnings("rawtypes")
-    private final Map<Class, StoreQueryUtils.QueryHandler> queryHandlers =
+    private final Map<Class<?>, QueryHandler<?>> queryHandlers =
         mkMap(
             mkEntry(
                 KeyQuery.class,
@@ -284,55 +286,54 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
      * @param config the query configuration
      * @return the query result
      */
-
     @SuppressWarnings("unchecked")
     @Override
-    public <R> QueryResult<R> query(final Query<R> query,
-                                    final PositionBound positionBound,
-                                    final QueryConfig config) {
-
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final long start = time.nanoseconds();
         final QueryResult<R> result;
 
-        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        final QueryHandler<?> handler = queryHandlers.get(query.getClass());
         if (handler == null) {
             result = wrapped().query(query, positionBound, config);
             if (config.isCollectExecutionInfo()) {
-                result.addExecutionInfo(
-                    "Handled in " + getClass() + " in " + (time.nanoseconds() 
- start) + "ns");
+                result.addExecutionInfo("Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
             }
         } else {
-            result = (QueryResult<R>) handler.apply(
+            result = ((QueryHandler<R>) handler).apply(
                 query,
                 positionBound,
                 config,
                 this
             );
             if (config.isCollectExecutionInfo()) {
-                result.addExecutionInfo("Handled in " + getClass() + " with 
serdes "
-                        + serdes + " in " + (time.nanoseconds() - start) + 
"ns");
+                result.addExecutionInfo("Handled in " + getClass() + " with 
serdes " + serdes + " in " + (time.nanoseconds() - start) + "ns");
             }
         }
         return result;
     }
 
     @SuppressWarnings("unchecked")
-    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
-                                           final PositionBound positionBound,
-                                           final QueryConfig config) {
+    private <R> QueryResult<R> runKeyQuery(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final QueryResult<R> result;
         final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
-        final KeyQuery<Bytes, byte[]> rawKeyQuery =
-            KeyQuery.withKey(serializeKey(typedKeyQuery.getKey(), 
internalContext.headers()));
-        final QueryResult<byte[]> rawResult =
-            wrapped().query(rawKeyQuery, positionBound, config);
+
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = 
KeyQuery.withKey(serializeKey(typedKeyQuery.getKey(), 
internalContext.headers()));
+        final QueryResult<byte[]> rawResult = wrapped().query(rawKeyQuery, 
positionBound, config);
         if (rawResult.isSuccess()) {
             // value will be `rawValueTimestampHeader`; no need to pass 
headers explicitly
             final Function<byte[], ValueTimestampHeaders<V>> deserializer = 
StoreQueryUtils.deserializeValue(serdes, wrapped());
             final ValueTimestampHeaders<V> valueTimestampHeaders = 
deserializer.apply(rawResult.getResult());
             final V plainValue = valueTimestampHeaders == null ? null : 
valueTimestampHeaders.value();
             final QueryResult<V> typedQueryResult =
-                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
plainValue);
+                
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
plainValue);
             result = (QueryResult<R>) typedQueryResult;
         } else {
             // the generic type doesn't matter, since failed queries have no 
result set.
@@ -342,25 +343,27 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
     }
 
     @SuppressWarnings("unchecked")
-    private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
-                                                      final PositionBound 
positionBound,
-                                                      final QueryConfig 
config) {
+    private <R> QueryResult<R> runTimestampedKeyQuery(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final QueryResult<R> result;
         final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
-        final KeyQuery<Bytes, byte[]> rawKeyQuery =
-                KeyQuery.withKey(serializeKey(typedKeyQuery.key(), 
internalContext.headers()));
-        final QueryResult<byte[]> rawResult =
-                wrapped().query(rawKeyQuery, positionBound, config);
+
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = 
KeyQuery.withKey(serializeKey(typedKeyQuery.key(), internalContext.headers()));
+        final QueryResult<byte[]> rawResult = wrapped().query(rawKeyQuery, 
positionBound, config);
         if (rawResult.isSuccess()) {
             // value will be `rawValueTimestampHeader`; no need to pass 
headers explicitly
             final Function<byte[], ValueTimestampHeaders<V>> deserializer = 
StoreQueryUtils.deserializeValue(serdes, wrapped());
             final ValueTimestampHeaders<V> valueTimestampHeaders = 
deserializer.apply(rawResult.getResult());
             // Convert ValueTimestampHeaders to ValueAndTimestamp for the 
result
-            final ValueAndTimestamp<V> valueAndTimestamp = 
valueTimestampHeaders == null
+            final ValueAndTimestamp<V> valueAndTimestamp =
+                valueTimestampHeaders == null
                     ? null
                     : ValueAndTimestamp.make(valueTimestampHeaders.value(), 
valueTimestampHeaders.timestamp());
             final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
-                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
valueAndTimestamp);
+                
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
valueAndTimestamp);
             result = (QueryResult<R>) typedQueryResult;
         } else {
             // the generic type doesn't matter, since failed queries have no 
result set.
@@ -370,12 +373,14 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
     }
 
     @SuppressWarnings("unchecked")
-    private <R> QueryResult<R> runRangeQuery(final Query<R> query,
-                                             final PositionBound positionBound,
-                                             final QueryConfig config) {
-
+    private <R> QueryResult<R> runRangeQuery(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final QueryResult<R> result;
         final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
+
         RangeQuery<Bytes, byte[]> rawRangeQuery;
         final ResultOrder order = typedQuery.resultOrder();
         rawRangeQuery = RangeQuery.withRange(
@@ -388,21 +393,21 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
         if (order.equals(ResultOrder.ASCENDING)) {
             rawRangeQuery = rawRangeQuery.withAscendingKeys();
         }
-        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
-            wrapped().query(rawRangeQuery, positionBound, config);
+
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult = 
wrapped().query(rawRangeQuery, positionBound, config);
         if (rawResult.isSuccess()) {
             final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
             final KeyValueIterator<K, V> resultIterator = new 
MeteredTimestampedKeyValueStoreWithHeadersQueryIterator(
-                    iterator,
-                    getSensor,
-                    // value will be `rawValueTimestampHeader`; no need to 
pass headers explicitly
-                    StoreQueryUtils.deserializeValue(serdes, wrapped()),
-                    true
+                iterator,
+                getSensor,
+                // value will be `rawValueTimestampHeader`; no need to pass 
headers explicitly
+                StoreQueryUtils.deserializeValue(serdes, wrapped()),
+                true
             );
             final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
                 InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
-                        rawResult,
-                        resultIterator
+                    rawResult,
+                    resultIterator
                 );
             result = (QueryResult<R>) typedQueryResult;
         } else {
@@ -413,12 +418,14 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
     }
 
     @SuppressWarnings("unchecked")
-    private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
-                                                        final PositionBound 
positionBound,
-                                                        final QueryConfig 
config) {
-
+    private <R> QueryResult<R> runTimestampedRangeQuery(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final QueryResult<R> result;
         final TimestampedRangeQuery<K, V> typedQuery = 
(TimestampedRangeQuery<K, V>) query;
+
         RangeQuery<Bytes, byte[]> rawRangeQuery;
         final ResultOrder order = typedQuery.resultOrder();
         rawRangeQuery = RangeQuery.withRange(
@@ -431,23 +438,23 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
         if (order.equals(ResultOrder.ASCENDING)) {
             rawRangeQuery = rawRangeQuery.withAscendingKeys();
         }
-        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
-                wrapped().query(rawRangeQuery, positionBound, config);
+
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult = 
wrapped().query(rawRangeQuery, positionBound, config);
         if (rawResult.isSuccess()) {
             final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
             final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator =
-                    (KeyValueIterator<K, ValueAndTimestamp<V>>) new 
MeteredTimestampedKeyValueStoreWithHeadersQueryIterator(
-                            iterator,
-                            getSensor,
-                            // value will be `rawValueTimestampHeader`; no 
need to pass headers explicitly
-                            StoreQueryUtils.deserializeValue(serdes, 
wrapped()),
-                            false
-            );
+                (KeyValueIterator<K, ValueAndTimestamp<V>>) new 
MeteredTimestampedKeyValueStoreWithHeadersQueryIterator(
+                    iterator,
+                    getSensor,
+                    // value will be `rawValueTimestampHeader`; no need to 
pass headers explicitly
+                    StoreQueryUtils.deserializeValue(serdes, wrapped()),
+                    false
+                );
             final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>> 
typedQueryResult =
-                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
-                            rawResult,
-                            resultIterator
-                    );
+                InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                    rawResult,
+                    resultIterator
+                );
             result = (QueryResult<R>) typedQueryResult;
         } else {
             // the generic type doesn't matter, since failed queries have no 
result set.
@@ -670,4 +677,4 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
     protected K deserializeKey(final byte[] rawKey, final Headers headers) {
         return serdes.keyFrom(rawKey, headers);
     }
-}
\ No newline at end of file
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
index e88ec3398d5..6dc01498ff2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
@@ -46,7 +46,6 @@ import java.util.Objects;
 import java.util.function.Function;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
-import static 
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.headers;
 
 /**
  * A Metered {@link TimestampedWindowStoreWithHeaders} wrapper that is used 
for recording operation metrics,
@@ -61,12 +60,14 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
     extends MeteredWindowStore<K, ValueTimestampHeaders<V>>
     implements TimestampedWindowStoreWithHeaders<K, V> {
 
-    MeteredTimestampedWindowStoreWithHeaders(final WindowStore<Bytes, byte[]> 
inner,
-                                             final long windowSizeMs,
-                                             final String metricScope,
-                                             final Time time,
-                                             final Serde<K> keySerde,
-                                             final 
Serde<ValueTimestampHeaders<V>> valueSerde) {
+    MeteredTimestampedWindowStoreWithHeaders(
+        final WindowStore<Bytes, byte[]> inner,
+        final long windowSizeMs,
+        final String metricScope,
+        final Time time,
+        final Serde<K> keySerde,
+        final Serde<ValueTimestampHeaders<V>> valueSerde
+    ) {
         super(inner, windowSizeMs, metricScope, time, keySerde, valueSerde);
     }
 
@@ -110,7 +111,7 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
                             internalContext.setRecordContext(currentContext);
                         }
                     } else {
-                        final Headers headers = value.headers() == null ? new 
RecordHeaders() : value.headers();
+                        final Headers headers = value.headers();
                         wrapped().put(keyBytes(key, headers), 
serdes.rawValue(value, headers), windowStartTimestamp);
                     }
                 },
@@ -130,9 +131,11 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
 
     @SuppressWarnings("unchecked")
     @Override
-    public <R> QueryResult<R> query(final Query<R> query,
-                                    final PositionBound positionBound,
-                                    final QueryConfig config) {
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
         final QueryResult<R> result;
 
@@ -160,10 +163,13 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, 
V>
      * ValueTimestampHeaders to either ValueAndTimestamp<V> (for timestamped 
stores) or V (for non-timestamped stores).
      */
     @SuppressWarnings("unchecked")
-    private <R> QueryResult<R> runWindowKeyQuery(final WindowKeyQuery<K, 
ValueTimestampHeaders<V>> query,
-                                                 final PositionBound 
positionBound,
-                                                 final QueryConfig config) {
+    private <R> QueryResult<R> runWindowKeyQuery(
+        final WindowKeyQuery<K, ValueTimestampHeaders<V>> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final QueryResult<R> queryResult;
+
         if (query.getTimeFrom().isPresent() && query.getTimeTo().isPresent()) {
             final WindowKeyQuery<Bytes, byte[]> rawKeyQuery =
                     WindowKeyQuery.withKeyAndWindowStartRange(
@@ -175,45 +181,29 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, 
V>
             if (rawResult.isSuccess()) {
                 if (isUnderlyingStoreTimestamped()) {
                     // For timestamped stores, return ValueAndTimestamp<V>
-                    final Function<byte[], ValueAndTimestamp<V>> valueFrom = 
bytes -> {
-                        final ValueTimestampHeaders<V> vth = 
serdes.valueFrom(bytes, new RecordHeaders());
-                        return vth == null ? null : 
ValueAndTimestamp.make(vth.value(), vth.timestamp());
-                    };
-
-                    final MeteredWindowStoreIterator<ValueAndTimestamp<V>> 
typedResult =
-                            new MeteredWindowStoreIterator<>(
-                                rawResult.getResult(),
-                                fetchSensor,
-                                iteratorDurationSensor,
-                                streamsMetrics,
-                                valueFrom,
-                                time,
-                                numOpenIterators,
-                                openIterators
-                            );
+                    final MeteredWindowStoreIterator<ValueAndTimestamp<V>> 
typedResult = meteredIterator(
+                        rawResult,
+                        bytes -> {
+                            final ValueTimestampHeaders<V> vth = 
serdes.valueFrom(bytes, new RecordHeaders());
+                            return vth == null ? null : 
ValueAndTimestamp.make(vth.value(), vth.timestamp());
+                        }
+                    );
+
                     final 
QueryResult<MeteredWindowStoreIterator<ValueAndTimestamp<V>>> typedQueryResult =
-                            
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
typedResult);
+                        
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
typedResult);
                     queryResult = (QueryResult<R>) typedQueryResult;
                 } else {
                     // For non-timestamped stores, return plain V
-                    final Function<byte[], V> valueFrom = bytes -> {
-                        final ValueTimestampHeaders<V> vth = 
serdes.valueFrom(bytes, new RecordHeaders());
-                        return vth == null ? null : vth.value();
-                    };
-
-                    final MeteredWindowStoreIterator<V> typedResult =
-                            new MeteredWindowStoreIterator<>(
-                                rawResult.getResult(),
-                                fetchSensor,
-                                iteratorDurationSensor,
-                                streamsMetrics,
-                                valueFrom,
-                                time,
-                                numOpenIterators,
-                                openIterators
-                            );
+                    final MeteredWindowStoreIterator<V> typedResult = 
meteredIterator(
+                        rawResult,
+                        bytes -> {
+                            final ValueTimestampHeaders<V> vth = 
serdes.valueFrom(bytes, new RecordHeaders());
+                            return vth == null ? null : vth.value();
+                        }
+                    );
+
                     final QueryResult<MeteredWindowStoreIterator<V>> 
typedQueryResult =
-                            
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
typedResult);
+                        
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
typedResult);
                     queryResult = (QueryResult<R>) typedQueryResult;
                 }
             } else {
@@ -221,80 +211,80 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, 
V>
             }
         } else {
             queryResult = QueryResult.forFailure(
-                    FailureReason.UNKNOWN_QUERY_TYPE,
-                    "This store (" + getClass() + ") doesn't know how to 
execute"
-                        + " the given query (" + query + ") because it only 
supports closed-range"
-                        + " queries."
-                        + " Contact the store maintainer if you need support 
for a new query type."
+                FailureReason.UNKNOWN_QUERY_TYPE,
+                "This store (" + getClass() + ") doesn't know how to execute"
+                    + " the given query (" + query + ") because it only 
supports closed-range"
+                    + " queries."
+                    + " Contact the store maintainer if you need support for a 
new query type."
             );
         }
         return queryResult;
     }
 
+    private <ValueType> MeteredWindowStoreIterator<ValueType> meteredIterator(
+        final QueryResult<WindowStoreIterator<byte[]>> rawResult,
+        final Function<byte[], ValueType> valueDeserializer
+    ) {
+        return new MeteredWindowStoreIterator<>(
+            rawResult.getResult(),
+            fetchSensor,
+            iteratorDurationSensor,
+            streamsMetrics,
+            valueDeserializer,
+            time,
+            numOpenIterators,
+            openIterators
+        );
+    }
 
     /**
      * Handles WindowRangeQuery by creating a MeteredWindowedKeyValueIterator 
with conversion from
      * ValueTimestampHeaders to either ValueAndTimestamp<V> (for timestamped 
stores) or V (for non-timestamped stores).
      */
     @SuppressWarnings("unchecked")
-    private <R> QueryResult<R> runWindowRangeQuery(final WindowRangeQuery<K, 
ValueTimestampHeaders<V>> query,
-                                                   final PositionBound 
positionBound,
-                                                   final QueryConfig config) {
+    private <R> QueryResult<R> runWindowRangeQuery(
+        final WindowRangeQuery<K, ValueTimestampHeaders<V>> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final QueryResult<R> result;
+
         if (query.getTimeFrom().isPresent() && query.getTimeTo().isPresent()) {
             final WindowRangeQuery<Bytes, byte[]> rawKeyQuery =
-                    WindowRangeQuery.withWindowStartRange(
-                        query.getTimeFrom().get(),
-                        query.getTimeTo().get()
-                    );
-            final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> 
rawResult =
-                    wrapped().query(rawKeyQuery, positionBound, config);
-            if (rawResult.isSuccess()) {
-                final Function<byte[], K> keyFrom = bytes -> 
serdes.keyFrom(bytes, new RecordHeaders());
+                WindowRangeQuery.withWindowStartRange(
+                    query.getTimeFrom().get(),
+                    query.getTimeTo().get()
+                );
 
+            final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> 
rawResult = wrapped().query(rawKeyQuery, positionBound, config);
+            if (rawResult.isSuccess()) {
                 if (isUnderlyingStoreTimestamped()) {
                     // For timestamped stores, return ValueAndTimestamp<V>
-                    final Function<byte[], ValueAndTimestamp<V>> valueFrom = 
bytes -> {
-                        final ValueTimestampHeaders<V> vth = 
serdes.valueFrom(bytes, new RecordHeaders());
-                        return vth == null ? null : 
ValueAndTimestamp.make(vth.value(), vth.timestamp());
-                    };
-
                     final MeteredWindowedKeyValueIterator<K, 
ValueAndTimestamp<V>> typedResult =
-                            new MeteredWindowedKeyValueIterator<>(
-                                rawResult.getResult(),
-                                fetchSensor,
-                                iteratorDurationSensor,
-                                streamsMetrics,
-                                keyFrom,
-                                valueFrom,
-                                time,
-                                numOpenIterators,
-                                openIterators
-                            );
+                        meteredWindowedIterator(
+                            rawResult,
+                            bytes -> {
+                                final ValueTimestampHeaders<V> vth = 
serdes.valueFrom(bytes, new RecordHeaders());
+                                return vth == null ? null : 
ValueAndTimestamp.make(vth.value(), vth.timestamp());
+                            }
+                        );
+
                     final QueryResult<MeteredWindowedKeyValueIterator<K, 
ValueAndTimestamp<V>>> typedQueryResult =
-                            
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
typedResult);
+                        
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
typedResult);
                     result = (QueryResult<R>) typedQueryResult;
                 } else {
                     // For non-timestamped stores, return plain V
-                    final Function<byte[], V> valueFrom = bytes -> {
-                        final ValueTimestampHeaders<V> vth = 
serdes.valueFrom(bytes, new RecordHeaders());
-                        return vth == null ? null : vth.value();
-                    };
-
                     final MeteredWindowedKeyValueIterator<K, V> typedResult =
-                            new MeteredWindowedKeyValueIterator<>(
-                                rawResult.getResult(),
-                                fetchSensor,
-                                iteratorDurationSensor,
-                                streamsMetrics,
-                                keyFrom,
-                                valueFrom,
-                                time,
-                                numOpenIterators,
-                                openIterators
-                            );
+                        meteredWindowedIterator(
+                            rawResult,
+                            bytes -> {
+                                final ValueTimestampHeaders<V> vth = 
serdes.valueFrom(bytes, new RecordHeaders());
+                                return vth == null ? null : vth.value();
+                            }
+                        );
+
                     final QueryResult<MeteredWindowedKeyValueIterator<K, V>> 
typedQueryResult =
-                            
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
typedResult);
+                        
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
typedResult);
                     result = (QueryResult<R>) typedQueryResult;
                 }
             } else {
@@ -302,11 +292,11 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, 
V>
             }
         } else {
             result = QueryResult.forFailure(
-                    FailureReason.UNKNOWN_QUERY_TYPE,
-                    "This store (" + getClass() + ") doesn't know how to"
-                        + " execute the given query (" + query + ") because"
-                        + " WindowStores only supports 
WindowRangeQuery.withWindowStartRange."
-                        + " Contact the store maintainer if you need support 
for a new query type."
+                FailureReason.UNKNOWN_QUERY_TYPE,
+                "This store (" + getClass() + ") doesn't know how to"
+                    + " execute the given query (" + query + ") because"
+                    + " WindowStores only supports 
WindowRangeQuery.withWindowStartRange."
+                    + " Contact the store maintainer if you need support for a 
new query type."
             );
         }
         return result;
@@ -433,6 +423,23 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
         }
     }
 
+    private <ValueType> MeteredWindowedKeyValueIterator<K, ValueType> 
meteredWindowedIterator(
+        final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult,
+        final Function<byte[], ValueType> valueDeserializer
+    ) {
+        return new MeteredWindowedKeyValueIterator<>(
+            rawResult.getResult(),
+            fetchSensor,
+            iteratorDurationSensor,
+            streamsMetrics,
+            bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
+            valueDeserializer,
+            time,
+            numOpenIterators,
+            openIterators
+        );
+    }
+
     private boolean isUnderlyingStoreTimestamped() {
         Object store = wrapped();
         do {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
index 0cb35fc63b7..9cd8b9e1861 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
@@ -71,11 +71,13 @@ public class MeteredVersionedKeyValueStore<K, V>
 
     private final MeteredVersionedKeyValueStoreInternal internal;
 
-    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
-                                  final String metricScope,
-                                  final Time time,
-                                  final Serde<K> keySerde,
-                                  final Serde<V> valueSerde) {
+    MeteredVersionedKeyValueStore(
+        final VersionedBytesStore inner,
+        final String metricScope,
+        final Time time,
+        final Serde<K> keySerde,
+        final Serde<V> valueSerde
+    ) {
         super(inner);
         internal = new MeteredVersionedKeyValueStoreInternal(inner, 
metricScope, time, keySerde, valueSerde);
     }
@@ -95,14 +97,13 @@ public class MeteredVersionedKeyValueStore<K, V>
      * Note that the addition of {@link #get(Object, long)} and {@link 
#delete(Object, long)} in
      * this class are to match the interface of {@link VersionedKeyValueStore}.
      */
-    private class MeteredVersionedKeyValueStoreInternal
-        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>> {
+    private class MeteredVersionedKeyValueStoreInternal extends 
MeteredKeyValueStore<K, ValueAndTimestamp<V>> {
 
         private final VersionedBytesStore inner;
         private final Serde<V> plainValueSerde;
         private StateSerdes<K, V> plainValueSerdes;
 
-        private final Map<Class<?>, QueryHandler> queryHandlers =
+        private final Map<Class<?>, QueryHandler<?>> queryHandlers =
             mkMap(
                 mkEntry(
                     RangeQuery.class,
@@ -122,11 +123,13 @@ public class MeteredVersionedKeyValueStore<K, V>
                 )
             );
 
-        MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
-                                              final String metricScope,
-                                              final Time time,
-                                              final Serde<K> keySerde,
-                                              final Serde<V> valueSerde) {
+        MeteredVersionedKeyValueStoreInternal(
+            final VersionedBytesStore inner,
+            final String metricScope,
+            final Time time,
+            final Serde<K> keySerde,
+            final Serde<V> valueSerde
+        ) {
             super(
                 inner,
                 metricScope,
@@ -182,56 +185,62 @@ public class MeteredVersionedKeyValueStore<K, V>
 
         @SuppressWarnings("unchecked")
         @Override
-        public <R> QueryResult<R> query(final Query<R> query, final 
PositionBound positionBound, final QueryConfig config) {
-
+        public <R> QueryResult<R> query(
+            final Query<R> query,
+            final PositionBound positionBound,
+            final QueryConfig config
+        ) {
             final long start = time.nanoseconds();
             final QueryResult<R> result;
 
-            final QueryHandler handler = queryHandlers.get(query.getClass());
+            final QueryHandler<?> handler = 
queryHandlers.get(query.getClass());
             if (handler == null) {
                 result = wrapped().query(query, positionBound, config);
                 if (config.isCollectExecutionInfo()) {
-                    result.addExecutionInfo(
-                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+                    result.addExecutionInfo("Handled in " + getClass() + " in 
" + (time.nanoseconds() - start) + "ns");
                 }
             } else {
-                result = (QueryResult<R>) handler.apply(
+                result = ((QueryHandler<R>) handler).apply(
                     query,
                     positionBound,
                     config,
                     this
                 );
                 if (config.isCollectExecutionInfo()) {
-                    result.addExecutionInfo(
-                        "Handled in " + getClass() + " with serdes "
-                            + serdes + " in " + (time.nanoseconds() - start) + 
"ns");
+                    result.addExecutionInfo("Handled in " + getClass() + " 
with serdes " + serdes + " in " + (time.nanoseconds() - start) + "ns");
                 }
             }
             return result;
         }
 
         @SuppressWarnings("unused")
-        private <R> QueryResult<R> runRangeQuery(final Query<R> query,
-                                                 final PositionBound 
positionBound,
-                                                 final QueryConfig config) {
+        private <R> QueryResult<R> runRangeQuery(
+            final Query<R> query,
+            final PositionBound positionBound,
+            final QueryConfig config
+        ) {
             // throw exception for now to reserve the ability to implement 
this in the future
             // without clashing with users' custom implementations in the 
meantime
             throw new UnsupportedOperationException("Versioned stores do not 
support RangeQuery queries at this time.");
         }
 
         @SuppressWarnings("unused")
-        private <R> QueryResult<R> runKeyQuery(final Query<R> query,
-                                               final PositionBound 
positionBound,
-                                               final QueryConfig config) {
+        private <R> QueryResult<R> runKeyQuery(
+            final Query<R> query,
+            final PositionBound positionBound,
+            final QueryConfig config
+        ) {
             // throw exception for now to reserve the ability to implement 
this in the future
             // without clashing with users' custom implementations in the 
meantime
             throw new UnsupportedOperationException("Versioned stores do not 
support KeyQuery queries at this time.");
         }
 
         @SuppressWarnings("unchecked")
-        private <R> QueryResult<R> runVersionedKeyQuery(final Query<R> query,
-                                                          final PositionBound 
positionBound,
-                                                          final QueryConfig 
config) {
+        private <R> QueryResult<R> runVersionedKeyQuery(
+            final Query<R> query,
+            final PositionBound positionBound,
+            final QueryConfig config
+        ) {
             final QueryResult<R> result;
             final VersionedKeyQuery<K, V> typedKeyQuery = 
(VersionedKeyQuery<K, V>) query;
             VersionedKeyQuery<Bytes, byte[]> rawKeyQuery = 
VersionedKeyQuery.withKey(serializeKey(typedKeyQuery.key()));
@@ -253,7 +262,11 @@ public class MeteredVersionedKeyValueStore<K, V>
         }
 
         @SuppressWarnings("unchecked")
-        private <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> 
query, final PositionBound positionBound, final QueryConfig config) {
+        private <R> QueryResult<R> runMultiVersionedKeyQuery(
+            final Query<R> query,
+            final PositionBound positionBound,
+            final QueryConfig config
+        ) {
             final QueryResult<R> result;
             final MultiVersionedKeyQuery<K, V> typedKeyQuery = 
(MultiVersionedKeyQuery<K, V>) query;
 
@@ -273,16 +286,16 @@ public class MeteredVersionedKeyValueStore<K, V>
             final QueryResult<VersionedRecordIterator<byte[]>> rawResult = 
wrapped().query(rawKeyQuery, positionBound, config);
             if (rawResult.isSuccess()) {
                 final MeteredMultiVersionedKeyQueryIterator<V> typedResult =
-                        new MeteredMultiVersionedKeyQueryIterator<>(
-                            rawResult.getResult(),
-                            iteratorDurationSensor,
-                            time,
-                            StoreQueryUtils.deserializeValue(plainValueSerdes),
-                            numOpenIterators,
-                            openIterators
-                        );
+                    new MeteredMultiVersionedKeyQueryIterator<>(
+                        rawResult.getResult(),
+                        iteratorDurationSensor,
+                        time,
+                        StoreQueryUtils.deserializeValue(plainValueSerdes),
+                        numOpenIterators,
+                        openIterators
+                    );
                 final QueryResult<MeteredMultiVersionedKeyQueryIterator<V>> 
typedQueryResult =
-                        
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
typedResult);
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
typedResult);
                 result = (QueryResult<R>) typedQueryResult;
             } else {
                 // the generic type doesn't matter, since failed queries have 
no result set.
@@ -312,7 +325,13 @@ public class MeteredVersionedKeyValueStore<K, V>
             final String storeName = super.name();
             final String changelogTopic = 
ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
             plainValueSerdes = StoreSerdeInitializer.prepareStoreSerde(
-                context, storeName, changelogTopic, keySerde, plainValueSerde, 
WrappingNullableUtils::prepareValueSerde);
+                context,
+                storeName,
+                changelogTopic,
+                keySerde,
+                plainValueSerde,
+                WrappingNullableUtils::prepareValueSerde
+            );
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 8df7fbe161e..2929c2b654a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -86,8 +86,7 @@ public class MeteredWindowStore<K, V>
     protected final LongAdder numOpenIterators = new LongAdder();
     protected final NavigableSet<MeteredIterator> openIterators = new 
ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
 
-    @SuppressWarnings("rawtypes")
-    private final Map<Class, QueryHandler> queryHandlers =
+    private final Map<Class<?>, QueryHandler<?>> queryHandlers =
         mkMap(
             mkEntry(
                 WindowRangeQuery.class,
@@ -107,12 +106,14 @@ public class MeteredWindowStore<K, V>
             )
         );
 
-    MeteredWindowStore(final WindowStore<Bytes, byte[]> inner,
-                       final long windowSizeMs,
-                       final String metricsScope,
-                       final Time time,
-                       final Serde<K> keySerde,
-                       final Serde<V> valueSerde) {
+    MeteredWindowStore(
+        final WindowStore<Bytes, byte[]> inner,
+        final long windowSizeMs,
+        final String metricsScope,
+        final Time time,
+        final Serde<K> keySerde,
+        final Serde<V> valueSerde
+    ) {
         super(inner);
         this.windowSizeMs = windowSizeMs;
         this.metricsScope = metricsScope;
@@ -122,8 +123,7 @@ public class MeteredWindowStore<K, V>
     }
 
     @Override
-    public void init(final StateStoreContext stateStoreContext,
-                     final StateStore root) {
+    public void init(final StateStoreContext stateStoreContext, final 
StateStore root) {
         internalContext = stateStoreContext instanceof 
InternalProcessorContext ? (InternalProcessorContext<?, ?>) stateStoreContext : 
null;
         taskId = stateStoreContext.taskId();
         initStoreSerde(stateStoreContext);
@@ -136,6 +136,7 @@ public class MeteredWindowStore<K, V>
         // register and possibly restore the state from the logs
         maybeMeasureLatency(() -> super.init(stateStoreContext, root), time, 
restoreSensor);
     }
+
     protected Serde<V> prepareValueSerde(final Serde<V> valueSerde, final 
SerdeGetter getter) {
         return WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
     }
@@ -397,39 +398,40 @@ public class MeteredWindowStore<K, V>
 
     @SuppressWarnings("unchecked")
     @Override
-    public <R> QueryResult<R> query(final Query<R> query,
-                                    final PositionBound positionBound,
-                                    final QueryConfig config) {
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final long start = time.nanoseconds();
         final QueryResult<R> result;
 
-        final QueryHandler handler = queryHandlers.get(query.getClass());
+        final QueryHandler<?> handler = queryHandlers.get(query.getClass());
         if (handler == null) {
             result = wrapped().query(query, positionBound, config);
             if (config.isCollectExecutionInfo()) {
-                result.addExecutionInfo(
-                    "Handled in " + getClass() + " in " + (time.nanoseconds() 
- start) + "ns");
+                result.addExecutionInfo("Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
             }
         } else {
-            result = (QueryResult<R>) handler.apply(
+            result = ((QueryHandler<R>) handler).apply(
                 query,
                 positionBound,
                 config,
                 this
             );
             if (config.isCollectExecutionInfo()) {
-                result.addExecutionInfo(
-                    "Handled in " + getClass() + " with serdes "
-                        + serdes + " in " + (time.nanoseconds() - start) + 
"ns");
+                result.addExecutionInfo("Handled in " + getClass() + " with 
serdes " + serdes + " in " + (time.nanoseconds() - start) + "ns");
             }
         }
         return result;
     }
 
     @SuppressWarnings("unchecked")
-    private <R> QueryResult<R> runRangeQuery(final Query<R> query,
-                                             final PositionBound positionBound,
-                                             final QueryConfig config) {
+    private <R> QueryResult<R> runRangeQuery(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final QueryResult<R> result;
         final WindowRangeQuery<K, V> typedQuery = (WindowRangeQuery<K, V>) 
query;
         // There's no store API for open time ranges
@@ -466,7 +468,6 @@ public class MeteredWindowStore<K, V>
                 result = (QueryResult<R>) rawResult;
             }
         } else {
-
             result = QueryResult.forFailure(
                 FailureReason.UNKNOWN_QUERY_TYPE,
                 "This store (" + getClass() + ") doesn't know how to"
@@ -479,11 +480,12 @@ public class MeteredWindowStore<K, V>
         return result;
     }
 
-
     @SuppressWarnings("unchecked")
-    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
-                                           final PositionBound positionBound,
-                                           final QueryConfig config) {
+    private <R> QueryResult<R> runKeyQuery(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config
+    ) {
         final QueryResult<R> queryResult;
         final WindowKeyQuery<K, V> typedQuery = (WindowKeyQuery<K, V>) query;
         // There's no store API for open time ranges
@@ -518,7 +520,6 @@ public class MeteredWindowStore<K, V>
                 queryResult = (QueryResult<R>) rawResult;
             }
         } else {
-
             queryResult = QueryResult.forFailure(
                 FailureReason.UNKNOWN_QUERY_TYPE,
                 "This store (" + getClass() + ") doesn't know how to execute"
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index 0330ebd05de..08a5bf8e8d5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -71,17 +71,16 @@ public final class StoreQueryUtils {
      * in a map.
      */
     @FunctionalInterface
-    public interface QueryHandler {
-        QueryResult<?> apply(
-            final Query<?> query,
+    public interface QueryHandler<R> {
+        QueryResult<R> apply(
+            final Query<R> query,
             final PositionBound positionBound,
             final QueryConfig config,
             final StateStore store
         );
     }
 
-    @SuppressWarnings("rawtypes")
-    private static final Map<Class, QueryHandler> QUERY_HANDLER_MAP =
+    private static final Map<Class<?>, QueryHandler<?>> QUERY_HANDLER_MAP =
         mkMap(
             mkEntry(
                 RangeQuery.class,
@@ -110,9 +109,8 @@ public final class StoreQueryUtils {
         );
 
     // make this class uninstantiable
+    private StoreQueryUtils() { }
 
-    private StoreQueryUtils() {
-    }
     @SuppressWarnings("unchecked")
     public static <R> QueryResult<R> handleBasicQueries(
         final Query<R> query,
@@ -126,7 +124,7 @@ public final class StoreQueryUtils {
         final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
         final QueryResult<R> result;
 
-        final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass());
+        final QueryHandler<?> handler = 
QUERY_HANDLER_MAP.get(query.getClass());
         synchronized (position) {
             if (handler == null) {
                 result = QueryResult.forUnknownQueryType(query, store);
@@ -137,7 +135,7 @@ public final class StoreQueryUtils {
                     context == null ? null : context.taskId().partition()
                 );
             } else {
-                result = (QueryResult<R>) handler.apply(
+                result = ((QueryHandler<R>) handler).apply(
                     query,
                     positionBound,
                     config,
@@ -154,10 +152,7 @@ public final class StoreQueryUtils {
         return result;
     }
 
-    public static void updatePosition(
-        final Position position,
-        final StateStoreContext stateStoreContext) {
-
+    public static void updatePosition(final Position position, final 
StateStoreContext stateStoreContext) {
         if (stateStoreContext != null && 
stateStoreContext.recordMetadata().isPresent()) {
             final RecordMetadata meta = 
stateStoreContext.recordMetadata().get();
             if (meta.topic() != null) {
@@ -363,10 +358,12 @@ public final class StoreQueryUtils {
     }
 
     @SuppressWarnings("unchecked")
-    private static <R> QueryResult<R> runVersionedKeyQuery(final Query<R> 
query,
-                                                           final PositionBound 
positionBound,
-                                                           final QueryConfig 
config,
-                                                           final StateStore 
store) {
+    private static <R> QueryResult<R> runVersionedKeyQuery(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config,
+        final StateStore store
+    ) {
         if (store instanceof VersionedKeyValueStore) {
             final VersionedKeyValueStore<Bytes, byte[]> versionedKeyValueStore 
=
                 (VersionedKeyValueStore<Bytes, byte[]>) store;
@@ -388,27 +385,29 @@ public final class StoreQueryUtils {
                     message
                 );
             }
-
         } else {
             return QueryResult.forUnknownQueryType(query, store);
         }
     }
 
     @SuppressWarnings("unchecked")
-    private static <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> 
query,
-                                                                final 
PositionBound positionBound,
-                                                                final 
QueryConfig config,
-                                                                final 
StateStore store) {
-
+    private static <R> QueryResult<R> runMultiVersionedKeyQuery(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config,
+        final StateStore store
+    ) {
         if (store instanceof VersionedKeyValueStore) {
             final RocksDBVersionedStore rocksDBVersionedStore = 
(RocksDBVersionedStore) store;
             final MultiVersionedKeyQuery<Bytes, byte[]> rawKeyQuery = 
(MultiVersionedKeyQuery<Bytes, byte[]>) query;
             try {
                 final VersionedRecordIterator<byte[]> segmentIterator =
-                        rocksDBVersionedStore.get(rawKeyQuery.key(),
-                                                  
rawKeyQuery.fromTime().get().toEpochMilli(),
-                                                  
rawKeyQuery.toTime().get().toEpochMilli(),
-                                                  rawKeyQuery.resultOrder());
+                        rocksDBVersionedStore.get(
+                            rawKeyQuery.key(),
+                            rawKeyQuery.fromTime().get().toEpochMilli(),
+                            rawKeyQuery.toTime().get().toEpochMilli(),
+                            rawKeyQuery.resultOrder()
+                        );
                 return (QueryResult<R>) QueryResult.forResult(segmentIterator);
             } catch (final Exception e) {
                 final String message = parseStoreException(e, store, query);
@@ -450,16 +449,19 @@ public final class StoreQueryUtils {
     public static <V> Function<VersionedRecord<byte[]>, VersionedRecord<V>> 
deserializeValue(final StateSerdes<?, V> serdes) {
         final Serde<V> valueSerde = serdes.valueSerde();
         final Deserializer<V> deserializer = valueSerde.deserializer();
-        return rawVersionedRecord -> rawVersionedRecord.validTo().isPresent()
-            ? new VersionedRecord<>(
-                // deserializeValue s only used via IQ, so it's ok to not pass 
any headers
-                deserializer.deserialize(serdes.topic(), new RecordHeaders(), 
rawVersionedRecord.value()),
-                rawVersionedRecord.timestamp(),
-                rawVersionedRecord.validTo().get())
-            : new VersionedRecord<>(
-                // deserializeValue s only used via IQ, so it's ok to not pass 
any headers
-                deserializer.deserialize(serdes.topic(), new RecordHeaders(), 
rawVersionedRecord.value()),
-                rawVersionedRecord.timestamp());
+        return rawVersionedRecord ->
+            rawVersionedRecord.validTo().isPresent()
+                ? new VersionedRecord<>(
+                      // deserializeValue s only used via IQ, so it's ok to 
not pass any headers
+                      deserializer.deserialize(serdes.topic(), new 
RecordHeaders(), rawVersionedRecord.value()),
+                      rawVersionedRecord.timestamp(),
+                      rawVersionedRecord.validTo().get()
+                  )
+                : new VersionedRecord<>(
+                      // deserializeValue s only used via IQ, so it's ok to 
not pass any headers
+                      deserializer.deserialize(serdes.topic(), new 
RecordHeaders(), rawVersionedRecord.value()),
+                      rawVersionedRecord.timestamp()
+                  );
     }
 
     @SuppressWarnings("resource")
@@ -504,8 +506,7 @@ public final class StoreQueryUtils {
     private static Position topicPartitionMapToPosition(final 
Map<TopicPartition, Long> topicPartitions) {
         final Map<String, Map<Integer, Long>> pos = new HashMap<>();
         for (final Entry<TopicPartition, Long> e : topicPartitions.entrySet()) 
{
-            pos
-                .computeIfAbsent(e.getKey().topic(), t -> new HashMap<>())
+            pos.computeIfAbsent(e.getKey().topic(), t -> new HashMap<>())
                 .put(e.getKey().partition(), e.getValue());
         }
         return Position.fromMap(pos);
@@ -514,8 +515,7 @@ public final class StoreQueryUtils {
     private static <R> String parseStoreException(final Exception e, final 
StateStore store, final Query<R> query) {
         final StringWriter stringWriter = new StringWriter();
         final PrintWriter printWriter = new PrintWriter(stringWriter);
-        printWriter.println(
-            store.getClass() + " failed to handle query " + query + ":");
+        printWriter.println(store.getClass() + " failed to handle query " + 
query + ":");
         e.printStackTrace(printWriter);
         printWriter.flush();
         return stringWriter.toString();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index fae6a7fe379..67186489f28 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -65,14 +64,9 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
         store.init(context, store);
     }
 
-    @AfterEach
-    public void tearDown() {
-        verify(inner).init(context, store);
-    }
-
     @Test
     public void shouldDelegateInit() {
-        // testing the combination of setUp and tearDown
+        verify(inner).init(context, store);
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index 39c93b86fe7..fabca5e3bdb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -64,14 +63,9 @@ public class ChangeLoggingWindowBytesStoreTest {
         store.init(context, store);
     }
 
-    @AfterEach
-    public void tearDown() {
-        verify(inner).init(context, store);
-    }
-
     @Test
     public void shouldDelegateInit() {
-        // testing the combination of setUp and tearDown
+        verify(inner).init(context, store);
     }
 
     @Test

Reply via email to