This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new abe8a04b48 docs: More docs to `BatchCoalescer` (#7891)
abe8a04b48 is described below

commit abe8a04b4801ec131c6d0478261251251b030035
Author: Yongting You <[email protected]>
AuthorDate: Thu Jul 10 23:50:10 2025 +0800

    docs: More docs to `BatchCoalescer` (#7891)
    
    # Which issue does this PR close?
    
    NA
    
    # Rationale for this change
    
    I just read through the new `BatchCoalescer` interface (which is
    great!), and I think some additional documentation could further improve
    its clarity.
    
    # What changes are included in this PR?
    
    More docs to `BatchCoalescer`
    
    # Are these changes tested?
    
    NA
    
    # Are there any user-facing changes?
    
    No.
---
 arrow-select/src/coalesce.rs | 42 +++++++++++++++++++++++++++++-------------
 1 file changed, 29 insertions(+), 13 deletions(-)

diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs
index 285f6633c0..fc7af1a332 100644
--- a/arrow-select/src/coalesce.rs
+++ b/arrow-select/src/coalesce.rs
@@ -47,10 +47,18 @@ use primitive::InProgressPrimitiveArray;
 /// smaller batches, and we want to coalesce them into larger batches for
 /// further processing.
 ///
+/// # Motivation
+///
+/// If we use [`concat_batches`] to implement the same functionality, there are 2 potential issues:
+/// 1. At least 2x peak memory (holding the input and output of concat)
+/// 2. 2 copies of the data (to create the output of filter and then create the output of concat)
+///
+/// See: <https://github.com/apache/arrow-rs/issues/6692> for more discussions
+/// about the motivation.
+///
 /// [`filter`]: crate::filter::filter
 /// [`take`]: crate::take::take
-///
-/// See: <https://github.com/apache/arrow-rs/issues/6692>
+/// [`concat_batches`]: crate::concat::concat_batches
 ///
 /// # Example
 /// ```
@@ -124,8 +132,10 @@ use primitive::InProgressPrimitiveArray;
 pub struct BatchCoalescer {
     /// The input schema
     schema: SchemaRef,
-    /// output batch size
-    batch_size: usize,
+    /// The target batch size (and thus size for views allocation). This is a
+    /// hard limit: the output batch will be exactly `target_batch_size`,
+    /// rather than possibly being slightly above.
+    target_batch_size: usize,
     /// In-progress arrays
     in_progress_arrays: Vec<Box<dyn InProgressArray>>,
     /// Buffered row count. Always less than `batch_size`
@@ -139,19 +149,19 @@ impl BatchCoalescer {
     ///
     /// # Arguments
     /// - `schema` - the schema of the output batches
-    /// - `batch_size` - the number of rows in each output batch.
+    /// - `target_batch_size` - the number of rows in each output batch.
     ///   Typical values are `4096` or `8192` rows.
     ///
-    pub fn new(schema: SchemaRef, batch_size: usize) -> Self {
+    pub fn new(schema: SchemaRef, target_batch_size: usize) -> Self {
         let in_progress_arrays = schema
             .fields()
             .iter()
-            .map(|field| create_in_progress_array(field.data_type(), batch_size))
+            .map(|field| create_in_progress_array(field.data_type(), target_batch_size))
             .collect::<Vec<_>>();
 
         Self {
             schema,
-            batch_size,
+            target_batch_size,
             in_progress_arrays,
             // We will for sure store at least one completed batch
             completed: VecDeque::with_capacity(1),
@@ -201,7 +211,13 @@ impl BatchCoalescer {
 
     /// Push all the rows from `batch` into the Coalescer
     ///
-    /// See [`Self::next_completed_batch()`] to retrieve any completed batches.
+    /// When buffered data plus incoming rows reach `target_batch_size`,
+    /// completed batches are generated eagerly and can be retrieved via
+    /// [`Self::next_completed_batch()`].
+    /// Output batches contain exactly `target_batch_size` rows, so the tail of
+    /// the input batch may remain buffered.
+    /// Remaining partial data either waits for future input batches or can be
+    /// materialized immediately by calling [`Self::finish_buffered_batch()`].
     ///
     /// # Example
     /// ```
@@ -237,8 +253,8 @@ impl BatchCoalescer {
         // If pushing this batch would exceed the target batch size,
         // finish the current batch and start a new one
         let mut offset = 0;
-        while num_rows > (self.batch_size - self.buffered_rows) {
-            let remaining_rows = self.batch_size - self.buffered_rows;
+        while num_rows > (self.target_batch_size - self.buffered_rows) {
+            let remaining_rows = self.target_batch_size - self.buffered_rows;
             debug_assert!(remaining_rows > 0);
 
             // Copy remaining_rows from each array
@@ -262,7 +278,7 @@ impl BatchCoalescer {
         }
 
         // If we have reached the target batch size, finalize the buffered batch
-        if self.buffered_rows >= self.batch_size {
+        if self.buffered_rows >= self.target_batch_size {
             self.finish_buffered_batch()?;
         }
 
@@ -316,7 +332,7 @@ impl BatchCoalescer {
         !self.completed.is_empty()
     }
 
-    /// Returns the next completed batch, if any
+    /// Removes and returns the next completed batch, if any.
     pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
         self.completed.pop_front()
     }

Reply via email to