jonathanc-n commented on code in PR #951:
URL: https://github.com/apache/iceberg-rust/pull/951#discussion_r1980039486


##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -265,6 +317,132 @@ impl ArrowReader {
         Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
 
+    async fn create_parquet_record_batch_stream_builder(
+        data_file_path: &str,
+        file_io: FileIO,
+        should_load_page_index: bool,
+    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead 
+ Sized>>> {
+        // Get the metadata for the Parquet file we need to read and build
+        // a reader for the data within
+        let parquet_file = file_io.new_input(data_file_path)?;
+        let (parquet_metadata, parquet_reader) =
+            try_join!(parquet_file.metadata(), parquet_file.reader())?;
+        let parquet_file_reader = ArrowFileReader::new(parquet_metadata, 
parquet_reader);
+
+        // Create the record batch stream builder, which wraps the parquet 
file reader
+        let record_batch_stream_builder = 
ParquetRecordBatchStreamBuilder::new_with_options(
+            parquet_file_reader,
+            ArrowReaderOptions::new().with_page_index(should_load_page_index),
+        )
+        .await?;
+        Ok(record_batch_stream_builder)
+    }
+
+    /// computes a `RowSelection` from positional delete indices.
+    ///
+    /// Using the Parquet page index, we build a `RowSelection` that rejects 
rows that are indicated
+    /// as having been deleted by a positional delete, taking into account any 
row groups that have
+    /// been skipped entirely by the filter predicate
+    fn build_deletes_row_selection(
+        row_group_metadata: &[RowGroupMetaData],
+        selected_row_groups: &Option<Vec<usize>>,
+        mut positional_deletes: RoaringTreemap,
+    ) -> Result<RowSelection> {
+        let mut results: Vec<RowSelector> = Vec::new();
+        let mut selected_row_groups_idx = 0;
+        let mut current_page_base_idx: u64 = 0;
+
+        for (idx, row_group_metadata) in row_group_metadata.iter().enumerate() 
{
+            let page_num_rows = row_group_metadata.num_rows() as u64;
+            let next_page_base_idx = current_page_base_idx + page_num_rows;
+
+            // if row group selection is enabled,
+            if let Some(selected_row_groups) = selected_row_groups {
+                // if we've consumed all the selected row groups, we're done
+                if selected_row_groups_idx == selected_row_groups.len() {
+                    break;
+                }
+
+                if idx == selected_row_groups[selected_row_groups_idx] {
+                    // we're in a selected row group. Increment 
selected_row_groups_idx
+                    // so that next time around the for loop we're looking for 
the next
+                    // selected row group
+                    selected_row_groups_idx += 1;
+                } else {
+                    // remove any positional deletes from the skipped page so 
that
+                    // `positional.deletes.min()` can be used
+                    
positional_deletes.remove_range(current_page_base_idx..next_page_base_idx);
+
+                    // still increment the current page base index but then 
skip to the next row group
+                    // in the file
+                    current_page_base_idx += page_num_rows;
+                    continue;
+                }
+            }
+
+            let mut next_deleted_row_idx = match positional_deletes.min() {
+                Some(next_deleted_row_idx) => {
+                    // if the index of the next deleted row is beyond this 
page, add a selection for
+                    // the remainder of this page and skip to the next page
+                    if next_deleted_row_idx >= next_page_base_idx {
+                        results.push(RowSelector::select(page_num_rows as 
usize));
+                        continue;
+                    }
+
+                    next_deleted_row_idx
+                }
+
+                // If there are no more pos deletes, add a selector for the 
entirety of this page.
+                _ => {
+                    results.push(RowSelector::select(page_num_rows as usize));
+                    continue;
+                }
+            };
+
+            let mut current_idx = current_page_base_idx;
+            'chunks: while next_deleted_row_idx < next_page_base_idx {
+                // `select` all rows that precede the next delete index
+                if current_idx < next_deleted_row_idx {
+                    let run_length = next_deleted_row_idx - current_idx;
+                    results.push(RowSelector::select(run_length as usize));
+                    current_idx += run_length;
+                }
+
+                // `skip` all consecutive deleted rows in the current row group
+                let mut run_length = 0;
+                while next_deleted_row_idx == current_idx
+                    && next_deleted_row_idx < next_page_base_idx
+                {
+                    run_length += 1;
+                    current_idx += 1;
+                    positional_deletes.remove(next_deleted_row_idx);
+
+                    next_deleted_row_idx = match positional_deletes.min() {
+                        Some(next_deleted_row_idx) => next_deleted_row_idx,
+                        _ => {
+                            // We've processed the final positional delete.
+                            // Conclude the skip and then break so that we 
select the remaining
+                            // rows in the page and move on to the next row 
group
+                            results.push(RowSelector::skip(run_length));
+                            break 'chunks;
+                        }
+                    };
+                }
+                results.push(RowSelector::skip(run_length));

Review Comment:
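   Nit: consider pushing this `RowSelector::skip(run_length)` only when `run_length > 0`, so that no zero-length `skip(0)` selectors end up in the resulting `RowSelection`.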



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to