adriangb opened a new pull request, #23064:
URL: https://github.com/apache/datafusion/pull/23064

   ## Which issue does this PR close?
   
   - Relates to #20443.
   - Extracted from the abandoned #21267.
   
   ## Rationale for this change
   
   This adds support for generating dynamic filters for the left and right 
sides of a `SortMergeJoinExec`, enabling range-based pruning of both sides of 
the join (for `Inner` / `LeftSemi` / `RightSemi`). Consumers that support 
filter pushdown (e.g. Parquet scans) can use the pushed-down filter to prune at 
scan time.
   
   This is a scoped re-do of #21267, which was abandoned. It differs in two 
important ways.
   
   ### 1. It does not touch unrelated operators
   
   #21267 modified `handle_child_pushdown_result` in a number of operators 
(`coalesce_batches`, `coop`, `filter`, `projection`, `repartition`, `sort`, 
`sort_preserving_merge`, `limit`) to clone-and-rewrap their updated children. 
Those changes are **unnecessary**: the filter-pushdown driver 
(`push_down_filters` in `datafusion-physical-optimizer`) already rebuilds each 
parent with its updated children via `with_new_children_if_necessary` plus the 
`ptr_eq` safety net:
   
   ```rust
   if res.updated_node.is_none() && !Arc::ptr_eq(&updated_node, node) {
       res.updated_node = Some(updated_node)
   }
   ```
   
   The integration test `test_smj_dynamic_filter_present_in_plan` confirms the 
`DynamicFilter` reaches a `DataSourceExec` *through* the inserted `SortExec` + 
`RepartitionExec` with none of those operator changes.
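
As a rough illustration of why the per-operator rewrapping is redundant, here is a minimal, self-contained sketch of the driver-side rebuild. `Node`, `push_down`, and `sample_plan` are hypothetical stand-ins invented for this example, not DataFusion types; only the `Arc::ptr_eq` check mirrors the snippet above, and the local `with_new_children_if_necessary` is a toy that merely borrows the name.

```rust
use std::sync::Arc;

// Hypothetical stand-in for a physical plan node (not DataFusion's `ExecutionPlan`).
#[derive(Debug)]
struct Node {
    name: &'static str,
    children: Vec<Arc<Node>>,
}

// Toy version of the idea: rebuild the parent only if a child pointer changed,
// otherwise hand back the very same `Arc`.
fn with_new_children_if_necessary(node: &Arc<Node>, children: Vec<Arc<Node>>) -> Arc<Node> {
    let unchanged = node.children.len() == children.len()
        && node.children.iter().zip(&children).all(|(a, b)| Arc::ptr_eq(a, b));
    if unchanged {
        Arc::clone(node)
    } else {
        Arc::new(Node { name: node.name, children })
    }
}

// Toy driver: a leaf named "scan" absorbs the filter by becoming a new node.
// Intermediate operators report no update of their own (`None`).
fn push_down(node: &Arc<Node>) -> Arc<Node> {
    if node.name == "scan" {
        return Arc::new(Node { name: "scan+filter", children: vec![] });
    }
    let new_children: Vec<Arc<Node>> = node.children.iter().map(push_down).collect();
    let updated = with_new_children_if_necessary(node, new_children);

    // The operator did not rewrap itself, so the driver falls back to
    // pointer comparison, as in the snippet from the PR description.
    let mut reported: Option<Arc<Node>> = None;
    if reported.is_none() && !Arc::ptr_eq(&updated, node) {
        reported = Some(updated);
    }
    reported.unwrap_or_else(|| Arc::clone(node))
}

// sort -> repartition -> scan, the shape the integration test exercises.
fn sample_plan() -> Arc<Node> {
    let scan = Arc::new(Node { name: "scan", children: vec![] });
    let repartition = Arc::new(Node { name: "repartition", children: vec![scan] });
    Arc::new(Node { name: "sort", children: vec![repartition] })
}
```

Calling `push_down(&sample_plan())` in this toy returns a new `sort` node whose grandchild is `scan+filter`, even though neither `sort` nor `repartition` did any rewrapping itself.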
   
   ### 2. Correctness: build from complete info, publish once
   
   The dynamic-filter coordinator is rewritten to follow the model already used 
by `HashJoinExec`'s `SharedBuildAccumulator`: **build the filter from complete 
information and publish it exactly once.**
   
   The original `SharedSortMergeBoundsAccumulator` shared a *single* dynamic 
filter across the concurrently-executing hash partitions feeding the join, and 
advanced a one-sided bound as partitions made progress, *raising* it when a 
partition was exhausted. Because that single bound gates rows across all 
partitions, it could non-deterministically prune valid join rows. For example, 
with the existing `joins.slt` `Date32` inner-join test (`t1.c1 = {1,2,NULL,3}` 
⋈ `t2.c1 = {1,NULL,NULL,3}`, `batch_size=2`, `target_partitions=2`), it dropped 
the `key=3` row in roughly 4 of 6 runs.
   
   The accumulator now gathers the feeding side's global `[min, max]` (nulls 
skipped) and, only once **all** partitions are exhausted, publishes a static 
superset predicate `col >= min AND col <= max` (or `lit(false)` when no 
non-null keys were seen) and marks the filter complete.
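
The publish-once behaviour described above can be sketched roughly as follows. This is a simplified model under stated assumptions, not the PR's implementation: `BoundsAccumulator`, `Published`, `report_keys`, and `partition_done` are hypothetical names, keys are plain `i64`, and the "filter" is an enum rather than a physical expression.

```rust
use std::sync::Mutex;

// Hypothetical published predicate; the real code would publish an expression.
#[derive(Debug, Clone, PartialEq)]
enum Published {
    Range { min: i64, max: i64 }, // stands for `col >= min AND col <= max`
    AlwaysFalse,                  // stands for `lit(false)`: no non-null keys seen
}

struct State {
    remaining: usize,           // partitions that have not finished yet
    bounds: Option<(i64, i64)>, // global (min, max) over non-null keys so far
    published: Option<Published>,
}

// Hypothetical accumulator shared by all partitions of the feeding side.
struct BoundsAccumulator {
    state: Mutex<State>,
}

impl BoundsAccumulator {
    fn new(partitions: usize) -> Self {
        Self {
            state: Mutex::new(State { remaining: partitions, bounds: None, published: None }),
        }
    }

    // Fold one batch of join keys into the global min/max, skipping NULLs.
    fn report_keys(&self, keys: &[Option<i64>]) {
        let mut s = self.state.lock().unwrap();
        for k in keys.iter().flatten() {
            let cur = s.bounds;
            s.bounds = Some(match cur {
                None => (*k, *k),
                Some((lo, hi)) => (lo.min(*k), hi.max(*k)),
            });
        }
    }

    // Mark one partition as exhausted. Nothing is published until the last
    // partition finishes, and the predicate is published at most once.
    fn partition_done(&self) -> Option<Published> {
        let mut s = self.state.lock().unwrap();
        s.remaining = s.remaining.saturating_sub(1);
        if s.remaining == 0 && s.published.is_none() {
            let p = match s.bounds {
                Some((min, max)) => Published::Range { min, max },
                None => Published::AlwaysFalse,
            };
            s.published = Some(p.clone());
            return Some(p);
        }
        None
    }

    // What consumers currently see; `None` means "no pruning yet".
    fn published(&self) -> Option<Published> {
        self.state.lock().unwrap().published.clone()
    }
}
```

With the `t1.c1` keys from the example split across two partitions, the first `partition_done` call publishes nothing, and the second publishes the full range covering keys 1 through 3, so `key=3` cannot be pruned early.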
   
   A consequence of the publish-once design: for the common `scan → SortExec → 
SMJ` shape, the blocking sort drains the scan before the bound is known, so 
there is no skip-ahead benefit there; the benefit accrues to pushdown-capable, 
already-sorted scans. A future enhancement could route per-partition advancing 
bounds via a hashed `CASE` expression (as `HashJoinExec`'s `Partitioned` mode 
does) to recover skip-ahead pruning while staying correct.
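
To make the idea behind that possible follow-up concrete, the sketch below routes each key to the bound of the partition it hashes to, instead of applying one shared bound to every row. It is purely illustrative and not part of this PR: `partition_of` is a hypothetical stand-in for the repartitioning hash (a plain modulo here), and `keep_row` plays the role of the hashed `CASE` expression.

```rust
// Hypothetical stand-in for the repartitioning hash; the real hash differs.
fn partition_of(key: i64, partitions: usize) -> usize {
    key.rem_euclid(partitions as i64) as usize
}

// Per-partition advancing lower bounds. `None` means nothing is known yet for
// that partition, so its rows must be kept.
fn keep_row(key: i64, lower_bounds: &[Option<i64>]) -> bool {
    match lower_bounds[partition_of(key, lower_bounds.len())] {
        Some(lo) => key >= lo,
        None => true,
    }
}

// The unsafe alternative: one bound applied to rows of every partition.
fn keep_row_shared(key: i64, shared_lower_bound: i64) -> bool {
    key >= shared_lower_bound
}
```

In this toy, if partition 0 has advanced to a lower bound of 10 while partition 1 has reported nothing, `keep_row(3, &[Some(10), None])` keeps the row because key 3 belongs to partition 1, whereas `keep_row_shared(3, 10)` would drop it.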
   
   ## What changes are included in this PR?
   
   - `SortMergeJoinExec` generates dynamic filters in 
`gather_filters_for_pushdown` (Post phase) and captures accepted ones in 
`handle_child_pushdown_result`.
   - New `SharedSortMergeBoundsAccumulator` (publish-once `[min, max]` 
superset, mirroring `HashJoinExec`'s `SharedBuildAccumulator`).
   - The two SMJ stream implementations report join keys to the accumulator.
   - New `dynamic_filter_updates` metric.
   - Tests: a fuzz/integration suite (`smj_filter_pushdown.rs`) including a 
multi-partition + NULL-key regression test that runs the previously-flaky shape 
40×, plus updated EXPLAIN expectations in `joins.slt`, `sort_merge_join.slt`, 
`explain_tree.slt`.
   
   ## Are these changes tested?
   
   Yes. New `smj_filter_pushdown` integration tests (correctness with/without 
the filter, plan-presence, and a multi-partition NULL-key regression test), 
updated sqllogictest EXPLAIN plans, plus the existing SMJ unit tests. The full 
sqllogictest suite, `datafusion-physical-optimizer`, and 
`datafusion-physical-plan` join tests pass.
   
   ## Are there any user-facing changes?
   
   EXPLAIN output for plans containing a `SortMergeJoinExec` with 
dynamic-filter pushdown enabled (the default) now shows a `DynamicFilter` 
pushed to the scans it feeds (as a `FilterExec` for sources that cannot absorb 
it, or as a scan predicate for those that can). Like `HashJoinExec`, the filter 
is not rendered on the join node itself.
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

