sdd commented on code in PR #652:
URL: https://github.com/apache/iceberg-rust/pull/652#discussion_r1879516829


##########
crates/iceberg/src/scan.rs:
##########
@@ -951,6 +1077,82 @@ impl FileScanTask {
     }
 }
 
+type DeleteFileManagerResult = 
Result<Option<Arc<Vec<FileScanTaskDeleteFile>>>>; // fallible, optional, Arc-shared list of delete-file entries
+
+/// Manages async retrieval of all the delete files from FileIO that are
+/// applicable to the scan. Provides references to them for inclusion within 
FileScanTasks
+#[derive(Debug, Clone)]
+struct DeleteFileManager {

Review Comment:
   I've refactored this into a separate module and rebased it onto the latest 
`main`, as it was getting a bit stale. I'll be working to update this PR to 
bring it closer to `DeleteFileIndex`, ideally in a way that also allows me to 
split it into smaller PRs.



##########
crates/iceberg/src/spec/delete_file.rs:
##########
@@ -0,0 +1,780 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use arrow_array::{Int64Array, RecordBatch, StringArray};
+use arrow_schema::SchemaRef;
+use futures::StreamExt;
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+
+use crate::scan::ArrowRecordBatchStream;
+use crate::{Error, ErrorKind, Result};
+
+pub(crate) const FIELD_ID_DELETE_FILE_PATH: i32 = i32::MAX - 101;
+pub(crate) const FIELD_ID_DELETE_POS: i32 = i32::MAX - 102;
+
+/// Represents a parsed delete file, in a form that can be safely stored
+/// in the object cache.
+pub(crate) enum Deletes {
+    /// Positional delete files are parsed into a map of
+    /// filename to a sorted list of row indices.
+    // TODO: Ignoring the stored rows that are present in
+    //   positional deletes for now. I think they are only used for statistics?
+    Positional(HashMap<String, Vec<u64>>),
+
+    /// Equality delete files are initially parsed solely as an
+    /// unprocessed list of `RecordBatch`es from the equality
+    /// delete files.
+    /// I don't think we can do better than this by
+    /// storing a Predicate: the equality deletes use the
+    /// field_id rather than the field name, so if we kept this as
+    /// a Predicate then a field name change would break it.
+    /// Similarly, I don't think we can store this as a BoundPredicate,
+    /// as the column order could be different across different data
+    /// files, and so the accessor in the bound predicate could be invalid.
+    Equality(Vec<RecordBatch>),
+}
+
+enum PosDelSchema { // Shape of a positional delete file's schema (see `validate_schema`).
+    WithRow, // returned when the schema has three flattened fields
+    WithoutRow, // returned when the schema has exactly two flattened fields
+}
+
+fn validate_schema(schema: SchemaRef) -> Result<PosDelSchema> { // Classifies a positional delete schema, or fails with DataInvalid.
+    let fields = schema.flattened_fields(); // NOTE(review): if nested children are counted here,
+    match fields.len() { // a struct 3rd column could push len() past 3 — TODO confirm
+        2 | 3 => { // two required columns, plus an optional third
+            let Some(path_field_id_raw) = 
fields[0].metadata().get(PARQUET_FIELD_ID_META_KEY)
+            else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Positional Delete file did not have the expected schema 
(missing field ID for field 0)",
+                ));
+            };
+
+            let Some(pos_field_id_raw) = 
fields[1].metadata().get(PARQUET_FIELD_ID_META_KEY) else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Positional Delete file did not have the expected schema 
(missing field ID for field 1)",
+                ));
+            };
+
+            let Ok(path_field_id_val) = path_field_id_raw.parse::<i32>() else { // metadata values are strings
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Positional Delete file did not have the expected schema 
(field ID 0 not parseable as an int)",
+                ));
+            };
+
+            let Ok(pos_field_id_val) = pos_field_id_raw.parse::<i32>() else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Positional Delete file did not have the expected schema 
(field ID 1 not parseable as an int)",
+                ));
+            };
+
+            if path_field_id_val != FIELD_ID_DELETE_FILE_PATH // field 0 must carry the file-path ID
+                || pos_field_id_val != FIELD_ID_DELETE_POS // field 1 must carry the position ID
+            {
+                Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Positional Delete file did not have the expected schema 
(unexpected field ID)",
+                ))
+            } else if fields.len() == 2 {
+                Ok(PosDelSchema::WithoutRow)
+            } else {
+                // TODO: should we check that col 3 is of type Struct
+                //   and that it contains a subset of the table schema?
+                Ok(PosDelSchema::WithRow)
+            }
+        }
+        _ => Err(Error::new( // any other field count is rejected outright
+            ErrorKind::DataInvalid,
+            "Positional Delete file did not have the expected schema",
+        )),
+    }
+}
+
+pub(crate) async fn parse_positional_delete_file(

Review Comment:
   Will make a start on this now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to