kosiew commented on code in PR #21833:
URL: https://github.com/apache/datafusion/pull/21833#discussion_r3172430877


##########
datafusion/sqllogictest/test_files/nested_loop_join_spill.slt:
##########
@@ -57,6 +57,66 @@ Plan with Metrics
 06)------ProjectionExec: expr=[value@0 as v2], metrics=[<slt:ignore>]
 07)--------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=1, batch_size=8192], metrics=[<slt:ignore>]
 
+# --- RIGHT JOIN with non-equijoin predicate ---
+# Every (v1, v2=1) pair passes the predicate `v1 + v2 > 0`, so all
+# 100000 left rows match the single right row. Output count = 100000.
+query I nosort
+SELECT count(*)
+FROM generate_series(1, 100000) AS t1(v1)
+RIGHT JOIN generate_series(1, 1) AS t2(v2)
+  ON (t1.v1 + t2.v2) > 0
+----
+100000
+
+# RIGHT JOIN where NO right row matches any left row. All 3 right rows
+# get NULL-padded on the left side. This exercises the global right
+# bitmap: every right batch is seen across multiple left chunks, and
+# we must emit the correct unmatched rows at the end.
+query II rowsort
+SELECT t1.v1, t2.v2
+FROM generate_series(1, 5) AS t1(v1)

Review Comment:
   This test says it exercises the global right bitmap across multiple left 
chunks, but `generate_series(1, 5)` looks too small to trigger the 
memory-limited fallback under the `150K` limit. As a result, it seems to run 
through the unchanged single-pass path instead.
   
   The larger RIGHT JOIN case above does spill, but all left chunks match the 
single right row, so it would not catch a bad global OR/accumulation or 
incorrect final unmatched emission.
   
   Could we add a spilling `RIGHT`, `FULL`, `RIGHT ANTI`, or `RIGHT MARK` case 
where different right rows are matched by different left chunks and at least 
one right row remains unmatched? It would also be good to assert `spill_count` 
for that query so we know the fallback path is actually exercised.
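   To make the ask concrete, here is one possible shape for such a case. This is a sketch only: the exact left-chunk boundaries under the 150K limit are an assumption, and the `spill_count` assertion would follow whatever metrics pattern this file already uses (e.g. an `EXPLAIN ANALYZE` check with `<slt:ignore>` elsewhere).

   ```sql
   # RIGHT JOIN where right rows 1 and 2 should be matched by different
   # left chunks (v1 = 40000 and v1 = 80000 presumably land in different
   # chunks under the memory limit) and right row 3 (v2 * 40000 = 120000)
   # never matches, so it must be NULL-padded at the end.
   query II rowsort
   SELECT t1.v1, t2.v2
   FROM generate_series(1, 100000) AS t1(v1)
   RIGHT JOIN generate_series(1, 3) AS t2(v2)
     ON t1.v1 = t2.v2 * 40000
   ----
   40000 1
   80000 2
   NULL 3
   ```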



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -654,13 +656,8 @@ impl ExecutionPlan for NestedLoopJoinExec {
         let probe_side_data = self.right.execute(partition, Arc::clone(&context))?;
 
         // Determine if OOM fallback to memory-limited mode is possible.
-        // Conditions:
-        // 1. Disk manager supports temp files (needed for right-side spilling)
-        // 2. Join type does not require tracking right-side matched state
-        //    across multiple left chunks (RIGHT/FULL joins not yet supported)
-        let spill_state = if context.runtime_env().disk_manager.tmp_files_enabled()
-            && !need_produce_right_in_final(self.join_type)
-        {
+        // Condition: disk manager supports temp files (needed for spilling)
+        let spill_state = if context.runtime_env().disk_manager.tmp_files_enabled() {

Review Comment:
   I think we need to keep the core invariant here that memory-limited 
execution produces the same results as the single-pass NLJ for every enabled 
join type.
   
   Enabling fallback for `FULL` joins exposes the memory-limited path to 
multi-partition right inputs, but each output partition builds its own 
per-chunk `JoinLeftData` with `AtomicUsize::new(1)`. That means left-unmatched 
rows are emitted based only on matches seen by that partition's right-side 
input.
   
   In the single-pass path, `collect_left_input(..., right_partition_count)` 
coordinates left-unmatched emission across all right partitions. The fallback 
path does not appear to do that yet.
   
   For a `FULL JOIN` with `target_partitions > 1`, a left row that matches a 
row in another right partition can still be emitted as unmatched by this 
partition, which would produce incorrect duplicate/null-padded rows.
   
   Could we either keep the previous exclusion for `FULL` until the fallback 
path has coordinated cross-partition left match state, or make the 
memory-limited left bitmap/probe completion shared across right partitions for 
each left chunk?
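   For the second option, the shape I have in mind is roughly the following. This is a minimal standalone sketch, not DataFusion code: the names (`SharedLeftChunk`, `record_matches`, `finish`) are hypothetical, and `Mutex<Vec<bool>>` stands in for the real arrow bitmap in `JoinLeftData`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Per-left-chunk match state shared by all right-side output partitions
/// (hypothetical sketch; real code would use an arrow BooleanBufferBuilder).
struct SharedLeftChunk {
    /// Left-row matched bits, OR-ed into by every partition that probes
    /// this chunk.
    matched: Mutex<Vec<bool>>,
    /// Number of right partitions still probing this chunk; mirrors the
    /// probe-counter role of `AtomicUsize` in the single-pass path.
    probers: AtomicUsize,
}

impl SharedLeftChunk {
    fn new(num_left_rows: usize, right_partition_count: usize) -> Arc<Self> {
        Arc::new(Self {
            matched: Mutex::new(vec![false; num_left_rows]),
            probers: AtomicUsize::new(right_partition_count),
        })
    }

    /// Record left rows matched while probing in one partition.
    fn record_matches(&self, left_indices: &[usize]) {
        let mut bits = self.matched.lock().unwrap();
        for &i in left_indices {
            bits[i] = true;
        }
    }

    /// Called when one partition finishes probing this chunk. Only the
    /// last caller gets the unmatched indices, so null-padded left rows
    /// are emitted exactly once across all right partitions.
    fn finish(&self) -> Option<Vec<usize>> {
        if self.probers.fetch_sub(1, Ordering::SeqCst) == 1 {
            let bits = self.matched.lock().unwrap();
            Some((0..bits.len()).filter(|&i| !bits[i]).collect())
        } else {
            None
        }
    }
}
```

   With this, a left row matched in another right partition can no longer be emitted as unmatched by this one, because emission waits for the last `finish`.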



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -1680,10 +1723,58 @@ impl NestedLoopJoinStream {
         }
     }
 
-    /// Handle EmitRightUnmatched state - emit unmatched right rows
+    /// Handle EmitRightUnmatched state - emit unmatched right rows.
+    ///
+    /// In memory-limited mode, instead of emitting unmatched right rows
+    /// per-batch (which would be incorrect since more left chunks may
+    /// match those rows), we merge the bitmap into the global accumulator
+    /// and defer emission to `EmitGlobalRightUnmatched`.
     fn handle_emit_right_unmatched(
         &mut self,
     ) -> ControlFlow<Poll<Option<Result<RecordBatch>>>> {
+        // In memory-limited mode, merge bitmap into global and move on
+        if self.is_memory_limited() {
+            debug_assert!(
+                self.current_right_batch_matched.is_some(),
+                "right bitmap must be present"
+            );
+            let bitmap = std::mem::take(&mut self.current_right_batch_matched)
+                .expect("right bitmap should be available");
+            let (values, _nulls) = bitmap.into_parts();
+

Review Comment:
   Small readability suggestion: could this bitmap merge/accounting block be 
extracted into a helper on `SpillStateActive`, maybe something like 
`merge_current_right_bitmap(idx, values)`?
   
   The state machine is already pretty dense, and centralizing the first-seen 
vs OR-merge behavior would make the global bitmap invariant easier to audit and 
test.
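   Something along these lines (a standalone sketch only: the helper name comes from the suggestion above, and `HashMap<usize, Vec<bool>>` stands in for the real per-right-batch arrow bitmaps):

```rust
use std::collections::HashMap;

/// Simplified stand-in for the spill-state accumulator: maps a right
/// batch index to its global matched bitmap.
struct SpillStateActive {
    global_right_matched: HashMap<usize, Vec<bool>>,
}

impl SpillStateActive {
    fn new() -> Self {
        Self { global_right_matched: HashMap::new() }
    }

    /// Merge a per-left-chunk bitmap for right batch `idx` into the
    /// global accumulator: a first-seen batch installs the bitmap as-is,
    /// later chunks OR element-wise, so a right row matched by *any*
    /// left chunk stays matched.
    fn merge_current_right_bitmap(&mut self, idx: usize, values: Vec<bool>) {
        match self.global_right_matched.get_mut(&idx) {
            None => {
                self.global_right_matched.insert(idx, values);
            }
            Some(global) => {
                debug_assert_eq!(global.len(), values.len());
                for (g, v) in global.iter_mut().zip(values) {
                    *g |= v;
                }
            }
        }
    }

    /// Indices of right rows in batch `idx` never matched by any left
    /// chunk; these are the rows to null-pad at the end.
    fn unmatched_indices(&self, idx: usize) -> Vec<usize> {
        self.global_right_matched
            .get(&idx)
            .map(|bits| {
                bits.iter()
                    .enumerate()
                    .filter(|(_, matched)| !**matched)
                    .map(|(i, _)| i)
                    .collect()
            })
            .unwrap_or_default()
    }
}
```

   Centralizing first-seen vs OR-merge in one method would let a unit test pin down the invariant directly, instead of threading it through the state machine.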



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

