viirya commented on code in PR #21448:
URL: https://github.com/apache/datafusion/pull/21448#discussion_r3068909276


##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -1153,31 +1231,318 @@ impl NestedLoopJoinStream {
             left_buffered_in_one_pass: true,
             handled_empty_output: false,
             should_track_unmatched_right: 
need_produce_right_in_final(join_type),
+            // Memory-limited fields (inactive until OOM fallback)
+            left_stream: None,
+            left_reservation: None,
+            left_stashed_batch: None,
+            left_pending_batches: Vec::new(),
+            left_schema: None,
+            spill_manager: None,
+            right_spill_in_progress: None,
+            right_spill_file: None,
+            right_max_batch_memory: 0,
+            is_first_right_pass: true,
+            // Fallback context
+            left_plan,
+            task_context,
         }
     }
 
+    /// Returns true if this stream is operating in memory-limited mode
+    fn is_memory_limited(&self) -> bool {
+        self.left_stream.is_some() || self.left_reservation.is_some()
+    }
+
+    /// Check if we can fall back to memory-limited mode on this error.
+    fn can_fallback_to_spill(&self, error: 
&datafusion_common::DataFusionError) -> bool {
+        self.left_plan.is_some()
+            && self.task_context.is_some()
+            && !self.is_memory_limited() // avoid infinite loop
+            && matches!(
+                error.find_root(),
+                datafusion_common::DataFusionError::ResourcesExhausted(_)
+            )
+    }
+
+    /// Switch from the standard OnceFut path to memory-limited mode.
+    ///
+    /// Re-executes the left child to get a fresh stream, creates a
+    /// SpillManager for right-side spilling, and sets up all the
+    /// memory-limited fields. The next call to `handle_buffering_left`
+    /// will dispatch to `handle_buffering_left_memory_limited`.
+    fn initiate_fallback(&mut self) -> Result<()> {
+        let left_plan = self
+            .left_plan
+            .as_ref()
+            .expect("left_plan must be set for fallback");
+        let context = self
+            .task_context
+            .as_ref()
+            .expect("task_context must be set for fallback");
+
+        // Re-execute left child to get a fresh stream
+        let left_stream = left_plan.execute(0, Arc::clone(context))?;
+        let left_schema = left_stream.schema();
+
+        // Create reservation with can_spill for fair memory allocation
+        let reservation = 
MemoryConsumer::new("NestedLoopJoinLoad[fallback]".to_string())
+            .with_can_spill(true)
+            .register(context.memory_pool());
+
+        // Create SpillManager for right-side spilling
+        let right_schema = self.right_data.schema();
+        let spill_manager = SpillManager::new(
+            context.runtime_env(),
+            self.metrics.spill_metrics.clone(),
+            right_schema,
+        )
+        .with_compression_type(context.session_config().spill_compression());
+
+        // Populate memory-limited fields
+        self.left_stream = Some(left_stream);
+        self.left_schema = Some(left_schema);
+        self.left_reservation = Some(reservation);
+        self.left_stashed_batch = None;
+        self.left_pending_batches = Vec::new();
+        self.spill_manager = Some(spill_manager);
+        self.right_spill_in_progress = None;
+        self.right_spill_file = None;
+        self.right_max_batch_memory = 0;
+        self.is_first_right_pass = true;
+
+        // State stays BufferingLeft — next poll will enter
+        // handle_buffering_left_memory_limited via is_memory_limited() check
+        self.state = NLJState::BufferingLeft;
+
+        Ok(())
+    }
+
     // ==== State handler functions ====
 
-    /// Handle BufferingLeft state - prepare left side batches
+    /// Handle BufferingLeft state - prepare left side batches.
+    ///
+    /// In standard mode, uses OnceFut to load all left data at once.
+    /// In memory-limited mode, incrementally buffers left batches until the
+    /// memory budget is reached or the left stream is exhausted.
     fn handle_buffering_left(
         &mut self,
         cx: &mut std::task::Context<'_>,
     ) -> ControlFlow<Poll<Option<Result<RecordBatch>>>> {
-        match self.left_data.get_shared(cx) {
-            Poll::Ready(Ok(left_data)) => {
-                self.buffered_left_data = Some(left_data);
-                // TODO: implement memory-limited case
-                self.left_exhausted = true;
-                self.state = NLJState::FetchingRight;
-                // Continue to next state immediately
-                ControlFlow::Continue(())
+        if self.is_memory_limited() {
+            self.handle_buffering_left_memory_limited(cx)
+        } else {
+            // Standard path: use OnceFut
+            match self.left_data.get_shared(cx) {
+                Poll::Ready(Ok(left_data)) => {
+                    self.buffered_left_data = Some(left_data);
+                    self.left_exhausted = true;
+                    self.state = NLJState::FetchingRight;
+                    ControlFlow::Continue(())
+                }
+                Poll::Ready(Err(e)) => {
+                    if self.can_fallback_to_spill(&e) {
+                        debug!(
+                            "NestedLoopJoin: OnceFut failed with OOM, \
+                             falling back to memory-limited mode"
+                        );
+                        match self.initiate_fallback() {
+                            Ok(()) => ControlFlow::Continue(()),
+                            Err(fallback_err) => {
+                                
ControlFlow::Break(Poll::Ready(Some(Err(fallback_err))))
+                            }
+                        }
+                    } else {
+                        ControlFlow::Break(Poll::Ready(Some(Err(e))))
+                    }
+                }
+                Poll::Pending => ControlFlow::Break(Poll::Pending),
             }
-            Poll::Ready(Err(e)) => 
ControlFlow::Break(Poll::Ready(Some(Err(e)))),
-            Poll::Pending => ControlFlow::Break(Poll::Pending),
         }
     }
 
-    /// Handle FetchingRight state - fetch next right batch and prepare for 
processing
+    /// Memory-limited path for handle_buffering_left.
+    ///
+    /// Incrementally polls the left stream and accumulates batches until:
+    /// - Memory reservation fails (chunk is full, more data remains)
+    /// - Left stream is exhausted (this is the last/only chunk)
+    fn handle_buffering_left_memory_limited(

Review Comment:
   This is a valid concern. Currently each partition independently re-executes 
the left child on OOM fallback. This is because the original left stream is 
consumed by the failed `collect_left_input` attempt and cannot be reused, so 
each partition must re-execute the left child to get a fresh stream.
   
   A possible improvement: have the first partition that hits OOM spill the 
left side to disk, then have the other partitions read from that spill file 
instead of re-executing the left child. This would require some 
cross-partition coordination (e.g., an `OnceAsync`-like mechanism for the 
spilled left data).
   
   That said, this only happens on the OOM fallback path, and the cost is 
executing the left child plan N times (where N is the number of right-side 
partitions) instead of once. I think this is acceptable for an initial 
implementation, since the alternative before this PR is a hard OOM failure. 
Would you prefer I add the shared-spill optimization in this PR or as a 
follow-up?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to