[ 
https://issues.apache.org/jira/browse/KAFKA-20173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Uladzislau Blok updated KAFKA-20173:
------------------------------------
    Description: 
List of all deserialization calls checked (found by searching for the pattern 
{{deserialize(}}):
 * {{SessionWindowedDeserializer#deserialize(final String topic, final byte[] 
data)}} - there are no headers, nothing to propagate
 * {{TimeWindowedDeserializer#deserialize(final String topic, final byte[] 
data)}} - there are no headers, nothing to propagate
 * {{ChangedDeserializer#deserialize(final String topic, final Headers headers, 
final byte[] data)}} - makes use of headers
 * {{FullChangeSerde#deserializeParts(final String topic, final Change<byte[]> 
serialChange)}} - no headers, nothing to propagate
 * {{CombinedKeySchema#fromBytes(final Bytes data)}} - no headers, nothing to 
propagate
 * {{SubscriptionResponseWrapperSerde#deserialize(final String topic, final 
byte[] data)}} - no headers, nothing to propagate
 * {{SubscriptionWrapperSerde#deserialize(final String ignored, final byte[] 
data)}} - no headers, nothing to propagate
 * {{ChangelogRecordDeserializationHelper#applyChecksAndUpdatePosition(final 
ConsumerRecord<byte[], byte[]> record, final boolean consistencyEnabled, final 
Position position)}} - makes use of headers
 * {{GlobalStateManagerImpl#reprocessState(... many params)}} - does not 
propagate headers to the deserializer: 
{{reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key())}}. 
{*}Is this a case to be fixed?{*}
 * {{GlobalStateUpdateTask#update(final ConsumerRecord<byte[], byte[]> 
record)}} - uses a different type of deserializer 
({{RecordDeserializer}}). From my understanding, we don't need headers here
 * {{ProcessorMetadata#deserialize(final byte[] metaDataBytes)}} - no headers, 
nothing to propagate
 * {{ProcessorRecordContext#deserialize(final ByteBuffer buffer)}} - a different 
type of deserializer, not related
 * {{RecordDeserializer#deserialize(final ProcessorContext<?, ?> 
processorContext, final ConsumerRecord<byte[], byte[]> rawRecord)}} - makes use 
of headers
 * {{RecordQueue#updateHead}} - uses a different type of deserializer
 * {{SourceNode#deserializeKey}} - makes use of headers
 * {{SourceNode#deserializeValue}} - makes use of headers
 * {{TopicPartitionMetadata#decode(final String encryptedString)}} - uses a 
different type of deserializer
 * {{StateSerdes#keyFrom(final byte[] rawKey, final Headers headers)}} - makes 
use of headers
 * {{StateSerdes#valueFrom(final byte[] rawValue, final Headers headers)}} - 
makes use of headers
 * {{BufferValue#deserialize(final ByteBuffer buffer)}} - a different type of 
deserializer
 * {{ContextualRecord#deserialize(final ByteBuffer buffer)}} - a different type 
of deserializer
 * HeadersDeserializer - the header deserializer itself
 * {{InMemoryTimeOrderedKeyValueChangeBuffer#evictWhile(final 
Supplier<Boolean> predicate, final Consumer<Eviction<K, Change<V>>> 
callback)}} - doesn't use headers, although they can be accessed from the 
context. *Is this a case to be fixed?*
 * {{InMemoryTimeOrderedKeyValueChangeBuffer#priorValueForBuffered(final K 
key)}} - doesn't use headers, although they can be accessed from the context. 
*Is this a case to be fixed?*
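
To make the open questions above concrete, here is a minimal sketch of why a
header-less call can matter. It assumes the usual shape of Kafka's
{{Deserializer}}, where the overload that takes headers is a default method
delegating to the two-argument one. Everything in the sketch is a hypothetical
stand-in, not Kafka Streams code: {{SketchDeserializer}},
{{SchemaTaggingDeserializer}}, the "schema" header and the topic name are made
up for illustration, and a plain {{Map<String, String>}} replaces {{Headers}}.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical stand-in mirroring the shape of Kafka's Deserializer:
// the header-aware overload defaults to the header-less one.
interface SketchDeserializer<T> {
    T deserialize(String topic, byte[] data);

    default T deserialize(String topic, Map<String, String> headers, byte[] data) {
        return deserialize(topic, data);
    }
}

// Hypothetical deserializer that reads a "schema" header when it is given one.
final class SchemaTaggingDeserializer implements SketchDeserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        // No headers reach this overload, so the schema is unknown.
        return "unknown:" + new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(String topic, Map<String, String> headers, byte[] data) {
        String schema = headers.getOrDefault("schema", "unknown");
        return schema + ":" + new String(data, StandardCharsets.UTF_8);
    }
}

final class HeaderPropagationSketch {
    static final Map<String, String> HEADERS = Map.of("schema", "v2");
    static final byte[] KEY = "k1".getBytes(StandardCharsets.UTF_8);

    // Same shape as the call quoted for reprocessState: headers are dropped.
    static String withoutHeaders(SketchDeserializer<String> d) {
        return d.deserialize("global-topic", KEY);
    }

    // Header-aware variant: the record's headers are passed through.
    static String withHeaders(SketchDeserializer<String> d) {
        return d.deserialize("global-topic", HEADERS, KEY);
    }

    public static void main(String[] args) {
        SketchDeserializer<String> d = new SchemaTaggingDeserializer();
        System.out.println(withoutHeaders(d)); // prints "unknown:k1"
        System.out.println(withHeaders(d));    // prints "v2:k1"
    }
}
```

A deserializer that ignores headers behaves identically on both paths, so the
difference only shows up for user-supplied deserializers that override the
header-aware overload.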


> Revisit KS serde code to check if headers are passed correctly
> --------------------------------------------------------------
>
>                 Key: KAFKA-20173
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20173
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: TengYao Chi
>            Assignee: Uladzislau Blok
>            Priority: Major
>             Fix For: 4.3.0
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
