This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-53 by this push:
     new b83c0e01ee [branch-53] fix: move overflow guard before dense ratio in 
hash join to prevent overflows (#20998) (#21008)
b83c0e01ee is described below

commit b83c0e01ee2e6fa97c0ae5e0d95176648ad6a4a3
Author: Matt Butrovich <[email protected]>
AuthorDate: Tue Mar 17 19:16:58 2026 -0400

    [branch-53] fix: move overflow guard before dense ratio in hash join to 
prevent overflows (#20998) (#21008)
    
    (cherry picked from commit e74e58f109880fe6ff53dcc6251abdd30a972e49)
    
    I confirmed that this fixes the CometFuzzTestSuite "join" test, which found
    the original issue while testing `branch-53`.
    
    ## Which issue does this PR close?
    - Closes #20995.
    
    ## Rationale for this change
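    #20995 has the details, but the problem is straightforward: the `dense_ratio`
    calculation computes `range + 1`, which overflows when the key range spans the
    full integer domain. It overflows because the overflow guard ran after that
    calculation instead of before it.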
    
    ## What changes are included in this PR?
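    Move the `range == usize::MAX` overflow guard ahead of the `dense_ratio`
    computation in `try_create_array_map` so hash join no longer overflows, and add
    a unit test for it.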
    - backports https://github.com/apache/datafusion/pull/20998 from
    @buraksenn
    
    ## Are these changes tested?
    Yes. Added a test case whose join keys include both `i64::MIN` and `i64::MAX`.
    
    Co-authored-by: Burak Şen <[email protected]>
---
 .../physical-plan/src/joins/hash_join/exec.rs      | 52 ++++++++++++++++++----
 1 file changed, 43 insertions(+), 9 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs 
b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index f9fa9f6eb2..25b320f985 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -154,23 +154,23 @@ fn try_create_array_map(
 
     let range = ArrayMap::calculate_range(min_val, max_val);
     let num_row: usize = batches.iter().map(|x| x.num_rows()).sum();
-    let dense_ratio = (num_row as f64) / ((range + 1) as f64);
 
     // TODO: support create ArrayMap<u64>
     if num_row >= u32::MAX as usize {
         return Ok(None);
     }
 
-    if range >= perfect_hash_join_small_build_threshold as u64
-        && dense_ratio <= perfect_hash_join_min_key_density
-    {
+    // When the key range spans the full integer domain (e.g. i64::MIN to 
i64::MAX),
+    // range is u64::MAX and `range + 1` below would overflow.
+    if range == usize::MAX as u64 {
         return Ok(None);
     }
 
-    // If range equals usize::MAX, then range + 1 would overflow to 0, which 
would cause
-    // ArrayMap to allocate an invalid zero-sized array or cause indexing 
issues.
-    // This check prevents such overflow and ensures valid array allocation.
-    if range == usize::MAX as u64 {
+    let dense_ratio = (num_row as f64) / ((range + 1) as f64);
+
+    if range >= perfect_hash_join_small_build_threshold as u64
+        && dense_ratio <= perfect_hash_join_min_key_density
+    {
         return Ok(None);
     }
 
@@ -2082,7 +2082,9 @@ mod tests {
         test::exec::MockExec,
     };
 
-    use arrow::array::{Date32Array, Int32Array, StructArray, UInt32Array, 
UInt64Array};
+    use arrow::array::{
+        Date32Array, Int32Array, Int64Array, StructArray, UInt32Array, 
UInt64Array,
+    };
     use arrow::buffer::NullBuffer;
     use arrow::datatypes::{DataType, Field};
     use arrow_schema::Schema;
@@ -5499,6 +5501,38 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_perfect_hash_join_overflow_full_int64_range() -> Result<()> {
+        let task_ctx = prepare_task_ctx(8192, true);
+        let schema = Arc::new(Schema::new(vec![Field::new("a", 
DataType::Int64, true)]));
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int64Array::from(vec![i64::MIN, i64::MAX]))],
+        )?;
+        let left = TestMemoryExec::try_new_exec(
+            &[vec![batch.clone()]],
+            Arc::clone(&schema),
+            None,
+        )?;
+        let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, 
None)?;
+        let on: JoinOn = vec![(
+            Arc::new(Column::new_with_schema("a", &left.schema())?) as _,
+            Arc::new(Column::new_with_schema("a", &right.schema())?) as _,
+        )];
+        let (_columns, batches, _metrics) = join_collect(
+            left,
+            right,
+            on,
+            &JoinType::Inner,
+            NullEquality::NullEqualsNothing,
+            task_ctx,
+        )
+        .await?;
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 2);
+        Ok(())
+    }
+
     #[apply(hash_join_exec_configs)]
     #[tokio::test]
     async fn test_phj_null_equals_null_build_no_nulls_probe_has_nulls(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to