[
https://issues.apache.org/jira/browse/KAFKA-15595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax reassigned KAFKA-15595:
---------------------------------------
Assignee: (was: Hao Li)
> Session window aggregate drops records headers
> ----------------------------------------------
>
> Key: KAFKA-15595
> URL: https://issues.apache.org/jira/browse/KAFKA-15595
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.5.1
> Reporter: Abdullah alkhawatrah
> Priority: Major
>
> Hey,
> While upgrading from 3.2.X to 3.5.1 I noticed a change in SessionWindow
> aggregate behaviour: custom headers added before the aggregate now appear
> to be dropped.
> I could reproduce the behaviour with the following test topology:
> {code:java}
> // Test topology: add a custom header, then aggregate over session windows.
> final StreamsBuilder builder = new StreamsBuilder();
> builder.stream(inputTopic, Consumed.with(EARLIEST))
>     .process(() -> new Processor<Object, Object, Object, Object>() {
>         private ProcessorContext<Object, Object> context;
>
>         @Override
>         public void init(final ProcessorContext<Object, Object> context) {
>             this.context = context;
>         }
>
>         @Override
>         public void process(Record<Object, Object> record) {
>             // Attach the header before the record reaches the aggregation.
>             record.headers().add("key1", record.value().toString().getBytes());
>             context.forward(record);
>         }
>     })
>     .groupByKey()
>     .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofDays(1L), Duration.ofDays(1L)))
>     .aggregate(() -> 1,
>         (key, value, aggregate) -> aggregate,
>         (aggKey, aggOne, aggTwo) -> aggTwo)
>     .toStream()
>     .map((key, value) -> new KeyValue<>(key.key(), value))
>     .to(outputTopic);
> {code}
> Checking events in the `outputTopic` shows that the headers are empty. With
> 3.2.* the same topology propagated the headers.
>
> I can see here:
> [https://github.com/apache/kafka/blob/2c6fb6c54472e90ae17439e62540ef3cb0426fe3/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java#L205]
> that a new record is now created without the headers, while in 3.2.2 the
> same record was forwarded after changing the key and value, keeping the
> headers:
> [https://github.com/apache/kafka/blob/3.2.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java#L196]
>
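The difference described above can be illustrated with a minimal, self-contained sketch. It does not use Kafka's classes: `SimpleRecord`, `forwardDerived`, and `forwardFresh` are hypothetical stand-ins, and timestamp handling is simplified. It only assumes, as the report states, that one code path derives the output from the input record while the other constructs a new record without passing the headers along.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderPropagationSketch {

    // Hypothetical stand-in for a stream record; NOT Kafka's Record class.
    static final class SimpleRecord {
        final String key;
        final Object value;
        final long timestamp;
        final Map<String, byte[]> headers;

        SimpleRecord(String key, Object value, long timestamp, Map<String, byte[]> headers) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.headers = new LinkedHashMap<>(headers);
        }

        // Convenience constructor: a record built this way starts with no headers.
        SimpleRecord(String key, Object value, long timestamp) {
            this(key, value, timestamp, new LinkedHashMap<>());
        }

        // Copy with a new key and value, keeping the timestamp and headers.
        SimpleRecord withKeyAndValue(String newKey, Object newValue) {
            return new SimpleRecord(newKey, newValue, timestamp, headers);
        }
    }

    // Pattern the report attributes to 3.2.2: derive the output from the input record.
    static SimpleRecord forwardDerived(SimpleRecord input, String windowedKey, Object aggregate) {
        return input.withKeyAndValue(windowedKey, aggregate);
    }

    // Pattern the report attributes to 3.5.1: build a fresh record, so headers are lost.
    static SimpleRecord forwardFresh(SimpleRecord input, String windowedKey, Object aggregate) {
        return new SimpleRecord(windowedKey, aggregate, input.timestamp);
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put("key1", "v".getBytes(StandardCharsets.UTF_8));
        SimpleRecord input = new SimpleRecord("k", "v", 0L, headers);

        System.out.println(forwardDerived(input, "k@window", 1).headers.keySet()); // prints [key1]
        System.out.println(forwardFresh(input, "k@window", 1).headers.keySet());   // prints []
    }
}
```

In this model the header survives only when the output record is derived from the input, which matches the behaviour change observed between the two versions.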
--
This message was sent by Atlassian Jira
(v8.20.10#820010)