alamb commented on issue #21207: URL: https://github.com/apache/datafusion/issues/21207#issuecomment-4316703423
# Restatement of the Problem

I think the core problem is that the DynamicFilter `CASE hash(join_keys) % N` assumes that the declared input [`Partitioning`](https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html) matches reality (in this case, hash partitioning).

In @gene-bordegaray's case the data is not hash partitioned: the plan actually has range partitioned data but *incorrectly reports to DataFusion that it is hash partitioned* (because DataFusion has no way to represent range partitioning).

# Proposed Solution for aligned data

We previously suggested teaching DataFusion to avoid the `CASE` expression when the inputs to a HashJoin have the exact same hash partitioning on the join key(s) (i.e. 1:1 partition alignment). That would be a good general purpose optimization.

For example, if we have three partitions today, the dynamic filter expression looks like this:

```sql
CASE hash(join_keys) % N
    WHEN 0 THEN bounds_0 AND membership_0 -- Information from Partition 0 hash table
    WHEN 1 THEN bounds_1 AND membership_1 -- Information from Partition 1 hash table
    WHEN 2 THEN bounds_2 AND membership_2 -- Information from Partition 2 hash table
    ELSE false
END
```

If we know that both inputs are hash partitioned on join_keys, then we can split this into three separate expressions, one for each partition:

**Partition 0:**
```sql
bounds_0 AND membership_0
```

**Partition 1:**
```sql
bounds_1 AND membership_1
```

**Partition 2:**
```sql
bounds_2 AND membership_2
```

However, as @gene-bordegaray notes [here](https://github.com/apache/datafusion/issues/21207#issuecomment-4254968115), it won't work if the partitioning doesn't match exactly:

> Why We Can't Always Bind Filters to Partitions
>
> An intuitive fix is (which we discussed): "store a filter per build partition, bind each scan partition to its matching filter." This only works when scan partition P maps 1:1 to join partition P.
> When a RepartitionExec(Hash) sits between the scan and the join, the mapping breaks:

# Possible near term workaround solution

One thing I thought of while reading this description is that maybe you can work around the `CASE` problem with a special purpose rewriter.

For example, you can find the dynamic filter pattern

```sql
CASE hash(join_keys) % N
    WHEN 0 THEN bounds_0 AND membership_0
    WHEN 1 THEN bounds_1 AND membership_1
    ...
    ELSE false
END
```

and rewrite it to

```sql
(bounds_0 AND membership_0) OR (bounds_1 AND membership_1) OR ...
```

This will not be as fast to evaluate as the `CASE` based expression, but I think it will produce the correct results in your case: a row is kept if it matches any partition's build side, regardless of which partition `hash(join_keys) % N` would have selected. It may actually work better for pruning in some cases (as `OR` may be easier to interpret).

# Clean Solution

I think the cleanest solution to make this scenario work well and not be brittle (lots of bugs, etc) is to teach DataFusion to express the actual data partitioning (rather than claiming that pre-partitioned data which is not hash partitioned is actually hash partitioned).

One obvious thing to do would be to add a `Partitioning::Range` variant to [`Partitioning`](https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html). If Range is not expressive enough, we could add a version implemented via a trait, like `Partitioning::Custom(Arc<dyn MyPartitioning>)`.

The downside of this approach is that it will require substantial effort (breaking API change, etc). The upside is that the HashJoin and the optimizer will have the correct information they need to make the proper dynamic filters.
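To make the `CASE` to `OR` workaround above concrete, here is a minimal toy model in Python. It is only a sketch and does not use any DataFusion API: `Case`, `Or`, `partition_filter`, and `rewrite_case_to_or` are hypothetical names, a row is modeled as a bare integer join key, and `key % N` stands in for `hash(join_keys) % N`. It assumes the build side is range partitioned while the filter routes rows as if it were hash partitioned.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, List

Row = int  # toy model: a row is just its join key


def partition_filter(keys: Iterable[int]) -> Callable[[Row], bool]:
    """Stand-in for `bounds_i AND membership_i` built from one partition's hash table."""
    members = frozenset(keys)
    lo, hi = min(members), max(members)
    return lambda k: lo <= k <= hi and k in members


@dataclass
class Case:
    """CASE route(row) WHEN i THEN branches[i](row) ... ELSE false END"""
    route: Callable[[Row], int]
    branches: List[Callable[[Row], bool]]

    def eval(self, row: Row) -> bool:
        i = self.route(row)
        return self.branches[i](row) if 0 <= i < len(self.branches) else False


@dataclass
class Or:
    """(branches[0]) OR (branches[1]) OR ..."""
    branches: List[Callable[[Row], bool]]

    def eval(self, row: Row) -> bool:
        return any(branch(row) for branch in self.branches)


def rewrite_case_to_or(expr: Case) -> Or:
    # Drop the routing expression and keep every per-partition predicate.
    return Or(list(expr.branches))


# Build side is really range partitioned: small keys in 0, large keys in 1.
build_partitions = [[1, 2, 3], [10, 11]]
N = len(build_partitions)

case_filter = Case(
    route=lambda k: k % N,  # assumption: stand-in for hash(join_keys) % N
    branches=[partition_filter(p) for p in build_partitions],
)
or_filter = rewrite_case_to_or(case_filter)

# Build-side keys that the CASE form rejects because routing picks the wrong partition.
all_build_keys = [k for p in build_partitions for k in p]
wrongly_dropped = [k for k in all_build_keys if not case_filter.eval(k)]
```

In this model the `CASE` form drops build-side keys whose routed partition does not hold them, while the `OR` form keeps every key that appears in any partition and still rejects keys that appear in none. The `OR` form never rejects a row the `CASE` form accepts, which is why it is a safe (if slower) replacement under these assumptions.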
