[
https://issues.apache.org/jira/browse/KAFKA-20173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Uladzislau Blok updated KAFKA-20173:
------------------------------------
Description:
List of all deserialization calls checked (found by searching for the pattern
{{deserialize(}}):
* {{SessionWindowedDeserializer#deserialize(final String topic, final byte[]
data)}} - there are no headers, nothing to propagate;
* {{TimeWindowedDeserializer#deserialize(final String topic, final byte[]
data)}} - there are no headers, nothing to propagate;
* {{ChangedDeserializer#deserialize(final String topic, final Headers headers,
final byte[] data)}} - makes use of headers.
* {{FullChangeSerde#deserializeParts(final String topic, final Change<byte[]>
serialChange)}} - no headers, nothing to propagate
* {{CombinedKeySchema#fromBytes(final Bytes data)}} - no headers, nothing to
propagate
* {{SubscriptionResponseWrapperSerde#deserialize(final String topic, final
byte[] data)}} - no headers, nothing to propagate
* {{SubscriptionWrapperSerde#deserialize(final String ignored, final byte[]
data)}} - no headers, nothing to propagate
* {{ChangelogRecordDeserializationHelper#applyChecksAndUpdatePosition(final
ConsumerRecord<byte[], byte[]> record, final boolean consistencyEnabled, final
Position position)}} - makes use of headers.
* {{GlobalStateManagerImpl#reprocessState(... many params)}} - does not
propagate headers to the deserializer. *Is this a case to be fixed?*
{{reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key())}}
* {{GlobalStateUpdateTask#update(final ConsumerRecord<byte[], byte[]>
record)}} - uses a different type of deserializer
({{RecordDeserializer}}). From my understanding, we don't need headers here
* {{ProcessorMetadata#deserialize(final byte[] metaDataBytes)}} - no headers,
nothing to propagate
* {{ProcessorRecordContext#deserialize(final ByteBuffer buffer)}} - a different
type of deserializer, not related
* {{RecordDeserializer#deserialize(final ProcessorContext<?, ?>
processorContext, final ConsumerRecord<byte[], byte[]> rawRecord)}} - makes use
of headers.
* {{RecordQueue#updateHead}} - uses a different type of deserializer
* {{SourceNode#deserializeKey}} - makes use of headers
* {{SourceNode#deserializeValue}} - makes use of headers
* {{TopicPartitionMetadata#decode(final String encryptedString)}} - uses a
different type of deserializer
* {{StateSerdes#keyFrom(final byte[] rawKey, final Headers headers)}} - makes
use of headers
* {{StateSerdes#valueFrom(final byte[] rawValue, final Headers headers)}} -
makes use of headers
* {{BufferValue#deserialize(final ByteBuffer buffer)}} - a different type of
deserializer
* {{ContextualRecord#deserialize(final ByteBuffer buffer)}} - a different type
of deserializer
* {{HeadersDeserializer}} - the header deserializer itself
* {{InMemoryTimeOrderedKeyValueChangeBuffer#evictWhile(final
Supplier<Boolean> predicate, final Consumer<Eviction<K, Change<V>>>
callback)}} - doesn't use headers, although they can be accessed from the
context. *Is this a case to be fixed?*
* {{InMemoryTimeOrderedKeyValueChangeBuffer#priorValueForBuffered(final
K key)}} - doesn't use headers, although they can be accessed from the context.
*Is this a case to be fixed?*
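
For the {{GlobalStateManagerImpl#reprocessState}} question in the list, a
minimal sketch of why the choice of overload can matter. It does not use Kafka
classes: {{SketchDeserializer}}, {{EncodingAwareDeserializer}} and the
{{encoding}} header are hypothetical stand-ins, and a plain {{Map}} stands in
for {{Headers}}. The only thing borrowed from Kafka is the shape of the
overload pair on {{Deserializer}}, where the headers-aware {{deserialize}}
delegates to the two-argument one by default.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class HeaderPropagationSketch {

    /** Hypothetical stand-in for the overload pair on Kafka's Deserializer. */
    public interface SketchDeserializer<T> {
        T deserialize(String topic, byte[] data);

        // Mirrors the default-method pattern: headers are dropped unless overridden.
        default T deserialize(String topic, Map<String, String> headers, byte[] data) {
            return deserialize(topic, data);
        }
    }

    /** Hypothetical deserializer whose result depends on an "encoding" header. */
    public static final class EncodingAwareDeserializer implements SketchDeserializer<String> {
        @Override
        public String deserialize(String topic, byte[] data) {
            // No headers available: assume UTF-8.
            return new String(data, StandardCharsets.UTF_8);
        }

        @Override
        public String deserialize(String topic, Map<String, String> headers, byte[] data) {
            if ("latin1".equals(headers.get("encoding"))) {
                return new String(data, StandardCharsets.ISO_8859_1);
            }
            return deserialize(topic, data);
        }
    }

    public static void main(String[] args) {
        SketchDeserializer<String> deserializer = new EncodingAwareDeserializer();
        byte[] key = "caf\u00e9".getBytes(StandardCharsets.ISO_8859_1);
        Map<String, String> headers = Map.of("encoding", "latin1");

        // Two-argument call, as in the reprocessState snippet: headers never arrive.
        String withoutHeaders = deserializer.deserialize("topic", key);
        // Three-argument call: the header selects the right charset.
        String withHeaders = deserializer.deserialize("topic", headers, key);

        System.out.println("caf\u00e9".equals(withoutHeaders)); // prints false
        System.out.println("caf\u00e9".equals(withHeaders));    // prints true
    }
}
```

Whether this matters for {{reprocessState}} depends on whether any deserializer
configured there actually reads headers; the sketch only shows that a
header-dependent deserializer can return a different value when it is called
through the two-argument overload.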
> Revisit KS serde code to check if headers are passed correctly
> --------------------------------------------------------------
>
> Key: KAFKA-20173
> URL: https://issues.apache.org/jira/browse/KAFKA-20173
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: TengYao Chi
> Assignee: Uladzislau Blok
> Priority: Major
> Fix For: 4.3.0