sdd commented on code in PR #652:

@@ -176,6 +188,350 @@ impl ArrowReader {
         return Ok(rx.boxed());
+    // Retrieves all delete files concurrently from FileIO and parses them into `Deletes` objects; yields `Ok(None)` when no entries are supplied or nothing was collected.
+    // NOTE(review): several lines of this quoted hunk appear to be elided (the `map(|df|` body, the call that builds `record_batch_stream`, both match-arm bodies), so it is not complete code as shown.
+    async fn get_deletes(
+        delete_file_entries: Option<Arc<Vec<FileScanTaskDeleteFile>>>,
+        file_io: FileIO,
+        concurrency_limit_data_files: usize,
+    ) -> Result<Option<Vec<Deletes>>> {
+        let Some(delete_file_entries) = delete_file_entries else { // no delete files were supplied
+            return Ok(None);
+        };
+        let (tx, rx) = channel(concurrency_limit_data_files); // workers send parsed results on `tx`; `rx` is drained at the end of this function
+        let mut channel_for_error = tx.clone(); // separate sender kept to report a failure of the whole stream
+        spawn(async move {
+            #[allow(clippy::redundant_closure)] // recommendation from Clippy 
fails to compile
+            let result =
+                futures::stream::iter(delete_file_entries.iter().map(|df| 
+                    .try_for_each_concurrent(concurrency_limit_data_files, 
|entry| {
+                        let file_io = file_io.clone();
+                        let mut tx = tx.clone(); // each entry's future gets its own sender
+                        async move {
+                            let FileScanTaskDeleteFile {
+                                ref file_path,
+                                file_type,
+                            } = entry;
+                            let record_batch_stream =
+                                    file_path,
+                                    file_io.clone(),
+                                    false,
+                                )
+                                .await?
+                                .build()?
+                                .map(|item| match item { // re-wrap stream errors as this crate's `Error`
+                                    Ok(val) => Ok(val),
+                                    Err(err) => {
+                                        Err(Error::new(ErrorKind::DataInvalid, 
+                                            .with_source(err))
+                                    }
+                                })
+                                .boxed();
+                            let result = match file_type {
+                                DataContentType::PositionDeletes => {
+                                }
+                                DataContentType::EqualityDeletes => {
+                                }
+                                _ => Err(Error::new( // any other content type is rejected
+                                    ErrorKind::Unexpected,
+                                    "Expected equality or positional delete",
+                                )),
+                            }?;
+                            tx.send(Ok(result)).await?; // hand the parsed delete to the receiving side
+                            Ok(())
+                        }
+                    })
+                    .await;
+            if let Err(error) = result { // forward the stream's error to the receiver
+                let _ = channel_for_error.send(Err(error)).await; // the outcome of this send is deliberately ignored
+            }
+        });
+        let results = rx.try_collect::<Vec<_>>().await?; // an `Err` item received on the channel is propagated to the caller
+        if results.is_empty() { // an empty collection is reported as `None`, not `Some(vec![])`
+            Ok(None)
+        } else {
+            Ok(Some(results))
+        }
+    }
+    fn get_positional_delete_indexes( // Gathers the indices recorded for `data_file_path` across every `Deletes::Positional` entry in `deletes`.
+        data_file_path: &str,
+        deletes: &[Deletes],
+    ) -> Option<Vec<usize>> { // `None` when no positional delete has an entry for this path
+        let mut results = deletes
+            .iter()
+            .filter_map(|d| { // keep only positional deletes whose map contains this data file
+                if let Deletes::Positional(map) = d {
+                    if let Some(indices) = map.get(data_file_path) {
+                        return Some(indices.iter().map(|&i| i as usize)); // NOTE(review): `as usize` would truncate silently if the stored index type is wider than usize — confirm
+                    }
+                }
+                None
+            })
+            .peekable(); // peekable so the no-match case can be detected before collecting
+        if results.peek().is_none() { // no positional delete mentions this file
+            None
+        } else {
+            Some(results.flatten().collect()) // concatenate the per-delete index iterators, in `deletes` order
+        }
+    }
+    fn get_equality_deletes( // Builds one bound predicate from the `Deletes::Equality` record batches in `delete_files`. NOTE(review): parts of this quoted hunk appear to be elided.
+        delete_files: &[Deletes],
+        snapshot_schema: SchemaRef,
+    ) -> Result<Option<BoundPredicate>> { // `Ok(None)` when the accumulated predicate is still `AlwaysFalse`
+        let mut result_predicate = AlwaysFalse; // starting value for the `or` accumulation below
+        for delete_file in delete_files {
+            if let Deletes::Equality(record_batches) = delete_file { // non-equality entries are ignored here
+                for record_batch in record_batches {
+                    let batch_schema_arrow = record_batch.schema();
+                    let batch_schema_iceberg = 
+                    let datum_columns_with_names: Result<Vec<_>> = record_batch
+                        .columns()
+                        .iter()
+                        .zip(batch_schema_iceberg.as_struct().fields()) // pair each Arrow column with its schema field
+                        .map(|(column, field)| {
+                            let col_as_datum_vec =
Self::equality_delete_column_to_datum_vec(column, field);
+                  |c| (c,
+                        })
+                        .collect();
+                    let datum_columns_with_names = datum_columns_with_names?; // fail on the first column that could not be converted
+                    for row_idx in 0..record_batch.num_rows() {
+                        let mut row_predicate = AlwaysTrue; // starting value for this row's predicate
+                        for (column, field_name) in &datum_columns_with_names {
+                            if let Some(Some(datum)) = column.get(row_idx) { // columns with no datum at this row are skipped
+                                row_predicate = row_predicate
+                            }
+                        }
+                        result_predicate = result_predicate.or(row_predicate); // OR each delete row into the overall predicate
+                    }
+                }
+            }
+        }
+        Ok(if result_predicate == AlwaysFalse { // still equal to the starting value: report no predicate
+            None
+        } else {
+            // This is a deletion filter, not a selection filter, so we need 
to invert it
+            result_predicate = result_predicate.negate();
+            Some(result_predicate.bind(snapshot_schema, true)?)
+        })
+    }
+    fn equality_delete_column_to_datum_vec(
+        column: &ArrayRef,
+        field: &NestedFieldRef,
+    ) -> Result<Vec<Option<Datum>>> {

Review Comment:
   Do you mean that it should return `Result<HashSet<Struct>>`?

