sdd commented on code in PR #652:
URL: https://github.com/apache/iceberg-rust/pull/652#discussion_r1875544707


##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -176,6 +188,350 @@ impl ArrowReader {
         return Ok(rx.boxed());
     }
 
+    // Fetch all delete files through `FileIO`, processing up to `concurrency_limit_data_files` of them
+    // concurrently, and parse them into `Deletes` objects; `Ok(None)` means no entries were given or parsed.
+    async fn get_deletes(
+        delete_file_entries: Option<Arc<Vec<FileScanTaskDeleteFile>>>,
+        file_io: FileIO,
+        concurrency_limit_data_files: usize,
+    ) -> Result<Option<Vec<Deletes>>> {
+        let Some(delete_file_entries) = delete_file_entries else {
+            return Ok(None); // no delete files were supplied for this task
+        };
+
+        let (tx, rx) = channel(concurrency_limit_data_files); // parsed deletes travel back over this channel
+        let mut channel_for_error = tx.clone(); // extra sender, used below to report a failure of the whole stream
+
+        spawn(async move {
+            #[allow(clippy::redundant_closure)] // recommendation from Clippy 
fails to compile
+            let result =
+                futures::stream::iter(delete_file_entries.iter().map(|df| 
crate::Result::Ok(df)))
+                    .try_for_each_concurrent(concurrency_limit_data_files, 
|entry| {
+                        let file_io = file_io.clone(); // each per-entry future gets its own handle
+                        let mut tx = tx.clone(); // ...and its own sender
+                        async move {
+                            let FileScanTaskDeleteFile {
+                                ref file_path,
+                                file_type,
+                            } = entry;
+
+                            let record_batch_stream =
+                                
Self::create_parquet_record_batch_stream_builder(
+                                    file_path,
+                                    file_io.clone(),
+                                    false, // NOTE(review): meaning of this flag is not visible here; confirm against the builder
+                                )
+                                .await?
+                                .build()?
+                                .map(|item| match item {
+                                    Ok(val) => Ok(val),
+                                    Err(err) => {
+                                        Err(Error::new(ErrorKind::DataInvalid, 
err.to_string())
+                                            .with_source(err))
+                                    }
+                                })
+                                .boxed();
+
+                            let result = match file_type { // choose the parser from the declared content type
+                                DataContentType::PositionDeletes => {
+                                    
parse_positional_delete_file(record_batch_stream).await
+                                }
+                                DataContentType::EqualityDeletes => {
+                                    
parse_equality_delete_file(record_batch_stream).await
+                                }
+                                _ => Err(Error::new(
+                                    ErrorKind::Unexpected,
+                                    "Expected equality or positional delete",
+                                )),
+                            }?;
+
+                            tx.send(Ok(result)).await?; // hand the parsed delete to the collecting side
+                            Ok(())
+                        }
+                    })
+                    .await;
+
+            if let Err(error) = result {
+                let _ = channel_for_error.send(Err(error)).await; // a failed send is deliberately ignored
+            }
+        });
+
+        let results = rx.try_collect::<Vec<_>>().await?; // `?` surfaces an `Err` item received from the task
+        if results.is_empty() {
+            Ok(None) // nothing was parsed, which is reported the same way as "no delete files"
+        } else {
+            Ok(Some(results))
+        }
+    }
+
+    /// Collects the positional-delete row indexes that apply to `data_file_path`.
+    ///
+    /// Every `Deletes::Positional` entry in `deletes` is checked for an index list keyed by
+    /// `data_file_path`; the lists that are found are concatenated in the order the entries
+    /// appear. Returns `None` when no entry has a list for this file, and `Some` (possibly
+    /// holding an empty vector) otherwise.
+    fn get_positional_delete_indexes(
+        data_file_path: &str,
+        deletes: &[Deletes],
+    ) -> Option<Vec<usize>> {
+        // Tracked separately from `positions` so that a matching but empty index list
+        // still produces `Some(vec![])` rather than `None`.
+        let mut matched_any = false;
+        let mut positions = Vec::new();
+
+        for delete in deletes {
+            let Deletes::Positional(by_file) = delete else {
+                continue;
+            };
+            let Some(indices) = by_file.get(data_file_path) else {
+                continue;
+            };
+
+            matched_any = true;
+            positions.extend(indices.iter().map(|&idx| idx as usize));
+        }
+
+        matched_any.then_some(positions)
+    }
+
+    /// Builds the row filter implied by the equality-delete batches in `delete_files`.
+    ///
+    /// Each row of each `Deletes::Equality` record batch contributes one conjunction of
+    /// per-column predicates. The conjunctions are OR-ed together, and the combined
+    /// predicate is negated (it describes rows to drop, not rows to keep) before being
+    /// bound to `snapshot_schema`. Non-equality entries in `delete_files` are ignored.
+    ///
+    /// A null cell in a delete row is translated to `IS NULL` on that column instead of
+    /// being skipped, so the delete row only matches data rows whose value is also null.
+    /// Skipping the cell would widen the delete to every value of that column, and a
+    /// delete row made only of nulls would match every data row.
+    ///
+    /// Returns `Ok(None)` when the accumulated predicate is still `AlwaysFalse`.
+    ///
+    /// # Errors
+    ///
+    /// Propagates errors from converting the batch schema, from converting a column to
+    /// datums, and from binding the final predicate.
+    fn get_equality_deletes(
+        delete_files: &[Deletes],
+        snapshot_schema: SchemaRef,
+    ) -> Result<Option<BoundPredicate>> {
+        let mut result_predicate = AlwaysFalse;
+
+        for delete_file in delete_files {
+            let Deletes::Equality(record_batches) = delete_file else {
+                continue;
+            };
+
+            for record_batch in record_batches {
+                let batch_schema_arrow = record_batch.schema();
+                let batch_schema_iceberg = arrow_schema_to_schema(batch_schema_arrow.as_ref())?;
+
+                // Convert every column once, pairing it with its field name, so the
+                // per-row loop below only does lookups.
+                let datum_columns_with_names = record_batch
+                    .columns()
+                    .iter()
+                    .zip(batch_schema_iceberg.as_struct().fields())
+                    .map(|(column, field)| {
+                        Self::equality_delete_column_to_datum_vec(column, field)
+                            .map(|datums| (datums, field.name.to_string()))
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                for row_idx in 0..record_batch.num_rows() {
+                    let mut row_predicate = AlwaysTrue;
+                    for (column, field_name) in &datum_columns_with_names {
+                        let Some(cell) = column.get(row_idx) else {
+                            continue;
+                        };
+
+                        // A null delete value must match only null data values.
+                        let cell_predicate = match cell {
+                            Some(datum) => Reference::new(field_name).equal_to(datum.clone()),
+                            None => Reference::new(field_name).is_null(),
+                        };
+                        row_predicate = row_predicate.and(cell_predicate);
+                    }
+                    result_predicate = result_predicate.or(row_predicate);
+                }
+            }
+        }
+
+        Ok(if result_predicate == AlwaysFalse {
+            None
+        } else {
+            // This is a deletion filter, not a selection filter, so we need to invert it
+            result_predicate = result_predicate.negate();
+            Some(result_predicate.bind(snapshot_schema, true)?)
+        })
+    }
+
+    fn equality_delete_column_to_datum_vec(
+        column: &ArrayRef,
+        field: &NestedFieldRef,
+    ) -> Result<Vec<Option<Datum>>> {

Review Comment:
   Do you mean that it should return `Result<HashSet<Struct>>`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to