This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 117a7d1c66c MINOR: Fix headers in KTableMapValues and 
KTableTransformValues (#21835)
117a7d1c66c is described below

commit 117a7d1c66c8b002e4cb25f1733cdf8466a68400
Author: Alieh Saeedi <[email protected]>
AuthorDate: Fri Apr 3 08:17:22 2026 +0200

    MINOR: Fix headers in KTableMapValues and KTableTransformValues (#21835)
    
    This PR replaces null/empty headers with `context.headers()` where
    `ValueTimestampHeaders` is `null`.
    
    Reviewers: Murali Basani <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../streams/kstream/internals/KTableMapValues.java | 12 ++++---
 .../kstream/internals/KTableTransformValues.java   |  2 +-
 .../internals/KTableTransformValuesTest.java       | 37 ++++++++++++++++++++++
 3 files changed, 46 insertions(+), 5 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index e71dd84cf7c..9c787d535a4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
@@ -111,10 +112,11 @@ class KTableMapValues<KIn, VIn, VOut> implements 
KTableProcessorSupplier<KIn, VI
         return newValue;
     }
 
-    private ValueTimestampHeaders<VOut> computeValueAndTimestamp(final KIn 
key, final ValueTimestampHeaders<VIn> valueTimestampHeaders) {
+    private ValueTimestampHeaders<VOut> computeValueAndTimestamp(final KIn 
key, final ValueTimestampHeaders<VIn> valueTimestampHeaders, final Headers  
contextHeaders) {
+
         VOut newValue = null;
         long timestamp = 0;
-        Headers headers = null;
+        Headers headers = contextHeaders;
 
         if (valueTimestampHeaders != null) {
             newValue = mapper.apply(key, valueTimestampHeaders.value());
@@ -174,6 +176,7 @@ class KTableMapValues<KIn, VIn, VOut> implements 
KTableProcessorSupplier<KIn, VI
 
     private class KTableMapValuesValueGetter implements KTableValueGetter<KIn, 
VOut> {
         private final KTableValueGetter<KIn, VIn> parentGetter;
+        private InternalProcessorContext<?, ?> context;
 
         KTableMapValuesValueGetter(final KTableValueGetter<KIn, VIn> 
parentGetter) {
             this.parentGetter = parentGetter;
@@ -182,16 +185,17 @@ class KTableMapValues<KIn, VIn, VOut> implements 
KTableProcessorSupplier<KIn, VI
         @Override
         public void init(final ProcessorContext<?, ?> context) {
             parentGetter.init(context);
+            this.context = (InternalProcessorContext<?, ?>) context;
         }
 
         @Override
         public ValueTimestampHeaders<VOut> get(final KIn key) {
-            return computeValueAndTimestamp(key, parentGetter.get(key));
+            return computeValueAndTimestamp(key, parentGetter.get(key), 
context.headers());
         }
 
         @Override
         public ValueTimestampHeaders<VOut> get(final KIn key, final long 
asOfTimestamp) {
-            return computeValueAndTimestamp(key, parentGetter.get(key, 
asOfTimestamp));
+            return computeValueAndTimestamp(key, parentGetter.get(key, 
asOfTimestamp), context.headers());
         }
 
         @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index 75b8289132d..2948af604d1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -195,7 +195,7 @@ class KTableTransformValues<K, V, VOut> implements 
KTableProcessorSupplier<K, V,
             final ValueTimestampHeaders<VOut> result = 
ValueTimestampHeaders.make(
                 valueTransformer.transform(key, 
getValueOrNull(valueTimestampHeaders)),
                 valueTimestampHeaders == null ? UNKNOWN : 
valueTimestampHeaders.timestamp(),
-                valueTimestampHeaders == null ? null : 
valueTimestampHeaders.headers()
+                valueTimestampHeaders == null ? currentContext.headers() : 
valueTimestampHeaders.headers()
                 );
 
             internalProcessorContext.setRecordContext(currentContext);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index cd8767a2064..625556d7454 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -236,6 +236,43 @@ public class KTableTransformValuesTest {
         assertThat(result, is("Key->Value!"));
     }
 
+    @Test
+    public void shouldUseContextHeadersWhenValueTimestampHeadersIsNull() {
+        final KTableTransformValues<String, String, String> transformValues =
+            new KTableTransformValues<>(parent, new 
ExclamationValueTransformerSupplier(), null);
+
+        when(parent.valueGetterSupplier()).thenReturn(parentGetterSupplier);
+        when(parentGetterSupplier.get()).thenReturn(parentGetter);
+        when(parentGetter.get("Key")).thenReturn(null);
+
+        final RecordHeaders contextHeaders = new RecordHeaders();
+        contextHeaders.add("test-header", "test-value".getBytes());
+        final ProcessorRecordContext recordContext = new 
ProcessorRecordContext(
+            42L,
+            23L,
+            -1,
+            "foo",
+            contextHeaders
+        );
+        when(context.recordContext()).thenReturn(recordContext);
+        doNothing().when(context).setRecordContext(new ProcessorRecordContext(
+            -1L,
+            -1L,
+            -1,
+            null,
+            new RecordHeaders()
+        ));
+        doNothing().when(context).setRecordContext(recordContext);
+
+        final KTableValueGetter<String, String> getter = 
transformValues.view().get();
+        getter.init(context);
+
+        final ValueTimestampHeaders<String> result = getter.get("Key");
+
+        assertThat(result.value(), is("Key->null!"));
+        assertThat(result.headers(), is(contextHeaders));
+    }
+
     @Test
     public void shouldGetFromStateStoreIfMaterialized() {
         final KTableTransformValues<String, String, String> transformValues =

Reply via email to