comphead commented on code in PR #10784:
URL: https://github.com/apache/datafusion/pull/10784#discussion_r1629770348
##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1500,11 +1497,13 @@ fn get_filtered_join_mask(
filter_matched_indices.push(streamed_indices.value(i));
}
- // if switched to next streaming index(e.g. from 0 to 1, or from 1 to 2), we reset seen_as_true flag
+ // Reset `seen_as_true` flag and calculate mask for the current streaming index
+ // - if within the batch it switched to next streaming index(e.g. from 0 to 1, or from 1 to 2)
+ // - if it is at the end of the all buffered batches for the given streaming index, 0 index comes last
if (i < streamed_indices_length - 1
&& streamed_indices.value(i) != streamed_indices.value(i + 1))
|| (i == streamed_indices_length - 1
- && *scanning_buffered_batch_idx == buffered_batches_len - 1)
+ && *scanning_buffered_offset == 0)
Review Comment:
Thanks @viirya for the review. We need to check exactly the condition where
all buffered batches have been scanned for the current streamed batch. This is
because LeftAnti doesn't know whether a streamed row matches until the very
last buffered row comes in. This scenario is already tested in the slt file:
```
# Expected: no rows, because the t2 row (11, 11) satisfies t2.a = t1.a and t1.b > t2.c
query II
select * from (
with
t1 as (
select 11 a, 12 b),
t2 as (
select 11 a, 12 c union all
select 11 a, 11 c union all
select 11 a, 15 c
)
select t1.* from t1 where not exists (select 1 from t2 where t2.a = t1.a and t1.b > t2.c)
) order by 1, 2
----
```
It works for both small and large batch sizes.
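To make the LeftAnti reasoning concrete, here is a simplified sketch in Rust. It is not DataFusion's API. The function `left_anti_mask` and its parameters are hypothetical. It returns the emitted streamed indices instead of building a mask array. The `last_chunk` flag stands in for the end-of-buffered-rows condition that the PR detects with `*scanning_buffered_offset == 0`. The `seen_as_true` state is carried across calls, so a streamed row whose buffered rows span several batches is only decided once its last buffered row has been checked.

```rust
/// Simplified, hypothetical model of the LeftAnti filtering discussed above
/// (not DataFusion's actual `get_filtered_join_mask`).
///
/// * `streamed_indices[i]` is the streamed row paired with the i-th buffered row.
/// * `filter_mask[i]` is the join filter result for that pair.
/// * `last_chunk` is true when no buffered rows remain for the final streamed index.
/// * `seen_as_true` persists across calls, so a streamed row whose buffered
///   rows span several batches is decided only once, at its last buffered row.
///
/// Returns the streamed indices that had no filter match (the anti-join output).
fn left_anti_mask(
    streamed_indices: &[u64],
    filter_mask: &[bool],
    last_chunk: bool,
    seen_as_true: &mut bool,
) -> Vec<u64> {
    assert_eq!(streamed_indices.len(), filter_mask.len());
    let n = streamed_indices.len();
    let mut emitted = Vec::new();
    for i in 0..n {
        // Remember whether any buffered row matched the current streamed row.
        *seen_as_true |= filter_mask[i];
        // The current streamed row is complete when the next pair belongs to a
        // different streamed row, or when this is the last pair and all
        // buffered rows have been scanned.
        let group_ends = (i < n - 1 && streamed_indices[i] != streamed_indices[i + 1])
            || (i == n - 1 && last_chunk);
        if group_ends {
            if !*seen_as_true {
                emitted.push(streamed_indices[i]);
            }
            // Reset for the next streamed row.
            *seen_as_true = false;
        }
    }
    emitted
}
```

For the slt case above, t1's single row (streamed index 0) is paired with three buffered rows. Suppose they arrive in the order c = 12, 11, 15. The filter results (t1.b > t2.c) are then false, true, false. If each row is fed in its own call and `last_chunk` is set only on the third call, nothing is emitted, which matches the empty expected result. Deciding at the end of every buffered batch instead would wrongly emit index 0 after the first call.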
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.