adriangb opened a new pull request, #21931:
URL: https://github.com/apache/datafusion/pull/21931

   ## Which issue does this PR close?
   
   - Refs #19858 ("Pushing down HashJoinExec build side dynamic filters makes tpch queries slower"). This PR addresses the structural side of the problem; the adaptive selectivity work in #19639 is complementary.
   
   ## Rationale for this change
   
   Today the `Partitioned`-mode `HashJoinExec` builds a dynamic filter that is structured around the *repartition* layout:
   
   ```
   CASE hash_repartition % N
     WHEN 0 THEN p0_bounds AND (p0_inlist | p0_hash_lookup)
     WHEN 1 THEN p1_bounds AND (p1_inlist | p1_hash_lookup)
     ...
     ELSE false
   END
   ```
   
   Two problems with this:
   
   1. **Per-row routing-hash cost.** Every probe row computes `hash_repartition % N` just to select a CASE arm, even though that partition's hash table is then probed with a second hash of the same key under a different seed (`HASH_JOIN_SEED`).
   2. **Coupling.** The dynamic filter's shape depends on how the build side was repartitioned, even though semantically the filter only asks "is this key somewhere in the build side?"
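
The legacy routing can be modeled in a few lines of Rust to show where the extra per-row hash comes from. This is an illustrative sketch, not DataFusion code: `DefaultHasher` stands in for DataFusion's seeded hashing, a `HashSet<u64>` of key hashes stands in for each partition's hash table, and `REPARTITION_SEED`, `JOIN_SEED`, `seeded_hash`, `build_partitions`, and `legacy_case_filter` are hypothetical names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

// Hypothetical seeds standing in for the repartition and hash-join seeds.
const REPARTITION_SEED: u64 = 0x5eed_0001;
const JOIN_SEED: u64 = 0x5eed_0002;

/// Deterministic seeded 64-bit hash of a join key (illustrative only).
fn seeded_hash(seed: u64, key: i64) -> u64 {
    let mut hasher = DefaultHasher::new();
    seed.hash(&mut hasher);
    key.hash(&mut hasher);
    hasher.finish()
}

/// Build side: route each key to a partition with the repartition hash,
/// then store its join hash in that partition's "hash table".
fn build_partitions(keys: &[i64], n: usize) -> Vec<HashSet<u64>> {
    let mut partitions = vec![HashSet::new(); n];
    for &key in keys {
        let route = (seeded_hash(REPARTITION_SEED, key) % n as u64) as usize;
        partitions[route].insert(seeded_hash(JOIN_SEED, key));
    }
    partitions
}

/// Legacy CASE shape: the routing hash picks exactly one arm, and that arm
/// probes its partition's table with a second hash under a different seed.
fn legacy_case_filter(key: i64, partitions: &[HashSet<u64>]) -> bool {
    let n = partitions.len() as u64;
    let route = (seeded_hash(REPARTITION_SEED, key) % n) as usize; // hash #1: routing
    partitions[route].contains(&seeded_hash(JOIN_SEED, key)) // hash #2: lookup
}
```

Each probe key is hashed twice, and the filter is only correct if `n` and `REPARTITION_SEED` match exactly how the build side was repartitioned, which is the coupling described in problem 2.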
   
   This PR replaces the routing CASE with a structure that depends only on the *content* of the build side, not its layout. It also adds a small cross-partition merged `IN (SET)` fast path so that joins with small build sides can participate in parquet stats / bloom-filter pruning on the scan side.
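
A minimal sketch of the content-only alternative, under the same simplified model: each reported partition is a hypothetical `HashSet<u64>` of join-key hashes, `JOIN_SEED` stands in for `HASH_JOIN_SEED`, and `join_hash` / `multi_map_lookup` are illustrative names rather than the actual `MultiMapLookupExpr` API. Each row is hashed once and checked against every partition's table.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

// Hypothetical stand-in for HASH_JOIN_SEED.
const JOIN_SEED: u64 = 0x5eed_0002;

/// Hash a join key with the single join seed (illustrative only).
fn join_hash(key: i64) -> u64 {
    let mut hasher = DefaultHasher::new();
    JOIN_SEED.hash(&mut hasher);
    key.hash(&mut hasher);
    hasher.finish()
}

/// Content-only filter: hash each probe key once, then OR the membership
/// check across every reported partition's table. There is no routing hash
/// and no dependence on the partition count or the repartition seed.
fn multi_map_lookup(keys: &[i64], tables: &[HashSet<u64>]) -> Vec<bool> {
    keys.iter()
        .map(|&key| {
            let hash = join_hash(key);
            tables.iter().any(|table| table.contains(&hash))
        })
        .collect()
}
```

Because nothing in the lookup depends on how the build side was partitioned, the filter no longer needs to know the layout. In this model a matching hash only means "maybe present": a collision lets an extra row through, and the join itself still compares the actual keys.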
   
   ## What changes are included in this PR?
   
   Four commits:
   
   1. **`perf: collapse all-Map dynamic filter into MultiMapLookupExpr`**: a new `MultiMapLookupExpr` hashes the join keys once with `HASH_JOIN_SEED` and ORs `contain_hashes()` across every reported partition's hash table. It replaces the CASE for the all-Map shape and roughly halves the regression on TPC-H SF=1 (TOTAL min vs. dynamic-filter-off: +3.0% → +1.6%).
   
   2. **`perf: collapse small all-InList dynamic filter into one cross-partition IN (SET)`**: when every reported partition contributed an InList array and the cross-partition union is small enough, the arrays are concatenated into a single global `IN (SET)`. The goal is not runtime selectivity but parquet stats / bloom-filter pruning at the scan, which `multi_hash_lookup` cannot provide. A cap sweep at SF=1 found 20–50 to be the runtime sweet spot (past ~200 values, the larger `static_filter` hash set no longer fits in L1 cache).
   
   3. **`refactor: drop CASE routing from Partitioned dynamic filters`**: `PushdownStrategy` is now a struct that always carries the `Map` (the join's hash table is built unconditionally) plus an optional InList array. With the map always available, every reported partition can contribute to `multi_hash_lookup`, so the routing CASE can be removed entirely. This removes `repartition_random_state` from `SharedBuildAccumulator`, drops the `REPARTITION_RANDOM_STATE` import in `exec.rs`, and unifies the snapshots with and without `force_hash_collisions`, since both branches now produce the same shape.
   
   4. **`refactor: dedup cross-partition InList, reuse per-partition cap`**: the combine path explicitly deduplicates by `ScalarValue` (a first-seen walk with a `HashSet`, then a single `arrow::compute::take`) and re-checks the cap against the distinct count. The same `optimizer.hash_join_inlist_pushdown_max_distinct_values` knob now caps both per-partition InList eligibility and the cross-partition merged set. Its default is lowered from 150 to 20 so that the merged set stays small enough for parquet stats / bloom-filter pruning to be practical.
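
The cross-partition InList path from commits 2 and 4 can be sketched as follows, under simplifying assumptions: each partition's InList is modeled as `Option<Vec<i64>>` (with `i64` standing in for `ScalarValue`), and the index gather at the end plays the role of the single `arrow::compute::take` call. `first_seen_indices` and `merge_inlists` are hypothetical helpers, not DataFusion functions.

```rust
use std::collections::HashSet;

/// First-seen walk: returns the index of the first occurrence of each
/// distinct value. `i64` stands in for `ScalarValue`.
fn first_seen_indices(values: &[i64]) -> Vec<usize> {
    let mut seen = HashSet::new();
    values
        .iter()
        .enumerate()
        .filter(|(_, v)| seen.insert(**v))
        .map(|(i, _)| i)
        .collect()
}

/// Merge per-partition InLists into one cross-partition set.
/// `None` in `reports` means that partition did not produce an InList.
/// Returns `None` (fall back to multi_hash_lookup) if any partition lacks an
/// InList or the distinct count exceeds `max_distinct`.
fn merge_inlists(reports: &[Option<Vec<i64>>], max_distinct: usize) -> Option<Vec<i64>> {
    // Concatenate: every partition must have contributed an InList.
    let mut concatenated = Vec::new();
    for report in reports {
        concatenated.extend_from_slice(report.as_ref()?);
    }
    // Deduplicate, then gate on the distinct count rather than the raw length.
    let indices = first_seen_indices(&concatenated);
    if indices.len() > max_distinct {
        return None;
    }
    // One gather by index, analogous to a single `take` call.
    Some(indices.iter().map(|&i| concatenated[i]).collect())
}
```

In this sketch, gating on the distinct count means repeated values cannot push an otherwise small set over the cap, and a single partition without an InList is enough to send the whole join to `multi_hash_lookup`.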
   
   ### Final filter-shape matrix (after the PR)
   
   | Condition | Filter shape |
   |-----------|--------------|
   | `enable_dynamic_filter_pushdown=false` | no filter installed |
   | `CollectLeft`, build side empty | filter stays `lit(true)` |
   | `CollectLeft`, build side small (under the InList caps) | `bounds AND IN (SET)` |
   | `CollectLeft`, build side large | `bounds AND hash_lookup` |
   | `Partitioned`, all reported partitions empty | `lit(false)` |
   | `Partitioned`, any partition canceled | `lit(true)` |
   | `Partitioned`, every partition has an InList and the combined set ≤ cap | `bounds AND IN (SET)` (parquet-prunable) |
   | `Partitioned`, anything else | `bounds AND multi_hash_lookup` |
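
The `Partitioned` rows of the matrix can be read as a small decision function. The sketch below is a hedged model, not the PR's code: `PartitionReport`, `FilterShape`, and `partitioned_filter_shape` are hypothetical, InList values are `i64`, and it assumes that empty partitions are skipped when checking whether every partition produced an InList. Following commit 3, a built partition always has a hash table, so only the InList is optional.

```rust
/// Hypothetical model of what one build partition reports.
enum PartitionReport {
    Canceled,
    Empty,
    /// The hash table is always built; the InList is present only if the
    /// partition's distinct keys were under the cap.
    Built { inlist: Option<Vec<i64>> },
}

#[derive(Debug, PartialEq)]
enum FilterShape {
    LitTrue,
    LitFalse,
    BoundsAndInSet(Vec<i64>),
    BoundsAndMultiHashLookup,
}

/// Choose the filter shape for a `Partitioned` join from all partition reports.
fn partitioned_filter_shape(reports: &[PartitionReport], cap: usize) -> FilterShape {
    // Any canceled partition: fall back to a filter that keeps every row.
    if reports.iter().any(|r| matches!(r, PartitionReport::Canceled)) {
        return FilterShape::LitTrue;
    }
    // Every partition empty: no probe row can match.
    if reports.iter().all(|r| matches!(r, PartitionReport::Empty)) {
        return FilterShape::LitFalse;
    }
    // Collect InLists from non-empty partitions (assumption: empty ones are skipped).
    let mut merged = Vec::new();
    for report in reports {
        if let PartitionReport::Built { inlist } = report {
            match inlist {
                Some(values) => merged.extend_from_slice(values),
                // One partition without an InList rules out the merged set.
                None => return FilterShape::BoundsAndMultiHashLookup,
            }
        }
    }
    // Dedup (by sorting, for brevity) and gate on the distinct count.
    merged.sort_unstable();
    merged.dedup();
    if merged.len() <= cap {
        FilterShape::BoundsAndInSet(merged)
    } else {
        FilterShape::BoundsAndMultiHashLookup
    }
}
```

The cancellation check runs first in this sketch because `lit(true)` is always safe: a canceled partition may hold keys that the other reports do not cover.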
   
   No more `CASE`, no more `hash_repartition`, and no more `REPARTITION_RANDOM_STATE` in the dynamic-filter path.
   
   ## Are these changes tested?
   
   - Existing `joins::hash_join` lib tests pass (380 tests).
   - `physical_optimizer::filter_pushdown` integration tests pass (51 tests). One snapshot was updated and one assertion tightened:
     - `test_hashjoin_dynamic_filter_pushdown_partitioned-2` (the no-`force_hash_collisions` snapshot) now matches the merged `IN (SET)` shape that the `force_hash_collisions` snapshot already had; the size-gated path subsumes that special case.
     - `test_hashjoin_hash_table_pushdown_partitioned` now positively asserts `multi_hash_lookup` (and the absence of `hash_repartition`) instead of just `hash_lookup`.
   - `push_down_filter_parquet.slt` passes.
   - `cargo clippy -p datafusion-physical-plan --all-targets -- -D warnings` is clean.
   
   ### Bench (TPC-H SF=1, 7 iters back-to-back)
   
   Reduction in dynamic-filter-pushdown regression vs. the legacy CASE design:
   
   ```
   TOTAL min vs no_DF:
     legacy CASE:        +3.0%
     this PR (final):    +1.6%   (~halves the regression)
   ```
   
   Per-query wins (this PR vs. legacy CASE) include Q4 (-6.0%), Q5 (-3.3%), Q7 (-6.0%), Q8 (-4.0%), Q9 (-3.5%), Q12 (-3.2%), Q17 (-1.6%), and Q21 (-3.8%). A few small queries (Q11, Q14) regress by 1–4% because the moderate-size InList filter they previously got inside the CASE is now replaced by `multi_hash_lookup`; the net result is still a clear win.
   
   The bigger win, which does not show up in TPC-H runtime, is parquet I/O: the small merged `IN (SET)` path produces a predicate that scans can use for stats / bloom-filter pruning, which the routing CASE never could.
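
To illustrate why an explicit value set helps the scan, here is a simplified model of min/max row-group pruning. `RowGroupStats`, `can_skip_with_in_list`, and `can_skip_with_hash_lookup` are hypothetical names; real parquet pruning in DataFusion handles many more types and cases.

```rust
/// Hypothetical min/max statistics for one integer column of one row group.
struct RowGroupStats {
    min: i64,
    max: i64,
}

/// With explicit values, the scan can skip a row group when no listed value
/// falls inside its [min, max] range.
fn can_skip_with_in_list(stats: &RowGroupStats, in_list: &[i64]) -> bool {
    !in_list.iter().any(|&v| stats.min <= v && v <= stats.max)
}

/// A hash-table lookup exposes no values to compare with min/max, so the
/// row group cannot be ruled out from statistics alone.
fn can_skip_with_hash_lookup(_stats: &RowGroupStats) -> bool {
    false
}
```

Bloom-filter pruning has the same requirement: it needs concrete values to test against each row group's filter, which an `IN (SET)` provides and an opaque hash lookup does not.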
   
   ## Are there any user-facing changes?
   
   - **Default lowered:** `optimizer.hash_join_inlist_pushdown_max_distinct_values` goes from 150 to 20. This affects which build-side shapes choose InList vs. hash-table pushdown per partition, and the setting now also gates the cross-partition merged InList. Users who depend on the old per-partition behavior can set it back to 150 in their config; note that this also raises the cap for the cross-partition merged set. The doc string in `config.rs` and the corresponding entry in `docs/source/user-guide/configs.md` have been updated.
   - **Plan output:** `EXPLAIN ANALYZE` for `Partitioned` hash joins no longer shows `CASE hash_repartition % N WHEN ...`. Instead, it shows either `bounds AND struct(...) IN (SET) ([...])` (small joins) or `bounds AND multi_hash_lookup` (everything else).
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)

