This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 1bed04c1e0 Optimize coalesce kernel for StringView (10-50% faster)
(#7650)
1bed04c1e0 is described below
commit 1bed04c1e053e52575c6476f592c5aca3de7310f
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Jun 20 09:24:22 2025 -0400
Optimize coalesce kernel for StringView (10-50% faster) (#7650)
# Which issue does this PR close?
- Part of https://github.com/apache/arrow-rs/issues/7456
# Rationale for this change
Currently the `coalesce` kernel buffers views / data until there are
enough rows and then concatenates the results together. StringViewArrays can
be even worse as there is a second copy in `gc_string_view_batch`.
This is wasteful because it
1. Buffers memory (has 2x the peak usage)
2. Copies the data twice
We can make it faster and more memory efficient by directly creating the
output array
# What changes are included in this PR?
1. Add a specialization for incrementally building `StringViewArray`
without buffering
Note this PR does NOT (yet) add specialized filtering -- instead it
focuses on reducing the
overhead of appending views by not copying them (again!) with
`gc_string_view_batch`
# Open questions:
1. There is substantial overlap / duplication with StringViewBuilder --
I wonder if we can / should consolidate them somehow
The differences are:
1. Block size calculation management (i.e., looking at the buffer sizes of
the incoming buffers)
2. Finishing the array allocates sufficient space for views
# Are there any user-facing changes?
The kernel is faster, no API changes
---
arrow-array/src/array/byte_view_array.rs | 26 ++
arrow-select/src/coalesce.rs | 519 ++++++++++++++++++++++---------
arrow-select/src/coalesce/byte_view.rs | 442 ++++++++++++++++++++++++++
arrow-select/src/coalesce/generic.rs | 76 +++++
4 files changed, 915 insertions(+), 148 deletions(-)
diff --git a/arrow-array/src/array/byte_view_array.rs
b/arrow-array/src/array/byte_view_array.rs
index e837512ed0..713e275d18 100644
--- a/arrow-array/src/array/byte_view_array.rs
+++ b/arrow-array/src/array/byte_view_array.rs
@@ -479,6 +479,32 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
builder.finish()
}
+ /// Returns the total number of bytes used by all non inlined views in all
+ /// buffers.
+ ///
+ /// Note this does not account for views that point at the same underlying
+ /// data in buffers
+ ///
+ /// For example, if the array has three strings views:
+ /// * View with length = 9 (inlined)
+ /// * View with length = 32 (non inlined)
+ /// * View with length = 16 (non inlined)
+ ///
+ /// Then this method would report 48
+ pub fn total_buffer_bytes_used(&self) -> usize {
+ self.views()
+ .iter()
+ .map(|v| {
+ let len = (*v as u32) as usize;
+ if len > 12 {
+ len
+ } else {
+ 0
+ }
+ })
+ .sum()
+ }
+
/// Compare two [`GenericByteViewArray`] at index `left_idx` and
`right_idx`
///
/// Comparing two ByteView types are non-trivial.
diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs
index fb126aa9e6..9b310c645d 100644
--- a/arrow-select/src/coalesce.rs
+++ b/arrow-select/src/coalesce.rs
@@ -20,17 +20,21 @@
//!
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take
-use crate::concat::concat_batches;
use crate::filter::filter_record_batch;
-use arrow_array::{cast::AsArray, Array, ArrayRef, RecordBatch};
-use arrow_array::{BooleanArray, StringViewArray};
-use arrow_data::ByteView;
-use arrow_schema::{ArrowError, SchemaRef};
+use arrow_array::types::{BinaryViewType, StringViewType};
+use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch};
+use arrow_schema::{ArrowError, DataType, SchemaRef};
use std::collections::VecDeque;
use std::sync::Arc;
// Originally From DataFusion's coalesce module:
//
https://github.com/apache/datafusion/blob/9d2f04996604e709ee440b65f41e7b882f50b788/datafusion/physical-plan/src/coalesce/mod.rs#L26-L25
+mod byte_view;
+mod generic;
+
+use byte_view::InProgressByteViewArray;
+use generic::GenericInProgressArray;
+
/// Concatenate multiple [`RecordBatch`]es
///
/// Implements the common pattern of incrementally creating output
@@ -38,7 +42,8 @@ use std::sync::Arc;
/// [`RecordBatch`]es.
///
/// This is useful after operations such as [`filter`] and [`take`] that
produce
-/// smaller batches, and we want to coalesce them into larger
+/// smaller batches, and we want to coalesce them into larger batches for
+/// further processing.
///
/// [`filter`]: crate::filter::filter
/// [`take`]: crate::take::take
@@ -113,18 +118,14 @@ use std::sync::Arc;
///
/// 2. The output is a sequence of batches, with all but the last being at
exactly
/// `target_batch_size` rows.
-///
-/// 3. Eventually this may also be able to handle other optimizations such as a
-/// combined filter/coalesce operation. See
<https://github.com/apache/arrow-rs/issues/6692>
-///
#[derive(Debug)]
pub struct BatchCoalescer {
/// The input schema
schema: SchemaRef,
/// output batch size
batch_size: usize,
- /// In-progress buffered batches
- buffer: Vec<RecordBatch>,
+ /// In-progress arrays
+ in_progress_arrays: Vec<Box<dyn InProgressArray>>,
/// Buffered row count. Always less than `batch_size`
buffered_rows: usize,
/// Completed batches
@@ -140,10 +141,16 @@ impl BatchCoalescer {
/// Typical values are `4096` or `8192` rows.
///
pub fn new(schema: SchemaRef, batch_size: usize) -> Self {
+ let in_progress_arrays = schema
+ .fields()
+ .iter()
+ .map(|field| create_in_progress_array(field.data_type(),
batch_size))
+ .collect::<Vec<_>>();
+
Self {
schema,
batch_size,
- buffer: vec![],
+ in_progress_arrays,
// We will for sure store at least one completed batch
completed: VecDeque::with_capacity(1),
buffered_rows: 0,
@@ -161,7 +168,6 @@ impl BatchCoalescer {
/// with the results from [`filter_record_batch`]
///
/// # Example
- /// # Example
/// ```
/// # use arrow_array::{record_batch, BooleanArray};
/// # use arrow_select::coalesce::BatchCoalescer;
@@ -212,32 +218,57 @@ impl BatchCoalescer {
/// assert_eq!(completed_batch, expected_batch);
/// ```
pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError>
{
- if batch.num_rows() == 0 {
- // If the batch is empty, we don't need to do anything
+ let (_schema, arrays, mut num_rows) = batch.into_parts();
+ if num_rows == 0 {
return Ok(());
}
- let mut batch = gc_string_view_batch(batch);
+ // setup input rows
+ assert_eq!(arrays.len(), self.in_progress_arrays.len());
+ self.in_progress_arrays
+ .iter_mut()
+ .zip(arrays)
+ .for_each(|(in_progress, array)| {
+ in_progress.set_source(Some(array));
+ });
// If pushing this batch would exceed the target batch size,
// finish the current batch and start a new one
- while batch.num_rows() > (self.batch_size - self.buffered_rows) {
+ let mut offset = 0;
+ while num_rows > (self.batch_size - self.buffered_rows) {
let remaining_rows = self.batch_size - self.buffered_rows;
debug_assert!(remaining_rows > 0);
- let head_batch = batch.slice(0, remaining_rows);
- batch = batch.slice(remaining_rows, batch.num_rows() -
remaining_rows);
- self.buffered_rows += head_batch.num_rows();
- self.buffer.push(head_batch);
+
+ // Copy remaining_rows from each array
+ for in_progress in self.in_progress_arrays.iter_mut() {
+ in_progress.copy_rows(offset, remaining_rows)?;
+ }
+
+ self.buffered_rows += remaining_rows;
+ offset += remaining_rows;
+ num_rows -= remaining_rows;
+
self.finish_buffered_batch()?;
}
- // Add the remaining rows to the buffer
- self.buffered_rows += batch.num_rows();
- self.buffer.push(batch);
+
+ // Add any remaining rows to the buffer
+ self.buffered_rows += num_rows;
+ if num_rows > 0 {
+ for in_progress in self.in_progress_arrays.iter_mut() {
+ in_progress.copy_rows(offset, num_rows)?;
+ }
+ }
// If we have reached the target batch size, finalize the buffered
batch
if self.buffered_rows >= self.batch_size {
self.finish_buffered_batch()?;
}
+
+ // clear in progress sources (to allow the memory to be freed)
+ for in_progress in self.in_progress_arrays.iter_mut() {
+ in_progress.set_source(None);
+ }
+
Ok(())
}
@@ -249,11 +280,25 @@ impl BatchCoalescer {
///
/// See [`Self::next_completed_batch()`] for the completed batches.
pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError> {
- if self.buffer.is_empty() {
+ if self.buffered_rows == 0 {
return Ok(());
}
- let batch = concat_batches(&self.schema, &self.buffer)?;
- self.buffer.clear();
+ let new_arrays = self
+ .in_progress_arrays
+ .iter_mut()
+ .map(|array| array.finish())
+ .collect::<Result<Vec<_>, ArrowError>>()?;
+
+ for (array, field) in
new_arrays.iter().zip(self.schema.fields().iter()) {
+ debug_assert_eq!(array.data_type(), field.data_type());
+ debug_assert_eq!(array.len(), self.buffered_rows);
+ }
+
+ // SAFETY: each array was created of the correct type and length.
+ let batch = unsafe {
+ RecordBatch::new_unchecked(Arc::clone(&self.schema), new_arrays,
self.buffered_rows)
+ };
+
self.buffered_rows = 0;
self.completed.push_back(batch);
Ok(())
@@ -261,7 +306,7 @@ impl BatchCoalescer {
/// Returns true if there is any buffered data
pub fn is_empty(&self) -> bool {
- self.buffer.is_empty() && self.completed.is_empty()
+ self.buffered_rows == 0 && self.completed.is_empty()
}
/// Returns true if there are any completed batches
@@ -275,119 +320,51 @@ impl BatchCoalescer {
}
}
-/// Heuristically compact `StringViewArray`s to reduce memory usage, if needed
-///
-/// Decides when to consolidate the StringView into a new buffer to reduce
-/// memory usage and improve string locality for better performance.
-///
-/// This differs from `StringViewArray::gc` because:
-/// 1. It may not compact the array depending on a heuristic.
-/// 2. It uses a precise block size to reduce the number of buffers to track.
-///
-/// # Heuristic
+/// Return a new `InProgressArray` for the given data type
+fn create_in_progress_array(data_type: &DataType, batch_size: usize) ->
Box<dyn InProgressArray> {
+ match data_type {
+ DataType::Utf8View =>
Box::new(InProgressByteViewArray::<StringViewType>::new(batch_size)),
+ DataType::BinaryView => {
+
Box::new(InProgressByteViewArray::<BinaryViewType>::new(batch_size))
+ }
+ _ => Box::new(GenericInProgressArray::new()),
+ }
+}
+
+/// Incrementally builds up arrays
///
-/// If the average size of each view is larger than 32 bytes, we compact the
array.
+/// [`GenericInProgressArray`] is the default implementation that buffers
+/// arrays and uses other kernels to concatenate them when finished.
///
-/// `StringViewArray` include pointers to buffer that hold the underlying data.
-/// One of the great benefits of `StringViewArray` is that many operations
-/// (e.g., `filter`) can be done without copying the underlying data.
+/// Some array types have specialized implementations (e.g.,
+/// [`StringViewArray`], etc.).
///
-/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
-/// `StringViewArray` may only refer to a small portion of the buffer,
-/// significantly increasing memory usage.
-fn gc_string_view_batch(batch: RecordBatch) -> RecordBatch {
- let (schema, columns, num_rows) = batch.into_parts();
- let new_columns: Vec<ArrayRef> = columns
- .into_iter()
- .map(|c| {
- // Try to re-create the `StringViewArray` to prevent holding the
underlying buffer too long.
- let Some(s) = c.as_string_view_opt() else {
- return c;
- };
- if s.data_buffers().is_empty() {
- // If there are no data buffers, we can just return the array
as is
- return c;
- }
- let ideal_buffer_size: usize = s
- .views()
- .iter()
- .map(|v| {
- let len = (*v as u32) as usize;
- if len > 12 {
- len
- } else {
- 0
- }
- })
- .sum();
- let actual_buffer_size = s.get_buffer_memory_size();
- let buffers = s.data_buffers();
-
- // Re-creating the array copies data and can be time consuming.
- // We only do it if the array is sparse
- if actual_buffer_size > (ideal_buffer_size * 2) {
- if ideal_buffer_size == 0 {
- // If the ideal buffer size is 0, all views are inlined
- // so just reuse the views
- return Arc::new(unsafe {
- StringViewArray::new_unchecked(
- s.views().clone(),
- vec![],
- s.nulls().cloned(),
- )
- });
- }
- // We set the block size to `ideal_buffer_size` so that the
new StringViewArray only has one buffer, which accelerate later concat_batches.
- // See https://github.com/apache/arrow-rs/issues/6094 for more
details.
- let mut buffer: Vec<u8> =
Vec::with_capacity(ideal_buffer_size);
-
- let views: Vec<u128> = s
- .views()
- .as_ref()
- .iter()
- .cloned()
- .map(|v| {
- let mut b: ByteView = ByteView::from(v);
-
- if b.length > 12 {
- let offset = buffer.len() as u32;
- buffer.extend_from_slice(
- buffers[b.buffer_index as usize]
- .get(b.offset as usize..b.offset as usize
+ b.length as usize)
- .expect("Invalid buffer slice"),
- );
- b.offset = offset;
- b.buffer_index = 0; // Set buffer index to 0, as
we only have one buffer
- }
-
- b.into()
- })
- .collect();
-
- let buffers = if buffer.is_empty() {
- vec![]
- } else {
- vec![buffer.into()]
- };
-
- let gc_string = unsafe {
- StringViewArray::new_unchecked(views.into(), buffers,
s.nulls().cloned())
- };
-
- Arc::new(gc_string)
- } else {
- c
- }
- })
- .collect();
- unsafe { RecordBatch::new_unchecked(schema, new_columns, num_rows) }
+/// [`StringViewArray`]: arrow_array::StringViewArray
+trait InProgressArray: std::fmt::Debug + Send + Sync {
+ /// Set the source array.
+ ///
+ /// Calls to [`Self::copy_rows`] will copy rows from this array into the
+ /// current in-progress array
+ fn set_source(&mut self, source: Option<ArrayRef>);
+
+ /// Copy rows from the current source array into the in-progress array
+ ///
+ /// The source array is set by [`Self::set_source`].
+ ///
+ /// Return an error if the source array is not set
+ fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(),
ArrowError>;
+
+ /// Finish the currently in-progress array and return it as an `ArrayRef`
+ fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
}
#[cfg(test)]
mod tests {
use super::*;
+ use crate::concat::concat_batches;
use arrow_array::builder::StringViewBuilder;
- use arrow_array::{RecordBatchOptions, StringViewArray, UInt32Array};
+ use arrow_array::cast::AsArray;
+ use arrow_array::{BinaryViewArray, RecordBatchOptions, StringViewArray,
UInt32Array};
use arrow_schema::{DataType, Field, Schema};
use std::ops::Range;
@@ -481,43 +458,76 @@ mod tests {
#[test]
fn test_string_view_no_views() {
- Test::new()
+ let output_batches = Test::new()
// both input batches have no views, so no need to compact
.with_batch(stringview_batch([Some("foo"), Some("bar")]))
.with_batch(stringview_batch([Some("baz"), Some("qux")]))
.with_expected_output_sizes(vec![4])
.run();
+
+ expect_buffer_layout(
+ col_as_string_view("c0", output_batches.first().unwrap()),
+ vec![],
+ );
}
#[test]
fn test_string_view_batch_small_no_compact() {
// view with only short strings (no buffers) --> no need to compact
let batch = stringview_batch_repeated(1000, [Some("a"), Some("b"),
Some("c")]);
- let gc_batches = Test::new()
+ let output_batches = Test::new()
.with_batch(batch.clone())
.with_expected_output_sizes(vec![1000])
.run();
let array = col_as_string_view("c0", &batch);
- let gc_array = col_as_string_view("c0", gc_batches.first().unwrap());
+ let gc_array = col_as_string_view("c0",
output_batches.first().unwrap());
assert_eq!(array.data_buffers().len(), 0);
assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len());
// no compaction
+
+ expect_buffer_layout(gc_array, vec![]);
}
#[test]
fn test_string_view_batch_large_no_compact() {
// view with large strings (has buffers) but full --> no need to
compact
let batch = stringview_batch_repeated(1000, [Some("This string is
longer than 12 bytes")]);
- let gc_batches = Test::new()
+ let output_batches = Test::new()
.with_batch(batch.clone())
.with_batch_size(1000)
.with_expected_output_sizes(vec![1000])
.run();
let array = col_as_string_view("c0", &batch);
- let gc_array = col_as_string_view("c0", gc_batches.first().unwrap());
+ let gc_array = col_as_string_view("c0",
output_batches.first().unwrap());
assert_eq!(array.data_buffers().len(), 5);
assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len());
// no compaction
+
+ expect_buffer_layout(
+ gc_array,
+ vec![
+ ExpectedLayout {
+ len: 8190,
+ capacity: 8192,
+ },
+ ExpectedLayout {
+ len: 8190,
+ capacity: 8192,
+ },
+ ExpectedLayout {
+ len: 8190,
+ capacity: 8192,
+ },
+ ExpectedLayout {
+ len: 8190,
+ capacity: 8192,
+ },
+ ExpectedLayout {
+ len: 2240,
+ capacity: 8192,
+ },
+ ],
+ );
}
#[test]
@@ -530,14 +540,14 @@ mod tests {
let batch = stringview_batch_repeated(1000, values)
// take only 10 short strings (no long ones)
.slice(5, 10);
- let gc_batches = Test::new()
+ let output_batches = Test::new()
.with_batch(batch.clone())
.with_batch_size(1000)
.with_expected_output_sizes(vec![10])
.run();
let array = col_as_string_view("c0", &batch);
- let gc_array = col_as_string_view("c0", gc_batches.first().unwrap());
+ let gc_array = col_as_string_view("c0",
output_batches.first().unwrap());
assert_eq!(array.data_buffers().len(), 1); // input has one buffer
assert_eq!(gc_array.data_buffers().len(), 0); // output has no buffers
as only short strings
}
@@ -549,16 +559,23 @@ mod tests {
// slice only 22 rows, so most of the buffer is not used
.slice(11, 22);
- let gc_batches = Test::new()
+ let output_batches = Test::new()
.with_batch(batch.clone())
.with_batch_size(1000)
.with_expected_output_sizes(vec![22])
.run();
let array = col_as_string_view("c0", &batch);
- let gc_array = col_as_string_view("c0", gc_batches.first().unwrap());
+ let gc_array = col_as_string_view("c0",
output_batches.first().unwrap());
assert_eq!(array.data_buffers().len(), 5);
- assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a
single buffer
+
+ expect_buffer_layout(
+ gc_array,
+ vec![ExpectedLayout {
+ len: 770,
+ capacity: 8192,
+ }],
+ );
}
#[test]
@@ -581,7 +598,7 @@ mod tests {
// Several batches with mixed inline / non inline
// 4k rows in
- let gc_batches = Test::new()
+ let output_batches = Test::new()
.with_batch(large_view_batch.clone())
.with_batch(small_view_batch)
// this batch needs to be compacted (less than 1/2 full)
@@ -593,9 +610,199 @@ mod tests {
.with_expected_output_sizes(vec![1024, 1024, 1024, 968])
.run();
- let gc_array = col_as_string_view("c0", gc_batches.first().unwrap());
+ expect_buffer_layout(
+ col_as_string_view("c0", output_batches.first().unwrap()),
+ vec![
+ ExpectedLayout {
+ len: 8190,
+ capacity: 8192,
+ },
+ ExpectedLayout {
+ len: 8190,
+ capacity: 8192,
+ },
+ ExpectedLayout {
+ len: 8190,
+ capacity: 8192,
+ },
+ ExpectedLayout {
+ len: 8190,
+ capacity: 8192,
+ },
+ ExpectedLayout {
+ len: 2240,
+ capacity: 8192,
+ },
+ ],
+ );
+ }
- assert_eq!(gc_array.data_buffers().len(), 5);
+ #[test]
+ fn test_string_view_many_small_compact() {
+ // The long strings are 28 bytes, so each batch has 200 * 28 = 5600 bytes
+ let batch = stringview_batch_repeated(
+ 400,
+ [Some("This string is 28 bytes long"), Some("small string")],
+ );
+ let output_batches = Test::new()
+ // First allocated buffer is 8kb.
+ // Appending five batches of 5600 bytes will use 5600 * 5 = 28kb
(8kb, 16kb and 32kb buffers)
+ .with_batch(batch.clone())
+ .with_batch(batch.clone())
+ .with_batch(batch.clone())
+ .with_batch(batch.clone())
+ .with_batch(batch.clone())
+ .with_batch_size(8000)
+ .with_expected_output_sizes(vec![2000]) // only 2000 rows total
+ .run();
+
+ // expect a nice even distribution of buffers
+ expect_buffer_layout(
+ col_as_string_view("c0", output_batches.first().unwrap()),
+ vec![
+ ExpectedLayout {
+ len: 8176,
+ capacity: 8192,
+ },
+ ExpectedLayout {
+ len: 16380,
+ capacity: 16384,
+ },
+ ExpectedLayout {
+ len: 3444,
+ capacity: 32768,
+ },
+ ],
+ );
+ }
+
+ #[test]
+ fn test_string_view_many_small_boundary() {
+ // The strings are designed to exactly fit into buffers that are
powers of 2 long
+ let batch = stringview_batch_repeated(100, [Some("This string is a
power of two=32")]);
+ let output_batches = Test::new()
+ .with_batches(std::iter::repeat(batch).take(20))
+ .with_batch_size(900)
+ .with_expected_output_sizes(vec![900, 900, 200])
+ .run();
+
+ // expect each buffer to be entirely full except the last one
+ expect_buffer_layout(
+ col_as_string_view("c0", output_batches.first().unwrap()),
+ vec![
+ ExpectedLayout {
+ len: 8192,
+ capacity: 8192,
+ },
+ ExpectedLayout {
+ len: 16384,
+ capacity: 16384,
+ },
+ ExpectedLayout {
+ len: 4224,
+ capacity: 32768,
+ },
+ ],
+ );
+ }
+
+ #[test]
+ fn test_string_view_large_small() {
+ // The long strings are 28 bytes, so each batch has 200 * 28 = 5600
bytes
+ let mixed_batch = stringview_batch_repeated(
+ 400,
+ [Some("This string is 28 bytes long"), Some("small string")],
+ );
+ // These strings aren't copied, this array has an 8k buffer
+ let all_large = stringview_batch_repeated(
+ 100,
+ [Some(
+ "This buffer has only large strings in it so there are no
buffer copies",
+ )],
+ );
+
+ let output_batches = Test::new()
+ // First allocated buffer is 8kb.
+ // Appending five batches of 5600 bytes will use 5600 * 5 = 28kb
(8kb, an 16kb and 32kbkb)
+ .with_batch(mixed_batch.clone())
+ .with_batch(mixed_batch.clone())
+ .with_batch(all_large.clone())
+ .with_batch(mixed_batch.clone())
+ .with_batch(all_large.clone())
+ .with_batch_size(8000)
+ .with_expected_output_sizes(vec![1400])
+ .run();
+
+ expect_buffer_layout(
+ col_as_string_view("c0", output_batches.first().unwrap()),
+ vec![
+ ExpectedLayout {
+ len: 8176,
+ capacity: 8192,
+ },
+ // this buffer was allocated but not used when the all_large
batch was pushed
+ ExpectedLayout {
+ len: 3024,
+ capacity: 16384,
+ },
+ ExpectedLayout {
+ len: 7000,
+ capacity: 8192,
+ },
+ ExpectedLayout {
+ len: 5600,
+ capacity: 32768,
+ },
+ ExpectedLayout {
+ len: 7000,
+ capacity: 8192,
+ },
+ ],
+ );
+ }
+
+ #[test]
+ fn test_binary_view() {
+ let values: Vec<Option<&[u8]>> = vec![
+ Some(b"foo"),
+ None,
+ Some(b"A longer string that is more than 12 bytes"),
+ ];
+
+ let binary_view =
+
BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
+ let batch =
+ RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as
ArrayRef)]).unwrap();
+
+ Test::new()
+ .with_batch(batch.clone())
+ .with_batch(batch.clone())
+ .with_batch_size(512)
+ .with_expected_output_sizes(vec![512, 512, 512, 464])
+ .run();
+ }
+
+ #[derive(Debug, Clone, PartialEq)]
+ struct ExpectedLayout {
+ len: usize,
+ capacity: usize,
+ }
+
+ /// Asserts that the buffer layout of the specified StringViewArray
matches the expected layout
+ fn expect_buffer_layout(array: &StringViewArray, expected:
Vec<ExpectedLayout>) {
+ let actual = array
+ .data_buffers()
+ .iter()
+ .map(|b| ExpectedLayout {
+ len: b.len(),
+ capacity: b.capacity(),
+ })
+ .collect::<Vec<_>>();
+
+ assert_eq!(
+ actual, expected,
+ "Expected buffer layout {expected:#?} but got {actual:#?}"
+ );
}
/// Test for [`BatchCoalescer`]
@@ -678,10 +885,26 @@ mod tests {
let mut coalescer = BatchCoalescer::new(Arc::clone(&schema),
target_batch_size);
+ let had_input = input_batches.iter().any(|b| b.num_rows() > 0);
for batch in input_batches {
coalescer.push_batch(batch).unwrap();
}
+ assert_eq!(schema, coalescer.schema());
+
+ if had_input {
+ assert!(!coalescer.is_empty(), "Coalescer should not be
empty");
+ } else {
+ assert!(coalescer.is_empty(), "Coalescer should be empty");
+ }
+
coalescer.finish_buffered_batch().unwrap();
+ if had_input {
+ assert!(
+ coalescer.has_completed_batch(),
+ "Coalescer should have completed batches"
+ );
+ }
+
let mut output_batches = vec![];
while let Some(batch) = coalescer.next_completed_batch() {
output_batches.push(batch);
@@ -689,7 +912,6 @@ mod tests {
// make sure we got the expected number of output batches and
content
let mut starting_idx = 0;
- assert_eq!(expected_output_sizes.len(), output_batches.len());
let actual_output_sizes: Vec<usize> =
output_batches.iter().map(|b| b.num_rows()).collect();
assert_eq!(
@@ -769,7 +991,8 @@ mod tests {
builder.append_option(val);
}
- RecordBatch::try_new(Arc::clone(&schema),
vec![Arc::new(builder.finish())]).unwrap()
+ let array = builder.finish();
+ RecordBatch::try_new(Arc::clone(&schema),
vec![Arc::new(array)]).unwrap()
}
/// Returns the named column as a StringViewArray
diff --git a/arrow-select/src/coalesce/byte_view.rs
b/arrow-select/src/coalesce/byte_view.rs
new file mode 100644
index 0000000000..9f87d14a8e
--- /dev/null
+++ b/arrow-select/src/coalesce/byte_view.rs
@@ -0,0 +1,442 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::coalesce::InProgressArray;
+use arrow_array::cast::AsArray;
+use arrow_array::types::ByteViewType;
+use arrow_array::{Array, ArrayRef, GenericByteViewArray};
+use arrow_buffer::{Buffer, NullBufferBuilder};
+use arrow_data::ByteView;
+use arrow_schema::ArrowError;
+use std::marker::PhantomData;
+use std::sync::Arc;
+
+/// InProgressArray for [`StringViewArray`] and [`BinaryViewArray`]
+///
+/// This structure buffers the views and data buffers as they are copied from
+/// the source array, and then produces a new array when `finish` is called. It
+/// also handles "garbage collection" by copying strings to a new buffer when
+/// the source buffer is sparse (i.e. uses more than 2x the memory it
+/// needs).
+///
+/// [`StringViewArray`]: arrow_array::StringViewArray
+/// [`BinaryViewArray`]: arrow_array::BinaryViewArray
+pub(crate) struct InProgressByteViewArray<B: ByteViewType> {
+    /// The source array and information; `None` until `set_source` is called
+    source: Option<Source>,
+    /// the target batch size (and thus size for views allocation)
+    batch_size: usize,
+    /// The in progress views
+    views: Vec<u128>,
+    /// In progress nulls
+    nulls: NullBufferBuilder,
+    /// Buffer that copied string data is currently being written into, if any.
+    /// It is moved into `completed` once it is finished.
+    current: Option<Vec<u8>>,
+    /// Completed buffers. The buffer index stored in a view is a position in
+    /// this list.
+    completed: Vec<Buffer>,
+    /// Allocates new buffers of increasing size as needed
+    buffer_source: BufferSource,
+    /// Phantom so we can use the same struct for both StringViewArray and
+    /// BinaryViewArray
+    _phantom: PhantomData<B>,
+}
+
+/// The array rows are currently being copied from, plus the buffer
+/// statistics computed for it once in `set_source`
+struct Source {
+    /// The array to copy from
+    array: ArrayRef,
+    /// Should the strings from the source array be copied into new buffers?
+    /// Set when the array's buffers occupy more than twice the bytes its
+    /// views actually use.
+    need_gc: bool,
+    /// How many bytes were actually used in the source array's buffers?
+    /// Zero when the array has no data buffers.
+    ideal_buffer_size: usize,
+}
+
+// Debug is written by hand because ByteViewType doesn't implement Debug
+impl<B: ByteViewType> std::fmt::Debug for InProgressByteViewArray<B> {
+    /// Prints a summary: element counts for `views` and `completed`, and only
+    /// the presence of `current`, instead of the raw contents.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let view_count = self.views.len();
+        let completed_count = self.completed.len();
+        let current_summary = self.current.as_ref().map(|_| "Some(...)");
+
+        let mut out = f.debug_struct("InProgressByteViewArray");
+        out.field("batch_size", &self.batch_size);
+        out.field("views", &view_count);
+        out.field("nulls", &self.nulls);
+        out.field("current", &current_summary);
+        out.field("completed", &completed_count);
+        out.finish()
+    }
+}
+
+impl<B: ByteViewType> InProgressByteViewArray<B> {
+    /// Creates an empty in-progress array that targets `batch_size` rows per
+    /// output array. No source is set yet.
+    pub(crate) fn new(batch_size: usize) -> Self {
+        Self {
+            source: None,
+            batch_size,
+            // left empty here: space for the views is allocated in push
+            views: Vec::new(),
+            // no allocation
+            nulls: NullBufferBuilder::new(batch_size),
+            current: None,
+            completed: Vec::new(),
+            buffer_source: BufferSource::new(),
+            _phantom: PhantomData,
+        }
+    }
+
+    /// Allocate space for the output views if needed
+    ///
+    /// This is done on write (when we know it is necessary) rather than
+    /// eagerly to avoid allocations that are not used.
+    ///
+    /// `Vec::reserve` asks for room *beyond the current length*, and this
+    /// method runs on every `copy_rows`. Requesting a full `batch_size` each
+    /// time would grow the vector past `batch_size` (reallocating and copying
+    /// the views gathered so far) as soon as it held any views, so only the
+    /// shortfall up to `batch_size` is requested.
+    fn ensure_capacity(&mut self) {
+        let shortfall = self.batch_size.saturating_sub(self.views.len());
+        self.views.reserve(shortfall);
+    }
+
+    /// Moves the in progress buffer, if there is one, onto the end of the
+    /// completed buffers, leaving `self.current` empty
+    fn finish_current(&mut self) {
+        if let Some(filled) = self.current.take() {
+            self.completed.push(filled.into());
+        }
+    }
+
+    /// Append views to self.views without copying any string data: `buffers`
+    /// are added to the completed buffers and the buffer index of each view is
+    /// shifted to match, if necessary
+    #[inline(never)]
+    fn append_views_and_update_buffer_index(&mut self, views: &[u128], buffers: &[Buffer]) {
+        // Complete any partially filled buffer so the source buffers are
+        // appended after it
+        self.finish_current();
+
+        let index_shift: u32 = self.completed.len().try_into().expect("too many buffers");
+        self.completed.extend_from_slice(buffers);
+
+        // Nothing precedes the source buffers, so the views can be used as is
+        if index_shift == 0 {
+            self.views.extend_from_slice(views);
+            return;
+        }
+
+        // Small views (<=12 bytes) are inlined, so only large views carry a
+        // buffer index that needs to be updated
+        self.views.extend(views.iter().map(|&raw| {
+            let mut view = ByteView::from(raw);
+            if view.length > 12 {
+                view.buffer_index += index_shift;
+            }
+            view.as_u128()
+        }));
+    }
+
+    /// Append views to self.views, copying data from the buffers into
+    /// `self.current` (allocating a new buffer when it is missing or too
+    /// small) and updating the buffer index as necessary.
+    ///
+    /// # Arguments
+    /// - `views` - the views to append
+    /// - `view_buffer_size` - the total number of bytes pointed to by all
+    ///   views (used to allocate new buffers if needed)
+    /// - `buffers` - the buffers the views point to
+    #[inline(never)]
+    fn append_views_and_copy_strings(
+        &mut self,
+        views: &[u128],
+        view_buffer_size: usize,
+        buffers: &[Buffer],
+    ) {
+        // Note: the calculations below are designed to avoid any reallocations
+        // of the current buffer, and to only allocate new buffers when
+        // necessary, which is critical for performance.
+
+        // If there is no current buffer, allocate a new one
+        let Some(current) = self.current.take() else {
+            let new_buffer = self.buffer_source.next_buffer(view_buffer_size);
+            self.append_views_and_copy_strings_inner(views, new_buffer, buffers);
+            return;
+        };
+
+        // If there is a current buffer with enough space, append the views and
+        // copy the strings into the existing buffer.
+        let mut remaining_capacity = current.capacity() - current.len();
+        if view_buffer_size <= remaining_capacity {
+            self.append_views_and_copy_strings_inner(views, current, buffers);
+            return;
+        }
+
+        // Here there is a current buffer, but it doesn't have enough space to
+        // hold all the strings. Copy as many views as we can into the current
+        // buffer and then allocate a new buffer for the remaining views
+        //
+        // TODO: should we copy the strings too at the same time?
+        let mut num_view_to_current = 0;
+        for view in views {
+            let b = ByteView::from(*view);
+            let str_len = b.length;
+            // Stop at the first view whose length exceeds the space left
+            if remaining_capacity < str_len as usize {
+                break;
+            }
+            // Only views longer than 12 bytes consume buffer space; shorter
+            // ones are counted but use no capacity
+            if str_len > 12 {
+                remaining_capacity -= str_len as usize;
+            }
+            num_view_to_current += 1;
+        }
+
+        let first_views = &views[0..num_view_to_current];
+        // Bytes the loop above assigned to the current buffer
+        let string_bytes_to_copy = current.capacity() - current.len() - remaining_capacity;
+        // What is left of the caller's size estimate for the remaining views
+        let remaining_view_buffer_size = view_buffer_size - string_bytes_to_copy;
+
+        // Fill the current buffer, then move it to the completed list
+        self.append_views_and_copy_strings_inner(first_views, current, buffers);
+        let completed = self.current.take().expect("completed");
+        self.completed.push(completed.into());
+
+        // Copy any remaining views into a new buffer
+        let remaining_views = &views[num_view_to_current..];
+        let new_buffer = self.buffer_source.next_buffer(remaining_view_buffer_size);
+        self.append_views_and_copy_strings_inner(remaining_views, new_buffer, buffers);
+    }
+
+    /// Append views to self.views, copying data from the buffers into
+    /// dst_buffer, which is then set as self.current
+    ///
+    /// Views of 12 bytes or fewer are appended unchanged. Longer views have
+    /// their bytes copied to the end of `dst_buffer` and are rewritten to
+    /// point at that copy.
+    ///
+    /// # Panics:
+    /// If `self.current` is `Some`
+    ///
+    /// See `append_views_and_copy_strings` for more details
+    #[inline(never)]
+    fn append_views_and_copy_strings_inner(
+        &mut self,
+        views: &[u128],
+        mut dst_buffer: Vec<u8>,
+        buffers: &[Buffer],
+    ) {
+        assert!(self.current.is_none(), "current buffer should be None");
+
+        // Nothing to copy: just install the buffer as the current one
+        if views.is_empty() {
+            self.current = Some(dst_buffer);
+            return;
+        }
+
+        // dst_buffer has not been pushed to `completed` yet, so the index it
+        // will occupy there is the current length of that list
+        let new_buffer_index: u32 = self.completed.len().try_into().expect("too many buffers");
+
+        // In debug builds, check that the vector has enough capacity to copy
+        // the views into it without reallocating.
+        //
+        // NOTE(review): this compares against the total capacity, not the
+        // capacity remaining after the bytes already in dst_buffer -- confirm
+        // whether `capacity() - len()` was intended.
+        #[cfg(debug_assertions)]
+        {
+            let total_length: usize = views
+                .iter()
+                .filter_map(|v| {
+                    let b = ByteView::from(*v);
+                    if b.length > 12 {
+                        Some(b.length as usize)
+                    } else {
+                        None
+                    }
+                })
+                .sum();
+            debug_assert!(
+                dst_buffer.capacity() >= total_length,
+                "dst_buffer capacity {} is less than total length {}",
+                dst_buffer.capacity(),
+                total_length
+            );
+        }
+
+        // Copy the views, updating the buffer index and copying the data as needed
+        let new_views = views.iter().map(|v| {
+            let mut b: ByteView = ByteView::from(*v);
+            if b.length > 12 {
+                // Remember where the data lives in the source before the view
+                // is overwritten below
+                let buffer_index = b.buffer_index as usize;
+                let buffer_offset = b.offset as usize;
+                let str_len = b.length as usize;
+
+                // Update view to location in current
+                // NOTE(review): `as u32` truncates if dst_buffer ever holds
+                // more than u32::MAX bytes -- confirm that cannot happen
+                b.offset = dst_buffer.len() as u32;
+                b.buffer_index = new_buffer_index;
+
+                // SAFETY: input views are validly constructed, i.e. the caller
+                // must pass views whose buffer index and byte range are in
+                // bounds for `buffers`; nothing here checks that
+                let src = unsafe {
+                    buffers
+                        .get_unchecked(buffer_index)
+                        .get_unchecked(buffer_offset..buffer_offset + str_len)
+                };
+                dst_buffer.extend_from_slice(src);
+            }
+            b.as_u128()
+        });
+        self.views.extend(new_views);
+        self.current = Some(dst_buffer);
+    }
+}
+
+impl<B: ByteViewType> InProgressArray for InProgressByteViewArray<B> {
+    /// Replaces the current source array (or clears it when `source` is
+    /// `None`), computing once how many buffer bytes its views use and
+    /// whether its strings should be copied into new buffers
+    fn set_source(&mut self, source: Option<ArrayRef>) {
+        let Some(array) = source else {
+            self.source = None;
+            return;
+        };
+
+        let typed = array.as_byte_view::<B>();
+
+        // Defaults describe an array without any data buffers
+        let mut need_gc = false;
+        let mut ideal_buffer_size = 0;
+        if !typed.data_buffers().is_empty() {
+            ideal_buffer_size = typed.total_buffer_bytes_used();
+            let actual_buffer_size = typed.get_buffer_memory_size();
+            // copying strings is expensive, so only do it if the array is
+            // sparse (uses more than 2x the memory it needs)
+            need_gc = ideal_buffer_size != 0 && actual_buffer_size > (ideal_buffer_size * 2);
+        }
+
+        self.source = Some(Source {
+            array,
+            need_gc,
+            ideal_buffer_size,
+        });
+    }
+
+    /// Appends rows `offset..offset + len` of the current source to the in
+    /// progress views and nulls.
+    ///
+    /// # Errors
+    /// Returns `InvalidArgumentError` if no source has been set
+    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> {
+        self.ensure_capacity();
+
+        // The source is taken out of self while copying and put back at the end
+        let Some(source) = self.source.take() else {
+            return Err(ArrowError::InvalidArgumentError(
+                "Internal Error: InProgressByteViewArray: source not set".to_string(),
+            ));
+        };
+
+        // View the source as the concrete byte view array type for `B`
+        let typed = source.array.as_byte_view::<B>();
+
+        // add any nulls, as necessary
+        match typed.nulls() {
+            Some(source_nulls) => {
+                let sliced = source_nulls.slice(offset, len);
+                self.nulls.append_buffer(&sliced);
+            }
+            None => self.nulls.append_n_non_nulls(len),
+        }
+
+        let buffers = typed.data_buffers();
+        let views = &typed.views().as_ref()[offset..offset + len];
+
+        if source.ideal_buffer_size == 0 {
+            // The views of the source use no buffer bytes (all inlined views),
+            // so appending the views/nulls is all that is needed
+            self.views.extend_from_slice(views);
+        } else if source.need_gc {
+            // Copying the strings into a buffer can be time-consuming so
+            // it is only done if the array is sparse
+            self.append_views_and_copy_strings(views, source.ideal_buffer_size, buffers);
+        } else {
+            self.append_views_and_update_buffer_index(views, buffers);
+        }
+
+        self.source = Some(source);
+        Ok(())
+    }
+
+    /// Builds the output array from the accumulated views, buffers and nulls,
+    /// leaving this structure empty and ready to accumulate the next array
+    fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
+        self.finish_current();
+        assert!(self.current.is_none());
+
+        // Swap in a fresh null builder and finish the one that was in use
+        let mut null_builder =
+            std::mem::replace(&mut self.nulls, NullBufferBuilder::new(self.batch_size));
+        let nulls = null_builder.finish();
+        let views = std::mem::take(&mut self.views);
+        let buffers = std::mem::take(&mut self.completed);
+
+        // SAFETY: the views and buffers were created by the append methods of
+        // this type from input arrays that had valid data and nulls
+        let new_array =
+            unsafe { GenericByteViewArray::<B>::new_unchecked(views.into(), buffers, nulls) };
+        Ok(Arc::new(new_array))
+    }
+}
+
+/// Seed for the buffer size progression (note the first size used is actually
+/// 8KiB, because the size is doubled before it is handed out)
+const STARTING_BLOCK_SIZE: usize = 4 * 1024;
+/// Size at which buffers stop doubling: 1MiB
+const MAX_BLOCK_SIZE: usize = 1024 * 1024;
+
+/// Manages allocating new buffers for `StringViewArray` in increasing sizes
+#[derive(Debug)]
+struct BufferSource {
+    /// Block size reached so far; doubled on each request until it reaches
+    /// `MAX_BLOCK_SIZE`
+    current_size: usize,
+}
+
+impl BufferSource {
+    /// Starts the size progression at `STARTING_BLOCK_SIZE`
+    fn new() -> Self {
+        let current_size = STARTING_BLOCK_SIZE;
+        Self { current_size }
+    }
+
+    /// Return a new, empty buffer with a capacity of at least `min_size`
+    fn next_buffer(&mut self, min_size: usize) -> Vec<u8> {
+        Vec::with_capacity(self.next_size(min_size))
+    }
+
+    /// Advances the block size and returns the capacity to use for the next
+    /// buffer: the advanced block size, or `min_size` if that is larger
+    fn next_size(&mut self, min_size: usize) -> usize {
+        // Take one step in the progression unless the cap has been reached.
+        // The start and end sizes are fixed, so the doubling can't overflow.
+        if self.current_size < MAX_BLOCK_SIZE {
+            self.current_size = self.current_size.saturating_mul(2);
+        }
+
+        // Still too small: keep doubling until we hit min_size or max size
+        if self.current_size < min_size {
+            while self.current_size <= min_size && self.current_size < MAX_BLOCK_SIZE {
+                self.current_size = self.current_size.saturating_mul(2);
+            }
+        }
+
+        // A request larger than the block size is honoured as is
+        self.current_size.max(min_size)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Small requests walk the doubling progression from 8KiB up to 1MiB
+    #[test]
+    fn test_buffer_source() {
+        let mut source = BufferSource::new();
+        let doubling = [
+            8192,
+            16384,
+            32768,
+            65536,
+            131072,
+            262144,
+            524288,
+            1024 * 1024,
+        ];
+        for expected in doubling {
+            assert_eq!(source.next_buffer(1000).capacity(), expected);
+        }
+        // clamped to max size
+        assert_eq!(source.next_buffer(1000).capacity(), 1024 * 1024);
+        // Can override with larger size request
+        assert_eq!(source.next_buffer(10_000_000).capacity(), 10_000_000);
+    }
+
+    /// A minimum below the first block size does not change the progression
+    #[test]
+    fn test_buffer_source_with_min_small() {
+        let mut source = BufferSource::new();
+        // First buffer should be 8kb, then 16kb, then 32kb
+        for expected_kib in [8, 16, 32] {
+            assert_eq!(source.next_buffer(5_600).capacity(), expected_kib * 1024);
+        }
+    }
+
+    /// A large minimum skips ahead to the first block size that exceeds it
+    #[test]
+    fn test_buffer_source_with_min_large() {
+        let mut source = BufferSource::new();
+        let cases = [
+            (500_000, 512 * 1024),
+            (500_000, 1024 * 1024),
+            // clamped to max size
+            (500_000, 1024 * 1024),
+            // Can override with larger size request
+            (2_000_000, 2_000_000),
+        ];
+        for (min_size, expected) in cases {
+            assert_eq!(source.next_buffer(min_size).capacity(), expected);
+        }
+    }
+}
diff --git a/arrow-select/src/coalesce/generic.rs
b/arrow-select/src/coalesce/generic.rs
new file mode 100644
index 0000000000..1ea57dff92
--- /dev/null
+++ b/arrow-select/src/coalesce/generic.rs
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::InProgressArray;
+use crate::concat::concat;
+use arrow_array::ArrayRef;
+use arrow_schema::ArrowError;
+
+/// Fallback implementation of [`InProgressArray`] that works with any type of
+/// array.
+///
+/// Internally, this buffers slices of the source arrays and then calls other
+/// kernels such as [`concat`] to produce the final array.
+///
+/// [`concat`]: crate::concat::concat
+#[derive(Debug)]
+pub(crate) struct GenericInProgressArray {
+    /// The current source
+    source: Option<ArrayRef>,
+    /// The buffered array slices
+    buffered_arrays: Vec<ArrayRef>,
+}
+
+impl GenericInProgressArray {
+    /// Create a new `GenericInProgressArray` with no source and nothing
+    /// buffered
+    pub(crate) fn new() -> Self {
+        let buffered_arrays = Vec::new();
+        Self {
+            buffered_arrays,
+            source: None,
+        }
+    }
+}
+impl InProgressArray for GenericInProgressArray {
+    /// Replaces the current source array
+    fn set_source(&mut self, source: Option<ArrayRef>) {
+        self.source = source;
+    }
+
+    /// Buffers a slice of `len` rows of the current source starting at
+    /// `offset`.
+    ///
+    /// # Errors
+    /// Returns `InvalidArgumentError` if no source has been set
+    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> {
+        let Some(source) = self.source.as_ref() else {
+            return Err(ArrowError::InvalidArgumentError(
+                "Internal Error: GenericInProgressArray: source not set".to_string(),
+            ));
+        };
+        self.buffered_arrays.push(source.slice(offset, len));
+        Ok(())
+    }
+
+    /// Concatenates all buffered slices into a single array and clears the
+    /// buffer. Holding the slices and the result together uses 2x peak memory.
+    fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
+        let combined = concat(
+            &self
+                .buffered_arrays
+                .iter()
+                .map(|buffered| buffered.as_ref())
+                .collect::<Vec<_>>(),
+        )?;
+        self.buffered_arrays.clear();
+        Ok(combined)
+    }
+}