Alieh Saeedi created KAFKA-20413:
------------------------------------
Summary: ListValueStore and outer join stores don't propagate
headers to serdes
Key: KAFKA-20413
URL: https://issues.apache.org/jira/browse/KAFKA-20413
Project: Kafka
Issue Type: Sub-task
Components: streams
Reporter: Alieh Saeedi
Description
While reviewing PR #21969 (KAFKA-20173), we found that ListValueStore and its
changelog wrapper don't propagate headers to serdes. This affects stream-stream
left/outer joins which use ListValueStore to buffer non-joined records.
Gaps
1. ListValueStore.java: Passes null topic and no headers to LIST_SERDE
serialize/deserialize calls
2. ChangeLoggingListValueBytesStore.java: Uses new RecordHeaders() instead of
internalContext.recordContext().headers() when logging to changelog
3. No headers-aware ListValueStoreBuilder variant exists (unlike other stores
like TimestampedKeyValueStoreBuilderWithHeaders)
Impact
- Custom headers lost during stream-stream left/outer join processing
- Changelog records don't replicate headers
- Store recovery loses header information
- Serdes depending on headers receive empty headers
Fix
Follow the pattern from PR #21969 (which fixed
RocksDBTimeOrderedKeyValueBuffer and InMemoryTimeOrderedKeyValueChangeBuffer):
1. Pass context.headers() to LIST_SERDE serialize/deserialize calls
2. Use actual headers from recordContext() in changelog logging
3. Add comprehensive tests for outer joins with custom headers
--
This message was sent by Atlassian Jira
(v8.20.10#820010)