John Roesler created KAFKA-14364:
------------------------------------
Summary: Support evolving serde with Foreign Key Join
Key: KAFKA-14364
URL: https://issues.apache.org/jira/browse/KAFKA-14364
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: John Roesler
The current implementation of Foreign-Key join uses a hash comparison to
determine whether it should emit join results or not. See
[https://github.com/apache/kafka/blob/807c5b4d282e7a7a16d0bb94aa2cda9566a7cc2d/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java#L94-L110]
As specified in KIP-213
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]),
we must do a comparison of this nature to get correct results when the
foreign-key reference changes: the instance handling the old reference might
emit delayed results after the instance handling the new reference has emitted
its updated results, leading to an incorrect final join state.
The hash comparison prevents this race condition by ensuring that any emitted
results correspond to the _current_ version of the left-hand-side record (and
therefore that the foreign-key reference itself has not changed).
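To make the check concrete, here is a minimal, self-contained sketch of the idea, not the actual Streams code. The class and method names ({{HashCheckSketch}}, {{hash}}, {{shouldEmit}}) and the string-encoded records are hypothetical, and SHA-256 is only a stand-in for whatever hash function Streams really applies to the serialized value.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

public class HashCheckSketch {

    // Stand-in hash over the serialized left-hand-side value.
    // Assumption: SHA-256 is used for illustration only.
    public static byte[] hash(byte[] serializedValue) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(serializedValue);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Emit a join result only if the hash carried by the response matches
    // the hash of the value currently stored for that left-hand-side key.
    public static boolean shouldEmit(byte[] hashInResponse, byte[] currentSerializedValue) {
        return Arrays.equals(hashInResponse, hash(currentSerializedValue));
    }

    public static void main(String[] args) {
        // Two versions of the same left-hand-side record; the foreign key moves from A to B.
        byte[] v1 = "id=1,fk=A".getBytes(StandardCharsets.UTF_8);
        byte[] v2 = "id=1,fk=B".getBytes(StandardCharsets.UTF_8);

        // v2 is the current value. A delayed response for the old reference
        // still carries the hash of v1 and is dropped.
        System.out.println(shouldEmit(hash(v1), v2)); // false

        // The response for the new reference carries the hash of v2 and is emitted.
        System.out.println(shouldEmit(hash(v2), v2)); // true
    }
}
```

Whichever order the two responses arrive in, only the one matching the current record version produces output.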
An undesired side-effect of this is that if users update their serdes (in a
compatible way), for example to add a new optional field to the record, then
the resulting hash will change for existing records. This will cause Streams to
stop emitting results for those records until a new left-hand-side update comes
in, recording a new hash for those records.
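The following sketch illustrates the problem under simplified assumptions. The two serializers ({{serializeV1}}, {{serializeV2}}), the string wire format, and the optional {{note}} field are all hypothetical, and SHA-256 again stands in for the real hash function. The point is only that the same logical record yields different bytes, and therefore a different hash, once the serde changes.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

public class SerdeEvolutionSketch {

    // Stand-in hash over serialized bytes (assumption: SHA-256 for illustration only).
    public static byte[] hash(byte[] serializedValue) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(serializedValue);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Hypothetical original serializer: id and foreign key only.
    public static byte[] serializeV1(String id, String fk) {
        return ("id=" + id + ",fk=" + fk).getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical evolved serializer: adds an optional "note" field,
    // written as an empty string when absent.
    public static byte[] serializeV2(String id, String fk, String note) {
        String n = note == null ? "" : note;
        return ("id=" + id + ",fk=" + fk + ",note=" + n).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hash recorded for the record before the serde upgrade.
        byte[] hashBeforeUpgrade = hash(serializeV1("1", "A"));

        // Hash recomputed from the same, unchanged record after the upgrade.
        byte[] hashAfterUpgrade = hash(serializeV2("1", "A", null));

        // The record and its foreign key did not change, yet the hashes differ,
        // so a hash-equality check would suppress the join result.
        System.out.println(Arrays.equals(hashBeforeUpgrade, hashAfterUpgrade)); // false
    }
}
```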
It should be possible to provide a fix. Some ideas:
* only consider the foreign-key reference itself in the hash function (this
was the original proposal, but we opted to hash the entire record as an
optimization to suppress unnecessary updates).
* provide a user-overridable hash function. This would be more flexible, but
it also pushes a lot of complexity onto users and opens up the possibility of
completely breaking semantics.
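As a rough illustration of the first idea, the sketch below hashes only the foreign key. It is not a proposed implementation: {{ForeignKeyHashSketch}} and {{hashForeignKey}} are hypothetical names, the foreign key is assumed to be a plain string, and SHA-256 stands in for the real hash function.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

public class ForeignKeyHashSketch {

    // Hash only the extracted foreign key, not the whole serialized record.
    // Assumption: SHA-256 and a string-typed key, for illustration only.
    public static byte[] hashForeignKey(String foreignKey) {
        try {
            return MessageDigest.getInstance("SHA-256")
                    .digest(foreignKey.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The value serde may change between these two calls, but the hash
        // depends only on the foreign key, so it stays the same.
        byte[] beforeUpgrade = hashForeignKey("A");
        byte[] afterUpgrade = hashForeignKey("A");
        System.out.println(Arrays.equals(beforeUpgrade, afterUpgrade)); // true

        // A changed foreign-key reference still produces a different hash,
        // so a delayed response for the old reference can still be detected.
        byte[] afterFkChange = hashForeignKey("B");
        System.out.println(Arrays.equals(beforeUpgrade, afterFkChange)); // false
    }
}
```

The trade-off noted in the first bullet still applies: a hash over the foreign key alone gives up the update suppression that hashing the entire record provides. The sketch also assumes the foreign key's own serialized form stays stable.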
We will need to design the solution carefully so that we can preserve the
desired correctness guarantee.