kosiew commented on code in PR #21833:
URL: https://github.com/apache/datafusion/pull/21833#discussion_r3172430877
##########
datafusion/sqllogictest/test_files/nested_loop_join_spill.slt:
##########
@@ -57,6 +57,66 @@ Plan with Metrics
06)------ProjectionExec: expr=[value@0 as v2], metrics=[<slt:ignore>]
07)--------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=1, batch_size=8192], metrics=[<slt:ignore>]
+# --- RIGHT JOIN with non-equijoin predicate ---
+# Every (v1, v2=1) pair passes the predicate `v1 + v2 > 0`, so all
+# 100000 left rows match the single right row. Output count = 100000.
+query I nosort
+SELECT count(*)
+FROM generate_series(1, 100000) AS t1(v1)
+RIGHT JOIN generate_series(1, 1) AS t2(v2)
+ ON (t1.v1 + t2.v2) > 0
+----
+100000
+
+# RIGHT JOIN where NO right row matches any left row. All 3 right rows
+# get NULL-padded on the left side. This exercises the global right
+# bitmap: every right batch is seen across multiple left chunks, and
+# we must emit the correct unmatched rows at the end.
+query II rowsort
+SELECT t1.v1, t2.v2
+FROM generate_series(1, 5) AS t1(v1)
Review Comment:
This test says it exercises the global right bitmap across multiple left
chunks, but `generate_series(1, 5)` looks too small to trigger the
memory-limited fallback under the `150K` limit. As a result, it seems to run
through the unchanged single-pass path instead.
The larger RIGHT JOIN case above does spill, but all left chunks match the
single right row, so it would not catch a bad global OR/accumulation or
incorrect final unmatched emission.
Could we add a spilling `RIGHT`, `FULL`, `RIGHT ANTI`, or `RIGHT MARK` case
where different right rows are matched by different left chunks and at least
one right row remains unmatched? It would also be good to assert `spill_count`
for that query so we know the fallback path is actually exercised.
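Something along these lines might work as a sketch (hypothetical: the join condition, the assumption that `v1 = 50000` and `v1 = 100000` land in different left chunks under the 150K limit, and the exact rowsort/NULL rendering of the expected block would all need verifying against the actual fallback threshold):

```sql
# --- Sketch: spilling RIGHT JOIN where different right rows are matched
# --- by different left chunks and one right row stays unmatched ---
query II rowsort
SELECT t1.v1, t2.v2
FROM generate_series(1, 100000) AS t1(v1)
RIGHT JOIN generate_series(1, 3) AS t2(v2)
  ON t1.v1 = t2.v2 * 50000
----
100000 2
50000 1
NULL 3
```

Right row 1 is matched only by the chunk containing `v1 = 50000`, row 2 only by the chunk containing `v1 = 100000`, and row 3 never matches, so a bad global OR-merge or a premature per-batch emission should show up in the result. Pairing it with an `EXPLAIN ANALYZE` block asserting a nonzero `spill_count` metric (as the plan-with-metrics block above does for other metrics) could confirm the fallback path actually ran.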
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -654,13 +656,8 @@ impl ExecutionPlan for NestedLoopJoinExec {
let probe_side_data = self.right.execute(partition, Arc::clone(&context))?;
// Determine if OOM fallback to memory-limited mode is possible.
- // Conditions:
- // 1. Disk manager supports temp files (needed for right-side spilling)
- // 2. Join type does not require tracking right-side matched state
- // across multiple left chunks (RIGHT/FULL joins not yet supported)
- let spill_state = if context.runtime_env().disk_manager.tmp_files_enabled()
- && !need_produce_right_in_final(self.join_type)
- {
+ // Condition: disk manager supports temp files (needed for spilling)
+ let spill_state = if context.runtime_env().disk_manager.tmp_files_enabled() {
Review Comment:
I think we need to keep the core invariant here that memory-limited
execution produces the same results as the single-pass NLJ for every enabled
join type.
Enabling fallback for `FULL` joins exposes the memory-limited path to
multi-partition right inputs, but each output partition builds its own
per-chunk `JoinLeftData` with `AtomicUsize::new(1)`. That means left-unmatched
rows are emitted based only on matches seen by that partition's right-side
input.
In the single-pass path, `collect_left_input(..., right_partition_count)`
coordinates left-unmatched emission across all right partitions. The fallback
path does not appear to do that yet.
For a `FULL JOIN` with `target_partitions > 1`, a left row that matches a
row in another right partition can still be emitted as unmatched by this
partition, which would produce incorrect duplicate/null-padded rows.
Could we either keep the previous exclusion for `FULL` until the fallback
path has coordinated cross-partition left match state, or make the
memory-limited left bitmap/probe completion shared across right partitions for
each left chunk?
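The first option could be as small as restoring a narrower version of the removed guard. A self-contained sketch with stand-in names (`fallback_enabled` and the stub `JoinType` are hypothetical; the real check would use `tmp_files_enabled()` and the actual `datafusion_expr::JoinType`):

```rust
// Hypothetical sketch: keep FULL joins on the single-pass path until the
// memory-limited fallback coordinates left-match state across right
// partitions. Only the shape of the guard is shown here.
#[derive(Clone, Copy, PartialEq, Debug)]
enum JoinType {
    Inner,
    Right,
    Full,
}

// Allow the fallback only when temp files are available AND the join type
// does not need cross-partition coordination of left-unmatched emission.
fn fallback_enabled(tmp_files_enabled: bool, join_type: JoinType) -> bool {
    tmp_files_enabled && join_type != JoinType::Full
}

fn main() {
    // RIGHT joins keep the fallback; FULL joins are excluded for now.
    assert!(fallback_enabled(true, JoinType::Right));
    assert!(!fallback_enabled(true, JoinType::Full));
    // No temp files means no fallback for any join type.
    assert!(!fallback_enabled(false, JoinType::Inner));
}
```

This keeps the invariant cheap to audit: re-enabling `FULL` later is a one-line change at the guard once the shared left bitmap exists.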
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -1680,10 +1723,58 @@ impl NestedLoopJoinStream {
}
}
- /// Handle EmitRightUnmatched state - emit unmatched right rows
+ /// Handle EmitRightUnmatched state - emit unmatched right rows.
+ ///
+ /// In memory-limited mode, instead of emitting unmatched right rows
+ /// per-batch (which would be incorrect since more left chunks may
+ /// match those rows), we merge the bitmap into the global accumulator
+ /// and defer emission to `EmitGlobalRightUnmatched`.
fn handle_emit_right_unmatched(
&mut self,
) -> ControlFlow<Poll<Option<Result<RecordBatch>>>> {
+ // In memory-limited mode, merge bitmap into global and move on
+ if self.is_memory_limited() {
+ debug_assert!(
+ self.current_right_batch_matched.is_some(),
+ "right bitmap must be present"
+ );
+ let bitmap = std::mem::take(&mut self.current_right_batch_matched)
+ .expect("right bitmap should be available");
+ let (values, _nulls) = bitmap.into_parts();
+
Review Comment:
Small readability suggestion: could this bitmap merge/accounting block be
extracted into a helper on `SpillStateActive`, maybe something like
`merge_current_right_bitmap(idx, values)`?
The state machine is already pretty dense, and centralizing the first-seen
vs OR-merge behavior would make the global bitmap invariant easier to audit and
test.
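A hypothetical shape for that helper, modeling the bitmap as a plain `Vec<bool>` keyed by right batch index (the real code works with Arrow boolean buffers, so this only sketches the first-seen vs OR-merge invariant):

```rust
use std::collections::HashMap;

/// Sketch of the global right-match accumulator (field and type names are
/// hypothetical, not the actual `SpillStateActive` layout).
struct SpillStateActive {
    /// Per right-batch bitmap of rows matched by any left chunk so far.
    global_right_matched: HashMap<usize, Vec<bool>>,
}

impl SpillStateActive {
    /// Merge the bitmap produced while probing one left chunk against
    /// right batch `idx` into the global accumulator.
    fn merge_current_right_bitmap(&mut self, idx: usize, values: Vec<bool>) {
        match self.global_right_matched.get_mut(&idx) {
            // OR-merge: once a right row has matched any left chunk, it
            // must never be reported as unmatched again.
            Some(acc) => {
                debug_assert_eq!(acc.len(), values.len());
                for (a, v) in acc.iter_mut().zip(values) {
                    *a |= v;
                }
            }
            // First time this right batch is seen: adopt its bitmap as-is.
            None => {
                self.global_right_matched.insert(idx, values);
            }
        }
    }
}

fn main() {
    let mut state = SpillStateActive {
        global_right_matched: HashMap::new(),
    };
    // Left chunk 1 matches right row 0; left chunk 2 matches right row 2.
    state.merge_current_right_bitmap(0, vec![true, false, false]);
    state.merge_current_right_bitmap(0, vec![false, false, true]);
    // Row 1 is the only row still unmatched after both chunks.
    assert_eq!(state.global_right_matched[&0], vec![true, false, true]);
}
```

With the merge isolated like this, the invariant can be unit-tested directly instead of only through the state machine.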
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]