This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-52 by this push:
new 7e20eb7ddb [branch-52] perf: Cache num_output_rows in sort merge join to avoid O(n) recount (#20478) (#20936)
7e20eb7ddb is described below
commit 7e20eb7ddb3acf8174af7adec52859e28333d570
Author: Matt Butrovich <[email protected]>
AuthorDate: Fri Mar 13 19:16:45 2026 -0400
[branch-52] perf: Cache num_output_rows in sort merge join to avoid O(n) recount (#20478) (#20936)
---
.../src/joins/sort_merge_join/stream.rs | 29 ++++++++++------------
1 file changed, 13 insertions(+), 16 deletions(-)
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
index 3a57dc6b41..dca55c720e 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
@@ -128,6 +128,8 @@ pub(super) struct StreamedBatch {
pub join_arrays: Vec<ArrayRef>,
/// Chunks of indices from buffered side (may be nulls) joined to streamed
pub output_indices: Vec<StreamedJoinedChunk>,
+ /// Total number of output rows across all chunks in `output_indices`
+ pub num_output_rows: usize,
/// Index of currently scanned batch from buffered data
pub buffered_batch_idx: Option<usize>,
/// Indices that found a match for the given join filter
@@ -144,6 +146,7 @@ impl StreamedBatch {
idx: 0,
join_arrays,
output_indices: vec![],
+ num_output_rows: 0,
buffered_batch_idx: None,
join_filter_matched_idxs: HashSet::new(),
}
@@ -155,6 +158,7 @@ impl StreamedBatch {
idx: 0,
join_arrays: vec![],
output_indices: vec![],
+ num_output_rows: 0,
buffered_batch_idx: None,
join_filter_matched_idxs: HashSet::new(),
}
@@ -162,10 +166,7 @@ impl StreamedBatch {
/// Number of unfrozen output pairs in this streamed batch
fn num_output_rows(&self) -> usize {
- self.output_indices
- .iter()
- .map(|chunk| chunk.streamed_indices.len())
- .sum()
+ self.num_output_rows
}
/// Appends new pair consisting of current streamed index and `buffered_idx`
@@ -175,7 +176,6 @@ impl StreamedBatch {
buffered_batch_idx: Option<usize>,
buffered_idx: Option<usize>,
batch_size: usize,
- num_unfrozen_pairs: usize,
) {
// If no current chunk exists or current chunk is not for current buffered batch,
// create a new chunk
@@ -183,12 +183,13 @@ impl StreamedBatch {
{
// Compute capacity only when creating a new chunk (infrequent operation).
// The capacity is the remaining space to reach batch_size.
- // This should always be >= 1 since we only call this when num_unfrozen_pairs < batch_size.
+ // This should always be >= 1 since we only call this when num_output_rows < batch_size.
debug_assert!(
- batch_size > num_unfrozen_pairs,
- "batch_size ({batch_size}) must be > num_unfrozen_pairs
({num_unfrozen_pairs})"
+ batch_size > self.num_output_rows,
+ "batch_size ({batch_size}) must be > num_output_rows ({})",
+ self.num_output_rows
);
- let capacity = batch_size - num_unfrozen_pairs;
+ let capacity = batch_size - self.num_output_rows;
self.output_indices.push(StreamedJoinedChunk {
buffered_batch_idx,
streamed_indices: UInt64Builder::with_capacity(capacity),
@@ -205,6 +206,7 @@ impl StreamedBatch {
} else {
current_chunk.buffered_indices.append_null();
}
+ self.num_output_rows += 1;
}
}
@@ -1134,13 +1136,10 @@ impl SortMergeJoinStream {
let scanning_idx = self.buffered_data.scanning_idx();
if join_streamed {
// Join streamed row and buffered row
- // Pass batch_size and num_unfrozen_pairs to compute capacity only when
- // creating a new chunk (when buffered_batch_idx changes), not on every iteration.
self.streamed_batch.append_output_pair(
Some(self.buffered_data.scanning_batch_idx),
Some(scanning_idx),
self.batch_size,
- self.num_unfrozen_pairs(),
);
} else {
// Join nulls and buffered row for FULL join
@@ -1166,13 +1165,10 @@ impl SortMergeJoinStream {
// For Mark join we store a dummy id to indicate the row has a match
let scanning_idx = mark_row_as_match.then_some(0);
- // Pass batch_size=1 and num_unfrozen_pairs=0 to get capacity of 1,
- // since we only append a single null-joined pair here (not in a loop).
self.streamed_batch.append_output_pair(
scanning_batch_idx,
scanning_idx,
- 1,
- 0,
+ self.batch_size,
);
self.buffered_data.scanning_finish();
self.streamed_joined = true;
@@ -1469,6 +1465,7 @@ impl SortMergeJoinStream {
}
self.streamed_batch.output_indices.clear();
+ self.streamed_batch.num_output_rows = 0;
Ok(())
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]