This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 82ccba95f8 arrow-select: improve dictionary interleave fallback
performance (#8978)
82ccba95f8 is described below
commit 82ccba95f855bb37ffa9aba771d88f7f91255e1d
Author: Alfonso Subiotto Marqués <[email protected]>
AuthorDate: Thu Dec 11 20:26:42 2025 +0100
arrow-select: improve dictionary interleave fallback performance (#8978)
The naive interleave_fallback would use MutableArrayData and extend it with
the full values slice each time the target array changed in the indices
slice.
This commit introduces a new approach where dictionary values are
concatenated once and then new offsets are computed over these, taking
the indices into account. This results in a 50-75% performance improvement
in microbenchmarks and will also improve memory usage during interleaves
(used heavily in sorts).
Note that this path is only taken when should_merge_dictionary_values
returns false.
```
$ cargo bench --bench interleave_kernels -- 'dict' --baseline main
interleave dict(20, 0.0) 100 [0..100, 100..230, 450..1000]
time: [627.14 ns 634.76 ns 644.13 ns]
change: [−65.614% −65.345% −65.002%] (p = 0.00 <
0.05)
Performance has improved.
Found 9 outliers among 100 measurements (9.00%)
2 (2.00%) low mild
6 (6.00%) high mild
1 (1.00%) high severe
interleave dict(20, 0.0) 400 [0..100, 100..230, 450..1000]
time: [934.35 ns 937.51 ns 940.60 ns]
change: [−71.488% −71.340% −71.208%] (p = 0.00 <
0.05)
Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
3 (3.00%) high mild
interleave dict(20, 0.0) 1024 [0..100, 100..230, 450..1000]
time: [1.6485 µs 1.6528 µs 1.6566 µs]
change: [−74.307% −74.190% −74.088%] (p = 0.00 <
0.05)
Performance has improved.
interleave dict(20, 0.0) 1024 [0..100, 100..230, 450..1000, 0..1000]
time: [1.6723 µs 1.6782 µs 1.6842 µs]
change: [−74.664% −74.544% −74.438%] (p = 0.00 <
0.05)
Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
1 (1.00%) low mild
1 (1.00%) high severe
interleave dict_sparse(20, 0.0) 100 [0..100, 100..230, 450..1000]
time: [1.5985 µs 1.6064 µs 1.6148 µs]
change: [−12.510% −12.116% −11.715%] (p = 0.00 <
0.05)
Performance has improved.
Found 19 outliers among 100 measurements (19.00%)
10 (10.00%) low mild
6 (6.00%) high mild
3 (3.00%) high severe
interleave dict_sparse(20, 0.0) 400 [0..100, 100..230, 450..1000]
time: [1.9310 µs 1.9466 µs 1.9680 µs]
change: [−41.488% −41.091% −40.628%] (p = 0.00 <
0.05)
Performance has improved.
Found 15 outliers among 100 measurements (15.00%)
3 (3.00%) low mild
6 (6.00%) high mild
6 (6.00%) high severe
interleave dict_sparse(20, 0.0) 1024 [0..100, 100..230, 450..1000]
time: [2.7812 µs 2.8516 µs 2.9401 µs]
change: [−56.097% −55.276% −54.274%] (p = 0.00 <
0.05)
Performance has improved.
Found 15 outliers among 100 measurements (15.00%)
8 (8.00%) high mild
7 (7.00%) high severe
interleave dict_sparse(20, 0.0) 1024 [0..100, 100..230, 450..1000, 0..1000]
time: [3.4926 µs 3.6558 µs 3.8427 µs]
change: [−48.423% −46.405% −44.379%] (p = 0.00 <
0.05)
Performance has improved.
interleave dict_distinct 100
time: [2.0013 µs 2.0106 µs 2.0205 µs]
change: [−1.6162% −1.0465% −0.4647%] (p = 0.00 <
0.05)
Change within noise threshold.
Found 4 outliers among 100 measurements (4.00%)
4 (4.00%) high mild
interleave dict_distinct 1024
time: [1.9784 µs 1.9855 µs 1.9924 µs]
change: [−2.4655% −1.8461% −1.2265%] (p = 0.00 <
0.05)
Performance has improved.
interleave dict_distinct 2048
time: [1.9832 µs 1.9959 µs 2.0087 µs]
change: [−2.9917% −2.3003% −1.6062%] (p = 0.00 <
0.05)
Performance has improved.
```
# Which issue does this PR close?
This is a specific performance improvement; I believe a dedicated issue is redundant.
# Rationale for this change
See commit message/PR description.
# What changes are included in this PR?
Ditto.
# Are these changes tested?
This PR adds an additional test for interleave_fallback dictionaries
with nulls in addition to the existing tests.
# Are there any user-facing changes?
No.
Signed-off-by: Alfonso Subiotto Marques <[email protected]>
---
arrow-select/src/concat.rs | 2 +-
arrow-select/src/dictionary.rs | 17 ++++-
arrow-select/src/interleave.rs | 155 ++++++++++++++++++++++++++++++++++++++++-
3 files changed, 168 insertions(+), 6 deletions(-)
diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs
index 29c52b192b..81b24827e3 100644
--- a/arrow-select/src/concat.rs
+++ b/arrow-select/src/concat.rs
@@ -107,7 +107,7 @@ fn concat_dictionaries<K: ArrowDictionaryKeyType>(
.inspect(|d| output_len += d.len())
.collect();
- if !should_merge_dictionary_values::<K>(&dictionaries, output_len) {
+ if !should_merge_dictionary_values::<K>(&dictionaries, output_len).0 {
return concat_fallback(arrays, Capacities::Array(output_len));
}
diff --git a/arrow-select/src/dictionary.rs b/arrow-select/src/dictionary.rs
index 266842bcc7..5b32f4e761 100644
--- a/arrow-select/src/dictionary.rs
+++ b/arrow-select/src/dictionary.rs
@@ -174,10 +174,14 @@ type PtrEq = fn(&dyn Array, &dyn Array) -> bool;
/// some return over the naive approach used by MutableArrayData
///
/// `len` is the total length of the merged output
+///
+/// Returns `(should_merge, has_overflow)` where:
+/// - `should_merge`: whether dictionary values should be merged
+/// - `has_overflow`: whether the combined dictionary values would overflow
the key type
pub(crate) fn should_merge_dictionary_values<K: ArrowDictionaryKeyType>(
dictionaries: &[&DictionaryArray<K>],
len: usize,
-) -> bool {
+) -> (bool, bool) {
use DataType::*;
let first_values = dictionaries[0].values().as_ref();
let ptr_eq: PtrEq = match first_values.data_type() {
@@ -187,7 +191,11 @@ pub(crate) fn should_merge_dictionary_values<K:
ArrowDictionaryKeyType>(
LargeBinary => bytes_ptr_eq::<LargeBinaryType>,
dt => {
if !dt.is_primitive() {
- return false;
+ return (
+ false,
+ K::Native::from_usize(dictionaries.iter().map(|d|
d.values().len()).sum())
+ .is_none(),
+ );
}
|a, b| a.to_data().ptr_eq(&b.to_data())
}
@@ -206,7 +214,10 @@ pub(crate) fn should_merge_dictionary_values<K:
ArrowDictionaryKeyType>(
let overflow = K::Native::from_usize(total_values).is_none();
let values_exceed_length = total_values >= len;
- !single_dictionary && (overflow || values_exceed_length)
+ (
+ !single_dictionary && (overflow || values_exceed_length),
+ overflow,
+ )
}
/// Given an array of dictionaries and an optional key mask compute a values
array
diff --git a/arrow-select/src/interleave.rs b/arrow-select/src/interleave.rs
index 1453995a0a..cb3ca655dc 100644
--- a/arrow-select/src/interleave.rs
+++ b/arrow-select/src/interleave.rs
@@ -17,6 +17,7 @@
//! Interleave elements from multiple arrays
+use crate::concat::concat;
use crate::dictionary::{merge_dictionary_values,
should_merge_dictionary_values};
use arrow_array::builder::{BooleanBufferBuilder, PrimitiveBuilder};
use arrow_array::cast::AsArray;
@@ -195,8 +196,14 @@ fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
indices: &[(usize, usize)],
) -> Result<ArrayRef, ArrowError> {
let dictionaries: Vec<_> = arrays.iter().map(|x|
x.as_dictionary::<K>()).collect();
- if !should_merge_dictionary_values::<K>(&dictionaries, indices.len()) {
- return interleave_fallback(arrays, indices);
+ let (should_merge, has_overflow) =
+ should_merge_dictionary_values::<K>(&dictionaries, indices.len());
+ if !should_merge {
+ return if has_overflow {
+ interleave_fallback(arrays, indices)
+ } else {
+ interleave_fallback_dictionary::<K>(&dictionaries, indices)
+ };
}
let masks: Vec<_> = dictionaries
@@ -346,6 +353,76 @@ fn interleave_fallback(
Ok(make_array(array_data.freeze()))
}
+/// Fallback implementation for interleaving dictionaries when it was
determined
+/// that the dictionary values should not be merged. This implementation
concatenates
+/// the value slices and recomputes the resulting dictionary keys.
+///
+/// # Panics
+///
+/// This function assumes that the combined dictionary values will not
overflow the
+/// key type. Callers must verify this condition
[`should_merge_dictionary_values`]
+/// before calling this function.
+fn interleave_fallback_dictionary<K: ArrowDictionaryKeyType>(
+ dictionaries: &[&DictionaryArray<K>],
+ indices: &[(usize, usize)],
+) -> Result<ArrayRef, ArrowError> {
+ let relative_offsets: Vec<usize> = dictionaries
+ .iter()
+ .scan(0usize, |offset, dict| {
+ let current = *offset;
+ *offset += dict.values().len();
+ Some(current)
+ })
+ .collect();
+ let all_values: Vec<&dyn Array> = dictionaries.iter().map(|d|
d.values().as_ref()).collect();
+ let concatenated_values = concat(&all_values)?;
+
+ let any_nulls = dictionaries.iter().any(|d| d.keys().nulls().is_some());
+ let (new_keys, nulls) = if any_nulls {
+ let mut has_nulls = false;
+ let new_keys: Vec<K::Native> = indices
+ .iter()
+ .map(|(array, row)| {
+ let old_keys = dictionaries[*array].keys();
+ if old_keys.is_valid(*row) {
+ let old_key = old_keys.values()[*row].as_usize();
+ K::Native::from_usize(relative_offsets[*array] + old_key)
+ .expect("key overflow should be checked by caller")
+ } else {
+ has_nulls = true;
+ K::Native::ZERO
+ }
+ })
+ .collect();
+
+ let nulls = if has_nulls {
+ let null_buffer = BooleanBuffer::collect_bool(indices.len(), |i| {
+ let (array, row) = indices[i];
+ dictionaries[array].keys().is_valid(row)
+ });
+ Some(NullBuffer::new(null_buffer))
+ } else {
+ None
+ };
+ (new_keys, nulls)
+ } else {
+ let new_keys: Vec<K::Native> = indices
+ .iter()
+ .map(|(array, row)| {
+ let old_key =
dictionaries[*array].keys().values()[*row].as_usize();
+ K::Native::from_usize(relative_offsets[*array] + old_key)
+ .expect("key overflow should be checked by caller")
+ })
+ .collect();
+ (new_keys, None)
+ };
+
+ let keys_array = PrimitiveArray::<K>::new(new_keys.into(), nulls);
+ // SAFETY: keys_array is constructed from a valid set of keys.
+ let array = unsafe { DictionaryArray::new_unchecked(keys_array,
concatenated_values) };
+ Ok(Arc::new(array))
+}
+
/// Interleave rows by index from multiple [`RecordBatch`] instances and
return a new [`RecordBatch`].
///
/// This function will call [`interleave`] on each array of the
[`RecordBatch`] instances and assemble a new [`RecordBatch`].
@@ -412,6 +489,7 @@ mod tests {
use super::*;
use arrow_array::Int32RunArray;
use arrow_array::builder::{Int32Builder, ListBuilder, PrimitiveRunBuilder};
+ use arrow_array::types::Int8Type;
use arrow_schema::Field;
#[test]
@@ -509,6 +587,41 @@ mod tests {
assert_eq!(actual, expected);
}
+ #[test]
+ fn test_interleave_dictionary_overflow_same_values() {
+ let values: ArrayRef = Arc::new(StringArray::from_iter_values(
+ (0..50).map(|i| format!("v{i}")),
+ ));
+
+ // With 3 dictionaries of 50 values each, relative_offsets = [0, 50,
100]
+ // Accessing key 49 from dict3 gives 100 + 49 = 149 which overflows
Int8
+ // (max 127).
+ // This test case falls back to interleave_fallback because the
+ // dictionaries share the same underlying values slice.
+ let dict1 = DictionaryArray::<Int8Type>::new(
+ Int8Array::from_iter_values([0, 1, 2]),
+ values.clone(),
+ );
+ let dict2 = DictionaryArray::<Int8Type>::new(
+ Int8Array::from_iter_values([0, 1, 2]),
+ values.clone(),
+ );
+ let dict3 =
+
DictionaryArray::<Int8Type>::new(Int8Array::from_iter_values([49]),
values.clone());
+
+ let indices = &[(0, 0), (1, 0), (2, 0)];
+ let result = interleave(&[&dict1, &dict2, &dict3], indices).unwrap();
+
+ let dict_result = result.as_dictionary::<Int8Type>();
+ let string_result: Vec<_> = dict_result
+ .downcast_dict::<StringArray>()
+ .unwrap()
+ .into_iter()
+ .map(|x| x.unwrap())
+ .collect();
+ assert_eq!(string_result, vec!["v0", "v0", "v49"]);
+ }
+
#[test]
fn test_lists() {
// [[1, 2], null, [3]]
@@ -1182,4 +1295,42 @@ mod tests {
assert_eq!(v.len(), 1);
assert_eq!(v.data_type(), &DataType::Struct(fields));
}
+
+ #[test]
+ fn test_interleave_fallback_dictionary_with_nulls() {
+ let input_1_keys = Int32Array::from_iter([Some(0), None, Some(1)]);
+ let input_1_values = StringArray::from_iter_values(["foo", "bar"]);
+ let dict_a = DictionaryArray::new(input_1_keys,
Arc::new(input_1_values));
+
+ let input_2_keys = Int32Array::from_iter([Some(0), Some(1), None]);
+ let input_2_values = StringArray::from_iter_values(["baz", "qux"]);
+ let dict_b = DictionaryArray::new(input_2_keys,
Arc::new(input_2_values));
+
+ let indices = vec![
+ (0, 0), // "foo"
+ (0, 1), // null
+ (1, 0), // "baz"
+ (1, 2), // null
+ (0, 2), // "bar"
+ (1, 1), // "qux"
+ ];
+
+ let result =
+ interleave_fallback_dictionary::<Int32Type>(&[&dict_a, &dict_b],
&indices).unwrap();
+ let dict_result = result.as_dictionary::<Int32Type>();
+
+ let string_result =
dict_result.downcast_dict::<StringArray>().unwrap();
+ let collected: Vec<_> = string_result.into_iter().collect();
+ assert_eq!(
+ collected,
+ vec![
+ Some("foo"),
+ None,
+ Some("baz"),
+ None,
+ Some("bar"),
+ Some("qux")
+ ]
+ );
+ }
}