This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c1f901cdb29 KAFKA-20132: Fix header-based key deserialization in 
MeteredTimestampedKeyValueStoreWithHeaders iterator methods (#21736)
c1f901cdb29 is described below

commit c1f901cdb29986b20859ddd38c045e6e699540ff
Author: TengYao Chi <[email protected]>
AuthorDate: Sun Mar 15 16:42:03 2026 +0000

    KAFKA-20132: Fix header-based key deserialization in 
MeteredTimestampedKeyValueStoreWithHeaders iterator methods (#21736)
    
    Fixes bugs where `MeteredTimestampedKeyValueStoreWithHeaders` iterator
    methods fail when key deserializers require headers.
    
    The class inherits iterator-returning methods from
    `MeteredKeyValueStore`:
    - `range(K, K)` / `reverseRange(K, K)`
    - `all()` / `reverseAll()`
    - `prefixScan(P, PS)`
    
      These methods use `deserializeKey(rawKey)` which calls
    `serdes.keyFrom(rawKey, internalContext.headers())`, using the current
    processing context's headers instead of each record's own headers.
    
    Additionally, the `peekNextKey()` implementation in the iterator classes
    deserializes keys with empty headers instead of each record's own headers.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../state/internals/MeteredKeyValueStore.java      |   6 +-
 ...MeteredTimestampedKeyValueStoreWithHeaders.java | 131 ++++++++++++++-
 .../MeteredTimestampedWindowStoreWithHeaders.java  |   7 +-
 ...redTimestampedKeyValueStoreWithHeadersTest.java | 163 +++++++++++++++++++
 ...teredTimestampedWindowStoreWithHeadersTest.java | 175 ++++-----------------
 5 files changed, 325 insertions(+), 157 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index cd6fa23fa53..3be8fc8a962 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -88,9 +88,9 @@ public class MeteredKeyValueStore<K, V>
     protected Sensor getSensor;
     protected Sensor deleteSensor;
     private Sensor putAllSensor;
-    private Sensor allSensor;
-    private Sensor rangeSensor;
-    private Sensor prefixScanSensor;
+    protected Sensor allSensor;
+    protected Sensor rangeSensor;
+    protected Sensor prefixScanSensor;
     private Sensor flushSensor;
     private Sensor e2eLatencySensor;
     protected Sensor iteratorDurationSensor;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
index 8730aadcc11..86624961650 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
@@ -312,6 +313,57 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
         return result;
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<K, 
ValueTimestampHeaders<V>> prefixScan(final P prefix,
+                                                                               
                   final PS prefixKeySerializer) {
+        Objects.requireNonNull(prefix, "prefix cannot be null");
+        Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer 
cannot be null");
+        return new MeteredValueTimestampHeadersIterator(
+            wrapped().prefixScan(prefix, prefixKeySerializer),
+            prefixScanSensor
+        );
+    }
+
+    @Override
+    public KeyValueIterator<K, ValueTimestampHeaders<V>> range(final K from,
+                                                               final K to) {
+        return new MeteredValueTimestampHeadersIterator(
+            wrapped().range(
+                keyBytes(from, new RecordHeaders()),
+                keyBytes(to, new RecordHeaders())
+            ),
+            rangeSensor
+        );
+    }
+
+    @Override
+    public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseRange(final K 
from,
+                                                                      final K 
to) {
+        return new MeteredValueTimestampHeadersIterator(
+            wrapped().reverseRange(
+                keyBytes(from, new RecordHeaders()),
+                keyBytes(to, new RecordHeaders())
+            ),
+            rangeSensor
+        );
+    }
+
+    @Override
+    public KeyValueIterator<K, ValueTimestampHeaders<V>> all() {
+        return new MeteredValueTimestampHeadersIterator(
+            wrapped().all(),
+            allSensor
+        );
+    }
+
+    @Override
+    public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseAll() {
+        return new MeteredValueTimestampHeadersIterator(
+            wrapped().reverseAll(),
+            allSensor
+        );
+    }
+
     @SuppressWarnings("unchecked")
     private class MeteredTimestampedKeyValueStoreWithHeadersIterator 
implements KeyValueIterator<K, V>, MeteredIterator {
 
@@ -322,6 +374,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
         private final Function<byte[], ValueTimestampHeaders<V>> 
valueTimestampHeadersDeserializer;
 
         private final boolean returnPlainValue;
+        private KeyValue<K, V> cachedNext;
 
         private MeteredTimestampedKeyValueStoreWithHeadersIterator(final 
KeyValueIterator<Bytes, byte[]> iter,
                                                                    final 
Sensor sensor,
@@ -343,11 +396,17 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
 
         @Override
         public boolean hasNext() {
-            return iter.hasNext();
+            return cachedNext != null || iter.hasNext();
         }
 
         @Override
         public KeyValue<K, V> next() {
+            if (cachedNext != null) {
+                final KeyValue<K, V> result = cachedNext;
+                cachedNext = null;
+                return result;
+            }
+
             final KeyValue<Bytes, byte[]> keyValue = iter.next();
             final ValueTimestampHeaders<V> valueTimestampHeaders = 
valueTimestampHeadersDeserializer.apply(keyValue.value);
             final Headers headers = valueTimestampHeaders != null ? 
valueTimestampHeaders.headers() : new RecordHeaders();
@@ -383,12 +442,78 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
 
         @Override
         public K peekNextKey() {
-            return serdes.keyFrom(iter.peekNextKey().get(), new 
RecordHeaders());
+            if (cachedNext == null) {
+                cachedNext = next();
+            }
+            return cachedNext.key;
+        }
+    }
+
+    private class MeteredValueTimestampHeadersIterator implements 
KeyValueIterator<K, ValueTimestampHeaders<V>>, MeteredIterator {
+        private final KeyValueIterator<Bytes, byte[]> iter;
+        private final Sensor sensor;
+        private final long startNs;
+        private final long startTimestampMs;
+        private KeyValue<K, ValueTimestampHeaders<V>> cachedNext;
+
+        private MeteredValueTimestampHeadersIterator(final 
KeyValueIterator<Bytes, byte[]> iter,
+                                                     final Sensor sensor) {
+            this.iter = iter;
+            this.sensor = sensor;
+            this.startNs = time.nanoseconds();
+            this.startTimestampMs = time.milliseconds();
+            numOpenIterators.increment();
+            openIterators.add(this);
+        }
+
+        @Override
+        public long startTimestamp() {
+            return startTimestampMs;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return cachedNext != null || iter.hasNext();
+        }
+
+        @Override
+        public KeyValue<K, ValueTimestampHeaders<V>> next() {
+            if (cachedNext != null) {
+                final KeyValue<K, ValueTimestampHeaders<V>> result = 
cachedNext;
+                cachedNext = null;
+                return result;
+            }
+
+            final KeyValue<Bytes, byte[]> keyValue = iter.next();
+            final ValueTimestampHeaders<V> valueTimestampHeaders = 
serdes.valueFrom(keyValue.value, new RecordHeaders());
+            final Headers headers = valueTimestampHeaders != null ? 
valueTimestampHeaders.headers() : new RecordHeaders();
+            final K key = serdes.keyFrom(keyValue.key.get(), headers);
+            return KeyValue.pair(key, valueTimestampHeaders);
+        }
+
+        @Override
+        public void close() {
+            try {
+                iter.close();
+            } finally {
+                final long duration = time.nanoseconds() - startNs;
+                sensor.record(duration);
+                iteratorDurationSensor.record(duration);
+                numOpenIterators.decrement();
+                openIterators.remove(this);
+            }
+        }
+
+        @Override
+        public K peekNextKey() {
+            if (cachedNext == null) {
+                cachedNext = next();
+            }
+            return cachedNext.key;
         }
     }
 
     protected Bytes keyBytes(final K key, final Headers headers) {
         return Bytes.wrap(serdes.rawKey(key, headers));
     }
-
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
index 2131bb4ec96..13bff9c6e21 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
@@ -375,11 +375,6 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
             }
 
             final KeyValue<Windowed<Bytes>, byte[]> next = iter.next();
-
-            if (next == null) {
-                return null;
-            }
-
             final ValueTimestampHeaders<V> valueTimestampHeaders = 
serdes.valueFrom(next.value, new RecordHeaders());
             final Headers headers = valueTimestampHeaders != null ? 
valueTimestampHeaders.headers() : new RecordHeaders();
             final K key = serdes.keyFrom(next.key.key().get(), headers);
@@ -405,7 +400,7 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
             if (cachedNext == null) {
                 cachedNext = next();
             }
-            return cachedNext == null ? null : cachedNext.key;
+            return cachedNext.key;
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
index 427b495e157..851891ecced 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
@@ -65,6 +65,7 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
@@ -99,6 +100,7 @@ public class MeteredTimestampedKeyValueStoreWithHeadersTest {
     );
     private final Metrics metrics = new Metrics();
     private Map<String, String> tags;
+    private Deserializer<String> keyDeserializer;
 
     private void setUpWithoutContext() {
         mockTime = new MockTime();
@@ -497,4 +499,165 @@ public class 
MeteredTimestampedKeyValueStoreWithHeadersTest {
     private KafkaMetric metric(final MetricName metricName) {
         return this.metrics.metric(metricName);
     }
+
+    @SuppressWarnings("unchecked")
+    private MeteredTimestampedKeyValueStoreWithHeaders<String, String> 
createStoreWithMockSerdes() {
+        final Serde<String> keySerde = mock(Serde.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        keyDeserializer = mock(Deserializer.class);
+        final Serde<ValueTimestampHeaders<String>> valueSerde = 
mock(Serde.class);
+        final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = 
mock(Deserializer.class);
+
+        lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
+        lenient().when(keySerde.serializer()).thenReturn(keySerializer);
+        
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        lenient().when(keySerializer.serialize(any(), 
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
+
+        lenient().when(valueDeserializer.deserialize(any(), 
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
+            .thenReturn(VALUE_TIMESTAMP_HEADERS);
+
+        lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes())))
+            .thenReturn(KEY);
+
+        final MeteredTimestampedKeyValueStoreWithHeaders<String, String> 
mockStore = new MeteredTimestampedKeyValueStoreWithHeaders<>(
+            inner,
+            STORE_TYPE,
+            new MockTime(),
+            keySerde,
+            valueSerde
+        );
+        mockStore.init(context, mockStore);
+        return mockStore;
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUseHeadersFromValueToDeserializeKeyInRange() {
+        setUp();
+
+        final KeyValue<Bytes, byte[]> testData = KeyValue.pair(KEY_BYTES, 
VALUE_TIMESTAMP_HEADERS_BYTES);
+        when(inner.range(any(Bytes.class), any(Bytes.class)))
+            .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+        metered = createStoreWithMockSerdes();
+
+        final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator 
= metered.range("a", "z");
+
+        assertTrue(iterator.hasNext());
+        assertEquals(KEY, iterator.peekNextKey());
+        final KeyValue<String, ValueTimestampHeaders<String>> result = 
iterator.next();
+
+        assertEquals(KEY, result.key);
+        assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+        assertFalse(iterator.hasNext());
+        iterator.close();
+
+        // The critical verification: key deserializer must have been called 
with HEADERS (not empty headers)
+        verify(keyDeserializer).deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes()));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUseHeadersFromValueToDeserializeKeyInReverseRange() {
+        setUp();
+
+        final KeyValue<Bytes, byte[]> testData = KeyValue.pair(KEY_BYTES, 
VALUE_TIMESTAMP_HEADERS_BYTES);
+        when(inner.reverseRange(any(Bytes.class), any(Bytes.class)))
+            .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+        metered = createStoreWithMockSerdes();
+
+        final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator 
= metered.reverseRange("a", "z");
+
+        assertTrue(iterator.hasNext());
+        assertEquals(KEY, iterator.peekNextKey());
+        final KeyValue<String, ValueTimestampHeaders<String>> result = 
iterator.next();
+
+        assertEquals(KEY, result.key);
+        assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+        assertFalse(iterator.hasNext());
+        iterator.close();
+
+        // The critical verification: key deserializer must have been called 
with HEADERS (not empty headers)
+        verify(keyDeserializer).deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes()));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUseHeadersFromValueToDeserializeKeyInAll() {
+        setUp();
+
+        final KeyValue<Bytes, byte[]> testData = KeyValue.pair(KEY_BYTES, 
VALUE_TIMESTAMP_HEADERS_BYTES);
+        when(inner.all())
+            .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+        metered = createStoreWithMockSerdes();
+
+        final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator 
= metered.all();
+
+        assertTrue(iterator.hasNext());
+        assertEquals(KEY, iterator.peekNextKey());
+        final KeyValue<String, ValueTimestampHeaders<String>> result = 
iterator.next();
+
+        assertEquals(KEY, result.key);
+        assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+        assertFalse(iterator.hasNext());
+        iterator.close();
+
+        // The critical verification: key deserializer must have been called 
with HEADERS (not empty headers)
+        verify(keyDeserializer).deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes()));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUseHeadersFromValueToDeserializeKeyInReverseAll() {
+        setUp();
+
+        final KeyValue<Bytes, byte[]> testData = KeyValue.pair(KEY_BYTES, 
VALUE_TIMESTAMP_HEADERS_BYTES);
+        when(inner.reverseAll())
+            .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+        metered = createStoreWithMockSerdes();
+
+        final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator 
= metered.reverseAll();
+
+        assertTrue(iterator.hasNext());
+        assertEquals(KEY, iterator.peekNextKey());
+        final KeyValue<String, ValueTimestampHeaders<String>> result = 
iterator.next();
+
+        assertEquals(KEY, result.key);
+        assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+        assertFalse(iterator.hasNext());
+        iterator.close();
+
+        // The critical verification: key deserializer must have been called 
with HEADERS (not empty headers)
+        verify(keyDeserializer).deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes()));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUseHeadersFromValueToDeserializeKeyInPrefixScan() {
+        setUp();
+
+        final KeyValue<Bytes, byte[]> testData = KeyValue.pair(KEY_BYTES, 
VALUE_TIMESTAMP_HEADERS_BYTES);
+        when(inner.prefixScan(any(), any()))
+            .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+        metered = createStoreWithMockSerdes();
+
+        final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator 
= metered.prefixScan("prefix", Serdes.String().serializer());
+
+        assertTrue(iterator.hasNext());
+        assertEquals(KEY, iterator.peekNextKey());
+        final KeyValue<String, ValueTimestampHeaders<String>> result = 
iterator.next();
+
+        assertEquals(KEY, result.key);
+        assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+        assertFalse(iterator.hasNext());
+        iterator.close();
+
+        // The critical verification: key deserializer must have been called 
with HEADERS (not empty headers)
+        verify(keyDeserializer).deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes()));
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
index 5501b8bc48e..ec379f578bd 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
@@ -88,6 +88,7 @@ public class MeteredTimestampedWindowStoreWithHeadersTest {
     private WindowStore<Bytes, byte[]> innerStoreMock;
     private final Metrics metrics = new Metrics(new 
MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
     private MeteredTimestampedWindowStoreWithHeaders<String, String> store;
+    private Deserializer<String> keyDeserializer;
 
     public void setUp() {
         final StreamsMetricsImpl streamsMetrics =
@@ -290,14 +291,11 @@ public class MeteredTimestampedWindowStoreWithHeadersTest 
{
         verify(innerStoreMock).put(KEY_BYTES, VALUE_TIMESTAMP_HEADERS_BYTES, 
TIMESTAMP);
     }
 
-    @Test
     @SuppressWarnings("unchecked")
-    public void shouldUseHeadersFromValueToDeserializeKeyInFetchAll() {
-        setUp();
-
+    private MeteredTimestampedWindowStoreWithHeaders<String, String> 
createStoreWithMockSerdes() {
         final Serde<String> keySerde = mock(Serde.class);
         final Serializer<String> keySerializer = mock(Serializer.class);
-        final Deserializer<String> keyDeserializer = mock(Deserializer.class);
+        keyDeserializer = mock(Deserializer.class);
         final Serde<ValueTimestampHeaders<String>> valueSerde = 
mock(Serde.class);
         final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = 
mock(Deserializer.class);
 
@@ -313,13 +311,7 @@ public class MeteredTimestampedWindowStoreWithHeadersTest {
         lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes())))
             .thenReturn(KEY);
 
-        final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new 
TimeWindow(0, WINDOW_SIZE_MS));
-        final KeyValue<Windowed<Bytes>, byte[]> testData = 
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
-
-        when(innerStoreMock.fetchAll(0, 100))
-            .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
-
-        store = new MeteredTimestampedWindowStoreWithHeaders<>(
+        final MeteredTimestampedWindowStoreWithHeaders<String, String> 
mockStore = new MeteredTimestampedWindowStoreWithHeaders<>(
             innerStoreMock,
             WINDOW_SIZE_MS,
             STORE_TYPE,
@@ -327,11 +319,26 @@ public class MeteredTimestampedWindowStoreWithHeadersTest 
{
             keySerde,
             valueSerde
         );
-        store.init(context, store);
+        mockStore.init(context, mockStore);
+        return mockStore;
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUseHeadersFromValueToDeserializeKeyInFetchAll() {
+        setUp();
+
+        final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new 
TimeWindow(0, WINDOW_SIZE_MS));
+        final KeyValue<Windowed<Bytes>, byte[]> testData = 
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
+        when(innerStoreMock.fetchAll(0, 100))
+            .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+        store = createStoreWithMockSerdes();
 
         final KeyValueIterator<Windowed<String>, 
ValueTimestampHeaders<String>> iterator = store.fetchAll(0, 100);
 
         assertTrue(iterator.hasNext());
+        assertEquals(KEY, iterator.peekNextKey().key());
         final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result 
= iterator.next();
 
         assertEquals(KEY, result.key.key());
@@ -348,41 +355,17 @@ public class MeteredTimestampedWindowStoreWithHeadersTest 
{
     public void shouldUseHeadersFromValueToDeserializeKeyInAll() {
         setUp();
 
-        final Serde<String> keySerde = mock(Serde.class);
-        final Deserializer<String> keyDeserializer = mock(Deserializer.class);
-        final Serializer<String> keySerializer = mock(Serializer.class);
-        final Serde<ValueTimestampHeaders<String>> valueSerde = 
mock(Serde.class);
-        final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = 
mock(Deserializer.class);
-
-        lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
-        lenient().when(keySerde.serializer()).thenReturn(keySerializer);
-        
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
-
-        lenient().when(keySerializer.serialize(any(), 
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
-        lenient().when(valueDeserializer.deserialize(any(), 
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
-            .thenReturn(VALUE_TIMESTAMP_HEADERS);
-        lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes())))
-            .thenReturn(KEY);
-
         final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new 
TimeWindow(0, WINDOW_SIZE_MS));
         final KeyValue<Windowed<Bytes>, byte[]> testData = 
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
-
         when(innerStoreMock.all())
             .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
 
-        store = new MeteredTimestampedWindowStoreWithHeaders<>(
-            innerStoreMock,
-            WINDOW_SIZE_MS,
-            STORE_TYPE,
-            new MockTime(),
-            keySerde,
-            valueSerde
-        );
-        store.init(context, store);
+        store = createStoreWithMockSerdes();
 
         final KeyValueIterator<Windowed<String>, 
ValueTimestampHeaders<String>> iterator = store.all();
 
         assertTrue(iterator.hasNext());
+        assertEquals(KEY, iterator.peekNextKey().key());
         final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result 
= iterator.next();
 
         assertEquals(KEY, result.key.key());
@@ -398,42 +381,18 @@ public class MeteredTimestampedWindowStoreWithHeadersTest 
{
     public void shouldUseHeadersFromValueToDeserializeKeyInFetchRange() {
         setUp();
 
-        final Serde<String> keySerde = mock(Serde.class);
-        final Deserializer<String> keyDeserializer = mock(Deserializer.class);
-        final Serializer<String> keySerializer = mock(Serializer.class);
-        final Serde<ValueTimestampHeaders<String>> valueSerde = 
mock(Serde.class);
-        final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = 
mock(Deserializer.class);
-
-        lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
-        lenient().when(keySerde.serializer()).thenReturn(keySerializer);
-        
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
-
-        lenient().when(keySerializer.serialize(any(), 
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
-        lenient().when(valueDeserializer.deserialize(any(), 
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
-            .thenReturn(VALUE_TIMESTAMP_HEADERS);
-        lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes())))
-            .thenReturn(KEY);
-
         final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new 
TimeWindow(0, WINDOW_SIZE_MS));
         final KeyValue<Windowed<Bytes>, byte[]> testData = 
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
-
         when(innerStoreMock.fetch(any(Bytes.class), any(Bytes.class), eq(0L), 
eq(100L)))
             .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
 
-        store = new MeteredTimestampedWindowStoreWithHeaders<>(
-            innerStoreMock,
-            WINDOW_SIZE_MS,
-            STORE_TYPE,
-            new MockTime(),
-            keySerde,
-            valueSerde
-        );
-        store.init(context, store);
+        store = createStoreWithMockSerdes();
 
         final KeyValueIterator<Windowed<String>, 
ValueTimestampHeaders<String>> iterator =
             store.fetch(KEY, KEY, 0, 100);
 
         assertTrue(iterator.hasNext());
+        assertEquals(KEY, iterator.peekNextKey().key());
         final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result 
=
             iterator.next();
 
@@ -450,43 +409,17 @@ public class MeteredTimestampedWindowStoreWithHeadersTest 
{
     public void shouldUseHeadersFromValueToDeserializeKeyInBackwardFetchAll() {
         setUp();
 
-        final Serde<String> keySerde = mock(Serde.class);
-        final Serializer<String> keySerializer = mock(Serializer.class);
-        final Deserializer<String> keyDeserializer = mock(Deserializer.class);
-        final Serde<ValueTimestampHeaders<String>> valueSerde = 
mock(Serde.class);
-        final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = 
mock(Deserializer.class);
-
-        lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
-        lenient().when(keySerde.serializer()).thenReturn(keySerializer);
-        
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
-
-        lenient().when(keySerializer.serialize(any(), 
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
-
-        lenient().when(valueDeserializer.deserialize(any(), 
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
-            .thenReturn(VALUE_TIMESTAMP_HEADERS);
-
-        lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes())))
-            .thenReturn(KEY);
-
         final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new 
TimeWindow(0, WINDOW_SIZE_MS));
         final KeyValue<Windowed<Bytes>, byte[]> testData = 
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
-
         when(innerStoreMock.backwardFetchAll(0, 100))
             .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
 
-        store = new MeteredTimestampedWindowStoreWithHeaders<>(
-            innerStoreMock,
-            WINDOW_SIZE_MS,
-            STORE_TYPE,
-            new MockTime(),
-            keySerde,
-            valueSerde
-        );
-        store.init(context, store);
+        store = createStoreWithMockSerdes();
 
         final KeyValueIterator<Windowed<String>, 
ValueTimestampHeaders<String>> iterator = store.backwardFetchAll(0, 100);
 
         assertTrue(iterator.hasNext());
+        assertEquals(KEY, iterator.peekNextKey().key());
         final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result 
= iterator.next();
 
         assertEquals(KEY, result.key.key());
@@ -502,41 +435,17 @@ public class MeteredTimestampedWindowStoreWithHeadersTest 
{
     public void shouldUseHeadersFromValueToDeserializeKeyInBackwardAll() {
         setUp();
 
-        final Serde<String> keySerde = mock(Serde.class);
-        final Deserializer<String> keyDeserializer = mock(Deserializer.class);
-        final Serializer<String> keySerializer = mock(Serializer.class);
-        final Serde<ValueTimestampHeaders<String>> valueSerde = 
mock(Serde.class);
-        final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = 
mock(Deserializer.class);
-
-        lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
-        lenient().when(keySerde.serializer()).thenReturn(keySerializer);
-        
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
-
-        lenient().when(keySerializer.serialize(any(), 
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
-        lenient().when(valueDeserializer.deserialize(any(), 
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
-            .thenReturn(VALUE_TIMESTAMP_HEADERS);
-        lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes())))
-            .thenReturn(KEY);
-
         final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new 
TimeWindow(0, WINDOW_SIZE_MS));
         final KeyValue<Windowed<Bytes>, byte[]> testData = 
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
-
         when(innerStoreMock.backwardAll())
             .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
 
-        store = new MeteredTimestampedWindowStoreWithHeaders<>(
-            innerStoreMock,
-            WINDOW_SIZE_MS,
-            STORE_TYPE,
-            new MockTime(),
-            keySerde,
-            valueSerde
-        );
-        store.init(context, store);
+        store = createStoreWithMockSerdes();
 
         final KeyValueIterator<Windowed<String>, 
ValueTimestampHeaders<String>> iterator = store.backwardAll();
 
         assertTrue(iterator.hasNext());
+        assertEquals(KEY, iterator.peekNextKey().key());
         final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result 
= iterator.next();
 
         assertEquals(KEY, result.key.key());
@@ -552,42 +461,18 @@ public class MeteredTimestampedWindowStoreWithHeadersTest 
{
     public void 
shouldUseHeadersFromValueToDeserializeKeyInBackwardFetchRange() {
         setUp();
 
-        final Serde<String> keySerde = mock(Serde.class);
-        final Deserializer<String> keyDeserializer = mock(Deserializer.class);
-        final Serializer<String> keySerializer = mock(Serializer.class);
-        final Serde<ValueTimestampHeaders<String>> valueSerde = 
mock(Serde.class);
-        final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = 
mock(Deserializer.class);
-
-        lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
-        lenient().when(keySerde.serializer()).thenReturn(keySerializer);
-        
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
-
-        lenient().when(keySerializer.serialize(any(), 
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
-        lenient().when(valueDeserializer.deserialize(any(), 
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
-            .thenReturn(VALUE_TIMESTAMP_HEADERS);
-        lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes())))
-            .thenReturn(KEY);
-
         final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new 
TimeWindow(0, WINDOW_SIZE_MS));
         final KeyValue<Windowed<Bytes>, byte[]> testData = 
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
-
         when(innerStoreMock.backwardFetch(any(Bytes.class), any(Bytes.class), 
eq(0L), eq(100L)))
             .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
 
-        store = new MeteredTimestampedWindowStoreWithHeaders<>(
-            innerStoreMock,
-            WINDOW_SIZE_MS,
-            STORE_TYPE,
-            new MockTime(),
-            keySerde,
-            valueSerde
-        );
-        store.init(context, store);
+        store = createStoreWithMockSerdes();
 
         final KeyValueIterator<Windowed<String>, 
ValueTimestampHeaders<String>> iterator =
             store.backwardFetch(KEY, KEY, 0, 100);
 
         assertTrue(iterator.hasNext());
+        assertEquals(KEY, iterator.peekNextKey().key());
         final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result 
=
             iterator.next();
 


Reply via email to