[
https://issues.apache.org/jira/browse/KAFKA-18692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923100#comment-17923100
]
appchemist edited comment on KAFKA-18692 at 2/2/25 2:38 PM:
------------------------------------------------------------
Hi [~mjsax],
I don't have a full grasp of the Streams code yet,
but if you don't mind, could I try that POC with your feedback?
> Consider to unify KStreamImpl "repartitionRequired" with GraphNode
> "keyChangingOperation"
> -----------------------------------------------------------------------------------------
>
> Key: KAFKA-18692
> URL: https://issues.apache.org/jira/browse/KAFKA-18692
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Priority: Minor
>
> In `KStreamImpl`, we use a flag `repartitionRequired` which allows us to
> track if an operator (or one of its transitive ancestors) did modify the key,
> so if a key-dependent operation like aggregation or join is executed, we know
> if we need to insert an auto-repartition step.
> In parallel, we also track key-changing operations in our internal "topology
> graph" representation (which is used by our topology optimization layer), via
> GraphNode#keyChangingOperation().
> Thus, we have two independent code paths which do a similar thing (note, both
> are semantically not exactly the same thing, so we need to be careful to get
> this right; more details below). To avoid subtle bugs, it might be worth
> refactoring the code to unify both.
> The high-level idea would be (without me looking into details) to remove the
> `KStreamImpl#repartitionRequired` flag altogether, as we already pass a
> `GraphNode` into `KStreamImpl` and thus can access
> `isKeyChangingOperation()`. However, `isKeyChangingOperation()` only tracks
> if the _current_ node is key-changing, and if it returns `false` we don't
> know anything about its ancestors. Thus, the semantics differ from the
> `repartitionRequired` flag, which already considers ancestor information.
> Hence, we might need to traverse the `GraphNode` structure backwards to
> verify if a parent did change the key or not (cf.
> `InternalStreamsBuilder#getKeyChangingParentNode()`, which I believe we could
> re-use).
> Another thing to consider is that some operators like `repartition()`
> explicitly reset `repartitionRequired=false` in a hard-coded way. However,
> `keyChangingOperation=false` does not carry this information: we don't know
> if the current operator just does not touch the key, or if the current
> operator _ensures_ that we are partitioned by the current key. I.e., we need a
> new way to track this information on `GraphNode` to have a way to stop/break
> the backward traversal if we hit such a node which does this "reset".
> Overall, I believe it would be good to do this rewrite (based on an educated
> guess...), but it might also turn out (after some POC coding) that it's not a
> good idea and not worth doing, as it could add more problems than it solves,
> and we might also just need to close this ticket as "won't fix". Only a PR
> could tell for sure.
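
For illustration only, the backward traversal with a "reset" marker described in the ticket could be sketched roughly as follows. This is not the Kafka Streams implementation: `Node` is a hypothetical stand-in for `GraphNode`, and the `partitioningEnsured` flag and the `repartitionRequired(...)` helper are assumed names that do not exist in the code base.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class RepartitionSketch {

    /** Hypothetical stand-in for GraphNode; not the real Kafka Streams class. */
    static final class Node {
        final String name;
        final boolean keyChanging;         // plays the role of isKeyChangingOperation()
        final boolean partitioningEnsured; // assumed "reset" marker, e.g. for repartition()
        final List<Node> parents = new ArrayList<>();

        Node(String name, boolean keyChanging, boolean partitioningEnsured, Node... parents) {
            this.name = name;
            this.keyChanging = keyChanging;
            this.partitioningEnsured = partitioningEnsured;
            this.parents.addAll(Arrays.asList(parents));
        }
    }

    /**
     * Walks from the given node towards the sources. Returns true if a
     * key-changing node is reachable without first passing a node that
     * ensures partitioning by the current key.
     */
    static boolean repartitionRequired(Node start) {
        Deque<Node> toVisit = new ArrayDeque<>();
        Set<Node> seen = new HashSet<>();
        toVisit.push(start);
        while (!toVisit.isEmpty()) {
            Node node = toVisit.pop();
            if (!seen.add(node)) {
                continue; // already handled via another path
            }
            if (node.partitioningEnsured) {
                continue; // "reset": stop the traversal on this path
            }
            if (node.keyChanging) {
                return true;
            }
            for (Node parent : node.parents) {
                toVisit.push(parent);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Node source = new Node("source", false, false);
        Node selectKey = new Node("selectKey", true, false, source);
        Node mapValues = new Node("mapValues", false, false, selectKey);
        Node repartition = new Node("repartition", false, true, mapValues);
        Node filter = new Node("filter", false, false, repartition);

        System.out.println(repartitionRequired(mapValues)); // key changed upstream
        System.out.println(repartitionRequired(filter));    // traversal stops at repartition
    }
}
```

In this sketch the reset marker is checked before the key-changing flag, so a node that both changes the key and repartitions would count as partitioned; whether that ordering matches the intended semantics is one of the things a POC would have to settle.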
--
This message was sent by Atlassian Jira
(v8.20.10#820010)