This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 128aabc1c7e KAFKA-20173: Ensure Metered window-stores pass headers correctly (#21936)
128aabc1c7e is described below

commit 128aabc1c7ee9e5a1248013f3f594fabce5097ae
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Apr 2 15:05:04 2026 -0700

    KAFKA-20173: Ensure Metered window-stores pass headers correctly (#21936)
    
    Ensures that all Metered Window-stores (plain, ts, headers) pass headers
    into de/serializers.
    
    Reviewers: Alieh Saeedi <[email protected]>, TengYao Chi
     <[email protected]>, Bill Bejeck <[email protected]>
---
 .../state/internals/MeteredKeyValueStore.java      |  19 +-
 .../state/internals/MeteredSessionStore.java       |  10 -
 .../internals/MeteredSessionStoreWithHeaders.java  |   1 -
 ...MeteredTimestampedKeyValueStoreWithHeaders.java |   6 +-
 .../internals/MeteredTimestampedWindowStore.java   |  14 +-
 .../MeteredTimestampedWindowStoreWithHeaders.java  | 242 +++++++++++++--------
 .../state/internals/MeteredWindowStore.java        | 182 ++++++++--------
 .../internals/MeteredWindowStoreIterator.java      |   4 -
 .../internals/MeteredWindowedKeyValueIterator.java |  10 +-
 .../streams/state/internals/WindowKeySchema.java   |  11 -
 .../kstream/internals/KTableAggregateTest.java     |   7 +-
 ...teredTimestampedWindowStoreWithHeadersTest.java |  21 +-
 12 files changed, 286 insertions(+), 241 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 2c291ac9306..452b662733d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -227,7 +227,8 @@ public class MeteredKeyValueStore<K, V>
                             ))
                     );
                 },
-                sendOldValues);
+                sendOldValues
+            );
         }
         return false;
     }
@@ -440,14 +441,6 @@ public class MeteredKeyValueStore<K, V>
         }
     }
 
-    protected byte[] serializeValue(final V value) {
-        return value != null ? serdes.rawValue(value, 
internalContext.headers()) : null;
-    }
-
-    protected V deserializeValue(final byte[] rawValue) {
-        return rawValue != null ? serdes.valueFrom(rawValue, 
internalContext.headers()) : null;
-    }
-
     protected Bytes serializeKey(final K key) {
         return Bytes.wrap(serdes.rawKey(key, internalContext.headers()));
     }
@@ -456,6 +449,14 @@ public class MeteredKeyValueStore<K, V>
         return serdes.keyFrom(rawKey, internalContext.headers());
     }
 
+    protected byte[] serializeValue(final V value) {
+        return value != null ? serdes.rawValue(value, 
internalContext.headers()) : null;
+    }
+
+    protected V deserializeValue(final byte[] rawValue) {
+        return rawValue != null ? serdes.valueFrom(rawValue, 
internalContext.headers()) : null;
+    }
+
     private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K, 
V>> from) {
         final List<KeyValue<Bytes, byte[]>> byteEntries = new ArrayList<>();
         for (final KeyValue<K, V> entry : from) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 3d0e85474b0..a838ceab680 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -271,7 +271,6 @@ public class MeteredSessionStore<K, V>
             wrapped().fetch(keyBytes(key)),
             fetchSensor,
             iteratorDurationSensor,
-            streamsMetrics,
             bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
             serdes::valueFrom,
             time,
@@ -286,7 +285,6 @@ public class MeteredSessionStore<K, V>
             wrapped().backwardFetch(keyBytes(key)),
             fetchSensor,
             iteratorDurationSensor,
-            streamsMetrics,
             bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
             serdes::valueFrom,
             time,
@@ -302,7 +300,6 @@ public class MeteredSessionStore<K, V>
             wrapped().fetch(keyBytes(keyFrom), keyBytes(keyTo)),
             fetchSensor,
             iteratorDurationSensor,
-            streamsMetrics,
             bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
             serdes::valueFrom,
             time,
@@ -317,7 +314,6 @@ public class MeteredSessionStore<K, V>
             wrapped().backwardFetch(keyBytes(keyFrom), keyBytes(keyTo)),
             fetchSensor,
             iteratorDurationSensor,
-            streamsMetrics,
             bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
             serdes::valueFrom,
             time,
@@ -339,7 +335,6 @@ public class MeteredSessionStore<K, V>
                 latestSessionStartTime),
             fetchSensor,
             iteratorDurationSensor,
-            streamsMetrics,
             bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
             serdes::valueFrom,
             time,
@@ -361,7 +356,6 @@ public class MeteredSessionStore<K, V>
             ),
             fetchSensor,
             iteratorDurationSensor,
-            streamsMetrics,
             bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
             serdes::valueFrom,
             time,
@@ -385,7 +379,6 @@ public class MeteredSessionStore<K, V>
                 latestSessionStartTime),
             fetchSensor,
             iteratorDurationSensor,
-            streamsMetrics,
             bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
             serdes::valueFrom,
             time,
@@ -400,7 +393,6 @@ public class MeteredSessionStore<K, V>
                 wrapped().findSessions(earliestSessionEndTime, 
latestSessionEndTime),
                 fetchSensor,
                 iteratorDurationSensor,
-                streamsMetrics,
                 bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
                 serdes::valueFrom,
                 time,
@@ -424,7 +416,6 @@ public class MeteredSessionStore<K, V>
             ),
             fetchSensor,
             iteratorDurationSensor,
-            streamsMetrics,
             bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
             serdes::valueFrom,
             time,
@@ -497,7 +488,6 @@ public class MeteredSessionStore<K, V>
                         rawResult.getResult(),
                         fetchSensor,
                         iteratorDurationSensor,
-                        streamsMetrics,
                         bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
                         StoreQueryUtils.deserializeValue(serdes, wrapped()),
                         time,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
index b74ebd9ec88..45a1b17ecc8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
@@ -304,7 +304,6 @@ public class MeteredSessionStoreWithHeaders<K, AGG>
                     rawResult.getResult(),
                     fetchSensor,
                     iteratorDurationSensor,
-                    streamsMetrics,
                     bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
                     byteArray -> {
                         final AggregationWithHeaders<AGG> awh =
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
index e93aad8c8f2..f3865cfc641 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -149,7 +149,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
                             internalContext.setRecordContext(currentContext);
                         }
                     } else {
-                        // it's ok to only pass header into `serializeKey`, 
because for the value case passed-in headers are
+                        // it's ok to only pass headers into `serializeKey`, 
because for the value case passed-in headers are
                         // getting ignored anyway, because the value (of type 
`ValueTimestampHeaders`) itself carries the headers
                         final Headers headers = value.headers();
                         wrapped().put(serializeKey(key, headers), 
serializeValue(value));
@@ -194,7 +194,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
                         internalContext.setRecordContext(currentContext);
                     }
                 } else {
-                    // it's ok to only pass header into `serializeKey`, 
because for the value case passed-in headers are
+                    // it's ok to only pass headers into `serializeKey`, 
because for the value case passed-in headers are
                     // getting ignored anyway, because the value (of type 
`ValueTimestampHeaders`) itself carries the headers
                     final Headers headers = value.headers();
                     // `rawOldValue` returned from 
`wrapped().putIfAbsent(...)` is type ValueTimestampHeader
@@ -225,7 +225,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
     private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K, 
ValueTimestampHeaders<V>>> from) {
         final List<KeyValue<Bytes, byte[]>> byteEntries = new ArrayList<>();
         for (final KeyValue<K, ValueTimestampHeaders<V>> entry : from) {
-            // it's ok to only pass header into `serializeKey`, because for 
the value case passed-in headers are
+            // it's ok to only pass headers into `serializeKey`, because for 
the value case passed-in headers are
             // getting ignored anyway, because the value (of type 
`ValueTimestampHeaders`) itself carries the headers
             final Headers headers = entry.value != null ? 
entry.value.headers() : internalContext.headers();
             byteEntries.add(KeyValue.pair(serializeKey(entry.key, headers), 
serializeValue(entry.value)));
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
index f172222baa7..b72f638f1df 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
@@ -37,12 +37,14 @@ public class MeteredTimestampedWindowStore<K, V>
     extends MeteredWindowStore<K, ValueAndTimestamp<V>>
     implements TimestampedWindowStore<K, V> {
 
-    MeteredTimestampedWindowStore(final WindowStore<Bytes, byte[]> inner,
-                                  final long windowSizeMs,
-                                  final String metricScope,
-                                  final Time time,
-                                  final Serde<K> keySerde,
-                                  final Serde<ValueAndTimestamp<V>> 
valueSerde) {
+    MeteredTimestampedWindowStore(
+        final WindowStore<Bytes, byte[]> inner,
+        final long windowSizeMs,
+        final String metricScope,
+        final Time time,
+        final Serde<K> keySerde,
+        final Serde<ValueAndTimestamp<V>> valueSerde
+    ) {
         super(inner, windowSizeMs, metricScope, time, keySerde, valueSerde);
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
index 74aafc87bd4..9d5a9ffb4d7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
@@ -18,12 +18,14 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.query.FailureReason;
@@ -43,6 +45,9 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
 import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
@@ -103,14 +108,15 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, 
V>
 
                         try {
                             internalContext.setRecordContext(temporaryContext);
-                            wrapped().put(keyBytes(key, deleteHeaders), null, 
windowStartTimestamp);
+                            wrapped().put(serializeKey(key, deleteHeaders), 
null, windowStartTimestamp);
                         } finally {
                             // Restore original context
                             internalContext.setRecordContext(currentContext);
                         }
                     } else {
-                        final Headers headers = value.headers() == null ? new 
RecordHeaders() : value.headers();
-                        wrapped().put(keyBytes(key, headers), 
serdes.rawValue(value, headers), windowStartTimestamp);
+                        // it's ok to only pass headers into `serializeKey`, 
because for the value case passed-in headers are
+                        // getting ignored anyway, because the value (of type 
`ValueTimestampHeaders`) itself carries the headers
+                        wrapped().put(serializeKey(key, value.headers()), 
serializeValue(value), windowStartTimestamp);
                     }
                 },
                 time,
@@ -123,10 +129,6 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
         }
     }
 
-    protected Bytes keyBytes(final K key, final Headers headers) {
-        return Bytes.wrap(serdes.rawKey(key, headers));
-    }
-
     @SuppressWarnings("unchecked")
     @Override
     public <R> QueryResult<R> query(final Query<R> query,
@@ -165,52 +167,36 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, 
V>
         final QueryResult<R> queryResult;
         if (query.getTimeFrom().isPresent() && query.getTimeTo().isPresent()) {
             final WindowKeyQuery<Bytes, byte[]> rawKeyQuery =
-                    WindowKeyQuery.withKeyAndWindowStartRange(
-                        keyBytes(query.getKey(), new RecordHeaders()),
-                        query.getTimeFrom().get(),
-                        query.getTimeTo().get()
-                    );
+                WindowKeyQuery.withKeyAndWindowStartRange(
+                    serializeKey(query.getKey(), internalContext.headers()),
+                    query.getTimeFrom().get(),
+                    query.getTimeTo().get()
+                );
             final QueryResult<WindowStoreIterator<byte[]>> rawResult = 
wrapped().query(rawKeyQuery, positionBound, config);
             if (rawResult.isSuccess()) {
                 if (isUnderlyingStoreTimestamped()) {
                     // For timestamped stores, return ValueAndTimestamp<V>
-                    final Function<byte[], ValueAndTimestamp<V>> valueFrom = 
bytes -> {
-                        final ValueTimestampHeaders<V> vth = 
serdes.valueFrom(bytes, new RecordHeaders());
-                        return vth == null ? null : 
ValueAndTimestamp.make(vth.value(), vth.timestamp());
-                    };
-
-                    final MeteredWindowStoreIterator<ValueAndTimestamp<V>> 
typedResult =
-                            new MeteredWindowStoreIterator<>(
-                                rawResult.getResult(),
-                                fetchSensor,
-                                iteratorDurationSensor,
-                                streamsMetrics,
-                                valueFrom,
-                                time,
-                                numOpenIterators,
-                                openIterators
-                            );
+                    final MeteredWindowStoreIterator<ValueAndTimestamp<V>> 
typedResult = meteredIterator(
+                        rawResult,
+                        rawValueTimestampHeaders -> {
+                            final ValueTimestampHeaders<V> vth = 
deserializeValue(rawValueTimestampHeaders);
+                            return vth == null ? null : 
ValueAndTimestamp.make(vth.value(), vth.timestamp());
+                        }
+                    );
+
                     final 
QueryResult<MeteredWindowStoreIterator<ValueAndTimestamp<V>>> typedQueryResult =
                             
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
typedResult);
                     queryResult = (QueryResult<R>) typedQueryResult;
                 } else {
                     // For non-timestamped stores, return plain V
-                    final Function<byte[], V> valueFrom = bytes -> {
-                        final ValueTimestampHeaders<V> vth = 
serdes.valueFrom(bytes, new RecordHeaders());
-                        return vth == null ? null : vth.value();
-                    };
-
-                    final MeteredWindowStoreIterator<V> typedResult =
-                            new MeteredWindowStoreIterator<>(
-                                rawResult.getResult(),
-                                fetchSensor,
-                                iteratorDurationSensor,
-                                streamsMetrics,
-                                valueFrom,
-                                time,
-                                numOpenIterators,
-                                openIterators
-                            );
+                    final MeteredWindowStoreIterator<V> typedResult = 
meteredIterator(
+                        rawResult,
+                        rawValueTimestampHeaders -> {
+                            final ValueTimestampHeaders<V> vth = 
deserializeValue(rawValueTimestampHeaders);
+                            return vth == null ? null : vth.value();
+                        }
+                    );
+
                     final QueryResult<MeteredWindowStoreIterator<V>> 
typedQueryResult =
                             
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
typedResult);
                     queryResult = (QueryResult<R>) typedQueryResult;
@@ -230,6 +216,20 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
         return queryResult;
     }
 
+    private <ValueType> MeteredWindowStoreIterator<ValueType> meteredIterator(
+        final QueryResult<WindowStoreIterator<byte[]>> rawResult,
+        final Function<byte[], ValueType> valueDeserializer
+    ) {
+        return new MeteredWindowStoreIterator<>(
+            rawResult.getResult(),
+            fetchSensor,
+            iteratorDurationSensor,
+            valueDeserializer,
+            time,
+            numOpenIterators,
+            openIterators
+        );
+    }
 
     /**
      * Handles WindowRangeQuery by creating a MeteredWindowedKeyValueIterator 
with conversion from
@@ -249,49 +249,25 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, 
V>
             final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> 
rawResult =
                     wrapped().query(rawKeyQuery, positionBound, config);
             if (rawResult.isSuccess()) {
-                final Function<byte[], K> keyFrom = bytes -> 
serdes.keyFrom(bytes, new RecordHeaders());
-
                 if (isUnderlyingStoreTimestamped()) {
                     // For timestamped stores, return ValueAndTimestamp<V>
-                    final Function<byte[], ValueAndTimestamp<V>> valueFrom = 
bytes -> {
-                        final ValueTimestampHeaders<V> vth = 
serdes.valueFrom(bytes, new RecordHeaders());
-                        return vth == null ? null : 
ValueAndTimestamp.make(vth.value(), vth.timestamp());
-                    };
-
                     final MeteredWindowedKeyValueIterator<K, 
ValueAndTimestamp<V>> typedResult =
-                            new MeteredWindowedKeyValueIterator<>(
-                                rawResult.getResult(),
-                                fetchSensor,
-                                iteratorDurationSensor,
-                                streamsMetrics,
-                                keyFrom,
-                                valueFrom,
-                                time,
-                                numOpenIterators,
-                                openIterators
-                            );
+                        meteredWindowedIterator(
+                            rawResult,
+                            valueTimestampHeaders -> valueTimestampHeaders == 
null ? null : ValueAndTimestamp.make(valueTimestampHeaders.value(), 
valueTimestampHeaders.timestamp())
+                        );
+
                     final QueryResult<MeteredWindowedKeyValueIterator<K, 
ValueAndTimestamp<V>>> typedQueryResult =
                             
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
typedResult);
                     result = (QueryResult<R>) typedQueryResult;
                 } else {
                     // For non-timestamped stores, return plain V
-                    final Function<byte[], V> valueFrom = bytes -> {
-                        final ValueTimestampHeaders<V> vth = 
serdes.valueFrom(bytes, new RecordHeaders());
-                        return vth == null ? null : vth.value();
-                    };
-
                     final MeteredWindowedKeyValueIterator<K, V> typedResult =
-                            new MeteredWindowedKeyValueIterator<>(
-                                rawResult.getResult(),
-                                fetchSensor,
-                                iteratorDurationSensor,
-                                streamsMetrics,
-                                keyFrom,
-                                valueFrom,
-                                time,
-                                numOpenIterators,
-                                openIterators
-                            );
+                        meteredWindowedIterator(
+                            rawResult,
+                            valueTimestampHeaders -> valueTimestampHeaders == 
null ? null : valueTimestampHeaders.value()
+                        );
+
                     final QueryResult<MeteredWindowedKeyValueIterator<K, V>> 
typedQueryResult =
                             
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
typedResult);
                     result = (QueryResult<R>) typedQueryResult;
@@ -312,28 +288,32 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, 
V>
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> fetch(final 
K keyFrom,
-                                                                         final 
K keyTo,
-                                                                         final 
long timeFrom,
-                                                                         final 
long timeTo) {
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> fetch(
+        final K keyFrom,
+        final K keyTo,
+        final long timeFrom,
+        final long timeTo
+    ) {
         return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
             wrapped().fetch(
-                keyBytes(keyFrom, new RecordHeaders()),
-                keyBytes(keyTo, new RecordHeaders()),
+                serializeKey(keyFrom, internalContext.headers()),
+                serializeKey(keyTo, internalContext.headers()),
                 timeFrom,
                 timeTo)
         );
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> 
backwardFetch(final K keyFrom,
-                                                                               
  final K keyTo,
-                                                                               
  final long timeFrom,
-                                                                               
  final long timeTo) {
+    public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> 
backwardFetch(
+        final K keyFrom,
+        final K keyTo,
+        final long timeFrom,
+        final long timeTo
+    ) {
         return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
             wrapped().backwardFetch(
-                keyBytes(keyFrom, new RecordHeaders()),
-                keyBytes(keyTo, new RecordHeaders()),
+                serializeKey(keyFrom, internalContext.headers()),
+                serializeKey(keyTo, internalContext.headers()),
                 timeFrom,
                 timeTo)
         );
@@ -403,9 +383,9 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
             }
 
             final KeyValue<Windowed<Bytes>, byte[]> next = iter.next();
-            final ValueTimestampHeaders<V> valueTimestampHeaders = 
serdes.valueFrom(next.value, new RecordHeaders());
+            final ValueTimestampHeaders<V> valueTimestampHeaders = 
deserializeValue(next.value);
             final Headers headers = valueTimestampHeaders != null ? 
valueTimestampHeaders.headers() : new RecordHeaders();
-            final K key = serdes.keyFrom(next.key.key().get(), headers);
+            final K key = deserializeKey(next.key.key().get(), headers);
             final Windowed<K> windowedKey = new Windowed<>(key, 
next.key.window());
             return KeyValue.pair(windowedKey, valueTimestampHeaders);
         }
@@ -432,8 +412,69 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
         }
     }
 
+    private <ValueType> MeteredWindowedKeyValueIterator<K, ValueType> 
meteredWindowedIterator(
+        final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult,
+        final Function<ValueTimestampHeaders<V>, ValueType> valueConverter
+    ) {
+        return new MeteredWindowedKeyValueWithHeadersIterator<>(
+            rawResult.getResult(),
+            fetchSensor,
+            iteratorDurationSensor,
+            this::deserializeKey,
+            valueConverter,
+            time,
+            numOpenIterators,
+            openIterators
+        );
+    }
+
+    private final class MeteredWindowedKeyValueWithHeadersIterator<ValueType> 
extends MeteredWindowedKeyValueIterator<K, ValueType> {
+        private final BiFunction<byte[], Headers, K> deserializeKey;
+        private final Function<ValueTimestampHeaders<V>, ValueType> 
valueConverter;
+
+        MeteredWindowedKeyValueWithHeadersIterator(
+            final KeyValueIterator<Windowed<Bytes>, byte[]> iter,
+            final Sensor operationSensor,
+            final Sensor iteratorSensor,
+            final BiFunction<byte[], Headers, K> deserializeKey,
+            final Function<ValueTimestampHeaders<V>, ValueType> valueConverter,
+            final Time time,
+            final LongAdder numOpenIterators,
+            final Set<MeteredIterator> openIterators
+        ) {
+            super(
+                iter,
+                operationSensor,
+                iteratorSensor,
+                null, // should not be used in super-class
+                null, // should not be used in super-class
+                time,
+                numOpenIterators,
+                openIterators
+            );
+
+            this.deserializeKey = deserializeKey;
+            this.valueConverter = valueConverter;
+        }
+
+        @Override
+        public KeyValue<Windowed<K>, ValueType> next() {
+            final KeyValue<Windowed<Bytes>, byte[]> next = iter.next();
+            final ValueTimestampHeaders<V> valueTimestampHeaders = 
deserializeValue(next.value);
+            return KeyValue.pair(
+                windowedKey(next.key, valueTimestampHeaders.headers()),
+                valueConverter.apply(valueTimestampHeaders)
+            );
+        }
+
+        private Windowed<K> windowedKey(final Windowed<Bytes> bytesKey, final 
Headers headers) {
+            final K key = deserializeKey.apply(bytesKey.key().get(), headers);
+            return new Windowed<>(key, bytesKey.window());
+        }
+    }
+
     private boolean isUnderlyingStoreTimestamped() {
-        Object store = wrapped();
+        StateStore store = wrapped();
         do {
             // Check adapters first before attempting to unwrap
             if (store instanceof TimestampedToHeadersWindowStoreAdapter) {
@@ -452,6 +493,21 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
                 break;
             }
         } while (true);
+
         return false;
     }
+
+    protected Bytes serializeKey(final K key, final Headers headers) {
+        return Bytes.wrap(serdes.rawKey(key, headers));
+    }
+
+    @Override
+    protected K deserializeKey(final byte[] rawKey) {
+        throw new 
UnsupportedOperationException("MeteredTimestampedWindowStoreWithHeaders 
required to pass in Headers when deserializing a key.");
+    }
+
+    protected K deserializeKey(final byte[] rawKey, final Headers headers) {
+        return serdes.keyFrom(rawKey, headers);
+    }
+
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 25d98162bd8..cfc3f90ac24 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
@@ -61,8 +60,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
 
-// TODO: replace with new method in follow-up PR of KIP-1271
-@SuppressWarnings("deprecation")
 public class MeteredWindowStore<K, V>
     extends WrappedStateStore<WindowStore<Bytes, byte[]>, Windowed<K>, V>
     implements WindowStore<K, V>, MeteredStateStore {
@@ -151,7 +148,7 @@ public class MeteredWindowStore<K, V>
         e2eLatencySensor = 
StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), 
streamsMetrics);
         iteratorDurationSensor = 
StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, 
name(), streamsMetrics);
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
-                (config, now) -> numOpenIterators.sum());
+            (config, now) -> numOpenIterators.sum());
         StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
             (config, now) -> {
                 try {
@@ -164,10 +161,10 @@ public class MeteredWindowStore<K, V>
         );
         if (!persistent()) {
             StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope, 
name(), streamsMetrics,
-                    (config, now) -> {
-                        final InMemoryWindowStore inMemoryStore = 
findInMemoryWindowStore(wrapped());
-                        return inMemoryStore != null ? 
inMemoryStore.numEntries() : -1L;
-                    }
+                (config, now) -> {
+                    final InMemoryWindowStore inMemoryStore = 
findInMemoryWindowStore(wrapped());
+                    return inMemoryStore != null ? inMemoryStore.numEntries() 
: -1L;
+                }
             );
         }
     }
@@ -191,37 +188,51 @@ public class MeteredWindowStore<K, V>
         final String storeName = name();
         final String changelogTopic = 
ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
         serdes = StoreSerdeInitializer.prepareStoreSerde(
-            context, storeName, changelogTopic, keySerde, valueSerde, 
this::prepareValueSerde);
+            context,
+            storeName,
+            changelogTopic,
+            keySerde,
+            valueSerde,
+            this::prepareValueSerde
+        );
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> 
listener,
-                                    final boolean sendOldValues) {
+    public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> 
listener, final boolean sendOldValues) {
         final WindowStore<Bytes, byte[]> wrapped = wrapped();
         if (wrapped instanceof CachedStateStore) {
             return ((CachedStateStore<byte[], byte[]>) 
wrapped).setFlushListener(
-                record -> listener.apply(
-                    record.withKey(WindowKeySchema.fromStoreKey(record.key(), 
windowSizeMs, serdes.keyDeserializer(), serdes.topic()))
-                        .withValue(new Change<>(
-                            record.value().newValue != null ? 
serdes.valueFrom(record.value().newValue, record.headers()) : null,
-                            record.value().oldValue != null ? 
serdes.valueFrom(record.value().oldValue, record.headers()) : null,
-                            record.value().isLatest
-                        ))
-                ),
-                sendOldValues);
+                record -> {
+                    final Change<byte[]> change = record.value();
+                    listener.apply(
+                        record.withKey(
+                            WindowKeySchema.fromStoreKey(
+                                record.key(),
+                                windowSizeMs,
+                                serdes.keyDeserializer(),
+                                internalContext.headers(),
+                                serdes.topic()
+                            ))
+                            .withValue(new Change<>(
+                                change.newValue != null ? 
serdes.valueFrom(change.newValue, record.headers()) : null,
+                                change.oldValue != null ? 
serdes.valueFrom(change.oldValue, record.headers()) : null,
+                                change.isLatest
+                            ))
+                    );
+                },
+                sendOldValues
+            );
         }
         return false;
     }
 
     @Override
-    public void put(final K key,
-                    final V value,
-                    final long windowStartTimestamp) {
+    public void put(final K key, final V value, final long 
windowStartTimestamp) {
         Objects.requireNonNull(key, "key cannot be null");
         try {
             maybeMeasureLatency(
-                () -> wrapped().put(keyBytes(key), serdes.rawValue(value, new 
RecordHeaders()), windowStartTimestamp),
+                () -> wrapped().put(serializeKey(key), serializeValue(value), 
windowStartTimestamp),
                 time,
                 putSensor
             );
@@ -233,16 +244,15 @@ public class MeteredWindowStore<K, V>
     }
 
     @Override
-    public V fetch(final K key,
-                   final long timestamp) {
+    public V fetch(final K key, final long timestamp) {
         Objects.requireNonNull(key, "key cannot be null");
         return maybeMeasureLatency(
             () -> {
-                final byte[] result = wrapped().fetch(keyBytes(key), 
timestamp);
+                final byte[] result = wrapped().fetch(serializeKey(key), 
timestamp);
                 if (result == null) {
                     return null;
                 }
-                return serdes.valueFrom(result, new RecordHeaders());
+                return deserializeValue(result);
             },
             time,
             fetchSensor
@@ -250,16 +260,13 @@ public class MeteredWindowStore<K, V>
     }
 
     @Override
-    public WindowStoreIterator<V> fetch(final K key,
-                                        final long timeFrom,
-                                        final long timeTo) {
+    public WindowStoreIterator<V> fetch(final K key, final long timeFrom, 
final long timeTo) {
         Objects.requireNonNull(key, "key cannot be null");
         return new MeteredWindowStoreIterator<>(
-            wrapped().fetch(keyBytes(key), timeFrom, timeTo),
+            wrapped().fetch(serializeKey(key), timeFrom, timeTo),
             fetchSensor,
             iteratorDurationSensor,
-            streamsMetrics,
-            serdes::valueFrom,
+            this::deserializeValue,
             time,
             numOpenIterators,
             openIterators
@@ -267,16 +274,13 @@ public class MeteredWindowStore<K, V>
     }
 
     @Override
-    public WindowStoreIterator<V> backwardFetch(final K key,
-                                                final long timeFrom,
-                                                final long timeTo) {
+    public WindowStoreIterator<V> backwardFetch(final K key, final long 
timeFrom, final long timeTo) {
         Objects.requireNonNull(key, "key cannot be null");
         return new MeteredWindowStoreIterator<>(
-            wrapped().backwardFetch(keyBytes(key), timeFrom, timeTo),
+            wrapped().backwardFetch(serializeKey(key), timeFrom, timeTo),
             fetchSensor,
             iteratorDurationSensor,
-            streamsMetrics,
-            serdes::valueFrom,
+            this::deserializeValue,
             time,
             numOpenIterators,
             openIterators
@@ -284,75 +288,77 @@ public class MeteredWindowStore<K, V>
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
-                                                  final K keyTo,
-                                                  final long timeFrom,
-                                                  final long timeTo) {
+    public KeyValueIterator<Windowed<K>, V> fetch(
+        final K keyFrom,
+        final K keyTo,
+        final long timeFrom,
+        final long timeTo
+    ) {
         return new MeteredWindowedKeyValueIterator<>(
             wrapped().fetch(
-                keyBytes(keyFrom),
-                keyBytes(keyTo),
+                serializeKey(keyFrom),
+                serializeKey(keyTo),
                 timeFrom,
                 timeTo),
             fetchSensor,
             iteratorDurationSensor,
-            streamsMetrics,
-            serdes::keyFrom,
-            serdes::valueFrom,
+            this::deserializeKey,
+            this::deserializeValue,
             time,
             numOpenIterators,
-            openIterators);
+            openIterators
+        );
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
-                                                          final K keyTo,
-                                                          final long timeFrom,
-                                                          final long timeTo) {
+    public KeyValueIterator<Windowed<K>, V> backwardFetch(
+        final K keyFrom,
+        final K keyTo,
+        final long timeFrom,
+        final long timeTo
+    ) {
         return new MeteredWindowedKeyValueIterator<>(
             wrapped().backwardFetch(
-                keyBytes(keyFrom),
-                keyBytes(keyTo),
+                serializeKey(keyFrom),
+                serializeKey(keyTo),
                 timeFrom,
                 timeTo),
             fetchSensor,
             iteratorDurationSensor,
-            streamsMetrics,
-            serdes::keyFrom,
-            serdes::valueFrom,
+            this::deserializeKey,
+            this::deserializeValue,
             time,
             numOpenIterators,
-            openIterators);
+            openIterators
+        );
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
-                                                     final long timeTo) {
+    public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, 
final long timeTo) {
         return new MeteredWindowedKeyValueIterator<>(
             wrapped().fetchAll(timeFrom, timeTo),
             fetchSensor,
             iteratorDurationSensor,
-            streamsMetrics,
-            serdes::keyFrom,
-            serdes::valueFrom,
+            this::deserializeKey,
+            this::deserializeValue,
             time,
             numOpenIterators,
-            openIterators);
+            openIterators
+        );
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long 
timeFrom,
-                                                             final long 
timeTo) {
+    public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long 
timeFrom, final long timeTo) {
         return new MeteredWindowedKeyValueIterator<>(
             wrapped().backwardFetchAll(timeFrom, timeTo),
             fetchSensor,
             iteratorDurationSensor,
-            streamsMetrics,
-            serdes::keyFrom,
-            serdes::valueFrom,
+            this::deserializeKey,
+            this::deserializeValue,
             time,
             numOpenIterators,
-            openIterators);
+            openIterators
+        );
     }
 
     @Override
@@ -361,9 +367,8 @@ public class MeteredWindowStore<K, V>
             wrapped().all(),
             fetchSensor,
             iteratorDurationSensor,
-            streamsMetrics,
-            serdes::keyFrom,
-            serdes::valueFrom,
+            this::deserializeKey,
+            this::deserializeValue,
             time,
             numOpenIterators,
             openIterators
@@ -376,9 +381,8 @@ public class MeteredWindowStore<K, V>
             wrapped().backwardAll(),
             fetchSensor,
             iteratorDurationSensor,
-            streamsMetrics,
-            serdes::keyFrom,
-            serdes::valueFrom,
+            this::deserializeKey,
+            this::deserializeValue,
             time,
             numOpenIterators,
             openIterators
@@ -455,8 +459,7 @@ public class MeteredWindowStore<K, V>
                         rawResult.getResult(),
                         fetchSensor,
                         iteratorDurationSensor,
-                        streamsMetrics,
-                        serdes::keyFrom,
+                        this::deserializeKey,
                         StoreQueryUtils.deserializeValue(serdes, wrapped()),
                         time,
                         numOpenIterators,
@@ -494,7 +497,7 @@ public class MeteredWindowStore<K, V>
         if (typedQuery.getTimeFrom().isPresent() && 
typedQuery.getTimeTo().isPresent()) {
             final WindowKeyQuery<Bytes, byte[]> rawKeyQuery =
                 WindowKeyQuery.withKeyAndWindowStartRange(
-                    keyBytes(typedQuery.getKey()),
+                    serializeKey(typedQuery.getKey()),
                     typedQuery.getTimeFrom().get(),
                     typedQuery.getTimeTo().get()
                 );
@@ -508,7 +511,6 @@ public class MeteredWindowStore<K, V>
                     rawResult.getResult(),
                     fetchSensor,
                     iteratorDurationSensor,
-                    streamsMetrics,
                     StoreQueryUtils.deserializeValue(serdes, wrapped()),
                     time,
                     numOpenIterators,
@@ -534,12 +536,20 @@ public class MeteredWindowStore<K, V>
         return queryResult;
     }
 
-    private Bytes keyBytes(final K key) {
-        return Bytes.wrap(serdes.rawKey(key, new RecordHeaders()));
+    private Bytes serializeKey(final K key) {
+        return Bytes.wrap(serdes.rawKey(key, internalContext.headers()));
+    }
+
+    protected K deserializeKey(final byte[] rawKey) {
+        return serdes.keyFrom(rawKey, internalContext.headers());
+    }
+
+    protected byte[] serializeValue(final V value) {
+        return value != null ? serdes.rawValue(value, 
internalContext.headers()) : null;
     }
 
-    protected V outerValue(final byte[] value) {
-        return value != null ? serdes.valueFrom(value, new RecordHeaders()) : 
null;
+    protected V deserializeValue(final byte[] rawValue) {
+        return rawValue != null ? serdes.valueFrom(rawValue, 
internalContext.headers()) : null;
     }
 
     protected void maybeRecordE2ELatency() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
index 119cb739558..b0c752f7d50 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
 import java.util.Set;
@@ -31,7 +30,6 @@ class MeteredWindowStoreIterator<V> implements 
WindowStoreIterator<V>, MeteredIt
     private final WindowStoreIterator<byte[]> iter;
     private final Sensor operationSensor;
     private final Sensor iteratorSensor;
-    private final StreamsMetrics metrics;
     private final Function<byte[], V> valueFrom;
     private final long startNs;
     private final long startTimestampMs;
@@ -42,7 +40,6 @@ class MeteredWindowStoreIterator<V> implements 
WindowStoreIterator<V>, MeteredIt
     MeteredWindowStoreIterator(final WindowStoreIterator<byte[]> iter,
                                final Sensor operationSensor,
                                final Sensor iteratorSensor,
-                               final StreamsMetrics metrics,
                                final Function<byte[], V> valueFrom,
                                final Time time,
                                final LongAdder numOpenIterators,
@@ -50,7 +47,6 @@ class MeteredWindowStoreIterator<V> implements 
WindowStoreIterator<V>, MeteredIt
         this.iter = iter;
         this.operationSensor = operationSensor;
         this.iteratorSensor = iteratorSensor;
-        this.metrics = metrics;
         this.valueFrom = valueFrom;
         this.startNs = time.nanoseconds();
         this.startTimestampMs = time.milliseconds();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
index 2f4bf65a56e..a441d825408 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
@@ -30,12 +29,11 @@ import java.util.function.Function;
 
 class MeteredWindowedKeyValueIterator<K, V> implements 
KeyValueIterator<Windowed<K>, V>, MeteredIterator {
 
-    private final KeyValueIterator<Windowed<Bytes>, byte[]> iter;
+    final KeyValueIterator<Windowed<Bytes>, byte[]> iter;
     private final Sensor operationSensor;
     private final Sensor iteratorSensor;
-    private final StreamsMetrics metrics;
-    private final Function<byte[], K> deserializeKey;
-    private final Function<byte[], V> deserializeValue;
+    final Function<byte[], K> deserializeKey;
+    final Function<byte[], V> deserializeValue;
     private final long startNs;
     private final long startTimestampMs;
     private final Time time;
@@ -45,7 +43,6 @@ class MeteredWindowedKeyValueIterator<K, V> implements 
KeyValueIterator<Windowed
     MeteredWindowedKeyValueIterator(final KeyValueIterator<Windowed<Bytes>, 
byte[]> iter,
                                     final Sensor operationSensor,
                                     final Sensor iteratorSensor,
-                                    final StreamsMetrics metrics,
                                     final Function<byte[], K> deserializeKey,
                                     final Function<byte[], V> deserializeValue,
                                     final Time time,
@@ -54,7 +51,6 @@ class MeteredWindowedKeyValueIterator<K, V> implements 
KeyValueIterator<Windowed
         this.iter = iter;
         this.operationSensor = operationSensor;
         this.iteratorSensor = iteratorSensor;
-        this.metrics = metrics;
         this.deserializeKey = deserializeKey;
         this.deserializeValue = deserializeValue;
         this.startNs = time.nanoseconds();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index d294f04b663..e837b060c5b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -220,17 +220,6 @@ public class WindowKeySchema implements 
RocksDBSegmentedBytesStore.KeySchema {
         return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - 
SEQNUM_SIZE);
     }
 
-    // TODO: Remove this method when MeteredWindowStore will use headers 
version
-    @Deprecated
-    public static <K> Windowed<K> fromStoreKey(final byte[] binaryKey,
-                                               final long windowSize,
-                                               final Deserializer<K> 
deserializer,
-                                               final String topic) {
-        final K key = deserializer.deserialize(topic, 
extractStoreKeyBytes(binaryKey));
-        final Window window = extractStoreWindow(binaryKey, windowSize);
-        return new Windowed<>(key, window);
-    }
-
     public static <K> Windowed<K> fromStoreKey(final byte[] binaryKey,
                                                final long windowSize,
                                                final Deserializer<K> 
deserializer,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index f3f29572030..250e10169b6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.Serde;
@@ -106,7 +109,9 @@ public class KTableAggregateTest {
             final TestInputTopic<String, String> inputTopic =
                 driver.createInputTopic(topic1, new StringSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
 
-            inputTopic.pipeInput("A", "1", 10L);
+            final Headers headers = new RecordHeaders();
+            headers.add(new RecordHeader("header-key",  
"header-value".getBytes(StandardCharsets.UTF_8)));
+            inputTopic.pipeInput(new TestRecord<>("A", "1", headers, 10L));
             inputTopic.pipeInput("B", "2", 15L);
             inputTopic.pipeInput("A", "3", 20L);
             inputTopic.pipeInput("B", "4", 18L);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
index ec379f578bd..726054e787f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
@@ -34,6 +35,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -252,7 +254,7 @@ public class MeteredTimestampedWindowStoreWithHeadersTest {
     }
 
     private static byte[] serializeValueTimestampHeaders() {
-        final ValueTimestampHeadersSerializer<String> serializer = new 
ValueTimestampHeadersSerializer<>(Serdes.String().serializer());
+        final ValueTimestampHeadersSerializer<String> serializer = new 
ValueTimestampHeadersSerializer<>(new StringSerializer());
         return serializer.serialize("topic", VALUE_TIMESTAMP_HEADERS);
     }
 
@@ -264,15 +266,20 @@ public class MeteredTimestampedWindowStoreWithHeadersTest 
{
         final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = 
mock(Deserializer.class);
         final Serializer<ValueTimestampHeaders<String>> valueSerializer = 
mock(Serializer.class);
         when(keySerde.serializer()).thenReturn(keySerializer);
-        // For fetch: key serialization uses empty headers (no value context 
available)
-        when(keySerializer.serialize(topic, new RecordHeaders(), 
KEY)).thenReturn(KEY.getBytes());
         // For put: key serialization uses value's headers
         when(keySerializer.serialize(topic, HEADERS, 
KEY)).thenReturn(KEY.getBytes());
         when(valueSerde.deserializer()).thenReturn(valueDeserializer);
-        when(valueDeserializer.deserialize(topic, new RecordHeaders(), 
VALUE_TIMESTAMP_HEADERS_BYTES)).thenReturn(VALUE_TIMESTAMP_HEADERS);
+        when(valueDeserializer.deserialize(topic, HEADERS, 
VALUE_TIMESTAMP_HEADERS_BYTES)).thenReturn(VALUE_TIMESTAMP_HEADERS);
         when(valueSerde.serializer()).thenReturn(valueSerializer);
         // For put: value serialization uses value's headers
         when(valueSerializer.serialize(topic, HEADERS, 
VALUE_TIMESTAMP_HEADERS)).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES);
+        context.setRecordContext(new ProcessorRecordContext(
+            0L,
+            0L,
+            0,
+            topic,
+            HEADERS
+        ));
         when(innerStoreMock.fetch(KEY_BYTES, 
TIMESTAMP)).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES);
         store = new MeteredTimestampedWindowStoreWithHeaders<>(
             innerStoreMock,
@@ -324,7 +331,6 @@ public class MeteredTimestampedWindowStoreWithHeadersTest {
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void shouldUseHeadersFromValueToDeserializeKeyInFetchAll() {
         setUp();
 
@@ -351,7 +357,6 @@ public class MeteredTimestampedWindowStoreWithHeadersTest {
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void shouldUseHeadersFromValueToDeserializeKeyInAll() {
         setUp();
 
@@ -377,7 +382,6 @@ public class MeteredTimestampedWindowStoreWithHeadersTest {
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void shouldUseHeadersFromValueToDeserializeKeyInFetchRange() {
         setUp();
 
@@ -405,7 +409,6 @@ public class MeteredTimestampedWindowStoreWithHeadersTest {
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void shouldUseHeadersFromValueToDeserializeKeyInBackwardFetchAll() {
         setUp();
 
@@ -431,7 +434,6 @@ public class MeteredTimestampedWindowStoreWithHeadersTest {
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void shouldUseHeadersFromValueToDeserializeKeyInBackwardAll() {
         setUp();
 
@@ -457,7 +459,6 @@ public class MeteredTimestampedWindowStoreWithHeadersTest {
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void 
shouldUseHeadersFromValueToDeserializeKeyInBackwardFetchRange() {
         setUp();
 

Reply via email to