This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1bf7d6032b9 KAFKA-20173: Propagate headers into serde 8/N  (#21967)
1bf7d6032b9 is described below

commit 1bf7d6032b9b4171f510dc9a1c7e0243410e27f5
Author: Uladzislau Blok <[email protected]>
AuthorDate: Sun Apr 5 04:56:49 2026 +0200

    KAFKA-20173: Propagate headers into serde 8/N  (#21967)
    
    Ensures that `ChangedSerde`,  `SubscriptionWrapperSerde` and
    `ValueAndTimestampSerde` forward headers to their underlying serdes.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../kstream/internals/ChangedDeserializer.java     |   2 +-
 .../kstream/internals/ChangedSerializer.java       |   2 +-
 .../foreignkeyjoin/SubscriptionWrapperSerde.java   |  35 ++++---
 .../internals/ValueAndTimestampDeserializer.java   |  17 +++-
 .../internals/ValueAndTimestampSerializer.java     |  16 +++-
 .../kstream/internals/ChangedSerdeTest.java        |  54 +++++++++--
 .../SubscriptionWrapperSerdeTest.java              | 106 +++++++++++++++++----
 .../ValueAndTimestampDeserializerTest.java         |  54 +++++++++++
 .../internals/ValueAndTimestampSerializerTest.java |  47 +++++----
 9 files changed, 269 insertions(+), 64 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
index 6966ec12373..9fe62752940 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -141,7 +141,7 @@ public class ChangedDeserializer<T> implements 
Deserializer<Change<T>>, Wrapping
 
     @Override
     public Change<T> deserialize(final String topic, final byte[] data) {
-        return deserialize(topic, null, data);
+        throw new UnsupportedOperationException("ChangedDeserializer requires 
the headers-aware version of deserialize");
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index d7d88f71b9f..ee968fea62f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -134,7 +134,7 @@ public class ChangedSerializer<T> implements 
Serializer<Change<T>>, WrappingNull
 
     @Override
     public byte[] serialize(final String topic, final Change<T> data) {
-        return serialize(topic, null, data);
+        throw new UnsupportedOperationException("ChangedSerializer requires 
the headers-aware version of serialize");
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
index 064d464d8be..2c473e58e36 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
@@ -94,7 +95,7 @@ public class SubscriptionWrapperSerde<KLeft> extends 
WrappingNullableSerde<Subsc
         }
 
         @Override
-        public byte[] serialize(final String ignored, final 
SubscriptionWrapper<KLeft> data) {
+        public byte[] serialize(final String ignored, final Headers headers, 
final SubscriptionWrapper<KLeft> data) {
             
//{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}{4-bytes-primaryPartition}
 
             if (data.version() < 0) {
@@ -103,27 +104,33 @@ public class SubscriptionWrapperSerde<KLeft> extends 
WrappingNullableSerde<Subsc
 
             final int version = data.version();
             if (upgradeFromV0 || version == 0) {
-                return serializeV0(data);
+                return serializeV0(data, headers);
             } else if (version == 1) {
-                return serializeV1(data);
+                return serializeV1(data, headers);
             } else {
                 throw new UnsupportedVersionException("Unsupported 
SubscriptionWrapper version " + data.version());
             }
         }
 
-        private byte[] serializePrimaryKey(final SubscriptionWrapper<KLeft> 
data) {
+        @Override
+        public byte[] serialize(final String ignored, final 
SubscriptionWrapper<KLeft> data) {
+            throw new 
UnsupportedOperationException("SubscriptionWrapperSerializer requires the 
headers-aware version of serialize");
+        }
+
+        private byte[] serializePrimaryKey(final SubscriptionWrapper<KLeft> 
data, final Headers headers) {
             if (primaryKeySerializationPseudoTopic == null) {
                 primaryKeySerializationPseudoTopic = 
primaryKeySerializationPseudoTopicSupplier.get();
             }
 
             return  primaryKeySerializer.serialize(
                 primaryKeySerializationPseudoTopic,
+                headers,
                 data.primaryKey()
             );
         }
 
-        private ByteBuffer serializeCommon(final SubscriptionWrapper<KLeft> 
data, final byte version, final int extraLength) {
-            final byte[] primaryKeySerializedData = serializePrimaryKey(data);
+        private ByteBuffer serializeCommon(final SubscriptionWrapper<KLeft> 
data, final Headers headers, final byte version, final int extraLength) {
+            final byte[] primaryKeySerializedData = serializePrimaryKey(data, 
headers);
             final ByteBuffer buf;
             int dataLength = 2 + primaryKeySerializedData.length + extraLength;
             if (data.hash() != null) {
@@ -145,12 +152,12 @@ public class SubscriptionWrapperSerde<KLeft> extends 
WrappingNullableSerde<Subsc
             return buf;
         }
 
-        private byte[] serializeV0(final SubscriptionWrapper<KLeft> data) {
-            return serializeCommon(data, (byte) 0, 0).array();
+        private byte[] serializeV0(final SubscriptionWrapper<KLeft> data, 
final Headers headers) {
+            return serializeCommon(data, headers, (byte) 0, 0).array();
         }
 
-        private byte[] serializeV1(final SubscriptionWrapper<KLeft> data) {
-            final ByteBuffer buf = serializeCommon(data, data.version(), 
Integer.BYTES);
+        private byte[] serializeV1(final SubscriptionWrapper<KLeft> data, 
final Headers headers) {
+            final ByteBuffer buf = serializeCommon(data, headers, 
data.version(), Integer.BYTES);
             buf.putInt(data.primaryPartition());
             return buf.array();
         }
@@ -178,7 +185,7 @@ public class SubscriptionWrapperSerde<KLeft> extends 
WrappingNullableSerde<Subsc
         }
 
         @Override
-        public SubscriptionWrapper<KLeft> deserialize(final String ignored, 
final byte[] data) {
+        public SubscriptionWrapper<KLeft> deserialize(final String ignored, 
final Headers headers, final byte[] data) {
             
//{7-bits-version}{1-bit-isHashNull}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}{4-bytes-primaryPartition}
             final ByteBuffer buf = ByteBuffer.wrap(data);
             final byte versionAndIsHashNull = buf.get();
@@ -212,6 +219,7 @@ public class SubscriptionWrapperSerde<KLeft> extends 
WrappingNullableSerde<Subsc
 
             final KLeft primaryKey = primaryKeyDeserializer.deserialize(
                 primaryKeySerializationPseudoTopic,
+                headers,
                 primaryKeyRaw
             );
             final Integer primaryPartition;
@@ -224,6 +232,11 @@ public class SubscriptionWrapperSerde<KLeft> extends 
WrappingNullableSerde<Subsc
             return new SubscriptionWrapper<>(hash, inst, primaryKey, version, 
primaryPartition);
         }
 
+        @Override
+        public SubscriptionWrapper<KLeft> deserialize(final String ignored, 
final byte[] data) {
+            throw new 
UnsupportedOperationException("SubscriptionWrapperDeserializer requires the 
headers-aware version of deserialize");
+        }
+
     }
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
index 5f6cd98b763..d68a1f727bf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
@@ -50,12 +52,19 @@ class ValueAndTimestampDeserializer<V> implements 
WrappingNullableDeserializer<V
     @Override
     public ValueAndTimestamp<V> deserialize(final String topic,
                                             final byte[] valueAndTimestamp) {
+        return deserialize(topic, new RecordHeaders(), valueAndTimestamp);
+    }
+
+    @Override
+    public ValueAndTimestamp<V> deserialize(final String topic,
+                                            final Headers headers,
+                                            final byte[] valueAndTimestamp) {
         if (valueAndTimestamp == null) {
             return null;
         }
 
-        final long timestamp = timestampDeserializer.deserialize(topic, 
rawTimestamp(valueAndTimestamp));
-        final V value = valueDeserializer.deserialize(topic, 
rawValue(valueAndTimestamp));
+        final long timestamp = timestampDeserializer.deserialize(topic, 
headers, rawTimestamp(valueAndTimestamp));
+        final V value = valueDeserializer.deserialize(topic, headers, 
rawValue(valueAndTimestamp));
         return ValueAndTimestamp.make(value, timestamp);
     }
 
@@ -85,7 +94,9 @@ class ValueAndTimestampDeserializer<V> implements 
WrappingNullableDeserializer<V
     }
 
     static long timestamp(final byte[] rawValueAndTimestamp) {
-        return LONG_DESERIALIZER.deserialize(null, 
rawTimestamp(rawValueAndTimestamp));
+        // We know we use LongDeserializer here, so we don't need headers
+        // still use headers-aware version to be consistent in codebase
+        return LONG_DESERIALIZER.deserialize(null, null, 
rawTimestamp(rawValueAndTimestamp));
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
index e64e5e18b7f..6e0ad748e10 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
@@ -68,19 +70,27 @@ public class ValueAndTimestampSerializer<V> implements 
WrappingNullableSerialize
     @Override
     public byte[] serialize(final String topic,
                             final ValueAndTimestamp<V> data) {
+        return serialize(topic, new RecordHeaders(), data);
+    }
+
+    @Override
+    public byte[] serialize(final String topic,
+                            final Headers headers,
+                            final ValueAndTimestamp<V> data) {
         if (data == null) {
             return null;
         }
-        return serialize(topic, data.value(), data.timestamp());
+        return serialize(topic, headers, data.value(), data.timestamp());
     }
 
     public byte[] serialize(final String topic,
+                            final Headers headers,
                             final V data,
                             final long timestamp) {
         if (data == null) {
             return null;
         }
-        final byte[] rawValue = valueSerializer.serialize(topic, data);
+        final byte[] rawValue = valueSerializer.serialize(topic, headers, 
data);
 
         // Since we can't control the result of the internal serializer, we 
make sure that the result
         // is not null as well.
@@ -91,7 +101,7 @@ public class ValueAndTimestampSerializer<V> implements 
WrappingNullableSerialize
             return null;
         }
 
-        final byte[] rawTimestamp = timestampSerializer.serialize(topic, 
timestamp);
+        final byte[] rawTimestamp = timestampSerializer.serialize(topic, 
headers, timestamp);
         return ByteBuffer
             .allocate(rawTimestamp.length + rawValue.length)
             .put(rawTimestamp)
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
index 50dea706aeb..2024a5b557c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
@@ -16,9 +16,13 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.internals.ByteUtils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -33,9 +37,14 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class ChangedSerdeTest {
     private static final String TOPIC = "some-topic";
+    private static final Headers HEADERS = new RecordHeaders().add("key", 
"value".getBytes());
 
     private static final Serializer<String> STRING_SERIALIZER = 
Serdes.String().serializer();
     private static final ChangedSerializer<String> CHANGED_STRING_SERIALIZER =
@@ -51,9 +60,9 @@ public class ChangedSerdeTest {
     final String nonNullOldValue = "world";
 
     private static <T> void checkRoundTrip(final T data, final Serializer<T> 
serializer, final Deserializer<T> deserializer) {
-        final byte[] serialized = serializer.serialize(TOPIC, data);
+        final byte[] serialized = serializer.serialize(TOPIC, HEADERS, data);
         assertThat(serialized, is(notNullValue()));
-        final T deserialized = deserializer.deserialize(TOPIC, serialized);
+        final T deserialized = deserializer.deserialize(TOPIC, HEADERS, 
serialized);
         assertThat(deserialized, is(data));
     }
 
@@ -63,7 +72,7 @@ public class ChangedSerdeTest {
 
         assertThrows(
                 StreamsException.class,
-                () -> CHANGED_STRING_SERIALIZER.serialize(TOPIC, data));
+                () -> CHANGED_STRING_SERIALIZER.serialize(TOPIC, HEADERS, 
data));
     }
 
     @Test
@@ -75,7 +84,7 @@ public class ChangedSerdeTest {
 
         assertThrows(
                 StreamsException.class,
-                () -> serializer.serialize(TOPIC, data));
+                () -> serializer.serialize(TOPIC, HEADERS, data));
     }
 
     @Test
@@ -99,7 +108,7 @@ public class ChangedSerdeTest {
     @Test
     public void shouldThrowErrorIfEncountersAnUnknownByteValueForOldNewFlag() {
         final Change<String> data = new Change<>(null, nonNullOldValue);
-        final byte[] serialized = CHANGED_STRING_SERIALIZER.serialize(TOPIC, 
data);
+        final byte[] serialized = CHANGED_STRING_SERIALIZER.serialize(TOPIC, 
HEADERS, data);
         assertThat(serialized, is(notNullValue()));
 
         // mutate the serialized array to replace OLD_NEW_FLAG with an 
unsupported byte value
@@ -109,7 +118,7 @@ public class ChangedSerdeTest {
 
         assertThrows(
             StreamsException.class,
-            () -> CHANGED_STRING_DESERIALIZER.deserialize(TOPIC, serialized));
+            () -> CHANGED_STRING_DESERIALIZER.deserialize(TOPIC, HEADERS, 
serialized));
     }
 
     @Test
@@ -173,7 +182,38 @@ public class ChangedSerdeTest {
     private static void checkRoundTripForReservedVersion(final Change<String> 
data) {
         final byte[] serialized = serializeVersions3Through5(TOPIC, data);
         assertThat(serialized, is(notNullValue()));
-        final Change<String> deserialized = 
CHANGED_STRING_DESERIALIZER.deserialize(TOPIC, serialized);
+        final Change<String> deserialized = 
CHANGED_STRING_DESERIALIZER.deserialize(TOPIC, HEADERS, serialized);
         assertThat(deserialized, is(data));
     }
+
+    @Test
+    public void shouldPassHeadersToUnderlyingSerializer() {
+        final Serializer<String> mockSerializer = mock(StringSerializer.class);
+        final ChangedSerializer<String> changedSerializer = new 
ChangedSerializer<>(mockSerializer);
+
+        final String value = "value";
+        final Change<String> data = new Change<>(value, null);
+        when(mockSerializer.serialize(TOPIC, HEADERS, 
value)).thenReturn(value.getBytes());
+
+        changedSerializer.serialize(TOPIC, HEADERS, data);
+
+        verify(mockSerializer).serialize(TOPIC, HEADERS, value);
+        verify(mockSerializer, never()).serialize(TOPIC, value);
+    }
+
+    @Test
+    public void shouldPassHeadersToUnderlyingDeserializer() {
+        final Deserializer<String> mockDeserializer = 
mock(StringDeserializer.class);
+        final ChangedDeserializer<String> changedDeserializer = new 
ChangedDeserializer<>(mockDeserializer);
+
+        final String value = "value";
+        final Change<String> data = new Change<>(value, null);
+
+        final byte[] serialized = CHANGED_STRING_SERIALIZER.serialize(TOPIC, 
HEADERS, data);
+
+        changedDeserializer.deserialize(TOPIC, HEADERS, serialized);
+
+        verify(mockDeserializer).deserialize(TOPIC, HEADERS, value.getBytes());
+        verify(mockDeserializer, never()).deserialize(TOPIC, value.getBytes());
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
index db2842401b5..3d9d53feebb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
@@ -17,7 +17,14 @@
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.state.internals.Murmur3;
 
@@ -29,16 +36,23 @@ import static 
org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class SubscriptionWrapperSerdeTest {
 
+    private static final String TOPIC = "pkTopic";
+    private static final Headers HEADERS = new RecordHeaders().add("key", 
"value".getBytes());
+
     @Test
     @SuppressWarnings("unchecked")
     public void shouldSerdeV0Test() {
         final byte version = SubscriptionWrapper.VERSION_0;
         final String originalKey = "originalKey";
-        final SubscriptionWrapperSerde swSerde = new 
SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
+        final SubscriptionWrapperSerde swSerde = new 
SubscriptionWrapperSerde<>(() -> TOPIC, Serdes.String());
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, 
(byte) 0xAA, (byte) 0x00, (byte) 0x19});
         final Integer primaryPartition = null;
         final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
@@ -47,9 +61,9 @@ public class SubscriptionWrapperSerdeTest {
             originalKey,
             version,
             primaryPartition);
-        final byte[] serialized = swSerde.serializer().serialize(null, 
wrapper);
+        final byte[] serialized = swSerde.serializer().serialize(null, 
HEADERS, wrapper);
         final SubscriptionWrapper deserialized = (SubscriptionWrapper) 
swSerde.deserializer()
-            .deserialize(null, serialized);
+            .deserialize(null, HEADERS, serialized);
 
         assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, 
deserialized.instruction());
         assertArrayEquals(hashedValue, deserialized.hash());
@@ -63,7 +77,7 @@ public class SubscriptionWrapperSerdeTest {
     public void shouldSerdeV1Test() {
         final byte version = SubscriptionWrapper.VERSION_1;
         final String originalKey = "originalKey";
-        final SubscriptionWrapperSerde swSerde = new 
SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
+        final SubscriptionWrapperSerde swSerde = new 
SubscriptionWrapperSerde<>(() -> TOPIC, Serdes.String());
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, 
(byte) 0xAA, (byte) 0x00, (byte) 0x19});
         final Integer primaryPartition = 10;
         final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
@@ -72,9 +86,9 @@ public class SubscriptionWrapperSerdeTest {
             originalKey,
             version,
             primaryPartition);
-        final byte[] serialized = swSerde.serializer().serialize(null, 
wrapper);
+        final byte[] serialized = swSerde.serializer().serialize(null, 
HEADERS, wrapper);
         final SubscriptionWrapper deserialized = (SubscriptionWrapper) 
swSerde.deserializer()
-            .deserialize(null, serialized);
+            .deserialize(null, HEADERS, serialized);
 
         assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, 
deserialized.instruction());
         assertArrayEquals(hashedValue, deserialized.hash());
@@ -88,7 +102,7 @@ public class SubscriptionWrapperSerdeTest {
     public void shouldSerdeWithV0IfUpgradeTest() {
         final byte version = SubscriptionWrapper.VERSION_1;
         final String originalKey = "originalKey";
-        final SubscriptionWrapperSerde swSerde = new 
SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
+        final SubscriptionWrapperSerde swSerde = new 
SubscriptionWrapperSerde<>(() -> TOPIC, Serdes.String());
         swSerde.configure(
             Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, 
StreamsConfig.UPGRADE_FROM_32),
             true);
@@ -100,9 +114,9 @@ public class SubscriptionWrapperSerdeTest {
             originalKey,
             version,
             primaryPartition);
-        final byte[] serialized = swSerde.serializer().serialize(null, 
wrapper);
+        final byte[] serialized = swSerde.serializer().serialize(null, 
HEADERS, wrapper);
         final SubscriptionWrapper deserialized = (SubscriptionWrapper) 
swSerde.deserializer()
-            .deserialize(null, serialized);
+            .deserialize(null, HEADERS, serialized);
 
         assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, 
deserialized.instruction());
         assertArrayEquals(hashedValue, deserialized.hash());
@@ -116,7 +130,7 @@ public class SubscriptionWrapperSerdeTest {
     public void shouldSerdeNullHashV0Test() {
         final byte version = SubscriptionWrapper.VERSION_0;
         final String originalKey = "originalKey";
-        final SubscriptionWrapperSerde swSerde = new 
SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
+        final SubscriptionWrapperSerde swSerde = new 
SubscriptionWrapperSerde<>(() -> TOPIC, Serdes.String());
         final long[] hashedValue = null;
         final Integer primaryPartition = null;
         final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
@@ -125,8 +139,8 @@ public class SubscriptionWrapperSerdeTest {
             originalKey,
             version,
             primaryPartition);
-        final byte[] serialized = swSerde.serializer().serialize(null, 
wrapper);
-        final SubscriptionWrapper deserialized = (SubscriptionWrapper) 
swSerde.deserializer().deserialize(null, serialized);
+        final byte[] serialized = swSerde.serializer().serialize(null, 
HEADERS, wrapper);
+        final SubscriptionWrapper deserialized = (SubscriptionWrapper) 
swSerde.deserializer().deserialize(null, HEADERS, serialized);
 
         
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
 deserialized.instruction());
         assertArrayEquals(hashedValue, deserialized.hash());
@@ -140,7 +154,7 @@ public class SubscriptionWrapperSerdeTest {
     public void shouldSerdeNullHashV1Test() {
         final byte version = SubscriptionWrapper.VERSION_1;
         final String originalKey = "originalKey";
-        final SubscriptionWrapperSerde swSerde = new 
SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
+        final SubscriptionWrapperSerde swSerde = new 
SubscriptionWrapperSerde<>(() -> TOPIC, Serdes.String());
         final long[] hashedValue = null;
         final Integer primaryPartition = 10;
         final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
@@ -149,9 +163,9 @@ public class SubscriptionWrapperSerdeTest {
             originalKey,
             version,
             primaryPartition);
-        final byte[] serialized = swSerde.serializer().serialize(null, 
wrapper);
+        final byte[] serialized = swSerde.serializer().serialize(null, 
HEADERS, wrapper);
         final SubscriptionWrapper deserialized = (SubscriptionWrapper) 
swSerde.deserializer()
-            .deserialize(null, serialized);
+            .deserialize(null, HEADERS, serialized);
 
         
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
 deserialized.instruction());
         assertArrayEquals(hashedValue, deserialized.hash());
@@ -163,7 +177,7 @@ public class SubscriptionWrapperSerdeTest {
     @Test
     public void shouldSerdeNullPrimaryPartitionOnV0Test() {
         final String originalKey = "originalKey";
-        final SubscriptionWrapperSerde swSerde = new 
SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
+        final SubscriptionWrapperSerde swSerde = new 
SubscriptionWrapperSerde<>(() -> TOPIC, Serdes.String());
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, 
(byte) 0xAA, (byte) 0x00, (byte) 0x19});
         final Integer primaryPartition = null;
         final byte version = SubscriptionWrapper.VERSION_0;
@@ -173,8 +187,8 @@ public class SubscriptionWrapperSerdeTest {
             originalKey,
             version,
             primaryPartition);
-        final byte[] serialized = swSerde.serializer().serialize(null, 
wrapper);
-        final SubscriptionWrapper deserialized = (SubscriptionWrapper) 
swSerde.deserializer().deserialize(null, serialized);
+        final byte[] serialized = swSerde.serializer().serialize(null, 
HEADERS, wrapper);
+        final SubscriptionWrapper deserialized = (SubscriptionWrapper) 
swSerde.deserializer().deserialize(null, HEADERS, serialized);
 
         
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
 deserialized.instruction());
         assertArrayEquals(hashedValue, deserialized.hash());
@@ -235,7 +249,7 @@ public class SubscriptionWrapperSerdeTest {
 
     @Test
     public void shouldThrowExceptionOnNullPrimaryPartitionV1Test() {
-        final SubscriptionWrapperSerde swSerde = new 
SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
+        final SubscriptionWrapperSerde swSerde = new 
SubscriptionWrapperSerde<>(() -> TOPIC, Serdes.String());
         final String originalKey = "originalKey";
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, 
(byte) 0xAA, (byte) 0x00, (byte) 0x19});
         final Integer primaryPartition = null;
@@ -245,7 +259,7 @@ public class SubscriptionWrapperSerdeTest {
             originalKey,
             SubscriptionWrapper.VERSION_1,
             primaryPartition);
-        assertThrows(NullPointerException.class, () -> 
swSerde.serializer().serialize(null, wrapper));
+        assertThrows(NullPointerException.class, () -> 
swSerde.serializer().serialize(null, HEADERS, wrapper));
     }
 
     @Test
@@ -260,4 +274,56 @@ public class SubscriptionWrapperSerdeTest {
             (byte) 0x80,
             primaryPartition));
     }
+
+    @Test
+    public void shouldPassHeadersToUnderlyingSerializer() {
+        final Serializer<String> mockSerializer = mock(StringSerializer.class);
+        final Serde<String> mockSerde = mock(Serdes.StringSerde.class);
+        when(mockSerde.serializer()).thenReturn(mockSerializer);
+
+        final String primaryKey = "originalKey";
+        final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, 
(byte) 0xAA, (byte) 0x00, (byte) 0x19});
+        final Integer primaryPartition = null;
+        when(mockSerializer.serialize(TOPIC, HEADERS, 
primaryKey)).thenReturn(primaryKey.getBytes());
+
+        final SubscriptionWrapper<String> wrapper = new SubscriptionWrapper<>(
+            hashedValue,
+            SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+            primaryKey,
+            SubscriptionWrapper.VERSION_0,
+            primaryPartition);
+
+        final SubscriptionWrapperSerde<String> swSerde = new 
SubscriptionWrapperSerde<>(() -> TOPIC, mockSerde);
+        swSerde.serializer().serialize(TOPIC, HEADERS, wrapper);
+
+        verify(mockSerializer).serialize(TOPIC, HEADERS, primaryKey);
+        verify(mockSerializer, never()).serialize(TOPIC, primaryKey);
+    }
+
+    @Test
+    public void shouldPassHeadersToUnderlyingDeserializer() {
+        final Deserializer<String> mockDeserializer = 
mock(StringDeserializer.class);
+        final Serde<String> mockSerde = mock(Serdes.StringSerde.class);
+        when(mockSerde.deserializer()).thenReturn(mockDeserializer);
+        when(mockSerde.serializer()).thenReturn(Serdes.String().serializer());
+
+        final String primaryKey = "originalKey";
+        final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, 
(byte) 0xAA, (byte) 0x00, (byte) 0x19});
+        final Integer primaryPartition = null;
+        final SubscriptionWrapper<String> wrapper = new SubscriptionWrapper<>(
+            hashedValue,
+            SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+            primaryKey,
+            SubscriptionWrapper.VERSION_0,
+            primaryPartition);
+        when(mockDeserializer.deserialize(TOPIC, HEADERS, 
primaryKey.getBytes())).thenReturn(primaryKey);
+
+        final SubscriptionWrapperSerde<String> swSerde = new 
SubscriptionWrapperSerde<>(() -> TOPIC, mockSerde);
+        final byte[] serialized = swSerde.serializer().serialize(TOPIC, 
HEADERS, wrapper);
+
+        swSerde.deserializer().deserialize(TOPIC, HEADERS, serialized);
+
+        verify(mockDeserializer).deserialize(TOPIC, HEADERS, 
primaryKey.getBytes());
+        verify(mockDeserializer, never()).deserialize(TOPIC, 
primaryKey.getBytes());
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializerTest.java
new file mode 100644
index 00000000000..e71156510ca
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializerTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+public class ValueAndTimestampDeserializerTest {
+    private static final String TOPIC = "some-topic";
+    private static final long TIMESTAMP = 23;
+    private static final Headers HEADERS = new RecordHeaders().add("key", 
"value".getBytes());
+
+    private static final ValueAndTimestampSerde<String> STRING_SERDE = new 
ValueAndTimestampSerde<>(Serdes.String());
+
+    @Test
+    public void shouldPassHeadersToUnderlyingDeserializer() {
+        final Deserializer<String> mockDeserializer = 
mock(StringDeserializer.class);
+        final ValueAndTimestampDeserializer<String> deserializer = new 
ValueAndTimestampDeserializer<>(mockDeserializer);
+
+        final String value = "value";
+        final ValueAndTimestamp<String> valueAndTimestamp = 
ValueAndTimestamp.make(value, TIMESTAMP);
+        final byte[] serialized = STRING_SERDE.serializer().serialize(TOPIC, 
HEADERS, valueAndTimestamp);
+
+        deserializer.deserialize(TOPIC, HEADERS, serialized);
+
+        verify(mockDeserializer).deserialize(TOPIC, HEADERS, 
rawValue(serialized));
+        verify(mockDeserializer, never()).deserialize(TOPIC, 
rawValue(serialized));
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java
index ba4dce54697..e65fbc4daa6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java
@@ -16,8 +16,11 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import org.junit.jupiter.api.Test;
@@ -28,10 +31,15 @@ import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class ValueAndTimestampSerializerTest {
     private static final String TOPIC = "some-topic";
     private static final long TIMESTAMP = 23;
+    private static final Headers HEADERS = new RecordHeaders().add("key", 
"value".getBytes());
 
     private static final ValueAndTimestampSerde<String> STRING_SERDE =
             new ValueAndTimestampSerde<>(Serdes.String());
@@ -41,15 +49,9 @@ public class ValueAndTimestampSerializerTest {
         final String value = "some-string";
 
         final ValueAndTimestamp<String> valueAndTimestamp = 
ValueAndTimestamp.make(value, TIMESTAMP);
-
-        final byte[] serialized =
-                STRING_SERDE.serializer().serialize(TOPIC, valueAndTimestamp);
-
+        final byte[] serialized = STRING_SERDE.serializer().serialize(TOPIC, 
HEADERS, valueAndTimestamp);
         assertThat(serialized, is(notNullValue()));
-
-        final ValueAndTimestamp<String> deserialized =
-                STRING_SERDE.deserializer().deserialize(TOPIC, serialized);
-
+        final ValueAndTimestamp<String> deserialized = 
STRING_SERDE.deserializer().deserialize(TOPIC, HEADERS, serialized);
         assertThat(deserialized, is(valueAndTimestamp));
     }
 
@@ -58,9 +60,9 @@ public class ValueAndTimestampSerializerTest {
         final String value = "food";
 
         final ValueAndTimestamp<String> oldValueAndTimestamp = 
ValueAndTimestamp.make(value, TIMESTAMP);
-        final byte[] oldSerializedValue = 
STRING_SERDE.serializer().serialize(TOPIC, oldValueAndTimestamp);
+        final byte[] oldSerializedValue = 
STRING_SERDE.serializer().serialize(TOPIC, HEADERS, oldValueAndTimestamp);
         final ValueAndTimestamp<String> newValueAndTimestamp = 
ValueAndTimestamp.make(value, TIMESTAMP + 1);
-        final byte[] newSerializedValue = 
STRING_SERDE.serializer().serialize(TOPIC, newValueAndTimestamp);
+        final byte[] newSerializedValue = 
STRING_SERDE.serializer().serialize(TOPIC, HEADERS, newValueAndTimestamp);
         
assertTrue(ValueAndTimestampSerializer.valuesAreSameAndTimeIsIncreasing(oldSerializedValue,
 newSerializedValue));
     }
 
@@ -69,16 +71,15 @@ public class ValueAndTimestampSerializerTest {
         final String value = "balls";
 
         final ValueAndTimestamp<String> oldValueAndTimestamp = 
ValueAndTimestamp.make(value, TIMESTAMP);
-        final byte[] oldSerializedValue = 
STRING_SERDE.serializer().serialize(TOPIC, oldValueAndTimestamp);
+        final byte[] oldSerializedValue = 
STRING_SERDE.serializer().serialize(TOPIC, HEADERS, oldValueAndTimestamp);
         final ValueAndTimestamp<String> outOfOrderValueAndTimestamp = 
ValueAndTimestamp.make(value, TIMESTAMP - 1);
-        final byte[] outOfOrderSerializedValue = 
STRING_SERDE.serializer().serialize(TOPIC, outOfOrderValueAndTimestamp);
+        final byte[] outOfOrderSerializedValue = 
STRING_SERDE.serializer().serialize(TOPIC, HEADERS, 
outOfOrderValueAndTimestamp);
         
assertFalse(ValueAndTimestampSerializer.valuesAreSameAndTimeIsIncreasing(oldSerializedValue,
 outOfOrderSerializedValue));
     }
 
     @Test
     public void shouldSerializeNullDataAsNull() {
-        final byte[] serialized =
-                STRING_SERDE.serializer().serialize(TOPIC, 
ValueAndTimestamp.make(null, TIMESTAMP));
+        final byte[] serialized = STRING_SERDE.serializer().serialize(TOPIC, 
HEADERS, ValueAndTimestamp.make(null, TIMESTAMP));
 
         assertThat(serialized, is(nullValue()));
     }
@@ -88,12 +89,22 @@ public class ValueAndTimestampSerializerTest {
         // Testing against regressions with respect to 
https://github.com/apache/kafka/pull/7679
 
         final Serializer<String> alwaysNullSerializer = (topic, data) -> null;
+        final ValueAndTimestampSerializer<String> serializer = new 
ValueAndTimestampSerializer<>(alwaysNullSerializer);
+        final byte[] serialized = serializer.serialize(TOPIC, HEADERS, 
"non-null-data", TIMESTAMP);
+        assertThat(serialized, is(nullValue()));
+    }
+
+    @Test
+    public void shouldPassHeadersToUnderlyingSerializer() {
+        final Serializer<String> mockSerializer = mock(StringSerializer.class);
+        final ValueAndTimestampSerializer<String> serializer = new 
ValueAndTimestampSerializer<>(mockSerializer);
 
-        final ValueAndTimestampSerializer<String> serializer =
-                new ValueAndTimestampSerializer<>(alwaysNullSerializer);
+        final String value = "value";
+        when(mockSerializer.serialize(TOPIC, HEADERS, 
value)).thenReturn(value.getBytes());
 
-        final byte[] serialized = serializer.serialize(TOPIC, "non-null-data", 
TIMESTAMP);
+        serializer.serialize(TOPIC, HEADERS, ValueAndTimestamp.make(value, 
TIMESTAMP));
 
-        assertThat(serialized, is(nullValue()));
+        verify(mockSerializer).serialize(TOPIC, HEADERS, value);
+        verify(mockSerializer, never()).serialize(TOPIC, value);
     }
 }


Reply via email to