adriangb opened a new pull request, #21931:
URL: https://github.com/apache/datafusion/pull/21931
## Which issue does this PR close?
- Refs #19858 — "Pushing down HashJoinExec build side dynamic filters makes
tpch queries slower". This PR addresses the structural side of the problem; the
adaptive selectivity work (#19639) is complementary.
## Rationale for this change
Today the `Partitioned`-mode `HashJoinExec` builds a dynamic filter that's
structured around the *repartition* layout:
```
CASE hash_repartition % N
WHEN 0 THEN p0_bounds AND (p0_inlist | p0_hash_lookup)
WHEN 1 THEN p1_bounds AND (p1_inlist | p1_hash_lookup)
...
ELSE false
END
```
Two problems with this:
1. **Per-row routing-hash cost.** Every probe row pays for computing
`hash_repartition % N`, even though the partition's hash table will be probed
anyway with a different seed (`HASH_JOIN_SEED`), so each probe key is hashed
twice.
2. **Coupling.** The dynamic filter's shape depends on how the build side
was repartitioned, even though semantically the filter is just "is this key
somewhere in the build side?"
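The per-row cost can be made concrete with a toy model of the legacy shape. This is a sketch under stated assumptions, not DataFusion code: keys are `i64`, `seeded_hash` is a made-up splitmix64-style mixer, and the `TOY_*` seeds and helper names are hypothetical stand-ins for `REPARTITION_RANDOM_STATE` and `HASH_JOIN_SEED`. Each probe row computes one hash to pick the CASE branch and a second, differently seeded hash to probe that branch's table.

```rust
use std::collections::HashSet;

// Hypothetical seeds standing in for the repartition seed and
// `HASH_JOIN_SEED`; the values are made up for illustration.
const TOY_REPARTITION_SEED: u64 = 0x1234_5678;
const TOY_JOIN_SEED: u64 = 0x9ABC_DEF0;

/// Toy seeded 64-bit hash (splitmix64-style mixer), not DataFusion's hashing.
fn seeded_hash(seed: u64, key: i64) -> u64 {
    let mut z = ((key as u64) ^ seed).wrapping_add(0x9E37_79B9_7F4A_7C15);
    z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}

/// Route a key the way the repartition step would: hash % N.
fn route(key: i64, n: usize) -> usize {
    (seeded_hash(TOY_REPARTITION_SEED, key) % (n as u64)) as usize
}

/// Build side: each partition's hash table, reduced to the set of
/// join-seed hashes of the keys that were routed to it.
fn build_partitions(build_keys: &[i64], n: usize) -> Vec<HashSet<u64>> {
    let mut parts: Vec<HashSet<u64>> = vec![HashSet::new(); n];
    for &k in build_keys {
        parts[route(k, n)].insert(seeded_hash(TOY_JOIN_SEED, k));
    }
    parts
}

/// Legacy shape `CASE hash_repartition % N WHEN i THEN probe(p_i) ... END`.
/// (The `ELSE false` arm is unreachable here because `% N` is always in range.)
fn legacy_case_filter(parts: &[HashSet<u64>], key: i64) -> bool {
    let branch = route(key, parts.len()); // hash #1: pick the CASE branch
    let h = seeded_hash(TOY_JOIN_SEED, key); // hash #2: probe that table
    parts[branch].contains(&h)
}
```

The routing hash only exists because the filter mirrors the repartition layout; the membership answer itself depends only on which keys the build side holds.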
This PR replaces the routing CASE with a structure that depends only on the
*content* of the build side, not its layout. It also adds a small
cross-partition merged `IN (SET)` fast path so small joins can participate in
parquet stats / bloom-filter pruning at the scan side.
## What changes are included in this PR?
Four commits:
1. **`perf: collapse all-Map dynamic filter into MultiMapLookupExpr`** — new
`MultiMapLookupExpr` that hashes the join keys once with `HASH_JOIN_SEED` and
ORs `contain_hashes()` across every reported partition's hash table. Replaces
the CASE for the all-Map shape and roughly halves the regression on TPC-H SF=1
(+3.0% → +1.6% TOTAL min vs. dynamic-filter-off).
2. **`perf: collapse small all-InList dynamic filter into one
cross-partition IN (SET)`** — when every reported partition contributed an
InList array and the cross-partition union is small enough, concatenate them
into a single global `IN (SET)`. The point isn't runtime selectivity but
parquet stats / bloom-filter pruning at the scan, which a `multi_hash_lookup`
can't do. A cap sweep at SF=1 picked 20–50 as the runtime sweet spot (past
~200 values, the larger `static_filter` hash set no longer fits in L1 cache).
3. **`refactor: drop CASE routing from Partitioned dynamic filters`** —
`PushdownStrategy` is now a struct that always carries the `Map` (the join's
hash table is built unconditionally) plus an optional InList array. With the
map always available, every reported partition can contribute to
`multi_hash_lookup`, and the routing CASE can go away entirely. Removes
`repartition_random_state` from `SharedBuildAccumulator`, drops the
`REPARTITION_RANDOM_STATE` import in `exec.rs`, and unifies the
`force_hash_collisions` snapshot — both branches now produce the same shape.
4. **`refactor: dedup cross-partition InList, reuse per-partition cap`** —
the combine path explicitly deduplicates by `ScalarValue` (a first-seen walk
over a HashSet plus one `arrow::compute::take`) and re-gates on the distinct
count. The same
`optimizer.hash_join_inlist_pushdown_max_distinct_values` knob now caps both
per-partition InList eligibility and the cross-partition merged set. Default
lowered from 150 → 20 to align with parquet stats / bloom-filter pruning
practicality.
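The combine-side logic from these commits can be sketched as a toy model. This is not DataFusion code: keys are `i64`, `seeded_hash` and `TOY_JOIN_SEED` are hypothetical stand-ins for the real row hashing and `HASH_JOIN_SEED`, and `PushdownStrategy` here is a simplified, hypothetical version of the struct from commit 3 (a `HashSet` of hashes instead of the real join hash map, a `Vec` instead of an Arrow array, and a plain index gather in place of `arrow::compute::take`).

```rust
use std::collections::HashSet;

// Hypothetical seed standing in for `HASH_JOIN_SEED`; not the real value.
const TOY_JOIN_SEED: u64 = 0x9ABC_DEF0;

/// Toy seeded 64-bit hash (splitmix64-style mixer).
fn seeded_hash(seed: u64, key: i64) -> u64 {
    let mut z = ((key as u64) ^ seed).wrapping_add(0x9E37_79B9_7F4A_7C15);
    z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}

/// Hypothetical per-partition report: the hash table is always present,
/// the InList only when the partition is small enough.
struct PushdownStrategy {
    table: HashSet<u64>,       // join-seed hashes of this partition's keys
    in_list: Option<Vec<i64>>, // raw keys, if under the per-partition cap
}

impl PushdownStrategy {
    fn from_keys(keys: &[i64], per_partition_cap: usize) -> Self {
        let table: HashSet<u64> = keys.iter().map(|&k| seeded_hash(TOY_JOIN_SEED, k)).collect();
        let distinct: HashSet<i64> = keys.iter().copied().collect();
        // The same cap gates per-partition InList eligibility.
        let in_list = (distinct.len() <= per_partition_cap).then(|| keys.to_vec());
        Self { table, in_list }
    }
}

/// Commit 1 (sketch): hash the probe key once, then OR membership across
/// every partition's table. No routing hash is needed.
fn multi_hash_lookup(parts: &[PushdownStrategy], key: i64) -> bool {
    let h = seeded_hash(TOY_JOIN_SEED, key);
    parts.iter().any(|p| p.table.contains(&h))
}

/// Commits 2 and 4 (sketch): require an InList from every partition,
/// concatenate, dedup with a first-seen walk plus one index gather, then
/// re-gate the distinct count on the same cap.
fn merged_in_set(parts: &[PushdownStrategy], cap: usize) -> Option<Vec<i64>> {
    let mut concat = Vec::new();
    for p in parts {
        // Any partition without an InList forces the multi_hash_lookup shape.
        concat.extend_from_slice(p.in_list.as_ref()?);
    }
    let mut seen = HashSet::new();
    let indices: Vec<usize> = (0..concat.len()).filter(|&i| seen.insert(concat[i])).collect();
    if indices.len() > cap {
        return None;
    }
    // Gather the first occurrence of each distinct value, in input order.
    Some(indices.iter().map(|&i| concat[i]).collect())
}
```

One trade-off the sketch makes visible: on a miss, `multi_hash_lookup` checks every partition's table instead of one, but it hashes each key only once and never needs to know how the build side was partitioned.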
### Final filter-shape matrix (after the PR)
| input | shape |
|-------|-------|
| `enable_dynamic_filter_pushdown=false` | no filter installed |
| `CollectLeft`, build empty | filter stays at `lit(true)` |
| `CollectLeft`, build small (under inlist caps) | `bounds AND IN (SET)` |
| `CollectLeft`, build large | `bounds AND hash_lookup` |
| `Partitioned`, all reported partitions empty | `lit(false)` |
| `Partitioned`, any partition canceled | `lit(true)` |
| `Partitioned`, every partition InList AND combined ≤ cap | `bounds AND IN (SET)` (parquet-prunable) |
| `Partitioned`, anything else | `bounds AND multi_hash_lookup` |
No more `CASE`, no more `hash_repartition`, no more
`REPARTITION_RANDOM_STATE` in the dynamic-filter path.
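The matrix can be read as a single decision function. This is a hypothetical sketch: `BuildReport`, `Mode`, `FilterShape` and `choose_shape` are illustrative names, the "inlist caps" for `CollectLeft` are simplified to the one distinct-value cap, and checking cancellation before emptiness for `Partitioned` is an assumption (a canceled partition means the build side is not fully known, so nothing may be filtered out).

```rust
/// Hypothetical mirror of the filter-shape matrix.
#[derive(Debug, PartialEq)]
enum FilterShape {
    NoFilter,              // no filter installed
    LitTrue,               // lit(true)
    LitFalse,              // lit(false)
    BoundsInSet,           // bounds AND IN (SET)
    BoundsHashLookup,      // bounds AND hash_lookup
    BoundsMultiHashLookup, // bounds AND multi_hash_lookup
}

#[derive(Debug, Clone, Copy)]
enum Mode {
    CollectLeft,
    Partitioned,
}

/// Hypothetical summary of what the build side reported.
#[derive(Debug, Clone, Copy)]
struct BuildReport {
    pushdown_enabled: bool, // enable_dynamic_filter_pushdown
    mode: Mode,
    empty: bool,          // CollectLeft: build empty; Partitioned: all reported partitions empty
    any_canceled: bool,   // only meaningful for Partitioned
    all_in_list: bool,    // every partition (or the single build) produced an InList
    distinct_keys: usize, // distinct keys across the combined InList(s)
}

fn choose_shape(r: &BuildReport, cap: usize) -> FilterShape {
    if !r.pushdown_enabled {
        return FilterShape::NoFilter;
    }
    let small = r.all_in_list && r.distinct_keys <= cap;
    match r.mode {
        Mode::CollectLeft if r.empty => FilterShape::LitTrue,
        Mode::CollectLeft if small => FilterShape::BoundsInSet,
        Mode::CollectLeft => FilterShape::BoundsHashLookup,
        // Assumed ordering: cancellation wins over emptiness.
        Mode::Partitioned if r.any_canceled => FilterShape::LitTrue,
        Mode::Partitioned if r.empty => FilterShape::LitFalse,
        Mode::Partitioned if small => FilterShape::BoundsInSet,
        Mode::Partitioned => FilterShape::BoundsMultiHashLookup,
    }
}
```

Note that neither branch consults a repartition hash: every input to the decision describes the build side's content or completion state.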
## Are these changes tested?
- Existing `joins::hash_join` lib tests pass (380 tests).
- `physical_optimizer::filter_pushdown` integration tests pass (51 tests), with
  one snapshot updated and one assertion tightened:
  - `test_hashjoin_dynamic_filter_pushdown_partitioned-2` (the
    no-`force_hash_collisions` snapshot) now matches the merged `IN (SET)` shape
    that the `force_hash_collisions` snapshot already had; the size-gated path
    subsumes that special case.
  - `test_hashjoin_hash_table_pushdown_partitioned` now positively asserts
    `multi_hash_lookup` (and the absence of `hash_repartition`) instead of just
    `hash_lookup`.
- `push_down_filter_parquet.slt` passes.
- `cargo clippy -p datafusion-physical-plan --all-targets -- -D warnings` is
clean.
### Bench (TPC-H SF=1, 7 iters back-to-back)
Reduction in dynamic-filter-pushdown regression vs. the legacy CASE design:
```
TOTAL min vs no_DF:
legacy CASE: +3.0%
this PR (final): +1.6% (~halves the regression)
```
Per-query wins (this PR vs legacy CASE) include Q4 (-6.0%), Q5 (-3.3%), Q7
(-6.0%), Q8 (-4.0%), Q9 (-3.5%), Q12 (-3.2%), Q17 (-1.6%), Q21 (-3.8%). A
handful of small queries (Q11, Q14) regress 1–4% because the moderate-InList
shape they used to take through CASE now uses `multi_hash_lookup`; net is still
a clear win.
The bigger win that won't show in TPC-H runtime is parquet I/O: the small
merged `IN (SET)` path produces a predicate that scans can use for
stats/bloom-filter pruning, which the routing CASE never could.
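Why a literal `IN (SET)` is prunable while a hash probe is not can be shown with a toy min/max check. This sketch assumes integer keys; `ColumnStats` and the `can_skip_*` helpers are hypothetical, not DataFusion's pruning API. Bloom-filter pruning follows the same idea: each set value is checked against the row group's filter, and the group is skipped only if none can be present.

```rust
/// Hypothetical min/max statistics for one column of one Parquet row group.
struct ColumnStats {
    min: i64,
    max: i64,
}

/// With an explicit value set, a pruner can compare each value against the
/// row group's [min, max] range and skip the group when none can match.
fn can_skip_row_group(stats: &ColumnStats, in_set: &[i64]) -> bool {
    !in_set.iter().any(|&v| stats.min <= v && v <= stats.max)
}

/// An opaque membership test (such as a hash-table probe) exposes no values
/// to compare against statistics, so the only safe answer is "cannot skip".
fn can_skip_with_opaque_probe(_stats: &ColumnStats) -> bool {
    false
}
```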
## Are there any user-facing changes?
- **Default lowered:**
`optimizer.hash_join_inlist_pushdown_max_distinct_values` 150 → 20. Affects
which build-side shapes choose InList vs. hash-table pushdown per partition,
and now also gates the cross-partition merged InList. Users who depend on the
old per-partition behavior can set it back to 150 in their config. The doc
string in `config.rs` and the entry in `docs/source/user-guide/configs.md` are
updated.
- **Plan output:** `EXPLAIN ANALYZE` for `Partitioned` hash joins no longer
shows `CASE hash_repartition % N WHEN ...`. Instead it shows either `bounds AND
struct(...) IN (SET) ([...])` (small joins) or `bounds AND multi_hash_lookup`
(everything else).
🤖 Generated with [Claude Code](https://claude.com/claude-code)