This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c0d143dd769 KAFKA-20173: Propagate headers into serde 4/N (#21755)
c0d143dd769 is described below

commit c0d143dd769076722c95cf50ac5de2d953d9bf83
Author: Uladzislau Blok <[email protected]>
AuthorDate: Sun Mar 15 23:44:14 2026 +0100

    KAFKA-20173: Propagate headers into serde 4/N (#21755)
    
    This PR fixes headers propagation in SessionWindowedSerializer and
    TimeWindowedSerializer so they work as expected (from a headers
    propagation perspective).
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../kafka/common/serialization/ListSerializerTest.java     |  8 ++++----
 .../kafka/streams/kstream/SessionWindowedSerializer.java   |  7 ++++++-
 .../kafka/streams/kstream/TimeWindowedSerializer.java      |  7 ++++++-
 .../streams/kstream/internals/WindowedSerializer.java      |  5 +++++
 .../kstream/internals/WindowedStreamPartitioner.java       |  3 ++-
 .../streams/kstream/SessionWindowedDeserializerTest.java   |  7 ++++---
 .../streams/kstream/SessionWindowedSerializerTest.java     | 14 ++++++++++----
 .../streams/kstream/TimeWindowedDeserializerTest.java      |  7 ++++---
 .../kafka/streams/kstream/TimeWindowedSerializerTest.java  | 14 ++++++++++----
 .../apache/kafka/streams/kstream/WindowedSerdesTest.java   |  5 +++--
 10 files changed, 54 insertions(+), 23 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java
index e9984a8a7a8..65196a84d1d 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java
@@ -34,7 +34,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -167,15 +166,16 @@ public class ListSerializerTest {
         when(mockSerializer.serialize(anyString(), any(Headers.class), 
anyString())).thenReturn("test-value".getBytes());
 
         final String topic = "topic";
-        final List<String> data = List.of("test-key");
+        final String key = "test-key";
+        final List<String> data = List.of(key);
         final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
 
         final ListSerializer<String> testSerializer = new 
ListSerializer<>(mockSerializer);
 
         testSerializer.serialize(topic, headers, data);
 
-        verify(mockSerializer).serialize(eq(topic), eq(headers), 
eq("test-key"));
-        verify(mockSerializer, never()).serialize(anyString(), anyString());
+        verify(mockSerializer).serialize(topic, headers, key);
+        verify(mockSerializer, never()).serialize(topic, key);
     }
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
index 50edd86f95d..9a87752e288 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
@@ -112,9 +112,14 @@ public class SessionWindowedSerializer<T> implements 
WindowedSerializer<T> {
 
     @Override
     public byte[] serializeBaseKey(final String topic, final Windowed<T> data) 
{
+        return serializeBaseKey(topic, new RecordHeaders(), data);
+    }
+
+    @Override
+    public byte[] serializeBaseKey(final String topic, final Headers headers, 
final Windowed<T> data) {
         WindowedSerdes.verifyInnerSerializerNotNull(inner, this);
 
-        return inner.serialize(topic, data.key());
+        return inner.serialize(topic, headers, data.key());
     }
 
     // Only for testing
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
index b185d830e9e..5eef64da780 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
@@ -113,9 +113,14 @@ public class TimeWindowedSerializer<T> implements 
WindowedSerializer<T> {
 
     @Override
     public byte[] serializeBaseKey(final String topic, final Windowed<T> data) 
{
+        return serializeBaseKey(topic, new RecordHeaders(), data);
+    }
+
+    @Override
+    public byte[] serializeBaseKey(final String topic, final Headers headers, 
final Windowed<T> data) {
         WindowedSerdes.verifyInnerSerializerNotNull(inner, this);
 
-        return inner.serialize(topic, data.key());
+        return inner.serialize(topic, headers, data.key());
     }
 
     // Only for testing
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
index 09185b2ab2f..9cffa7c2f7a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
@@ -16,10 +16,15 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.Windowed;
 
 public interface WindowedSerializer<T> extends Serializer<Windowed<T>> {
 
     byte[] serializeBaseKey(String topic, Windowed<T> data);
+
+    default byte[] serializeBaseKey(final String topic, final  Headers 
headers, final  Windowed<T> data) {
+        return serializeBaseKey(topic, data);
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
index 9a8fb9ddd7c..54735b98bb1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
@@ -46,7 +47,7 @@ public class WindowedStreamPartitioner<K, V> implements 
StreamPartitioner<Window
     @Override
     public Optional<Set<Integer>> partitions(final String topic, final 
Windowed<K> windowedKey, final V value, final int numPartitions) {
         // for windowed key, the key bytes should never be null
-        final byte[] keyBytes = serializer.serializeBaseKey(topic, 
windowedKey);
+        final byte[] keyBytes = serializer.serializeBaseKey(topic, new 
RecordHeaders(), windowedKey);
 
         // stick with the same built-in partitioner util functions that 
producer used
         // to make sure its behavior is consistent with the producer
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
index 35adaed1ee3..3b82fb04194 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
@@ -122,15 +122,16 @@ public class SessionWindowedDeserializerTest {
         final Deserializer<String> mockDeserializer = 
mock(StringDeserializer.class);
         when(mockDeserializer.deserialize(anyString(), any(Headers.class), 
any(byte[].class))).thenReturn("test-value");
 
+        final String topic = "dummy";
         final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
         final Windowed<String> windowed = new Windowed<>("test-key", new 
SessionWindow(0, 1));
-        final byte[] data = new 
SessionWindowedSerializer<>(Serdes.String().serializer()).serialize("dummy", 
headers, windowed);
+        final byte[] data = new 
SessionWindowedSerializer<>(Serdes.String().serializer()).serialize(topic, 
headers, windowed);
 
         final SessionWindowedDeserializer<String> testDeserializer = new 
SessionWindowedDeserializer<>(mockDeserializer);
 
-        testDeserializer.deserialize("dummy", headers, data);
+        testDeserializer.deserialize(topic, headers, data);
 
-        verify(mockDeserializer).deserialize(anyString(), eq(headers), 
any(byte[].class));
+        verify(mockDeserializer).deserialize(eq(topic), eq(headers), 
any(byte[].class));
         verify(mockDeserializer, never()).deserialize(anyString(), 
any(byte[].class));
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
index d2e15f542a1..bd2d4711af3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
@@ -36,9 +36,9 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -122,15 +122,21 @@ public class SessionWindowedSerializerTest {
         final Serializer<String> mockSerializer = mock(StringSerializer.class);
         when(mockSerializer.serialize(anyString(), any(Headers.class), 
anyString())).thenReturn("test-value".getBytes());
 
+        final String topic = "dummy";
         final String key = "test-key";
         final Windowed<String> data = new Windowed<>(key, new SessionWindow(0, 
1));
         final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
 
         final SessionWindowedSerializer<String> testSerializer = new 
SessionWindowedSerializer<>(mockSerializer);
 
-        testSerializer.serialize("dummy", headers, data);
+        testSerializer.serialize(topic, headers, data);
 
-        verify(mockSerializer).serialize(anyString(), eq(headers), eq(key));
-        verify(mockSerializer, never()).serialize(anyString(), eq(key));
+        verify(mockSerializer, times(1)).serialize(topic, headers, key);
+        verify(mockSerializer, never()).serialize(topic, key);
+
+        testSerializer.serializeBaseKey(topic, headers, data);
+
+        verify(mockSerializer, times(2)).serialize(topic, headers, key);
+        verify(mockSerializer, never()).serialize(topic, key);
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
index 0b6f17a5f27..f48d2394c12 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
@@ -179,15 +179,16 @@ public class TimeWindowedDeserializerTest {
         final Deserializer<String> mockDeserializer = 
mock(StringDeserializer.class);
         when(mockDeserializer.deserialize(anyString(), any(Headers.class), 
any(byte[].class))).thenReturn("test-value");
 
+        final String topic = "dummy";
         final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
         final Windowed<String> windowed = new Windowed<>("test-key", new 
TimeWindow(0, 1));
-        final byte[] data = new 
TimeWindowedSerializer<>(Serdes.String().serializer()).serialize("dummy", 
headers, windowed);
+        final byte[] data = new 
TimeWindowedSerializer<>(Serdes.String().serializer()).serialize(topic, 
headers, windowed);
 
         final TimeWindowedDeserializer<String> testDeserializer = new 
TimeWindowedDeserializer<>(mockDeserializer, 1L);
 
-        testDeserializer.deserialize("dummy", headers, data);
+        testDeserializer.deserialize(topic, headers, data);
 
-        verify(mockDeserializer).deserialize(anyString(), eq(headers), 
any(byte[].class));
+        verify(mockDeserializer).deserialize(eq(topic), eq(headers), 
any(byte[].class));
         verify(mockDeserializer, never()).deserialize(anyString(), 
any(byte[].class));
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
index 54dcfa57142..837bb344542 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
@@ -36,9 +36,9 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -122,15 +122,21 @@ public class TimeWindowedSerializerTest {
         final Serializer<String> mockSerializer = mock(StringSerializer.class);
         when(mockSerializer.serialize(anyString(), any(Headers.class), 
anyString())).thenReturn("test-value".getBytes());
 
+        final String topic = "dummy";
         final String key = "test-key";
         final Windowed<String> data = new Windowed<>(key, new TimeWindow(0, 
1));
         final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
 
         final TimeWindowedSerializer<String> testSerializer = new 
TimeWindowedSerializer<>(mockSerializer);
 
-        testSerializer.serialize("dummy", headers, data);
+        testSerializer.serialize(topic, headers, data);
 
-        verify(mockSerializer).serialize(anyString(), eq(headers), eq(key));
-        verify(mockSerializer, never()).serialize(anyString(), eq(key));
+        verify(mockSerializer, times(1)).serialize(topic, headers, key);
+        verify(mockSerializer, never()).serialize(topic, key);
+
+        testSerializer.serializeBaseKey(topic, headers, data);
+
+        verify(mockSerializer, times(2)).serialize(topic, headers, key);
+        verify(mockSerializer, never()).serialize(topic, key);
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
index cc858c32aa9..a0a73c829fc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -87,7 +88,7 @@ public class WindowedSerdesTest {
         final TimeWindowedSerializer<byte[]> serializer = new 
TimeWindowedSerializer<>();
         final NullPointerException exception = assertThrows(
             NullPointerException.class,
-            () -> serializer.serializeBaseKey("topic", new Windowed<>(new 
byte[0], new TimeWindow(0, 1))));
+            () -> serializer.serializeBaseKey("topic", new RecordHeaders(), 
new Windowed<>(new byte[0], new TimeWindow(0, 1))));
         assertThat(
             exception.getMessage(),
             equalTo("Inner serializer is `null`. User code must use 
constructor " +
@@ -123,7 +124,7 @@ public class WindowedSerdesTest {
         final SessionWindowedSerializer<byte[]> serializer = new 
SessionWindowedSerializer<>();
         final NullPointerException exception = assertThrows(
             NullPointerException.class,
-            () -> serializer.serializeBaseKey("topic", new Windowed<>(new 
byte[0], new SessionWindow(0, 0))));
+            () -> serializer.serializeBaseKey("topic", new RecordHeaders(), 
new Windowed<>(new byte[0], new SessionWindow(0, 0))));
         assertThat(
             exception.getMessage(),
             equalTo("Inner serializer is `null`. User code must use 
constructor " +

Reply via email to