liurenjie1024 commented on code in PR #652:
URL: https://github.com/apache/iceberg-rust/pull/652#discussion_r1818728863


##########
crates/iceberg/src/spec/delete_file.rs:
##########
@@ -0,0 +1,780 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use arrow_array::{Int64Array, RecordBatch, StringArray};
+use arrow_schema::SchemaRef;
+use futures::StreamExt;
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+
+use crate::scan::ArrowRecordBatchStream;
+use crate::{Error, ErrorKind, Result};
+
+pub(crate) const FIELD_ID_DELETE_FILE_PATH: i32 = i32::MAX - 101;
+pub(crate) const FIELD_ID_DELETE_POS: i32 = i32::MAX - 102;
+
+// Represents a parsed Delete file that can be safely stored
+// in the Object Cache.
+pub(crate) enum Deletes {
+    // Positional delete files are parsed into a map of
+    // filename to a sorted list of row indices.
+    // TODO: Ignoring the stored rows that are present in
+    //   positional deletes for now. I think they're only used for statistics?
+    Positional(HashMap<String, Vec<u64>>),

Review Comment:
   The Java implementation uses a roaring bitmap to save space; should we also use it?
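   A minimal sketch of what that could look like, assuming the `roaring` crate is added as a dependency (its `RoaringTreemap` stores `u64` values). The `Deletes` name mirrors the PR, but the helper is illustrative only:

   ```rust
   use std::collections::HashMap;

   use roaring::RoaringTreemap; // 64-bit roaring bitmap

   // Hypothetical variant: positions per data file kept in a roaring bitmap
   // instead of a Vec<u64>, which is much more compact for dense delete sets.
   pub(crate) enum Deletes {
       Positional(HashMap<String, RoaringTreemap>),
       // ... Equality variant unchanged
   }

   fn insert_position(map: &mut HashMap<String, RoaringTreemap>, path: &str, pos: u64) {
       map.entry(path.to_string())
           .or_insert_with(RoaringTreemap::new)
           .insert(pos);
   }
   ```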



##########
crates/iceberg/src/scan.rs:
##########
@@ -951,6 +1077,82 @@ impl FileScanTask {
     }
 }
 
+type DeleteFileManagerResult = Result<Option<Arc<Vec<FileScanTaskDeleteFile>>>>;
+
+/// Manages async retrieval of all the delete files from FileIO that are
+/// applicable to the scan. Provides references to them for inclusion within FileScanTasks
+#[derive(Debug, Clone)]
+struct DeleteFileManager {

Review Comment:
   I would suggest moving this part to a standalone module. There is also a component, [DeleteFileIndex](https://github.com/apache/iceberg/blob/95497abe5579cf492f24ac8c470c7853d59332e9/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java#L66), in the Java implementation, which I think is well designed. We don't need to implement all of its details in one PR, but maintaining a similar data structure and API to `DeleteFileIndex` and evolving it slowly would be easier.
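   For reference, a rough sketch of what a standalone `delete_file_index` module could start as, loosely mirroring Java's `DeleteFileIndex` grouping (global equality deletes plus per-partition equality and position deletes). The partition key representation and the module path of `FileScanTaskDeleteFile` are assumptions, not part of this PR:

   ```rust
   use std::collections::HashMap;
   use std::sync::Arc;

   use crate::scan::FileScanTaskDeleteFile; // defined in this PR; path assumed

   // Hypothetical partition key: (partition spec id, serialized partition values).
   type PartitionKey = (i32, String);

   #[derive(Debug, Default)]
   pub(crate) struct DeleteFileIndex {
       // Equality deletes written with an unpartitioned spec apply to all data files.
       global_eq_deletes: Vec<Arc<FileScanTaskDeleteFile>>,
       // Deletes scoped to a single partition; sequence-number pruning can be layered on later.
       eq_deletes_by_partition: HashMap<PartitionKey, Vec<Arc<FileScanTaskDeleteFile>>>,
       pos_deletes_by_partition: HashMap<PartitionKey, Vec<Arc<FileScanTaskDeleteFile>>>,
   }

   impl DeleteFileIndex {
       // Candidate delete files for one data file; stats- and sequence-number-based
       // pruning could refine this result later without changing the API shape.
       pub(crate) fn deletes_for_data_file(
           &self,
           partition: &PartitionKey,
       ) -> Vec<Arc<FileScanTaskDeleteFile>> {
           let mut result = self.global_eq_deletes.clone();
           if let Some(eq) = self.eq_deletes_by_partition.get(partition) {
               result.extend(eq.iter().cloned());
           }
           if let Some(pos) = self.pos_deletes_by_partition.get(partition) {
               result.extend(pos.iter().cloned());
           }
           result
       }
   }
   ```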



##########
crates/iceberg/src/scan.rs:
##########
@@ -951,6 +1077,82 @@ impl FileScanTask {
     }
 }
 
+type DeleteFileManagerResult = Result<Option<Arc<Vec<FileScanTaskDeleteFile>>>>;
+
+/// Manages async retrieval of all the delete files from FileIO that are
+/// applicable to the scan. Provides references to them for inclusion within FileScanTasks
+#[derive(Debug, Clone)]
+struct DeleteFileManager {
+    files: Arc<RwLock<Option<DeleteFileManagerResult>>>,
+}
+
+#[derive(Debug, Clone)]
+struct DeleteFileManagerFuture {
+    files: Arc<RwLock<Option<DeleteFileManagerResult>>>,
+}
+
+impl Future for DeleteFileManagerFuture {
+    type Output = DeleteFileManagerResult;
+
+    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let Ok(guard) = self.files.try_read() else {
+            return Poll::Pending;
+        };
+
+        if let Some(value) = guard.as_ref() {
+            Poll::Ready(match value.as_ref() {
+                Ok(deletes) => Ok(deletes.clone()),
+                Err(err) => Err(Error::new(err.kind(), err.message())),
+            })
+        } else {
+            Poll::Pending
+        }
+    }
+}
+
+impl DeleteFileManager {
+    pub(crate) fn from_receiver(receiver: Receiver<Result<FileScanTaskDeleteFile>>) -> Self {
+        let delete_file_stream = receiver.boxed();
+        let files = Arc::new(RwLock::new(None));
+
+        spawn({
+            let files = files.clone();
+            async move {
+                let _ = spawn(async move {
+                    let result = delete_file_stream.try_collect::<Vec<_>>().await;
+                    let result = result.map(|files| {
+                        if files.is_empty() {
+                            None
+                        } else {
+                            Some(Arc::new(files))
+                        }
+                    });
+
+                    // Unwrap is ok here since this is the only place where a write lock
+                    // can be acquired, so the lock can't already have been poisoned
+                    let mut guard = files.write().unwrap();
+                    *guard = Some(result);
+                })
+                .await;
+            }
+        });
+
+        DeleteFileManager { files }
+    }
+
+    pub(crate) fn get_deletes_for_data_file(
+        &self,
+        _data_file: &DataFile,
+    ) -> DeleteFileManagerFuture {
+        // TODO: in the future we may want to filter out delete files
+        //       that are not applicable to the DataFile?
+
+        DeleteFileManagerFuture {
+            files: self.files.clone(),

Review Comment:
   This is incorrect even if we ignore pruning techniques to remove unrelated delete files. Please see [this part](https://iceberg.apache.org/spec/#scan-planning:~:text=Delete%20files%20that%20match%20the%20query%20filter%20must%20be%20applied%20to%20data%20files%20at%20read%20time%2C%20limited%20by%20the%20scope%20of%20the%20delete%20file%20using%20the%20following%20rules.) for details.
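   To make the linked rules concrete, here is a hedged sketch of the applicability check; field names such as `data_sequence_number` and `partition_key` are hypothetical, since the PR's structs do not carry them yet:

   ```rust
   struct DataFileInfo {
       data_sequence_number: i64,
       partition_key: String,
   }

   struct DeleteFileInfo {
       data_sequence_number: i64,
       partition_key: String,
       spec_is_unpartitioned: bool,
       is_position_delete: bool,
   }

   // Scoping rules from the scan-planning section of the spec.
   fn delete_applies_to(data: &DataFileInfo, delete: &DeleteFileInfo) -> bool {
       if delete.is_position_delete {
           // Position deletes apply when the delete's data sequence number is >=
           // the data file's and both files are in the same partition.
           delete.data_sequence_number >= data.data_sequence_number
               && delete.partition_key == data.partition_key
       } else {
           // Equality deletes apply when the delete's data sequence number is
           // strictly greater, and either the partitions match or the delete was
           // written with an unpartitioned spec (a global delete).
           delete.data_sequence_number > data.data_sequence_number
               && (delete.spec_is_unpartitioned
                   || delete.partition_key == data.partition_key)
       }
   }
   ```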



##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -176,6 +188,350 @@ impl ArrowReader {
         return Ok(rx.boxed());
     }
 
+    // retrieve all delete files concurrently from FileIO and parse them
+    // into `Deletes` objects
+    async fn get_deletes(
+        delete_file_entries: Option<Arc<Vec<FileScanTaskDeleteFile>>>,
+        file_io: FileIO,
+        concurrency_limit_data_files: usize,
+    ) -> Result<Option<Vec<Deletes>>> {
+        let Some(delete_file_entries) = delete_file_entries else {
+            return Ok(None);
+        };
+
+        let (tx, rx) = channel(concurrency_limit_data_files);
+        let mut channel_for_error = tx.clone();
+
+        spawn(async move {
+            #[allow(clippy::redundant_closure)] // recommendation from Clippy fails to compile
+            let result =
+                futures::stream::iter(delete_file_entries.iter().map(|df| crate::Result::Ok(df)))
+                    .try_for_each_concurrent(concurrency_limit_data_files, |entry| {
+                        let file_io = file_io.clone();
+                        let mut tx = tx.clone();
+                        async move {
+                            let FileScanTaskDeleteFile {
+                                ref file_path,
+                                file_type,
+                            } = entry;
+
+                            let record_batch_stream =
+                                Self::create_parquet_record_batch_stream_builder(
+                                    file_path,
+                                    file_io.clone(),
+                                    false,
+                                )
+                                .await?
+                                .build()?
+                                .map(|item| match item {
+                                    Ok(val) => Ok(val),
+                                    Err(err) => {
+                                        Err(Error::new(ErrorKind::DataInvalid, err.to_string())
+                                            .with_source(err))
+                                    }
+                                })
+                                .boxed();
+
+                            let result = match file_type {
+                                DataContentType::PositionDeletes => {
+                                    parse_positional_delete_file(record_batch_stream).await
+                                }
+                                DataContentType::EqualityDeletes => {
+                                    parse_equality_delete_file(record_batch_stream).await
+                                }
+                                _ => Err(Error::new(
+                                    ErrorKind::Unexpected,
+                                    "Expected equality or positional delete",
+                                )),
+                            }?;
+
+                            tx.send(Ok(result)).await?;
+                            Ok(())
+                        }
+                    })
+                    .await;
+
+            if let Err(error) = result {
+                let _ = channel_for_error.send(Err(error)).await;
+            }
+        });
+
+        let results = rx.try_collect::<Vec<_>>().await?;
+        if results.is_empty() {
+            Ok(None)
+        } else {
+            Ok(Some(results))
+        }
+    }
+
+    fn get_positional_delete_indexes(
+        data_file_path: &str,
+        deletes: &[Deletes],
+    ) -> Option<Vec<usize>> {
+        let mut results = deletes
+            .iter()
+            .filter_map(|d| {
+                if let Deletes::Positional(map) = d {
+                    if let Some(indices) = map.get(data_file_path) {
+                        return Some(indices.iter().map(|&i| i as usize));
+                    }
+                }
+
+                None
+            })
+            .peekable();
+
+        if results.peek().is_none() {
+            None
+        } else {
+            Some(results.flatten().collect())
+        }
+    }
+
+    fn get_equality_deletes(
+        delete_files: &[Deletes],
+        snapshot_schema: SchemaRef,
+    ) -> Result<Option<BoundPredicate>> {
+        let mut result_predicate = AlwaysFalse;
+
+        for delete_file in delete_files {
+            if let Deletes::Equality(record_batches) = delete_file {
+                for record_batch in record_batches {
+                    let batch_schema_arrow = record_batch.schema();
+                    let batch_schema_iceberg = arrow_schema_to_schema(batch_schema_arrow.as_ref())?;
+
+                    let datum_columns_with_names: Result<Vec<_>> = record_batch
+                        .columns()
+                        .iter()
+                        .zip(batch_schema_iceberg.as_struct().fields())
+                        .map(|(column, field)| {
+                            let col_as_datum_vec =
+                                Self::equality_delete_column_to_datum_vec(column, field);
+                            col_as_datum_vec.map(|c| (c, field.name.to_string()))
+                        })
+                        .collect();
+                    let datum_columns_with_names = datum_columns_with_names?;
+
+                    for row_idx in 0..record_batch.num_rows() {
+                        let mut row_predicate = AlwaysTrue;
+                        for (column, field_name) in &datum_columns_with_names {
+                            if let Some(Some(datum)) = column.get(row_idx) {
+                                row_predicate = row_predicate
+                                    .and(Reference::new(field_name).equal_to(datum.clone()));
+                            }
+                        }
+                        result_predicate = result_predicate.or(row_predicate);
+                    }
+                }
+            }
+        }
+
+        Ok(if result_predicate == AlwaysFalse {
+            None
+        } else {
+            // This is a deletion filter, not a selection filter, so we need to invert it
+            result_predicate = result_predicate.negate();
+            Some(result_predicate.bind(snapshot_schema, true)?)
+        })
+    }
+
+    fn equality_delete_column_to_datum_vec(
+        column: &ArrayRef,
+        field: &NestedFieldRef,
+    ) -> Result<Vec<Option<Datum>>> {

Review Comment:
   Should this be a struct set?



##########
crates/iceberg/src/scan.rs:
##########
@@ -922,6 +1035,19 @@ pub struct FileScanTask {
     /// The predicate to filter.
     #[serde(skip_serializing_if = "Option::is_none")]
     pub predicate: Option<BoundPredicate>,
+
+    /// The list of delete files that may need to be applied to this data file
+    pub deletes: Option<Arc<Vec<FileScanTaskDeleteFile>>>,
+}
+
+/// A task to scan part of file.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct FileScanTaskDeleteFile {

Review Comment:
   This may evolve as we add more features, so I would suggest making this a crate-only data structure.
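   Concretely, something like the sketch below (visibility change only; note that the `deletes` field on `FileScanTask` would then also need crate-level visibility, since a public field can't expose a crate-private type). Import paths are assumptions:

   ```rust
   use serde::{Deserialize, Serialize};

   use crate::spec::DataContentType; // import path assumed

   /// A delete file referenced by a FileScanTask. Kept crate-private so its shape
   /// can evolve (e.g. adding partition or sequence-number info) without breaking
   /// the public API.
   #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
   pub(crate) struct FileScanTaskDeleteFile {
       pub(crate) file_path: String,
       pub(crate) file_type: DataContentType,
   }
   ```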



##########
crates/iceberg/src/spec/delete_file.rs:
##########
@@ -0,0 +1,780 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use arrow_array::{Int64Array, RecordBatch, StringArray};
+use arrow_schema::SchemaRef;
+use futures::StreamExt;
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+
+use crate::scan::ArrowRecordBatchStream;
+use crate::{Error, ErrorKind, Result};
+
+pub(crate) const FIELD_ID_DELETE_FILE_PATH: i32 = i32::MAX - 101;
+pub(crate) const FIELD_ID_DELETE_POS: i32 = i32::MAX - 102;
+
+// Represents a parsed Delete file that can be safely stored
+// in the Object Cache.
+pub(crate) enum Deletes {
+    // Positional delete files are parsed into a map of
+    // filename to a sorted list of row indices.
+    // TODO: Ignoring the stored rows that are present in
+    //   positional deletes for now. I think they're only used for statistics?
+    Positional(HashMap<String, Vec<u64>>),
+
+    // Equality delete files are initially parsed solely as an
+    // unprocessed list of `RecordBatch`es from the equality
+    // delete files.
+    // I don't think we can do better than this, e.g. by
+    // storing a Predicate (because the equality deletes use the
+    // field_id rather than the field name, so if we keep this as
+    // a Predicate then a field name change would break it).
+    // Similarly, I don't think we can store this as a BoundPredicate,
+    // as the column order could be different across different data
+    // files and so the accessor in the bound predicate could be invalid.
+    Equality(Vec<RecordBatch>),
+}
+
+enum PosDelSchema {
+    WithRow,
+    WithoutRow,
+}
+
+fn validate_schema(schema: SchemaRef) -> Result<PosDelSchema> {
+    let fields = schema.flattened_fields();
+    match fields.len() {
+        2 | 3 => {
+            let Some(path_field_id_raw) = fields[0].metadata().get(PARQUET_FIELD_ID_META_KEY)
+            else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Positional Delete file did not have the expected schema (missing field ID for field 0)",
+                ));
+            };
+
+            let Some(pos_field_id_raw) = fields[1].metadata().get(PARQUET_FIELD_ID_META_KEY) else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Positional Delete file did not have the expected schema (missing field ID for field 1)",
+                ));
+            };
+
+            let Ok(path_field_id_val) = path_field_id_raw.parse::<i32>() else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Positional Delete file did not have the expected schema (field ID 0 not parseable as an int)",
+                ));
+            };
+
+            let Ok(pos_field_id_val) = pos_field_id_raw.parse::<i32>() else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Positional Delete file did not have the expected schema (field ID 1 not parseable as an int)",
+                ));
+            };
+
+            if path_field_id_val != FIELD_ID_DELETE_FILE_PATH
+                || pos_field_id_val != FIELD_ID_DELETE_POS
+            {
+                Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Positional Delete file did not have the expected schema (unexpected field ID)",
+                ))
+            } else if fields.len() == 2 {
+                Ok(PosDelSchema::WithoutRow)
+            } else {
+                // TODO: should we check that col 3 is of type Struct
+                //   and that it contains a subset of the table schema?
+                Ok(PosDelSchema::WithRow)
+            }
+        }
+        _ => Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Positional Delete file did not have the expected schema",
+        )),
+    }
+}
+
+pub(crate) async fn parse_positional_delete_file(
+    mut record_batch_stream: ArrowRecordBatchStream,
+) -> Result<Deletes> {
+    let mut result: HashMap<String, Vec<u64>> = HashMap::new();
+
+    while let Some(batch) = record_batch_stream.next().await {
+        let batch = batch?;
+        let schema = batch.schema();
+
+        // Don't care about what schema type it is at
+        // present as we're ignoring the "row" column from files
+        // with 3-column schemas. We only care if it is valid
+        let _schema_type = validate_schema(schema)?;
+
+        let columns = batch.columns();
+
+        let Some(file_paths) = columns[0].as_any().downcast_ref::<StringArray>() else {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Could not downcast file paths array to StringArray",
+            ));
+        };
+        let Some(positions) = columns[1].as_any().downcast_ref::<Int64Array>() else {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Could not downcast positions array to Int64Array",
+            ));
+        };
+
+        for (file_path, pos) in file_paths.iter().zip(positions.iter()) {
+            let (Some(file_path), Some(pos)) = (file_path, pos) else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "null values in delete file",
+                ));
+            };
+
+            result
+                .entry(file_path.to_string())
+                .and_modify(|entry| {
+                    (*entry).push(pos as u64);
+                })
+                .or_insert(vec![pos as u64]);
+        }
+    }
+
+    Ok(Deletes::Positional(result))
+}
+
+pub(crate) async fn parse_equality_delete_file(

Review Comment:
   Considering schema evolution, we should use the same logic as data file processing.
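   One reading of this, sketched below: resolve equality-delete columns against the current schema by Parquet field ID (as the data path does) rather than by name, so renames since the delete was written don't break the rebuilt predicate. The `current_names` map is a hypothetical input that would be derived from the snapshot schema:

   ```rust
   use std::collections::HashMap;

   use arrow_schema::Schema as ArrowSchema;
   use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

   use crate::{Error, ErrorKind, Result};

   // Resolve each equality-delete column to its *current* name in the table
   // schema by field ID, mirroring how data files are read.
   fn resolve_current_column_names(
       batch_schema: &ArrowSchema,
       current_names: &HashMap<i32, String>, // field id -> current column name
   ) -> Result<Vec<String>> {
       batch_schema
           .fields()
           .iter()
           .map(|field| {
               let field_id: i32 = field
                   .metadata()
                   .get(PARQUET_FIELD_ID_META_KEY)
                   .and_then(|s| s.parse().ok())
                   .ok_or_else(|| {
                       Error::new(ErrorKind::DataInvalid, "equality delete column missing field id")
                   })?;
               current_names.get(&field_id).cloned().ok_or_else(|| {
                   Error::new(ErrorKind::DataInvalid, "field id not found in current schema")
               })
           })
           .collect()
   }
   ```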



##########
crates/iceberg/src/spec/delete_file.rs:
##########
@@ -0,0 +1,780 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use arrow_array::{Int64Array, RecordBatch, StringArray};
+use arrow_schema::SchemaRef;
+use futures::StreamExt;
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+
+use crate::scan::ArrowRecordBatchStream;
+use crate::{Error, ErrorKind, Result};
+
+pub(crate) const FIELD_ID_DELETE_FILE_PATH: i32 = i32::MAX - 101;
+pub(crate) const FIELD_ID_DELETE_POS: i32 = i32::MAX - 102;
+
+// Represents a parsed Delete file that can be safely stored
+// in the Object Cache.
+pub(crate) enum Deletes {
+    // Positional delete files are parsed into a map of
+    // filename to a sorted list of row indices.
+    // TODO: Ignoring the stored rows that are present in
+    //   positional deletes for now. I think they're only used for statistics?
+    Positional(HashMap<String, Vec<u64>>),
+
+    // Equality delete files are initially parsed solely as an
+    // unprocessed list of `RecordBatch`es from the equality
+    // delete files.
+    // I don't think we can do better than this, e.g. by
+    // storing a Predicate (because the equality deletes use the
+    // field_id rather than the field name, so if we keep this as
+    // a Predicate then a field name change would break it).
+    // Similarly, I don't think we can store this as a BoundPredicate,
+    // as the column order could be different across different data
+    // files and so the accessor in the bound predicate could be invalid.
+    Equality(Vec<RecordBatch>),
+}
+
+enum PosDelSchema {
+    WithRow,
+    WithoutRow,
+}
+
+fn validate_schema(schema: SchemaRef) -> Result<PosDelSchema> {
+    let fields = schema.flattened_fields();
+    match fields.len() {
+        2 | 3 => {
+            let Some(path_field_id_raw) = fields[0].metadata().get(PARQUET_FIELD_ID_META_KEY)
+            else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Positional Delete file did not have the expected schema (missing field ID for field 0)",
+                ));
+            };
+
+            let Some(pos_field_id_raw) = fields[1].metadata().get(PARQUET_FIELD_ID_META_KEY) else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Positional Delete file did not have the expected schema (missing field ID for field 1)",
+                ));
+            };
+
+            let Ok(path_field_id_val) = path_field_id_raw.parse::<i32>() else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Positional Delete file did not have the expected schema (field ID 0 not parseable as an int)",
+                ));
+            };
+
+            let Ok(pos_field_id_val) = pos_field_id_raw.parse::<i32>() else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Positional Delete file did not have the expected schema (field ID 1 not parseable as an int)",
+                ));
+            };
+
+            if path_field_id_val != FIELD_ID_DELETE_FILE_PATH
+                || pos_field_id_val != FIELD_ID_DELETE_POS
+            {
+                Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Positional Delete file did not have the expected schema (unexpected field ID)",
+                ))
+            } else if fields.len() == 2 {
+                Ok(PosDelSchema::WithoutRow)
+            } else {
+                // TODO: should we check that col 3 is of type Struct
+                //   and that it contains a subset of the table schema?
+                Ok(PosDelSchema::WithRow)
+            }
+        }
+        _ => Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Positional Delete file did not have the expected schema",
+        )),
+    }
+}
+
+pub(crate) async fn parse_positional_delete_file(

Review Comment:
   I think this logic is quite similar to the data file reader; I would expect it to reuse the data file reader's code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
