This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 7f8708fe831 KAFKA-20173: Propagate headers into serde 9/N  (#21969)
7f8708fe831 is described below

commit 7f8708fe831f37bd80b229132ec676d269afc602
Author: Uladzislau Blok <[email protected]>
AuthorDate: Sun Apr 5 04:45:37 2026 +0200

    KAFKA-20173: Propagate headers into serde 9/N  (#21969)
    
    Ensures that `CombinedKeySchema`, the FK-join processors, and the
    global-store processor forward headers to their underlying serdes.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../foreignkeyjoin/CombinedKeySchema.java          |  27 +++--
 .../ForeignTableJoinProcessorSupplier.java         |   4 +-
 .../ResponseJoinProcessorSupplier.java             |   2 +-
 .../SubscriptionReceiveProcessorSupplier.java      |   2 +-
 .../SubscriptionSendProcessorSupplier.java         |  15 +--
 .../internals/GlobalStateManagerImpl.java          |   4 +-
 .../InMemoryTimeOrderedKeyValueChangeBuffer.java   |   3 +-
 .../RocksDBTimeOrderedKeyValueBuffer.java          |   7 +-
 .../foreignkeyjoin/CombinedKeySchemaTest.java      | 130 ++++++++++++++++++---
 .../ForeignTableJoinProcessorSupplierTests.java    |   2 +-
 .../SubscriptionReceiveProcessorSupplierTest.java  |  18 +--
 11 files changed, 161 insertions(+), 53 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
index 176205c7248..4d40856a990 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
@@ -60,16 +61,22 @@ public class CombinedKeySchema<KRight, KLeft> {
         foreignKeyDeserializer = foreignKeyDeserializer == null ? 
(Deserializer<KRight>) context.keySerde().deserializer() : 
foreignKeyDeserializer;
     }
 
-    Bytes toBytes(final KRight foreignKey, final KLeft primaryKey) {
+    Bytes toBytes(final KRight foreignKey, final KLeft primaryKey, final 
Headers headers) {
         //The serialization format - note that primaryKeySerialized may be 
null, such as when a prefixScan
         //key is being created.
         //{Integer.BYTES 
foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
-        final byte[] foreignKeySerializedData = 
foreignKeySerializer.serialize(foreignKeySerdeTopic,
-                                                                               
foreignKey);
+        final byte[] foreignKeySerializedData = foreignKeySerializer.serialize(
+                foreignKeySerdeTopic,
+                headers,
+                foreignKey
+        );
 
         //? bytes
-        final byte[] primaryKeySerializedData = 
primaryKeySerializer.serialize(primaryKeySerdeTopic,
-                                                                               
primaryKey);
+        final byte[] primaryKeySerializedData = primaryKeySerializer.serialize(
+                primaryKeySerdeTopic,
+                headers,
+                primaryKey
+        );
 
         final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + 
foreignKeySerializedData.length + primaryKeySerializedData.length);
         buf.putInt(foreignKeySerializedData.length);
@@ -79,26 +86,26 @@ public class CombinedKeySchema<KRight, KLeft> {
     }
 
 
-    public CombinedKey<KRight, KLeft> fromBytes(final Bytes data) {
+    public CombinedKey<KRight, KLeft> fromBytes(final Bytes data, final 
Headers headers) {
         //{Integer.BYTES 
foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
         final byte[] dataArray = data.get();
         final ByteBuffer dataBuffer = ByteBuffer.wrap(dataArray);
         final int foreignKeyLength = dataBuffer.getInt();
         final byte[] foreignKeyRaw = new byte[foreignKeyLength];
         dataBuffer.get(foreignKeyRaw, 0, foreignKeyLength);
-        final KRight foreignKey = 
foreignKeyDeserializer.deserialize(foreignKeySerdeTopic, foreignKeyRaw);
+        final KRight foreignKey = 
foreignKeyDeserializer.deserialize(foreignKeySerdeTopic, headers, 
foreignKeyRaw);
 
         final byte[] primaryKeyRaw = new byte[dataArray.length - 
foreignKeyLength - Integer.BYTES];
         dataBuffer.get(primaryKeyRaw, 0, primaryKeyRaw.length);
-        final KLeft primaryKey = 
primaryKeyDeserializer.deserialize(primaryKeySerdeTopic, primaryKeyRaw);
+        final KLeft primaryKey = 
primaryKeyDeserializer.deserialize(primaryKeySerdeTopic, headers, 
primaryKeyRaw);
         return new CombinedKey<>(foreignKey, primaryKey);
     }
 
-    Bytes prefixBytes(final KRight key) {
+    Bytes prefixBytes(final KRight key, final Headers headers) {
         //The serialization format. Note that primaryKeySerialized is not 
required/used in this function.
         //{Integer.BYTES 
foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
 
-        final byte[] foreignKeySerializedData = 
foreignKeySerializer.serialize(foreignKeySerdeTopic, key);
+        final byte[] foreignKeySerializedData = 
foreignKeySerializer.serialize(foreignKeySerdeTopic, headers, key);
 
         final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + 
foreignKeySerializedData.length);
         buf.putInt(foreignKeySerializedData.length);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
index fd6038afda0..07e4e58273f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
@@ -120,7 +120,7 @@ public class ForeignTableJoinProcessorSupplier<KLeft, 
KRight, VRight>
                 return;
             }
 
-            final Bytes prefixBytes = keySchema.prefixBytes(record.key());
+            final Bytes prefixBytes = keySchema.prefixBytes(record.key(), 
record.headers());
 
             //Perform the prefixScan and propagate the results
             try (final KeyValueIterator<Bytes, 
ValueTimestampHeaders<SubscriptionWrapper<KLeft>>> prefixScanResults =
@@ -130,7 +130,7 @@ public class ForeignTableJoinProcessorSupplier<KLeft, 
KRight, VRight>
                     final KeyValue<Bytes, 
ValueTimestampHeaders<SubscriptionWrapper<KLeft>>> next = 
prefixScanResults.next();
                     // have to check the prefix because the range end is 
inclusive :(
                     if (prefixEquals(next.key.get(), prefixBytes.get())) {
-                        final CombinedKey<KRight, KLeft> combinedKey = 
keySchema.fromBytes(next.key);
+                        final CombinedKey<KRight, KLeft> combinedKey = 
keySchema.fromBytes(next.key, record.headers());
                         context().forward(
                             record.withKey(combinedKey.primaryKey())
                                 .withValue(new SubscriptionResponseWrapper<>(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
index 34ce8cc6873..78384c7be5c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
@@ -110,7 +110,7 @@ public class ResponseJoinProcessorSupplier<KLeft, VLeft, 
VRight, VOut>
 
                 final long[] currentHash = currentValueWithTimestamp == null ?
                     null :
-                    
Murmur3.hash128(runtimeValueSerializer.serialize(valueHashSerdePseudoTopic, 
currentValueWithTimestamp.value()));
+                    
Murmur3.hash128(runtimeValueSerializer.serialize(valueHashSerdePseudoTopic, 
record.headers(), currentValueWithTimestamp.value()));
 
                 final long[] messageHash = record.value().originalValueHash();
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
index 5ec2bd1e560..d06c4ea55a7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
@@ -109,7 +109,7 @@ public class SubscriptionReceiveProcessorSupplier<KLeft, 
KRight>
             }
 
             private Change<ValueTimestampHeaders<SubscriptionWrapper<KLeft>>> 
inferBasedOnState(final Record<KRight, SubscriptionWrapper<KLeft>> record) {
-                final Bytes subscriptionKey = keySchema.toBytes(record.key(), 
record.value().primaryKey());
+                final Bytes subscriptionKey = keySchema.toBytes(record.key(), 
record.value().primaryKey(), record.headers());
 
                 final ValueTimestampHeaders<SubscriptionWrapper<KLeft>> 
newValue = ValueTimestampHeaders.make(record.value(), record.timestamp(), 
record.headers());
                 final ValueTimestampHeaders<SubscriptionWrapper<KLeft>> 
oldValue = store.get(subscriptionKey);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
index 6e57ca4991f..6f61e55ee02 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
@@ -145,7 +146,7 @@ public class SubscriptionSendProcessorSupplier<KLeft, 
VLeft, KRight>
                 //
                 // if FK did change, we need to explicitly delete the old 
subscription,
                 // because the new subscription goes to a different partition
-                if (foreignKeyChanged(newForeignKey, oldForeignKey)) {
+                if (foreignKeyChanged(newForeignKey, oldForeignKey, 
record.headers())) {
                     // this may lead to unnecessary tombstones if the old FK 
did not join;
                     // however, we cannot avoid it as we have no means to know 
if the old FK joined or not
                     forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
@@ -189,7 +190,7 @@ public class SubscriptionSendProcessorSupplier<KLeft, 
VLeft, KRight>
                     if (needToUnsubscribe) {
                         // update case
 
-                        if (foreignKeyChanged(newForeignKey, oldForeignKey)) {
+                        if (foreignKeyChanged(newForeignKey, oldForeignKey, 
record.headers())) {
                             // if FK did change, we need to explicitly delete 
the old subscription,
                             // because the new subscription goes to a 
different partition
                             //
@@ -227,12 +228,12 @@ public class SubscriptionSendProcessorSupplier<KLeft, 
VLeft, KRight>
             }
         }
 
-        private boolean foreignKeyChanged(final KRight newForeignKey, final 
KRight oldForeignKey) {
-            return !Arrays.equals(serialize(newForeignKey), 
serialize(oldForeignKey));
+        private boolean foreignKeyChanged(final KRight newForeignKey, final 
KRight oldForeignKey, final Headers headers) {
+            return !Arrays.equals(serialize(newForeignKey, headers), 
serialize(oldForeignKey, headers));
         }
 
-        private byte[] serialize(final KRight key) {
-            return foreignKeySerializer.serialize(foreignKeySerdeTopic, key);
+        private byte[] serialize(final KRight key, final Headers headers) {
+            return foreignKeySerializer.serialize(foreignKeySerdeTopic, 
headers, key);
         }
 
         private void forward(final Record<KLeft, Change<VLeft>> record, final 
KRight foreignKey, final Instruction deleteKeyNoPropagate) {
@@ -249,7 +250,7 @@ public class SubscriptionSendProcessorSupplier<KLeft, 
VLeft, KRight>
             if (recordHash == null) {
                 recordHash = record.value().newValue == null
                     ? null
-                    : 
Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, 
record.value().newValue));
+                    : 
Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.headers(), 
record.value().newValue));
             }
             return recordHash;
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 30146fbb7be..4f9ca84f4a2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -371,8 +371,8 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
                         final Record<?, ?> deserializedRecord;
                         try {
                             deserializedRecord = new Record<>(
-                                
reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
-                                
reprocessFactory.valueDeserializer().deserialize(record.topic(), 
record.value()),
+                                
reprocessFactory.keyDeserializer().deserialize(record.topic(), 
record.headers(), record.key()),
+                                
reprocessFactory.valueDeserializer().deserialize(record.topic(), 
record.headers(), record.value()),
                                 record.timestamp(),
                                 record.headers());
                         } catch (final Exception deserializationException) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
index a73240f1057..04e6911d1f7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
@@ -436,12 +436,13 @@ public final class 
InMemoryTimeOrderedKeyValueChangeBuffer<K, V, T> implements T
 
     @Override
     public Maybe<ValueTimestampHeaders<V>> priorValueForBuffered(final K key) {
-        final Bytes serializedKey = 
Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key));
+        final Bytes serializedKey = 
Bytes.wrap(keySerde.serializer().serialize(changelogTopic, context.headers(), 
key));
         if (index.containsKey(serializedKey)) {
             final byte[] serializedValue = 
internalPriorValueForBuffered(serializedKey);
 
             final V deserializedValue = 
valueSerde.innerSerde().deserializer().deserialize(
                 changelogTopic,
+                context.headers(),
                 serializedValue
             );
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
index 9d05997f205..956d72f5ffa 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
@@ -231,6 +231,7 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> 
implements TimeOrderedKeyVal
 
                     final BufferValue bufferValue = 
BufferValue.deserialize(ByteBuffer.wrap(keyValue.value));
                     final K key = keySerde.deserializer().deserialize(topic,
+                        iternalContext.headers(),
                         
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get()));
 
                     if (bufferValue.context().timestamp() < minTimestamp && 
minValid) {
@@ -242,7 +243,7 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> 
implements TimeOrderedKeyVal
                     minTimestamp = bufferValue.context().timestamp();
                     minValid = true;
 
-                    final V value = 
valueSerde.deserializer().deserialize(topic, bufferValue.newValue());
+                    final V value = 
valueSerde.deserializer().deserialize(topic, iternalContext.headers(), 
bufferValue.newValue());
 
                     callback.accept(new Eviction<>(key, value, 
bufferValue.context()));
 
@@ -280,10 +281,10 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> 
implements TimeOrderedKeyVal
         }
         maybeUpdateSeqnumForDups();
         final Bytes serializedKey = Bytes.wrap(
-            
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.toStoreKeyBinary(keySerde.serializer().serialize(topic,
 record.key()),
+            
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.toStoreKeyBinary(keySerde.serializer().serialize(topic,
 record.headers(), record.key()),
                 record.timestamp(),
                 seqnum).get());
-        final byte[] valueBytes = valueSerde.serializer().serialize(topic, 
record.value());
+        final byte[] valueBytes = valueSerde.serializer().serialize(topic, 
record.headers(), record.value());
         final BufferValue buffered = new BufferValue(null, null, valueBytes, 
recordContext);
         store.put(serializedKey, buffered.serialize(0).array());
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
index 3142f65febf..22bb83e5d68 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
@@ -16,8 +16,16 @@
  */
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 
 import org.junit.jupiter.api.Test;
 
@@ -25,19 +33,26 @@ import java.nio.ByteBuffer;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class CombinedKeySchemaTest {
+    private static final String FK_TOPIC = "fkTopic";
+    private static final String PK_TOPIC = "pkTopic";
+    private static final Headers HEADERS = new RecordHeaders().add("key", 
"value".getBytes());
 
     @Test
     public void nonNullPrimaryKeySerdeTest() {
         final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
-            () -> "fkTopic", Serdes.String(),
-            () -> "pkTopic", Serdes.Integer()
+            () -> FK_TOPIC, Serdes.String(),
+            () -> PK_TOPIC, Serdes.Integer()
         );
         final Integer primary = -999;
-        final Bytes result = cks.toBytes("foreignKey", primary);
+        final Bytes result = cks.toBytes("foreignKey", primary, HEADERS);
 
-        final CombinedKey<String, Integer> deserializedKey = 
cks.fromBytes(result);
+        final CombinedKey<String, Integer> deserializedKey = 
cks.fromBytes(result, new RecordHeaders());
         assertEquals("foreignKey", deserializedKey.foreignKey());
         assertEquals(primary, deserializedKey.primaryKey());
     }
@@ -45,31 +60,31 @@ public class CombinedKeySchemaTest {
     @Test
     public void nullPrimaryKeySerdeTest() {
         final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
-            () -> "fkTopic", Serdes.String(),
-            () -> "pkTopic", Serdes.Integer()
+            () -> FK_TOPIC, Serdes.String(),
+            () -> PK_TOPIC, Serdes.Integer()
         );
-        assertThrows(NullPointerException.class, () -> 
cks.toBytes("foreignKey", null));
+        assertThrows(NullPointerException.class, () -> 
cks.toBytes("foreignKey", null, HEADERS));
     }
 
     @Test
     public void nullForeignKeySerdeTest() {
         final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
-            () -> "fkTopic", Serdes.String(),
-            () -> "pkTopic", Serdes.Integer()
+            () -> FK_TOPIC, Serdes.String(),
+            () -> PK_TOPIC, Serdes.Integer()
         );
-        assertThrows(NullPointerException.class, () -> cks.toBytes(null, 10));
+        assertThrows(NullPointerException.class, () -> cks.toBytes(null, 10, 
HEADERS));
     }
 
     @Test
     public void prefixKeySerdeTest() {
         final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
-            () -> "fkTopic", Serdes.String(),
-            () -> "pkTopic", Serdes.Integer()
+            () -> FK_TOPIC, Serdes.String(),
+            () -> PK_TOPIC, Serdes.Integer()
         );
         final String foreignKey = "someForeignKey";
         final byte[] foreignKeySerializedData =
-            Serdes.String().serializer().serialize("fkTopic", foreignKey);
-        final Bytes prefix = cks.prefixBytes(foreignKey);
+            Serdes.String().serializer().serialize(FK_TOPIC, foreignKey);
+        final Bytes prefix = cks.prefixBytes(foreignKey, HEADERS);
 
         final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + 
foreignKeySerializedData.length);
         buf.putInt(foreignKeySerializedData.length);
@@ -82,10 +97,91 @@ public class CombinedKeySchemaTest {
     @Test
     public void nullPrefixKeySerdeTest() {
         final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
-            () -> "fkTopic", Serdes.String(),
-            () -> "pkTopic", Serdes.Integer()
+            () -> FK_TOPIC, Serdes.String(),
+            () -> PK_TOPIC, Serdes.Integer()
         );
         final String foreignKey = null;
-        assertThrows(NullPointerException.class, () -> 
cks.prefixBytes(foreignKey));
+        assertThrows(NullPointerException.class, () -> 
cks.prefixBytes(foreignKey, HEADERS));
+    }
+
+    @Test
+    public void shouldPassHeadersToUnderlyingSerializerOnPrefixBytes() {
+        final Serializer<String> mockSerializer = mock(StringSerializer.class);
+        final Serde<String> mockSerde = mock(Serdes.StringSerde.class);
+        when(mockSerde.serializer()).thenReturn(mockSerializer);
+        
when(mockSerde.deserializer()).thenReturn(Serdes.String().deserializer());
+
+        final String foreignKey = "foreignKey";
+        when(mockSerializer.serialize(FK_TOPIC, HEADERS, 
foreignKey)).thenReturn(foreignKey.getBytes());
+
+        final CombinedKeySchema<String, String> cks = new CombinedKeySchema<>(
+            () -> FK_TOPIC, mockSerde,
+            () -> PK_TOPIC, mockSerde
+        );
+        cks.init(mock(ProcessorContext.class));
+        cks.prefixBytes(foreignKey, HEADERS);
+
+        verify(mockSerializer).serialize(FK_TOPIC, HEADERS, foreignKey);
+        verify(mockSerializer, never()).serialize(FK_TOPIC, foreignKey);
+    }
+
+    @Test
+    public void shouldPassHeadersToUnderlyingSerializersOnToBytes() {
+        final Serializer<String> mockSerializer = mock(StringSerializer.class);
+        final Serde<String> mockSerde = mock(Serdes.StringSerde.class);
+        when(mockSerde.serializer()).thenReturn(mockSerializer);
+        
when(mockSerde.deserializer()).thenReturn(Serdes.String().deserializer());
+
+
+        final String foreignKey = "foreignKey";
+        final String primaryKey = "primaryKey";
+        when(mockSerializer.serialize(FK_TOPIC, HEADERS, 
foreignKey)).thenReturn(foreignKey.getBytes());
+        when(mockSerializer.serialize(PK_TOPIC, HEADERS, 
primaryKey)).thenReturn(primaryKey.getBytes());
+
+        final CombinedKeySchema<String, String> cks = new CombinedKeySchema<>(
+            () -> FK_TOPIC, mockSerde,
+            () -> PK_TOPIC, mockSerde
+        );
+        cks.init(mock(ProcessorContext.class));
+        cks.toBytes(foreignKey, primaryKey, HEADERS);
+
+        verify(mockSerializer).serialize(FK_TOPIC, HEADERS, foreignKey);
+        verify(mockSerializer).serialize(PK_TOPIC, HEADERS, primaryKey);
+        verify(mockSerializer, never()).serialize(FK_TOPIC, foreignKey);
+        verify(mockSerializer, never()).serialize(PK_TOPIC, primaryKey);
+    }
+
+    @Test
+    public void shouldPassHeadersToUnderlyingDeserializersOnFromBytes() {
+        final Deserializer<String> mockDeserializer = 
mock(StringDeserializer.class);
+        final Serde<String> mockSerde = mock(Serdes.StringSerde.class);
+        when(mockSerde.serializer()).thenReturn(Serdes.String().serializer());
+        when(mockSerde.deserializer()).thenReturn(mockDeserializer);
+
+        final String foreignKey = "foreignKey";
+        final String primaryKey = "primaryKey";
+        when(mockDeserializer.deserialize(FK_TOPIC, HEADERS, 
foreignKey.getBytes())).thenReturn(foreignKey);
+        when(mockDeserializer.deserialize(PK_TOPIC, HEADERS, 
primaryKey.getBytes())).thenReturn(primaryKey);
+
+        final CombinedKeySchema<String, String> serializerCks = new 
CombinedKeySchema<>(
+            () -> FK_TOPIC, Serdes.String(),
+            () -> PK_TOPIC, Serdes.String()
+        );
+        final Bytes serialized = serializerCks.toBytes(foreignKey, primaryKey, 
HEADERS);
+
+        final CombinedKeySchema<String, String> cks = new CombinedKeySchema<>(
+            () -> FK_TOPIC, mockSerde,
+            () -> PK_TOPIC, mockSerde
+        );
+        cks.init(mock(ProcessorContext.class));
+        cks.fromBytes(serialized, HEADERS);
+
+        final byte[] foreignKeyRaw = 
Serdes.String().serializer().serialize(null, HEADERS, foreignKey);
+        final byte[] primaryKeyRaw = 
Serdes.String().serializer().serialize(null, HEADERS, primaryKey);
+
+        verify(mockDeserializer).deserialize(FK_TOPIC, HEADERS, foreignKeyRaw);
+        verify(mockDeserializer, never()).deserialize(FK_TOPIC, foreignKeyRaw);
+        verify(mockDeserializer).deserialize(PK_TOPIC, HEADERS, primaryKeyRaw);
+        verify(mockDeserializer, never()).deserialize(PK_TOPIC, primaryKeyRaw);
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
index 2d53b83a94f..f06b59a544c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
@@ -203,7 +203,7 @@ public class ForeignTableJoinProcessorSupplierTests {
         );
         final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
 
-        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(fk, pk);
+        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(fk, pk, new 
RecordHeaders());
         stateStore.put(key, oldValue);
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
index ddfcadfc261..f3dd326adcd 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -60,6 +61,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
     private static final String FK = "fk1";
     private static final String PK1 = "pk1";
     private static final String PK2 = "pk2";
+    private static final Headers HEADERS = new RecordHeaders();
 
     private static final Supplier<String> PK_SERDE_TOPIC_SUPPLIER = () -> 
"pk-topic";
     private static final CombinedKeySchema<String, String> COMBINED_KEY_SCHEMA 
= new CombinedKeySchema<>(
@@ -110,7 +112,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
         );
         final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
 
-        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1, HEADERS);
         stateStore.put(key, oldValue);
         processor.init(context);
 
@@ -161,7 +163,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
         );
         final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
 
-        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1, HEADERS);
         stateStore.put(key, oldValue);
         processor.init(context);
 
@@ -213,7 +215,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
         );
         final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
 
-        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1, HEADERS);
         stateStore.put(key, oldValue);
         processor.init(context);
 
@@ -265,7 +267,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
         );
         final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
 
-        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1, HEADERS);
         stateStore.put(key, oldValue);
         processor.init(context);
 
@@ -317,7 +319,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
         );
         final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, null);
 
-        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1, HEADERS);
         stateStore.put(key, oldValue);
         processor.init(context);
 
@@ -369,7 +371,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
         );
         final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, null);
 
-        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1, HEADERS);
         stateStore.put(key, oldValue);
         processor.init(context);
 
@@ -421,7 +423,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
         );
         final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, null);
 
-        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1, HEADERS);
         stateStore.put(key, oldValue);
         processor.init(context);
 
@@ -473,7 +475,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
         );
         final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, null);
 
-        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
+        final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1, HEADERS);
         stateStore.put(key, oldValue);
         processor.init(context);
 

Reply via email to