This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e952ccd52ba KAFKA-20173: Ensure Metered kv-stores pass headers 
correctly (#21768)
e952ccd52ba is described below

commit e952ccd52ba0579bdcfd7f177f24bc38321df1e3
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Mar 17 15:38:36 2026 -0700

    KAFKA-20173: Ensure Metered kv-stores pass headers correctly (#21768)
    
    Ensures that all Metered KV-stores (plain, ts, headers, version) pass
    headers into de/serializers.
    
    Reviewers: Alieh Saeedi <[email protected]>, TengYao Chi
     <[email protected]>, Uladzislau Blok, <[email protected]>, Bill
     Bejeck <[email protected]>
---
 .../apache/kafka/streams/state/StateSerdes.java    |   4 +-
 .../state/internals/MeteredKeyValueStore.java      |  91 +++---------
 ...MeteredTimestampedKeyValueStoreWithHeaders.java | 154 +++++++++++++--------
 .../internals/MeteredVersionedKeyValueStore.java   |  12 +-
 .../streams/state/internals/StoreQueryUtils.java   |  35 +++--
 ...redTimestampedKeyValueStoreWithHeadersTest.java |   1 +
 6 files changed, 152 insertions(+), 145 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java 
b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index 7128dfc85e2..8632dd37bfe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -207,6 +207,7 @@ public final class StateSerdes<K, V> {
      * @param key  the key to be serialized
      * @return     the serialized key
      */
+    @SuppressWarnings("resource")
     public byte[] rawKey(final K key, final Headers headers) {
         try {
             return keySerde.serializer().serialize(topic, headers, key);
@@ -230,7 +231,6 @@ public final class StateSerdes<K, V> {
      * @deprecated Since 4.3. Use {@link #rawValue(Object, Headers)} instead.
      */
     @Deprecated
-    @SuppressWarnings("rawtypes")
     public byte[] rawValue(final V value) {
         return rawValue(value, new RecordHeaders());
     }
@@ -241,7 +241,7 @@ public final class StateSerdes<K, V> {
      * @param value  the value to be serialized
      * @return       the serialized value
      */
-    @SuppressWarnings("rawtypes")
+    @SuppressWarnings({"rawtypes", "resource"})
     public byte[] rawValue(final V value, final Headers headers) {
         try {
             return valueSerde.serializer().serialize(topic, headers, value);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 3be8fc8a962..d94b8fd5253 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -87,7 +87,7 @@ public class MeteredKeyValueStore<K, V>
     protected Sensor putIfAbsentSensor;
     protected Sensor getSensor;
     protected Sensor deleteSensor;
-    private Sensor putAllSensor;
+    protected Sensor putAllSensor;
     protected Sensor allSensor;
     protected Sensor rangeSensor;
     protected Sensor prefixScanSensor;
@@ -288,10 +288,9 @@ public class MeteredKeyValueStore<K, V>
             wrapped().query(rawRangeQuery, positionBound, config);
         if (rawResult.isSuccess()) {
             final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
-            final KeyValueIterator<K, V> resultIterator = new 
MeteredKeyValueTimestampedIterator(
+            final KeyValueIterator<K, V> resultIterator = new 
MeteredKeyValueStoreIterator(
                 iterator,
-                getSensor,
-                StoreQueryUtils.deserializeValue(serdes, wrapped())
+                getSensor
             );
             final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
                 InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
@@ -387,13 +386,13 @@ public class MeteredKeyValueStore<K, V>
     public <PS extends Serializer<P>, P> KeyValueIterator<K, V> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
         Objects.requireNonNull(prefix, "prefix cannot be null");
         Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer 
cannot be null");
-        return new MeteredKeyValueIterator(wrapped().prefixScan(prefix, 
prefixKeySerializer), prefixScanSensor);
+        return new MeteredKeyValueStoreIterator(wrapped().prefixScan(prefix, 
prefixKeySerializer), prefixScanSensor);
     }
 
     @Override
     public KeyValueIterator<K, V> range(final K from,
                                         final K to) {
-        return new MeteredKeyValueIterator(
+        return new MeteredKeyValueStoreIterator(
             wrapped().range(serializeKey(from), serializeKey(to)),
             rangeSensor
         );
@@ -402,7 +401,7 @@ public class MeteredKeyValueStore<K, V>
     @Override
     public KeyValueIterator<K, V> reverseRange(final K from,
                                                final K to) {
-        return new MeteredKeyValueIterator(
+        return new MeteredKeyValueStoreIterator(
             wrapped().reverseRange(serializeKey(from), serializeKey(to)),
             rangeSensor
         );
@@ -410,12 +409,12 @@ public class MeteredKeyValueStore<K, V>
 
     @Override
     public KeyValueIterator<K, V> all() {
-        return new MeteredKeyValueIterator(wrapped().all(), allSensor);
+        return new MeteredKeyValueStoreIterator(wrapped().all(), allSensor);
     }
 
     @Override
     public KeyValueIterator<K, V> reverseAll() {
-        return new MeteredKeyValueIterator(wrapped().reverseAll(), allSensor);
+        return new MeteredKeyValueStoreIterator(wrapped().reverseAll(), 
allSensor);
     }
 
     @Override
@@ -450,7 +449,7 @@ public class MeteredKeyValueStore<K, V>
     }
 
     protected K deserializeKey(final byte[] rawKey) {
-        return rawKey != null ? serdes.keyFrom(rawKey, 
internalContext.headers()) : null;
+        return serdes.keyFrom(rawKey, internalContext.headers());
     }
 
     private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K, 
V>> from) {
@@ -469,15 +468,15 @@ public class MeteredKeyValueStore<K, V>
         }
     }
 
-    private class MeteredKeyValueIterator implements KeyValueIterator<K, V>, 
MeteredIterator {
+    private class MeteredKeyValueStoreIterator implements KeyValueIterator<K, 
V>, MeteredIterator {
 
         private final KeyValueIterator<Bytes, byte[]> iter;
         private final Sensor sensor;
         private final long startNs;
         private final long startTimestamp;
 
-        private MeteredKeyValueIterator(final KeyValueIterator<Bytes, byte[]> 
iter,
-                                        final Sensor sensor) {
+        private MeteredKeyValueStoreIterator(final KeyValueIterator<Bytes, 
byte[]> iter,
+                                             final Sensor sensor) {
             this.iter = iter;
             this.sensor = sensor;
             this.startTimestamp = time.milliseconds();
@@ -500,6 +499,9 @@ public class MeteredKeyValueStore<K, V>
         public KeyValue<K, V> next() {
             final KeyValue<Bytes, byte[]> keyValue = iter.next();
             return KeyValue.pair(
+                // note: `MeteredKeyValueStoreIterator` is also use on the IQ 
code path,
+                // and that fine: `internalContext.headers()` will return `new 
RecordHeaders()`
+                // what make sense as for IQ there is no "record context" at 
hand.
                 deserializeKey(keyValue.key.get()),
                 deserializeValue(keyValue.value));
         }
@@ -519,66 +521,9 @@ public class MeteredKeyValueStore<K, V>
 
         @Override
         public K peekNextKey() {
-            return deserializeKey(iter.peekNextKey().get());
-        }
-    }
-
-    private class MeteredKeyValueTimestampedIterator implements 
KeyValueIterator<K, V>, MeteredIterator {
-
-        private final KeyValueIterator<Bytes, byte[]> iter;
-        private final Sensor sensor;
-        private final long startNs;
-        private final long startTimestamp;
-        private final Function<byte[], V> valueDeserializer;
-
-        private MeteredKeyValueTimestampedIterator(
-            final KeyValueIterator<Bytes, byte[]> iter,
-            final Sensor sensor,
-            final Function<byte[], V> valueDeserializer
-        ) {
-            this.iter = iter;
-            this.sensor = sensor;
-            this.valueDeserializer = valueDeserializer;
-            this.startTimestamp = time.milliseconds();
-            this.startNs = time.nanoseconds();
-            numOpenIterators.increment();
-            openIterators.add(this);
-        }
-
-        @Override
-        public long startTimestamp() {
-            return startTimestamp;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return iter.hasNext();
-        }
-
-        @Override
-        public KeyValue<K, V> next() {
-            final KeyValue<Bytes, byte[]> keyValue = iter.next();
-            return KeyValue.pair(
-                deserializeKey(keyValue.key.get()),
-                valueDeserializer.apply(keyValue.value)
-            );
-        }
-
-        @Override
-        public void close() {
-            try {
-                iter.close();
-            } finally {
-                final long duration = time.nanoseconds() - startNs;
-                sensor.record(duration);
-                iteratorDurationSensor.record(duration);
-                numOpenIterators.decrement();
-                openIterators.remove(this);
-            }
-        }
-
-        @Override
-        public K peekNextKey() {
+            // note: `MeteredKeyValueStoreIterator` is also use on the IQ code 
path,
+            // and that fine: `internalContext.headers()` will return `new 
RecordHeaders()`
+            // what make sense as for IQ there is no "record context" at hand.
             return deserializeKey(iter.peekNextKey().get());
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
index 2346c7646af..f77471ad905 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -43,6 +43,8 @@ import 
org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Function;
@@ -57,7 +59,7 @@ import static 
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDese
  * A Metered {@link TimestampedKeyValueStoreWithHeaders} wrapper that is used 
for recording operation metrics, and hence
  * its inner KeyValueStore implementation does not need to provide its own 
metrics collecting functionality.
  *
- * The inner {@link KeyValueStore} of this class is of type &lt;Bytes, 
byte[]&gt;,
+ * <p> The inner {@link KeyValueStore} of this class is of type &lt;Bytes, 
byte[]&gt;,
  * hence we use {@link Serde}s to convert from &lt;K, 
ValueTimestampHeaders&lt;V&gt;&gt; to &lt;Bytes, byte[]&gt;.
  *
  * @param <K> key type
@@ -107,6 +109,17 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
         }
     }
 
+    @Override
+    public ValueTimestampHeaders<V> get(final K key) {
+        Objects.requireNonNull(key, "key cannot be null");
+        try {
+            return maybeMeasureLatency(() -> 
deserializeValue(wrapped().get(serializeKey(key, internalContext.headers()))), 
time, getSensor);
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
+        }
+    }
+
     @Override
     public void put(final K key,
                     final ValueTimestampHeaders<V> value) {
@@ -118,7 +131,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
                         final ProcessorRecordContext currentContext = 
internalContext.recordContext();
 
                         // Create new headers object to isolate tombstone 
operation from input record
-                        final Headers deleteHeaders = new 
RecordHeaders(currentContext.headers());
+                        final Headers tombstoneHeaders = new 
RecordHeaders(currentContext.headers());
 
                         // Create temporary context with new headers
                         final ProcessorRecordContext temporaryContext = new 
ProcessorRecordContext(
@@ -126,19 +139,21 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
                             currentContext.offset(),
                             currentContext.partition(),
                             currentContext.topic(),
-                            deleteHeaders
+                            tombstoneHeaders
                         );
 
                         try {
                             internalContext.setRecordContext(temporaryContext);
-                            wrapped().put(keyBytes(key, deleteHeaders), 
serdes.rawValue(null, deleteHeaders));
+                            wrapped().put(serializeKey(key, tombstoneHeaders), 
serializeValue(null));
                         } finally {
                             // Restore original context
                             internalContext.setRecordContext(currentContext);
                         }
                     } else {
+                        // it's ok to only pass header into `serializeKey`, 
because for the value case passed-in headers are
+                        // getting ignored anyway, because the value (of type 
`ValueTimestampHeaders`) itself carries the headers
                         final Headers headers = value.headers();
-                        wrapped().put(keyBytes(key, headers), 
serdes.rawValue(value, headers));
+                        wrapped().put(serializeKey(key, headers), 
serializeValue(value));
                     }
                 },
                 time,
@@ -161,7 +176,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
                     final ProcessorRecordContext currentContext = 
internalContext.recordContext();
 
                     // Create new headers object to isolate tombstone 
operation from input record
-                    final Headers deleteHeaders = new 
RecordHeaders(currentContext.headers());
+                    final Headers tombstoneHeaders = new 
RecordHeaders(currentContext.headers());
 
                     // Create temporary context with new headers
                     final ProcessorRecordContext temporaryContext = new 
ProcessorRecordContext(
@@ -169,19 +184,23 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
                         currentContext.offset(),
                         currentContext.partition(),
                         currentContext.topic(),
-                        deleteHeaders
+                        tombstoneHeaders
                     );
 
                     try {
                         internalContext.setRecordContext(temporaryContext);
-                        return 
deserializeValue(wrapped().putIfAbsent(keyBytes(key, deleteHeaders), 
serdes.rawValue(null, deleteHeaders)));
+                        return 
deserializeValue(wrapped().putIfAbsent(serializeKey(key, tombstoneHeaders), 
serializeValue(null)));
                     } finally {
                         // Restore original context
                         internalContext.setRecordContext(currentContext);
                     }
                 } else {
+                    // it's ok to only pass header into `serializeKey`, 
because for the value case passed-in headers are
+                    // getting ignored anyway, because the value (of type 
`ValueTimestampHeaders`) itself carries the headers
                     final Headers headers = value.headers();
-                    return 
deserializeValue(wrapped().putIfAbsent(keyBytes(key, headers), 
serdes.rawValue(value, headers)));
+                    // `rawOldValue` returned from 
`wrapped().putIfAbsent(...)` is type ValueTimestampHeader
+                    // -> no need to pass in Headers into `deserializeValue()`
+                    return 
deserializeValue(wrapped().putIfAbsent(serializeKey(key, headers), 
serializeValue(value)));
                 }
             },
             time,
@@ -192,7 +211,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
     }
 
     @Override
-    public void putAll(final java.util.List<KeyValue<K, 
ValueTimestampHeaders<V>>> entries) {
+    public void putAll(final List<KeyValue<K, ValueTimestampHeaders<V>>> 
entries) {
         entries.forEach(entry -> Objects.requireNonNull(entry.key, "key cannot 
be null"));
 
         final boolean hasNullValue = entries.stream().anyMatch(entry -> 
entry.value == null);
@@ -200,9 +219,19 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
         if (hasNullValue) {
             entries.forEach(entry -> put(entry.key, entry.value));
         } else {
-            // If no null values, use parent's batch optimization
-            super.putAll(entries);
+            maybeMeasureLatency(() -> wrapped().putAll(innerEntries(entries)), 
time, putAllSensor);
+        }
+    }
+
+    private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K, 
ValueTimestampHeaders<V>>> from) {
+        final List<KeyValue<Bytes, byte[]>> byteEntries = new ArrayList<>();
+        for (final KeyValue<K, ValueTimestampHeaders<V>> entry : from) {
+            // it's ok to only pass header into `serializeKey`, because for 
the value case passed-in headers are
+            // getting ignored anyway, because the value (of type 
`ValueTimestampHeaders`) itself carries the headers
+            final Headers headers = entry.value != null ? 
entry.value.headers() : internalContext.headers();
+            byteEntries.add(KeyValue.pair(serializeKey(entry.key, headers), 
serializeValue(entry.value)));
         }
+        return byteEntries;
     }
 
     @Override
@@ -214,7 +243,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
                     final ProcessorRecordContext currentContext = 
internalContext.recordContext();
 
                     // Create new headers object to isolate delete operation 
from input record
-                    final Headers deleteHeaders = new 
RecordHeaders(currentContext.headers());
+                    final Headers tombstoneHeaders = new 
RecordHeaders(currentContext.headers());
 
                     // Create temporary context with new headers
                     final ProcessorRecordContext temporaryContext = new 
ProcessorRecordContext(
@@ -222,12 +251,12 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
                         currentContext.offset(),
                         currentContext.partition(),
                         currentContext.topic(),
-                        deleteHeaders
+                        tombstoneHeaders
                     );
 
                     try {
                         internalContext.setRecordContext(temporaryContext);
-                        final byte[] deletedValue = 
wrapped().delete(keyBytes(key, deleteHeaders));
+                        final byte[] deletedValue = 
wrapped().delete(serializeKey(key, tombstoneHeaders));
                         return deserializeValue(deletedValue);
                     } finally {
                         // Restore original context
@@ -294,10 +323,11 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
         final QueryResult<R> result;
         final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
         final KeyQuery<Bytes, byte[]> rawKeyQuery =
-            KeyQuery.withKey(keyBytes(typedKeyQuery.getKey(), new 
RecordHeaders()));
+            KeyQuery.withKey(serializeKey(typedKeyQuery.getKey(), 
internalContext.headers()));
         final QueryResult<byte[]> rawResult =
             wrapped().query(rawKeyQuery, positionBound, config);
         if (rawResult.isSuccess()) {
+            // value will be `rawValueTimestampHeader`; no need to pass 
headers explicitly
             final Function<byte[], ValueTimestampHeaders<V>> deserializer = 
StoreQueryUtils.deserializeValue(serdes, wrapped());
             final ValueTimestampHeaders<V> valueTimestampHeaders = 
deserializer.apply(rawResult.getResult());
             final V plainValue = valueTimestampHeaders == null ? null : 
valueTimestampHeaders.value();
@@ -318,10 +348,11 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
         final QueryResult<R> result;
         final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
         final KeyQuery<Bytes, byte[]> rawKeyQuery =
-                KeyQuery.withKey(keyBytes(typedKeyQuery.key(), new 
RecordHeaders()));
+                KeyQuery.withKey(serializeKey(typedKeyQuery.key(), 
internalContext.headers()));
         final QueryResult<byte[]> rawResult =
                 wrapped().query(rawKeyQuery, positionBound, config);
         if (rawResult.isSuccess()) {
+            // value will be `rawValueTimestampHeader`; no need to pass 
headers explicitly
             final Function<byte[], ValueTimestampHeaders<V>> deserializer = 
StoreQueryUtils.deserializeValue(serdes, wrapped());
             final ValueTimestampHeaders<V> valueTimestampHeaders = 
deserializer.apply(rawResult.getResult());
             // Convert ValueTimestampHeaders to ValueAndTimestamp for the 
result
@@ -348,8 +379,8 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
         RangeQuery<Bytes, byte[]> rawRangeQuery;
         final ResultOrder order = typedQuery.resultOrder();
         rawRangeQuery = RangeQuery.withRange(
-                keyBytes(typedQuery.getLowerBound().orElse(null), new 
RecordHeaders()),
-                keyBytes(typedQuery.getUpperBound().orElse(null), new 
RecordHeaders())
+            serializeKey(typedQuery.getLowerBound().orElse(null), 
internalContext.headers()),
+            serializeKey(typedQuery.getUpperBound().orElse(null), 
internalContext.headers())
         );
         if (order.equals(ResultOrder.DESCENDING)) {
             rawRangeQuery = rawRangeQuery.withDescendingKeys();
@@ -361,9 +392,10 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
             wrapped().query(rawRangeQuery, positionBound, config);
         if (rawResult.isSuccess()) {
             final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
-            final KeyValueIterator<K, V> resultIterator = new 
MeteredTimestampedKeyValueStoreWithHeadersIterator(
+            final KeyValueIterator<K, V> resultIterator = new 
MeteredTimestampedKeyValueStoreWithHeadersQueryIterator(
                     iterator,
                     getSensor,
+                    // value will be `rawValueTimestampHeader`; no need to 
pass headers explicitly
                     StoreQueryUtils.deserializeValue(serdes, wrapped()),
                     true
             );
@@ -390,8 +422,8 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
         RangeQuery<Bytes, byte[]> rawRangeQuery;
         final ResultOrder order = typedQuery.resultOrder();
         rawRangeQuery = RangeQuery.withRange(
-                keyBytes(typedQuery.lowerBound().orElse(null), new 
RecordHeaders()),
-                keyBytes(typedQuery.upperBound().orElse(null), new 
RecordHeaders())
+            serializeKey(typedQuery.lowerBound().orElse(null), 
internalContext.headers()),
+            serializeKey(typedQuery.upperBound().orElse(null), 
internalContext.headers())
         );
         if (order.equals(ResultOrder.DESCENDING)) {
             rawRangeQuery = rawRangeQuery.withDescendingKeys();
@@ -404,9 +436,10 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
         if (rawResult.isSuccess()) {
             final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
             final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator =
-                    (KeyValueIterator<K, ValueAndTimestamp<V>>) new 
MeteredTimestampedKeyValueStoreWithHeadersIterator(
+                    (KeyValueIterator<K, ValueAndTimestamp<V>>) new 
MeteredTimestampedKeyValueStoreWithHeadersQueryIterator(
                             iterator,
                             getSensor,
+                            // value will be `rawValueTimestampHeader`; no 
need to pass headers explicitly
                             StoreQueryUtils.deserializeValue(serdes, 
wrapped()),
                             false
             );
@@ -428,7 +461,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
                                                                                
                   final PS prefixKeySerializer) {
         Objects.requireNonNull(prefix, "prefix cannot be null");
         Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer 
cannot be null");
-        return new MeteredValueTimestampHeadersIterator(
+        return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
             wrapped().prefixScan(prefix, prefixKeySerializer),
             prefixScanSensor
         );
@@ -437,10 +470,10 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
     @Override
     public KeyValueIterator<K, ValueTimestampHeaders<V>> range(final K from,
                                                                final K to) {
-        return new MeteredValueTimestampHeadersIterator(
+        return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
             wrapped().range(
-                keyBytes(from, new RecordHeaders()),
-                keyBytes(to, new RecordHeaders())
+                serializeKey(from, internalContext.headers()),
+                serializeKey(to, internalContext.headers())
             ),
             rangeSensor
         );
@@ -449,10 +482,10 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
     @Override
     public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseRange(final K 
from,
                                                                       final K 
to) {
-        return new MeteredValueTimestampHeadersIterator(
+        return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
             wrapped().reverseRange(
-                keyBytes(from, new RecordHeaders()),
-                keyBytes(to, new RecordHeaders())
+                serializeKey(from, internalContext.headers()),
+                serializeKey(to, internalContext.headers())
             ),
             rangeSensor
         );
@@ -460,7 +493,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
 
     @Override
     public KeyValueIterator<K, ValueTimestampHeaders<V>> all() {
-        return new MeteredValueTimestampHeadersIterator(
+        return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
             wrapped().all(),
             allSensor
         );
@@ -468,14 +501,14 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
 
     @Override
     public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseAll() {
-        return new MeteredValueTimestampHeadersIterator(
+        return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
             wrapped().reverseAll(),
             allSensor
         );
     }
 
     @SuppressWarnings("unchecked")
-    private class MeteredTimestampedKeyValueStoreWithHeadersIterator 
implements KeyValueIterator<K, V>, MeteredIterator {
+    private class MeteredTimestampedKeyValueStoreWithHeadersQueryIterator 
implements KeyValueIterator<K, V>, MeteredIterator {
 
         private final KeyValueIterator<Bytes, byte[]> iter;
         private final Sensor sensor;
@@ -486,10 +519,12 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
         private final boolean returnPlainValue;
         private KeyValue<K, V> cachedNext;
 
-        private MeteredTimestampedKeyValueStoreWithHeadersIterator(final 
KeyValueIterator<Bytes, byte[]> iter,
-                                                                   final 
Sensor sensor,
-                                                                   final 
Function<byte[], ValueTimestampHeaders<V>> valueTimestampHeadersDeserializer,
-                                                                   final 
boolean returnPlainValue) {
+        private MeteredTimestampedKeyValueStoreWithHeadersQueryIterator(
+            final KeyValueIterator<Bytes, byte[]> iter,
+            final Sensor sensor,
+            final Function<byte[], ValueTimestampHeaders<V>> 
valueTimestampHeadersDeserializer,
+            final boolean returnPlainValue
+        ) {
             this.iter = iter;
             this.sensor = sensor;
             this.valueTimestampHeadersDeserializer = 
valueTimestampHeadersDeserializer;
@@ -519,21 +554,15 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
 
             final KeyValue<Bytes, byte[]> keyValue = iter.next();
             final ValueTimestampHeaders<V> valueTimestampHeaders = 
valueTimestampHeadersDeserializer.apply(keyValue.value);
-            final Headers headers = valueTimestampHeaders != null ? 
valueTimestampHeaders.headers() : new RecordHeaders();
+            final Headers headers = valueTimestampHeaders.headers();
+
             if (returnPlainValue) {
-                final V plainValue = valueTimestampHeaders == null ? null : 
valueTimestampHeaders.value();
-                return KeyValue.pair(
-                    serdes.keyFrom(keyValue.key.get(), headers),
-                    plainValue
-                );
+                return KeyValue.pair(deserializeKey(keyValue.key.get(), 
headers), valueTimestampHeaders.value());
             } else {
                 // Return as ValueAndTimestamp
-                final ValueAndTimestamp<V> valueAndTimestamp = 
valueTimestampHeaders == null
-                    ? null
-                    : ValueAndTimestamp.make(valueTimestampHeaders.value(), 
valueTimestampHeaders.timestamp());
                 return KeyValue.pair(
-                    serdes.keyFrom(keyValue.key.get(), headers),
-                    (V) valueAndTimestamp
+                    deserializeKey(keyValue.key.get(), headers),
+                    (V) ValueAndTimestamp.make(valueTimestampHeaders.value(), 
valueTimestampHeaders.timestamp())
                 );
             }
         }
@@ -559,15 +588,17 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
         }
     }
 
-    private class MeteredValueTimestampHeadersIterator implements 
KeyValueIterator<K, ValueTimestampHeaders<V>>, MeteredIterator {
+    private class MeteredTimestampedKeyValueStoreWithHeadersIterator 
implements KeyValueIterator<K, ValueTimestampHeaders<V>>, MeteredIterator {
         private final KeyValueIterator<Bytes, byte[]> iter;
         private final Sensor sensor;
         private final long startNs;
         private final long startTimestampMs;
         private KeyValue<K, ValueTimestampHeaders<V>> cachedNext;
 
-        private MeteredValueTimestampHeadersIterator(final 
KeyValueIterator<Bytes, byte[]> iter,
-                                                     final Sensor sensor) {
+        private MeteredTimestampedKeyValueStoreWithHeadersIterator(
+            final KeyValueIterator<Bytes, byte[]> iter,
+            final Sensor sensor
+        ) {
             this.iter = iter;
             this.sensor = sensor;
             this.startNs = time.nanoseconds();
@@ -595,9 +626,8 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
             }
 
             final KeyValue<Bytes, byte[]> keyValue = iter.next();
-            final ValueTimestampHeaders<V> valueTimestampHeaders = 
serdes.valueFrom(keyValue.value, new RecordHeaders());
-            final Headers headers = valueTimestampHeaders != null ? 
valueTimestampHeaders.headers() : new RecordHeaders();
-            final K key = serdes.keyFrom(keyValue.key.get(), headers);
+            final ValueTimestampHeaders<V> valueTimestampHeaders = 
deserializeValue(keyValue.value);
+            final K key = deserializeKey(keyValue.key.get(), 
valueTimestampHeaders.headers());
             return KeyValue.pair(key, valueTimestampHeaders);
         }
 
@@ -623,7 +653,21 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
         }
     }
 
-    protected Bytes keyBytes(final K key, final Headers headers) {
+    @Override
+    protected Bytes serializeKey(final K key) {
+        throw new 
UnsupportedOperationException("MeteredTimestampedKeyValueStoreWithHeaders 
required to pass in Headers when serializing a key.");
+    }
+
+    protected Bytes serializeKey(final K key, final Headers headers) {
         return Bytes.wrap(serdes.rawKey(key, headers));
     }
+
+    @Override
+    protected K deserializeKey(final byte[] rawKey) {
+        throw new 
UnsupportedOperationException("MeteredTimestampedKeyValueStoreWithHeaders 
required to pass in Headers when deserializing a key.");
+    }
+
+    protected K deserializeKey(final byte[] rawKey, final Headers headers) {
+        return serdes.keyFrom(rawKey, headers);
+    }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
index c98c5590d7a..0cb35fc63b7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
@@ -65,8 +65,6 @@ import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
  * @param <K> The key type
  * @param <V> The (raw) value type
  */
-// TODO: replace with new method in follow-up PR of KIP-1271
-@SuppressWarnings("deprecation")
 public class MeteredVersionedKeyValueStore<K, V>
     extends WrappedStateStore<VersionedBytesStore, K, V>
     implements VersionedKeyValueStore<K, V> {
@@ -145,7 +143,15 @@ public class MeteredVersionedKeyValueStore<K, V>
         public long put(final K key, final V value, final long timestamp) {
             Objects.requireNonNull(key, "key cannot be null");
             try {
-                final long validTo = maybeMeasureLatency(() -> 
inner.put(serializeKey(key), plainValueSerdes.rawValue(value), timestamp), 
time, putSensor);
+                final long validTo = maybeMeasureLatency(
+                    () -> inner.put(
+                        serializeKey(key),
+                        plainValueSerdes.rawValue(value, 
internalContext.headers()),
+                        timestamp
+                    ),
+                    time,
+                    putSensor
+                );
                 maybeRecordE2ELatency();
                 return validTo;
             } catch (final ProcessorStateException e) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index 4bcbe0089c9..3a3e79570ca 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
@@ -188,7 +189,7 @@ public final class StoreQueryUtils {
         return true;
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "resource"})
     private static <R> QueryResult<R> runRangeQuery(
         final Query<R> query,
         final PositionBound positionBound,
@@ -409,7 +410,7 @@ public final class StoreQueryUtils {
         }
     }
 
-    @SuppressWarnings({"unchecked", "rawtypes"})
+    @SuppressWarnings({"unchecked", "rawtypes", "resource"})
     public static <V> Function<byte[], V> deserializeValue(final 
StateSerdes<?, V> serdes, final StateStore wrapped) {
         final Serde<V> valueSerde = serdes.valueSerde();
         final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) 
|| isAdapter(wrapped);
@@ -421,9 +422,11 @@ public final class StoreQueryUtils {
         } else {
             deserializer = valueSerde.deserializer();
         }
-        return byteArray -> deserializer.deserialize(serdes.topic(), 
byteArray);
+        // deserializeValue() is only used via IQ, so it's ok to not pass any 
headers
+        return byteArray -> deserializer.deserialize(serdes.topic(), new 
RecordHeaders(), byteArray);
     }
 
+    @SuppressWarnings("rawtypes")
     public static boolean isAdapter(final StateStore stateStore) {
         if (stateStore instanceof 
KeyValueToTimestampedKeyValueByteStoreAdapter) {
             return true;
@@ -434,22 +437,30 @@ public final class StoreQueryUtils {
         }
     }
 
-    @SuppressWarnings({"unchecked", "rawtypes"})
+    @SuppressWarnings("resource")
     public static <V> Function<VersionedRecord<byte[]>, VersionedRecord<V>> 
deserializeValue(final StateSerdes<?, V> serdes) {
         final Serde<V> valueSerde = serdes.valueSerde();
         final Deserializer<V> deserializer = valueSerde.deserializer();
-        return rawVersionedRecord -> rawVersionedRecord.validTo().isPresent() 
? new VersionedRecord<>(deserializer.deserialize(serdes.topic(), 
rawVersionedRecord.value()),
-                                                                               
                       rawVersionedRecord.timestamp(),
-                                                                               
                       rawVersionedRecord.validTo().get())
-                                                                              
: new VersionedRecord<>(deserializer.deserialize(serdes.topic(), 
rawVersionedRecord.value()),
-                                                                               
                       rawVersionedRecord.timestamp());
+        return rawVersionedRecord -> rawVersionedRecord.validTo().isPresent()
+            ? new VersionedRecord<>(
+                // deserializeValue s only used via IQ, so it's ok to not pass 
any headers
+                deserializer.deserialize(serdes.topic(), new RecordHeaders(), 
rawVersionedRecord.value()),
+                rawVersionedRecord.timestamp(),
+                rawVersionedRecord.validTo().get())
+            : new VersionedRecord<>(
+                // deserializeValue s only used via IQ, so it's ok to not pass 
any headers
+                deserializer.deserialize(serdes.topic(), new RecordHeaders(), 
rawVersionedRecord.value()),
+                rawVersionedRecord.timestamp());
     }
 
+    @SuppressWarnings("resource")
     public static <V> VersionedRecord<V> deserializeVersionedRecord(final 
StateSerdes<?, V> serdes, final VersionedRecord<byte[]> rawVersionedRecord) {
         final Deserializer<V> valueDeserializer = serdes.valueDeserializer();
-        final V value = valueDeserializer.deserialize(serdes.topic(), 
rawVersionedRecord.value());
-        return rawVersionedRecord.validTo().isPresent() ? new 
VersionedRecord<>(value, rawVersionedRecord.timestamp(), 
rawVersionedRecord.validTo().get())
-                                                        : new 
VersionedRecord<>(value, rawVersionedRecord.timestamp());
+        // deserializeValue s only used via IQ, so it's ok to not pass any 
headers
+        final V value = valueDeserializer.deserialize(serdes.topic(), new 
RecordHeaders(), rawVersionedRecord.value());
+        return rawVersionedRecord.validTo().isPresent()
+            ? new VersionedRecord<>(value, rawVersionedRecord.timestamp(), 
rawVersionedRecord.validTo().get())
+            : new VersionedRecord<>(value, rawVersionedRecord.timestamp());
     }
 
     public static void checkpointPosition(final OffsetCheckpoint 
checkpointFile, final Position position) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
index fc340282988..58e6ee45e9e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
@@ -529,6 +529,7 @@ public class MeteredTimestampedKeyValueStoreWithHeadersTest 
{
             keySerde,
             valueSerde
         );
+        when(context.headers()).thenReturn(new RecordHeaders());
         mockStore.init(context, mockStore);
         return mockStore;
     }


Reply via email to