This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 367efc4a2ce KAFKA-20173: Propagate headers into serde 7/N (#21851)
367efc4a2ce is described below
commit 367efc4a2ce89114f9113af18a811799983ae2d0
Author: Uladzislau Blok <[email protected]>
AuthorDate: Fri Apr 3 06:29:41 2026 +0200
KAFKA-20173: Propagate headers into serde 7/N (#21851)
This PR ensures headers are propagated in
`SubscriptionResponseWrapperSerde`,
`LeftOrRightValueSerializer/Deserializer` and
`TimestampedKeyAndJoinSideSerializer/Deserializer`. Added unit tests to
check headers propagation.
Reviewers: Murali Basani <[email protected]>, Alieh Saeedi
<[email protected]>, Matthias J. Sax <[email protected]>
---
.../SubscriptionResponseWrapperSerde.java | 15 ++++-
.../internals/LeftOrRightValueDeserializer.java | 10 ++-
.../internals/LeftOrRightValueSerializer.java | 10 ++-
.../TimestampedKeyAndJoinSideDeserializer.java | 10 ++-
.../TimestampedKeyAndJoinSideSerializer.java | 10 ++-
.../SubscriptionResponseWrapperSerdeTest.java | 71 +++++++++++++++++++---
.../LeftOrRightValueDeserializerTest.java | 53 ++++++++++++++++
.../internals/LeftOrRightValueSerializerTest.java | 44 ++++++++++----
.../TimestampedKeyAndJoinSideDeserializerTest.java | 65 ++++++++++++++++++++
.../TimestampedKeyAndJoinSideSerializerTest.java | 55 +++++++++++++----
10 files changed, 300 insertions(+), 43 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
index 0523ed2fe79..4e5c1f7fc2d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
@@ -64,13 +65,18 @@ public class SubscriptionResponseWrapperSerde<VRight>
implements Serde<Subscript
@Override
public byte[] serialize(final String topic, final
SubscriptionResponseWrapper<V> data) {
+ throw new
UnsupportedOperationException("SubscriptionResponseWrapperSerializer requires
the headers-aware version of serialize");
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final Headers headers,
final SubscriptionResponseWrapper<V> data) {
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized
data}
if (data.version() < 0) {
throw new
UnsupportedVersionException("SubscriptionResponseWrapper version cannot be
negative");
}
- final byte[] serializedData = data.foreignValue() == null ? null :
serializer.serialize(topic, data.foreignValue());
+ final byte[] serializedData = data.foreignValue() == null ? null :
serializer.serialize(topic, headers, data.foreignValue());
final int serializedDataLength = serializedData == null ? 0 :
serializedData.length;
final long[] originalHash = data.originalValueHash();
final int hashLength = originalHash == null ? 0 : 2 * Long.BYTES;
@@ -111,6 +117,11 @@ public class SubscriptionResponseWrapperSerde<VRight>
implements Serde<Subscript
@Override
public SubscriptionResponseWrapper<V> deserialize(final String topic,
final byte[] data) {
+ throw new
UnsupportedOperationException("SubscriptionResponseWrapperDeserializer requires
the headers-aware version of deserialize");
+ }
+
+ @Override
+ public SubscriptionResponseWrapper<V> deserialize(final String topic,
final Headers headers, final byte[] data) {
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized
data}
final ByteBuffer buf = ByteBuffer.wrap(data);
@@ -134,7 +145,7 @@ public class SubscriptionResponseWrapperSerde<VRight>
implements Serde<Subscript
final byte[] serializedValue;
serializedValue = new byte[data.length - lengthSum];
buf.get(serializedValue, 0, serializedValue.length);
- value = deserializer.deserialize(topic, serializedValue);
+ value = deserializer.deserialize(topic, headers,
serializedValue);
} else {
value = null;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializer.java
index df45bc683dd..d0df771fe58 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializer.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
@@ -57,13 +58,18 @@ public class LeftOrRightValueDeserializer<V1, V2>
implements WrappingNullableDes
@Override
public LeftOrRightValue<V1, V2> deserialize(final String topic, final
byte[] data) {
+ throw new UnsupportedOperationException("LeftOrRightValueDeserializer
requires the headers-aware version of deserialize");
+ }
+
+ @Override
+ public LeftOrRightValue<V1, V2> deserialize(final String topic, final
Headers headers, final byte[] data) {
if (data == null || data.length == 0) {
return null;
}
return (data[0] == 1)
- ?
LeftOrRightValue.makeLeftValue(leftDeserializer.deserialize(topic,
rawValue(data)))
- :
LeftOrRightValue.makeRightValue(rightDeserializer.deserialize(topic,
rawValue(data)));
+ ?
LeftOrRightValue.makeLeftValue(leftDeserializer.deserialize(topic, headers,
rawValue(data)))
+ :
LeftOrRightValue.makeRightValue(rightDeserializer.deserialize(topic, headers,
rawValue(data)));
}
private byte[] rawValue(final byte[] data) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java
index 1c64c29fd5a..9af03764e32 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
@@ -61,13 +62,18 @@ public class LeftOrRightValueSerializer<V1, V2> implements
WrappingNullableSeria
@Override
public byte[] serialize(final String topic, final LeftOrRightValue<V1, V2>
data) {
+ throw new UnsupportedOperationException("LeftOrRightValueSerializer
requires the headers-aware version of serialize");
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final Headers headers, final
LeftOrRightValue<V1, V2> data) {
if (data == null) {
return null;
}
final byte[] rawValue = (data.leftValue() != null)
- ? leftSerializer.serialize(topic, data.leftValue())
- : rightSerializer.serialize(topic, data.rightValue());
+ ? leftSerializer.serialize(topic, headers, data.leftValue())
+ : rightSerializer.serialize(topic, headers, data.rightValue());
if (rawValue == null) {
return null;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideDeserializer.java
index 4285dd7bf16..a962f3f3bcb 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideDeserializer.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
@@ -56,9 +57,14 @@ public class TimestampedKeyAndJoinSideDeserializer<K>
implements WrappingNullabl
@Override
public TimestampedKeyAndJoinSide<K> deserialize(final String topic, final
byte[] data) {
+ throw new
UnsupportedOperationException("TimestampedKeyAndJoinSideDeserializer requires
the headers-aware version of deserialize");
+ }
+
+ @Override
+ public TimestampedKeyAndJoinSide<K> deserialize(final String topic, final
Headers headers, final byte[] data) {
final boolean isLeft = data[StateSerdes.TIMESTAMP_SIZE] == 1;
- final K key = keyDeserializer.deserialize(topic, rawKey(data));
- final long timestamp = timestampDeserializer.deserialize(topic,
rawTimestamp(data));
+ final K key = keyDeserializer.deserialize(topic, headers,
rawKey(data));
+ final long timestamp = timestampDeserializer.deserialize(topic,
headers, rawTimestamp(data));
return isLeft ? TimestampedKeyAndJoinSide.makeLeft(key, timestamp) :
TimestampedKeyAndJoinSide.makeRight(key, timestamp);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java
index d94cc486357..5d9b892ac90 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
@@ -56,9 +57,14 @@ public class TimestampedKeyAndJoinSideSerializer<K>
implements WrappingNullableS
@Override
public byte[] serialize(final String topic, final
TimestampedKeyAndJoinSide<K> data) {
+ throw new
UnsupportedOperationException("TimestampedKeyAndJoinSideSerializer requires the
headers-aware version of serialize");
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final Headers headers, final
TimestampedKeyAndJoinSide<K> data) {
final byte boolByte = (byte) (data.isLeftSide() ? 1 : 0);
- final byte[] keyBytes = keySerializer.serialize(topic, data.key());
- final byte[] timestampBytes = timestampSerializer.serialize(topic,
data.timestamp());
+ final byte[] keyBytes = keySerializer.serialize(topic, headers,
data.key());
+ final byte[] timestampBytes = timestampSerializer.serialize(topic,
headers, data.timestamp());
return ByteBuffer
.allocate(timestampBytes.length + 1 + keyBytes.length)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
index 276600fd106..9aa5f28fcd7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
@@ -17,10 +17,14 @@
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.state.internals.Murmur3;
import org.junit.jupiter.api.Test;
@@ -32,6 +36,10 @@ import static
org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class SubscriptionResponseWrapperSerdeTest {
private static final class NonNullableSerde<T> implements Serde<T>,
Serializer<T>, Deserializer<T> {
@@ -68,14 +76,16 @@ public class SubscriptionResponseWrapperSerdeTest {
}
}
+ private static final Headers HEADERS = new RecordHeaders();
+
@Test
- public void ShouldSerdeWithNonNullsTest() {
+ public void shouldSerdeWithNonNullsTest() {
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01,
(byte) 0x9A, (byte) 0xFF, (byte) 0x00});
final String foreignValue = "foreignValue";
final SubscriptionResponseWrapper<String> srw = new
SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
try (final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
- final byte[] serResponse = srwSerde.serializer().serialize(null,
srw);
- final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
+ final byte[] serResponse = srwSerde.serializer().serialize(null,
HEADERS, srw);
+ final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, HEADERS, serResponse);
assertArrayEquals(hashedValue, result.originalValueHash());
assertEquals(foreignValue, result.foreignValue());
@@ -88,8 +98,8 @@ public class SubscriptionResponseWrapperSerdeTest {
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01,
(byte) 0x9A, (byte) 0xFF, (byte) 0x00});
final SubscriptionResponseWrapper<String> srw = new
SubscriptionResponseWrapper<>(hashedValue, null, 1);
try (final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
- final byte[] serResponse = srwSerde.serializer().serialize(null,
srw);
- final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
+ final byte[] serResponse = srwSerde.serializer().serialize(null,
HEADERS, srw);
+ final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, HEADERS, serResponse);
assertArrayEquals(hashedValue, result.originalValueHash());
assertNull(result.foreignValue());
@@ -103,8 +113,8 @@ public class SubscriptionResponseWrapperSerdeTest {
final String foreignValue = "foreignValue";
final SubscriptionResponseWrapper<String> srw = new
SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
try (final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
- final byte[] serResponse = srwSerde.serializer().serialize(null,
srw);
- final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
+ final byte[] serResponse = srwSerde.serializer().serialize(null,
HEADERS, srw);
+ final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, HEADERS, serResponse);
assertArrayEquals(hashedValue, result.originalValueHash());
assertEquals(foreignValue, result.foreignValue());
@@ -118,8 +128,8 @@ public class SubscriptionResponseWrapperSerdeTest {
final String foreignValue = null;
final SubscriptionResponseWrapper<String> srw = new
SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
try (final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
- final byte[] serResponse = srwSerde.serializer().serialize(null,
srw);
- final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
+ final byte[] serResponse = srwSerde.serializer().serialize(null,
HEADERS, srw);
+ final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, HEADERS, serResponse);
assertArrayEquals(hashedValue, result.originalValueHash());
assertEquals(foreignValue, result.foreignValue());
@@ -142,11 +152,52 @@ public class SubscriptionResponseWrapperSerdeTest {
try (final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde<>(null)) {
assertThrows(
UnsupportedVersionException.class,
- () -> srwSerde.serializer().serialize(null, srw)
+ () -> srwSerde.serializer().serialize(null, HEADERS, srw)
);
}
}
+ @Test
+ public void shouldPassHeadersToUnderlyingSerializer() {
+ final Serializer<String> mockSerializer = mock(StringSerializer.class);
+ final Serde<String> mockSerde = mock(Serdes.StringSerde.class);
+ when(mockSerde.serializer()).thenReturn(mockSerializer);
+
+ final String topic = "dummy";
+ final String foreignValue = "foreignValue";
+ final Headers headers = new RecordHeaders().add("key",
"value".getBytes());
+ final SubscriptionResponseWrapper<String> data = new
SubscriptionResponseWrapper<>(null, foreignValue, 1);
+
+ final SubscriptionResponseWrapperSerde<String> testSerde = new
SubscriptionResponseWrapperSerde<>(mockSerde);
+
+ testSerde.serializer().serialize(topic, headers, data);
+
+ verify(mockSerializer).serialize(topic, headers, foreignValue);
+ verify(mockSerializer, never()).serialize(topic, foreignValue);
+ }
+
+ @Test
+ public void shouldPassHeadersToUnderlyingDeserializer() {
+ final Deserializer<String> mockDeserializer =
mock(StringDeserializer.class);
+ final Serde<String> mockSerde = mock(Serdes.StringSerde.class);
+ when(mockSerde.deserializer()).thenReturn(mockDeserializer);
+ when(mockSerde.serializer()).thenReturn(Serdes.String().serializer());
+
+ final String topic = "dummy";
+ final String foreignValue = "foreignValue";
+ final Headers headers = new RecordHeaders().add("key",
"value".getBytes());
+ final SubscriptionResponseWrapper<String> data = new
SubscriptionResponseWrapper<>(null, foreignValue, 1);
+
+ final SubscriptionResponseWrapperSerde<String> testSerde = new
SubscriptionResponseWrapperSerde<>(mockSerde);
+
+ final byte[] serializedData = testSerde.serializer().serialize(topic,
headers, data);
+
+ testSerde.deserializer().deserialize(topic, headers, serializedData);
+
+ verify(mockDeserializer).deserialize(topic, headers,
foreignValue.getBytes());
+ verify(mockDeserializer, never()).deserialize(topic,
foreignValue.getBytes());
+ }
+
public static class InvalidSubscriptionResponseWrapper extends
SubscriptionResponseWrapper<String> {
public InvalidSubscriptionResponseWrapper(final long[]
originalValueHash,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializerTest.java
new file mode 100644
index 00000000000..bc9eca2545f
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializerTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class LeftOrRightValueDeserializerTest {
+
+ @Test
+ public void shouldPassHeadersToUnderlyingDeserializer() {
+ final Deserializer<String> mockDeserializer =
mock(StringDeserializer.class);
+
+ final String topic = "dummy";
+ final String value = "some-string";
+ final Headers headers = new RecordHeaders().add("key",
"value".getBytes());
+ final LeftOrRightValue<String, Object> data =
LeftOrRightValue.makeLeftValue(value);
+ final byte[] serializedBytes = new
LeftOrRightValueSerializer<>(Serdes.String().serializer(),
null).serialize(topic, headers, data);
+
+ when(mockDeserializer.deserialize(topic, headers,
value.getBytes())).thenReturn("dummy-value");
+
+ final LeftOrRightValueDeserializer<String, String> testDeserializer =
new LeftOrRightValueDeserializer<>(mockDeserializer, null);
+
+ testDeserializer.deserialize(topic, headers, serializedBytes);
+
+ verify(mockDeserializer).deserialize(topic, headers, value.getBytes());
+ verify(mockDeserializer, never()).deserialize(topic, value.getBytes());
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializerTest.java
index 2a5aa5c891c..db771476927 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializerTest.java
@@ -16,7 +16,11 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
@@ -24,12 +28,15 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
public class LeftOrRightValueSerializerTest {
private static final String TOPIC = "some-topic";
+ private static final Headers HEADERS = new RecordHeaders();
- private static final LeftOrRightValueSerde<String, Integer>
STRING_OR_INTEGER_SERDE =
- new LeftOrRightValueSerde<>(Serdes.String(), Serdes.Integer());
+ private static final LeftOrRightValueSerde<String, Integer>
STRING_OR_INTEGER_SERDE = new LeftOrRightValueSerde<>(Serdes.String(),
Serdes.Integer());
@Test
public void shouldSerializeStringValue() {
@@ -37,13 +44,11 @@ public class LeftOrRightValueSerializerTest {
final LeftOrRightValue<String, Integer> leftOrRightValue =
LeftOrRightValue.makeLeftValue(value);
- final byte[] serialized =
- STRING_OR_INTEGER_SERDE.serializer().serialize(TOPIC,
leftOrRightValue);
+ final byte[] serialized =
STRING_OR_INTEGER_SERDE.serializer().serialize(TOPIC, HEADERS,
leftOrRightValue);
assertThat(serialized, is(notNullValue()));
- final LeftOrRightValue<String, Integer> deserialized =
- STRING_OR_INTEGER_SERDE.deserializer().deserialize(TOPIC,
serialized);
+ final LeftOrRightValue<String, Integer> deserialized =
STRING_OR_INTEGER_SERDE.deserializer().deserialize(TOPIC, HEADERS, serialized);
assertThat(deserialized, is(leftOrRightValue));
}
@@ -54,13 +59,11 @@ public class LeftOrRightValueSerializerTest {
final LeftOrRightValue<String, Integer> leftOrRightValue =
LeftOrRightValue.makeRightValue(value);
- final byte[] serialized =
- STRING_OR_INTEGER_SERDE.serializer().serialize(TOPIC,
leftOrRightValue);
+ final byte[] serialized =
STRING_OR_INTEGER_SERDE.serializer().serialize(TOPIC, HEADERS,
leftOrRightValue);
assertThat(serialized, is(notNullValue()));
- final LeftOrRightValue<String, Integer> deserialized =
- STRING_OR_INTEGER_SERDE.deserializer().deserialize(TOPIC,
serialized);
+ final LeftOrRightValue<String, Integer> deserialized =
STRING_OR_INTEGER_SERDE.deserializer().deserialize(TOPIC, HEADERS, serialized);
assertThat(deserialized, is(leftOrRightValue));
}
@@ -68,12 +71,29 @@ public class LeftOrRightValueSerializerTest {
@Test
public void shouldThrowIfSerializeValueAsNull() {
assertThrows(NullPointerException.class,
- () -> STRING_OR_INTEGER_SERDE.serializer().serialize(TOPIC,
LeftOrRightValue.makeLeftValue(null)));
+ () -> STRING_OR_INTEGER_SERDE.serializer().serialize(TOPIC,
HEADERS, LeftOrRightValue.makeLeftValue(null)));
}
@Test
public void shouldThrowIfSerializeOtherValueAsNull() {
assertThrows(NullPointerException.class,
- () -> STRING_OR_INTEGER_SERDE.serializer().serialize(TOPIC,
LeftOrRightValue.makeRightValue(null)));
+ () -> STRING_OR_INTEGER_SERDE.serializer().serialize(TOPIC,
HEADERS, LeftOrRightValue.makeRightValue(null)));
+ }
+
+ @Test
+ public void shouldPassHeadersToUnderlyingSerializer() {
+ final Serializer<String> mockSerializer = mock(StringSerializer.class);
+
+ final String topic = "dummy";
+ final String value = "some-string";
+ final Headers headers = new RecordHeaders().add("key",
"value".getBytes());
+ final LeftOrRightValue<String, String> data =
LeftOrRightValue.makeLeftValue(value);
+
+ final LeftOrRightValueSerializer<String, String> testSerializer = new
LeftOrRightValueSerializer<>(mockSerializer, null);
+
+ testSerializer.serialize(topic, headers, data);
+
+ verify(mockSerializer).serialize(topic, headers, value);
+ verify(mockSerializer, never()).serialize(topic, value);
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideDeserializerTest.java
new file mode 100644
index 00000000000..059d482f8ad
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideDeserializerTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TimestampedKeyAndJoinSideDeserializerTest {
+
+ @Test
+ public void shouldPassHeadersToUnderlyingDeserializer() {
+ try (MockedConstruction<LongDeserializer> timestampSerializer =
mockConstruction(LongDeserializer.class)) {
+ final Deserializer<String> mockDeserializer =
mock(StringDeserializer.class);
+ final TimestampedKeyAndJoinSideDeserializer<String>
testDeserializer = new
TimestampedKeyAndJoinSideDeserializer<>(mockDeserializer);
+ final Deserializer<Long> innerTimestampDeserializer =
timestampSerializer.constructed().get(0);
+
+ final String topic = "dummy";
+ final String key = "some-key";
+ final long timestamp = 10;
+ final Headers headers = new RecordHeaders().add("key",
"value".getBytes());
+ final TimestampedKeyAndJoinSide<String> data =
TimestampedKeyAndJoinSide.makeLeft(key, timestamp);
+ final byte[] serializedValue = new
TimestampedKeyAndJoinSideSerializer<>(Serdes.String().serializer()).serialize(topic,
headers, data);
+
+ when(mockDeserializer.deserialize(topic, headers,
key.getBytes())).thenReturn(key);
+ when(innerTimestampDeserializer.deserialize(eq(topic),
eq(headers), any(byte[].class))).thenReturn(timestamp);
+
+ testDeserializer.deserialize(topic, headers, serializedValue);
+
+ verify(mockDeserializer).deserialize(topic, headers,
key.getBytes());
+ verify(mockDeserializer, never()).deserialize(topic,
key.getBytes());
+
+ verify(innerTimestampDeserializer).deserialize(eq(topic),
eq(headers), any(byte[].class));
+ verify(innerTimestampDeserializer, never()).deserialize(eq(topic),
any(byte[].class));
+ }
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializerTest.java
index 81d5736015a..0604ad64af5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializerTest.java
@@ -16,20 +16,31 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TimestampedKeyAndJoinSideSerializerTest {
private static final String TOPIC = "some-topic";
+ private static final Headers HEADERS = new RecordHeaders();
- private static final TimestampedKeyAndJoinSideSerde<String> STRING_SERDE =
- new TimestampedKeyAndJoinSideSerde<>(Serdes.String());
+ private static final TimestampedKeyAndJoinSideSerde<String> STRING_SERDE =
new TimestampedKeyAndJoinSideSerde<>(Serdes.String());
@Test
public void shouldSerializeKeyWithJoinSideAsTrue() {
@@ -37,13 +48,11 @@ public class TimestampedKeyAndJoinSideSerializerTest {
final TimestampedKeyAndJoinSide<String> timestampedKeyAndJoinSide =
TimestampedKeyAndJoinSide.makeLeft(value, 10);
- final byte[] serialized =
- STRING_SERDE.serializer().serialize(TOPIC,
timestampedKeyAndJoinSide);
+ final byte[] serialized = STRING_SERDE.serializer().serialize(TOPIC,
HEADERS, timestampedKeyAndJoinSide);
assertThat(serialized, is(notNullValue()));
- final TimestampedKeyAndJoinSide<String> deserialized =
- STRING_SERDE.deserializer().deserialize(TOPIC, serialized);
+ final TimestampedKeyAndJoinSide<String> deserialized =
STRING_SERDE.deserializer().deserialize(TOPIC, HEADERS, serialized);
assertThat(deserialized, is(timestampedKeyAndJoinSide));
}
@@ -54,13 +63,11 @@ public class TimestampedKeyAndJoinSideSerializerTest {
final TimestampedKeyAndJoinSide<String> timestampedKeyAndJoinSide =
TimestampedKeyAndJoinSide.makeRight(value, 20);
- final byte[] serialized =
- STRING_SERDE.serializer().serialize(TOPIC,
timestampedKeyAndJoinSide);
+ final byte[] serialized = STRING_SERDE.serializer().serialize(TOPIC,
HEADERS, timestampedKeyAndJoinSide);
assertThat(serialized, is(notNullValue()));
- final TimestampedKeyAndJoinSide<String> deserialized =
- STRING_SERDE.deserializer().deserialize(TOPIC, serialized);
+ final TimestampedKeyAndJoinSide<String> deserialized =
STRING_SERDE.deserializer().deserialize(TOPIC, HEADERS, serialized);
assertThat(deserialized, is(timestampedKeyAndJoinSide));
}
@@ -68,6 +75,32 @@ public class TimestampedKeyAndJoinSideSerializerTest {
@Test
public void shouldThrowIfSerializeNullData() {
assertThrows(NullPointerException.class,
- () -> STRING_SERDE.serializer().serialize(TOPIC,
TimestampedKeyAndJoinSide.makeLeft(null, 0)));
+ () -> STRING_SERDE.serializer().serialize(TOPIC, HEADERS,
TimestampedKeyAndJoinSide.makeLeft(null, 0)));
+ }
+
+ @Test
+ public void shouldPassHeadersToUnderlyingSerializer() {
+ try (MockedConstruction<LongSerializer> timestampSerializer =
mockConstruction(LongSerializer.class)) {
+ final Serializer<String> mockSerializer =
mock(StringSerializer.class);
+ final TimestampedKeyAndJoinSideSerializer<String> testSerializer =
new TimestampedKeyAndJoinSideSerializer<>(mockSerializer);
+ final Serializer<Long> innerTimestampSerializer =
timestampSerializer.constructed().get(0);
+
+ final String topic = "dummy";
+ final String key = "some-key";
+ final long timestamp = 10;
+ final Headers headers = new RecordHeaders().add("key",
"value".getBytes());
+ final TimestampedKeyAndJoinSide<String> data =
TimestampedKeyAndJoinSide.makeLeft(key, timestamp);
+
+ when(mockSerializer.serialize(topic, headers,
data.key())).thenReturn(key.getBytes());
+ when(innerTimestampSerializer.serialize(topic, headers,
data.timestamp())).thenReturn(new byte[]{Byte.MAX_VALUE});
+
+ testSerializer.serialize(topic, headers, data);
+
+ verify(mockSerializer).serialize(topic, headers, key);
+ verify(mockSerializer, never()).serialize(topic, key);
+
+ verify(innerTimestampSerializer).serialize(topic, headers,
timestamp);
+ verify(innerTimestampSerializer, never()).serialize(topic,
timestamp);
+ }
}
}