This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1bf7d6032b9 KAFKA-20173: Propagate headers into serde 8/N (#21967)
1bf7d6032b9 is described below
commit 1bf7d6032b9b4171f510dc9a1c7e0243410e27f5
Author: Uladzislau Blok <[email protected]>
AuthorDate: Sun Apr 5 04:56:49 2026 +0200
KAFKA-20173: Propagate headers into serde 8/N (#21967)
Ensures that `ChangedSerde`, `SubscriptionWrapperSerde` and
`ValueAndTimestampSerde` forward headers to their underlying serdes.
Reviewers: Matthias J. Sax <[email protected]>
---
.../kstream/internals/ChangedDeserializer.java | 2 +-
.../kstream/internals/ChangedSerializer.java | 2 +-
.../foreignkeyjoin/SubscriptionWrapperSerde.java | 35 ++++---
.../internals/ValueAndTimestampDeserializer.java | 17 +++-
.../internals/ValueAndTimestampSerializer.java | 16 +++-
.../kstream/internals/ChangedSerdeTest.java | 54 +++++++++--
.../SubscriptionWrapperSerdeTest.java | 106 +++++++++++++++++----
.../ValueAndTimestampDeserializerTest.java | 54 +++++++++++
.../internals/ValueAndTimestampSerializerTest.java | 47 +++++----
9 files changed, 269 insertions(+), 64 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
index 6966ec12373..9fe62752940 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -141,7 +141,7 @@ public class ChangedDeserializer<T> implements
Deserializer<Change<T>>, Wrapping
@Override
public Change<T> deserialize(final String topic, final byte[] data) {
- return deserialize(topic, null, data);
+ throw new UnsupportedOperationException("ChangedDeserializer requires the headers-aware version of deserialize");
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index d7d88f71b9f..ee968fea62f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -134,7 +134,7 @@ public class ChangedSerializer<T> implements
Serializer<Change<T>>, WrappingNull
@Override
public byte[] serialize(final String topic, final Change<T> data) {
- return serialize(topic, null, data);
+ throw new UnsupportedOperationException("ChangedSerializer requires the headers-aware version of serialize");
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
index 064d464d8be..2c473e58e36 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
@@ -94,7 +95,7 @@ public class SubscriptionWrapperSerde<KLeft> extends
WrappingNullableSerde<Subsc
}
@Override
- public byte[] serialize(final String ignored, final
SubscriptionWrapper<KLeft> data) {
+ public byte[] serialize(final String ignored, final Headers headers,
final SubscriptionWrapper<KLeft> data) {
//{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}{4-bytes-primaryPartition}
if (data.version() < 0) {
@@ -103,27 +104,33 @@ public class SubscriptionWrapperSerde<KLeft> extends
WrappingNullableSerde<Subsc
final int version = data.version();
if (upgradeFromV0 || version == 0) {
- return serializeV0(data);
+ return serializeV0(data, headers);
} else if (version == 1) {
- return serializeV1(data);
+ return serializeV1(data, headers);
} else {
throw new UnsupportedVersionException("Unsupported
SubscriptionWrapper version " + data.version());
}
}
- private byte[] serializePrimaryKey(final SubscriptionWrapper<KLeft>
data) {
+ @Override
+ public byte[] serialize(final String ignored, final
SubscriptionWrapper<KLeft> data) {
+ throw new UnsupportedOperationException("SubscriptionWrapperSerializer requires the headers-aware version of serialize");
+ }
+
+ private byte[] serializePrimaryKey(final SubscriptionWrapper<KLeft>
data, final Headers headers) {
if (primaryKeySerializationPseudoTopic == null) {
primaryKeySerializationPseudoTopic =
primaryKeySerializationPseudoTopicSupplier.get();
}
return primaryKeySerializer.serialize(
primaryKeySerializationPseudoTopic,
+ headers,
data.primaryKey()
);
}
- private ByteBuffer serializeCommon(final SubscriptionWrapper<KLeft>
data, final byte version, final int extraLength) {
- final byte[] primaryKeySerializedData = serializePrimaryKey(data);
+ private ByteBuffer serializeCommon(final SubscriptionWrapper<KLeft>
data, final Headers headers, final byte version, final int extraLength) {
+ final byte[] primaryKeySerializedData = serializePrimaryKey(data,
headers);
final ByteBuffer buf;
int dataLength = 2 + primaryKeySerializedData.length + extraLength;
if (data.hash() != null) {
@@ -145,12 +152,12 @@ public class SubscriptionWrapperSerde<KLeft> extends
WrappingNullableSerde<Subsc
return buf;
}
- private byte[] serializeV0(final SubscriptionWrapper<KLeft> data) {
- return serializeCommon(data, (byte) 0, 0).array();
+ private byte[] serializeV0(final SubscriptionWrapper<KLeft> data,
final Headers headers) {
+ return serializeCommon(data, headers, (byte) 0, 0).array();
}
- private byte[] serializeV1(final SubscriptionWrapper<KLeft> data) {
- final ByteBuffer buf = serializeCommon(data, data.version(),
Integer.BYTES);
+ private byte[] serializeV1(final SubscriptionWrapper<KLeft> data,
final Headers headers) {
+ final ByteBuffer buf = serializeCommon(data, headers,
data.version(), Integer.BYTES);
buf.putInt(data.primaryPartition());
return buf.array();
}
@@ -178,7 +185,7 @@ public class SubscriptionWrapperSerde<KLeft> extends
WrappingNullableSerde<Subsc
}
@Override
- public SubscriptionWrapper<KLeft> deserialize(final String ignored,
final byte[] data) {
+ public SubscriptionWrapper<KLeft> deserialize(final String ignored,
final Headers headers, final byte[] data) {
//{7-bits-version}{1-bit-isHashNull}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}{4-bytes-primaryPartition}
final ByteBuffer buf = ByteBuffer.wrap(data);
final byte versionAndIsHashNull = buf.get();
@@ -212,6 +219,7 @@ public class SubscriptionWrapperSerde<KLeft> extends
WrappingNullableSerde<Subsc
final KLeft primaryKey = primaryKeyDeserializer.deserialize(
primaryKeySerializationPseudoTopic,
+ headers,
primaryKeyRaw
);
final Integer primaryPartition;
@@ -224,6 +232,11 @@ public class SubscriptionWrapperSerde<KLeft> extends
WrappingNullableSerde<Subsc
return new SubscriptionWrapper<>(hash, inst, primaryKey, version,
primaryPartition);
}
+ @Override
+ public SubscriptionWrapper<KLeft> deserialize(final String ignored,
final byte[] data) {
+ throw new UnsupportedOperationException("SubscriptionWrapperDeserializer requires the headers-aware version of deserialize");
+ }
+
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
index 5f6cd98b763..d68a1f727bf 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
@@ -50,12 +52,19 @@ class ValueAndTimestampDeserializer<V> implements
WrappingNullableDeserializer<V
@Override
public ValueAndTimestamp<V> deserialize(final String topic,
final byte[] valueAndTimestamp) {
+ return deserialize(topic, new RecordHeaders(), valueAndTimestamp);
+ }
+
+ @Override
+ public ValueAndTimestamp<V> deserialize(final String topic,
+ final Headers headers,
+ final byte[] valueAndTimestamp) {
if (valueAndTimestamp == null) {
return null;
}
- final long timestamp = timestampDeserializer.deserialize(topic,
rawTimestamp(valueAndTimestamp));
- final V value = valueDeserializer.deserialize(topic,
rawValue(valueAndTimestamp));
+ final long timestamp = timestampDeserializer.deserialize(topic,
headers, rawTimestamp(valueAndTimestamp));
+ final V value = valueDeserializer.deserialize(topic, headers,
rawValue(valueAndTimestamp));
return ValueAndTimestamp.make(value, timestamp);
}
@@ -85,7 +94,9 @@ class ValueAndTimestampDeserializer<V> implements
WrappingNullableDeserializer<V
}
static long timestamp(final byte[] rawValueAndTimestamp) {
- return LONG_DESERIALIZER.deserialize(null,
rawTimestamp(rawValueAndTimestamp));
+ // We know we use LongDeserializer here, so we don't need headers
+ // still use headers-aware version to be consistent in codebase
+ return LONG_DESERIALIZER.deserialize(null, null,
rawTimestamp(rawValueAndTimestamp));
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
index e64e5e18b7f..6e0ad748e10 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
@@ -68,19 +70,27 @@ public class ValueAndTimestampSerializer<V> implements
WrappingNullableSerialize
@Override
public byte[] serialize(final String topic,
final ValueAndTimestamp<V> data) {
+ return serialize(topic, new RecordHeaders(), data);
+ }
+
+ @Override
+ public byte[] serialize(final String topic,
+ final Headers headers,
+ final ValueAndTimestamp<V> data) {
if (data == null) {
return null;
}
- return serialize(topic, data.value(), data.timestamp());
+ return serialize(topic, headers, data.value(), data.timestamp());
}
public byte[] serialize(final String topic,
+ final Headers headers,
final V data,
final long timestamp) {
if (data == null) {
return null;
}
- final byte[] rawValue = valueSerializer.serialize(topic, data);
+ final byte[] rawValue = valueSerializer.serialize(topic, headers,
data);
// Since we can't control the result of the internal serializer, we make sure that the result
// is not null as well.
@@ -91,7 +101,7 @@ public class ValueAndTimestampSerializer<V> implements
WrappingNullableSerialize
return null;
}
- final byte[] rawTimestamp = timestampSerializer.serialize(topic,
timestamp);
+ final byte[] rawTimestamp = timestampSerializer.serialize(topic,
headers, timestamp);
return ByteBuffer
.allocate(rawTimestamp.length + rawValue.length)
.put(rawTimestamp)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
index 50dea706aeb..2024a5b557c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
@@ -16,9 +16,13 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.internals.ByteUtils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
@@ -33,9 +37,14 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class ChangedSerdeTest {
private static final String TOPIC = "some-topic";
+ private static final Headers HEADERS = new RecordHeaders().add("key",
"value".getBytes());
private static final Serializer<String> STRING_SERIALIZER =
Serdes.String().serializer();
private static final ChangedSerializer<String> CHANGED_STRING_SERIALIZER =
@@ -51,9 +60,9 @@ public class ChangedSerdeTest {
final String nonNullOldValue = "world";
private static <T> void checkRoundTrip(final T data, final Serializer<T>
serializer, final Deserializer<T> deserializer) {
- final byte[] serialized = serializer.serialize(TOPIC, data);
+ final byte[] serialized = serializer.serialize(TOPIC, HEADERS, data);
assertThat(serialized, is(notNullValue()));
- final T deserialized = deserializer.deserialize(TOPIC, serialized);
+ final T deserialized = deserializer.deserialize(TOPIC, HEADERS,
serialized);
assertThat(deserialized, is(data));
}
@@ -63,7 +72,7 @@ public class ChangedSerdeTest {
assertThrows(
StreamsException.class,
- () -> CHANGED_STRING_SERIALIZER.serialize(TOPIC, data));
+ () -> CHANGED_STRING_SERIALIZER.serialize(TOPIC, HEADERS,
data));
}
@Test
@@ -75,7 +84,7 @@ public class ChangedSerdeTest {
assertThrows(
StreamsException.class,
- () -> serializer.serialize(TOPIC, data));
+ () -> serializer.serialize(TOPIC, HEADERS, data));
}
@Test
@@ -99,7 +108,7 @@ public class ChangedSerdeTest {
@Test
public void shouldThrowErrorIfEncountersAnUnknownByteValueForOldNewFlag() {
final Change<String> data = new Change<>(null, nonNullOldValue);
- final byte[] serialized = CHANGED_STRING_SERIALIZER.serialize(TOPIC,
data);
+ final byte[] serialized = CHANGED_STRING_SERIALIZER.serialize(TOPIC,
HEADERS, data);
assertThat(serialized, is(notNullValue()));
// mutate the serialized array to replace OLD_NEW_FLAG with an unsupported byte value
@@ -109,7 +118,7 @@ public class ChangedSerdeTest {
assertThrows(
StreamsException.class,
- () -> CHANGED_STRING_DESERIALIZER.deserialize(TOPIC, serialized));
+ () -> CHANGED_STRING_DESERIALIZER.deserialize(TOPIC, HEADERS,
serialized));
}
@Test
@@ -173,7 +182,38 @@ public class ChangedSerdeTest {
private static void checkRoundTripForReservedVersion(final Change<String>
data) {
final byte[] serialized = serializeVersions3Through5(TOPIC, data);
assertThat(serialized, is(notNullValue()));
- final Change<String> deserialized =
CHANGED_STRING_DESERIALIZER.deserialize(TOPIC, serialized);
+ final Change<String> deserialized =
CHANGED_STRING_DESERIALIZER.deserialize(TOPIC, HEADERS, serialized);
assertThat(deserialized, is(data));
}
+
+ @Test
+ public void shouldPassHeadersToUnderlyingSerializer() {
+ final Serializer<String> mockSerializer = mock(StringSerializer.class);
+ final ChangedSerializer<String> changedSerializer = new
ChangedSerializer<>(mockSerializer);
+
+ final String value = "value";
+ final Change<String> data = new Change<>(value, null);
+ when(mockSerializer.serialize(TOPIC, HEADERS,
value)).thenReturn(value.getBytes());
+
+ changedSerializer.serialize(TOPIC, HEADERS, data);
+
+ verify(mockSerializer).serialize(TOPIC, HEADERS, value);
+ verify(mockSerializer, never()).serialize(TOPIC, value);
+ }
+
+ @Test
+ public void shouldPassHeadersToUnderlyingDeserializer() {
+ final Deserializer<String> mockDeserializer =
mock(StringDeserializer.class);
+ final ChangedDeserializer<String> changedDeserializer = new
ChangedDeserializer<>(mockDeserializer);
+
+ final String value = "value";
+ final Change<String> data = new Change<>(value, null);
+
+ final byte[] serialized = CHANGED_STRING_SERIALIZER.serialize(TOPIC,
HEADERS, data);
+
+ changedDeserializer.deserialize(TOPIC, HEADERS, serialized);
+
+ verify(mockDeserializer).deserialize(TOPIC, HEADERS, value.getBytes());
+ verify(mockDeserializer, never()).deserialize(TOPIC, value.getBytes());
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
index db2842401b5..3d9d53feebb 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
@@ -17,7 +17,14 @@
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.internals.Murmur3;
@@ -29,16 +36,23 @@ import static
org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@SuppressWarnings({"unchecked", "rawtypes"})
public class SubscriptionWrapperSerdeTest {
+ private static final String TOPIC = "pkTopic";
+ private static final Headers HEADERS = new RecordHeaders().add("key",
"value".getBytes());
+
@Test
@SuppressWarnings("unchecked")
public void shouldSerdeV0Test() {
final byte version = SubscriptionWrapper.VERSION_0;
final String originalKey = "originalKey";
- final SubscriptionWrapperSerde swSerde = new
SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
+ final SubscriptionWrapperSerde swSerde = new
SubscriptionWrapperSerde<>(() -> TOPIC, Serdes.String());
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF,
(byte) 0xAA, (byte) 0x00, (byte) 0x19});
final Integer primaryPartition = null;
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
@@ -47,9 +61,9 @@ public class SubscriptionWrapperSerdeTest {
originalKey,
version,
primaryPartition);
- final byte[] serialized = swSerde.serializer().serialize(null,
wrapper);
+ final byte[] serialized = swSerde.serializer().serialize(null,
HEADERS, wrapper);
final SubscriptionWrapper deserialized = (SubscriptionWrapper)
swSerde.deserializer()
- .deserialize(null, serialized);
+ .deserialize(null, HEADERS, serialized);
assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE,
deserialized.instruction());
assertArrayEquals(hashedValue, deserialized.hash());
@@ -63,7 +77,7 @@ public class SubscriptionWrapperSerdeTest {
public void shouldSerdeV1Test() {
final byte version = SubscriptionWrapper.VERSION_1;
final String originalKey = "originalKey";
- final SubscriptionWrapperSerde swSerde = new
SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
+ final SubscriptionWrapperSerde swSerde = new
SubscriptionWrapperSerde<>(() -> TOPIC, Serdes.String());
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF,
(byte) 0xAA, (byte) 0x00, (byte) 0x19});
final Integer primaryPartition = 10;
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
@@ -72,9 +86,9 @@ public class SubscriptionWrapperSerdeTest {
originalKey,
version,
primaryPartition);
- final byte[] serialized = swSerde.serializer().serialize(null,
wrapper);
+ final byte[] serialized = swSerde.serializer().serialize(null,
HEADERS, wrapper);
final SubscriptionWrapper deserialized = (SubscriptionWrapper)
swSerde.deserializer()
- .deserialize(null, serialized);
+ .deserialize(null, HEADERS, serialized);
assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE,
deserialized.instruction());
assertArrayEquals(hashedValue, deserialized.hash());
@@ -88,7 +102,7 @@ public class SubscriptionWrapperSerdeTest {
public void shouldSerdeWithV0IfUpgradeTest() {
final byte version = SubscriptionWrapper.VERSION_1;
final String originalKey = "originalKey";
- final SubscriptionWrapperSerde swSerde = new
SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
+ final SubscriptionWrapperSerde swSerde = new
SubscriptionWrapperSerde<>(() -> TOPIC, Serdes.String());
swSerde.configure(
Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG,
StreamsConfig.UPGRADE_FROM_32),
true);
@@ -100,9 +114,9 @@ public class SubscriptionWrapperSerdeTest {
originalKey,
version,
primaryPartition);
- final byte[] serialized = swSerde.serializer().serialize(null,
wrapper);
+ final byte[] serialized = swSerde.serializer().serialize(null,
HEADERS, wrapper);
final SubscriptionWrapper deserialized = (SubscriptionWrapper)
swSerde.deserializer()
- .deserialize(null, serialized);
+ .deserialize(null, HEADERS, serialized);
assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE,
deserialized.instruction());
assertArrayEquals(hashedValue, deserialized.hash());
@@ -116,7 +130,7 @@ public class SubscriptionWrapperSerdeTest {
public void shouldSerdeNullHashV0Test() {
final byte version = SubscriptionWrapper.VERSION_0;
final String originalKey = "originalKey";
- final SubscriptionWrapperSerde swSerde = new
SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
+ final SubscriptionWrapperSerde swSerde = new
SubscriptionWrapperSerde<>(() -> TOPIC, Serdes.String());
final long[] hashedValue = null;
final Integer primaryPartition = null;
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
@@ -125,8 +139,8 @@ public class SubscriptionWrapperSerdeTest {
originalKey,
version,
primaryPartition);
- final byte[] serialized = swSerde.serializer().serialize(null,
wrapper);
- final SubscriptionWrapper deserialized = (SubscriptionWrapper)
swSerde.deserializer().deserialize(null, serialized);
+ final byte[] serialized = swSerde.serializer().serialize(null,
HEADERS, wrapper);
+ final SubscriptionWrapper deserialized = (SubscriptionWrapper)
swSerde.deserializer().deserialize(null, HEADERS, serialized);
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
deserialized.instruction());
assertArrayEquals(hashedValue, deserialized.hash());
@@ -140,7 +154,7 @@ public class SubscriptionWrapperSerdeTest {
public void shouldSerdeNullHashV1Test() {
final byte version = SubscriptionWrapper.VERSION_1;
final String originalKey = "originalKey";
- final SubscriptionWrapperSerde swSerde = new
SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
+ final SubscriptionWrapperSerde swSerde = new
SubscriptionWrapperSerde<>(() -> TOPIC, Serdes.String());
final long[] hashedValue = null;
final Integer primaryPartition = 10;
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
@@ -149,9 +163,9 @@ public class SubscriptionWrapperSerdeTest {
originalKey,
version,
primaryPartition);
- final byte[] serialized = swSerde.serializer().serialize(null,
wrapper);
+ final byte[] serialized = swSerde.serializer().serialize(null,
HEADERS, wrapper);
final SubscriptionWrapper deserialized = (SubscriptionWrapper)
swSerde.deserializer()
- .deserialize(null, serialized);
+ .deserialize(null, HEADERS, serialized);
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
deserialized.instruction());
assertArrayEquals(hashedValue, deserialized.hash());
@@ -163,7 +177,7 @@ public class SubscriptionWrapperSerdeTest {
@Test
public void shouldSerdeNullPrimaryPartitionOnV0Test() {
final String originalKey = "originalKey";
- final SubscriptionWrapperSerde swSerde = new
SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
+ final SubscriptionWrapperSerde swSerde = new
SubscriptionWrapperSerde<>(() -> TOPIC, Serdes.String());
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF,
(byte) 0xAA, (byte) 0x00, (byte) 0x19});
final Integer primaryPartition = null;
final byte version = SubscriptionWrapper.VERSION_0;
@@ -173,8 +187,8 @@ public class SubscriptionWrapperSerdeTest {
originalKey,
version,
primaryPartition);
- final byte[] serialized = swSerde.serializer().serialize(null,
wrapper);
- final SubscriptionWrapper deserialized = (SubscriptionWrapper)
swSerde.deserializer().deserialize(null, serialized);
+ final byte[] serialized = swSerde.serializer().serialize(null,
HEADERS, wrapper);
+ final SubscriptionWrapper deserialized = (SubscriptionWrapper)
swSerde.deserializer().deserialize(null, HEADERS, serialized);
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
deserialized.instruction());
assertArrayEquals(hashedValue, deserialized.hash());
@@ -235,7 +249,7 @@ public class SubscriptionWrapperSerdeTest {
@Test
public void shouldThrowExceptionOnNullPrimaryPartitionV1Test() {
- final SubscriptionWrapperSerde swSerde = new
SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
+ final SubscriptionWrapperSerde swSerde = new
SubscriptionWrapperSerde<>(() -> TOPIC, Serdes.String());
final String originalKey = "originalKey";
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF,
(byte) 0xAA, (byte) 0x00, (byte) 0x19});
final Integer primaryPartition = null;
@@ -245,7 +259,7 @@ public class SubscriptionWrapperSerdeTest {
originalKey,
SubscriptionWrapper.VERSION_1,
primaryPartition);
- assertThrows(NullPointerException.class, () ->
swSerde.serializer().serialize(null, wrapper));
+ assertThrows(NullPointerException.class, () ->
swSerde.serializer().serialize(null, HEADERS, wrapper));
}
@Test
@@ -260,4 +274,56 @@ public class SubscriptionWrapperSerdeTest {
(byte) 0x80,
primaryPartition));
}
+
+ @Test
+ public void shouldPassHeadersToUnderlyingSerializer() {
+ final Serializer<String> mockSerializer = mock(StringSerializer.class);
+ final Serde<String> mockSerde = mock(Serdes.StringSerde.class);
+ when(mockSerde.serializer()).thenReturn(mockSerializer);
+
+ final String primaryKey = "originalKey";
+ final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF,
(byte) 0xAA, (byte) 0x00, (byte) 0x19});
+ final Integer primaryPartition = null;
+ when(mockSerializer.serialize(TOPIC, HEADERS,
primaryKey)).thenReturn(primaryKey.getBytes());
+
+ final SubscriptionWrapper<String> wrapper = new SubscriptionWrapper<>(
+ hashedValue,
+ SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+ primaryKey,
+ SubscriptionWrapper.VERSION_0,
+ primaryPartition);
+
+ final SubscriptionWrapperSerde<String> swSerde = new
SubscriptionWrapperSerde<>(() -> TOPIC, mockSerde);
+ swSerde.serializer().serialize(TOPIC, HEADERS, wrapper);
+
+ verify(mockSerializer).serialize(TOPIC, HEADERS, primaryKey);
+ verify(mockSerializer, never()).serialize(TOPIC, primaryKey);
+ }
+
+ @Test
+ public void shouldPassHeadersToUnderlyingDeserializer() {
+ final Deserializer<String> mockDeserializer =
mock(StringDeserializer.class);
+ final Serde<String> mockSerde = mock(Serdes.StringSerde.class);
+ when(mockSerde.deserializer()).thenReturn(mockDeserializer);
+ when(mockSerde.serializer()).thenReturn(Serdes.String().serializer());
+
+ final String primaryKey = "originalKey";
+ final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF,
(byte) 0xAA, (byte) 0x00, (byte) 0x19});
+ final Integer primaryPartition = null;
+ final SubscriptionWrapper<String> wrapper = new SubscriptionWrapper<>(
+ hashedValue,
+ SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+ primaryKey,
+ SubscriptionWrapper.VERSION_0,
+ primaryPartition);
+ when(mockDeserializer.deserialize(TOPIC, HEADERS,
primaryKey.getBytes())).thenReturn(primaryKey);
+
+ final SubscriptionWrapperSerde<String> swSerde = new
SubscriptionWrapperSerde<>(() -> TOPIC, mockSerde);
+ final byte[] serialized = swSerde.serializer().serialize(TOPIC,
HEADERS, wrapper);
+
+ swSerde.deserializer().deserialize(TOPIC, HEADERS, serialized);
+
+ verify(mockDeserializer).deserialize(TOPIC, HEADERS,
primaryKey.getBytes());
+ verify(mockDeserializer, never()).deserialize(TOPIC,
primaryKey.getBytes());
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializerTest.java
new file mode 100644
index 00000000000..e71156510ca
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializerTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import org.junit.jupiter.api.Test;
+
+import static
org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+public class ValueAndTimestampDeserializerTest {
+ private static final String TOPIC = "some-topic";
+ private static final long TIMESTAMP = 23;
+ private static final Headers HEADERS = new RecordHeaders().add("key",
"value".getBytes());
+
+ private static final ValueAndTimestampSerde<String> STRING_SERDE = new
ValueAndTimestampSerde<>(Serdes.String());
+
+ @Test
+ public void shouldPassHeadersToUnderlyingDeserializer() {
+ final Deserializer<String> mockDeserializer =
mock(StringDeserializer.class);
+ final ValueAndTimestampDeserializer<String> deserializer = new
ValueAndTimestampDeserializer<>(mockDeserializer);
+
+ final String value = "value";
+ final ValueAndTimestamp<String> valueAndTimestamp =
ValueAndTimestamp.make(value, TIMESTAMP);
+ final byte[] serialized = STRING_SERDE.serializer().serialize(TOPIC,
HEADERS, valueAndTimestamp);
+
+ deserializer.deserialize(TOPIC, HEADERS, serialized);
+
+ verify(mockDeserializer).deserialize(TOPIC, HEADERS,
rawValue(serialized));
+ verify(mockDeserializer, never()).deserialize(TOPIC,
rawValue(serialized));
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java
index ba4dce54697..e65fbc4daa6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java
@@ -16,8 +16,11 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.junit.jupiter.api.Test;
@@ -28,10 +31,15 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class ValueAndTimestampSerializerTest {
private static final String TOPIC = "some-topic";
private static final long TIMESTAMP = 23;
+ private static final Headers HEADERS = new RecordHeaders().add("key",
"value".getBytes());
private static final ValueAndTimestampSerde<String> STRING_SERDE =
new ValueAndTimestampSerde<>(Serdes.String());
@@ -41,15 +49,9 @@ public class ValueAndTimestampSerializerTest {
final String value = "some-string";
final ValueAndTimestamp<String> valueAndTimestamp =
ValueAndTimestamp.make(value, TIMESTAMP);
-
- final byte[] serialized =
- STRING_SERDE.serializer().serialize(TOPIC, valueAndTimestamp);
-
+ final byte[] serialized = STRING_SERDE.serializer().serialize(TOPIC,
HEADERS, valueAndTimestamp);
assertThat(serialized, is(notNullValue()));
-
- final ValueAndTimestamp<String> deserialized =
- STRING_SERDE.deserializer().deserialize(TOPIC, serialized);
-
+ final ValueAndTimestamp<String> deserialized =
STRING_SERDE.deserializer().deserialize(TOPIC, HEADERS, serialized);
assertThat(deserialized, is(valueAndTimestamp));
}
@@ -58,9 +60,9 @@ public class ValueAndTimestampSerializerTest {
final String value = "food";
final ValueAndTimestamp<String> oldValueAndTimestamp =
ValueAndTimestamp.make(value, TIMESTAMP);
- final byte[] oldSerializedValue =
STRING_SERDE.serializer().serialize(TOPIC, oldValueAndTimestamp);
+ final byte[] oldSerializedValue =
STRING_SERDE.serializer().serialize(TOPIC, HEADERS, oldValueAndTimestamp);
final ValueAndTimestamp<String> newValueAndTimestamp =
ValueAndTimestamp.make(value, TIMESTAMP + 1);
- final byte[] newSerializedValue =
STRING_SERDE.serializer().serialize(TOPIC, newValueAndTimestamp);
+ final byte[] newSerializedValue =
STRING_SERDE.serializer().serialize(TOPIC, HEADERS, newValueAndTimestamp);
assertTrue(ValueAndTimestampSerializer.valuesAreSameAndTimeIsIncreasing(oldSerializedValue,
newSerializedValue));
}
@@ -69,16 +71,15 @@ public class ValueAndTimestampSerializerTest {
final String value = "balls";
final ValueAndTimestamp<String> oldValueAndTimestamp =
ValueAndTimestamp.make(value, TIMESTAMP);
- final byte[] oldSerializedValue =
STRING_SERDE.serializer().serialize(TOPIC, oldValueAndTimestamp);
+ final byte[] oldSerializedValue =
STRING_SERDE.serializer().serialize(TOPIC, HEADERS, oldValueAndTimestamp);
final ValueAndTimestamp<String> outOfOrderValueAndTimestamp =
ValueAndTimestamp.make(value, TIMESTAMP - 1);
- final byte[] outOfOrderSerializedValue =
STRING_SERDE.serializer().serialize(TOPIC, outOfOrderValueAndTimestamp);
+ final byte[] outOfOrderSerializedValue =
STRING_SERDE.serializer().serialize(TOPIC, HEADERS,
outOfOrderValueAndTimestamp);
assertFalse(ValueAndTimestampSerializer.valuesAreSameAndTimeIsIncreasing(oldSerializedValue,
outOfOrderSerializedValue));
}
@Test
public void shouldSerializeNullDataAsNull() {
- final byte[] serialized =
- STRING_SERDE.serializer().serialize(TOPIC,
ValueAndTimestamp.make(null, TIMESTAMP));
+ final byte[] serialized = STRING_SERDE.serializer().serialize(TOPIC,
HEADERS, ValueAndTimestamp.make(null, TIMESTAMP));
assertThat(serialized, is(nullValue()));
}
@@ -88,12 +89,22 @@ public class ValueAndTimestampSerializerTest {
// Testing against regressions with respect to
https://github.com/apache/kafka/pull/7679
final Serializer<String> alwaysNullSerializer = (topic, data) -> null;
+ final ValueAndTimestampSerializer<String> serializer = new
ValueAndTimestampSerializer<>(alwaysNullSerializer);
+ final byte[] serialized = serializer.serialize(TOPIC, HEADERS,
"non-null-data", TIMESTAMP);
+ assertThat(serialized, is(nullValue()));
+ }
+
+ @Test
+ public void shouldPassHeadersToUnderlyingSerializer() {
+ final Serializer<String> mockSerializer = mock(StringSerializer.class);
+ final ValueAndTimestampSerializer<String> serializer = new
ValueAndTimestampSerializer<>(mockSerializer);
- final ValueAndTimestampSerializer<String> serializer =
- new ValueAndTimestampSerializer<>(alwaysNullSerializer);
+ final String value = "value";
+ when(mockSerializer.serialize(TOPIC, HEADERS,
value)).thenReturn(value.getBytes());
- final byte[] serialized = serializer.serialize(TOPIC, "non-null-data",
TIMESTAMP);
+ serializer.serialize(TOPIC, HEADERS, ValueAndTimestamp.make(value,
TIMESTAMP));
- assertThat(serialized, is(nullValue()));
+ verify(mockSerializer).serialize(TOPIC, HEADERS, value);
+ verify(mockSerializer, never()).serialize(TOPIC, value);
}
}