This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new a49af1de85 fix: [9018]Fixed RunArray slice offsets(row, cast, eq)
(#9213)
a49af1de85 is described below
commit a49af1de8543b844430d799dff89d125a6f87221
Author: Manish Kumar <[email protected]>
AuthorDate: Wed Jan 28 04:17:29 2026 +0530
fix: [9018]Fixed RunArray slice offsets(row, cast, eq) (#9213)
# Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax.
-->
- Closes #9018.
# Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
The existing implementation had issues with row and cast operations on
RunArray. The equality implementation also did not support logical
index-based comparisons.
# What changes are included in this PR?
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
1. Fixed RunArray row and cast handling
2. Added logical index based equality check for RunArray.
# Are these changes tested?
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
Yes
# Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
If there are any breaking changes to public APIs, please call them out.
-->
No.
---
arrow-array/src/array/run_array.rs | 111 +++++++++++++++++++++++++
arrow-buffer/src/buffer/run.rs | 13 ++-
arrow-cast/src/cast/mod.rs | 28 +++++++
arrow-cast/src/cast/run_array.rs | 16 ++--
arrow-data/src/equal/run.rs | 166 +++++++++++++++++++++++++++----------
arrow-row/src/lib.rs | 8 +-
arrow-row/src/run.rs | 21 +++--
7 files changed, 299 insertions(+), 64 deletions(-)
diff --git a/arrow-array/src/array/run_array.rs
b/arrow-array/src/array/run_array.rs
index aa4d798c77..4770bad05e 100644
--- a/arrow-array/src/array/run_array.rs
+++ b/arrow-array/src/array/run_array.rs
@@ -141,6 +141,9 @@ impl<R: RunEndIndexType> RunArray<R> {
///
/// [`values`]: Self::values
pub fn values_slice(&self) -> ArrayRef {
+ if self.is_empty() {
+ return self.values.slice(0, 0);
+ }
let start = self.get_start_physical_index();
let end = self.get_end_physical_index();
self.values.slice(start, end - start + 1)
@@ -653,6 +656,7 @@ mod tests {
use super::*;
use crate::builder::PrimitiveRunBuilder;
use crate::cast::AsArray;
+ use crate::new_empty_array;
use crate::types::{Int8Type, UInt32Type};
use crate::{Int16Array, Int32Array, StringArray};
@@ -750,6 +754,26 @@ mod tests {
assert_eq!(run_ends.values(), &run_ends_values);
}
+ #[test]
+ fn test_run_array_empty() {
+ let runs = new_empty_array(&DataType::Int16);
+ let runs = runs.as_primitive::<Int16Type>();
+ let values = new_empty_array(&DataType::Int64);
+ let array = RunArray::try_new(runs, &values).unwrap();
+
+ fn assertions(array: &RunArray<Int16Type>) {
+ assert!(array.is_empty());
+ assert_eq!(array.get_start_physical_index(), 0);
+ assert_eq!(array.get_end_physical_index(), 0);
+
assert!(array.get_physical_indices::<i16>(&[]).unwrap().is_empty());
+ assert!(array.run_ends().is_empty());
+ assert_eq!(array.run_ends().sliced_values().count(), 0);
+ }
+
+ assertions(&array);
+ assertions(&array.slice(0, 0));
+ }
+
#[test]
fn test_run_array_fmt_debug() {
let mut builder = PrimitiveRunBuilder::<Int16Type,
UInt32Type>::with_capacity(3);
@@ -1184,4 +1208,91 @@ mod tests {
let values_slice2 = values_slice2.as_primitive::<Int32Type>();
assert_eq!(values_slice2.values(), &[1]);
}
+
+ #[test]
+ fn test_run_array_values_slice_empty() {
+ let run_ends = Int32Array::from(vec![2, 5, 10]);
+ let values = StringArray::from(vec!["a", "b", "c"]);
+ let array = RunArray::<Int32Type>::try_new(&run_ends,
&values).unwrap();
+
+ let slice = array.slice(0, 0);
+ assert_eq!(slice.len(), 0);
+
+ let values_slice = slice.values_slice();
+ assert_eq!(values_slice.len(), 0);
+ assert_eq!(values_slice.data_type(), &DataType::Utf8);
+ }
+
+ #[test]
+ fn test_run_array_eq_empty() {
+ let run_ends = Int32Array::from(vec![2, 5, 10]);
+ let values = StringArray::from(vec!["a", "b", "c"]);
+ let array = RunArray::<Int32Type>::try_new(&run_ends,
&values).unwrap();
+
+ let slice1 = array.slice(0, 0);
+ let slice2 = array.slice(1, 0);
+ let slice3 = array.slice(10, 0);
+
+ assert_eq!(slice1, slice2);
+ assert_eq!(slice2, slice3);
+
+ let empty_array = new_empty_array(array.data_type());
+ let empty_array =
crate::cast::as_run_array::<Int32Type>(empty_array.as_ref());
+
+ assert_eq!(&slice1, empty_array);
+ }
+
+ #[test]
+ fn test_run_array_eq_diff_physical_same_logical() {
+ let run_ends1 = Int32Array::from(vec![1, 3, 6]);
+ let values1 = StringArray::from(vec!["a", "b", "c"]);
+ let array1 = RunArray::<Int32Type>::try_new(&run_ends1,
&values1).unwrap();
+
+ let run_ends2 = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
+ let values2 = StringArray::from(vec!["a", "b", "b", "c", "c", "c"]);
+ let array2 = RunArray::<Int32Type>::try_new(&run_ends2,
&values2).unwrap();
+
+ assert_eq!(array1, array2);
+ }
+
+ #[test]
+ fn test_run_array_eq_sliced() {
+ let run_ends1 = Int32Array::from(vec![2, 5, 10]);
+ let values1 = StringArray::from(vec!["a", "b", "c"]);
+ let array1 = RunArray::<Int32Type>::try_new(&run_ends1,
&values1).unwrap();
+ // Logical: a, a, b, b, b, c, c, c, c, c
+
+ let slice1 = array1.slice(1, 6);
+ // Logical: a, b, b, b, c, c
+
+ let run_ends2 = Int32Array::from(vec![1, 4, 6]);
+ let values2 = StringArray::from(vec!["a", "b", "c"]);
+ let array2 = RunArray::<Int32Type>::try_new(&run_ends2,
&values2).unwrap();
+ // Logical: a, b, b, b, c, c
+
+ assert_eq!(slice1, array2);
+
+ let slice2 = array1.slice(2, 3);
+ // Logical: b, b, b
+ let run_ends3 = Int32Array::from(vec![3]);
+ let values3 = StringArray::from(vec!["b"]);
+ let array3 = RunArray::<Int32Type>::try_new(&run_ends3,
&values3).unwrap();
+ assert_eq!(slice2, array3);
+ }
+
+ #[test]
+ fn test_run_array_eq_sliced_different_offsets() {
+ let run_ends1 = Int32Array::from(vec![2, 5, 10]);
+ let values1 = StringArray::from(vec!["a", "b", "c"]);
+ let array1 = RunArray::<Int32Type>::try_new(&run_ends1,
&values1).unwrap();
+ let array2 = array1.clone();
+ assert_eq!(array1, array2);
+
+ let slice1 = array1.slice(1, 4); // a, b, b, b
+ let slice2 = array1.slice(1, 4);
+ assert_eq!(slice1, slice2);
+
+ let slice3 = array1.slice(0, 4); // a, a, b, b
+ assert_ne!(slice1, slice3);
+ }
}
diff --git a/arrow-buffer/src/buffer/run.rs b/arrow-buffer/src/buffer/run.rs
index 6603dec1ba..0f4d9234e4 100644
--- a/arrow-buffer/src/buffer/run.rs
+++ b/arrow-buffer/src/buffer/run.rs
@@ -199,9 +199,16 @@ where
pub fn sliced_values(&self) -> impl Iterator<Item = E> + '_ {
let offset = self.logical_offset;
let len = self.logical_length;
- let start = self.get_start_physical_index();
- let end = self.get_end_physical_index();
- self.run_ends[start..=end].iter().map(move |&val| {
+ // Doing this roundabout way since the iterator type we return must be
+ // the same (i.e. cannot use std::iter::empty())
+ let physical_slice = if self.is_empty() {
+ &self.run_ends[0..0]
+ } else {
+ let start = self.get_start_physical_index();
+ let end = self.get_end_physical_index();
+ &self.run_ends[start..=end]
+ };
+ physical_slice.iter().map(move |&val| {
let val = val.as_usize().saturating_sub(offset).min(len);
E::from_usize(val).unwrap()
})
diff --git a/arrow-cast/src/cast/mod.rs b/arrow-cast/src/cast/mod.rs
index da2d6a54ae..6c1629a820 100644
--- a/arrow-cast/src/cast/mod.rs
+++ b/arrow-cast/src/cast/mod.rs
@@ -12590,4 +12590,32 @@ mod tests {
assert_eq!(casted.as_ref(), &expected);
}
}
+
+ #[test]
+ fn test_cast_between_sliced_run_end_encoded() {
+ let run_ends = Int16Array::from(vec![2, 5, 8]);
+ let values = StringArray::from(vec!["a", "b", "c"]);
+
+ let ree_array = RunArray::<Int16Type>::try_new(&run_ends,
&values).unwrap();
+ let ree_array = ree_array.slice(1, 2);
+ let array_ref = Arc::new(ree_array) as ArrayRef;
+
+ let target_type = DataType::RunEndEncoded(
+ Arc::new(Field::new("run_ends", DataType::Int64, false)),
+ Arc::new(Field::new("values", DataType::Utf8, true)),
+ );
+ let cast_options = CastOptions {
+ safe: false,
+ format_options: FormatOptions::default(),
+ };
+
+ let result = cast_with_options(&array_ref, &target_type,
&cast_options).unwrap();
+ let run_array = result.as_run::<Int64Type>();
+ let run_array = run_array.downcast::<StringArray>().unwrap();
+
+ let expected = vec!["a", "b"];
+ let actual = run_array.into_iter().flatten().collect::<Vec<_>>();
+
+ assert_eq!(expected, actual);
+ }
}
diff --git a/arrow-cast/src/cast/run_array.rs b/arrow-cast/src/cast/run_array.rs
index 3e14804dc8..9878f09772 100644
--- a/arrow-cast/src/cast/run_array.rs
+++ b/arrow-cast/src/cast/run_array.rs
@@ -32,17 +32,18 @@ pub(crate) fn run_end_encoded_cast<K: RunEndIndexType>(
.downcast_ref::<RunArray<K>>()
.ok_or_else(|| ArrowError::CastError("Expected
RunArray".to_string()))?;
- let values = run_array.values();
-
match to_type {
// Stay as RunEndEncoded, cast only the values
DataType::RunEndEncoded(target_index_field,
target_value_field) => {
- let cast_values =
- cast_with_options(values,
target_value_field.data_type(), cast_options)?;
+ let values = run_array.values_slice();
+ let cast_values = cast_with_options(
+ values.as_ref(),
+ target_value_field.data_type(),
+ cast_options,
+ )?;
- let run_ends_array = PrimitiveArray::<K>::from_iter_values(
- run_array.run_ends().values().iter().copied(),
- );
+ let run_ends_array =
+
PrimitiveArray::<K>::from_iter_values(run_array.run_ends().sliced_values());
let cast_run_ends = cast_with_options(
&run_ends_array,
target_index_field.data_type(),
@@ -72,6 +73,7 @@ pub(crate) fn run_end_encoded_cast<K: RunEndIndexType>(
// Expand to logical form
_ => {
+ let values = run_array.values();
let len = run_array.len();
let offset = run_array.offset();
let run_ends = run_array.run_ends().values();
diff --git a/arrow-data/src/equal/run.rs b/arrow-data/src/equal/run.rs
index 6c9393ecd8..0032b56d1a 100644
--- a/arrow-data/src/equal/run.rs
+++ b/arrow-data/src/equal/run.rs
@@ -16,13 +16,17 @@
// under the License.
use crate::data::ArrayData;
+use arrow_buffer::ArrowNativeType;
+use arrow_buffer::RunEndBuffer;
+use arrow_schema::DataType;
+use num_traits::ToPrimitive;
use super::equal_range;
-/// The current implementation of comparison of run array support physical
comparison.
-/// Comparing run encoded array based on logical indices (`lhs_start`,
`rhs_start`) will
-/// be time consuming as converting from logical index to physical index
cannot be done
-/// in constant time. The current comparison compares the underlying physical
arrays.
+/// Returns true if the two `RunEndEncoded` arrays are equal.
+///
+/// This provides a specialized implementation of equality for REE arrays that
+/// handles differences in run-encoding by iterating through the logical range.
pub(super) fn run_equal(
lhs: &ArrayData,
rhs: &ArrayData,
@@ -30,57 +34,129 @@ pub(super) fn run_equal(
rhs_start: usize,
len: usize,
) -> bool {
- if lhs_start != 0
- || rhs_start != 0
- || (lhs.len() != len && rhs.len() != len)
- || lhs.offset() > 0
- || rhs.offset() > 0
- {
- unimplemented!("Logical comparison for run array not supported.")
+ let lhs_index_type = match lhs.data_type() {
+ DataType::RunEndEncoded(f, _) => f.data_type(),
+ _ => unreachable!(),
+ };
+
+ match lhs_index_type {
+ DataType::Int16 => run_equal_inner::<i16>(lhs, rhs, lhs_start,
rhs_start, len),
+ DataType::Int32 => run_equal_inner::<i32>(lhs, rhs, lhs_start,
rhs_start, len),
+ DataType::Int64 => run_equal_inner::<i64>(lhs, rhs, lhs_start,
rhs_start, len),
+ _ => unreachable!(),
}
+}
+
+struct RunArrayData<'a, T: ArrowNativeType> {
+ run_ends: RunEndBuffer<T>,
+ values: &'a ArrayData,
+}
+
+impl<'a, T: ArrowNativeType + ToPrimitive> RunArrayData<'a, T> {
+ fn new(data: &'a ArrayData, start: usize, len: usize) -> Self {
+ debug_assert!(
+ data.child_data().len() == 2,
+ "RunEndEncoded arrays are guaranteed to have 2 children [run_ends,
values]"
+ );
+ let run_ends_data = &data.child_data()[0];
+ let raw_run_ends_buffer = &run_ends_data.buffers()[0];
+ // SAFETY: we're reconstructing RunEndBuffer from a known valid
RunArray
+ let run_ends = unsafe {
+ RunEndBuffer::<T>::new_unchecked(
+ raw_run_ends_buffer.clone().into(),
+ run_ends_data.offset() + data.offset() + start,
+ len,
+ )
+ };
- if lhs.len() != rhs.len() {
- return false;
+ let values = &data.child_data()[1];
+ Self { run_ends, values }
}
- let lhs_child_data = lhs.child_data();
- let lhs_run_ends_array = &lhs_child_data[0];
- let lhs_values_array = &lhs_child_data[1];
+ fn run_end(&self, index: usize) -> usize {
+ self.run_ends.values()[index].as_usize()
+ }
- let rhs_child_data = rhs.child_data();
- let rhs_run_ends_array = &rhs_child_data[0];
- let rhs_values_array = &rhs_child_data[1];
+ fn get_start_end_physical_indices(&self) -> (usize, usize) {
+ let start = self.run_ends.get_start_physical_index();
+ let end = self.run_ends.get_end_physical_index();
+ (start, end)
+ }
+}
- if lhs_run_ends_array.len() != rhs_run_ends_array.len() {
- return false;
+fn run_equal_inner<T: ArrowNativeType + ToPrimitive>(
+ lhs: &ArrayData,
+ rhs: &ArrayData,
+ lhs_start: usize,
+ rhs_start: usize,
+ len: usize,
+) -> bool {
+ if len == 0 {
+ return true;
}
- if lhs_values_array.len() != rhs_values_array.len() {
- return false;
+ let l_array = RunArrayData::<T>::new(lhs, lhs_start, len);
+ let r_array = RunArrayData::<T>::new(rhs, rhs_start, len);
+
+ let (l_start_phys, l_end_phys) = l_array.get_start_end_physical_indices();
+ let (r_start_phys, r_end_phys) = r_array.get_start_end_physical_indices();
+ let l_runs = l_end_phys - l_start_phys + 1;
+ let r_runs = r_end_phys - r_start_phys + 1;
+
+ if l_runs == r_runs {
+ // When the boundaries align perfectly, we don't need the complex
stepping loop that calculates overlaps.
+ // Instead, we can simply treat the underlying values arrays as if
they were standard primitive arrays.
+ let l_iter = l_array.run_ends.sliced_values();
+ let r_iter = r_array.run_ends.sliced_values();
+ let physical_match = l_iter.zip(r_iter).all(|(l_re, r_re)| l_re ==
r_re);
+
+ if physical_match {
+ // Both arrays are partitioned identically.
+ // We can just verify if the physical values in those partitions
match.
+ return equal_range(
+ l_array.values,
+ r_array.values,
+ l_start_phys,
+ r_start_phys,
+ l_runs,
+ );
+ }
}
- // check run ends array are equal. The length of the physical array
- // is used to validate the child arrays.
- let run_ends_equal = equal_range(
- lhs_run_ends_array,
- rhs_run_ends_array,
- lhs_start,
- rhs_start,
- lhs_run_ends_array.len(),
- );
-
- // if run ends array are not the same return early without validating
- // values array.
- if !run_ends_equal {
- return false;
+ let mut l_phys = l_start_phys;
+ let mut r_phys = r_start_phys;
+ let mut processed = 0;
+ while processed < len {
+ if !equal_range(l_array.values, r_array.values, l_phys, r_phys, 1) {
+ return false;
+ }
+
+ let l_run_end = l_array.run_end(l_phys);
+ let r_run_end = r_array.run_end(r_phys);
+
+ //Calculate how many more logical elements are in the current run of
the left and right array
+ let l_remaining_in_run = l_run_end - (l_array.run_ends.offset() +
processed);
+ let r_remaining_in_run = r_run_end - (r_array.run_ends.offset() +
processed);
+
+ //Calculate how many elements are left to compare in the requested
range
+ let remaining_in_range = len - processed;
+
+ //Find the smallest of these three to determine our step size
+ //The goal is to move the logical cursor (processed) forward as far as
possible without:
+ //Crossing the boundary of a run in the left or right array (where the
value might change).
+ //Going past the total length we were asked to compare.
+ let step = l_remaining_in_run
+ .min(r_remaining_in_run)
+ .min(remaining_in_range);
+ processed += step;
+
+ if l_array.run_ends.offset() + processed == l_run_end {
+ l_phys += 1;
+ }
+ if r_array.run_ends.offset() + processed == r_run_end {
+ r_phys += 1;
+ }
}
- // check values array are equal
- equal_range(
- lhs_values_array,
- rhs_values_array,
- lhs_start,
- rhs_start,
- rhs_values_array.len(),
- )
+ true
}
diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs
index fdad413e0e..d535d90cef 100644
--- a/arrow-row/src/lib.rs
+++ b/arrow-row/src/lib.rs
@@ -657,14 +657,14 @@ impl Codec {
Codec::RunEndEncoded(converter) => {
let values = match array.data_type() {
DataType::RunEndEncoded(r, _) => match r.data_type() {
- DataType::Int16 =>
array.as_run::<Int16Type>().values(),
- DataType::Int32 =>
array.as_run::<Int32Type>().values(),
- DataType::Int64 =>
array.as_run::<Int64Type>().values(),
+ DataType::Int16 =>
array.as_run::<Int16Type>().values_slice(),
+ DataType::Int32 =>
array.as_run::<Int32Type>().values_slice(),
+ DataType::Int64 =>
array.as_run::<Int64Type>().values_slice(),
_ => unreachable!("Unsupported run end index type:
{r:?}"),
},
_ => unreachable!(),
};
- let rows =
converter.convert_columns(std::slice::from_ref(values))?;
+ let rows =
converter.convert_columns(std::slice::from_ref(&values))?;
Ok(Encoder::RunEndEncoded(rows))
}
Codec::Union(converters, _) => {
diff --git a/arrow-row/src/run.rs b/arrow-row/src/run.rs
index e12fa87dce..f775abb635 100644
--- a/arrow-row/src/run.rs
+++ b/arrow-row/src/run.rs
@@ -27,11 +27,11 @@ pub fn compute_lengths<R: RunEndIndexType>(
rows: &Rows,
array: &RunArray<R>,
) {
- let run_ends = array.run_ends().values();
+ let run_ends = array.run_ends().sliced_values();
let mut logical_start = 0;
// Iterate over each run and apply the same length to all logical
positions in the run
- for (physical_idx, &run_end) in run_ends.iter().enumerate() {
+ for (physical_idx, run_end) in run_ends.enumerate() {
let logical_end = run_end.as_usize();
let row_len = rows.row_len(physical_idx);
let encoded_len = variable::padded_length(Some(row_len));
@@ -55,14 +55,14 @@ pub fn encode<R: RunEndIndexType>(
opts: SortOptions,
array: &RunArray<R>,
) {
- let run_ends = array.run_ends();
+ let run_ends = array.run_ends().sliced_values();
let mut logical_idx = 0;
let mut offset_idx = 1; // Skip first offset
// Iterate over each run
- for physical_idx in 0..run_ends.values().len() {
- let run_end = run_ends.values()[physical_idx].as_usize();
+ for (physical_idx, run_end) in run_ends.enumerate() {
+ let run_end = run_end.as_usize();
// Process all elements in this run
while logical_idx < run_end && offset_idx < offsets.len() {
@@ -639,4 +639,15 @@ mod tests {
let result_ree = arrays[0].as_run::<Int32Type>();
assert_eq!(result_ree.len(), 0);
}
+
+ #[test]
+ fn test_run_end_encoded_round_trip_sliced() {
+ let values = Int64Array::from(vec![100, 200, 100, 300]);
+ let run_ends = vec![2, 3, 5, 6];
+ let array: RunArray<Int16Type> =
+ RunArray::try_new(&PrimitiveArray::from(run_ends),
&values).unwrap();
+ let array = array.slice(2, 3);
+
+ assert_roundtrip(&array, DataType::Int16, DataType::Int64, None);
+ }
}