This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 197e5e748aa KAFKA-20173: Propagate headers into serde 5/N (#21756)
197e5e748aa is described below

commit 197e5e748aac41a6a9ced2920d2ffed7ef67d405
Author: Uladzislau Blok <[email protected]>
AuthorDate: Sun Mar 15 23:45:01 2026 +0100

    KAFKA-20173: Propagate headers into serde 5/N (#21756)
    
    This PR ensures that FullChangeSerde propagates headers to the wrapped
    de/serializers.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../streams/kstream/internals/FullChangeSerde.java | 13 ++--
 .../InMemoryTimeOrderedKeyValueChangeBuffer.java   |  7 +-
 .../kstream/internals/FullChangeSerdeTest.java     | 75 +++++++++++++++++++---
 3 files changed, 76 insertions(+), 19 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
index 3a3439448cb..194b93f6029 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
@@ -44,27 +45,27 @@ public final class FullChangeSerde<T> {
         return inner;
     }
 
-    public Change<byte[]> serializeParts(final String topic, final Change<T> 
data) {
+    public Change<byte[]> serializeParts(final String topic, final Headers 
headers, final Change<T> data) {
         if (data == null) {
             return null;
         }
         final Serializer<T> innerSerializer = innerSerde().serializer();
-        final byte[] oldBytes = data.oldValue == null ? null : 
innerSerializer.serialize(topic, data.oldValue);
-        final byte[] newBytes = data.newValue == null ? null : 
innerSerializer.serialize(topic, data.newValue);
+        final byte[] oldBytes = data.oldValue == null ? null : 
innerSerializer.serialize(topic, headers, data.oldValue);
+        final byte[] newBytes = data.newValue == null ? null : 
innerSerializer.serialize(topic, headers, data.newValue);
         return new Change<>(newBytes, oldBytes);
     }
 
 
-    public Change<T> deserializeParts(final String topic, final Change<byte[]> 
serialChange) {
+    public Change<T> deserializeParts(final String topic, final Headers 
headers, final Change<byte[]> serialChange) {
         if (serialChange == null) {
             return null;
         }
         final Deserializer<T> innerDeserializer = innerSerde().deserializer();
 
         final T oldValue =
-            serialChange.oldValue == null ? null : 
innerDeserializer.deserialize(topic, serialChange.oldValue);
+            serialChange.oldValue == null ? null : 
innerDeserializer.deserialize(topic, headers, serialChange.oldValue);
         final T newValue =
-            serialChange.newValue == null ? null : 
innerDeserializer.deserialize(topic, serialChange.newValue);
+            serialChange.newValue == null ? null : 
innerDeserializer.deserialize(topic, headers, serialChange.newValue);
 
         return new Change<>(newValue, oldValue);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
index eff87a86fee..14bf82121ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
@@ -399,10 +399,11 @@ public final class InMemoryTimeOrderedKeyValueChangeBuffer<K, V, T> implements T
                             next.getKey().time() + "]"
                     );
                 }
-                final K key = 
keySerde.deserializer().deserialize(changelogTopic, next.getKey().key().get());
+                final K key = 
keySerde.deserializer().deserialize(changelogTopic, context.headers(), 
next.getKey().key().get());
                 final BufferValue bufferValue = next.getValue();
                 final Change<V> value = valueSerde.deserializeParts(
                     changelogTopic,
+                    context.headers(),
                     new Change<>(bufferValue.newValue(), 
bufferValue.oldValue())
                 );
                 callback.accept(new Eviction<K, Change<V>>(key, value, 
bufferValue.context()));
@@ -470,8 +471,8 @@ public final class InMemoryTimeOrderedKeyValueChangeBuffer<K, V, T> implements T
         requireNonNull(record.value(), "value cannot be null");
         requireNonNull(recordContext, "recordContext cannot be null");
 
-        final Bytes serializedKey = 
Bytes.wrap(keySerde.serializer().serialize(changelogTopic, record.key()));
-        final Change<byte[]> serialChange = 
valueSerde.serializeParts(changelogTopic, record.value());
+        final Bytes serializedKey = 
Bytes.wrap(keySerde.serializer().serialize(changelogTopic, 
recordContext.headers(), record.key()));
+        final Change<byte[]> serialChange = 
valueSerde.serializeParts(changelogTopic, recordContext.headers(), 
record.value());
 
         final BufferValue buffered = getBuffered(serializedKey);
         final byte[] serializedPriorValue;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
index 3cade798cc4..50074f22dff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
@@ -16,7 +16,14 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 
 import org.junit.jupiter.api.Test;
 
@@ -25,6 +32,10 @@ import java.nio.ByteBuffer;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class FullChangeSerdeTest {
     private final FullChangeSerde<String> serde = 
FullChangeSerde.wrap(Serdes.String());
@@ -58,22 +69,22 @@ public class FullChangeSerdeTest {
 
     @Test
     public void shouldRoundTripNull() {
-        assertThat(serde.serializeParts(null, null), nullValue());
+        assertThat(serde.serializeParts(null, null, null), nullValue());
         assertThat(mergeChangeArraysIntoSingleLegacyFormattedArray(null), 
nullValue());
         
assertThat(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(null), 
nullValue());
-        assertThat(serde.deserializeParts(null, null), nullValue());
+        assertThat(serde.deserializeParts(null, null, null), nullValue());
     }
 
 
     @Test
     public void shouldRoundTripNullChange() {
         assertThat(
-            serde.serializeParts(null, new Change<>(null, null)),
+            serde.serializeParts(null, null, new Change<>(null, null)),
             is(new Change<byte[]>(null, null))
         );
 
         assertThat(
-            serde.deserializeParts(null, new Change<>(null, null)),
+            serde.deserializeParts(null, null, new Change<>(null, null)),
             is(new Change<String>(null, null))
         );
 
@@ -86,34 +97,78 @@ public class FullChangeSerdeTest {
 
     @Test
     public void shouldRoundTripOldNull() {
-        final Change<byte[]> serialized = serde.serializeParts(null, new 
Change<>("new", null));
+        final Change<byte[]> serialized = serde.serializeParts(null, new 
RecordHeaders(), new Change<>("new", null));
         final byte[] legacyFormat = 
mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
         final Change<byte[]> decomposedLegacyFormat = 
FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
         assertThat(
-            serde.deserializeParts(null, decomposedLegacyFormat),
+            serde.deserializeParts(null, new RecordHeaders(), 
decomposedLegacyFormat),
             is(new Change<>("new", null))
         );
     }
 
     @Test
     public void shouldRoundTripNewNull() {
-        final Change<byte[]> serialized = serde.serializeParts(null, new 
Change<>(null, "old"));
+        final Change<byte[]> serialized = serde.serializeParts(null, new 
RecordHeaders(), new Change<>(null, "old"));
         final byte[] legacyFormat = 
mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
         final Change<byte[]> decomposedLegacyFormat = 
FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
         assertThat(
-            serde.deserializeParts(null, decomposedLegacyFormat),
+            serde.deserializeParts(null, new RecordHeaders(), 
decomposedLegacyFormat),
             is(new Change<>(null, "old"))
         );
     }
 
     @Test
     public void shouldRoundTripChange() {
-        final Change<byte[]> serialized = serde.serializeParts(null, new 
Change<>("new", "old"));
+        final Change<byte[]> serialized = serde.serializeParts(null, new 
RecordHeaders(), new Change<>("new", "old"));
         final byte[] legacyFormat = 
mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
         final Change<byte[]> decomposedLegacyFormat = 
FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
         assertThat(
-            serde.deserializeParts(null, decomposedLegacyFormat),
+            serde.deserializeParts(null, new RecordHeaders(), 
decomposedLegacyFormat),
             is(new Change<>("new", "old"))
         );
     }
+
+    @Test
+    public void shouldPassHeadersToUnderlyingSerializer() {
+        final Serializer<String> mockSerializer = mock(StringSerializer.class);
+        final Serde<String> mockSerde = mock(Serdes.StringSerde.class);
+        when(mockSerde.serializer()).thenReturn(mockSerializer);
+
+        final String topic = "dummy";
+        final String newValue = "new";
+        final String oldValue = "old";
+        final Headers headers = new RecordHeaders().add("key", 
"value".getBytes());
+        final Change<String> data = new Change<>(newValue, oldValue);
+
+        final FullChangeSerde<String> testSerde = 
FullChangeSerde.wrap(mockSerde);
+
+        testSerde.serializeParts(topic, headers, data);
+
+        verify(mockSerializer).serialize(topic, headers, newValue);
+        verify(mockSerializer).serialize(topic, headers, oldValue);
+        verify(mockSerializer, never()).serialize(topic, newValue);
+        verify(mockSerializer, never()).serialize(topic, oldValue);
+    }
+
+    @Test
+    public void shouldPassHeadersToUnderlyingDeserializer() {
+        final Deserializer<String> mockDeserializer = 
mock(StringDeserializer.class);
+        final Serde<String> mockSerde = mock(Serdes.StringSerde.class);
+        when(mockSerde.deserializer()).thenReturn(mockDeserializer);
+
+        final String topic = "dummy";
+        final byte[] newValueBytes = "new".getBytes();
+        final byte[] oldValueBytes = "old".getBytes();
+        final Headers headers = new RecordHeaders().add("key", 
"value".getBytes());
+        final Change<byte[]> serialChange = new Change<>(newValueBytes, 
oldValueBytes);
+
+        final FullChangeSerde<String> testSerde = 
FullChangeSerde.wrap(mockSerde);
+
+        testSerde.deserializeParts(topic, headers, serialChange);
+
+        verify(mockDeserializer).deserialize(topic, headers, newValueBytes);
+        verify(mockDeserializer).deserialize(topic, headers, oldValueBytes);
+        verify(mockDeserializer, never()).deserialize(topic, newValueBytes);
+        verify(mockDeserializer, never()).deserialize(topic, oldValueBytes);
+    }
 }

Reply via email to