This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new abe8a04b48 docs: More docs to `BatchCoalescer` (#7891)
abe8a04b48 is described below
commit abe8a04b4801ec131c6d0478261251251b030035
Author: Yongting You <[email protected]>
AuthorDate: Thu Jul 10 23:50:10 2025 +0800
docs: More docs to `BatchCoalescer` (#7891)
# Which issue does this PR close?
NA
# Rationale for this change
I just read through the new `BatchCoalescer` interface (which is
great!), and I think some additional documentation could further improve
its clarity.
# What changes are included in this PR?
More docs to `BatchCoalescer`
# Are these changes tested?
NA
# Are there any user-facing changes?
No.
---
arrow-select/src/coalesce.rs | 42 +++++++++++++++++++++++++++++-------------
1 file changed, 29 insertions(+), 13 deletions(-)
diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs
index 285f6633c0..fc7af1a332 100644
--- a/arrow-select/src/coalesce.rs
+++ b/arrow-select/src/coalesce.rs
@@ -47,10 +47,18 @@ use primitive::InProgressPrimitiveArray;
/// smaller batches, and we want to coalesce them into larger batches for
/// further processing.
///
+/// # Motivation
+///
+/// If we use [`concat_batches`] to implement the same functionality, there are 2 potential issues:
+/// 1. At least 2x peak memory (holding the input and output of concat)
+/// 2. 2 copies of the data (to create the output of filter and then create the output of concat)
+///
+/// See: <https://github.com/apache/arrow-rs/issues/6692> for more discussions
+/// about the motivation.
+///
/// [`filter`]: crate::filter::filter
/// [`take`]: crate::take::take
-///
-/// See: <https://github.com/apache/arrow-rs/issues/6692>
+/// [`concat_batches`]: crate::concat::concat_batches
///
/// # Example
/// ```
@@ -124,8 +132,10 @@ use primitive::InProgressPrimitiveArray;
pub struct BatchCoalescer {
/// The input schema
schema: SchemaRef,
- /// output batch size
- batch_size: usize,
+ /// The target batch size (and thus size for views allocation). This is a
+ /// hard limit: the output batch will be exactly `target_batch_size`,
+ /// rather than possibly being slightly above.
+ target_batch_size: usize,
/// In-progress arrays
in_progress_arrays: Vec<Box<dyn InProgressArray>>,
/// Buffered row count. Always less than `batch_size`
@@ -139,19 +149,19 @@ impl BatchCoalescer {
///
/// # Arguments
/// - `schema` - the schema of the output batches
- /// - `batch_size` - the number of rows in each output batch.
+ /// - `target_batch_size` - the number of rows in each output batch.
/// Typical values are `4096` or `8192` rows.
///
- pub fn new(schema: SchemaRef, batch_size: usize) -> Self {
+ pub fn new(schema: SchemaRef, target_batch_size: usize) -> Self {
let in_progress_arrays = schema
.fields()
.iter()
- .map(|field| create_in_progress_array(field.data_type(), batch_size))
+ .map(|field| create_in_progress_array(field.data_type(), target_batch_size))
.collect::<Vec<_>>();
Self {
schema,
- batch_size,
+ target_batch_size,
in_progress_arrays,
// We will for sure store at least one completed batch
completed: VecDeque::with_capacity(1),
@@ -201,7 +211,13 @@ impl BatchCoalescer {
/// Push all the rows from `batch` into the Coalescer
///
- /// See [`Self::next_completed_batch()`] to retrieve any completed batches.
+/// When buffered data plus incoming rows reach `target_batch_size`,
+ /// completed batches are generated eagerly and can be retrieved via
+ /// [`Self::next_completed_batch()`].
+ /// Output batches contain exactly `target_batch_size` rows, so the tail of
+ /// the input batch may remain buffered.
+ /// Remaining partial data either waits for future input batches or can be
+ /// materialized immediately by calling [`Self::finish_buffered_batch()`].
///
/// # Example
/// ```
@@ -237,8 +253,8 @@ impl BatchCoalescer {
// If pushing this batch would exceed the target batch size,
// finish the current batch and start a new one
let mut offset = 0;
- while num_rows > (self.batch_size - self.buffered_rows) {
- let remaining_rows = self.batch_size - self.buffered_rows;
+ while num_rows > (self.target_batch_size - self.buffered_rows) {
+ let remaining_rows = self.target_batch_size - self.buffered_rows;
debug_assert!(remaining_rows > 0);
// Copy remaining_rows from each array
@@ -262,7 +278,7 @@ impl BatchCoalescer {
}
// If we have reached the target batch size, finalize the buffered batch
- if self.buffered_rows >= self.batch_size {
+ if self.buffered_rows >= self.target_batch_size {
self.finish_buffered_batch()?;
}
@@ -316,7 +332,7 @@ impl BatchCoalescer {
!self.completed.is_empty()
}
- /// Returns the next completed batch, if any
+ /// Removes and returns the next completed batch, if any.
pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
self.completed.pop_front()
}