Alieh Saeedi created KAFKA-20413:
------------------------------------

             Summary: ListValueStore and outer join stores don't propagate 
headers to serdes
                 Key: KAFKA-20413
                 URL: https://issues.apache.org/jira/browse/KAFKA-20413
             Project: Kafka
          Issue Type: Sub-task
          Components: streams
            Reporter: Alieh Saeedi


Description                                                                     
                                                                                
                               
   
While reviewing PR #21969 (KAFKA-20173), we found that ListValueStore and its 
changelog wrapper don't propagate headers to serdes. This affects stream-stream 
left/outer joins which use ListValueStore to buffer non-joined records.
                                                                                
                                                                                
                                 
  Gaps                                                                          
                                                                                
                            
  1. ListValueStore.java: Passes null topic and no headers to LIST_SERDE 
serialize/deserialize calls                                                     
                     
  2. ChangeLoggingListValueBytesStore.java: Uses new RecordHeaders() instead of 
internalContext.recordContext().headers() when logging to changelog
  3. No headers-aware ListValueStoreBuilder variant exists (unlike other stores 
like TimestampedKeyValueStoreBuilderWithHeaders)                                
                                 
                  
  Impact                                                                        
                                                                                
                                              
  - Custom headers lost during stream-stream left/outer join processing         
                          
  - Changelog records don't replicate headers
  - Store recovery loses header information                                     
                                               
  - Serdes depending on headers receive empty headers
                                                                                
                                                                                
                                 
  Fix

  Follow the pattern from PR #21969 (which fixed 
RocksDBTimeOrderedKeyValueBuffer and InMemoryTimeOrderedKeyValueChangeBuffer):  
                                                                
  1. Pass context.headers() to LIST_SERDE serialize/deserialize calls
  2. Use actual headers from recordContext() in changelog logging               
                             
  3. Add comprehensive tests for outer joins with custom headers 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to