This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 43e6efe2cc3 KAFKA-20173: Propagate headers into serde 6/N (#21850)
43e6efe2cc3 is described below

commit 43e6efe2cc3dd5d258528b0ece2bf994e697240f
Author: Uladzislau Blok <[email protected]>
AuthorDate: Thu Apr 2 00:13:00 2026 +0200

    KAFKA-20173: Propagate headers into serde 6/N (#21850)
    
    This PR ensures headers are propagated in key related methods from
    `WindowKeySchema` and  `PrefixedWindowKeySchemas`  Related tests are
    updated
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../state/internals/PrefixedWindowKeySchemas.java  | 14 ++++----
 .../streams/state/internals/WindowKeySchema.java   | 11 +++---
 ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java |  6 ++--
 .../AbstractRocksDBSegmentedBytesStoreTest.java    |  2 +-
 .../internals/AbstractWindowBytesStoreTest.java    |  2 +-
 .../state/internals/InMemoryWindowStoreTest.java   |  6 ++--
 ...dSortedCacheWrappedWindowStoreIteratorTest.java | 41 ++++++++++++----------
 ...acheWrappedWindowStoreKeyValueIteratorTest.java |  6 ++--
 .../state/internals/WindowKeySchemaTest.java       | 37 +++++++++++--------
 9 files changed, 70 insertions(+), 55 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
index d8ef4acedc7..9e2b05264f4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
@@ -31,8 +31,6 @@ import java.util.List;
 import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE;
 import static 
org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
 
-// TODO: replace with new method in follow-up PR of KIP-1271
-@SuppressWarnings("deprecation")
 public class PrefixedWindowKeySchemas {
 
     private static final int PREFIX_SIZE = 1;
@@ -179,16 +177,18 @@ public class PrefixedWindowKeySchemas {
 
         public static <K> Bytes toStoreKeyBinary(final Windowed<K> timeKey,
                                                  final int seqnum,
+                                                 final Headers headers,
                                                  final StateSerdes<K, ?> 
serdes) {
-            final byte[] serializedKey = serdes.rawKey(timeKey.key());
+            final byte[] serializedKey = serdes.rawKey(timeKey.key(), headers);
             return toStoreKeyBinary(serializedKey, timeKey.window().start(), 
seqnum);
         }
 
         public static <K> Bytes toStoreKeyBinary(final K key,
                                                  final long timestamp,
                                                  final int seqnum,
+                                                 final Headers headers,
                                                  final StateSerdes<K, ?> 
serdes) {
-            final byte[] serializedKey = serdes.rawKey(key);
+            final byte[] serializedKey = serdes.rawKey(key, headers);
             return toStoreKeyBinary(serializedKey, timestamp, seqnum);
         }
 
@@ -312,8 +312,9 @@ public class PrefixedWindowKeySchemas {
         public static <K> Bytes toStoreKeyBinary(final K key,
                                                  final long timestamp,
                                                  final int seqnum,
+                                                 final Headers headers,
                                                  final StateSerdes<K, ?> 
serdes) {
-            final byte[] serializedKey = serdes.rawKey(key);
+            final byte[] serializedKey = serdes.rawKey(key, headers);
             return toStoreKeyBinary(serializedKey, timestamp, seqnum);
         }
 
@@ -324,8 +325,9 @@ public class PrefixedWindowKeySchemas {
 
         public static <K> Bytes toStoreKeyBinary(final Windowed<K> timeKey,
                                                  final int seqnum,
+                                                 final Headers headers,
                                                  final StateSerdes<K, ?> 
serdes) {
-            final byte[] serializedKey = serdes.rawKey(timeKey.key());
+            final byte[] serializedKey = serdes.rawKey(timeKey.key(), headers);
             return toStoreKeyBinary(serializedKey, timeKey.window().start(), 
seqnum);
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index 2d9c8705cc4..d294f04b663 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -33,8 +33,6 @@ import java.util.List;
 
 import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE;
 
-// TODO: replace with new method in follow-up PR of KIP-1271
-@SuppressWarnings("deprecation")
 public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(WindowKeySchema.class);
@@ -168,8 +166,9 @@ public class WindowKeySchema implements 
RocksDBSegmentedBytesStore.KeySchema {
     public static <K> Bytes toStoreKeyBinary(final K key,
                                              final long timestamp,
                                              final int seqnum,
+                                             final Headers headers,
                                              final StateSerdes<K, ?> serdes) {
-        final byte[] serializedKey = serdes.rawKey(key);
+        final byte[] serializedKey = serdes.rawKey(key, headers);
         return toStoreKeyBinary(serializedKey, timestamp, seqnum);
     }
 
@@ -181,8 +180,9 @@ public class WindowKeySchema implements 
RocksDBSegmentedBytesStore.KeySchema {
 
     public static <K> Bytes toStoreKeyBinary(final Windowed<K> timeKey,
                                              final int seqnum,
+                                             final Headers headers,
                                              final StateSerdes<K, ?> serdes) {
-        final byte[] serializedKey = serdes.rawKey(timeKey.key());
+        final byte[] serializedKey = serdes.rawKey(timeKey.key(), headers);
         return toStoreKeyBinary(serializedKey, timeKey.window().start(), 
seqnum);
     }
 
@@ -205,10 +205,11 @@ public class WindowKeySchema implements 
RocksDBSegmentedBytesStore.KeySchema {
     }
 
     static <K> K extractStoreKey(final byte[] binaryKey,
+                                 final Headers headers,
                                  final StateSerdes<K, ?> serdes) {
         final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - 
SEQNUM_SIZE];
         System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
-        return serdes.keyFrom(bytes);
+        return serdes.keyFrom(bytes, headers);
     }
 
     static long extractStoreTimestamp(final byte[] binaryKey) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 861ed349ed2..34405f2705c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -1694,9 +1694,9 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
         final StateSerdes<String, Long> stateSerdes = 
StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
         if (getBaseSchema() instanceof TimeFirstWindowKeySchema) {
             if (changeLog) {
-                return WindowKeySchema.toStoreKeyBinary(key, seq, stateSerdes);
+                return WindowKeySchema.toStoreKeyBinary(key, seq, new 
RecordHeaders(), stateSerdes);
             }
-            return TimeFirstWindowKeySchema.toStoreKeyBinary(key, seq, 
stateSerdes);
+            return TimeFirstWindowKeySchema.toStoreKeyBinary(key, seq, new 
RecordHeaders(), stateSerdes);
         } else if (getBaseSchema() instanceof TimeFirstSessionKeySchema) {
             if (changeLog) {
                 return Bytes.wrap(SessionKeySchema.toBinary(key, 
stateSerdes.keySerializer(), new RecordHeaders(), "dummy"));
@@ -1710,7 +1710,7 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
     private Bytes serializeKeyForIndex(final Windowed<String> key) {
         final StateSerdes<String, Long> stateSerdes = 
StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
         if (getIndexSchema() instanceof KeyFirstWindowKeySchema) {
-            return KeyFirstWindowKeySchema.toStoreKeyBinary(key, 0, 
stateSerdes);
+            return KeyFirstWindowKeySchema.toStoreKeyBinary(key, 0, new 
RecordHeaders(), stateSerdes);
         } else if (getIndexSchema() instanceof KeyFirstSessionKeySchema) {
             return Bytes.wrap(KeyFirstSessionKeySchema.toBinary(key, 
stateSerdes.keySerializer(), new RecordHeaders(), "dummy"));
         } else {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 201db4bebe3..a9d794c81c5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -928,7 +928,7 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         if (schema instanceof SessionKeySchema) {
             return Bytes.wrap(SessionKeySchema.toBinary(key, 
stateSerdes.keySerializer(), new RecordHeaders(), "dummy"));
         } else if (schema instanceof WindowKeySchema) {
-            return WindowKeySchema.toStoreKeyBinary(key, 0, stateSerdes);
+            return WindowKeySchema.toStoreKeyBinary(key, 0, new 
RecordHeaders(), stateSerdes);
         } else {
             throw new IllegalStateException("Unrecognized serde schema");
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index 32c03005e1c..56cf10aea7b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -1163,7 +1163,7 @@ public abstract class AbstractWindowBytesStoreTest {
 
     <K> K extractStoreKey(final byte[] binaryKey,
                           final StateSerdes<K, ?> serdes) {
-        return WindowKeySchema.extractStoreKey(binaryKey, serdes);
+        return WindowKeySchema.extractStoreKey(binaryKey, new RecordHeaders(), 
serdes);
     }
 
     private Map<Integer, Set<String>> entriesByKey(final List<KeyValue<byte[], 
byte[]>> changeLog,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
index cdc23218e73..b9461f75d05 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -143,10 +143,10 @@ public class InMemoryWindowStoreTest extends 
AbstractWindowBytesStoreTest {
         final List<KeyValue<byte[], byte[]>> restorableEntries = new 
LinkedList<>();
 
         restorableEntries
-            .add(new KeyValue<>(toStoreKeyBinary(1, 0L, 0, serdes).get(), 
serdes.rawValue("one")));
-        restorableEntries.add(new KeyValue<>(toStoreKeyBinary(2, WINDOW_SIZE, 
0, serdes).get(),
+            .add(new KeyValue<>(toStoreKeyBinary(1, 0L, 0, new 
RecordHeaders(), serdes).get(), serdes.rawValue("one")));
+        restorableEntries.add(new KeyValue<>(toStoreKeyBinary(2, WINDOW_SIZE, 
0, new RecordHeaders(), serdes).get(),
             serdes.rawValue("two")));
-        restorableEntries.add(new KeyValue<>(toStoreKeyBinary(3, 2 * 
WINDOW_SIZE, 0, serdes).get(),
+        restorableEntries.add(new KeyValue<>(toStoreKeyBinary(3, 2 * 
WINDOW_SIZE, 0, new RecordHeaders(), serdes).get(),
             serdes.rawValue("three")));
 
         context.restore(STORE_NAME, restorableEntries);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
index d8e9fb7725b..0675aaea48d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
@@ -52,13 +54,14 @@ public class 
MergedSortedCacheWrappedWindowStoreIteratorTest {
 
     @FunctionalInterface
     private interface StoreKeySerializer<K> {
-        Bytes serialize(final K key, final long ts, final int seq, final 
StateSerdes<K, ?> serdes);
+        Bytes serialize(final K key, final long ts, final int seq, final 
Headers headers, final StateSerdes<K, ?> serdes);
     }
 
     private final List<KeyValue<Long, byte[]>> windowStoreKvPairs = new 
ArrayList<>();
     private final ThreadCache cache = new ThreadCache(new 
LogContext("testCache "), 1000000L,  new MockStreamsMetrics(new Metrics()));
     private final String namespace = "0.0-one";
     private final StateSerdes<String, String> stateSerdes = new 
StateSerdes<>("foo", Serdes.String(), Serdes.String());
+    private final Headers headers = new RecordHeaders();
     private Function<byte[], Long> tsExtractor;
     private StoreKeySerializer<String> storeKeySerializer;
 
@@ -97,14 +100,14 @@ public class 
MergedSortedCacheWrappedWindowStoreIteratorTest {
             final KeyValue<Long, byte[]> v1 = KeyValue.pair(t, v1Bytes);
             windowStoreKvPairs.add(v1);
             expectedKvPairs.add(KeyValue.pair(t, v1Bytes));
-            final Bytes keyBytes = storeKeySerializer.serialize("a", t + 10, 
0, stateSerdes);
+            final Bytes keyBytes = storeKeySerializer.serialize("a", t + 10, 
0, headers, stateSerdes);
             final byte[] valBytes = String.valueOf(t + 10).getBytes();
             expectedKvPairs.add(KeyValue.pair(t + 10, valBytes));
             cache.put(namespace, 
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(keyBytes), new LRUCacheEntry(valBytes));
         }
 
-        final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0, 
stateSerdes);
-        final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0, 
stateSerdes);
+        final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0, 
headers, stateSerdes);
+        final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0, 
headers, stateSerdes);
         final KeyValueIterator<Long, byte[]> storeIterator = new 
DelegatingPeekingKeyValueIterator<>("store", new 
KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
 
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.range(
@@ -135,14 +138,14 @@ public class 
MergedSortedCacheWrappedWindowStoreIteratorTest {
             final KeyValue<Long, byte[]> v1 = KeyValue.pair(t, v1Bytes);
             windowStoreKvPairs.add(v1);
             expectedKvPairs.add(KeyValue.pair(t, v1Bytes));
-            final Bytes keyBytes = storeKeySerializer.serialize("a", t + 10, 
0, stateSerdes);
+            final Bytes keyBytes = storeKeySerializer.serialize("a", t + 10, 
0, headers, stateSerdes);
             final byte[] valBytes = String.valueOf(t + 10).getBytes();
             expectedKvPairs.add(KeyValue.pair(t + 10, valBytes));
             cache.put(namespace, 
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(keyBytes), new LRUCacheEntry(valBytes));
         }
 
-        final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0, 
stateSerdes);
-        final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0, 
stateSerdes);
+        final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0, 
headers, stateSerdes);
+        final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0, 
headers, stateSerdes);
         Collections.reverse(windowStoreKvPairs);
         final KeyValueIterator<Long, byte[]> storeIterator =
             new DelegatingPeekingKeyValueIterator<>("store", new 
KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
@@ -170,9 +173,9 @@ public class 
MergedSortedCacheWrappedWindowStoreIteratorTest {
     public void shouldPeekNextStoreKey(final SchemaType schemaType) {
         setUp(schemaType);
         windowStoreKvPairs.add(KeyValue.pair(10L, "a".getBytes()));
-        cache.put(namespace, 
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(storeKeySerializer.serialize("a", 0, 0, 
stateSerdes)), new LRUCacheEntry("b".getBytes()));
-        final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0, 
stateSerdes);
-        final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0, 
stateSerdes);
+        cache.put(namespace, 
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(storeKeySerializer.serialize("a", 0, 0, 
headers, stateSerdes)), new LRUCacheEntry("b".getBytes()));
+        final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0, 
headers, stateSerdes);
+        final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0, 
headers, stateSerdes);
         final KeyValueIterator<Long, byte[]> storeIterator = new 
DelegatingPeekingKeyValueIterator<>("store", new 
KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.range(
             namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(fromBytes), 
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes)
@@ -191,9 +194,9 @@ public class 
MergedSortedCacheWrappedWindowStoreIteratorTest {
     public void shouldPeekNextStoreKeyReverse(final SchemaType schemaType) {
         setUp(schemaType);
         windowStoreKvPairs.add(KeyValue.pair(10L, "a".getBytes()));
-        cache.put(namespace, 
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(storeKeySerializer.serialize("a", 0, 0, 
stateSerdes)), new LRUCacheEntry("b".getBytes()));
-        final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0, 
stateSerdes);
-        final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0, 
stateSerdes);
+        cache.put(namespace, 
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(storeKeySerializer.serialize("a", 0, 0, 
headers, stateSerdes)), new LRUCacheEntry("b".getBytes()));
+        final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0, 
headers, stateSerdes);
+        final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0, 
headers, stateSerdes);
         final KeyValueIterator<Long, byte[]> storeIterator =
             new DelegatingPeekingKeyValueIterator<>("store", new 
KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.reverseRange(
@@ -214,9 +217,9 @@ public class 
MergedSortedCacheWrappedWindowStoreIteratorTest {
     public void shouldPeekNextCacheKey(final SchemaType schemaType) {
         setUp(schemaType);
         windowStoreKvPairs.add(KeyValue.pair(0L, "a".getBytes()));
-        cache.put(namespace, 
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(storeKeySerializer.serialize("a", 10L, 
0, stateSerdes)), new LRUCacheEntry("b".getBytes()));
-        final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0, 
stateSerdes);
-        final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0, 
stateSerdes);
+        cache.put(namespace, 
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(storeKeySerializer.serialize("a", 10L, 
0, headers, stateSerdes)), new LRUCacheEntry("b".getBytes()));
+        final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0, 
headers, stateSerdes);
+        final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0, 
headers, stateSerdes);
         final KeyValueIterator<Long, byte[]> storeIterator =
             new DelegatingPeekingKeyValueIterator<>("store", new 
KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.range(
@@ -241,9 +244,9 @@ public class 
MergedSortedCacheWrappedWindowStoreIteratorTest {
     public void shouldPeekNextCacheKeyReverse(final SchemaType schemaType) {
         setUp(schemaType);
         windowStoreKvPairs.add(KeyValue.pair(0L, "a".getBytes()));
-        cache.put(namespace, 
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(storeKeySerializer.serialize("a", 10L, 
0, stateSerdes)), new LRUCacheEntry("b".getBytes()));
-        final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0, 
stateSerdes);
-        final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0, 
stateSerdes);
+        cache.put(namespace, 
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(storeKeySerializer.serialize("a", 10L, 
0, headers, stateSerdes)), new LRUCacheEntry("b".getBytes()));
+        final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0, 
headers, stateSerdes);
+        final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0, 
headers, stateSerdes);
         final KeyValueIterator<Long, byte[]> storeIterator =
             new DelegatingPeekingKeyValueIterator<>("store", new 
KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.reverseRange(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
index f758f3054e7..ab9fd57fd87 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
@@ -45,7 +47,7 @@ public class 
MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest {
 
     @FunctionalInterface
     private interface StoreKeySerializer<K> {
-        Bytes serialize(final Windowed<K> key, final int seq, final 
StateSerdes<K, ?> serdes);
+        Bytes serialize(final Windowed<K> key, final int seq, final Headers 
headers, final StateSerdes<K, ?> serdes);
     }
 
     private static final SegmentedCacheFunction SINGLE_SEGMENT_CACHE_FUNCTION 
= new SegmentedCacheFunction(null, -1) {
@@ -100,7 +102,7 @@ public class 
MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest {
         cacheKvs = Collections.singleton(
             KeyValue.pair(
                 
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(storeKeySerializer.serialize(
-                    new Windowed<>(cacheKey, cacheWindow), 0, new 
StateSerdes<>("dummy", Serdes.String(), Serdes.ByteArray()))
+                    new Windowed<>(cacheKey, cacheWindow), 0, new 
RecordHeaders(), new StateSerdes<>("dummy", Serdes.String(), 
Serdes.ByteArray()))
                 ),
                 new LRUCacheEntry(cacheKey.getBytes())
             )
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
index 597fe5a8288..54b6b0f710d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -89,13 +90,18 @@ public class WindowKeySchemaTest {
         R apply(A a, B b, C c);
     }
 
+    @FunctionalInterface
+    interface QuadFunction<A, B, C, D, R> {
+        R apply(A a, B b, C c, D d);
+    }
+
     private static final Map<SchemaType, TriFunction<byte[], Long, Integer, 
Bytes>> BYTES_TO_STORE_BINARY_MAP = mkMap(
         mkEntry(SchemaType.WindowKeySchema, WindowKeySchema::toStoreKeyBinary),
         mkEntry(SchemaType.PrefixedKeyFirstSchema, 
KeyFirstWindowKeySchema::toStoreKeyBinary),
         mkEntry(SchemaType.PrefixedTimeFirstSchema, 
TimeFirstWindowKeySchema::toStoreKeyBinary)
     );
 
-    private static final Map<SchemaType, TriFunction<Windowed<String>, 
Integer, StateSerdes<String, byte[]>, Bytes>> SERDE_TO_STORE_BINARY_MAP = mkMap(
+    private static final Map<SchemaType, QuadFunction<Windowed<String>, 
Integer, Headers, StateSerdes<String, byte[]>, Bytes>> 
SERDE_TO_STORE_BINARY_MAP = mkMap(
         mkEntry(SchemaType.WindowKeySchema, WindowKeySchema::toStoreKeyBinary),
         mkEntry(SchemaType.PrefixedKeyFirstSchema, 
KeyFirstWindowKeySchema::toStoreKeyBinary),
         mkEntry(SchemaType.PrefixedTimeFirstSchema, 
TimeFirstWindowKeySchema::toStoreKeyBinary)
@@ -129,6 +135,7 @@ public class WindowKeySchemaTest {
     private KeySchema keySchema;
     private final Serde<Windowed<String>> keySerde = new 
WindowedSerdes.TimeWindowedSerde<>(serde, endTime - startTime);
     private final StateSerdes<String, byte[]> stateSerdes = new 
StateSerdes<>("dummy", serde, Serdes.ByteArray());
+    private final Headers headers = new RecordHeaders();
     public SchemaType schemaType;
 
     private enum SchemaType {
@@ -170,7 +177,7 @@ public class WindowKeySchemaTest {
         return EXTRACT_SEQ_MAP.get(schemaType);
     }
 
-    private TriFunction<Windowed<String>, Integer, StateSerdes<String, 
byte[]>, Bytes> getSerdeToStoreKey() {
+    private QuadFunction<Windowed<String>, Integer, Headers, 
StateSerdes<String, byte[]>, Bytes> getSerdeToStoreKey() {
         return SERDE_TO_STORE_BINARY_MAP.get(schemaType);
     }
 
@@ -476,18 +483,18 @@ public class WindowKeySchemaTest {
     @ParameterizedTest
     public void shouldConvertToBinaryAndBack(final SchemaType type) {
         setup(type);
-        final TriFunction<Windowed<String>, Integer, StateSerdes<String, 
byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
-        final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0, 
stateSerdes);
+        final QuadFunction<Windowed<String>, Integer, Headers, 
StateSerdes<String, byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
+        final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0, 
headers, stateSerdes);
         final Windowed<String> result;
         if (schemaType == SchemaType.WindowKeySchema) {
             result = WindowKeySchema.fromStoreKey(serialized.get(),
-                endTime - startTime, stateSerdes.keyDeserializer(), new 
RecordHeaders(), stateSerdes.topic());
+                endTime - startTime, stateSerdes.keyDeserializer(), headers, 
stateSerdes.topic());
         } else if (schemaType == SchemaType.PrefixedTimeFirstSchema) {
             result = TimeFirstWindowKeySchema.fromStoreKey(serialized.get(),
-                endTime - startTime, stateSerdes.keyDeserializer(), new 
RecordHeaders(), stateSerdes.topic());
+                endTime - startTime, stateSerdes.keyDeserializer(), headers, 
stateSerdes.topic());
         } else {
             result = KeyFirstWindowKeySchema.fromStoreKey(serialized.get(),
-                endTime - startTime, stateSerdes.keyDeserializer(), new 
RecordHeaders(), stateSerdes.topic());
+                endTime - startTime, stateSerdes.keyDeserializer(), headers, 
stateSerdes.topic());
         }
         assertEquals(windowedKey, result);
     }
@@ -496,8 +503,8 @@ public class WindowKeySchemaTest {
     @ParameterizedTest
     public void shouldExtractSequenceFromBinary(final SchemaType type) {
         setup(type);
-        final TriFunction<Windowed<String>, Integer, StateSerdes<String, 
byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
-        final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0, 
stateSerdes);
+        final QuadFunction<Windowed<String>, Integer, Headers, 
StateSerdes<String, byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
+        final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0, 
headers, stateSerdes);
         final Function<byte[], Integer> extractStoreSequence = 
getExtractSeqFunc();
         assertEquals(0, (int) extractStoreSequence.apply(serialized.get()));
     }
@@ -506,8 +513,8 @@ public class WindowKeySchemaTest {
     @ParameterizedTest
     public void shouldExtractStartTimeFromBinary(final SchemaType type) {
         setup(type);
-        final TriFunction<Windowed<String>, Integer, StateSerdes<String, 
byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
-        final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0, 
stateSerdes);
+        final QuadFunction<Windowed<String>, Integer, Headers, 
StateSerdes<String, byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
+        final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0, 
headers, stateSerdes);
         final Function<byte[], Long> extractStoreTimestamp = 
getExtractTimestampFunc();
         assertEquals(startTime, (long) 
extractStoreTimestamp.apply(serialized.get()));
     }
@@ -516,8 +523,8 @@ public class WindowKeySchemaTest {
     @ParameterizedTest
     public void shouldExtractWindowFromBinary(final SchemaType type) {
         setup(type);
-        final TriFunction<Windowed<String>, Integer, StateSerdes<String, 
byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
-        final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0, 
stateSerdes);
+        final QuadFunction<Windowed<String>, Integer, Headers, 
StateSerdes<String, byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
+        final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0, 
headers, stateSerdes);
         final BiFunction<byte[], Long, Window> extractStoreWindow = 
getExtractStoreWindow();
         assertEquals(window, extractStoreWindow.apply(serialized.get(), 
endTime - startTime));
     }
@@ -526,8 +533,8 @@ public class WindowKeySchemaTest {
     @ParameterizedTest
     public void shouldExtractKeyBytesFromBinary(final SchemaType type) {
         setup(type);
-        final TriFunction<Windowed<String>, Integer, StateSerdes<String, 
byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
-        final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0, 
stateSerdes);
+        final QuadFunction<Windowed<String>, Integer, Headers, 
StateSerdes<String, byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
+        final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0, 
headers, stateSerdes);
         final Function<byte[], byte[]> extractStoreKeyBytes = 
getExtractStorageKey();
         assertArrayEquals(key.getBytes(), 
extractStoreKeyBytes.apply(serialized.get()));
     }

Reply via email to