2010YOUY01 commented on code in PR #21448:
URL: https://github.com/apache/datafusion/pull/21448#discussion_r3062974726
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -904,13 +937,55 @@ pub(crate) struct NestedLoopJoinStream {
// For right join, keep track of matched rows in `current_right_batch`
// Constructed when fetching each new incoming right batch in `FetchingRight` state.
current_right_batch_matched: Option<BooleanArray>,
+
+ // ========================================================================
+ // MEMORY-LIMITED EXECUTION FIELDS:
+ // Used when left-side data exceeds the memory budget. In this mode,
+ // left data is loaded in chunks, and the right side is spilled to disk
+ // so it can be re-scanned for each left chunk.
+ // ========================================================================
+ /// Left input stream for incremental buffering (memory-limited mode only).
+ /// None when using the standard OnceFut path.
+ left_stream: Option<SendableRecordBatchStream>,
+ /// Memory reservation for left-side buffering in memory-limited mode
+ left_reservation: Option<MemoryReservation>,
+ /// A batch that couldn't be added to the current chunk due to memory limit.
+ /// It will be the first batch in the next chunk.
+ left_stashed_batch: Option<RecordBatch>,
Review Comment:
Also optional: I think it would be simpler to eliminate this special case and
append the stashed batch directly to the end of `left_pending_batches`, since
it is already in memory.
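A rough sketch of one way to read this suggestion, not the PR's code: `Batch` and `LeftLoader` are hypothetical stand-ins for `RecordBatch` and the stream's left-side fields, and a plain byte budget stands in for the memory reservation. The batch that overflows the budget simply seeds the pending list for the next chunk, so no separate stash field is needed.

```rust
/// Hypothetical stand-in for arrow's `RecordBatch`; only its size matters here.
#[derive(Debug, Clone, PartialEq)]
struct Batch {
    id: u32,
    bytes: usize,
}

/// Hypothetical left-side chunk loader with a simple byte budget.
struct LeftLoader {
    budget: usize,
    /// Batches already in memory for the chunk currently being built.
    pending: Vec<Batch>,
}

impl LeftLoader {
    /// Pulls batches from `input` until the next one no longer fits.
    /// Returns the finished chunk. The overflowing batch is kept in
    /// `pending`, so it becomes the first batch of the next chunk.
    fn next_chunk(&mut self, input: &mut impl Iterator<Item = Batch>) -> Vec<Batch> {
        let mut used: usize = self.pending.iter().map(|b| b.bytes).sum();
        while let Some(batch) = input.next() {
            // Always accept at least one batch so an oversized batch
            // cannot stall progress.
            if !self.pending.is_empty() && used + batch.bytes > self.budget {
                // Chunk is full: hand it out and seed the next chunk
                // with the batch that did not fit.
                return std::mem::replace(&mut self.pending, vec![batch]);
            }
            used += batch.bytes;
            self.pending.push(batch);
        }
        // Input exhausted: whatever is pending is the last chunk.
        std::mem::take(&mut self.pending)
    }
}
```

With this shape, the "stashed" case is just a non-empty `pending` at the start of the next call, so there is one fewer field to reset and check.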
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -904,13 +937,55 @@ pub(crate) struct NestedLoopJoinStream {
// For right join, keep track of matched rows in `current_right_batch`
// Constructed when fetching each new incoming right batch in `FetchingRight` state.
current_right_batch_matched: Option<BooleanArray>,
+
+ // ========================================================================
+ // MEMORY-LIMITED EXECUTION FIELDS:
+ // Used when left-side data exceeds the memory budget. In this mode,
+ // left data is loaded in chunks, and the right side is spilled to disk
+ // so it can be re-scanned for each left chunk.
+ // ========================================================================
+ /// Left input stream for incremental buffering (memory-limited mode only).
+ /// None when using the standard OnceFut path.
+ left_stream: Option<SendableRecordBatchStream>,
Review Comment:
A minor style point, possibly for a follow-up: I noticed that in multiple
places the memory-limited execution logic uses `Option` fields to
implicitly check the current fallback state.
It might be clearer to introduce a dedicated state enum, for example:
```
enum NljMemLimitedState {
Unsupported, // this join type cannot use memory-limited fallback
FirstRightPass,
SubsequentRightPass,
// ...
}
```
This would make the execution state explicit, and the Option fields could
then be used purely as payload (or for sanity checks), rather than also acting
as implicit state flags.
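To make the idea a bit more concrete, here is a minimal sketch under stated assumptions: the `Inactive` variant, the payload fields, and the helper methods are hypothetical and not taken from the PR. It only illustrates how checks like `is_memory_limited()` could match on the state instead of probing `Option` fields.

```rust
/// Sketch of an explicit fallback state. `Inactive` and the payload
/// fields are assumptions made for illustration only.
#[derive(Debug, PartialEq)]
enum NljMemLimitedState {
    /// This join type cannot use the memory-limited fallback.
    Unsupported,
    /// Fallback is possible but has not been triggered (hypothetical).
    Inactive,
    /// Right side is being consumed and spilled for the first time.
    FirstRightPass { batches_spilled: usize },
    /// Right side is re-read from the spill file for a later left chunk.
    SubsequentRightPass { spill_file: String },
}

impl NljMemLimitedState {
    /// Replaces checks such as `left_stream.is_some()`.
    fn is_memory_limited(&self) -> bool {
        matches!(
            self,
            Self::FirstRightPass { .. } | Self::SubsequentRightPass { .. }
        )
    }

    /// Fallback is only allowed once, and only for supported join types.
    fn can_fallback(&self) -> bool {
        matches!(self, Self::Inactive)
    }

    /// Transition taken once the right side has been fully spilled.
    /// Any other state is returned unchanged.
    fn finish_first_pass(self, spill_file: String) -> Self {
        match self {
            Self::FirstRightPass { .. } => Self::SubsequentRightPass { spill_file },
            other => other,
        }
    }
}
```

A side benefit of this shape is that the "avoid infinite loop" guard in `can_fallback_to_spill` becomes a single match on the state rather than a combination of `Option` checks.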
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -2906,4 +3340,224 @@ pub(crate) mod tests {
fn columns(schema: &Schema) -> Vec<String> {
schema.fields().iter().map(|f| f.name().clone()).collect()
}
+
+ // ========================================================================
+ // Memory-limited execution tests
+ // ========================================================================
Review Comment:
I recommend adding some end-to-end tests: in `slt`, run NLJ queries with a
`generate_series()` table source, and check the spill metrics reported by `explain analyze`.
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -1153,31 +1231,318 @@ impl NestedLoopJoinStream {
left_buffered_in_one_pass: true,
handled_empty_output: false,
should_track_unmatched_right: need_produce_right_in_final(join_type),
+ // Memory-limited fields (inactive until OOM fallback)
+ left_stream: None,
+ left_reservation: None,
+ left_stashed_batch: None,
+ left_pending_batches: Vec::new(),
+ left_schema: None,
+ spill_manager: None,
+ right_spill_in_progress: None,
+ right_spill_file: None,
+ right_max_batch_memory: 0,
+ is_first_right_pass: true,
+ // Fallback context
+ left_plan,
+ task_context,
}
}
+ /// Returns true if this stream is operating in memory-limited mode
+ fn is_memory_limited(&self) -> bool {
+ self.left_stream.is_some() || self.left_reservation.is_some()
+ }
+
+ /// Check if we can fall back to memory-limited mode on this error.
+ fn can_fallback_to_spill(&self, error: &datafusion_common::DataFusionError) -> bool {
+ self.left_plan.is_some()
+ && self.task_context.is_some()
+ && !self.is_memory_limited() // avoid infinite loop
+ && matches!(
+ error.find_root(),
+ datafusion_common::DataFusionError::ResourcesExhausted(_)
+ )
+ }
+
+ /// Switch from the standard OnceFut path to memory-limited mode.
+ ///
+ /// Re-executes the left child to get a fresh stream, creates a
+ /// SpillManager for right-side spilling, and sets up all the
+ /// memory-limited fields. The next call to `handle_buffering_left`
+ /// will dispatch to `handle_buffering_left_memory_limited`.
+ fn initiate_fallback(&mut self) -> Result<()> {
+ let left_plan = self
+ .left_plan
+ .as_ref()
+ .expect("left_plan must be set for fallback");
+ let context = self
+ .task_context
+ .as_ref()
+ .expect("task_context must be set for fallback");
+
+ // Re-execute left child to get a fresh stream
+ let left_stream = left_plan.execute(0, Arc::clone(context))?;
+ let left_schema = left_stream.schema();
+
+ // Create reservation with can_spill for fair memory allocation
+ let reservation = MemoryConsumer::new("NestedLoopJoinLoad[fallback]".to_string())
+ .with_can_spill(true)
+ .register(context.memory_pool());
+
+ // Create SpillManager for right-side spilling
+ let right_schema = self.right_data.schema();
+ let spill_manager = SpillManager::new(
+ context.runtime_env(),
+ self.metrics.spill_metrics.clone(),
+ right_schema,
+ )
+ .with_compression_type(context.session_config().spill_compression());
+
+ // Populate memory-limited fields
+ self.left_stream = Some(left_stream);
+ self.left_schema = Some(left_schema);
+ self.left_reservation = Some(reservation);
+ self.left_stashed_batch = None;
+ self.left_pending_batches = Vec::new();
+ self.spill_manager = Some(spill_manager);
+ self.right_spill_in_progress = None;
+ self.right_spill_file = None;
+ self.right_max_batch_memory = 0;
+ self.is_first_right_pass = true;
+
+ // State stays BufferingLeft — next poll will enter
+ // handle_buffering_left_memory_limited via is_memory_limited() check
+ self.state = NLJState::BufferingLeft;
+
+ Ok(())
+ }
+
// ==== State handler functions ====
- /// Handle BufferingLeft state - prepare left side batches
+ /// Handle BufferingLeft state - prepare left side batches.
+ ///
+ /// In standard mode, uses OnceFut to load all left data at once.
+ /// In memory-limited mode, incrementally buffers left batches until the
+ /// memory budget is reached or the left stream is exhausted.
fn handle_buffering_left(
&mut self,
cx: &mut std::task::Context<'_>,
) -> ControlFlow<Poll<Option<Result<RecordBatch>>>> {
- match self.left_data.get_shared(cx) {
- Poll::Ready(Ok(left_data)) => {
- self.buffered_left_data = Some(left_data);
- // TODO: implement memory-limited case
- self.left_exhausted = true;
- self.state = NLJState::FetchingRight;
- // Continue to next state immediately
- ControlFlow::Continue(())
+ if self.is_memory_limited() {
+ self.handle_buffering_left_memory_limited(cx)
+ } else {
+ // Standard path: use OnceFut
+ match self.left_data.get_shared(cx) {
+ Poll::Ready(Ok(left_data)) => {
+ self.buffered_left_data = Some(left_data);
+ self.left_exhausted = true;
+ self.state = NLJState::FetchingRight;
+ ControlFlow::Continue(())
+ }
+ Poll::Ready(Err(e)) => {
+ if self.can_fallback_to_spill(&e) {
+ debug!(
+ "NestedLoopJoin: OnceFut failed with OOM, \
+ falling back to memory-limited mode"
+ );
+ match self.initiate_fallback() {
+ Ok(()) => ControlFlow::Continue(()),
+ Err(fallback_err) => {
+ ControlFlow::Break(Poll::Ready(Some(Err(fallback_err))))
+ }
+ }
+ } else {
+ ControlFlow::Break(Poll::Ready(Some(Err(e))))
+ }
+ }
+ Poll::Pending => ControlFlow::Break(Poll::Pending),
}
- Poll::Ready(Err(e)) => ControlFlow::Break(Poll::Ready(Some(Err(e)))),
- Poll::Pending => ControlFlow::Break(Poll::Pending),
}
}
- /// Handle FetchingRight state - fetch next right batch and prepare for processing
+ /// Memory-limited path for handle_buffering_left.
+ ///
+ /// Incrementally polls the left stream and accumulates batches until:
+ /// - Memory reservation fails (chunk is full, more data remains)
+ /// - Left stream is exhausted (this is the last/only chunk)
+ fn handle_buffering_left_memory_limited(
Review Comment:
I think this issue should be addressed first:
In the regular in-memory path, the left side is evaluated only once,
buffered, and shared across all partitions via `OnceFut<JoinLeftData>`.
In contrast, the memory-limited fallback re-evaluates the left side for each
partition. This can be inefficient and may even increase memory pressure.
Ideally, the left side would be evaluated only once, as in the
in-memory path. (I can imagine this is tricky due to the use of `OnceFut`...
I'm still thinking about whether there is an easy way to do it.)
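Purely to illustrate the "evaluate once, share across partitions" property, and not as a proposal for how to wire it into `OnceFut`: the sketch below uses the synchronous `std::sync::OnceLock`, and `SharedLeft` with its `Vec<u64>` payload is a hypothetical stand-in for whatever shared left-side state (for example, a spilled left input) the partitions would reuse.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};

/// Hypothetical handle shared by the streams of all output partitions.
struct SharedLeft {
    /// Stand-in for the shared left-side result.
    cell: OnceLock<Arc<Vec<u64>>>,
    /// Counts how many times the left side was actually evaluated.
    evaluations: AtomicUsize,
}

impl SharedLeft {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            cell: OnceLock::new(),
            evaluations: AtomicUsize::new(0),
        })
    }

    /// The first caller runs `eval`; concurrent and later callers wait
    /// for that result and reuse it instead of re-evaluating.
    fn get_or_eval(&self, eval: impl FnOnce() -> Vec<u64>) -> Arc<Vec<u64>> {
        Arc::clone(self.cell.get_or_init(|| {
            self.evaluations.fetch_add(1, Ordering::SeqCst);
            Arc::new(eval())
        }))
    }

    fn evaluation_count(&self) -> usize {
        self.evaluations.load(Ordering::SeqCst)
    }
}
```

The real operator is async and the left side may not fit in memory, so an actual fix would likely need an async-aware equivalent; the sketch only shows the sharing contract the in-memory path already has.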
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]