This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d21a40c64e Avoid concat in `array_append` (#8137)
d21a40c64e is described below
commit d21a40c64e8227eb1ef44b40e9f97e22b9b9f838
Author: Jay Zhan <[email protected]>
AuthorDate: Tue Nov 14 02:27:44 2023 +0800
Avoid concat in `array_append` (#8137)
* clean array_append
Signed-off-by: jayzhan211 <[email protected]>
* done
Signed-off-by: jayzhan211 <[email protected]>
---------
Signed-off-by: jayzhan211 <[email protected]>
---
datafusion/physical-expr/src/array_expressions.rs | 177 +++++++++++-----------
1 file changed, 85 insertions(+), 92 deletions(-)
diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs
index 54452e3653..73ef0ea6da 100644
--- a/datafusion/physical-expr/src/array_expressions.rs
+++ b/datafusion/physical-expr/src/array_expressions.rs
@@ -579,57 +579,85 @@ pub fn array_pop_back(args: &[ArrayRef]) -> Result<ArrayRef> {
)
}
+/// Appends or prepends one element to every row of a `ListArray`.
+///
+/// Row `i` of the result is row `i` of `list_array` with `element_array[i]` added
+/// at the end (`is_append == true`) or at the front (`is_append == false`).
+/// Child values are copied once through `MutableArrayData`; no `concat` is needed.
+/// A null row of `list_array` is treated as empty, so it yields a one-element list.
+/// The returned ListArray has no null buffer.
+///
+/// # Arguments
+///
+/// * `list_array` - The ListArray whose rows are extended.
+/// * `element_array` - Elements to add; element `i` is used for row `i`.
+/// * `data_type` - Item data type, used for the result's `"item"` field.
+/// * `is_append` - `true` to append the element, `false` to prepend it.
+///
+/// # Errors
+///
+/// Returns an error if the resulting `ListArray` cannot be constructed.
+///
+/// # Examples
+///
+/// ```text
+/// [1, 2, 3], 4, append   => [1, 2, 3, 4]
+/// 5, [6, 7, 8], prepend  => [5, 6, 7, 8]
+/// ```
+fn general_append_and_prepend(
+    list_array: &ListArray,
+    element_array: &ArrayRef,
+    data_type: &DataType,
+    is_append: bool,
+) -> Result<ArrayRef> {
+    let mut offsets = vec![0];
+    let values = list_array.values();
+    let original_data = values.to_data();
+    let element_data = element_array.to_data();
+    let capacity = Capacities::Array(original_data.len() + element_data.len());
+
+    let mut mutable = MutableArrayData::with_capacities(
+        vec![&original_data, &element_data],
+        false,
+        capacity,
+    );
+
+    // Source indices into `mutable`, in the order passed to `with_capacities`.
+    let values_index = 0;
+    let element_index = 1;
+
+    for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
+        let start = offset_window[0] as usize;
+        // The Arrow format allows a null list slot to span a non-empty (arbitrary)
+        // segment of the child values; treat such rows as empty so they only
+        // receive the new element, as the previous per-row implementation did.
+        let end = if list_array.is_null(row_index) {
+            start
+        } else {
+            offset_window[1] as usize
+        };
+        if is_append {
+            mutable.extend(values_index, start, end);
+            mutable.extend(element_index, row_index, row_index + 1);
+        } else {
+            mutable.extend(element_index, row_index, row_index + 1);
+            mutable.extend(values_index, start, end);
+        }
+        // Each output row is the (possibly emptied) input row plus one element.
+        offsets.push(offsets[row_index] + (end - start + 1) as i32);
+    }
+
+    let data = mutable.freeze();
+
+    Ok(Arc::new(ListArray::try_new(
+        Arc::new(Field::new("item", data_type.to_owned(), true)),
+        OffsetBuffer::new(offsets.into()),
+        arrow_array::make_array(data),
+        None,
+    )?))
+}
+
/// Array_append SQL function
pub fn array_append(args: &[ArrayRef]) -> Result<ArrayRef> {
- let arr = as_list_array(&args[0])?;
- let element = &args[1];
+ let list_array = as_list_array(&args[0])?;
+ let element_array = &args[1];
- check_datatypes("array_append", &[arr.values(), element])?;
- let res = match arr.value_type() {
+ check_datatypes("array_append", &[list_array.values(), element_array])?;
+ let res = match list_array.value_type() {
DataType::List(_) => concat_internal(args)?,
- DataType::Null => return make_array(&[element.to_owned()]),
+ DataType::Null => return make_array(&[element_array.to_owned()]),
data_type => {
- let mut new_values = vec![];
- let mut offsets = vec![0];
-
- let elem_data = element.to_data();
- for (row_index, arr) in arr.iter().enumerate() {
- let new_array = if let Some(arr) = arr {
- let original_data = arr.to_data();
- let capacity = Capacities::Array(original_data.len() + 1);
- let mut mutable = MutableArrayData::with_capacities(
- vec![&original_data, &elem_data],
- false,
- capacity,
- );
- mutable.extend(0, 0, original_data.len());
- mutable.extend(1, row_index, row_index + 1);
- let data = mutable.freeze();
- arrow_array::make_array(data)
- } else {
- let capacity = Capacities::Array(1);
- let mut mutable = MutableArrayData::with_capacities(
- vec![&elem_data],
- false,
- capacity,
- );
- mutable.extend(0, row_index, row_index + 1);
- let data = mutable.freeze();
- arrow_array::make_array(data)
- };
- offsets.push(offsets[row_index] + new_array.len() as i32);
- new_values.push(new_array);
- }
-
- let new_values: Vec<_> = new_values.iter().map(|a|
a.as_ref()).collect();
- let values = arrow::compute::concat(&new_values)?;
-
- Arc::new(ListArray::try_new(
- Arc::new(Field::new("item", data_type.to_owned(), true)),
- OffsetBuffer::new(offsets.into()),
- values,
- None,
- )?)
+ return general_append_and_prepend(
+ list_array,
+ element_array,
+ &data_type,
+ true,
+ );
}
};
@@ -638,55 +666,20 @@ pub fn array_append(args: &[ArrayRef]) -> Result<ArrayRef> {
/// Array_prepend SQL function
pub fn array_prepend(args: &[ArrayRef]) -> Result<ArrayRef> {
- let element = &args[0];
- let arr = as_list_array(&args[1])?;
+ let list_array = as_list_array(&args[1])?;
+ let element_array = &args[0];
- check_datatypes("array_prepend", &[element, arr.values()])?;
- let res = match arr.value_type() {
+ check_datatypes("array_prepend", &[element_array, list_array.values()])?;
+ let res = match list_array.value_type() {
DataType::List(_) => concat_internal(args)?,
- DataType::Null => return make_array(&[element.to_owned()]),
+ DataType::Null => return make_array(&[element_array.to_owned()]),
data_type => {
- let mut new_values = vec![];
- let mut offsets = vec![0];
-
- let elem_data = element.to_data();
- for (row_index, arr) in arr.iter().enumerate() {
- let new_array = if let Some(arr) = arr {
- let original_data = arr.to_data();
- let capacity = Capacities::Array(original_data.len() + 1);
- let mut mutable = MutableArrayData::with_capacities(
- vec![&original_data, &elem_data],
- false,
- capacity,
- );
- mutable.extend(1, row_index, row_index + 1);
- mutable.extend(0, 0, original_data.len());
- let data = mutable.freeze();
- arrow_array::make_array(data)
- } else {
- let capacity = Capacities::Array(1);
- let mut mutable = MutableArrayData::with_capacities(
- vec![&elem_data],
- false,
- capacity,
- );
- mutable.extend(0, row_index, row_index + 1);
- let data = mutable.freeze();
- arrow_array::make_array(data)
- };
- offsets.push(offsets[row_index] + new_array.len() as i32);
- new_values.push(new_array);
- }
-
- let new_values: Vec<_> = new_values.iter().map(|a|
a.as_ref()).collect();
- let values = arrow::compute::concat(&new_values)?;
-
- Arc::new(ListArray::try_new(
- Arc::new(Field::new("item", data_type.to_owned(), true)),
- OffsetBuffer::new(offsets.into()),
- values,
- None,
- )?)
+ return general_append_and_prepend(
+ list_array,
+ element_array,
+ &data_type,
+ false,
+ );
}
};