2010YOUY01 commented on code in PR #21983:
URL: https://github.com/apache/datafusion/pull/21983#discussion_r3176231316
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -1795,278 +1840,81 @@ impl NestedLoopJoinStream {
// ==== Core logic handling for each state ====
- /// Returns bool to indicate should it continue probing
+ /// Enters in [`NLJState::ProbeRight`] state. Each entrance either updates
the
+ /// flags and moves to next state, or handles one candidate chunk from the
+ /// current right batch. Candidate iteration over the batch is managed by
the
+ /// [`JoinAccelerator`].
+ ///
+ /// # Return
/// true -> continue in the same ProbeRight state
- /// false -> It has done with the (buffered_left x cur_right_batch), go to
- /// next state (ProbeRight)
+ /// false -> the current right batch is done; go to the next state
fn process_probe_batch(&mut self) -> Result<bool> {
let left_data = Arc::clone(self.get_left_data()?);
+ if self.current_right_batch_prober.is_none() {
+ let right_batch = self.current_right_batch.take().ok_or_else(|| {
+ internal_datafusion_err!("Right batch should be available")
+ })?;
+ let (right_batch, prober) = left_data
+ .accelerator()
+ .init_prober(right_batch, self.batch_size)?;
+ self.current_right_batch = Some(right_batch);
+ self.current_right_batch_prober = Some(prober);
+ }
+
+ // Get left/probe side candidate pairs and then further filter them
with
+ // the remaining join condition.
+ let next_candidates = {
+ let prober =
self.current_right_batch_prober.as_mut().ok_or_else(|| {
+ internal_datafusion_err!("Right-batch prober should be
initialized")
+ })?;
+ prober.probe()?
+ };
+
+ // If candidate iteration is exhausted, move to the next right batch.
+ let Some(candidates) = next_candidates else {
+ self.current_right_batch_prober = None;
+ return Ok(false);
+ };
+
let right_batch = self
.current_right_batch
.as_ref()
.ok_or_else(|| internal_datafusion_err!("Right batch should be
available"))?
.clone();
- // stop probing, the caller will go to the next state
- if self.left_probe_idx >= left_data.batch().num_rows() {
- return Ok(false);
- }
-
- // ========
- // Join (l_row x right_batch)
- // and push the result into output_buffer
- // ========
-
- // Special case:
- // When the right batch is very small, join with multiple left rows at
once,
- //
- // The regular implementation is not efficient if the plan's right
child is
- // very small (e.g. 1 row total), because inside the inner loop of
NLJ, it's
- // handling one input right batch at once, if it's not large enough,
the
- // overheads like filter evaluation can't be amortized through
vectorization.
- debug_assert_ne!(
- right_batch.num_rows(),
- 0,
- "When fetching the right batch, empty batches will be skipped"
- );
-
- let l_row_cnt_ratio = self.batch_size / right_batch.num_rows();
- if l_row_cnt_ratio > 10 {
- // Calculate max left rows to handle at once. This operator tries
to handle
- // up to `datafusion.execution.batch_size` rows at once in the
intermediate
- // batch.
- let l_row_count = std::cmp::min(
- l_row_cnt_ratio,
- left_data.batch().num_rows() - self.left_probe_idx,
- );
-
- debug_assert!(
- l_row_count != 0,
- "This function should only be entered when there are remaining
left rows to process"
- );
- let joined_batch = self.process_left_range_join(
- &left_data,
- &right_batch,
- self.left_probe_idx,
- l_row_count,
- )?;
-
- if let Some(batch) = joined_batch {
- self.output_buffer.push_batch(batch)?;
- }
-
- self.left_probe_idx += l_row_count;
-
- return Ok(true);
- }
-
- let l_idx = self.left_probe_idx;
let joined_batch =
- self.process_single_left_row_join(&left_data, &right_batch,
l_idx)?;
+ self.process_probe_candidates(&left_data, &right_batch,
candidates)?;
if let Some(batch) = joined_batch {
self.output_buffer.push_batch(batch)?;
}
- // ==== Prepare for the next iteration ====
-
- // Advance left cursor
- self.left_probe_idx += 1;
-
- // Return true to continue probing
Ok(true)
}
- /// Process [l_start_index, l_start_index + l_count) JOIN right_batch
- /// Returns a RecordBatch containing the join results (None if empty)
+ /// Process one batch of join candidates into final join output.
///
- /// Side Effect: If the join type requires, left or right side matched
bitmap
- /// will be set for matched indices.
- fn process_left_range_join(
+ /// This materializes the candidate pairs, evaluates any remaining join
+ /// filter, updates match bitmaps for joins that need unmatched rows, and
+ /// produces the final joined batch.
+ fn process_probe_candidates(
Review Comment:
This is a major place that will need to be updated when the first indexed join
implementation is added.
Currently, the `BuildRow` variant is a specialized implementation that is the
most efficient for the NLJ cartesian join fallback; an indexed join needs a more
general representation like `(build_index, probe_index)`.
##########
datafusion/physical-plan/src/joins/join_accelerator.rs:
##########
@@ -0,0 +1,453 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Join accelerator interfaces used by [`crate::joins::NestedLoopJoinExec`],
see
+//! comments in [`JoinAccelerator`] for details.
+
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use arrow::compute::concat_batches;
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::JoinSide;
+use datafusion_expr::JoinType;
+
+use super::join_filter::JoinFilter;
+use datafusion_common::{Result, internal_datafusion_err, internal_err};
+
+/// Shared reference to a selected join accelerator.
+pub(crate) type JoinAcceleratorRef = Arc<dyn JoinAccelerator>;
+
+/// Planning-time specification used to select a join accelerator.
+#[derive(Debug, Clone)]
+pub(crate) struct JoinAcceleratorSpec {
+ #[expect(dead_code)]
+ // Kept for accelerator selection once non-fallback implementations are
enabled.
+ join_type: JoinType,
+ build_side: JoinSide,
+ left_schema: SchemaRef,
+ right_schema: SchemaRef,
+ #[expect(dead_code)]
+ // Kept for accelerator selection once non-fallback implementations are
enabled.
+ filter: Option<JoinFilter>,
+}
+
+impl JoinAcceleratorSpec {
+ pub(crate) fn new(
+ join_type: JoinType,
+ build_side: JoinSide,
+ left_schema: SchemaRef,
+ right_schema: SchemaRef,
+ filter: Option<JoinFilter>,
+ ) -> Self {
+ Self {
+ join_type,
+ build_side,
+ left_schema,
+ right_schema,
+ filter,
+ }
+ }
+
+ pub(crate) fn build_schema(&self) -> &SchemaRef {
+ match self.build_side {
+ JoinSide::Left => &self.left_schema,
+ JoinSide::Right => &self.right_schema,
+ JoinSide::None => unreachable!("Join accelerator build side cannot
be None"),
+ }
+ }
+}
+
+/// Selects a planning-time join accelerator.
+#[derive(Debug, Default)]
+pub(crate) struct JoinAcceleratorBuilder;
+
+impl JoinAcceleratorBuilder {
+ /// Select the accelerator for a nested loop join.
+ ///
+ /// This always succeeds because NLJ has a naive cartesian fallback.
+ pub(crate) fn try_new(spec: JoinAcceleratorSpec) ->
Result<JoinAcceleratorRef> {
+ Ok(Arc::new(FallbackNestedLoopJoinAccelerator::new(spec)))
+ }
+}
+
+/// Nested loop join accelerator selected at planning time.
+///
+/// The accelerator is used by joins that first buffer one input, then probe
the
+/// other input one batch at a time. It provides two extension points for
reducing
+/// probe work:
+///
+/// - Runtime indexing on the build side. After all build batches are buffered,
+/// the accelerator can create an index-like representation that finds the
+/// candidate build rows for each probe row more cheaply than a cartesian
scan.
+/// - Dynamic filtering on the probe side. Build-side statistics can produce a
+/// probe-side filter that eliminates rows which cannot match any build row.
+///
+/// Accelerators should only handle an accelerated predicate: a conjunct from a
+/// conjunction join condition (e.g. `cond1` from `cond1 AND cond2 AND cond3`),
+/// and:
+///
+/// - It should emit candidate pairs efficiently, using the accelerated
predicate
+/// to avoid unnecessary cartesian comparisons, for example by building an
index
+/// on the build side before probing.
+/// - It must not discard any pair that could satisfy the full join filter. The
+/// accelerator may emit a superset of the final matches.
+///
+/// Residual join filter evaluation and outer, semi, anti, and mark join match
+/// tracking are handled by the outer nested-loop join driver. See the examples
+/// below for term definitions.
+///
+/// # Example: PiecewiseMergeJoin
+///
+/// For workload
+///
+/// ```sql
+/// select *
+/// from generate_series(1000) as t1(v1)
+/// join generate_series(1000000) as t2(v1)
+/// on (t1.v1 > t2.v1) AND ((t1.v1 + t2.v1) % 2 = 0)
+/// ```
+///
+/// `(t1.v1 > t2.v1)` is the accelerated predicate because it can be evaluated
+/// efficiently with a specific algorithm; see below for details.
+/// `((t1.v1 + t2.v1) % 2 = 0)` is the residual predicate and is evaluated by
+/// the outer nested-loop join driver after candidate pairs are produced.
+///
+/// ## (TODO) Dynamic filter
+///
+/// After buffering `t1` as the build side, the accelerator knows
+/// `max(t1.v1) = 1000` for `generate_series(1000)`. For an inner join, probe
+/// rows with `t2.v1 >= 1000` cannot satisfy `t1.v1 > t2.v1` and can be
filtered
+/// out.
+/// This reduces the candidate search space from roughly `1K x 1M` pairs to
+/// roughly `1K x 1K` pairs before residual filter evaluation.
+///
+/// ## Runtime index
+///
+/// The accelerator can also sort the buffered `t1` rows by `v1`. For each
+/// incoming `t2` batch, it sorts the probe rows by `v1` and scans the two
sorted
+/// runs once. For a probe row where `t2.v1 = 10`, the matching build rows are
the
+/// suffix of sorted `t1` rows with `t1.v1 > 10`; for a later probe row where
+/// `t2.v1 = 20`, the scan cursor only moves forward. This avoids checking
every
+/// build row for every probe row while still producing all pairs that satisfy
the
+/// accelerated predicate. The residual predicate is then applied to those
pairs
+/// by the join driver.
+/// This step further reduces the operation count from `1K x 1K` to
+/// `log(1K) + 1K + 1K` (sort and linear scans).
+///
+/// # Control flow
+/// The following pseudo-code demonstrates the high-level join control flow and
+/// how functions in this trait are used.
+///
+/// ```text
+/// for build_batch in build_input {
+/// accelerator.add_build_batch(build_batch)?;
+/// }
+/// accelerator.finish()?;
+///
+/// for probe_batch in probe_input {
+/// let (probe_batch, mut prober) =
+/// accelerator.init_prober(probe_batch, batch_size)?;
+///
+/// while let Some(candidates) = prober.probe()? {
+/// // The join driver consumes candidates with the prepared
probe_batch,
+/// // applies the residual filter, and records join matches.
+/// }
+/// }
+/// ```
+///
+/// # Implementation Plan
+/// This trait is intended to become public. For now, keep it private while
+/// internal experiments stabilize the API.
+pub(crate) trait JoinAccelerator: Debug + Send + Sync {
+ #[cfg_attr(not(test), expect(dead_code))]
+ // Will be used in explain output when accelerator selection is visible.
+ fn name(&self) -> &'static str;
+
+ /// Return `true` only if this accelerator supports the nested-loop join
+ /// spilling execution path. See [`super::NestedLoopJoinExec`] for details.
+ fn support_spilling(&self) -> bool {
+ false
+ }
+
+ /// Create a fresh mutable accelerator for one execution.
+ fn clone_accelerator(&self) -> Box<dyn JoinAccelerator>;
+
+ /// Add one build-side input batch to the buffer, and optionally build a
+ /// runtime index.
+ fn add_build_batch(&mut self, batch: RecordBatch) -> Result<()>;
+
+ /// Signal the end of build-side input and prepare for probing.
+ fn finish(&mut self) -> Result<()>;
+
+ /// Return the concatenated build-side batch from all batches added with
+ /// [`Self::add_build_batch`].
+ fn build_batch(&self) -> &RecordBatch;
+
+ /// Return the number of rows in the accumulated build side.
+ fn num_build_rows(&self) -> usize;
+
+ /// Initialize probing for one probe-side batch.
+ ///
+ /// Returns the prepared probe batch and a prober that iterates over
+ /// candidate pairs satisfying the accelerated predicate.
+ ///
+ /// - Prepared probe batch: implementations may preprocess the probe batch;
+ /// the outer join loop manages this returned batch while consuming the
+ /// prober.
+ /// - Output shape: an iterator over candidate pairs from
+ /// `ALL_BUILD_BATCHES x CURRENT_PROBE_BATCH` that satisfy the
accelerated
+ /// predicate.
+ ///
+ /// # Default implementation
+ ///
+ /// The default implementation is the nested-loop fallback: it returns the
+ /// probe batch unchanged and emits one buffered build-side row at a time.
+ /// The caller joins that row against the current probe batch.
+ fn init_prober(
Review Comment:
This function creates an iterator over candidates for `ALL_BUILD_BATCHES x
CURRENT_PROBE_BATCH`.
An easier alternative would be `ALL_BUILD_BATCHES x CURRENT_PROBE_ROW` for
common indexed joins; however, I think processing one probe batch per step is
more flexible. For example, the probe side may need certain batched
pre-processing on the probe batch. So I prefer this design for now.
##########
datafusion/physical-plan/src/joins/join_accelerator.rs:
##########
@@ -0,0 +1,453 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Join accelerator interfaces used by [`crate::joins::NestedLoopJoinExec`],
see
+//! comments in [`JoinAccelerator`] for details.
+
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use arrow::compute::concat_batches;
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::JoinSide;
+use datafusion_expr::JoinType;
+
+use super::join_filter::JoinFilter;
+use datafusion_common::{Result, internal_datafusion_err, internal_err};
+
+/// Shared reference to a selected join accelerator.
+pub(crate) type JoinAcceleratorRef = Arc<dyn JoinAccelerator>;
+
+/// Planning-time specification used to select a join accelerator.
+#[derive(Debug, Clone)]
+pub(crate) struct JoinAcceleratorSpec {
+ #[expect(dead_code)]
+ // Kept for accelerator selection once non-fallback implementations are
enabled.
+ join_type: JoinType,
+ build_side: JoinSide,
+ left_schema: SchemaRef,
+ right_schema: SchemaRef,
+ #[expect(dead_code)]
+ // Kept for accelerator selection once non-fallback implementations are
enabled.
+ filter: Option<JoinFilter>,
+}
+
+impl JoinAcceleratorSpec {
+ pub(crate) fn new(
+ join_type: JoinType,
+ build_side: JoinSide,
+ left_schema: SchemaRef,
+ right_schema: SchemaRef,
+ filter: Option<JoinFilter>,
+ ) -> Self {
+ Self {
+ join_type,
+ build_side,
+ left_schema,
+ right_schema,
+ filter,
+ }
+ }
+
+ pub(crate) fn build_schema(&self) -> &SchemaRef {
+ match self.build_side {
+ JoinSide::Left => &self.left_schema,
+ JoinSide::Right => &self.right_schema,
+ JoinSide::None => unreachable!("Join accelerator build side cannot
be None"),
+ }
+ }
+}
+
+/// Selects a planning-time join accelerator.
+#[derive(Debug, Default)]
+pub(crate) struct JoinAcceleratorBuilder;
+
+impl JoinAcceleratorBuilder {
+ /// Select the accelerator for a nested loop join.
+ ///
+ /// This always succeeds because NLJ has a naive cartesian fallback.
+ pub(crate) fn try_new(spec: JoinAcceleratorSpec) ->
Result<JoinAcceleratorRef> {
+ Ok(Arc::new(FallbackNestedLoopJoinAccelerator::new(spec)))
+ }
+}
+
+/// Nested loop join accelerator selected at planning time.
+///
+/// The accelerator is used by joins that first buffer one input, then probe
the
+/// other input one batch at a time. It provides two extension points for
reducing
+/// probe work:
+///
+/// - Runtime indexing on the build side. After all build batches are buffered,
+/// the accelerator can create an index-like representation that finds the
+/// candidate build rows for each probe row more cheaply than a cartesian
scan.
+/// - Dynamic filtering on the probe side. Build-side statistics can produce a
+/// probe-side filter that eliminates rows which cannot match any build row.
+///
+/// Accelerators should only handle an accelerated predicate: a conjunct from a
+/// conjunction join condition (e.g. `cond1` from `cond1 AND cond2 AND cond3`),
+/// and:
+///
+/// - It should emit candidate pairs efficiently, using the accelerated
predicate
+/// to avoid unnecessary cartesian comparisons, for example by building an
index
+/// on the build side before probing.
+/// - It must not discard any pair that could satisfy the full join filter. The
+/// accelerator may emit a superset of the final matches.
+///
+/// Residual join filter evaluation and outer, semi, anti, and mark join match
+/// tracking are handled by the outer nested-loop join driver. See the examples
+/// below for term definitions.
+///
+/// # Example: PiecewiseMergeJoin
+///
+/// For workload
+///
+/// ```sql
+/// select *
+/// from generate_series(1000) as t1(v1)
+/// join generate_series(1000000) as t2(v1)
+/// on (t1.v1 > t2.v1) AND ((t1.v1 + t2.v1) % 2 = 0)
+/// ```
+///
+/// `(t1.v1 > t2.v1)` is the accelerated predicate because it can be evaluated
+/// efficiently with a specific algorithm; see below for details.
+/// `((t1.v1 + t2.v1) % 2 = 0)` is the residual predicate and is evaluated by
+/// the outer nested-loop join driver after candidate pairs are produced.
+///
+/// ## (TODO) Dynamic filter
+///
+/// After buffering `t1` as the build side, the accelerator knows
+/// `max(t1.v1) = 1000` for `generate_series(1000)`. For an inner join, probe
+/// rows with `t2.v1 >= 1000` cannot satisfy `t1.v1 > t2.v1` and can be
filtered
+/// out.
+/// This reduces the candidate search space from roughly `1K x 1M` pairs to
+/// roughly `1K x 1K` pairs before residual filter evaluation.
+///
+/// ## Runtime index
+///
+/// The accelerator can also sort the buffered `t1` rows by `v1`. For each
+/// incoming `t2` batch, it sorts the probe rows by `v1` and scans the two
sorted
+/// runs once. For a probe row where `t2.v1 = 10`, the matching build rows are
the
+/// suffix of sorted `t1` rows with `t1.v1 > 10`; for a later probe row where
+/// `t2.v1 = 20`, the scan cursor only moves forward. This avoids checking
every
+/// build row for every probe row while still producing all pairs that satisfy
the
+/// accelerated predicate. The residual predicate is then applied to those
pairs
+/// by the join driver.
+/// This step further reduces the operation count from `1K x 1K` to
+/// `log(1K) + 1K + 1K` (sort and linear scans).
+///
+/// # Control flow
+/// The following pseudo-code demonstrates the high-level join control flow and
+/// how functions in this trait are used.
+///
+/// ```text
+/// for build_batch in build_input {
+/// accelerator.add_build_batch(build_batch)?;
+/// }
+/// accelerator.finish()?;
+///
+/// for probe_batch in probe_input {
+/// let (probe_batch, mut prober) =
+/// accelerator.init_prober(probe_batch, batch_size)?;
+///
+/// while let Some(candidates) = prober.probe()? {
+/// // The join driver consumes candidates with the prepared
probe_batch,
+/// // applies the residual filter, and records join matches.
+/// }
+/// }
+/// ```
+///
+/// # Implementation Plan
+/// This trait is intended to become public. For now, keep it private while
+/// internal experiments stabilize the API.
+pub(crate) trait JoinAccelerator: Debug + Send + Sync {
+ #[cfg_attr(not(test), expect(dead_code))]
+ // Will be used in explain output when accelerator selection is visible.
+ fn name(&self) -> &'static str;
+
+ /// Return `true` only if this accelerator supports the nested-loop join
+ /// spilling execution path. See [`super::NestedLoopJoinExec`] for details.
+ fn support_spilling(&self) -> bool {
+ false
+ }
+
+ /// Create a fresh mutable accelerator for one execution.
+ fn clone_accelerator(&self) -> Box<dyn JoinAccelerator>;
+
+ /// Add one build-side input batch to the buffer, and optionally build a
+ /// runtime index.
+ fn add_build_batch(&mut self, batch: RecordBatch) -> Result<()>;
Review Comment:
To extend this for dynamic filters: a common pattern is to calculate bounds from
the build side and then build an expression that filters the probe side.
The bound calculation step can naturally be included inside this API, so in the
follow-up work we only have to add an API for returning the dynamic filter.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]