viirya commented on code in PR #21448:
URL: https://github.com/apache/datafusion/pull/21448#discussion_r3068895528


##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -904,13 +937,55 @@ pub(crate) struct NestedLoopJoinStream {
     // For right join, keep track of matched rows in `current_right_batch`
     // Constructed when fetching each new incoming right batch in 
`FetchingRight` state.
     current_right_batch_matched: Option<BooleanArray>,
+
+    // ========================================================================
+    // MEMORY-LIMITED EXECUTION FIELDS:
+    // Used when left-side data exceeds the memory budget. In this mode,
+    // left data is loaded in chunks, and the right side is spilled to disk
+    // so it can be re-scanned for each left chunk.
+    // ========================================================================
+    /// Left input stream for incremental buffering (memory-limited mode only).
+    /// None when using the standard OnceFut path.
+    left_stream: Option<SendableRecordBatchStream>,

Review Comment:
   Good suggestion! The current approach does use Option fields as implicit 
state flags (e.g., left_stream.is_some() to check if we're in memory-limited 
mode), which is fragile. I'll introduce a dedicated enum to make the execution 
state explicit. Will address in a follow-up or in this PR if you prefer.



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -904,13 +937,55 @@ pub(crate) struct NestedLoopJoinStream {
     // For right join, keep track of matched rows in `current_right_batch`
     // Constructed when fetching each new incoming right batch in 
`FetchingRight` state.
     current_right_batch_matched: Option<BooleanArray>,
+
+    // ========================================================================
+    // MEMORY-LIMITED EXECUTION FIELDS:
+    // Used when left-side data exceeds the memory budget. In this mode,
+    // left data is loaded in chunks, and the right side is spilled to disk
+    // so it can be re-scanned for each left chunk.
+    // ========================================================================
+    /// Left input stream for incremental buffering (memory-limited mode only).
+    /// None when using the standard OnceFut path.
+    left_stream: Option<SendableRecordBatchStream>,
+    /// Memory reservation for left-side buffering in memory-limited mode
+    left_reservation: Option<MemoryReservation>,
+    /// A batch that couldn't be added to the current chunk due to memory 
limit.
+    /// It will be the first batch in the next chunk.
+    left_stashed_batch: Option<RecordBatch>,

Review Comment:
   You're right — the stashed batch is already in memory, so there's no need 
for a separate field. I can push it directly onto left_pending_batches when the 
memory limit is hit, instead of stashing it separately and re-inserting on the 
next BufferingLeft entry. Will simplify this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to