This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-53 by this push:
new b83c0e01ee [branch-53] fix: move overflow guard before dense ratio in
hash join to prevent overflows (#20998) (#21008)
b83c0e01ee is described below
commit b83c0e01ee2e6fa97c0ae5e0d95176648ad6a4a3
Author: Matt Butrovich <[email protected]>
AuthorDate: Tue Mar 17 19:16:58 2026 -0400
[branch-53] fix: move overflow guard before dense ratio in hash join to
prevent overflows (#20998) (#21008)
(cherry picked from commit e74e58f109880fe6ff53dcc6251abdd30a972e49)
I confirmed that this fixed CometFuzzTestSuite "join" which found the
original issue while testing `branch-53`.
## Which issue does this PR close?
- Closes #20995.
## Rationale for this change
#20995 has the details, but the problem is straightforward: `range + 1` in the
`dense_ratio` calculation overflows because the overflow guard runs after it instead of before it.
## What changes are included in this PR?
Prevent the hash join overflow and add a unit test for it.
- backports https://github.com/apache/datafusion/pull/20998 from
@buraksenn
## Are these changes tested?
Added a test case whose join keys span both the minimum and maximum `i64` values.
Co-authored-by: Burak Şen <[email protected]>
---
.../physical-plan/src/joins/hash_join/exec.rs | 52 ++++++++++++++++++----
1 file changed, 43 insertions(+), 9 deletions(-)
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs
b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index f9fa9f6eb2..25b320f985 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -154,23 +154,23 @@ fn try_create_array_map(
let range = ArrayMap::calculate_range(min_val, max_val);
let num_row: usize = batches.iter().map(|x| x.num_rows()).sum();
- let dense_ratio = (num_row as f64) / ((range + 1) as f64);
// TODO: support create ArrayMap<u64>
if num_row >= u32::MAX as usize {
return Ok(None);
}
- if range >= perfect_hash_join_small_build_threshold as u64
- && dense_ratio <= perfect_hash_join_min_key_density
- {
+ // When the key range spans the full integer domain (e.g. i64::MIN to
i64::MAX),
+ // range is u64::MAX and `range + 1` below would overflow.
+ if range == usize::MAX as u64 {
return Ok(None);
}
- // If range equals usize::MAX, then range + 1 would overflow to 0, which
would cause
- // ArrayMap to allocate an invalid zero-sized array or cause indexing
issues.
- // This check prevents such overflow and ensures valid array allocation.
- if range == usize::MAX as u64 {
+ let dense_ratio = (num_row as f64) / ((range + 1) as f64);
+
+ if range >= perfect_hash_join_small_build_threshold as u64
+ && dense_ratio <= perfect_hash_join_min_key_density
+ {
return Ok(None);
}
@@ -2082,7 +2082,9 @@ mod tests {
test::exec::MockExec,
};
- use arrow::array::{Date32Array, Int32Array, StructArray, UInt32Array,
UInt64Array};
+ use arrow::array::{
+ Date32Array, Int32Array, Int64Array, StructArray, UInt32Array,
UInt64Array,
+ };
use arrow::buffer::NullBuffer;
use arrow::datatypes::{DataType, Field};
use arrow_schema::Schema;
@@ -5499,6 +5501,38 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_perfect_hash_join_overflow_full_int64_range() -> Result<()> {
+ let task_ctx = prepare_task_ctx(8192, true);
+ let schema = Arc::new(Schema::new(vec![Field::new("a",
DataType::Int64, true)]));
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![Arc::new(Int64Array::from(vec![i64::MIN, i64::MAX]))],
+ )?;
+ let left = TestMemoryExec::try_new_exec(
+ &[vec![batch.clone()]],
+ Arc::clone(&schema),
+ None,
+ )?;
+ let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema,
None)?;
+ let on: JoinOn = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema())?) as _,
+ Arc::new(Column::new_with_schema("a", &right.schema())?) as _,
+ )];
+ let (_columns, batches, _metrics) = join_collect(
+ left,
+ right,
+ on,
+ &JoinType::Inner,
+ NullEquality::NullEqualsNothing,
+ task_ctx,
+ )
+ .await?;
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, 2);
+ Ok(())
+ }
+
#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn test_phj_null_equals_null_build_no_nulls_probe_has_nulls(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]