This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 82ccba95f8 arrow-select: improve dictionary interleave fallback 
performance (#8978)
82ccba95f8 is described below

commit 82ccba95f855bb37ffa9aba771d88f7f91255e1d
Author: Alfonso Subiotto Marqués <[email protected]>
AuthorDate: Thu Dec 11 20:26:42 2025 +0100

    arrow-select: improve dictionary interleave fallback performance (#8978)
    
    The naive interleave_fallback would use MutableArrayData and extend it
    with the full values slice each time the target array changed in the
    indices slice.
    
    This commit introduces a new approach where dictionary values are
    concatenated once and new keys are then computed over the concatenated
    values, taking the interleave indices into account. This results in a
    50-75% performance improvement in microbenchmarks and will also improve
    memory usage during interleaves (used heavily in sorts).
    
    Note that this path is only taken when should_merge_dictionary_values
    returns false.
    
    ```
    $ cargo bench --bench interleave_kernels -- 'dict' --baseline main
    interleave dict(20, 0.0) 100 [0..100, 100..230, 450..1000]
                            time:   [627.14 ns 634.76 ns 644.13 ns]
                            change: [−65.614% −65.345% −65.002%] (p = 0.00 < 
0.05)
                            Performance has improved.
    Found 9 outliers among 100 measurements (9.00%)
      2 (2.00%) low mild
      6 (6.00%) high mild
      1 (1.00%) high severe
    
    interleave dict(20, 0.0) 400 [0..100, 100..230, 450..1000]
                            time:   [934.35 ns 937.51 ns 940.60 ns]
                            change: [−71.488% −71.340% −71.208%] (p = 0.00 < 
0.05)
                            Performance has improved.
    Found 3 outliers among 100 measurements (3.00%)
      3 (3.00%) high mild
    
    interleave dict(20, 0.0) 1024 [0..100, 100..230, 450..1000]
                            time:   [1.6485 µs 1.6528 µs 1.6566 µs]
                            change: [−74.307% −74.190% −74.088%] (p = 0.00 < 
0.05)
                            Performance has improved.
    
    interleave dict(20, 0.0) 1024 [0..100, 100..230, 450..1000, 0..1000]
                            time:   [1.6723 µs 1.6782 µs 1.6842 µs]
                            change: [−74.664% −74.544% −74.438%] (p = 0.00 < 
0.05)
                            Performance has improved.
    Found 2 outliers among 100 measurements (2.00%)
      1 (1.00%) low mild
      1 (1.00%) high severe
    
    interleave dict_sparse(20, 0.0) 100 [0..100, 100..230, 450..1000]
                            time:   [1.5985 µs 1.6064 µs 1.6148 µs]
                            change: [−12.510% −12.116% −11.715%] (p = 0.00 < 
0.05)
                            Performance has improved.
    Found 19 outliers among 100 measurements (19.00%)
      10 (10.00%) low mild
      6 (6.00%) high mild
      3 (3.00%) high severe
    
    interleave dict_sparse(20, 0.0) 400 [0..100, 100..230, 450..1000]
                            time:   [1.9310 µs 1.9466 µs 1.9680 µs]
                            change: [−41.488% −41.091% −40.628%] (p = 0.00 < 
0.05)
                            Performance has improved.
    Found 15 outliers among 100 measurements (15.00%)
      3 (3.00%) low mild
      6 (6.00%) high mild
      6 (6.00%) high severe
    
    interleave dict_sparse(20, 0.0) 1024 [0..100, 100..230, 450..1000]
                            time:   [2.7812 µs 2.8516 µs 2.9401 µs]
                            change: [−56.097% −55.276% −54.274%] (p = 0.00 < 
0.05)
                            Performance has improved.
    Found 15 outliers among 100 measurements (15.00%)
      8 (8.00%) high mild
      7 (7.00%) high severe
    
    interleave dict_sparse(20, 0.0) 1024 [0..100, 100..230, 450..1000, 0..1000]
                            time:   [3.4926 µs 3.6558 µs 3.8427 µs]
                            change: [−48.423% −46.405% −44.379%] (p = 0.00 < 
0.05)
                            Performance has improved.
    
    interleave dict_distinct 100
                            time:   [2.0013 µs 2.0106 µs 2.0205 µs]
                            change: [−1.6162% −1.0465% −0.4647%] (p = 0.00 < 
0.05)
                            Change within noise threshold.
    Found 4 outliers among 100 measurements (4.00%)
      4 (4.00%) high mild
    
    interleave dict_distinct 1024
                            time:   [1.9784 µs 1.9855 µs 1.9924 µs]
                            change: [−2.4655% −1.8461% −1.2265%] (p = 0.00 < 
0.05)
                            Performance has improved.
    
    interleave dict_distinct 2048
                            time:   [1.9832 µs 1.9959 µs 2.0087 µs]
                            change: [−2.9917% −2.3003% −1.6062%] (p = 0.00 < 
0.05)
                            Performance has improved.
    ```
    
    # Which issue does this PR close?
    
    This is a targeted performance improvement; I believe a dedicated tracking issue is redundant.
    
    # Rationale for this change
    
    See commit message/PR description.
    
    # What changes are included in this PR?
    
    Ditto.
    
    # Are these changes tested?
    
    This PR adds an additional test for interleave_fallback dictionaries
    with nulls in addition to the existing tests.
    
    # Are there any user-facing changes?
    
    No.
    
    Signed-off-by: Alfonso Subiotto Marques <[email protected]>
---
 arrow-select/src/concat.rs     |   2 +-
 arrow-select/src/dictionary.rs |  17 ++++-
 arrow-select/src/interleave.rs | 155 ++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 168 insertions(+), 6 deletions(-)

diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs
index 29c52b192b..81b24827e3 100644
--- a/arrow-select/src/concat.rs
+++ b/arrow-select/src/concat.rs
@@ -107,7 +107,7 @@ fn concat_dictionaries<K: ArrowDictionaryKeyType>(
         .inspect(|d| output_len += d.len())
         .collect();
 
-    if !should_merge_dictionary_values::<K>(&dictionaries, output_len) {
+    if !should_merge_dictionary_values::<K>(&dictionaries, output_len).0 {
         return concat_fallback(arrays, Capacities::Array(output_len));
     }
 
diff --git a/arrow-select/src/dictionary.rs b/arrow-select/src/dictionary.rs
index 266842bcc7..5b32f4e761 100644
--- a/arrow-select/src/dictionary.rs
+++ b/arrow-select/src/dictionary.rs
@@ -174,10 +174,14 @@ type PtrEq = fn(&dyn Array, &dyn Array) -> bool;
 /// some return over the naive approach used by MutableArrayData
 ///
 /// `len` is the total length of the merged output
+///
+/// Returns `(should_merge, has_overflow)` where:
+/// - `should_merge`: whether dictionary values should be merged
+/// - `has_overflow`: whether the combined dictionary values would overflow 
the key type
 pub(crate) fn should_merge_dictionary_values<K: ArrowDictionaryKeyType>(
     dictionaries: &[&DictionaryArray<K>],
     len: usize,
-) -> bool {
+) -> (bool, bool) {
     use DataType::*;
     let first_values = dictionaries[0].values().as_ref();
     let ptr_eq: PtrEq = match first_values.data_type() {
@@ -187,7 +191,11 @@ pub(crate) fn should_merge_dictionary_values<K: 
ArrowDictionaryKeyType>(
         LargeBinary => bytes_ptr_eq::<LargeBinaryType>,
         dt => {
             if !dt.is_primitive() {
-                return false;
+                return (
+                    false,
+                    K::Native::from_usize(dictionaries.iter().map(|d| 
d.values().len()).sum())
+                        .is_none(),
+                );
             }
             |a, b| a.to_data().ptr_eq(&b.to_data())
         }
@@ -206,7 +214,10 @@ pub(crate) fn should_merge_dictionary_values<K: 
ArrowDictionaryKeyType>(
     let overflow = K::Native::from_usize(total_values).is_none();
     let values_exceed_length = total_values >= len;
 
-    !single_dictionary && (overflow || values_exceed_length)
+    (
+        !single_dictionary && (overflow || values_exceed_length),
+        overflow,
+    )
 }
 
 /// Given an array of dictionaries and an optional key mask compute a values 
array
diff --git a/arrow-select/src/interleave.rs b/arrow-select/src/interleave.rs
index 1453995a0a..cb3ca655dc 100644
--- a/arrow-select/src/interleave.rs
+++ b/arrow-select/src/interleave.rs
@@ -17,6 +17,7 @@
 
 //! Interleave elements from multiple arrays
 
+use crate::concat::concat;
 use crate::dictionary::{merge_dictionary_values, 
should_merge_dictionary_values};
 use arrow_array::builder::{BooleanBufferBuilder, PrimitiveBuilder};
 use arrow_array::cast::AsArray;
@@ -195,8 +196,14 @@ fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
     indices: &[(usize, usize)],
 ) -> Result<ArrayRef, ArrowError> {
     let dictionaries: Vec<_> = arrays.iter().map(|x| 
x.as_dictionary::<K>()).collect();
-    if !should_merge_dictionary_values::<K>(&dictionaries, indices.len()) {
-        return interleave_fallback(arrays, indices);
+    let (should_merge, has_overflow) =
+        should_merge_dictionary_values::<K>(&dictionaries, indices.len());
+    if !should_merge {
+        return if has_overflow {
+            interleave_fallback(arrays, indices)
+        } else {
+            interleave_fallback_dictionary::<K>(&dictionaries, indices)
+        };
     }
 
     let masks: Vec<_> = dictionaries
@@ -346,6 +353,76 @@ fn interleave_fallback(
     Ok(make_array(array_data.freeze()))
 }
 
+/// Fallback implementation for interleaving dictionaries when it was 
determined
+/// that the dictionary values should not be merged. This implementation 
concatenates
+/// the value slices and recomputes the resulting dictionary keys.
+///
+/// # Panics
+///
+/// This function assumes that the combined dictionary values will not 
overflow the
+/// key type. Callers must verify this condition via
+/// [`should_merge_dictionary_values`] before calling this function.
+fn interleave_fallback_dictionary<K: ArrowDictionaryKeyType>(
+    dictionaries: &[&DictionaryArray<K>],
+    indices: &[(usize, usize)],
+) -> Result<ArrayRef, ArrowError> {
+    let relative_offsets: Vec<usize> = dictionaries
+        .iter()
+        .scan(0usize, |offset, dict| {
+            let current = *offset;
+            *offset += dict.values().len();
+            Some(current)
+        })
+        .collect();
+    let all_values: Vec<&dyn Array> = dictionaries.iter().map(|d| 
d.values().as_ref()).collect();
+    let concatenated_values = concat(&all_values)?;
+
+    let any_nulls = dictionaries.iter().any(|d| d.keys().nulls().is_some());
+    let (new_keys, nulls) = if any_nulls {
+        let mut has_nulls = false;
+        let new_keys: Vec<K::Native> = indices
+            .iter()
+            .map(|(array, row)| {
+                let old_keys = dictionaries[*array].keys();
+                if old_keys.is_valid(*row) {
+                    let old_key = old_keys.values()[*row].as_usize();
+                    K::Native::from_usize(relative_offsets[*array] + old_key)
+                        .expect("key overflow should be checked by caller")
+                } else {
+                    has_nulls = true;
+                    K::Native::ZERO
+                }
+            })
+            .collect();
+
+        let nulls = if has_nulls {
+            let null_buffer = BooleanBuffer::collect_bool(indices.len(), |i| {
+                let (array, row) = indices[i];
+                dictionaries[array].keys().is_valid(row)
+            });
+            Some(NullBuffer::new(null_buffer))
+        } else {
+            None
+        };
+        (new_keys, nulls)
+    } else {
+        let new_keys: Vec<K::Native> = indices
+            .iter()
+            .map(|(array, row)| {
+                let old_key = 
dictionaries[*array].keys().values()[*row].as_usize();
+                K::Native::from_usize(relative_offsets[*array] + old_key)
+                    .expect("key overflow should be checked by caller")
+            })
+            .collect();
+        (new_keys, None)
+    };
+
+    let keys_array = PrimitiveArray::<K>::new(new_keys.into(), nulls);
+    // SAFETY: keys_array is constructed from a valid set of keys.
+    let array = unsafe { DictionaryArray::new_unchecked(keys_array, 
concatenated_values) };
+    Ok(Arc::new(array))
+}
+
 /// Interleave rows by index from multiple [`RecordBatch`] instances and 
return a new [`RecordBatch`].
 ///
 /// This function will call [`interleave`] on each array of the 
[`RecordBatch`] instances and assemble a new [`RecordBatch`].
@@ -412,6 +489,7 @@ mod tests {
     use super::*;
     use arrow_array::Int32RunArray;
     use arrow_array::builder::{Int32Builder, ListBuilder, PrimitiveRunBuilder};
+    use arrow_array::types::Int8Type;
     use arrow_schema::Field;
 
     #[test]
@@ -509,6 +587,41 @@ mod tests {
         assert_eq!(actual, expected);
     }
 
+    #[test]
+    fn test_interleave_dictionary_overflow_same_values() {
+        let values: ArrayRef = Arc::new(StringArray::from_iter_values(
+            (0..50).map(|i| format!("v{i}")),
+        ));
+
+        // With 3 dictionaries of 50 values each, relative_offsets = [0, 50, 
100]
+        // Accessing key 49 from dict3 gives 100 + 49 = 149 which overflows 
Int8
+        // (max 127).
+        // This test case falls back to interleave_fallback because the
+        // dictionaries share the same underlying values slice.
+        let dict1 = DictionaryArray::<Int8Type>::new(
+            Int8Array::from_iter_values([0, 1, 2]),
+            values.clone(),
+        );
+        let dict2 = DictionaryArray::<Int8Type>::new(
+            Int8Array::from_iter_values([0, 1, 2]),
+            values.clone(),
+        );
+        let dict3 =
+            
DictionaryArray::<Int8Type>::new(Int8Array::from_iter_values([49]), 
values.clone());
+
+        let indices = &[(0, 0), (1, 0), (2, 0)];
+        let result = interleave(&[&dict1, &dict2, &dict3], indices).unwrap();
+
+        let dict_result = result.as_dictionary::<Int8Type>();
+        let string_result: Vec<_> = dict_result
+            .downcast_dict::<StringArray>()
+            .unwrap()
+            .into_iter()
+            .map(|x| x.unwrap())
+            .collect();
+        assert_eq!(string_result, vec!["v0", "v0", "v49"]);
+    }
+
     #[test]
     fn test_lists() {
         // [[1, 2], null, [3]]
@@ -1182,4 +1295,42 @@ mod tests {
         assert_eq!(v.len(), 1);
         assert_eq!(v.data_type(), &DataType::Struct(fields));
     }
+
+    #[test]
+    fn test_interleave_fallback_dictionary_with_nulls() {
+        let input_1_keys = Int32Array::from_iter([Some(0), None, Some(1)]);
+        let input_1_values = StringArray::from_iter_values(["foo", "bar"]);
+        let dict_a = DictionaryArray::new(input_1_keys, 
Arc::new(input_1_values));
+
+        let input_2_keys = Int32Array::from_iter([Some(0), Some(1), None]);
+        let input_2_values = StringArray::from_iter_values(["baz", "qux"]);
+        let dict_b = DictionaryArray::new(input_2_keys, 
Arc::new(input_2_values));
+
+        let indices = vec![
+            (0, 0), // "foo"
+            (0, 1), // null
+            (1, 0), // "baz"
+            (1, 2), // null
+            (0, 2), // "bar"
+            (1, 1), // "qux"
+        ];
+
+        let result =
+            interleave_fallback_dictionary::<Int32Type>(&[&dict_a, &dict_b], 
&indices).unwrap();
+        let dict_result = result.as_dictionary::<Int32Type>();
+
+        let string_result = 
dict_result.downcast_dict::<StringArray>().unwrap();
+        let collected: Vec<_> = string_result.into_iter().collect();
+        assert_eq!(
+            collected,
+            vec![
+                Some("foo"),
+                None,
+                Some("baz"),
+                None,
+                Some("bar"),
+                Some("qux")
+            ]
+        );
+    }
 }

Reply via email to