This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 91234b542f feat: impl BatchCoalescer::push_batch_with_indices (#8991)
91234b542f is described below

commit 91234b542f455e06666188cc216c1af0cec9788b
Author: ClSlaid <[email protected]>
AuthorDate: Tue Dec 16 02:13:23 2025 +0800

    feat: impl BatchCoalescer::push_batch_with_indices (#8991)
    
    MVP for #8957
    awaits for #8951
    
    very first version for reviewers to confirm behaviour, optimizations TBD
    
    Signed-off-by: 蔡略 <[email protected]>
---
 arrow-select/src/coalesce.rs | 77 ++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 74 insertions(+), 3 deletions(-)

diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs
index 5ef9a86b92..ddb1c41c8c 100644
--- a/arrow-select/src/coalesce.rs
+++ b/arrow-select/src/coalesce.rs
@@ -21,6 +21,7 @@
 //! [`filter`]: crate::filter::filter
 //! [`take`]: crate::take::take
 use crate::filter::filter_record_batch;
+use crate::take::take_record_batch;
 use arrow_array::types::{BinaryViewType, StringViewType};
 use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, downcast_primitive};
 use arrow_schema::{ArrowError, DataType, SchemaRef};
@@ -243,6 +244,39 @@ impl BatchCoalescer {
         self.push_batch(filtered_batch)
     }
 
+    /// Push a batch into the Coalescer after applying a set of indices.
+    /// This is semantically equivalent to calling [`Self::push_batch`]
+    /// with the results from [`take_record_batch`].
+    ///
+    /// # Example
+    /// ```
+    /// # use arrow_array::{record_batch, UInt64Array};
+    /// # use arrow_select::coalesce::BatchCoalescer;
+    /// let batch1 = record_batch!(("a", Int32, [0, 0, 0])).unwrap();
+    /// let batch2 = record_batch!(("a", Int32, [1, 1, 4, 5, 1, 4])).unwrap();
+    /// // Sorted indices to create a sorted output, this can be obtained with
+    /// // `arrow-ord`'s sort_to_indices operation
+    /// let indices = UInt64Array::from(vec![0, 1, 4, 2, 5, 3]);
+    /// // create a new Coalescer that targets creating 1000 row batches
+    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
+    /// coalescer.push_batch(batch1);
+    /// coalescer.push_batch_with_indices(batch2, &indices);
+    /// // finish and retrieve the created batch
+    /// coalescer.finish_buffered_batch().unwrap();
+    /// let completed_batch = coalescer.next_completed_batch().unwrap();
+    /// let expected_batch = record_batch!(("a", Int32, [0, 0, 0, 1, 1, 1, 4, 4, 5])).unwrap();
+    /// assert_eq!(completed_batch, expected_batch);
+    /// ```
+    pub fn push_batch_with_indices(
+        &mut self,
+        batch: RecordBatch,
+        indices: &dyn Array,
+    ) -> Result<(), ArrowError> {
+        // todo: optimize this to avoid materializing (copying the results of take indices to a new batch)
+        let taken_batch = take_record_batch(&batch, indices)?;
+        self.push_batch(taken_batch)
+    }
+
     /// Push all the rows from `batch` into the Coalescer
     ///
     /// When buffered data plus incoming rows reach `target_batch_size` ,
@@ -583,7 +617,7 @@ mod tests {
     use arrow_array::cast::AsArray;
     use arrow_array::{
         BinaryViewArray, Int32Array, Int64Array, RecordBatchOptions, StringArray, StringViewArray,
-        TimestampNanosecondArray, UInt32Array,
+        TimestampNanosecondArray, UInt32Array, UInt64Array,
     };
     use arrow_schema::{DataType, Field, Schema};
     use rand::{Rng, SeedableRng};
@@ -1327,7 +1361,7 @@ mod tests {
 
     /// Return a RecordBatch with a UInt32Array with the specified range and
     /// every third value is null.
-    fn uint32_batch(range: Range<u32>) -> RecordBatch {
+    fn uint32_batch<T: std::iter::Iterator<Item = u32>>(range: T) -> RecordBatch {
         let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, true)]));
 
         let array = UInt32Array::from_iter(range.map(|i| if i % 3 == 0 { None } else { Some(i) }));
@@ -1335,13 +1369,21 @@ mod tests {
     }
 
     /// Return a RecordBatch with a UInt32Array with no nulls specified range
-    fn uint32_batch_non_null(range: Range<u32>) -> RecordBatch {
+    fn uint32_batch_non_null<T: std::iter::Iterator<Item = u32>>(range: T) -> RecordBatch {
         let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
 
         let array = UInt32Array::from_iter_values(range);
         RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
     }
 
+    /// Return a RecordBatch with a UInt64Array with no nulls over the specified range
+    fn uint64_batch_non_null<T: std::iter::Iterator<Item = u64>>(range: T) -> RecordBatch {
+        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt64, false)]));
+
+        let array = UInt64Array::from_iter_values(range);
+        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
+    }
+
     /// Return a RecordBatch with a StringArrary with values `value0`, `value1`, ...
     /// and every third value is `None`.
     fn utf8_batch(range: Range<u32>) -> RecordBatch {
@@ -1932,4 +1974,33 @@ mod tests {
         }
         assert_eq!(coalescer.get_buffered_rows(), 0);
     }
+
+    #[test]
+    fn test_coalasce_push_batch_with_indices() {
+        const MID_POINT: u32 = 2333;
+        const TOTAL_ROWS: u32 = 23333;
+        let batch1 = uint32_batch_non_null(0..MID_POINT);
+        let batch2 = uint32_batch_non_null((MID_POINT..TOTAL_ROWS).rev());
+
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])),
+            TOTAL_ROWS as usize,
+        );
+        coalescer.push_batch(batch1).unwrap();
+
+        let rev_indices = (0..((TOTAL_ROWS - MID_POINT) as u64)).rev();
+        let reversed_indices_batch = uint64_batch_non_null(rev_indices);
+
+        let reverse_indices = UInt64Array::from(reversed_indices_batch.column(0).to_data());
+        coalescer
+            .push_batch_with_indices(batch2, &reverse_indices)
+            .unwrap();
+
+        coalescer.finish_buffered_batch().unwrap();
+        let actual = coalescer.next_completed_batch().unwrap();
+
+        let expected = uint32_batch_non_null(0..TOTAL_ROWS);
+
+        assert_eq!(expected, actual);
+    }
 }

Reply via email to