[
https://issues.apache.org/jira/browse/KAFKA-20173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Uladzislau Blok updated KAFKA-20173:
------------------------------------
Description:
List of all checked deserialization calls (found by searching for the pattern
{{deserialize(}}):
* {{SessionWindowedDeserializer#deserialize(final String topic, final byte[]
data)}} - there are no headers, nothing to propagate;
* {{TimeWindowedDeserializer#deserialize(final String topic, final byte[]
data)}} - there are no headers, nothing to propagate;
* {{ChangedDeserializer#deserialize(final String topic, final Headers
headers, final byte[] data)}} - makes use of headers. Not related to the
current KIP;
* {{FullChangeSerde#deserializeParts(final String topic, final Change<byte[]>
serialChange)}} - no headers, nothing to propagate;
* {{CombinedKeySchema#fromBytes(final Bytes data)}} - no headers, nothing to
propagate;
* {{SubscriptionResponseWrapperSerde#deserialize(final String topic, final
byte[] data)}} - no headers, nothing to propagate;
* {{SubscriptionWrapperSerde#deserialize(final String ignored, final byte[]
data)}} - no headers, nothing to propagate;
* {{ChangelogRecordDeserializationHelper#applyChecksAndUpdatePosition(final
ConsumerRecord<byte[], byte[]> record, final boolean consistencyEnabled, final
Position position)}} - makes use of headers. Not related to the current KIP;
* {{GlobalStateManagerImpl#reprocessState(... many params)}} - does not
propagate headers to the deserializer. Is this a case that should be fixed?
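
The distinction drawn in the list above (a wrapper that forwards headers to its
inner deserializer versus one that silently drops them) can be sketched as
follows. This is a minimal illustration under stated assumptions, not Kafka
Streams code: {{SimpleDeserializer}}, {{HeaderAwareDeserializer}},
{{PropagatingWrapper}} and {{DroppingWrapper}} are hypothetical names, and a
plain {{Map<String, String>}} stands in for Kafka's {{Headers}} type. The
default method is modelled on how the {{Deserializer}} interface falls back to
the headerless overload.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HeaderPropagationSketch {

    /** Hypothetical stand-in for a deserializer with a headers-aware overload. */
    public interface SimpleDeserializer<T> {
        T deserialize(String topic, byte[] data);

        // Falls back to the headerless overload, so headers are lost
        // unless an implementation overrides this method.
        default T deserialize(String topic, Map<String, String> headers, byte[] data) {
            return deserialize(topic, data);
        }
    }

    /** Inner deserializer whose result depends on a (hypothetical) "prefix" header. */
    public static class HeaderAwareDeserializer implements SimpleDeserializer<String> {
        @Override
        public String deserialize(String topic, byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }

        @Override
        public String deserialize(String topic, Map<String, String> headers, byte[] data) {
            return headers.getOrDefault("prefix", "") + deserialize(topic, data);
        }
    }

    /** Wrapper that forwards headers to the inner deserializer. */
    public static class PropagatingWrapper implements SimpleDeserializer<String> {
        private final SimpleDeserializer<String> inner;

        public PropagatingWrapper(SimpleDeserializer<String> inner) {
            this.inner = inner;
        }

        @Override
        public String deserialize(String topic, byte[] data) {
            return inner.deserialize(topic, data);
        }

        @Override
        public String deserialize(String topic, Map<String, String> headers, byte[] data) {
            return inner.deserialize(topic, headers, data);
        }
    }

    /** Wrapper that only implements the headerless overload, so headers never reach the inner deserializer. */
    public static class DroppingWrapper implements SimpleDeserializer<String> {
        private final SimpleDeserializer<String> inner;

        public DroppingWrapper(SimpleDeserializer<String> inner) {
            this.inner = inner;
        }

        @Override
        public String deserialize(String topic, byte[] data) {
            return inner.deserialize(topic, data);
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("prefix", "v2:");
        byte[] data = "x".getBytes(StandardCharsets.UTF_8);
        SimpleDeserializer<String> inner = new HeaderAwareDeserializer();

        // Headers reach the inner deserializer: prints "v2:x"
        System.out.println(new PropagatingWrapper(inner).deserialize("topic", headers, data));
        // Headers are dropped by the default fallback: prints "x"
        System.out.println(new DroppingWrapper(inner).deserialize("topic", headers, data));
    }
}
```

In this sketch the entries marked "nothing to propagate" correspond to call
sites where no headers are available in the first place, while the open
question about {{GlobalStateManagerImpl#reprocessState}} is whether it behaves
like {{DroppingWrapper}} even though headers are available from the record.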
was:
List of all checked deserialization calls (found by searching for the pattern
{{deserialize(}}):
* {{SessionWindowedDeserializer#deserialize(final String topic, final byte[]
data)}} - there are no headers, nothing to propagate;
* {{TimeWindowedDeserializer#deserialize(final String topic, final byte[]
data)}} - there are no headers, nothing to propagate;
* {{ChangedDeserializer#deserialize(final String topic, final Headers headers,
final byte[] data)}} - makes use of headers: {{inner.deserialize(topic,
headers, newData), inner.deserialize(topic, headers, oldData)}}. Interesting
case: headers were already used somewhere before;
* {{FullChangeSerde#deserializeParts(final String topic, final Change<byte[]>
serialChange)}} - no headers, nothing to propagate
* {{CombinedKeySchema#fromBytes(final Bytes data)}} - no headers, nothing to
propagate
* {{SubscriptionResponseWrapperSerde#deserialize(final String topic, final
byte[] data)}} - no headers, nothing to propagate
* SubscriptionWrapperSerde#deserialize(final String ignored, final byte[]
data) - no headers, nothing to propagate
> Revisit KS serde code to check if headers are passed correctly
> --------------------------------------------------------------
>
> Key: KAFKA-20173
> URL: https://issues.apache.org/jira/browse/KAFKA-20173
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: TengYao Chi
> Assignee: Uladzislau Blok
> Priority: Major
> Fix For: 4.3.0