This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 367efc4a2ce KAFKA-20173: Propagate headers into serde 7/N (#21851)
367efc4a2ce is described below

commit 367efc4a2ce89114f9113af18a811799983ae2d0
Author: Uladzislau Blok <[email protected]>
AuthorDate: Fri Apr 3 06:29:41 2026 +0200

    KAFKA-20173: Propagate headers into serde 7/N (#21851)
    
    This PR ensures headers are propagated in
    `SubscriptionResponseWrapperSerde`,
    `LeftOrRightValueSerializer/Deserializer`, and
    `TimestampedKeyAndJoinSideSerializer/Deserializer`. It also adds unit
    tests that verify header propagation.
    
    Reviewers: Murali Basani <[email protected]>, Alieh Saeedi
     <[email protected]>, Matthias J. Sax <[email protected]>
---
 .../SubscriptionResponseWrapperSerde.java          | 15 ++++-
 .../internals/LeftOrRightValueDeserializer.java    | 10 ++-
 .../internals/LeftOrRightValueSerializer.java      | 10 ++-
 .../TimestampedKeyAndJoinSideDeserializer.java     | 10 ++-
 .../TimestampedKeyAndJoinSideSerializer.java       | 10 ++-
 .../SubscriptionResponseWrapperSerdeTest.java      | 71 +++++++++++++++++++---
 .../LeftOrRightValueDeserializerTest.java          | 53 ++++++++++++++++
 .../internals/LeftOrRightValueSerializerTest.java  | 44 ++++++++++----
 .../TimestampedKeyAndJoinSideDeserializerTest.java | 65 ++++++++++++++++++++
 .../TimestampedKeyAndJoinSideSerializerTest.java   | 55 +++++++++++++----
 10 files changed, 300 insertions(+), 43 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
index 0523ed2fe79..4e5c1f7fc2d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
@@ -64,13 +65,18 @@ public class SubscriptionResponseWrapperSerde<VRight> 
implements Serde<Subscript
 
         @Override
         public byte[] serialize(final String topic, final 
SubscriptionResponseWrapper<V> data) {
+            throw new 
UnsupportedOperationException("SubscriptionResponseWrapperSerializer requires 
the headers-aware version of serialize");
+        }
+
+        @Override
+        public byte[] serialize(final String topic, final Headers headers, 
final SubscriptionResponseWrapper<V> data) {
             
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized 
data}
 
             if (data.version() < 0) {
                 throw new 
UnsupportedVersionException("SubscriptionResponseWrapper version cannot be 
negative");
             }
 
-            final byte[] serializedData = data.foreignValue() == null ? null : 
serializer.serialize(topic, data.foreignValue());
+            final byte[] serializedData = data.foreignValue() == null ? null : 
serializer.serialize(topic, headers, data.foreignValue());
             final int serializedDataLength = serializedData == null ? 0 : 
serializedData.length;
             final long[] originalHash = data.originalValueHash();
             final int hashLength = originalHash == null ? 0 : 2 * Long.BYTES;
@@ -111,6 +117,11 @@ public class SubscriptionResponseWrapperSerde<VRight> 
implements Serde<Subscript
 
         @Override
         public SubscriptionResponseWrapper<V> deserialize(final String topic, 
final byte[] data) {
+            throw new 
UnsupportedOperationException("SubscriptionResponseWrapperSerializer requires 
the headers-aware version of deserialize");
+        }
+
+        @Override
+        public SubscriptionResponseWrapper<V> deserialize(final String topic, 
final Headers headers, final byte[] data) {
             
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized 
data}
 
             final ByteBuffer buf = ByteBuffer.wrap(data);
@@ -134,7 +145,7 @@ public class SubscriptionResponseWrapperSerde<VRight> 
implements Serde<Subscript
                 final byte[] serializedValue;
                 serializedValue = new byte[data.length - lengthSum];
                 buf.get(serializedValue, 0, serializedValue.length);
-                value = deserializer.deserialize(topic, serializedValue);
+                value = deserializer.deserialize(topic, headers, 
serializedValue);
             } else {
                 value = null;
             }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializer.java
index df45bc683dd..d0df771fe58 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
@@ -57,13 +58,18 @@ public class LeftOrRightValueDeserializer<V1, V2> 
implements WrappingNullableDes
 
     @Override
     public LeftOrRightValue<V1, V2> deserialize(final String topic, final 
byte[] data) {
+        throw new UnsupportedOperationException("LeftOrRightValueDeserializer 
requires the headers-aware version of deserialize");
+    }
+
+    @Override
+    public LeftOrRightValue<V1, V2> deserialize(final String topic, final 
Headers headers, final byte[] data) {
         if (data == null || data.length == 0) {
             return null;
         }
 
         return (data[0] == 1)
-            ? 
LeftOrRightValue.makeLeftValue(leftDeserializer.deserialize(topic, 
rawValue(data)))
-            : 
LeftOrRightValue.makeRightValue(rightDeserializer.deserialize(topic, 
rawValue(data)));
+            ? 
LeftOrRightValue.makeLeftValue(leftDeserializer.deserialize(topic, headers, 
rawValue(data)))
+            : 
LeftOrRightValue.makeRightValue(rightDeserializer.deserialize(topic, headers, 
rawValue(data)));
     }
 
     private byte[] rawValue(final byte[] data) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java
index 1c64c29fd5a..9af03764e32 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
@@ -61,13 +62,18 @@ public class LeftOrRightValueSerializer<V1, V2> implements 
WrappingNullableSeria
 
     @Override
     public byte[] serialize(final String topic, final LeftOrRightValue<V1, V2> 
data) {
+        throw new UnsupportedOperationException("LeftOrRightValueSerializer 
requires the headers-aware version of serialize");
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final Headers headers, final 
LeftOrRightValue<V1, V2> data) {
         if (data == null) {
             return null;
         }
 
         final byte[] rawValue = (data.leftValue() != null)
-            ? leftSerializer.serialize(topic, data.leftValue())
-            : rightSerializer.serialize(topic, data.rightValue());
+            ? leftSerializer.serialize(topic, headers, data.leftValue())
+            : rightSerializer.serialize(topic, headers, data.rightValue());
 
         if (rawValue == null) {
             return null;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideDeserializer.java
index 4285dd7bf16..a962f3f3bcb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideDeserializer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
@@ -56,9 +57,14 @@ public class TimestampedKeyAndJoinSideDeserializer<K> 
implements WrappingNullabl
 
     @Override
     public TimestampedKeyAndJoinSide<K> deserialize(final String topic, final 
byte[] data) {
+        throw new 
UnsupportedOperationException("TimestampedKeyAndJoinSideDeserializer requires 
the headers-aware version of deserialize");
+    }
+
+    @Override
+    public TimestampedKeyAndJoinSide<K> deserialize(final String topic, final 
Headers headers, final byte[] data) {
         final boolean isLeft = data[StateSerdes.TIMESTAMP_SIZE] == 1;
-        final K key = keyDeserializer.deserialize(topic, rawKey(data));
-        final long timestamp = timestampDeserializer.deserialize(topic, 
rawTimestamp(data));
+        final K key = keyDeserializer.deserialize(topic, headers, 
rawKey(data));
+        final long timestamp = timestampDeserializer.deserialize(topic, 
headers, rawTimestamp(data));
 
         return isLeft ? TimestampedKeyAndJoinSide.makeLeft(key, timestamp) :
                 TimestampedKeyAndJoinSide.makeRight(key, timestamp);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java
index d94cc486357..5d9b892ac90 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
@@ -56,9 +57,14 @@ public class TimestampedKeyAndJoinSideSerializer<K> 
implements WrappingNullableS
 
     @Override
     public byte[] serialize(final String topic, final 
TimestampedKeyAndJoinSide<K> data) {
+        throw new 
UnsupportedOperationException("TimestampedKeyAndJoinSideSerializer requires the 
headers-aware version of serialize");
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final Headers headers, final 
TimestampedKeyAndJoinSide<K> data) {
         final byte boolByte = (byte) (data.isLeftSide() ? 1 : 0);
-        final byte[] keyBytes = keySerializer.serialize(topic, data.key());
-        final byte[] timestampBytes = timestampSerializer.serialize(topic, 
data.timestamp());
+        final byte[] keyBytes = keySerializer.serialize(topic, headers, 
data.key());
+        final byte[] timestampBytes = timestampSerializer.serialize(topic, 
headers, data.timestamp());
 
         return ByteBuffer
             .allocate(timestampBytes.length + 1 + keyBytes.length)
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
index 276600fd106..9aa5f28fcd7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
@@ -17,10 +17,14 @@
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.state.internals.Murmur3;
 
 import org.junit.jupiter.api.Test;
@@ -32,6 +36,10 @@ import static 
org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class SubscriptionResponseWrapperSerdeTest {
     private static final class NonNullableSerde<T> implements Serde<T>, 
Serializer<T>, Deserializer<T> {
@@ -68,14 +76,16 @@ public class SubscriptionResponseWrapperSerdeTest {
         }
     }
 
+    private static final Headers HEADERS = new RecordHeaders();
+
     @Test
-    public void ShouldSerdeWithNonNullsTest() {
+    public void shouldSerdeWithNonNullsTest() {
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, 
(byte) 0x9A, (byte) 0xFF, (byte) 0x00});
         final String foreignValue = "foreignValue";
         final SubscriptionResponseWrapper<String> srw = new 
SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
         try (final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
-            final byte[] serResponse = srwSerde.serializer().serialize(null, 
srw);
-            final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
+            final byte[] serResponse = srwSerde.serializer().serialize(null, 
HEADERS, srw);
+            final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, HEADERS, serResponse);
 
             assertArrayEquals(hashedValue, result.originalValueHash());
             assertEquals(foreignValue, result.foreignValue());
@@ -88,8 +98,8 @@ public class SubscriptionResponseWrapperSerdeTest {
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, 
(byte) 0x9A, (byte) 0xFF, (byte) 0x00});
         final SubscriptionResponseWrapper<String> srw = new 
SubscriptionResponseWrapper<>(hashedValue, null, 1);
         try (final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
-            final byte[] serResponse = srwSerde.serializer().serialize(null, 
srw);
-            final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
+            final byte[] serResponse = srwSerde.serializer().serialize(null, 
HEADERS, srw);
+            final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, HEADERS, serResponse);
 
             assertArrayEquals(hashedValue, result.originalValueHash());
             assertNull(result.foreignValue());
@@ -103,8 +113,8 @@ public class SubscriptionResponseWrapperSerdeTest {
         final String foreignValue = "foreignValue";
         final SubscriptionResponseWrapper<String> srw = new 
SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
         try (final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
-            final byte[] serResponse = srwSerde.serializer().serialize(null, 
srw);
-            final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
+            final byte[] serResponse = srwSerde.serializer().serialize(null, 
HEADERS, srw);
+            final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, HEADERS, serResponse);
 
             assertArrayEquals(hashedValue, result.originalValueHash());
             assertEquals(foreignValue, result.foreignValue());
@@ -118,8 +128,8 @@ public class SubscriptionResponseWrapperSerdeTest {
         final String foreignValue = null;
         final SubscriptionResponseWrapper<String> srw = new 
SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
         try (final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
-            final byte[] serResponse = srwSerde.serializer().serialize(null, 
srw);
-            final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
+            final byte[] serResponse = srwSerde.serializer().serialize(null, 
HEADERS, srw);
+            final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, HEADERS, serResponse);
 
             assertArrayEquals(hashedValue, result.originalValueHash());
             assertEquals(foreignValue, result.foreignValue());
@@ -142,11 +152,52 @@ public class SubscriptionResponseWrapperSerdeTest {
         try (final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde<>(null)) {
             assertThrows(
                 UnsupportedVersionException.class,
-                () -> srwSerde.serializer().serialize(null, srw)
+                () -> srwSerde.serializer().serialize(null, HEADERS, srw)
             );
         }
     }
 
+    @Test
+    public void shouldPassHeadersToUnderlyingSerializer() {
+        final Serializer<String> mockSerializer = mock(StringSerializer.class);
+        final Serde<String> mockSerde = mock(Serdes.StringSerde.class);
+        when(mockSerde.serializer()).thenReturn(mockSerializer);
+
+        final String topic = "dummy";
+        final String foreignValue = "foreignValue";
+        final Headers headers = new RecordHeaders().add("key", 
"value".getBytes());
+        final SubscriptionResponseWrapper<String> data = new 
SubscriptionResponseWrapper<>(null, foreignValue, 1);
+
+        final SubscriptionResponseWrapperSerde<String> testSerde = new 
SubscriptionResponseWrapperSerde<>(mockSerde);
+
+        testSerde.serializer().serialize(topic, headers, data);
+
+        verify(mockSerializer).serialize(topic, headers, foreignValue);
+        verify(mockSerializer, never()).serialize(topic, foreignValue);
+    }
+
+    @Test
+    public void shouldPassHeadersToUnderlyingDeserializer() {
+        final Deserializer<String> mockDeserializer = 
mock(StringDeserializer.class);
+        final Serde<String> mockSerde = mock(Serdes.StringSerde.class);
+        when(mockSerde.deserializer()).thenReturn(mockDeserializer);
+        when(mockSerde.serializer()).thenReturn(Serdes.String().serializer());
+
+        final String topic = "dummy";
+        final String foreignValue = "foreignValue";
+        final Headers headers = new RecordHeaders().add("key", 
"value".getBytes());
+        final SubscriptionResponseWrapper<String> data = new 
SubscriptionResponseWrapper<>(null, foreignValue, 1);
+
+        final SubscriptionResponseWrapperSerde<String> testSerde = new 
SubscriptionResponseWrapperSerde<>(mockSerde);
+
+        final byte[] serializedData = testSerde.serializer().serialize(topic, 
headers, data);
+
+        testSerde.deserializer().deserialize(topic, headers, serializedData);
+
+        verify(mockDeserializer).deserialize(topic, headers, 
foreignValue.getBytes());
+        verify(mockDeserializer, never()).deserialize(topic, 
foreignValue.getBytes());
+    }
+
     public static class InvalidSubscriptionResponseWrapper extends 
SubscriptionResponseWrapper<String> {
 
         public InvalidSubscriptionResponseWrapper(final long[] 
originalValueHash,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializerTest.java
new file mode 100644
index 00000000000..bc9eca2545f
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializerTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class LeftOrRightValueDeserializerTest {
+
+    @Test
+    public void shouldPassHeadersToUnderlyingDeserializer() {
+        final Deserializer<String> mockDeserializer = 
mock(StringDeserializer.class);
+
+        final String topic = "dummy";
+        final String value = "some-string";
+        final Headers headers = new RecordHeaders().add("key", 
"value".getBytes());
+        final LeftOrRightValue<String, Object> data = 
LeftOrRightValue.makeLeftValue(value);
+        final byte[] serializedBytes = new 
LeftOrRightValueSerializer<>(Serdes.String().serializer(), 
null).serialize(topic, headers, data);
+
+        when(mockDeserializer.deserialize(topic, headers, 
value.getBytes())).thenReturn("dummy-value");
+
+        final LeftOrRightValueDeserializer<String, String> testDeserializer = 
new LeftOrRightValueDeserializer<>(mockDeserializer, null);
+
+        testDeserializer.deserialize(topic, headers, serializedBytes);
+
+        verify(mockDeserializer).deserialize(topic, headers, value.getBytes());
+        verify(mockDeserializer, never()).deserialize(topic, value.getBytes());
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializerTest.java
index 2a5aa5c891c..db771476927 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializerTest.java
@@ -16,7 +16,11 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 
 import org.junit.jupiter.api.Test;
 
@@ -24,12 +28,15 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 
 public class LeftOrRightValueSerializerTest {
     private static final String TOPIC = "some-topic";
+    private static final Headers HEADERS = new RecordHeaders();
 
-    private static final LeftOrRightValueSerde<String, Integer> 
STRING_OR_INTEGER_SERDE =
-        new LeftOrRightValueSerde<>(Serdes.String(), Serdes.Integer());
+    private static final LeftOrRightValueSerde<String, Integer> 
STRING_OR_INTEGER_SERDE = new LeftOrRightValueSerde<>(Serdes.String(), 
Serdes.Integer());
 
     @Test
     public void shouldSerializeStringValue() {
@@ -37,13 +44,11 @@ public class LeftOrRightValueSerializerTest {
 
         final LeftOrRightValue<String, Integer> leftOrRightValue = 
LeftOrRightValue.makeLeftValue(value);
 
-        final byte[] serialized =
-            STRING_OR_INTEGER_SERDE.serializer().serialize(TOPIC, 
leftOrRightValue);
+        final byte[] serialized = 
STRING_OR_INTEGER_SERDE.serializer().serialize(TOPIC, HEADERS, 
leftOrRightValue);
 
         assertThat(serialized, is(notNullValue()));
 
-        final LeftOrRightValue<String, Integer> deserialized =
-            STRING_OR_INTEGER_SERDE.deserializer().deserialize(TOPIC, 
serialized);
+        final LeftOrRightValue<String, Integer> deserialized = 
STRING_OR_INTEGER_SERDE.deserializer().deserialize(TOPIC, HEADERS, serialized);
 
         assertThat(deserialized, is(leftOrRightValue));
     }
@@ -54,13 +59,11 @@ public class LeftOrRightValueSerializerTest {
 
         final LeftOrRightValue<String, Integer> leftOrRightValue = 
LeftOrRightValue.makeRightValue(value);
 
-        final byte[] serialized =
-            STRING_OR_INTEGER_SERDE.serializer().serialize(TOPIC, 
leftOrRightValue);
+        final byte[] serialized = 
STRING_OR_INTEGER_SERDE.serializer().serialize(TOPIC, HEADERS, 
leftOrRightValue);
 
         assertThat(serialized, is(notNullValue()));
 
-        final LeftOrRightValue<String, Integer> deserialized =
-            STRING_OR_INTEGER_SERDE.deserializer().deserialize(TOPIC, 
serialized);
+        final LeftOrRightValue<String, Integer> deserialized = 
STRING_OR_INTEGER_SERDE.deserializer().deserialize(TOPIC, HEADERS, serialized);
 
         assertThat(deserialized, is(leftOrRightValue));
     }
@@ -68,12 +71,29 @@ public class LeftOrRightValueSerializerTest {
     @Test
     public void shouldThrowIfSerializeValueAsNull() {
         assertThrows(NullPointerException.class,
-            () -> STRING_OR_INTEGER_SERDE.serializer().serialize(TOPIC, 
LeftOrRightValue.makeLeftValue(null)));
+            () -> STRING_OR_INTEGER_SERDE.serializer().serialize(TOPIC, 
HEADERS, LeftOrRightValue.makeLeftValue(null)));
     }
 
     @Test
     public void shouldThrowIfSerializeOtherValueAsNull() {
         assertThrows(NullPointerException.class,
-            () -> STRING_OR_INTEGER_SERDE.serializer().serialize(TOPIC, 
LeftOrRightValue.makeRightValue(null)));
+            () -> STRING_OR_INTEGER_SERDE.serializer().serialize(TOPIC, 
HEADERS, LeftOrRightValue.makeRightValue(null)));
+    }
+
+    @Test
+    public void shouldPassHeadersToUnderlyingSerializer() {
+        final Serializer<String> mockSerializer = mock(StringSerializer.class);
+
+        final String topic = "dummy";
+        final String value = "some-string";
+        final Headers headers = new RecordHeaders().add("key", 
"value".getBytes());
+        final LeftOrRightValue<String, String> data = 
LeftOrRightValue.makeLeftValue(value);
+
+        final LeftOrRightValueSerializer<String, String> testSerializer = new 
LeftOrRightValueSerializer<>(mockSerializer, null);
+
+        testSerializer.serialize(topic, headers, data);
+
+        verify(mockSerializer).serialize(topic, headers, value);
+        verify(mockSerializer, never()).serialize(topic, value);
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideDeserializerTest.java
new file mode 100644
index 00000000000..059d482f8ad
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideDeserializerTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TimestampedKeyAndJoinSideDeserializerTest {
+
+    @Test
+    public void shouldPassHeadersToUnderlyingDeserializer() {
+        try (MockedConstruction<LongDeserializer> timestampSerializer = 
mockConstruction(LongDeserializer.class)) {
+            final Deserializer<String> mockDeserializer = 
mock(StringDeserializer.class);
+            final TimestampedKeyAndJoinSideDeserializer<String> 
testDeserializer = new 
TimestampedKeyAndJoinSideDeserializer<>(mockDeserializer);
+            final Deserializer<Long> innerTimestampDeserializer = 
timestampSerializer.constructed().get(0);
+
+            final String topic = "dummy";
+            final String key = "some-key";
+            final long timestamp = 10;
+            final Headers headers = new RecordHeaders().add("key", 
"value".getBytes());
+            final TimestampedKeyAndJoinSide<String> data = 
TimestampedKeyAndJoinSide.makeLeft(key, timestamp);
+            final byte[] serializedValue = new 
TimestampedKeyAndJoinSideSerializer<>(Serdes.String().serializer()).serialize(topic,
 headers, data);
+
+            when(mockDeserializer.deserialize(topic, headers, 
key.getBytes())).thenReturn(key);
+            when(innerTimestampDeserializer.deserialize(eq(topic), 
eq(headers), any(byte[].class))).thenReturn(timestamp);
+
+            testDeserializer.deserialize(topic, headers, serializedValue);
+
+            verify(mockDeserializer).deserialize(topic, headers, 
key.getBytes());
+            verify(mockDeserializer, never()).deserialize(topic, 
key.getBytes());
+
+            verify(innerTimestampDeserializer).deserialize(eq(topic), 
eq(headers), any(byte[].class));
+            verify(innerTimestampDeserializer, never()).deserialize(eq(topic), 
any(byte[].class));
+        }
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializerTest.java
index 81d5736015a..0604ad64af5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializerTest.java
@@ -16,20 +16,31 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TimestampedKeyAndJoinSideSerializerTest {
     private static final String TOPIC = "some-topic";
+    private static final Headers HEADERS = new RecordHeaders();
 
-    private static final TimestampedKeyAndJoinSideSerde<String> STRING_SERDE =
-        new TimestampedKeyAndJoinSideSerde<>(Serdes.String());
+    private static final TimestampedKeyAndJoinSideSerde<String> STRING_SERDE = 
new TimestampedKeyAndJoinSideSerde<>(Serdes.String());
 
     @Test
     public void shouldSerializeKeyWithJoinSideAsTrue() {
@@ -37,13 +48,11 @@ public class TimestampedKeyAndJoinSideSerializerTest {
 
         final TimestampedKeyAndJoinSide<String> timestampedKeyAndJoinSide = 
TimestampedKeyAndJoinSide.makeLeft(value, 10);
 
-        final byte[] serialized =
-            STRING_SERDE.serializer().serialize(TOPIC, 
timestampedKeyAndJoinSide);
+        final byte[] serialized = STRING_SERDE.serializer().serialize(TOPIC, 
HEADERS, timestampedKeyAndJoinSide);
 
         assertThat(serialized, is(notNullValue()));
 
-        final TimestampedKeyAndJoinSide<String> deserialized =
-            STRING_SERDE.deserializer().deserialize(TOPIC, serialized);
+        final TimestampedKeyAndJoinSide<String> deserialized = 
STRING_SERDE.deserializer().deserialize(TOPIC, HEADERS, serialized);
 
         assertThat(deserialized, is(timestampedKeyAndJoinSide));
     }
@@ -54,13 +63,11 @@ public class TimestampedKeyAndJoinSideSerializerTest {
 
         final TimestampedKeyAndJoinSide<String> timestampedKeyAndJoinSide = 
TimestampedKeyAndJoinSide.makeRight(value, 20);
 
-        final byte[] serialized =
-            STRING_SERDE.serializer().serialize(TOPIC, 
timestampedKeyAndJoinSide);
+        final byte[] serialized = STRING_SERDE.serializer().serialize(TOPIC, 
HEADERS, timestampedKeyAndJoinSide);
 
         assertThat(serialized, is(notNullValue()));
 
-        final TimestampedKeyAndJoinSide<String> deserialized =
-            STRING_SERDE.deserializer().deserialize(TOPIC, serialized);
+        final TimestampedKeyAndJoinSide<String> deserialized = 
STRING_SERDE.deserializer().deserialize(TOPIC, HEADERS, serialized);
 
         assertThat(deserialized, is(timestampedKeyAndJoinSide));
     }
@@ -68,6 +75,32 @@ public class TimestampedKeyAndJoinSideSerializerTest {
     @Test
     public void shouldThrowIfSerializeNullData() {
         assertThrows(NullPointerException.class,
-            () -> STRING_SERDE.serializer().serialize(TOPIC, 
TimestampedKeyAndJoinSide.makeLeft(null, 0)));
+            () -> STRING_SERDE.serializer().serialize(TOPIC, HEADERS, 
TimestampedKeyAndJoinSide.makeLeft(null, 0)));
+    }
+
+    @Test
+    public void shouldPassHeadersToUnderlyingSerializer() { // serialize(topic, headers, data) must hand the same headers to both inner serializers
+        try (MockedConstruction<LongSerializer> timestampSerializer = 
mockConstruction(LongSerializer.class)) { // LongSerializer instances constructed inside this try block are recorded as mocks
+            final Serializer<String> mockSerializer = 
mock(StringSerializer.class);
+            final TimestampedKeyAndJoinSideSerializer<String> testSerializer = 
new TimestampedKeyAndJoinSideSerializer<>(mockSerializer); // NOTE(review): presumably creates its timestamp LongSerializer here; the next statement relies on that - confirm
+            final Serializer<Long> innerTimestampSerializer = 
timestampSerializer.constructed().get(0); // first LongSerializer mock captured by the MockedConstruction
+
+            final String topic = "dummy";
+            final String key = "some-key";
+            final long timestamp = 10;
+            final Headers headers = new RecordHeaders().add("key", 
"value".getBytes()); // headers instance carrying a single entry
+            final TimestampedKeyAndJoinSide<String> data = 
TimestampedKeyAndJoinSide.makeLeft(key, timestamp);
+
+            when(mockSerializer.serialize(topic, headers, 
data.key())).thenReturn(key.getBytes()); // stub only the headers-aware overload of the key serializer
+            when(innerTimestampSerializer.serialize(topic, headers, 
data.timestamp())).thenReturn(new byte[]{Byte.MAX_VALUE}); // stub only the headers-aware overload of the timestamp serializer
+
+            testSerializer.serialize(topic, headers, data);
+
+            verify(mockSerializer).serialize(topic, headers, key); // key serializer was called with the headers
+            verify(mockSerializer, never()).serialize(topic, key); // and never through the overload without headers
+
+            verify(innerTimestampSerializer).serialize(topic, headers, 
timestamp); // timestamp serializer was called with the headers
+            verify(innerTimestampSerializer, never()).serialize(topic, 
timestamp); // and never through the overload without headers
+        }
     }
 }


Reply via email to