alamb commented on issue #21207:
URL: https://github.com/apache/datafusion/issues/21207#issuecomment-4316703423

   # Restatement of the Problem
   I think the core problem is that the dynamic filter expression `CASE hash(join_keys) % N` assumes that the declared input [`Partitioning`](https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html) matches reality (in this case, hash partitioning).
   
   In @gene-bordegaray's case, the data is not hash partitioned. The plan actually has range partitioned data but *incorrectly reports to DataFusion that it is hash partitioned* (because DataFusion has no way to represent range partitioning).
   
   # Proposed Solution for aligned data
   
   We previously suggested teaching DataFusion to avoid the `CASE` expression when the inputs to a HashJoin have the exact same hash partitioning on the join key(s) (aka 1:1 partition alignment). That would be a good general purpose optimization.
   
   For example, if we have three partitions today, the dynamic filter expression looks like this:
   
   ```sql
   CASE hash(join_keys) % N
       WHEN 0 THEN bounds_0 AND membership_0      -- Information from Partition 0 hash table
       WHEN 1 THEN bounds_1 AND membership_1      -- Information from Partition 1 hash table
       WHEN 2 THEN bounds_2 AND membership_2      -- Information from Partition 2 hash table
       ELSE false
   END
   ```
   
   If we know that both inputs are hash partitioned on join_keys then we can 
split this into three separate expressions, one for each partition:
   
   **Partition 0:**
   ```sql
   bounds_0 AND membership_0
   ```
   
   **Partition 1:**
   ```sql
   bounds_1 AND membership_1
   ```
   
   **Partition 2:**
   ```sql
   bounds_2 AND membership_2
   ```
   
   
   However, as @gene-bordegaray notes [here](https://github.com/apache/datafusion/issues/21207#issuecomment-4254968115), it won't work if the partitioning doesn't match exactly:
   > Why We Can't Always Bind Filters to Partitions
   > An intuitive fix is (which we discussed): "store a filter per build partition, bind each scan partition to its matching filter." This only works when scan partition P maps 1:1 to join partition P. When a RepartitionExec(Hash) sits between the scan and the join, the mapping breaks:
   
   
   # Possible near term workaround solution
   
   One thing I thought of while reading this description is that maybe you can work around the `CASE` problem with a special purpose rewriter.
   
   For example, you could find the dynamic filter pattern:
   ```sql
   CASE hash(join_keys) % N
       WHEN 0 THEN bounds_0 AND membership_0
       WHEN 1 THEN bounds_1 AND membership_1
       ...
       ELSE false
   END
   ```
   
   And rewrite it to:
   
   ```sql
   (bounds_0 AND membership_0) OR (bounds_1 AND membership_1) OR ...
   ```
   
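   This will not be as fast to evaluate as the `CASE`-based expression, but I think it will produce correct results in your case, since a row is kept if it matches the filter from any build partition, regardless of what `hash(join_keys) % N` evaluates to. It may actually work better for pruning in some cases (as `OR` may be easier to interpret).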
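
   To make the difference concrete, here is a minimal toy model in plain Rust (no DataFusion types). Everything in it is an assumption made for illustration: `PartitionFilter` stands in for `bounds_i AND membership_i`, `toy_hash` is an identity function rather than DataFusion's real hash, and the build side is range partitioned as 1..=10, 11..=20, 21..=30.

   ```rust
   use std::collections::HashSet;

   /// Toy stand-in for `bounds_i AND membership_i` from one build-side partition.
   struct PartitionFilter {
       min: i64,
       max: i64,
       keys: HashSet<i64>,
   }

   impl PartitionFilter {
       fn new(keys: &[i64]) -> Self {
           Self {
               // An empty partition gets min > max, so it matches nothing.
               min: keys.iter().copied().min().unwrap_or(i64::MAX),
               max: keys.iter().copied().max().unwrap_or(i64::MIN),
               keys: keys.iter().copied().collect(),
           }
       }

       /// `bounds_i AND membership_i`
       fn matches(&self, key: i64) -> bool {
           self.min <= key && key <= self.max && self.keys.contains(&key)
       }
   }

   /// Assumption: an identity "hash" so the routing is easy to follow by hand.
   /// This is NOT the hash function DataFusion uses.
   fn toy_hash(key: i64) -> u64 {
       key as u64
   }

   /// `CASE hash(key) % N WHEN i THEN filter_i ... ELSE false END`
   fn case_filter(filters: &[PartitionFilter], key: i64) -> bool {
       if filters.is_empty() {
           return false;
       }
       let partition = (toy_hash(key) % filters.len() as u64) as usize;
       filters[partition].matches(key)
   }

   /// `filter_0 OR filter_1 OR ...`
   fn or_filter(filters: &[PartitionFilter], key: i64) -> bool {
       filters.iter().any(|f| f.matches(key))
   }

   /// A build side that is really range partitioned: 1..=10, 11..=20, 21..=30.
   fn range_partitioned_filters() -> Vec<PartitionFilter> {
       vec![1i64..=10, 11..=20, 21..=30]
           .into_iter()
           .map(|range| PartitionFilter::new(&range.collect::<Vec<i64>>()))
           .collect()
   }
   ```

   In this model key `1` lives in build partition 0, but `toy_hash(1) % 3` routes it to partition 1, so `case_filter` rejects a row that would have joined. `or_filter` keeps it, at the cost of checking every partition's filter.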
   
   # Clean Solution
   
   I think the cleanest solution to make this scenario work well and not be brittle (lots of bugs, etc.) is to teach DataFusion to express the actual data partitioning (rather than claiming that pre-partitioned data that is not hash partitioned is actually hash partitioned).
   
   One obvious thing to do would be to add a `Partitioning::Range` variant to [`Partitioning`](https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html). If `Range` is not expressive enough, we could add a variant implemented via a trait, like `Partitioning::Custom(Arc<dyn MyPartitioning>)`.
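
   As a rough sketch of the shape this could take (not a proposal for the actual API), the standalone Rust below models it with plain `i64` keys. `SketchPartitioning`, `CustomPartitioning`, `EvenOdd`, and all method names are hypothetical; the real `Partitioning` enum carries physical expressions and has no `Range` or `Custom` variant today.

   ```rust
   use std::fmt::Debug;
   use std::sync::Arc;

   /// Hypothetical trait for user-defined partitioning (not a DataFusion API).
   trait CustomPartitioning: Debug + Send + Sync {
       fn partition_count(&self) -> usize;
       /// Partition that rows with this key belong to.
       fn partition_for(&self, key: i64) -> usize;
   }

   /// Simplified stand-in for DataFusion's `Partitioning` enum.
   #[derive(Debug, Clone)]
   enum SketchPartitioning {
       /// Hash partitioned into N partitions (join key expressions omitted).
       Hash(usize),
       /// Partition count known, scheme unknown.
       Unknown(usize),
       /// Hypothetical: sorted split points. Partition `i` holds keys in
       /// `[splits[i - 1], splits[i])`, so N split points give N + 1 partitions.
       Range(Vec<i64>),
       /// Hypothetical: scheme described by a trait object.
       Custom(Arc<dyn CustomPartitioning>),
   }

   impl SketchPartitioning {
       fn partition_count(&self) -> usize {
           match self {
               Self::Hash(n) | Self::Unknown(n) => *n,
               Self::Range(splits) => splits.len() + 1,
               Self::Custom(custom) => custom.partition_count(),
           }
       }

       /// Only true hash partitioning justifies routing on `hash(key) % N`.
       fn can_use_hash_case_filter(&self) -> bool {
           matches!(self, Self::Hash(_))
       }

       /// Partition for a key, where this toy model can answer the question.
       fn partition_for(&self, key: i64) -> Option<usize> {
           match self {
               // Count the split points that are <= key.
               Self::Range(splits) => Some(splits.partition_point(|&s| s <= key)),
               Self::Custom(custom) => Some(custom.partition_for(key)),
               Self::Hash(_) | Self::Unknown(_) => None,
           }
       }
   }

   /// Example custom scheme: even keys in partition 0, odd keys in partition 1.
   #[derive(Debug)]
   struct EvenOdd;

   impl CustomPartitioning for EvenOdd {
       fn partition_count(&self) -> usize {
           2
       }
       fn partition_for(&self, key: i64) -> usize {
           key.rem_euclid(2) as usize
       }
   }
   ```

   With something along these lines, a plan over range partitioned data could report `Range` instead of `Hash`, and code that builds dynamic filters could check a predicate like `can_use_hash_case_filter` before emitting the `CASE hash(join_keys) % N` form.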
   
   The downside of this approach is that it will require substantial effort (breaking API change, etc.). The upside is that the HashJoin and the optimizer will have the correct information they need to make the proper dynamic filters.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

