2010YOUY01 opened a new issue, #21921: URL: https://github.com/apache/datafusion/issues/21921
### Is your feature request related to a problem or challenge?

## Motivation

There has been increasing interest in specialized join algorithms for different workloads. One example is range join support: https://github.com/apache/datafusion/issues/318.

Join is one of the most expensive operators in OLAP workloads. When DataFusion cannot use an equi-join implementation, the fallback is often a nested loop join, which may need to consider `|build_side| * |probe_side|` row pairs. For queries with multiple joins, this cost can compound quickly as intermediate cardinalities grow.

One way to support these workloads is to add more specialized join executors, but this is harder than it first appears. On the functional side, a join executor has to support semi, anti, mark, and other join types correctly. On the non-functional side, it also has to handle internal buffering and incremental output: it should buffer enough data to preserve vectorized execution, while avoiding materializing too many intermediate results in memory.

This proposal introduces a simpler extension API for specialized join implementations. Many specialized joins follow a common build-probe pattern and can reduce candidate pairs with runtime indexes or runtime filters. We can reuse much of the existing NLJ code for the common logic that addresses the challenges above, while each specialized implementation only provides the acceleration logic: how to build an index or summary from the build side, and how to probe more efficiently using that runtime index.
## Proposed abstraction

For joins that fit the build-then-probe pattern, the execution flow can be abstracted as follows:

```rust
for build_batch in build_input {
    accelerator.add_build_batch(build_batch)?;
}
accelerator.finish()?;

for probe_batch in probe_input {
    let (probe_batch, mut prober) = accelerator.init_prober(probe_batch, batch_size)?;
    while let Some(candidates) = prober.probe()? {
        // The common join driver consumes candidate row pairs,
        // evaluates residual predicates, tracks join state, and builds output.
    }
}
```

This abstraction provides two main optimization opportunities:

* **Runtime index on the build side.** After all build batches are buffered, the accelerator can create an index-like representation that finds candidate build rows for each probe row more cheaply than a Cartesian scan.
* **Dynamic filter on the probe side.** Build-side statistics can sometimes prove that a probe row cannot match any build row, allowing the executor to skip that probe row before candidate generation. This is especially useful for inner and semi joins. For outer, anti, and mark joins, the common driver must preserve the correct unmatched-row semantics.

## Walkthrough: piecewise merge join

Consider this query:

```sql
SELECT *
FROM generate_series(1000) AS t1(v1)
JOIN generate_series(1000000) AS t2(v1)
  ON (t1.v1 > t2.v1) AND ((t1.v1 + t2.v1) % 2 = 0);
```

Here, `t1.v1 > t2.v1` is the accelerated predicate: it has structure that a specialized algorithm can exploit. The second predicate, `((t1.v1 + t2.v1) % 2 = 0)`, is the residual predicate. The accelerator does not need to understand it; the common join driver can evaluate it after candidate pairs are produced.

### Dynamic filter

After buffering `t1` as the build side, the accelerator can compute `max(t1.v1)`. For an inner join, any probe row with `t2.v1 >= max(t1.v1)` cannot satisfy `t1.v1 > t2.v1`, so those probe rows can be filtered out before candidate generation.
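As a minimal sketch of the dynamic-filter idea, assuming plain `i64` vectors stand in for Arrow arrays (the helper names `build_max` and `filter_probe_rows` are illustrative, not an existing DataFusion API):

```rust
/// Build-side summary for the accelerated predicate `t1.v1 > t2.v1`:
/// only the maximum build value is needed to prune probe rows.
fn build_max(build_values: &[i64]) -> Option<i64> {
    build_values.iter().copied().max()
}

/// For an inner join on `t1.v1 > t2.v1`, a probe row with
/// `t2.v1 >= max(t1.v1)` can never match any build row, so it is
/// dropped before candidate generation.
fn filter_probe_rows(probe_values: &[i64], build_max: i64) -> Vec<i64> {
    probe_values
        .iter()
        .copied()
        .filter(|&v| v < build_max)
        .collect()
}
```

With the query above, the build side is `1..=1000`, so `build_max` returns `1000` and `filter_probe_rows` keeps only the probe rows with `t2.v1 < 1000`.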
In this example, the dynamic filter reduces the probe side from roughly `1M` rows to roughly `1K` rows. The candidate search space therefore shrinks from roughly `1K x 1M` to roughly `1K x 1K` before residual predicate evaluation.

### Runtime index

The accelerator can further sort the buffered `t1` rows by `v1`. For each incoming `t2` batch, it can sort the remaining probe rows by `v1` and scan the two sorted runs together. For a probe row where `t2.v1 = 10`, the matching build rows are the suffix of sorted `t1` rows with `t1.v1 > 10`. For a later probe row where `t2.v1 = 20`, the scan cursor only moves forward. This avoids repeatedly checking every build row for every probe row. Ignoring the cost of emitting the final matching pairs, this further reduces the search work from roughly `1K x 1K` to `sort(1K) + 1K + 1K` (a sort plus a linear scan over each side).

## Other possible accelerators

This framework could support several specialized join families:

* Range joins.
* ASOF joins.
* Spatial joins, where the build side can be indexed with an R-tree or a similar structure, and build-side bounding boxes can also derive probe-side pruning filters.
* Array/list joins, such as joins based on `array_contains`, `array_intersect`, or tag membership, where the build side can be represented as an inverted index.
* Full-text-style joins and semantic joins.

## Limitations

Not every specialized join fits perfectly into this build-buffer/probe framework. For example, an optimal ASOF join can often stream both sides in sorted order without fully buffering one side. That design may be better implemented as a dedicated physical operator.

The goal of this proposal is to provide a simple but useful abstraction for specialized joins, and hopefully inspire a broader class of practical optimizations. If a particular workload later needs a deeper optimization, it can still be implemented as a dedicated executor.

## Proposed implementation plan

1. Introduce an API for custom join indexes and dynamic filters:

   ```rust
   pub trait JoinAccelerator: Debug + Send + Sync {
       /// Add one build-side input batch to the accelerator.
       ///
       /// Implementations may buffer the batch, update build-side statistics,
       /// or incrementally build a runtime index.
       fn add_build_batch(&mut self, batch: RecordBatch) -> Result<()>;

       /// Signal the end of build-side input and prepare for probing.
       ///
       /// Implementations can finalize indexes, summaries, or dynamic filters here.
       fn finish(&mut self) -> Result<()>;

       /// Initialize probing for one probe-side batch.
       ///
       /// Returns the prepared probe batch and a prober that iterates over
       /// candidate pairs satisfying the accelerated predicate.
       ///
       /// - Prepared probe batch: implementations may preprocess the probe batch.
       ///   The outer join loop owns this returned batch while consuming the prober.
       /// - Output shape: an iterator over candidate pairs from
       ///   `ALL_BUILD_BATCHES x CURRENT_PROBE_BATCH` that satisfy the accelerated
       ///   predicate.
       ///
       /// # Default implementation
       ///
       /// The default implementation is the nested-loop fallback. It returns the
       /// probe batch unchanged and emits every build-side row as a candidate for
       /// each probe row. Implementations that only provide dynamic filtering can
       /// reuse this default.
       fn init_prober(
           &self,
           probe_batch: RecordBatch,
           batch_size: usize,
       ) -> Result<(RecordBatch, Box<dyn JoinAcceleratorProber>)> {
           let prober = Box::new(FallbackNestedLoopJoinProber {
               batch_size: batch_size.max(1),
               build_batch_size: self.num_build_rows(),
               probe_batch_size: probe_batch.num_rows(),
               cur_probe_offset: 0,
               cur_build_offset: 0,
           });
           Ok((probe_batch, prober))
       }

       // ...
   }
   ```

2. Refactor nested loop join to use the new `JoinAccelerator` trait.
3. Port piecewise merge join to the new trait.
4. Explore additional specialized join accelerators.
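To make the runtime-index idea concrete, here is a simplified standalone sketch of the sorted-scan probing a piecewise-merge implementation of this trait could perform, assuming the accelerated predicate `build > probe`. Plain sorted `i64` slices stand in for the buffered build side and a prepared probe batch; `probe_candidates` is a hypothetical helper, not part of the proposed trait:

```rust
/// For sorted build and probe runs, the matching build rows for each probe
/// value are a suffix of the build run, and because probe values are also
/// ascending, the suffix start (the cursor) only moves forward.
fn probe_candidates(sorted_build: &[i64], sorted_probe: &[i64]) -> Vec<(usize, usize)> {
    let mut candidates = Vec::new();
    let mut cursor = 0; // index of the first build value > current probe value
    for (probe_idx, &p) in sorted_probe.iter().enumerate() {
        // Advance past build values that cannot satisfy `build > probe`.
        // The cursor never moves backward across probe rows.
        while cursor < sorted_build.len() && sorted_build[cursor] <= p {
            cursor += 1;
        }
        // Every build row from `cursor` onward satisfies the predicate.
        for build_idx in cursor..sorted_build.len() {
            candidates.push((build_idx, probe_idx));
        }
    }
    candidates
}
```

The search itself is a single linear pass over each sorted run; the remaining cost is proportional to the number of candidate pairs emitted, which the common driver then filters with the residual predicate. A real prober would additionally chunk its output to `batch_size`, as the fallback prober above does.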
### Describe the solution you'd like

_No response_

### Describe alternatives you've considered

_No response_

### Additional context

_No response_
