This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 7f8708fe831 KAFKA-20173: Propagate headers into serde 9/N (#21969)
7f8708fe831 is described below
commit 7f8708fe831f37bd80b229132ec676d269afc602
Author: Uladzislau Blok <[email protected]>
AuthorDate: Sun Apr 5 04:45:37 2026 +0200
KAFKA-20173: Propagate headers into serde 9/N (#21969)
Ensures that `CombinedKeySchema`, the FK-join processors, the global-store
processor, and the time-ordered key-value buffers forward headers to their
underlying serdes.
Reviewers: Matthias J. Sax <[email protected]>
---
.../foreignkeyjoin/CombinedKeySchema.java | 27 +++--
.../ForeignTableJoinProcessorSupplier.java | 4 +-
.../ResponseJoinProcessorSupplier.java | 2 +-
.../SubscriptionReceiveProcessorSupplier.java | 2 +-
.../SubscriptionSendProcessorSupplier.java | 15 +--
.../internals/GlobalStateManagerImpl.java | 4 +-
.../InMemoryTimeOrderedKeyValueChangeBuffer.java | 3 +-
.../RocksDBTimeOrderedKeyValueBuffer.java | 7 +-
.../foreignkeyjoin/CombinedKeySchemaTest.java | 130 ++++++++++++++++++---
.../ForeignTableJoinProcessorSupplierTests.java | 2 +-
.../SubscriptionReceiveProcessorSupplierTest.java | 18 +--
11 files changed, 161 insertions(+), 53 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
index 176205c7248..4d40856a990 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
@@ -60,16 +61,22 @@ public class CombinedKeySchema<KRight, KLeft> {
foreignKeyDeserializer = foreignKeyDeserializer == null ?
(Deserializer<KRight>) context.keySerde().deserializer() :
foreignKeyDeserializer;
}
- Bytes toBytes(final KRight foreignKey, final KLeft primaryKey) {
+ Bytes toBytes(final KRight foreignKey, final KLeft primaryKey, final
Headers headers) {
//The serialization format - note that primaryKeySerialized may be
null, such as when a prefixScan
//key is being created.
//{Integer.BYTES
foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
- final byte[] foreignKeySerializedData =
foreignKeySerializer.serialize(foreignKeySerdeTopic,
-
foreignKey);
+ final byte[] foreignKeySerializedData = foreignKeySerializer.serialize(
+ foreignKeySerdeTopic,
+ headers,
+ foreignKey
+ );
//? bytes
- final byte[] primaryKeySerializedData =
primaryKeySerializer.serialize(primaryKeySerdeTopic,
-
primaryKey);
+ final byte[] primaryKeySerializedData = primaryKeySerializer.serialize(
+ primaryKeySerdeTopic,
+ headers,
+ primaryKey
+ );
final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES +
foreignKeySerializedData.length + primaryKeySerializedData.length);
buf.putInt(foreignKeySerializedData.length);
@@ -79,26 +86,26 @@ public class CombinedKeySchema<KRight, KLeft> {
}
- public CombinedKey<KRight, KLeft> fromBytes(final Bytes data) {
+ public CombinedKey<KRight, KLeft> fromBytes(final Bytes data, final
Headers headers) {
//{Integer.BYTES
foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
final byte[] dataArray = data.get();
final ByteBuffer dataBuffer = ByteBuffer.wrap(dataArray);
final int foreignKeyLength = dataBuffer.getInt();
final byte[] foreignKeyRaw = new byte[foreignKeyLength];
dataBuffer.get(foreignKeyRaw, 0, foreignKeyLength);
- final KRight foreignKey =
foreignKeyDeserializer.deserialize(foreignKeySerdeTopic, foreignKeyRaw);
+ final KRight foreignKey =
foreignKeyDeserializer.deserialize(foreignKeySerdeTopic, headers,
foreignKeyRaw);
final byte[] primaryKeyRaw = new byte[dataArray.length -
foreignKeyLength - Integer.BYTES];
dataBuffer.get(primaryKeyRaw, 0, primaryKeyRaw.length);
- final KLeft primaryKey =
primaryKeyDeserializer.deserialize(primaryKeySerdeTopic, primaryKeyRaw);
+ final KLeft primaryKey =
primaryKeyDeserializer.deserialize(primaryKeySerdeTopic, headers,
primaryKeyRaw);
return new CombinedKey<>(foreignKey, primaryKey);
}
- Bytes prefixBytes(final KRight key) {
+ Bytes prefixBytes(final KRight key, final Headers headers) {
//The serialization format. Note that primaryKeySerialized is not
required/used in this function.
//{Integer.BYTES
foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
- final byte[] foreignKeySerializedData =
foreignKeySerializer.serialize(foreignKeySerdeTopic, key);
+ final byte[] foreignKeySerializedData =
foreignKeySerializer.serialize(foreignKeySerdeTopic, headers, key);
final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES +
foreignKeySerializedData.length);
buf.putInt(foreignKeySerializedData.length);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
index fd6038afda0..07e4e58273f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
@@ -120,7 +120,7 @@ public class ForeignTableJoinProcessorSupplier<KLeft,
KRight, VRight>
return;
}
- final Bytes prefixBytes = keySchema.prefixBytes(record.key());
+ final Bytes prefixBytes = keySchema.prefixBytes(record.key(),
record.headers());
//Perform the prefixScan and propagate the results
try (final KeyValueIterator<Bytes,
ValueTimestampHeaders<SubscriptionWrapper<KLeft>>> prefixScanResults =
@@ -130,7 +130,7 @@ public class ForeignTableJoinProcessorSupplier<KLeft,
KRight, VRight>
final KeyValue<Bytes,
ValueTimestampHeaders<SubscriptionWrapper<KLeft>>> next =
prefixScanResults.next();
// have to check the prefix because the range end is
inclusive :(
if (prefixEquals(next.key.get(), prefixBytes.get())) {
- final CombinedKey<KRight, KLeft> combinedKey =
keySchema.fromBytes(next.key);
+ final CombinedKey<KRight, KLeft> combinedKey =
keySchema.fromBytes(next.key, record.headers());
context().forward(
record.withKey(combinedKey.primaryKey())
.withValue(new SubscriptionResponseWrapper<>(
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
index 34ce8cc6873..78384c7be5c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
@@ -110,7 +110,7 @@ public class ResponseJoinProcessorSupplier<KLeft, VLeft,
VRight, VOut>
final long[] currentHash = currentValueWithTimestamp == null ?
null :
-
Murmur3.hash128(runtimeValueSerializer.serialize(valueHashSerdePseudoTopic,
currentValueWithTimestamp.value()));
+
Murmur3.hash128(runtimeValueSerializer.serialize(valueHashSerdePseudoTopic,
record.headers(), currentValueWithTimestamp.value()));
final long[] messageHash = record.value().originalValueHash();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
index 5ec2bd1e560..d06c4ea55a7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
@@ -109,7 +109,7 @@ public class SubscriptionReceiveProcessorSupplier<KLeft,
KRight>
}
private Change<ValueTimestampHeaders<SubscriptionWrapper<KLeft>>>
inferBasedOnState(final Record<KRight, SubscriptionWrapper<KLeft>> record) {
- final Bytes subscriptionKey = keySchema.toBytes(record.key(),
record.value().primaryKey());
+ final Bytes subscriptionKey = keySchema.toBytes(record.key(),
record.value().primaryKey(), record.headers());
final ValueTimestampHeaders<SubscriptionWrapper<KLeft>>
newValue = ValueTimestampHeaders.make(record.value(), record.timestamp(),
record.headers());
final ValueTimestampHeaders<SubscriptionWrapper<KLeft>>
oldValue = store.get(subscriptionKey);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
index 6e57ca4991f..6f61e55ee02 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
@@ -145,7 +146,7 @@ public class SubscriptionSendProcessorSupplier<KLeft,
VLeft, KRight>
//
// if FK did change, we need to explicitly delete the old
subscription,
// because the new subscription goes to a different partition
- if (foreignKeyChanged(newForeignKey, oldForeignKey)) {
+ if (foreignKeyChanged(newForeignKey, oldForeignKey,
record.headers())) {
// this may lead to unnecessary tombstones if the old FK
did not join;
// however, we cannot avoid it as we have no means to know
if the old FK joined or not
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
@@ -189,7 +190,7 @@ public class SubscriptionSendProcessorSupplier<KLeft,
VLeft, KRight>
if (needToUnsubscribe) {
// update case
- if (foreignKeyChanged(newForeignKey, oldForeignKey)) {
+ if (foreignKeyChanged(newForeignKey, oldForeignKey,
record.headers())) {
// if FK did change, we need to explicitly delete
the old subscription,
// because the new subscription goes to a
different partition
//
@@ -227,12 +228,12 @@ public class SubscriptionSendProcessorSupplier<KLeft,
VLeft, KRight>
}
}
- private boolean foreignKeyChanged(final KRight newForeignKey, final
KRight oldForeignKey) {
- return !Arrays.equals(serialize(newForeignKey),
serialize(oldForeignKey));
+ private boolean foreignKeyChanged(final KRight newForeignKey, final
KRight oldForeignKey, final Headers headers) {
+ return !Arrays.equals(serialize(newForeignKey, headers),
serialize(oldForeignKey, headers));
}
- private byte[] serialize(final KRight key) {
- return foreignKeySerializer.serialize(foreignKeySerdeTopic, key);
+ private byte[] serialize(final KRight key, final Headers headers) {
+ return foreignKeySerializer.serialize(foreignKeySerdeTopic,
headers, key);
}
private void forward(final Record<KLeft, Change<VLeft>> record, final
KRight foreignKey, final Instruction deleteKeyNoPropagate) {
@@ -249,7 +250,7 @@ public class SubscriptionSendProcessorSupplier<KLeft,
VLeft, KRight>
if (recordHash == null) {
recordHash = record.value().newValue == null
? null
- :
Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic,
record.value().newValue));
+ :
Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.headers(),
record.value().newValue));
}
return recordHash;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 30146fbb7be..4f9ca84f4a2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -371,8 +371,8 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
final Record<?, ?> deserializedRecord;
try {
deserializedRecord = new Record<>(
-
reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
-
reprocessFactory.valueDeserializer().deserialize(record.topic(),
record.value()),
+
reprocessFactory.keyDeserializer().deserialize(record.topic(),
record.headers(), record.key()),
+
reprocessFactory.valueDeserializer().deserialize(record.topic(),
record.headers(), record.value()),
record.timestamp(),
record.headers());
} catch (final Exception deserializationException) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
index a73240f1057..04e6911d1f7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
@@ -436,12 +436,13 @@ public final class
InMemoryTimeOrderedKeyValueChangeBuffer<K, V, T> implements T
@Override
public Maybe<ValueTimestampHeaders<V>> priorValueForBuffered(final K key) {
- final Bytes serializedKey =
Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key));
+ final Bytes serializedKey =
Bytes.wrap(keySerde.serializer().serialize(changelogTopic, context.headers(),
key));
if (index.containsKey(serializedKey)) {
final byte[] serializedValue =
internalPriorValueForBuffered(serializedKey);
final V deserializedValue =
valueSerde.innerSerde().deserializer().deserialize(
changelogTopic,
+ context.headers(),
serializedValue
);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
index 9d05997f205..956d72f5ffa 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
@@ -231,6 +231,7 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V>
implements TimeOrderedKeyVal
final BufferValue bufferValue =
BufferValue.deserialize(ByteBuffer.wrap(keyValue.value));
final K key = keySerde.deserializer().deserialize(topic,
+ iternalContext.headers(),
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get()));
if (bufferValue.context().timestamp() < minTimestamp &&
minValid) {
@@ -242,7 +243,7 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V>
implements TimeOrderedKeyVal
minTimestamp = bufferValue.context().timestamp();
minValid = true;
- final V value =
valueSerde.deserializer().deserialize(topic, bufferValue.newValue());
+ final V value =
valueSerde.deserializer().deserialize(topic, iternalContext.headers(),
bufferValue.newValue());
callback.accept(new Eviction<>(key, value,
bufferValue.context()));
@@ -280,10 +281,10 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V>
implements TimeOrderedKeyVal
}
maybeUpdateSeqnumForDups();
final Bytes serializedKey = Bytes.wrap(
-
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.toStoreKeyBinary(keySerde.serializer().serialize(topic,
record.key()),
+
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.toStoreKeyBinary(keySerde.serializer().serialize(topic,
record.headers(), record.key()),
record.timestamp(),
seqnum).get());
- final byte[] valueBytes = valueSerde.serializer().serialize(topic,
record.value());
+ final byte[] valueBytes = valueSerde.serializer().serialize(topic,
record.headers(), record.value());
final BufferValue buffered = new BufferValue(null, null, valueBytes,
recordContext);
store.put(serializedKey, buffered.serialize(0).array());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
index 3142f65febf..22bb83e5d68 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
@@ -16,8 +16,16 @@
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.junit.jupiter.api.Test;
@@ -25,19 +33,26 @@ import java.nio.ByteBuffer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class CombinedKeySchemaTest {
+ private static final String FK_TOPIC = "fkTopic";
+ private static final String PK_TOPIC = "pkTopic";
+ private static final Headers HEADERS = new RecordHeaders().add("key",
"value".getBytes());
@Test
public void nonNullPrimaryKeySerdeTest() {
final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
- () -> "fkTopic", Serdes.String(),
- () -> "pkTopic", Serdes.Integer()
+ () -> FK_TOPIC, Serdes.String(),
+ () -> PK_TOPIC, Serdes.Integer()
);
final Integer primary = -999;
- final Bytes result = cks.toBytes("foreignKey", primary);
+ final Bytes result = cks.toBytes("foreignKey", primary, HEADERS);
- final CombinedKey<String, Integer> deserializedKey =
cks.fromBytes(result);
+ final CombinedKey<String, Integer> deserializedKey =
cks.fromBytes(result, new RecordHeaders());
assertEquals("foreignKey", deserializedKey.foreignKey());
assertEquals(primary, deserializedKey.primaryKey());
}
@@ -45,31 +60,31 @@ public class CombinedKeySchemaTest {
@Test
public void nullPrimaryKeySerdeTest() {
final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
- () -> "fkTopic", Serdes.String(),
- () -> "pkTopic", Serdes.Integer()
+ () -> FK_TOPIC, Serdes.String(),
+ () -> PK_TOPIC, Serdes.Integer()
);
- assertThrows(NullPointerException.class, () ->
cks.toBytes("foreignKey", null));
+ assertThrows(NullPointerException.class, () ->
cks.toBytes("foreignKey", null, HEADERS));
}
@Test
public void nullForeignKeySerdeTest() {
final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
- () -> "fkTopic", Serdes.String(),
- () -> "pkTopic", Serdes.Integer()
+ () -> FK_TOPIC, Serdes.String(),
+ () -> PK_TOPIC, Serdes.Integer()
);
- assertThrows(NullPointerException.class, () -> cks.toBytes(null, 10));
+ assertThrows(NullPointerException.class, () -> cks.toBytes(null, 10,
HEADERS));
}
@Test
public void prefixKeySerdeTest() {
final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
- () -> "fkTopic", Serdes.String(),
- () -> "pkTopic", Serdes.Integer()
+ () -> FK_TOPIC, Serdes.String(),
+ () -> PK_TOPIC, Serdes.Integer()
);
final String foreignKey = "someForeignKey";
final byte[] foreignKeySerializedData =
- Serdes.String().serializer().serialize("fkTopic", foreignKey);
- final Bytes prefix = cks.prefixBytes(foreignKey);
+ Serdes.String().serializer().serialize(FK_TOPIC, foreignKey);
+ final Bytes prefix = cks.prefixBytes(foreignKey, HEADERS);
final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES +
foreignKeySerializedData.length);
buf.putInt(foreignKeySerializedData.length);
@@ -82,10 +97,91 @@ public class CombinedKeySchemaTest {
@Test
public void nullPrefixKeySerdeTest() {
final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
- () -> "fkTopic", Serdes.String(),
- () -> "pkTopic", Serdes.Integer()
+ () -> FK_TOPIC, Serdes.String(),
+ () -> PK_TOPIC, Serdes.Integer()
);
final String foreignKey = null;
- assertThrows(NullPointerException.class, () ->
cks.prefixBytes(foreignKey));
+ assertThrows(NullPointerException.class, () ->
cks.prefixBytes(foreignKey, HEADERS));
+ }
+
+ @Test
+ public void shouldPassHeadersToUnderlyingSerializerOnPrefixBytes() {
+ final Serializer<String> mockSerializer = mock(StringSerializer.class);
+ final Serde<String> mockSerde = mock(Serdes.StringSerde.class);
+ when(mockSerde.serializer()).thenReturn(mockSerializer);
+
when(mockSerde.deserializer()).thenReturn(Serdes.String().deserializer());
+
+ final String foreignKey = "foreignKey";
+ when(mockSerializer.serialize(FK_TOPIC, HEADERS,
foreignKey)).thenReturn(foreignKey.getBytes());
+
+ final CombinedKeySchema<String, String> cks = new CombinedKeySchema<>(
+ () -> FK_TOPIC, mockSerde,
+ () -> PK_TOPIC, mockSerde
+ );
+ cks.init(mock(ProcessorContext.class));
+ cks.prefixBytes(foreignKey, HEADERS);
+
+ verify(mockSerializer).serialize(FK_TOPIC, HEADERS, foreignKey);
+ verify(mockSerializer, never()).serialize(FK_TOPIC, foreignKey);
+ }
+
+ @Test
+ public void shouldPassHeadersToUnderlyingSerializersOnToBytes() {
+ final Serializer<String> mockSerializer = mock(StringSerializer.class);
+ final Serde<String> mockSerde = mock(Serdes.StringSerde.class);
+ when(mockSerde.serializer()).thenReturn(mockSerializer);
+
when(mockSerde.deserializer()).thenReturn(Serdes.String().deserializer());
+
+
+ final String foreignKey = "foreignKey";
+ final String primaryKey = "primaryKey";
+ when(mockSerializer.serialize(FK_TOPIC, HEADERS,
foreignKey)).thenReturn(foreignKey.getBytes());
+ when(mockSerializer.serialize(PK_TOPIC, HEADERS,
primaryKey)).thenReturn(primaryKey.getBytes());
+
+ final CombinedKeySchema<String, String> cks = new CombinedKeySchema<>(
+ () -> FK_TOPIC, mockSerde,
+ () -> PK_TOPIC, mockSerde
+ );
+ cks.init(mock(ProcessorContext.class));
+ cks.toBytes(foreignKey, primaryKey, HEADERS);
+
+ verify(mockSerializer).serialize(FK_TOPIC, HEADERS, foreignKey);
+ verify(mockSerializer).serialize(PK_TOPIC, HEADERS, primaryKey);
+ verify(mockSerializer, never()).serialize(FK_TOPIC, foreignKey);
+ verify(mockSerializer, never()).serialize(PK_TOPIC, primaryKey);
+ }
+
+ @Test
+ public void shouldPassHeadersToUnderlyingDeserializersOnFromBytes() {
+ final Deserializer<String> mockDeserializer =
mock(StringDeserializer.class);
+ final Serde<String> mockSerde = mock(Serdes.StringSerde.class);
+ when(mockSerde.serializer()).thenReturn(Serdes.String().serializer());
+ when(mockSerde.deserializer()).thenReturn(mockDeserializer);
+
+ final String foreignKey = "foreignKey";
+ final String primaryKey = "primaryKey";
+ when(mockDeserializer.deserialize(FK_TOPIC, HEADERS,
foreignKey.getBytes())).thenReturn(foreignKey);
+ when(mockDeserializer.deserialize(PK_TOPIC, HEADERS,
primaryKey.getBytes())).thenReturn(primaryKey);
+
+ final CombinedKeySchema<String, String> serializerCks = new
CombinedKeySchema<>(
+ () -> FK_TOPIC, Serdes.String(),
+ () -> PK_TOPIC, Serdes.String()
+ );
+ final Bytes serialized = serializerCks.toBytes(foreignKey, primaryKey,
HEADERS);
+
+ final CombinedKeySchema<String, String> cks = new CombinedKeySchema<>(
+ () -> FK_TOPIC, mockSerde,
+ () -> PK_TOPIC, mockSerde
+ );
+ cks.init(mock(ProcessorContext.class));
+ cks.fromBytes(serialized, HEADERS);
+
+ final byte[] foreignKeyRaw =
Serdes.String().serializer().serialize(null, HEADERS, foreignKey);
+ final byte[] primaryKeyRaw =
Serdes.String().serializer().serialize(null, HEADERS, primaryKey);
+
+ verify(mockDeserializer).deserialize(FK_TOPIC, HEADERS, foreignKeyRaw);
+ verify(mockDeserializer, never()).deserialize(FK_TOPIC, foreignKeyRaw);
+ verify(mockDeserializer).deserialize(PK_TOPIC, HEADERS, primaryKeyRaw);
+ verify(mockDeserializer, never()).deserialize(PK_TOPIC, primaryKeyRaw);
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
index 2d53b83a94f..f06b59a544c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
@@ -203,7 +203,7 @@ public class ForeignTableJoinProcessorSupplierTests {
);
final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
- final Bytes key = COMBINED_KEY_SCHEMA.toBytes(fk, pk);
+ final Bytes key = COMBINED_KEY_SCHEMA.toBytes(fk, pk, new
RecordHeaders());
stateStore.put(key, oldValue);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
index ddfcadfc261..f3dd326adcd 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@@ -60,6 +61,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
private static final String FK = "fk1";
private static final String PK1 = "pk1";
private static final String PK2 = "pk2";
+ private static final Headers HEADERS = new RecordHeaders();
private static final Supplier<String> PK_SERDE_TOPIC_SUPPLIER = () ->
"pk-topic";
private static final CombinedKeySchema<String, String> COMBINED_KEY_SCHEMA
= new CombinedKeySchema<>(
@@ -110,7 +112,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
);
final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
- final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+ final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1, HEADERS);
stateStore.put(key, oldValue);
processor.init(context);
@@ -161,7 +163,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
);
final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
- final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+ final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1, HEADERS);
stateStore.put(key, oldValue);
processor.init(context);
@@ -213,7 +215,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
);
final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
- final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+ final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1, HEADERS);
stateStore.put(key, oldValue);
processor.init(context);
@@ -265,7 +267,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
);
final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
- final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+ final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1, HEADERS);
stateStore.put(key, oldValue);
processor.init(context);
@@ -317,7 +319,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
);
final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, null);
- final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+ final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1, HEADERS);
stateStore.put(key, oldValue);
processor.init(context);
@@ -369,7 +371,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
);
final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, null);
- final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+ final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1, HEADERS);
stateStore.put(key, oldValue);
processor.init(context);
@@ -421,7 +423,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
);
final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, null);
- final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+ final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1, HEADERS);
stateStore.put(key, oldValue);
processor.init(context);
@@ -473,7 +475,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
);
final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, null);
- final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+ final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1, HEADERS);
stateStore.put(key, oldValue);
processor.init(context);