gene-bordegaray commented on issue #21207:
URL: https://github.com/apache/datafusion/issues/21207#issuecomment-4254968115

   Hey everyone, sorry for the delay. After the discussions we had, I have come up with a
proposal for dynamic filtering with partitioned data. This builds off ideas
brought up by @alamb @NGA-TRAN @stuhood @adriangb and @LiaCastaneda. I
explain why we can't always just bind partition X on the build side to
partition X on the probe side (like we suggested in the meeting), and provide
an alternative approach that achieves the desired filter behavior with API changes local
to `DynamicFilterPhysicalExpr` (no public API changes to `PhysicalExpr`).
   
   <details>
     <summary>Click to see proposal</summary>

   # Dynamic Filter Partition Routing
   
   ## Background
   
   In Partitioned hash join mode, each build partition produces its own filter 
consisting of two parts:
   
   - **Bounds:** `col >= min AND col <= max` for each join key.
   - **Membership:** An `InList` (small build sides) or `HashTableLookup` 
(large build sides) that
     checks whether a probe row's join key exists in that partition's hash 
table.
   
   Each partition's filter is: `bounds_P AND membership_P`.
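
   To make the shape concrete, here is a minimal runnable sketch (illustrative names only, not DataFusion's actual types; `members` stands in for the `InList` / `HashTableLookup` membership check):

   ```rust
   use std::collections::HashSet;

   /// Toy model of one build partition's filter: min/max bounds plus a
   /// membership check over the keys actually built on that partition.
   struct PartitionFilter {
       min: i64,
       max: i64,
       members: HashSet<i64>, // stands in for InList / HashTableLookup
   }

   impl PartitionFilter {
       /// bounds_P AND membership_P
       fn matches(&self, key: i64) -> bool {
           key >= self.min && key <= self.max && self.members.contains(&key)
       }
   }

   fn main() {
       let f = PartitionFilter { min: 1, max: 99, members: HashSet::from([1, 42, 99]) };
       assert!(f.matches(42));  // in bounds and in the hash table
       assert!(!f.matches(55)); // in bounds but not built on this partition
       println!("ok");
   }
   ```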
   
   ## Current Approach: CASE Expression Routing
   
   Today, per-partition filters are combined into a single `CASE` statement 
pushed to the probe side
   `DataSourceExec`:
   
   ```
   CASE hash(join_keys) % N
       WHEN 0 THEN bounds_0 AND membership_0
       WHEN 1 THEN bounds_1 AND membership_1
       ...
       ELSE false
   END
   ```
   
   The `hash()` uses the same algorithm as `RepartitionExec`. Each probe row is 
routed to the correct
   partition's filter before `RepartitionExec` shuffles it to that partition.
   
   ```
   HashJoinExec(Partitioned, N=4)
       RepartitionExec(Hash, 4)
           DataSourceExec -> CASE filter pushed here
       RepartitionExec(Hash, 4)
           DataSourceExec -> CASE filter pushed here
   ```
   
   ### Problem 1: Incorrect Routing for File-Partitioned Data
   
   When `preserve_file_partitions` is enabled, both sides keep their file-group 
partitioning. There is
   no `RepartitionExec`, so scan partition P goes directly to join partition P:
   
   ```
   HashJoinExec(Partitioned, N=3)
       DataSourceExec (3 file groups: A, B, C) ─> build
       DataSourceExec (3 file groups: A, B, C) ─> probe
   ```
   
   The CASE expression routes using `hash(col) % 3`, but the file groups are value-partitioned
even though they declare `Hash` partitioning. The two schemes are not aligned:
   
   ```
   Build partition 0 has col = 'A', creates filter for 'A'
   CASE routes probe row with col = 'A':
       - hash('A') % 3 = 2  ─> uses partition 2's filter
       - Partition 2's filter is for col = 'C'
       - Row with col = 'A' is incorrectly rejected
   ```
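
   A runnable toy of the mismatch (the byte-sum hash and the value-to-partition mapping are both hypothetical stand-ins, not `RepartitionExec`'s real hash): with these choices every key is routed to the wrong partition's filter, just as in the `hash('A') % 3 = 2` trace above:

   ```rust
   /// Value-based file grouping: which partition the rows for `v` actually live in.
   fn value_partition(v: &str) -> usize {
       match v { "A" => 0, "B" => 1, _ => 2 }
   }

   /// Toy byte-sum hash, a stand-in for RepartitionExec's real hash function.
   fn toy_hash(v: &str) -> u64 {
       v.bytes().map(|b| b as u64).sum()
   }

   fn main() {
       let n = 3u64;
       for v in ["A", "B", "C"] {
           let file_partition = value_partition(v);  // where the row actually lives
           let routed = (toy_hash(v) % n) as usize;  // which filter the CASE picks
           println!("{v}: file partition {file_partition}, CASE routes to filter {routed}");
           // With this toy hash, every key gets the wrong partition's filter.
           assert_ne!(file_partition, routed);
       }
       println!("ok");
   }
   ```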
   
   ### Problem 2: No Statistics Pruning
   
   The pruning evaluator works by testing predicates against row group min/max 
statistics. It cannot
   evaluate `hash(col) % N` because there is no relationship between 
`hash(col)` and the column's
min/max range. Thus, it returns "unknown" for the CASE expression, meaning
no row groups are ever
   skipped in Partitioned mode (every row group is opened and scanned even if 
all its rows would be
   filtered out).
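
   The asymmetry can be sketched with a toy tri-valued evaluator (illustrative, not DataFusion's actual `PruningPredicate` API): bounds predicates are decidable from min/max statistics, hash-bucket predicates are not:

   ```rust
   /// Tri-valued pruning outcome for a row group.
   #[derive(Debug, PartialEq)]
   enum Prune { CanSkip, CannotSkip, Unknown }

   /// `col >= lo AND col <= hi` CAN be tested against row-group min/max stats.
   fn prune_bounds(pred_lo: i64, pred_hi: i64, rg_min: i64, rg_max: i64) -> Prune {
       if rg_max < pred_lo || rg_min > pred_hi { Prune::CanSkip } else { Prune::CannotSkip }
   }

   /// `hash(col) % N == k` cannot: min/max of `col` says nothing about
   /// which hash bucket its rows fall into.
   fn prune_hash_bucket(_rg_min: i64, _rg_max: i64) -> Prune {
       Prune::Unknown
   }

   fn main() {
       // Row group with col in [500, 550] vs predicate col BETWEEN 1 AND 99:
       assert_eq!(prune_bounds(1, 99, 500, 550), Prune::CanSkip);
       // Overlapping row group cannot be skipped:
       assert_eq!(prune_bounds(1, 99, 50, 60), Prune::CannotSkip);
       // The same stats give no conclusion for a hash-bucket predicate:
       assert_eq!(prune_hash_bucket(500, 550), Prune::Unknown);
       println!("ok");
   }
   ```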
   
   ## Why We Can't Always Bind Filters to Partitions
   
   An intuitive fix (which we discussed) is: "store a filter per build
partition, bind each scan
partition to its matching filter." This only works when scan partition P
maps 1:1 to join partition
   P. When a `RepartitionExec(Hash)` sits between the scan and the join, the 
mapping breaks:
   
   ```
   HashJoinExec(Partitioned, N=4)
       RepartitionExec(Hash, 4)    -> rows shuffled by hash(col) % 4
           DataSourceExec (1 file) -> scan partition 0 contains rows for ALL 
join partitions
   ```
   
   If we bind scan partition 0 to join filter 0, we apply partition 0's filter 
to every row in the
   file, incorrectly pruning rows meant for partitions 1-3.
   
   ## Proposed Solution: Detect Repartition, Choose Routing Mode
   
   At optimizer time, detect whether each side of the join has a 
`RepartitionExec` between the scan and
   the join. Choose the routing strategy based on alignment:
   
   ```
   both sides repartitioned     ─> CaseHash (existing behavior)
   neither side repartitioned   ─> PartitionIndex (new)
   one side only (misaligned)   ─> Error
   ```
   
   The misaligned case means one side claims `Hash` partitioning without actually being hash
distributed (stemming from `preserve_file_partitions` being enabled on only one side). This is a
planning bug, since the join itself would produce incorrect results.
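
   A minimal sketch of the decision table (the enum name mirrors the proposal's `DynamicFilterRoutingMode`; the helper function is hypothetical):

   ```rust
   #[derive(Debug, PartialEq)]
   enum DynamicFilterRoutingMode { CaseHash, PartitionIndex }

   /// Pick the routing strategy from whether each join input was repartitioned.
   fn choose_routing(
       left_repartitioned: bool,
       right_repartitioned: bool,
   ) -> Result<DynamicFilterRoutingMode, String> {
       match (left_repartitioned, right_repartitioned) {
           (true, true) => Ok(DynamicFilterRoutingMode::CaseHash),
           (false, false) => Ok(DynamicFilterRoutingMode::PartitionIndex),
           // One side claims Hash partitioning without being hash distributed:
           // a planning bug, since the join itself would be incorrect.
           _ => Err("misaligned partitioning between build and probe sides".into()),
       }
   }

   fn main() {
       assert_eq!(choose_routing(true, true), Ok(DynamicFilterRoutingMode::CaseHash));
       assert_eq!(choose_routing(false, false), Ok(DynamicFilterRoutingMode::PartitionIndex));
       assert!(choose_routing(true, false).is_err());
       println!("ok");
   }
   ```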
   
   ### Repartition Detection in enforce_distribution
   
   The `enforce_distribution` optimizer already decides whether to insert 
`RepartitionExec`. We extend
   its context to track whether each side was repartitioned.
   
   The existing `PlanContext<bool>` (tracking `dist_changing`) is replaced with 
a `DistFlags` struct:
   
   ```rust
   pub struct DistFlags {
       pub dist_changing: bool,
       /// Whether output partitioning originates from a RepartitionExec.
       pub repartitioned: bool,
   }
   ```
   
   After distribution is enforced, check both children of any `Partitioned` 
hash join and set the
   routing mode on `HashJoinExec` via a `DynamicFilterRoutingMode` enum.
   
   ### CaseHash Mode (RepartitionExec present)
   
   This is the existing behavior. The CASE expression routes each probe row to 
the correct partition's
   filter using the same hash function as `RepartitionExec`:
   
   ```
   HashJoinExec(Partitioned, N=4, routing=CaseHash)
       RepartitionExec(Hash, 4)
           DataSourceExec ─> CASE filter pushed here
       RepartitionExec(Hash, 4)
           DataSourceExec ─> CASE filter pushed here
   ```
   
   Each scan partition contains rows for all join partitions (the 
`RepartitionExec` will shuffle them
   after the scan). The CASE ensures each row is checked against only the 
correct partition's filter.
   
   ### PartitionIndex Mode (no RepartitionExec)
   
   Scan partition P maps directly to join partition P:
   
   ```
   HashJoinExec(Partitioned, N=3, routing=PartitionIndex)
       DataSourceExec (3 file groups: A, B, C) ─> build
       DataSourceExec (3 file groups: A, B, C) ─> probe, dynamic filter pushed 
here
   ```
   
   `DynamicFilterPhysicalExpr` acts as a channel between the join (producer) 
and the scan (consumer).
   In `PartitionIndex` mode, it keeps two representations of the filter:
   
   - `current()` / `Inner.expr`: the global executable fallback, typically the 
OR of all per-partition
     filters
   - `Inner.partitioned_exprs`: the per-partition filters used when a scan 
partition can be bound to a
     join partition
   
   This keeps `DynamicFilterPhysicalExpr` usable as a normal `PhysicalExpr` 
while still allowing
   partition-local specialization at file-open time.
   
   _Note:_ Generic expression consumers still use `current()`, which returns a single
executable expression (the global OR). Partition-aware consumers such as the parquet
opener use the per-partition filter state by calling `partition_filter(partition_index)`.
   
   Two new methods are added to `DynamicFilterPhysicalExpr` (not to 
`PhysicalExpr`, to avoid exposing
   one-off partition-routing behavior more broadly):
   
   1. **Producer API:** `update_partitioned(global_expr, partition_exprs)`. 
Called by the hash join
      build-side accumulator after all build partitions complete. Stores:
      - `global_expr` in `Inner.expr`
      - `partition_exprs` in `Inner.partitioned_exprs`
   
   2. **Consumer API:** `partition_filter(index) -> Result<Option<Arc<dyn 
PhysicalExpr>>>`. Called by
      the file opener at open time.
      - `None` means no per-partition data is available for this filter (for 
example `CaseHash` /
        `CollectLeft`)
      - `Some(false)` means the requested build partition was empty
      - `Some(expr)` means use that partition's concrete filter
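
   The two APIs can be sketched on a toy struct (here `Expr` stands in for `Arc<dyn PhysicalExpr>`, the consumer API is simplified to return `Option` instead of `Result<Option<...>>`, and a literal `False` variant models an empty build partition):

   ```rust
   /// Toy expression type standing in for Arc<dyn PhysicalExpr>.
   #[derive(Clone, Debug, PartialEq)]
   enum Expr { False, Filter(&'static str) }

   /// Toy version of DynamicFilterPhysicalExpr's shared state.
   #[derive(Default)]
   struct Inner {
       /// Global executable fallback (typically the OR of all partition filters).
       expr: Option<Expr>,
       /// Per-partition filters; None until the producer publishes them
       /// (and permanently None in CaseHash / CollectLeft modes).
       partitioned_exprs: Option<Vec<Expr>>,
   }

   impl Inner {
       /// Producer API: called once all build partitions complete.
       fn update_partitioned(&mut self, global: Expr, per_partition: Vec<Expr>) {
           self.expr = Some(global);
           self.partitioned_exprs = Some(per_partition);
       }

       /// Consumer API: None => no per-partition data is available;
       /// Some(Expr::False) => that build partition was empty.
       fn partition_filter(&self, index: usize) -> Option<Expr> {
           self.partitioned_exprs.as_ref().map(|v| v[index].clone())
       }
   }

   fn main() {
       let mut f = Inner::default();
       assert_eq!(f.partition_filter(0), None); // nothing published yet
       f.update_partitioned(
           Expr::Filter("f0 OR f1"),
           vec![Expr::Filter("f0"), Expr::False], // partition 1's build was empty
       );
       assert_eq!(f.partition_filter(0), Some(Expr::Filter("f0")));
       assert_eq!(f.partition_filter(1), Some(Expr::False));
       println!("ok");
   }
   ```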
   
   At file-open time, the file opener rewrites the pushed predicate tree 
recursively, so any
   `DynamicFilterPhysicalExpr` found in the predicate is replaced with
   `partition_filter(partition_index)`. This rewrite happens before 
partition-column / constant-column
literal replacement so that the partition-specific filter can have a say in 
pruning. This is
   important for situations like:
   
   ```text
   static_predicate AND DynamicFilter[...]
   ```
   
   where the dynamic filter may not be the root of the predicate tree.
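
   A toy sketch of that recursive rewrite (hypothetical types, not the actual opener code):

   ```rust
   /// Toy predicate tree; Dynamic stands in for DynamicFilterPhysicalExpr.
   #[derive(Clone, Debug, PartialEq)]
   enum Pred {
       Static(&'static str),
       Dynamic,
       And(Box<Pred>, Box<Pred>),
   }

   /// Walk the pushed predicate tree and replace every dynamic-filter node
   /// with the concrete filter for this scan partition. The dynamic filter
   /// may sit anywhere in the tree, not just at the root.
   fn rewrite(p: Pred, partition_filter: &Pred) -> Pred {
       match p {
           Pred::Dynamic => partition_filter.clone(),
           Pred::And(l, r) => Pred::And(
               Box::new(rewrite(*l, partition_filter)),
               Box::new(rewrite(*r, partition_filter)),
           ),
           other => other,
       }
   }

   fn main() {
       // static_predicate AND DynamicFilter[...]
       let pushed = Pred::And(
           Box::new(Pred::Static("date > '2024-01-01'")),
           Box::new(Pred::Dynamic),
       );
       let rewritten = rewrite(pushed, &Pred::Static("col >= 1 AND col <= 99"));
       assert_eq!(
           rewritten,
           Pred::And(
               Box::new(Pred::Static("date > '2024-01-01'")),
               Box::new(Pred::Static("col >= 1 AND col <= 99")),
           )
       );
       println!("ok");
   }
   ```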
   
   **Example - file-partitioned, 3 partitions:**
   
   ```
   Build partitions:
       Partition 0 (col=A files):   HT_0 = {1, 42, 99},  bounds [1, 99]
       Partition 1 (col=B files):   HT_1 = {200, 250},   bounds [200, 250]
       Partition 2 (col=C files):   HT_2 = {500, 550},   bounds [500, 550]
   
   Partition 0's opener calls partition_filter(0):
       Receives: col >= 1 AND col <= 99 AND col IN HT_0
       col = 42:  bounds pass, 42 IN HT_0? Yes ─> Pass
       col = 55:  bounds pass, 55 IN HT_0? No  ─> Reject
   
   Partition 1's opener calls partition_filter(1):
       Receives: col >= 200 AND col <= 250 AND col IN HT_1
       col = 200: bounds pass, 200 IN HT_1? Yes ─> Pass
   ```
   
   Each partition gets a tight filter with O(1) evaluation cost.
   </details>
   
   I have begun working on a draft PR for this and will attach it here when it is in a somewhat
readable state. Let me know if you guys have suggestions, concerns, or questions 👍
   

