gene-bordegaray commented on issue #21207:
URL: https://github.com/apache/datafusion/issues/21207#issuecomment-4254968115
Hey everyone, sorry for the delay. After the discussions we had, I have come up
with a proposal for dynamic filtering with partitioned data. It builds on ideas
from @alamb @NGA-TRAN @stuhood @adriangb and @LiaCastaneda . I explain why we
can't always just bind partition X on the build side to partition X on the
probe side (as we suggested in the meeting), and propose an alternative that
achieves the desired filter behavior with API changes local to
`DynamicFilterPhysicalExpr` (no public API changes to `PhysicalExpr`)
<details>
<summary>Click to see proposal</summary>
# Dynamic Filter Partition Routing
## Background
In Partitioned hash join mode, each build partition produces its own filter
consisting of two parts:
- **Bounds:** `col >= min AND col <= max` for each join key.
- **Membership:** An `InList` (small build sides) or `HashTableLookup` (large build sides) that checks whether a probe row's join key exists in that partition's hash table.

Each partition's filter is: `bounds_P AND membership_P`.
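To make the shape of a per-partition filter concrete, here is a minimal toy model in Rust. The names (`partition_filter`, the `i64` join key) are illustrative, not DataFusion's actual types, and the `HashSet` stands in for the `InList` / `HashTableLookup` membership check:

```rust
use std::collections::HashSet;

/// Toy model of one build partition's filter (`bounds_P AND membership_P`)
/// on a single i64 join key; names are illustrative, not DataFusion's.
fn partition_filter(min: i64, max: i64, members: &HashSet<i64>, key: i64) -> bool {
    // Cheap range check first, then the membership probe (InList / hash table).
    key >= min && key <= max && members.contains(&key)
}

fn main() {
    let ht_0: HashSet<i64> = [1, 42, 99].into_iter().collect();
    assert!(partition_filter(1, 99, &ht_0, 42));   // in bounds and a member
    assert!(!partition_filter(1, 99, &ht_0, 55));  // in bounds but not a member
    assert!(!partition_filter(1, 99, &ht_0, 500)); // fails the bounds check
}
```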
## Current Approach: CASE Expression Routing
Today, per-partition filters are combined into a single `CASE` statement
pushed to the probe side
`DataSourceExec`:
```
CASE hash(join_keys) % N
WHEN 0 THEN bounds_0 AND membership_0
WHEN 1 THEN bounds_1 AND membership_1
...
ELSE false
END
```
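A small sketch of how this CASE routing behaves when build and probe use the same hash. The `toy_hash` here is illustrative only (the real plan reuses `RepartitionExec`'s hash function), and membership is modeled as plain `Vec`s:

```rust
/// Illustrative stand-in for RepartitionExec's hash function.
fn toy_hash(key: i64) -> u64 {
    (key as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15)
}

/// CASE-style routing: evaluate only the filter of partition hash(key) % N.
fn case_route(key: i64, membership: &[Vec<i64>]) -> bool {
    let idx = (toy_hash(key) % membership.len() as u64) as usize;
    membership[idx].contains(&key) // WHEN idx THEN membership_idx
}

/// Build per-partition membership the same way RepartitionExec would shuffle,
/// then check that every built key passes and an absent key is rejected.
fn demo() -> bool {
    let n = 4usize;
    let mut membership = vec![Vec::new(); n];
    for key in 1..=10i64 {
        membership[(toy_hash(key) % n as u64) as usize].push(key);
    }
    (1..=10).all(|k| case_route(k, &membership)) && !case_route(1000, &membership)
}

fn main() {
    assert!(demo());
}
```

Because the same hash partitions the build side and routes probe rows, each probe row is always checked against exactly the filter that could contain it.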
The `hash()` uses the same algorithm as `RepartitionExec`. Each probe row is
routed to the correct
partition's filter before `RepartitionExec` shuffles it to that partition.
```
HashJoinExec(Partitioned, N=4)
RepartitionExec(Hash, 4)
DataSourceExec -> CASE filter pushed here
RepartitionExec(Hash, 4)
DataSourceExec -> CASE filter pushed here
```
### Problem 1: Incorrect Routing for File-Partitioned Data
When `preserve_file_partitions` is enabled, both sides keep their file-group
partitioning. There is
no `RepartitionExec`, so scan partition P goes directly to join partition P:
```
HashJoinExec(Partitioned, N=3)
DataSourceExec (3 file groups: A, B, C) ─> build
DataSourceExec (3 file groups: A, B, C) ─> probe
```
The CASE expression routes using `hash(col) % 3`, but the file groups are
partitioned by value, even though they declare `Hash` partitioning. The two
schemes are not aligned:
```
Build partition 0 has col = 'A', creates filter for 'A'
CASE routes probe row with col = 'A':
- hash('A') % 3 = 2 ─> uses partition 2's filter
- Partition 2's filter is for col = 'C'
- Row with col = 'A' is incorrectly rejected
```
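The mismatch can be shown with a toy hash (again, NOT the real `RepartitionExec` hash; the specific routed index below is an artifact of this toy function, but the point, that hash routing and value-based file grouping generally disagree, holds for any hash):

```rust
/// Illustrative toy hash, NOT the real RepartitionExec hash function.
fn toy_hash(key: u8) -> u64 {
    key as u64 * 7
}

/// Value-based assignment used by the file groups: 'A' -> 0, 'B' -> 1, 'C' -> 2.
fn file_partition(key: u8) -> usize {
    (key - b'A') as usize
}

/// Hash-based routing assumed by the CASE expression.
fn case_route_index(key: u8, n: u64) -> usize {
    (toy_hash(key) % n) as usize
}

fn main() {
    // Build partition 0 holds the 'A' rows, so its filter only accepts 'A'...
    assert_eq!(file_partition(b'A'), 0);
    // ...but the CASE expression sends an 'A' probe row to a different
    // partition's filter, built for another file group's values, so the
    // row is incorrectly rejected.
    assert_ne!(case_route_index(b'A', 3), file_partition(b'A'));
}
```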
### Problem 2: No Statistics Pruning
The pruning evaluator works by testing predicates against row group min/max
statistics. It cannot
evaluate `hash(col) % N` because there is no relationship between
`hash(col)` and the column's
min/max range. Thus, it returns "unknown" for the CASE expression, meaning
no row groups are ever
skipped in Partitioned mode (every row group is opened and scanned even if
all its rows would be
filtered out).
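A sketch of why the two predicate shapes differ for the pruning evaluator. The tri-state `Prune` enum and function names are hypothetical, simplified from how statistics-based pruning behaves:

```rust
/// Tri-state answer a pruning evaluator gives for a row group.
#[derive(Debug, PartialEq)]
enum Prune {
    CanSkip,
    CannotSkip,
    Unknown,
}

/// `col >= lo AND col <= hi` against a row group's [min, max] statistics:
/// disjoint ranges prove every row fails the bounds, so the group is skipped.
fn prune_bounds(lo: i64, hi: i64, rg_min: i64, rg_max: i64) -> Prune {
    if rg_max < lo || rg_min > hi {
        Prune::CanSkip
    } else {
        Prune::CannotSkip
    }
}

/// `hash(col) % n == p`: min/max statistics say nothing about which hash
/// values occur inside the range, so the only honest answer is Unknown,
/// and an Unknown predicate never skips a row group.
fn prune_hash_mod(_n: u64, _p: u64, _rg_min: i64, _rg_max: i64) -> Prune {
    Prune::Unknown
}

fn main() {
    // Bounds from a build partition covering [200, 250] skip a [1, 99] group...
    assert_eq!(prune_bounds(200, 250, 1, 99), Prune::CanSkip);
    // ...but the CASE's hash routing can never prove anything from min/max.
    assert_eq!(prune_hash_mod(4, 0, 1, 99), Prune::Unknown);
}
```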
## Why We Can't Always Bind Filters to Partitions
An intuitive fix (which we discussed) is: "store a filter per build partition,
bind each scan partition to its matching filter." This only works when scan
partition P maps 1:1 to join partition P. When a `RepartitionExec(Hash)` sits
between the scan and the join, the mapping breaks:
```
HashJoinExec(Partitioned, N=4)
RepartitionExec(Hash, 4) -> rows shuffled by hash(col) % 4
DataSourceExec (1 file) -> scan partition 0 contains rows for ALL
join partitions
```
If we bind scan partition 0 to join filter 0, we apply partition 0's filter
to every row in the
file, incorrectly pruning rows meant for partitions 1-3.
## Proposed Solution: Detect Repartition, Choose Routing Mode
At optimizer time, detect whether each side of the join has a
`RepartitionExec` between the scan and
the join. Choose the routing strategy based on alignment:
```
both sides repartitioned ─> CaseHash (existing behavior)
neither side repartitioned ─> PartitionIndex (new)
one side only (misaligned) ─> Error
```
The misaligned case means one side claims Hash partitioning without actually
being hash distributed (arising from `preserve_file_partitions` being enabled
on only one side). This is a planning issue in its own right, as the join
itself would produce incorrect results.
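The decision table above could look something like this. A sketch only: the error type is simplified to a `String` (the real code would use a `DataFusionError`), and the flag names mirror the `DistFlags` tracking described below:

```rust
#[derive(Debug, PartialEq)]
enum DynamicFilterRoutingMode {
    CaseHash,
    PartitionIndex,
}

/// Sketch of the alignment decision for a Partitioned hash join.
fn choose_routing(
    build_repartitioned: bool,
    probe_repartitioned: bool,
) -> Result<DynamicFilterRoutingMode, String> {
    match (build_repartitioned, probe_repartitioned) {
        // Both sides shuffled by the same hash: CASE routing is correct.
        (true, true) => Ok(DynamicFilterRoutingMode::CaseHash),
        // Neither side shuffled: scan partition P feeds join partition P.
        (false, false) => Ok(DynamicFilterRoutingMode::PartitionIndex),
        // One side only: the plan itself is inconsistent.
        _ => Err("misaligned: one side claims Hash partitioning \
                  without being hash distributed"
            .to_string()),
    }
}

fn main() {
    assert_eq!(choose_routing(true, true), Ok(DynamicFilterRoutingMode::CaseHash));
    assert_eq!(choose_routing(false, false), Ok(DynamicFilterRoutingMode::PartitionIndex));
    assert!(choose_routing(true, false).is_err());
}
```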
### Repartition Detection in enforce_distribution
The `enforce_distribution` optimizer already decides whether to insert
`RepartitionExec`. We extend
its context to track whether each side was repartitioned.
The existing `PlanContext<bool>` (tracking `dist_changing`) is replaced with
a `DistFlags` struct:
```rust
pub struct DistFlags {
    pub dist_changing: bool,
    /// Whether output partitioning originates from a RepartitionExec.
    pub repartitioned: bool,
}
```
After distribution is enforced, check both children of any `Partitioned`
hash join and set the
routing mode on `HashJoinExec` via a `DynamicFilterRoutingMode` enum.
### CaseHash Mode (RepartitionExec present)
This is the existing behavior. The CASE expression routes each probe row to
the correct partition's
filter using the same hash function as `RepartitionExec`:
```
HashJoinExec(Partitioned, N=4, routing=CaseHash)
RepartitionExec(Hash, 4)
DataSourceExec ─> CASE filter pushed here
RepartitionExec(Hash, 4)
DataSourceExec ─> CASE filter pushed here
```
Each scan partition contains rows for all join partitions (the
`RepartitionExec` will shuffle them
after the scan). The CASE ensures each row is checked against only the
correct partition's filter.
### PartitionIndex Mode (no RepartitionExec)
Scan partition P maps directly to join partition P:
```
HashJoinExec(Partitioned, N=3, routing=PartitionIndex)
DataSourceExec (3 file groups: A, B, C) ─> build
DataSourceExec (3 file groups: A, B, C) ─> probe, dynamic filter pushed
here
```
`DynamicFilterPhysicalExpr` acts as a channel between the join (producer)
and the scan (consumer).
In `PartitionIndex` mode, it keeps two representations of the filter:
- `current()` / `Inner.expr`: the global executable fallback, typically the
OR of all per-partition
filters
- `Inner.partitioned_exprs`: the per-partition filters used when a scan
partition can be bound to a
join partition
This keeps `DynamicFilterPhysicalExpr` usable as a normal `PhysicalExpr`
while still allowing
partition-local specialization at file-open time.
_Note:_ Generic expression consumers still use `current()`, which returns a
single executable
expression (the global OR). Partition-aware consumers such as the parquet
opener use the partition
filter state by calling `partition_filter(partition_index)`.
Two new methods are added to `DynamicFilterPhysicalExpr` (not to
`PhysicalExpr`, to avoid exposing
one-off partition-routing behavior more broadly):
1. **Producer API:** `update_partitioned(global_expr, partition_exprs)`.
Called by the hash join
build-side accumulator after all build partitions complete. Stores:
- `global_expr` in `Inner.expr`
- `partition_exprs` in `Inner.partitioned_exprs`
2. **Consumer API:** `partition_filter(index) -> Result<Option<Arc<dyn
PhysicalExpr>>>`. Called by
the file opener at open time.
- `None` means no per-partition data is available for this filter (for
example `CaseHash` /
`CollectLeft`)
- `Some(false)` (the literal `false` expression) means the requested build partition was empty, so no probe rows can match
- `Some(expr)` means use that partition's concrete filter
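The producer/consumer pair above can be sketched as follows. This is a toy model under loud assumptions: `String`s stand in for `Arc<dyn PhysicalExpr>`, and the `Arc`/lock plumbing that `DynamicFilterPhysicalExpr` actually needs is omitted:

```rust
/// Toy model of the proposed per-filter state (Strings instead of exprs).
#[derive(Default)]
struct DynamicFilterState {
    /// Global executable fallback (typically the OR of all partition filters).
    expr: Option<String>,
    /// Per-partition filters, populated only in PartitionIndex mode.
    partitioned_exprs: Option<Vec<String>>,
}

impl DynamicFilterState {
    /// Producer API: called by the build side once all partitions complete.
    fn update_partitioned(&mut self, global: String, parts: Vec<String>) {
        self.expr = Some(global);
        self.partitioned_exprs = Some(parts);
    }

    /// Generic consumers keep seeing a single executable expression.
    fn current(&self) -> Option<&str> {
        self.expr.as_deref()
    }

    /// Partition-aware consumers (e.g. the parquet opener) ask for their
    /// partition's filter; None means no per-partition data is available.
    fn partition_filter(&self, index: usize) -> Option<&str> {
        self.partitioned_exprs
            .as_ref()
            .and_then(|v| v.get(index).map(String::as_str))
    }
}

fn main() {
    let mut f = DynamicFilterState::default();
    assert_eq!(f.partition_filter(0), None); // CaseHash / not yet populated

    f.update_partitioned(
        "f0 OR f1".to_string(),
        vec!["f0".to_string(), "f1".to_string()],
    );
    assert_eq!(f.current(), Some("f0 OR f1"));
    assert_eq!(f.partition_filter(1), Some("f1"));
}
```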
At file-open time, the file opener rewrites the pushed predicate tree
recursively, so any
`DynamicFilterPhysicalExpr` found in the predicate is replaced with
`partition_filter(partition_index)`. This rewrite happens before
partition-column / constant-column
literal replacement so that the partition-specific filter can have a say in
pruning. This is
important for situations like:
```text
static_predicate AND DynamicFilter[...]
```
where the dynamic filter may not be the root of the predicate tree.
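The recursive rewrite can be sketched over a toy predicate tree (the `Expr` enum and `rewrite` function below are hypothetical stand-ins for the real `PhysicalExpr` tree transform):

```rust
/// Toy predicate tree; DynamicFilter marks the node the opener specializes.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Static(&'static str),
    DynamicFilter,
    And(Box<Expr>, Box<Expr>),
}

/// Recursive rewrite at file-open time: every DynamicFilter node is replaced
/// with the concrete per-partition filter, while static predicates and the
/// AND structure are preserved, so the dynamic filter need not be the root.
fn rewrite(e: Expr, partition_filter: &Expr) -> Expr {
    match e {
        Expr::DynamicFilter => partition_filter.clone(),
        Expr::And(l, r) => Expr::And(
            Box::new(rewrite(*l, partition_filter)),
            Box::new(rewrite(*r, partition_filter)),
        ),
        other => other,
    }
}

fn main() {
    let pushed = Expr::And(
        Box::new(Expr::Static("date > '2024-01-01'")),
        Box::new(Expr::DynamicFilter),
    );
    let specialized = rewrite(pushed, &Expr::Static("col IN HT_0"));
    assert_eq!(
        specialized,
        Expr::And(
            Box::new(Expr::Static("date > '2024-01-01'")),
            Box::new(Expr::Static("col IN HT_0")),
        )
    );
}
```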
**Example - file-partitioned, 3 partitions:**
```
Build partitions:
Partition 0 (col=A files): HT_0 = {1, 42, 99}, bounds [1, 99]
Partition 1 (col=B files): HT_1 = {200, 250}, bounds [200, 250]
Partition 2 (col=C files): HT_2 = {500, 550}, bounds [500, 550]
Partition 0's opener calls partition_filter(0):
Receives: col >= 1 AND col <= 99 AND col IN HT_0
col = 42: bounds pass, 42 IN HT_0? Yes ─> Pass
col = 55: bounds pass, 55 IN HT_0? No ─> Reject
Partition 1's opener calls partition_filter(1):
Receives: col >= 200 AND col <= 250 AND col IN HT_1
col = 200: bounds pass, 200 IN HT_1? Yes ─> Pass
```
Each partition gets a tight filter with O(1) evaluation cost.
</details>
I have begun working on a draft PR for this and will attach it here when it is
in a somewhat readable state. Let me know if you have suggestions, concerns,
or questions 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]