This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 197e5e748aa KAFKA-20173: Propagate headers into serde 5/N (#21756)
197e5e748aa is described below
commit 197e5e748aac41a6a9ced2920d2ffed7ef67d405
Author: Uladzislau Blok <[email protected]>
AuthorDate: Sun Mar 15 23:45:01 2026 +0100
KAFKA-20173: Propagate headers into serde 5/N (#21756)
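This PR ensures that FullChangeSerde propagates record headers to the
wrapped serializer and deserializer, and updates the call sites in
InMemoryTimeOrderedKeyValueChangeBuffer to pass the headers through.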
Reviewers: Matthias J. Sax <[email protected]>
---
.../streams/kstream/internals/FullChangeSerde.java | 13 ++--
.../InMemoryTimeOrderedKeyValueChangeBuffer.java | 7 +-
.../kstream/internals/FullChangeSerdeTest.java | 75 +++++++++++++++++++---
3 files changed, 76 insertions(+), 19 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
index 3a3439448cb..194b93f6029 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
@@ -44,27 +45,27 @@ public final class FullChangeSerde<T> {
return inner;
}
- public Change<byte[]> serializeParts(final String topic, final Change<T>
data) {
+ public Change<byte[]> serializeParts(final String topic, final Headers
headers, final Change<T> data) {
if (data == null) {
return null;
}
final Serializer<T> innerSerializer = innerSerde().serializer();
- final byte[] oldBytes = data.oldValue == null ? null :
innerSerializer.serialize(topic, data.oldValue);
- final byte[] newBytes = data.newValue == null ? null :
innerSerializer.serialize(topic, data.newValue);
+ final byte[] oldBytes = data.oldValue == null ? null :
innerSerializer.serialize(topic, headers, data.oldValue);
+ final byte[] newBytes = data.newValue == null ? null :
innerSerializer.serialize(topic, headers, data.newValue);
return new Change<>(newBytes, oldBytes);
}
- public Change<T> deserializeParts(final String topic, final Change<byte[]>
serialChange) {
+ public Change<T> deserializeParts(final String topic, final Headers
headers, final Change<byte[]> serialChange) {
if (serialChange == null) {
return null;
}
final Deserializer<T> innerDeserializer = innerSerde().deserializer();
final T oldValue =
- serialChange.oldValue == null ? null :
innerDeserializer.deserialize(topic, serialChange.oldValue);
+ serialChange.oldValue == null ? null :
innerDeserializer.deserialize(topic, headers, serialChange.oldValue);
final T newValue =
- serialChange.newValue == null ? null :
innerDeserializer.deserialize(topic, serialChange.newValue);
+ serialChange.newValue == null ? null :
innerDeserializer.deserialize(topic, headers, serialChange.newValue);
return new Change<>(newValue, oldValue);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
index eff87a86fee..14bf82121ee 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
@@ -399,10 +399,11 @@ public final class
InMemoryTimeOrderedKeyValueChangeBuffer<K, V, T> implements T
next.getKey().time() + "]"
);
}
- final K key =
keySerde.deserializer().deserialize(changelogTopic, next.getKey().key().get());
+ final K key =
keySerde.deserializer().deserialize(changelogTopic, context.headers(),
next.getKey().key().get());
final BufferValue bufferValue = next.getValue();
final Change<V> value = valueSerde.deserializeParts(
changelogTopic,
+ context.headers(),
new Change<>(bufferValue.newValue(),
bufferValue.oldValue())
);
callback.accept(new Eviction<K, Change<V>>(key, value,
bufferValue.context()));
@@ -470,8 +471,8 @@ public final class
InMemoryTimeOrderedKeyValueChangeBuffer<K, V, T> implements T
requireNonNull(record.value(), "value cannot be null");
requireNonNull(recordContext, "recordContext cannot be null");
- final Bytes serializedKey =
Bytes.wrap(keySerde.serializer().serialize(changelogTopic, record.key()));
- final Change<byte[]> serialChange =
valueSerde.serializeParts(changelogTopic, record.value());
+ final Bytes serializedKey =
Bytes.wrap(keySerde.serializer().serialize(changelogTopic,
recordContext.headers(), record.key()));
+ final Change<byte[]> serialChange =
valueSerde.serializeParts(changelogTopic, recordContext.headers(),
record.value());
final BufferValue buffered = getBuffered(serializedKey);
final byte[] serializedPriorValue;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
index 3cade798cc4..50074f22dff 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
@@ -16,7 +16,14 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
@@ -25,6 +32,10 @@ import java.nio.ByteBuffer;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class FullChangeSerdeTest {
private final FullChangeSerde<String> serde =
FullChangeSerde.wrap(Serdes.String());
@@ -58,22 +69,22 @@ public class FullChangeSerdeTest {
@Test
public void shouldRoundTripNull() {
- assertThat(serde.serializeParts(null, null), nullValue());
+ assertThat(serde.serializeParts(null, null, null), nullValue());
assertThat(mergeChangeArraysIntoSingleLegacyFormattedArray(null),
nullValue());
assertThat(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(null),
nullValue());
- assertThat(serde.deserializeParts(null, null), nullValue());
+ assertThat(serde.deserializeParts(null, null, null), nullValue());
}
@Test
public void shouldRoundTripNullChange() {
assertThat(
- serde.serializeParts(null, new Change<>(null, null)),
+ serde.serializeParts(null, null, new Change<>(null, null)),
is(new Change<byte[]>(null, null))
);
assertThat(
- serde.deserializeParts(null, new Change<>(null, null)),
+ serde.deserializeParts(null, null, new Change<>(null, null)),
is(new Change<String>(null, null))
);
@@ -86,34 +97,78 @@ public class FullChangeSerdeTest {
@Test
public void shouldRoundTripOldNull() {
- final Change<byte[]> serialized = serde.serializeParts(null, new
Change<>("new", null));
+ final Change<byte[]> serialized = serde.serializeParts(null, new
RecordHeaders(), new Change<>("new", null));
final byte[] legacyFormat =
mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
final Change<byte[]> decomposedLegacyFormat =
FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
assertThat(
- serde.deserializeParts(null, decomposedLegacyFormat),
+ serde.deserializeParts(null, new RecordHeaders(),
decomposedLegacyFormat),
is(new Change<>("new", null))
);
}
@Test
public void shouldRoundTripNewNull() {
- final Change<byte[]> serialized = serde.serializeParts(null, new
Change<>(null, "old"));
+ final Change<byte[]> serialized = serde.serializeParts(null, new
RecordHeaders(), new Change<>(null, "old"));
final byte[] legacyFormat =
mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
final Change<byte[]> decomposedLegacyFormat =
FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
assertThat(
- serde.deserializeParts(null, decomposedLegacyFormat),
+ serde.deserializeParts(null, new RecordHeaders(),
decomposedLegacyFormat),
is(new Change<>(null, "old"))
);
}
@Test
public void shouldRoundTripChange() {
- final Change<byte[]> serialized = serde.serializeParts(null, new
Change<>("new", "old"));
+ final Change<byte[]> serialized = serde.serializeParts(null, new
RecordHeaders(), new Change<>("new", "old"));
final byte[] legacyFormat =
mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
final Change<byte[]> decomposedLegacyFormat =
FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
assertThat(
- serde.deserializeParts(null, decomposedLegacyFormat),
+ serde.deserializeParts(null, new RecordHeaders(),
decomposedLegacyFormat),
is(new Change<>("new", "old"))
);
}
+
+ @Test
+ public void shouldPassHeadersToUnderlyingSerializer() {
+ final Serializer<String> mockSerializer = mock(StringSerializer.class);
+ final Serde<String> mockSerde = mock(Serdes.StringSerde.class);
+ when(mockSerde.serializer()).thenReturn(mockSerializer);
+
+ final String topic = "dummy";
+ final String newValue = "new";
+ final String oldValue = "old";
+ final Headers headers = new RecordHeaders().add("key",
"value".getBytes());
+ final Change<String> data = new Change<>(newValue, oldValue);
+
+ final FullChangeSerde<String> testSerde =
FullChangeSerde.wrap(mockSerde);
+
+ testSerde.serializeParts(topic, headers, data);
+
+ verify(mockSerializer).serialize(topic, headers, newValue);
+ verify(mockSerializer).serialize(topic, headers, oldValue);
+ verify(mockSerializer, never()).serialize(topic, newValue);
+ verify(mockSerializer, never()).serialize(topic, oldValue);
+ }
+
+ @Test
+ public void shouldPassHeadersToUnderlyingDeserializer() {
+ final Deserializer<String> mockDeserializer =
mock(StringDeserializer.class);
+ final Serde<String> mockSerde = mock(Serdes.StringSerde.class);
+ when(mockSerde.deserializer()).thenReturn(mockDeserializer);
+
+ final String topic = "dummy";
+ final byte[] newValueBytes = "new".getBytes();
+ final byte[] oldValueBytes = "old".getBytes();
+ final Headers headers = new RecordHeaders().add("key",
"value".getBytes());
+ final Change<byte[]> serialChange = new Change<>(newValueBytes,
oldValueBytes);
+
+ final FullChangeSerde<String> testSerde =
FullChangeSerde.wrap(mockSerde);
+
+ testSerde.deserializeParts(topic, headers, serialChange);
+
+ verify(mockDeserializer).deserialize(topic, headers, newValueBytes);
+ verify(mockDeserializer).deserialize(topic, headers, oldValueBytes);
+ verify(mockDeserializer, never()).deserialize(topic, newValueBytes);
+ verify(mockDeserializer, never()).deserialize(topic, oldValueBytes);
+ }
}