2010YOUY01 opened a new issue, #21921:
URL: https://github.com/apache/datafusion/issues/21921

   ### Is your feature request related to a problem or challenge?
   
   ## Motivation
   
   There has been increasing interest in specialized join algorithms for 
different workloads. One example is range join support: 
[https://github.com/apache/datafusion/issues/318](https://github.com/apache/datafusion/issues/318).
   
   Join is one of the most expensive operators in OLAP workloads. When 
DataFusion cannot use an equi-join implementation, the fallback is often a 
nested loop join (NLJ), which may need to consider `|build_side| * |probe_side|` 
row pairs. For queries with multiple joins, this cost can compound quickly as 
intermediate cardinalities grow.
   
   One way to support these workloads is to add more specialized join 
executors, but this is harder than it first appears.
   
   For functional requirements, a join executor has to support semi, anti, 
mark, and other join types correctly.
   
   For non-functional requirements, a join executor also has to handle internal 
buffering and incremental output. For example, it should buffer enough data to 
preserve vectorized execution, while avoiding materializing too many 
intermediate results in memory.
   
   This proposal introduces a simpler extension API for specialized join 
implementations. Many specialized joins follow a common build-probe pattern, 
and can reduce candidate pairs with runtime indexes or runtime filters. We can 
reuse much of the existing NLJ code for common logic that addresses the 
challenges above, while each specialized implementation only provides the 
acceleration logic: how to build an index or summary from the build side, and 
how to probe more efficiently using the runtime index.
   
   ## Proposed abstraction
   
   For joins that fit the build-then-probe pattern, the execution flow can be 
abstracted as follows:
   
   ```rust
   for build_batch in build_input {
       accelerator.add_build_batch(build_batch)?;
   }
   accelerator.finish()?;
   
   for probe_batch in probe_input {
       let (probe_batch, mut prober) =
           accelerator.init_prober(probe_batch, batch_size)?;
   
       while let Some(candidates) = prober.probe()? {
           // The common join driver consumes candidate row pairs,
            // evaluates residual predicates, tracks join state, and builds output.
       }
   }
   ```
   
   This abstraction provides two main optimization opportunities:
   
   * **Runtime index on the build side.** After all build batches are buffered, 
the accelerator can create an index-like representation that finds candidate 
build rows for each probe row more cheaply than a Cartesian scan.
   * **Dynamic filter on the probe side.** Build-side statistics can sometimes 
prove that a probe row cannot match any build row, allowing the executor to 
skip that probe row before candidate generation. This is especially useful for 
inner and semi joins. For outer, anti, and mark joins, the common driver must 
preserve the correct unmatched-row semantics.
   
   ## Walkthrough: piecewise merge join
   
   Consider this query:
   
   ```sql
   SELECT *
   FROM generate_series(1000) AS t1(v1)
   JOIN generate_series(1000000) AS t2(v1)
   ON (t1.v1 > t2.v1)
      AND ((t1.v1 + t2.v1) % 2 = 0);
   ```
   
   Here, `t1.v1 > t2.v1` is the accelerated predicate. It has structure that a 
specialized algorithm can exploit. The second predicate, `((t1.v1 + t2.v1) % 2 
= 0)`, is the residual predicate. The accelerator does not need to understand 
it; the common join driver can evaluate it after candidate pairs are produced.
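   As a minimal illustration of this split (plain `i64` slices stand in for Arrow arrays, and `filter_residual` is a hypothetical name, not part of the proposed API), the common driver could apply the residual predicate to candidate pairs like this:

   ```rust
   /// Candidate `(build_idx, probe_idx)` pairs come from the accelerated
   /// predicate; the driver then keeps only pairs that also satisfy the
   /// residual predicate `(t1.v1 + t2.v1) % 2 == 0` from the example query.
   fn filter_residual(
       build: &[i64],
       probe: &[i64],
       candidates: &[(usize, usize)],
   ) -> Vec<(usize, usize)> {
       candidates
           .iter()
           .copied()
           .filter(|&(b, p)| (build[b] + probe[p]) % 2 == 0)
           .collect()
   }
   ```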
   
   ### Dynamic filter
   
   After buffering `t1` as the build side, the accelerator can compute 
`max(t1.v1)`. For an inner join, any probe row with `t2.v1 >= max(t1.v1)` 
cannot satisfy `t1.v1 > t2.v1`, so those probe rows can be filtered out before 
candidate generation.
   
   In this example, the dynamic filter reduces the probe side from roughly `1M` 
rows to roughly `1K` rows. The candidate search space therefore goes from 
roughly `1K x 1M` to roughly `1K x 1K` before residual predicate evaluation.
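   A sketch of that filter on plain `i64` values (the function name is illustrative, not part of the proposed API):

   ```rust
   /// Dynamic filter for the predicate `t1.v1 > t2.v1`: a probe row can
   /// only match if it is strictly below the build-side maximum.
   fn probe_rows_passing_filter(build: &[i64], probe: &[i64]) -> Vec<i64> {
       // An empty build side means no probe row can match.
       let Some(max_build) = build.iter().copied().max() else {
           return Vec::new();
       };
       probe.iter().copied().filter(|&v| v < max_build).collect()
   }
   ```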
   
   ### Runtime index
   
   The accelerator can further sort the buffered `t1` rows by `v1`. For each 
incoming `t2` batch, it can sort the remaining probe rows by `v1` and scan the 
two sorted runs.
   
   For a probe row where `t2.v1 = 10`, the matching build rows are the suffix 
of sorted `t1` rows with `t1.v1 > 10`. For a later probe row where `t2.v1 = 
20`, the scan cursor only moves forward. This avoids repeatedly checking every 
build row for every probe row.
   
   Ignoring the cost of emitting the final matching pairs, this further reduces 
the search work from roughly `1K x 1K` comparisons to `sort(1K) + 1K + 1K` 
(one sort plus a linear scan over each sorted run).
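   The forward-moving cursor can be sketched as follows (counting matches only, over plain pre-sorted `i64` slices; `count_matches_sorted` is a hypothetical helper, not part of the proposal):

   ```rust
   /// Piecewise merge probe sketch: both runs are sorted ascending.
   /// For a probe value `p`, the matching build rows are the suffix with
   /// `build > p`, so the build-side cursor only ever moves forward and
   /// the scan is linear after sorting.
   fn count_matches_sorted(build_sorted: &[i64], probe_sorted: &[i64]) -> usize {
       let mut cursor = 0; // first build index with build_sorted[cursor] > p
       let mut total = 0;
       for &p in probe_sorted {
           while cursor < build_sorted.len() && build_sorted[cursor] <= p {
               cursor += 1;
           }
           // Every build row from `cursor` onward satisfies `build > p`.
           total += build_sorted.len() - cursor;
       }
       total
   }
   ```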
   
   ## Other possible accelerators
   
   This framework could support several specialized join families:
   
   * Range joins.
   * ASOF joins.
   * Spatial joins, where the build side can be indexed with an R-tree or a 
similar structure, and build-side bounding boxes can also derive probe-side 
pruning filters.
   * Array/list joins, such as joins based on `array_contains`, 
`array_intersect`, or tag membership, where the build side can be represented 
as an inverted index.
   * Full-text-style joins and semantic joins.
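   For the array/list case, the build side of an inverted index could look like this sketch (a `HashMap` over `i64` elements; both function names are illustrative, not part of the proposal):

   ```rust
   use std::collections::HashMap;

   /// Inverted index for `array_contains`-style joins: map each element of
   /// a build-side list to the indices of the build rows containing it.
   fn build_inverted_index(build_lists: &[Vec<i64>]) -> HashMap<i64, Vec<usize>> {
       let mut index: HashMap<i64, Vec<usize>> = HashMap::new();
       for (row, list) in build_lists.iter().enumerate() {
           for &elem in list {
               index.entry(elem).or_default().push(row);
           }
       }
       index
   }

   /// Probing becomes one lookup per probe key instead of a scan over
   /// every build row.
   fn probe_contains(index: &HashMap<i64, Vec<usize>>, key: i64) -> &[usize] {
       index.get(&key).map(Vec::as_slice).unwrap_or(&[])
   }
   ```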
   
   ## Limitations
   
   Not every specialized join fits perfectly into this build-buffer/probe 
framework.
   
   For example, an optimal ASOF join can often stream both sides in sorted 
order without fully buffering one side. That design may be better implemented 
as a dedicated physical operator.
   
   The goal of this proposal is to provide a simple but useful abstraction for 
specialized joins, and hopefully inspire a broader class of practical 
optimizations. If a particular workload later needs a deeper optimization, it 
can still be implemented as a dedicated executor.
   
   ## Proposed implementation plan
   
   1. Introduce an API for custom join indexes and dynamic filters:
   
   ```rust
   pub trait JoinAccelerator: Debug + Send + Sync {
       /// Add one build-side input batch to the accelerator.
       ///
       /// Implementations may buffer the batch, update build-side statistics,
       /// or incrementally build a runtime index.
       fn add_build_batch(&mut self, batch: RecordBatch) -> Result<()>;
   
       /// Signal the end of build-side input and prepare for probing.
       ///
        /// Implementations can finalize indexes, summaries, or dynamic filters here.
       fn finish(&mut self) -> Result<()>;
   
       /// Initialize probing for one probe-side batch.
       ///
       /// Returns the prepared probe batch and a prober that iterates over
       /// candidate pairs satisfying the accelerated predicate.
       ///
        /// - Prepared probe batch: implementations may preprocess the probe batch.
        ///   The outer join loop owns this returned batch while consuming the prober.
        /// - Output shape: an iterator over candidate pairs from
        ///   `ALL_BUILD_BATCHES x CURRENT_PROBE_BATCH` that satisfy the
        ///   accelerated predicate.
       ///
       /// # Default implementation
       ///
        /// The default implementation is the nested-loop fallback. It returns
        /// the probe batch unchanged and emits every build-side row as a
        /// candidate for each probe row. Implementations that only provide
        /// dynamic filtering can reuse this default.
       fn init_prober(
           &self,
           probe_batch: RecordBatch,
           batch_size: usize,
       ) -> Result<(RecordBatch, Box<dyn JoinAcceleratorProber>)> {
           let prober = Box::new(FallbackNestedLoopJoinProber {
               batch_size: batch_size.max(1),
               build_batch_size: self.num_build_rows(),
               probe_batch_size: probe_batch.num_rows(),
               cur_probe_offset: 0,
               cur_build_offset: 0,
           });
   
           Ok((probe_batch, prober))
       }
   
       // ...
   }
   ```
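   To make the shape concrete, here is a simplified stand-in (not the proposed API itself: `Vec<i64>` replaces `RecordBatch`, probing is reduced to a single filter method, and `MaxFilterAccelerator` is hypothetical) showing an accelerator that only contributes a dynamic filter and would otherwise rely on the default nested-loop prober:

   ```rust
   /// Toy version of the proposed trait, with `Vec<i64>` standing in for
   /// `RecordBatch` and probing reduced to a probe-side filter.
   trait ToyJoinAccelerator {
       fn add_build_batch(&mut self, batch: Vec<i64>);
       fn finish(&mut self);
       /// Dynamic filter: keep only probe rows that can possibly match.
       fn filter_probe_batch(&self, probe: Vec<i64>) -> Vec<i64>;
   }

   /// Dynamic-filter-only accelerator for the predicate `build > probe`.
   #[derive(Default)]
   struct MaxFilterAccelerator {
       max_build: Option<i64>,
   }

   impl ToyJoinAccelerator for MaxFilterAccelerator {
       fn add_build_batch(&mut self, batch: Vec<i64>) {
           // Only a running summary is kept; no buffering is needed to filter.
           self.max_build = self.max_build.max(batch.iter().copied().max());
       }

       fn finish(&mut self) {}

       fn filter_probe_batch(&self, probe: Vec<i64>) -> Vec<i64> {
           match self.max_build {
               // A probe row can only match if it is below the build maximum.
               Some(m) => probe.into_iter().filter(|&v| v < m).collect(),
               None => Vec::new(), // empty build side: nothing matches
           }
       }
   }
   ```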
   
   2. Refactor nested loop join to use the new `JoinAccelerator` trait.
   3. Port piecewise merge join to the new trait.
   4. Explore additional specialized join accelerators.
   
   
   ### Describe the solution you'd like
   
   _No response_
   
   ### Describe alternatives you've considered
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to