kosiew commented on code in PR #22038:
URL: https://github.com/apache/datafusion/pull/22038#discussion_r3464495703


##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -908,6 +919,359 @@ pub(crate) struct LeftSpillData {
     schema: SchemaRef,
 }
 
+/// Per-chunk shared state in the memory-limited fallback path.
+///
+/// Each chunk's `JoinLeftData` is loaded once by a "leader" partition and
+/// shared (via `Arc`) with every right-side output partition. The
+/// `probe_threads_counter` inside the `JoinLeftData` is initialized to
+/// `right_partition_count`, so `report_probe_completed` returns `true`
+/// only when the *last* partition has finished probing the chunk. That
+/// last partition is then responsible for emitting unmatched left rows
+/// for the chunk, mirroring the single-pass path's coordination via
+/// `collect_left_input(..., probe_threads_count)`.
+struct CurrentChunk {
+    /// 0-based monotonically increasing chunk index.
+    chunk_index: usize,
+    /// Shared per-chunk left data. Cloned by every partition that probes
+    /// this chunk; the last to call `report_probe_completed` emits
+    /// unmatched left rows.
+    data: Arc<JoinLeftData>,
+    /// True if the left stream was exhausted while loading this chunk —
+    /// no further chunks will be produced after it.
+    is_last: bool,
+}
+
+/// Inner state of [`FallbackCoordinator`], guarded by an async mutex.
+struct FallbackCoordinatorInner {
+    /// Reservation owned by the coordinator. Holds the memory for the
+    /// currently-loaded chunk. Reset (`resize(0)`) between chunks.
+    /// Lazily registered by the first leader, after the runtime context
+    /// becomes available via `initiate_fallback`.
+    reservation: Option<MemoryReservation>,
+    /// The shared left spill stream from which chunks are read. Owned by
+    /// the coordinator so only one partition reads it at a time.
+    left_stream: Option<SendableRecordBatchStream>,
+    /// Left schema. Set after the first leader resolves the spill future.
+    left_schema: Option<SchemaRef>,
+    /// One batch carried over from the previous chunk's load: when
+    /// reservation `try_grow` failed for chunk N, the offending batch is
+    /// recorded here and becomes the first batch of chunk N+1.
+    carryover: Option<RecordBatch>,
+    /// True once the left spill stream has produced `None`.
+    left_exhausted: bool,
+    /// Index of the next chunk to be loaded.
+    next_chunk_index: usize,
+    /// The currently-loaded chunk, or `None` if no chunk is currently
+    /// loaded (initial state, or the last partition has just released
+    /// chunk `next_chunk_index - 1` and the next leader hasn't taken
+    /// over yet).
+    current: Option<CurrentChunk>,
+    /// True while a partition has claimed leader role for the next
+    /// chunk and is loading it; prevents two partitions from racing.
+    loader_in_flight: bool,
+}
+
+/// Plan-level shared coordinator for the memory-limited fallback path.
+///
+/// All right-side output partitions share one of these. It serializes
+/// access to the left spill stream (so each chunk is read exactly once),
+/// publishes the loaded chunk as an `Arc<JoinLeftData>` for every
+/// partition to clone, and uses a `Notify` so partitions waiting for the
+/// next chunk can sleep without busy-looping.
+pub(crate) struct FallbackCoordinator {
+    /// Number of right-side partitions; equals the
+    /// `probe_threads_counter` initial value for each chunk.
+    right_partition_count: usize,
+    /// Whether `JoinLeftData` should carry a left visited bitmap (for
+    /// join types that emit unmatched left rows in the final output).
+    with_visited_bitmap: bool,
+    inner: tokio::sync::Mutex<FallbackCoordinatorInner>,
+    /// Notified when a new chunk becomes available, when the left stream
+    /// is exhausted, or when a chunk is released.
+    notify: tokio::sync::Notify,
+}
+
+impl FallbackCoordinator {
+    fn new(right_partition_count: usize, with_visited_bitmap: bool) -> Self {
+        Self {
+            right_partition_count,
+            with_visited_bitmap,
+            inner: tokio::sync::Mutex::new(FallbackCoordinatorInner {
+                reservation: None,
+                left_stream: None,
+                left_schema: None,
+                carryover: None,
+                left_exhausted: false,
+                next_chunk_index: 0,
+                current: None,
+                loader_in_flight: false,
+            }),
+            notify: tokio::sync::Notify::new(),
+        }
+    }
+
+    /// After the last partition finishes processing chunk
+    /// `released_chunk_index`, drop the slot so the next leader can
+    /// load chunk `released_chunk_index + 1`.
+    async fn release_chunk(self: &Arc<Self>, released_chunk_index: usize) {
+        let mut inner = self.inner.lock().await;
+        if let Some(cur) = &inner.current
+            && cur.chunk_index == released_chunk_index
+        {
+            inner.current = None;
+            inner.next_chunk_index = released_chunk_index + 1;
+        }
+        // Always notify: waiters may be blocked because they couldn't
+        // become leader while a previous chunk was current.
+        drop(inner);
+        self.notify.notify_waiters();
+    }
+
+    /// Fetch `expected_chunk_index`, becoming leader to load it from the
+    /// left spill stream if no other partition has done so. Returns
+    /// `Ok(None)` when the left stream is exhausted and no chunk with
+    /// the requested index exists.
+    async fn next_chunk(
+        self: Arc<Self>,
+        expected_chunk_index: usize,
+        left_spill_fut: OnceFut<LeftSpillData>,
+        task_context: Arc<TaskContext>,
+    ) -> Result<Option<(Arc<JoinLeftData>, bool)>> {
+        // Resolve the left spill future once. All partitions share the
+        // same OnceFut so this only does real work the first time.
+        let spill_data = left_spill_fut_get_shared(left_spill_fut).await?;
+
+        loop {
+            let mut inner = self.inner.lock().await;
+
+            // Case 1: requested chunk is already loaded.
+            if let Some(cur) = &inner.current
+                && cur.chunk_index == expected_chunk_index
+            {
+                return Ok(Some((Arc::clone(&cur.data), cur.is_last)));
+            }
+
+            // Case 2: left stream exhausted and no current chunk to
+            // deliver — caller is past the last chunk.
+            if inner.left_exhausted
+                && inner.current.is_none()
+                && inner.carryover.is_none()
+            {
+                return Ok(None);
+            }
+
+            // Case 3: no chunk loaded and no leader yet — claim leader.
+            if inner.current.is_none() && !inner.loader_in_flight {
+                inner.loader_in_flight = true;
+                // Lazily initialize the shared left stream, schema, and
+                // chunk reservation. Only the leader does this (under the
+                // `loader_in_flight` guard), so waiters never open a
+                // throwaway spill stream that the leader would overwrite.
+                let mut left_stream = match inner.left_stream.take() {
+                    Some(stream) => stream,
+                    None => {
+                        let stream = spill_data
+                            .spill_manager
+                            .read_spill_as_stream(spill_data.spill_file.clone(), None)?;

Review Comment:
   Nice improvement moving stream initialization behind the leader claim.
   
   One small thing that caught my eye: `loader_in_flight` is set to `true` before calling `read_spill_as_stream(...)`, and the call uses `?`. Today that seems safe because `read_spill_as_stream` only constructs the buffered stream and returns `Ok(...)`; any real file/open/schema errors happen later while polling the stream, where `load_one_chunk` already clears `loader_in_flight` and wakes any waiters.
   
   This feels a little fragile though. If stream construction ever becomes genuinely fallible in the future, an early return here could leave waiters blocked indefinitely. It might be worth avoiding `?` while the flag is set, or using a small guard/helper that guarantees the flag is cleared and waiters are notified if leader setup fails.
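   
   To make the guard idea concrete, here is a rough, std-only sketch rather than a drop-in patch. Everything in it is hypothetical: `Coordinator`, `LeaderGuard`, `open_stream`, and `leader_setup` are illustrative names, the flag is modeled as an `AtomicBool`, and a counter stands in for `notify.notify_waiters()`. In the PR the flag lives behind a `tokio::sync::Mutex`, and `Drop` cannot `.await` that lock, so the real version would need the flag moved out of the mutex (or an explicit `match` instead of `?`).

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

/// Hypothetical stand-in for the coordinator state relevant here.
struct Coordinator {
    /// Mirrors `loader_in_flight`, modeled as an atomic so `Drop` can clear it.
    loader_in_flight: AtomicBool,
    /// Counts wake-ups; stands in for `notify.notify_waiters()`.
    wakeups: AtomicUsize,
}

impl Coordinator {
    fn new() -> Self {
        Self {
            loader_in_flight: AtomicBool::new(false),
            wakeups: AtomicUsize::new(0),
        }
    }
}

/// Clears the leader flag and wakes waiters on drop unless disarmed.
struct LeaderGuard<'a> {
    coordinator: &'a Coordinator,
    armed: bool,
}

impl<'a> LeaderGuard<'a> {
    /// Try to claim the leader role; `None` means another leader is active.
    fn claim(coordinator: &'a Coordinator) -> Option<Self> {
        coordinator
            .loader_in_flight
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .ok()
            .map(|_| Self { coordinator, armed: true })
    }

    /// Setup succeeded: leave the flag set for the later load path to clear.
    fn disarm(mut self) {
        self.armed = false;
    }
}

impl Drop for LeaderGuard<'_> {
    fn drop(&mut self) {
        if self.armed {
            self.coordinator
                .loader_in_flight
                .store(false, Ordering::Release);
            self.coordinator.wakeups.fetch_add(1, Ordering::Relaxed);
        }
    }
}

/// Hypothetical fallible setup step (stands in for opening the spill stream).
fn open_stream(fail: bool) -> Result<&'static str, String> {
    if fail {
        Err("stream construction failed".to_string())
    } else {
        Ok("stream")
    }
}

/// Leader setup: `?` is safe because the guard cleans up on early return.
fn leader_setup(
    coordinator: &Coordinator,
    fail: bool,
) -> Result<Option<&'static str>, String> {
    let Some(guard) = LeaderGuard::claim(coordinator) else {
        return Ok(None); // someone else is already the leader
    };
    let stream = open_stream(fail)?; // guard drops here on error
    guard.disarm();
    Ok(Some(stream))
}
```

   With this shape, a failed `leader_setup` leaves the flag cleared and records one wake-up, while a successful one keeps the flag set so the existing load path stays responsible for clearing it.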



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

