[
https://issues.apache.org/jira/browse/KAFKA-13386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17986251#comment-17986251
]
Antoine Michaud commented on KAFKA-13386:
-----------------------------------------
As you proposed, it would be great to be able to change the hash generation
from the full object to only the foreign key.
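To illustrate why that would help, here is a minimal stand-alone sketch in plain Java, with no Kafka classes involved. Everything in it is hypothetical: the two string-based "serializers" stand in for a value serde before and after a schema change, and SHA-256 is only a stand-in for whatever hash Kafka Streams computes internally. It shows that a hash over the whole value stops matching once the serialized form changes, while a hash over the foreign key alone still matches:
{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

public class HashComparisonSketch {

    // Hypothetical serializer before the schema change.
    static byte[] serializeV1(String foreignKey, String name) {
        return ("fk=" + foreignKey + ";name=" + name).getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical serializer after the schema change: same record, one extra defaulted field.
    static byte[] serializeV2(String foreignKey, String name) {
        return ("fk=" + foreignKey + ";name=" + name + ";status=UNKNOWN")
                .getBytes(StandardCharsets.UTF_8);
    }

    // The foreign key serialized on its own; unaffected by changes to the value schema.
    static byte[] serializeForeignKey(String foreignKey) {
        return foreignKey.getBytes(StandardCharsets.UTF_8);
    }

    // SHA-256 is an assumption made for this sketch, not the hash used by Kafka Streams.
    static byte[] hash(byte[] bytes) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(bytes);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Whole-value comparison: the hash stored before the upgrade vs. the one recomputed after it.
    static boolean fullValueHashMatches() {
        byte[] messageHash = hash(serializeV1("fk-1", "alice"));
        byte[] currentHash = hash(serializeV2("fk-1", "alice"));
        return Arrays.equals(messageHash, currentHash);
    }

    // Foreign-key-only comparison: both hashes are computed from the same FK bytes.
    static boolean foreignKeyHashMatches() {
        byte[] messageHash = hash(serializeForeignKey("fk-1"));
        byte[] currentHash = hash(serializeForeignKey("fk-1"));
        return Arrays.equals(messageHash, currentHash);
    }

    public static void main(String[] args) {
        System.out.println("full-value hash matches: " + fullValueHashMatches());
        System.out.println("foreign-key hash matches: " + foreignKeyHashMatches());
    }
}
{code}
In this sketch the full-value comparison fails even though the record is logically unchanged, which is the situation in which a valid join result would be treated as stale and dropped.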
[I recently proposed in the community
slack|https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1750786531042709?thread_ts=1690375086.077949&cid=C48AHTCUQ]
a workaround that maps the LHS values to extract only the foreign key, does
the foreign-key join to pull the foreign data, then joins the result back to
the LHS:
{code:java}
KTable<MainKey, MainData> mainTable = ...;
KTable<ForeignKey, ForeignData> foreignTable = ...;

// Before
mainTable.join(
    foreignTable,
    (mainData) -> mainData.foreignKey,
    (mainData, foreignData) -> applyJoin(mainData, foreignData)
);

// After
mainTable
    // Only select the foreign key, or the minimum possible data, which must not evolve
    .mapValues((mainData) -> mainData.foreignKey)
    // Pull the foreign data with a foreign-key join
    .join(
        foreignTable,
        Function.identity(), // LHS is already the FK
        // As the LHS is the FK (previously mapped in mapValues), just return the
        // foreign data (or a sub-part, if needed to reduce the join-result store size)
        (foreignKey, foreignData) -> foreignData
    )
    // Join the LHS data back with an equi-join
    .join(mainTable, (foreignData, mainData) -> applyJoin(mainData, foreignData));
{code}
You may need to provide your own Serde if no default serde is configured:
{code:java}
mainTable
    // Do not provide a store name, to avoid creating an additional store
    .mapValues(
        (mainData) -> mainData.foreignKey,
        Materialized.with(null, new ForeignKeySerde())
    )
    .join(...);
{code}
With this solution:
* The foreign-key join result store contains less data than before
* There is no additional store, as the second join co-locates the
join-result table and the main table
* You can evolve the main table schema safely (adding a new field, changing
another field's type, ...), unless you change the FK serde (or the sub-part
extracted for the foreign-key join).
Spoiler: I'm the co-writer of the ticket KAFKA-15303, and I have spent months
trying to find a clean solution that does not require an app reset.
> Foreign Key Join filtering out valid records after a code change / schema
> evolved
> ---------------------------------------------------------------------------------
>
> Key: KAFKA-13386
> URL: https://issues.apache.org/jira/browse/KAFKA-13386
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.6.2
> Reporter: Sergio Duran Vegas
> Priority: Major
>
> The join optimization assumes the serializer is deterministic and invariant
> across upgrades. So in case of changes this optimization will drop
> invalid/intermediate records. In other situations we have relied on the same
> property, for example when computing whether an update is a duplicate result
> or not.
>
> The problem is that some serializers are sadly not deterministic.
>
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java]
>
> {code:java}
> // If this value doesn't match the current value from the original table,
> // it is stale and should be discarded.
> if (java.util.Arrays.equals(messageHash, currentHash)) {
> {code}
>
> A solution for this problem would be for the comparison to use the
> foreign-key reference itself instead of the whole message hash.
>
> The bug fix proposal is to allow the user to choose between one method of
> comparison or the other (whole hash or FK reference). This would fix the
> problem of dropping valid records in certain cases, while still allowing the
> user to choose the current optimized way of checking valid records and
> dropping intermediate results.
>
>
>