kosiew commented on code in PR #22038:
URL: https://github.com/apache/datafusion/pull/22038#discussion_r3450966086
##########
datafusion/sqllogictest/test_files/nested_loop_join_spill.slt:
##########
@@ -128,12 +128,120 @@ FULL JOIN generate_series(1, 200) AS t2(v2)
----
100199 199 99999
-# Restore settings to slt runner defaults
-statement ok
-RESET datafusion.runtime.memory_limit
+# =============================================================================
+# Multi-partition memory-limited correctness tests
+#
+# These exercise the cross-partition shared-state path of NLJ's spill
+# fallback: every right partition must observe the same per-chunk
+# `JoinLeftData` (visited bitmap + probe-thread counter) so that join
+# types which emit unmatched left rows (LEFT, LEFT SEMI, LEFT ANTI,
+# LEFT MARK, FULL) produce the same results as the single-pass and
+# single-partition paths. Without the coordinator, each partition would
+# independently emit unmatched left rows from its own bitmap, producing
+# duplicates.
+# =============================================================================
statement ok
SET datafusion.execution.target_partitions = 4
+# Memory budget tight enough to force NLJ left-side OOM and trigger the
+# memory-limited fallback. Shared between all queries in this section.
+statement ok
+SET datafusion.runtime.memory_limit = '50K'
+
+# --- LEFT JOIN ---
+# v1 in [1,5000], v2 in [1,100] with predicate (v1+v2)=101 AND v2<=50.
+# Matches: v2 in [1..50] each pairs with v1 = 101 - v2 in [51..100] → 50 pairs
+# Left only: v1 in [1..5000] \ [51..100] → 4950 unmatched
+# Total LEFT JOIN output rows = 50 + 4950 = 5000 (each left row exactly once).
+query I nosort
+SELECT count(*) as cnt
+FROM generate_series(1, 5000) AS t1(v1)
+LEFT JOIN generate_series(1, 100) AS t2(v2)
+ ON (t1.v1 + t2.v2) = 101 AND t2.v2 <= 50
+----
+5000
+
+# Verify the memory-limited fallback was actually taken (spill_count > 0)
+# under target_partitions=4. The fallback shares per-chunk `JoinLeftData`
+# across right partitions via `FallbackCoordinator`; without that path
+# the test would simply OOM rather than spill.
+query TT
+EXPLAIN ANALYZE
+SELECT count(*) as cnt
+FROM generate_series(1, 5000) AS t1(v1)
+LEFT JOIN generate_series(1, 100) AS t2(v2)
+ ON (t1.v1 + t2.v2) = 101 AND t2.v2 <= 50
+----
+Plan with Metrics
+01)ProjectionExec: expr=[count(Int64(1))@0 as cnt], metrics=[<slt:ignore>]
+02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))], metrics=[<slt:ignore>]
+03)----CoalescePartitionsExec, metrics=[<slt:ignore>]
+04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))], metrics=[<slt:ignore>]
+05)--------NestedLoopJoinExec: join_type=Left, filter=v1@0 + v2@1 = 101, projection=[], metrics=[output_rows=5.00 K, <slt:ignore> spill_count=2, <slt:ignore>]
+06)----------ProjectionExec: expr=[value@0 as v1], metrics=[<slt:ignore>]
+07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=5000, batch_size=8192], metrics=[<slt:ignore>]
Review Comment:
Thanks for adding the regression coverage here. I do not think this
currently exercises the multi-chunk coordinator path though.
The plan shows the left `generate_series(1, 5000)` being produced as one
`batch_size=8192` batch, and `load_one_chunk` accepts a single over-budget
batch so it can make progress. Because of that, these SLT cases appear to
validate only the one-chunk fallback path.
The main invariant added by the coordinator is per-chunk sharing, followed
by releasing the current chunk and advancing to the next one. Without a test
where the left input is split into at least two chunks, the `carryover`,
`release_chunk`, waiter notification, and global-right accumulation across
chunks are not really covered.
Could you please add a multi-partition spill regression with multiple left
batches or chunks? For example, this could be a Rust test that feeds more than
one left batch, or an SLT shape that produces multiple left batches. It would
be good to assert the same LEFT/FULL or LEFT SEMI/ANTI counts, plus
`spill_count > 0`.
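To illustrate why a single left batch cannot reach the multi-chunk path, here is a minimal sketch of the chunking rule as I understand it from the description above. It is not the PR's code: `ChunkLoader`, `Chunk`, and `count_chunks` are hypothetical names, batches are modeled only by their size in bytes, and the sizes in the usage note are made up for illustration.

```rust
/// Hypothetical model of one loaded chunk: the sizes (in bytes) of the
/// batches it holds, and whether the input was exhausted while loading it.
#[derive(Debug, PartialEq)]
struct Chunk {
    batches: Vec<usize>,
    is_last: bool,
}

/// Hypothetical stand-in for the chunk-loading state. `carryover` holds the
/// batch that did not fit in the previous chunk.
struct ChunkLoader {
    pending: std::vec::IntoIter<usize>,
    carryover: Option<usize>,
    budget: usize,
}

impl ChunkLoader {
    fn new(batch_sizes: Vec<usize>, budget: usize) -> Self {
        Self {
            pending: batch_sizes.into_iter(),
            carryover: None,
            budget,
        }
    }

    /// Load one chunk. The first batch of a chunk is always accepted, even
    /// when it alone exceeds the budget, so the loader can make progress.
    /// A later batch that does not fit is stored as `carryover` and becomes
    /// the first batch of the next chunk.
    fn load_one_chunk(&mut self) -> Option<Chunk> {
        let mut batches = Vec::new();
        let mut used = 0usize;
        loop {
            // Prefer the batch carried over from the previous chunk.
            let next = self.carryover.take().or_else(|| self.pending.next());
            match next {
                None if batches.is_empty() => return None,
                None => return Some(Chunk { batches, is_last: true }),
                Some(size) => {
                    if !batches.is_empty() && used + size > self.budget {
                        self.carryover = Some(size);
                        return Some(Chunk { batches, is_last: false });
                    }
                    used += size;
                    batches.push(size);
                }
            }
        }
    }
}

/// Count how many chunks a given sequence of batch sizes produces.
fn count_chunks(batch_sizes: Vec<usize>, budget: usize) -> usize {
    let mut loader = ChunkLoader::new(batch_sizes, budget);
    let mut chunks = 0;
    while loader.load_one_chunk().is_some() {
        chunks += 1;
    }
    chunks
}
```

Under this model, `count_chunks(vec![80_000], 50_000)` yields one chunk, so `carryover` is never populated, whereas `count_chunks(vec![30_000, 30_000, 30_000], 50_000)` yields three chunks and exercises the carryover handoff twice. A regression test needs the second shape.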
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -908,6 +919,365 @@ pub(crate) struct LeftSpillData {
schema: SchemaRef,
}
+/// Per-chunk shared state in the memory-limited fallback path.
+///
+/// Each chunk's `JoinLeftData` is loaded once by a "leader" partition and
+/// shared (via `Arc`) with every right-side output partition. The
+/// `probe_threads_counter` inside the `JoinLeftData` is initialized to
+/// `right_partition_count`, so `report_probe_completed` returns `true`
+/// only when the *last* partition has finished probing the chunk. That
+/// last partition is then responsible for emitting unmatched left rows
+/// for the chunk, mirroring the single-pass path's coordination via
+/// `collect_left_input(..., probe_threads_count)`.
+struct CurrentChunk {
+ /// 0-based monotonically increasing chunk index.
+ chunk_index: usize,
+ /// Shared per-chunk left data. Cloned by every partition that probes
+ /// this chunk; the last to call `report_probe_completed` emits
+ /// unmatched left rows.
+ data: Arc<JoinLeftData>,
+ /// True if the left stream was exhausted while loading this chunk —
+ /// no further chunks will be produced after it.
+ is_last: bool,
+}
+
+/// Inner state of [`FallbackCoordinator`], guarded by an async mutex.
+struct FallbackCoordinatorInner {
+ /// Reservation owned by the coordinator. Holds the memory for the
+ /// currently-loaded chunk. Reset (`resize(0)`) between chunks.
+ /// Lazily registered by the first leader, after the runtime context
+ /// becomes available via `initiate_fallback`.
+ reservation: Option<MemoryReservation>,
+ /// The shared left spill stream from which chunks are read. Owned by
+ /// the coordinator so only one partition reads it at a time.
+ left_stream: Option<SendableRecordBatchStream>,
+ /// Left schema. Set after the first leader resolves the spill future.
+ left_schema: Option<SchemaRef>,
+ /// One batch carried over from the previous chunk's load: when
+ /// reservation `try_grow` failed for chunk N, the offending batch is
+ /// recorded here and becomes the first batch of chunk N+1.
+ carryover: Option<RecordBatch>,
+ /// True once the left spill stream has produced `None`.
+ left_exhausted: bool,
+ /// Index of the next chunk to be loaded.
+ next_chunk_index: usize,
+ /// The currently-loaded chunk, or `None` if no chunk is currently
+ /// loaded (initial state, or the last partition has just released
+ /// chunk `next_chunk_index - 1` and the next leader hasn't taken
+ /// over yet).
+ current: Option<CurrentChunk>,
+ /// True while a partition has claimed leader role for the next
+ /// chunk and is loading it; prevents two partitions from racing.
+ loader_in_flight: bool,
+}
+
+/// Plan-level shared coordinator for the memory-limited fallback path.
+///
+/// All right-side output partitions share one of these. It serializes
+/// access to the left spill stream (so each chunk is read exactly once),
+/// publishes the loaded chunk as an `Arc<JoinLeftData>` for every
+/// partition to clone, and uses a `Notify` so partitions waiting for the
+/// next chunk can sleep without busy-looping.
+pub(crate) struct FallbackCoordinator {
+ /// Number of right-side partitions; equals the
+ /// `probe_threads_counter` initial value for each chunk.
+ right_partition_count: usize,
+ /// Whether `JoinLeftData` should carry a left visited bitmap (for
+ /// join types that emit unmatched left rows in the final output).
+ with_visited_bitmap: bool,
+ inner: tokio::sync::Mutex<FallbackCoordinatorInner>,
+ /// Notified when a new chunk becomes available, when the left stream
+ /// is exhausted, or when a chunk is released.
+ notify: tokio::sync::Notify,
+}
+
+impl FallbackCoordinator {
+ fn new(right_partition_count: usize, with_visited_bitmap: bool) -> Self {
+ Self {
+ right_partition_count,
+ with_visited_bitmap,
+ inner: tokio::sync::Mutex::new(FallbackCoordinatorInner {
+ reservation: None,
+ left_stream: None,
+ left_schema: None,
+ carryover: None,
+ left_exhausted: false,
+ next_chunk_index: 0,
+ current: None,
+ loader_in_flight: false,
+ }),
+ notify: tokio::sync::Notify::new(),
+ }
+ }
+
+ /// After the last partition finishes processing chunk
+ /// `released_chunk_index`, drop the slot so the next leader can
+ /// load chunk `released_chunk_index + 1`.
+ async fn release_chunk(self: &Arc<Self>, released_chunk_index: usize) {
+ let mut inner = self.inner.lock().await;
+ if let Some(cur) = &inner.current
+ && cur.chunk_index == released_chunk_index
+ {
+ inner.current = None;
+ inner.next_chunk_index = released_chunk_index + 1;
+ }
+ // Always notify: waiters may be blocked because they couldn't
+ // become leader while a previous chunk was current.
+ drop(inner);
+ self.notify.notify_waiters();
+ }
+
+ /// Fetch `expected_chunk_index`, becoming leader to load it from the
+ /// left spill stream if no other partition has done so. Returns
+ /// `Ok(None)` when the left stream is exhausted and no chunk with
+ /// the requested index exists.
+ async fn next_chunk(
+ self: Arc<Self>,
+ expected_chunk_index: usize,
+ left_spill_fut: OnceFut<LeftSpillData>,
+ task_context: Arc<TaskContext>,
+ ) -> Result<Option<(Arc<JoinLeftData>, bool)>> {
+ // Resolve the left spill future once. All partitions share the
+ // same OnceFut so this only does real work the first time.
+ let spill_data = left_spill_fut_get_shared(left_spill_fut).await?;
+
+ loop {
+ let mut inner = self.inner.lock().await;
+
+ // Lazily initialize the shared left stream and schema from
+ // the spill file.
+ if inner.left_stream.is_none() && !inner.left_exhausted {
+ let stream = spill_data
+ .spill_manager
+ .read_spill_as_stream(spill_data.spill_file.clone(), None)?;
+ inner.left_stream = Some(stream);
+ inner.left_schema = Some(Arc::clone(&spill_data.schema));
+ }
+ // Lazily register the coordinator's chunk reservation.
+ if inner.reservation.is_none() {
+ inner.reservation = Some(
+ MemoryConsumer::new("NestedLoopJoinFallbackChunk".to_string())
+ .with_can_spill(true)
+ .register(task_context.memory_pool()),
+ );
+ }
+
+ // Case 1: requested chunk is already loaded.
Review Comment:
Small follow-up suggestion: while a leader is loading a chunk, waiters can
see `left_stream.is_none()` and reopen the spill stream before they reach the
`loader_in_flight` wait path.
The leader later overwrites that stream, so this looks like wasted I/O and
state churn. It may be worth guarding lazy stream/reservation initialization
with `!inner.loader_in_flight`, or moving that initialization into the
leader-claim branch so only the loader sets up those fields.
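A rough sketch of the guard I have in mind, using only the standard library rather than the tokio types in the PR. Everything here is hypothetical: `Inner`, `Coordinator`, `lazy_init`, `claim_leader`, and `redundant_opens` are illustrative names, `stream_open` stands in for `left_stream.is_some()`, and the sketch assumes the leader takes the stream out of the shared state while it loads.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for the coordinator's inner state.
#[derive(Default)]
struct Inner {
    /// Stands in for `left_stream.is_some()`.
    stream_open: bool,
    /// True while a leader is loading the next chunk.
    loader_in_flight: bool,
    /// Number of times the spill stream was opened (for illustration only).
    opens: usize,
}

struct Coordinator {
    inner: Mutex<Inner>,
}

impl Coordinator {
    fn new() -> Self {
        Self { inner: Mutex::new(Inner::default()) }
    }

    /// Lazy initialization at the top of the loop. With `guarded == true`
    /// the stream is not opened while a leader is loading.
    fn lazy_init(&self, guarded: bool) {
        let mut inner = self.inner.lock().unwrap();
        let blocked = guarded && inner.loader_in_flight;
        if !inner.stream_open && !blocked {
            inner.opens += 1;
            inner.stream_open = true;
        }
    }

    /// Assumption: the leader moves the stream out of the shared slot so it
    /// can read without holding the lock, leaving the slot empty.
    fn claim_leader(&self) {
        let mut inner = self.inner.lock().unwrap();
        inner.loader_in_flight = true;
        inner.stream_open = false;
    }
}

/// One leader initializes and claims the load, then `waiters` partitions
/// pass through lazy initialization. Returns the number of extra opens.
fn redundant_opens(guarded: bool, waiters: usize) -> usize {
    let coordinator = Coordinator::new();
    coordinator.lazy_init(guarded);
    coordinator.claim_leader();
    for _ in 0..waiters {
        coordinator.lazy_init(guarded);
    }
    let opens = coordinator.inner.lock().unwrap().opens;
    opens - 1
}
```

In this model, `redundant_opens(false, 3)` reports one extra open (the first waiter reopens the stream, and later waiters then see it as present), while `redundant_opens(true, 3)` reports none. Moving the initialization into the leader-claim branch would have the same effect.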
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]