This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6e989ccca21 MINOR: Fix headers in KTableMapValues and
KTableTransformValues (#21835)
6e989ccca21 is described below
commit 6e989ccca21375ae6ee4ac11fb64d9379b089155
Author: Alieh Saeedi <[email protected]>
AuthorDate: Fri Apr 3 08:17:22 2026 +0200
MINOR: Fix headers in KTableMapValues and KTableTransformValues (#21835)
This PR replaces null/empty headers with `context.headers()` where
`ValueTimestampHeaders` is `null`.
Reviewers: Murali Basani <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../streams/kstream/internals/KTableMapValues.java | 12 ++++---
.../kstream/internals/KTableTransformValues.java | 2 +-
.../internals/KTableTransformValuesTest.java | 37 ++++++++++++++++++++++
3 files changed, 46 insertions(+), 5 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index e71dd84cf7c..9c787d535a4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
@@ -111,10 +112,11 @@ class KTableMapValues<KIn, VIn, VOut> implements
KTableProcessorSupplier<KIn, VI
return newValue;
}
- private ValueTimestampHeaders<VOut> computeValueAndTimestamp(final KIn
key, final ValueTimestampHeaders<VIn> valueTimestampHeaders) {
+ private ValueTimestampHeaders<VOut> computeValueAndTimestamp(final KIn
key, final ValueTimestampHeaders<VIn> valueTimestampHeaders, final Headers
contextHeaders) {
+
VOut newValue = null;
long timestamp = 0;
- Headers headers = null;
+ Headers headers = contextHeaders;
if (valueTimestampHeaders != null) {
newValue = mapper.apply(key, valueTimestampHeaders.value());
@@ -174,6 +176,7 @@ class KTableMapValues<KIn, VIn, VOut> implements
KTableProcessorSupplier<KIn, VI
private class KTableMapValuesValueGetter implements KTableValueGetter<KIn,
VOut> {
private final KTableValueGetter<KIn, VIn> parentGetter;
+ private InternalProcessorContext<?, ?> context;
KTableMapValuesValueGetter(final KTableValueGetter<KIn, VIn>
parentGetter) {
this.parentGetter = parentGetter;
@@ -182,16 +185,17 @@ class KTableMapValues<KIn, VIn, VOut> implements
KTableProcessorSupplier<KIn, VI
@Override
public void init(final ProcessorContext<?, ?> context) {
parentGetter.init(context);
+ this.context = (InternalProcessorContext<?, ?>) context;
}
@Override
public ValueTimestampHeaders<VOut> get(final KIn key) {
- return computeValueAndTimestamp(key, parentGetter.get(key));
+ return computeValueAndTimestamp(key, parentGetter.get(key),
context.headers());
}
@Override
public ValueTimestampHeaders<VOut> get(final KIn key, final long
asOfTimestamp) {
- return computeValueAndTimestamp(key, parentGetter.get(key,
asOfTimestamp));
+ return computeValueAndTimestamp(key, parentGetter.get(key,
asOfTimestamp), context.headers());
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index 75b8289132d..2948af604d1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -195,7 +195,7 @@ class KTableTransformValues<K, V, VOut> implements
KTableProcessorSupplier<K, V,
final ValueTimestampHeaders<VOut> result =
ValueTimestampHeaders.make(
valueTransformer.transform(key,
getValueOrNull(valueTimestampHeaders)),
valueTimestampHeaders == null ? UNKNOWN :
valueTimestampHeaders.timestamp(),
- valueTimestampHeaders == null ? null :
valueTimestampHeaders.headers()
+ valueTimestampHeaders == null ? currentContext.headers() :
valueTimestampHeaders.headers()
);
internalProcessorContext.setRecordContext(currentContext);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index cd8767a2064..625556d7454 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -236,6 +236,43 @@ public class KTableTransformValuesTest {
assertThat(result, is("Key->Value!"));
}
+ @Test
+ public void shouldUseContextHeadersWhenValueTimestampHeadersIsNull() {
+ final KTableTransformValues<String, String, String> transformValues =
+ new KTableTransformValues<>(parent, new
ExclamationValueTransformerSupplier(), null);
+
+ when(parent.valueGetterSupplier()).thenReturn(parentGetterSupplier);
+ when(parentGetterSupplier.get()).thenReturn(parentGetter);
+ when(parentGetter.get("Key")).thenReturn(null);
+
+ final RecordHeaders contextHeaders = new RecordHeaders();
+ contextHeaders.add("test-header", "test-value".getBytes());
+ final ProcessorRecordContext recordContext = new
ProcessorRecordContext(
+ 42L,
+ 23L,
+ -1,
+ "foo",
+ contextHeaders
+ );
+ when(context.recordContext()).thenReturn(recordContext);
+ doNothing().when(context).setRecordContext(new ProcessorRecordContext(
+ -1L,
+ -1L,
+ -1,
+ null,
+ new RecordHeaders()
+ ));
+ doNothing().when(context).setRecordContext(recordContext);
+
+ final KTableValueGetter<String, String> getter =
transformValues.view().get();
+ getter.init(context);
+
+ final ValueTimestampHeaders<String> result = getter.get("Key");
+
+ assertThat(result.value(), is("Key->null!"));
+ assertThat(result.headers(), is(contextHeaders));
+ }
+
@Test
public void shouldGetFromStateStoreIfMaterialized() {
final KTableTransformValues<String, String, String> transformValues =