This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c0d143dd769 KAFKA-20173: Propagate headers into serde 4/N (#21755)
c0d143dd769 is described below
commit c0d143dd769076722c95cf50ac5de2d953d9bf83
Author: Uladzislau Blok <[email protected]>
AuthorDate: Sun Mar 15 23:44:14 2026 +0100
KAFKA-20173: Propagate headers into serde 4/N (#21755)
This PR fixes header propagation in SessionWindowedSerializer and
TimeWindowedSerializer so that they work as expected: serializeBaseKey
now forwards the record headers to the inner serializer.
Reviewers: Matthias J. Sax <[email protected]>
---
.../kafka/common/serialization/ListSerializerTest.java | 8 ++++----
.../kafka/streams/kstream/SessionWindowedSerializer.java | 7 ++++++-
.../kafka/streams/kstream/TimeWindowedSerializer.java | 7 ++++++-
.../streams/kstream/internals/WindowedSerializer.java | 5 +++++
.../kstream/internals/WindowedStreamPartitioner.java | 3 ++-
.../streams/kstream/SessionWindowedDeserializerTest.java | 7 ++++---
.../streams/kstream/SessionWindowedSerializerTest.java | 14 ++++++++++----
.../streams/kstream/TimeWindowedDeserializerTest.java | 7 ++++---
.../kafka/streams/kstream/TimeWindowedSerializerTest.java | 14 ++++++++++----
.../apache/kafka/streams/kstream/WindowedSerdesTest.java | 5 +++--
10 files changed, 54 insertions(+), 23 deletions(-)
diff --git
a/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java
b/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java
index e9984a8a7a8..65196a84d1d 100644
---
a/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java
@@ -34,7 +34,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -167,15 +166,16 @@ public class ListSerializerTest {
when(mockSerializer.serialize(anyString(), any(Headers.class),
anyString())).thenReturn("test-value".getBytes());
final String topic = "topic";
- final List<String> data = List.of("test-key");
+ final String key = "test-key";
+ final List<String> data = List.of(key);
final Headers headers = new RecordHeaders().add("key1",
"value1".getBytes());
final ListSerializer<String> testSerializer = new
ListSerializer<>(mockSerializer);
testSerializer.serialize(topic, headers, data);
- verify(mockSerializer).serialize(eq(topic), eq(headers),
eq("test-key"));
- verify(mockSerializer, never()).serialize(anyString(), anyString());
+ verify(mockSerializer).serialize(topic, headers, key);
+ verify(mockSerializer, never()).serialize(topic, key);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
index 50edd86f95d..9a87752e288 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
@@ -112,9 +112,14 @@ public class SessionWindowedSerializer<T> implements
WindowedSerializer<T> {
@Override
public byte[] serializeBaseKey(final String topic, final Windowed<T> data)
{
+ return serializeBaseKey(topic, new RecordHeaders(), data);
+ }
+
+ @Override
+ public byte[] serializeBaseKey(final String topic, final Headers headers,
final Windowed<T> data) {
WindowedSerdes.verifyInnerSerializerNotNull(inner, this);
- return inner.serialize(topic, data.key());
+ return inner.serialize(topic, headers, data.key());
}
// Only for testing
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
index b185d830e9e..5eef64da780 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
@@ -113,9 +113,14 @@ public class TimeWindowedSerializer<T> implements
WindowedSerializer<T> {
@Override
public byte[] serializeBaseKey(final String topic, final Windowed<T> data)
{
+ return serializeBaseKey(topic, new RecordHeaders(), data);
+ }
+
+ @Override
+ public byte[] serializeBaseKey(final String topic, final Headers headers,
final Windowed<T> data) {
WindowedSerdes.verifyInnerSerializerNotNull(inner, this);
- return inner.serialize(topic, data.key());
+ return inner.serialize(topic, headers, data.key());
}
// Only for testing
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
index 09185b2ab2f..9cffa7c2f7a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
@@ -16,10 +16,15 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.Windowed;
public interface WindowedSerializer<T> extends Serializer<Windowed<T>> {
byte[] serializeBaseKey(String topic, Windowed<T> data);
+
+ default byte[] serializeBaseKey(final String topic, final Headers
headers, final Windowed<T> data) {
+ return serializeBaseKey(topic, data);
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
index 9a8fb9ddd7c..54735b98bb1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -46,7 +47,7 @@ public class WindowedStreamPartitioner<K, V> implements
StreamPartitioner<Window
@Override
public Optional<Set<Integer>> partitions(final String topic, final
Windowed<K> windowedKey, final V value, final int numPartitions) {
// for windowed key, the key bytes should never be null
- final byte[] keyBytes = serializer.serializeBaseKey(topic,
windowedKey);
+ final byte[] keyBytes = serializer.serializeBaseKey(topic, new
RecordHeaders(), windowedKey);
// stick with the same built-in partitioner util functions that
producer used
// to make sure its behavior is consistent with the producer
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
index 35adaed1ee3..3b82fb04194 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
@@ -122,15 +122,16 @@ public class SessionWindowedDeserializerTest {
final Deserializer<String> mockDeserializer =
mock(StringDeserializer.class);
when(mockDeserializer.deserialize(anyString(), any(Headers.class),
any(byte[].class))).thenReturn("test-value");
+ final String topic = "dummy";
final Headers headers = new RecordHeaders().add("key1",
"value1".getBytes());
final Windowed<String> windowed = new Windowed<>("test-key", new
SessionWindow(0, 1));
- final byte[] data = new
SessionWindowedSerializer<>(Serdes.String().serializer()).serialize("dummy",
headers, windowed);
+ final byte[] data = new
SessionWindowedSerializer<>(Serdes.String().serializer()).serialize(topic,
headers, windowed);
final SessionWindowedDeserializer<String> testDeserializer = new
SessionWindowedDeserializer<>(mockDeserializer);
- testDeserializer.deserialize("dummy", headers, data);
+ testDeserializer.deserialize(topic, headers, data);
- verify(mockDeserializer).deserialize(anyString(), eq(headers),
any(byte[].class));
+ verify(mockDeserializer).deserialize(eq(topic), eq(headers),
any(byte[].class));
verify(mockDeserializer, never()).deserialize(anyString(),
any(byte[].class));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
index d2e15f542a1..bd2d4711af3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
@@ -36,9 +36,9 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -122,15 +122,21 @@ public class SessionWindowedSerializerTest {
final Serializer<String> mockSerializer = mock(StringSerializer.class);
when(mockSerializer.serialize(anyString(), any(Headers.class),
anyString())).thenReturn("test-value".getBytes());
+ final String topic = "dummy";
final String key = "test-key";
final Windowed<String> data = new Windowed<>(key, new SessionWindow(0,
1));
final Headers headers = new RecordHeaders().add("key1",
"value1".getBytes());
final SessionWindowedSerializer<String> testSerializer = new
SessionWindowedSerializer<>(mockSerializer);
- testSerializer.serialize("dummy", headers, data);
+ testSerializer.serialize(topic, headers, data);
- verify(mockSerializer).serialize(anyString(), eq(headers), eq(key));
- verify(mockSerializer, never()).serialize(anyString(), eq(key));
+ verify(mockSerializer, times(1)).serialize(topic, headers, key);
+ verify(mockSerializer, never()).serialize(topic, key);
+
+ testSerializer.serializeBaseKey(topic, headers, data);
+
+ verify(mockSerializer, times(2)).serialize(topic, headers, key);
+ verify(mockSerializer, never()).serialize(topic, key);
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
index 0b6f17a5f27..f48d2394c12 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
@@ -179,15 +179,16 @@ public class TimeWindowedDeserializerTest {
final Deserializer<String> mockDeserializer =
mock(StringDeserializer.class);
when(mockDeserializer.deserialize(anyString(), any(Headers.class),
any(byte[].class))).thenReturn("test-value");
+ final String topic = "dummy";
final Headers headers = new RecordHeaders().add("key1",
"value1".getBytes());
final Windowed<String> windowed = new Windowed<>("test-key", new
TimeWindow(0, 1));
- final byte[] data = new
TimeWindowedSerializer<>(Serdes.String().serializer()).serialize("dummy",
headers, windowed);
+ final byte[] data = new
TimeWindowedSerializer<>(Serdes.String().serializer()).serialize(topic,
headers, windowed);
final TimeWindowedDeserializer<String> testDeserializer = new
TimeWindowedDeserializer<>(mockDeserializer, 1L);
- testDeserializer.deserialize("dummy", headers, data);
+ testDeserializer.deserialize(topic, headers, data);
- verify(mockDeserializer).deserialize(anyString(), eq(headers),
any(byte[].class));
+ verify(mockDeserializer).deserialize(eq(topic), eq(headers),
any(byte[].class));
verify(mockDeserializer, never()).deserialize(anyString(),
any(byte[].class));
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
index 54dcfa57142..837bb344542 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
@@ -36,9 +36,9 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -122,15 +122,21 @@ public class TimeWindowedSerializerTest {
final Serializer<String> mockSerializer = mock(StringSerializer.class);
when(mockSerializer.serialize(anyString(), any(Headers.class),
anyString())).thenReturn("test-value".getBytes());
+ final String topic = "dummy";
final String key = "test-key";
final Windowed<String> data = new Windowed<>(key, new TimeWindow(0,
1));
final Headers headers = new RecordHeaders().add("key1",
"value1".getBytes());
final TimeWindowedSerializer<String> testSerializer = new
TimeWindowedSerializer<>(mockSerializer);
- testSerializer.serialize("dummy", headers, data);
+ testSerializer.serialize(topic, headers, data);
- verify(mockSerializer).serialize(anyString(), eq(headers), eq(key));
- verify(mockSerializer, never()).serialize(anyString(), eq(key));
+ verify(mockSerializer, times(1)).serialize(topic, headers, key);
+ verify(mockSerializer, never()).serialize(topic, key);
+
+ testSerializer.serializeBaseKey(topic, headers, data);
+
+ verify(mockSerializer, times(2)).serialize(topic, headers, key);
+ verify(mockSerializer, never()).serialize(topic, key);
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
index cc858c32aa9..a0a73c829fc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -87,7 +88,7 @@ public class WindowedSerdesTest {
final TimeWindowedSerializer<byte[]> serializer = new
TimeWindowedSerializer<>();
final NullPointerException exception = assertThrows(
NullPointerException.class,
- () -> serializer.serializeBaseKey("topic", new Windowed<>(new
byte[0], new TimeWindow(0, 1))));
+ () -> serializer.serializeBaseKey("topic", new RecordHeaders(),
new Windowed<>(new byte[0], new TimeWindow(0, 1))));
assertThat(
exception.getMessage(),
equalTo("Inner serializer is `null`. User code must use
constructor " +
@@ -123,7 +124,7 @@ public class WindowedSerdesTest {
final SessionWindowedSerializer<byte[]> serializer = new
SessionWindowedSerializer<>();
final NullPointerException exception = assertThrows(
NullPointerException.class,
- () -> serializer.serializeBaseKey("topic", new Windowed<>(new
byte[0], new SessionWindow(0, 0))));
+ () -> serializer.serializeBaseKey("topic", new RecordHeaders(),
new Windowed<>(new byte[0], new SessionWindow(0, 0))));
assertThat(
exception.getMessage(),
equalTo("Inner serializer is `null`. User code must use
constructor " +