sdd commented on code in PR #652:
URL: https://github.com/apache/iceberg-rust/pull/652#discussion_r1895664169


##########
crates/iceberg/src/spec/delete_file.rs:
##########
@@ -0,0 +1,780 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use arrow_array::{Int64Array, RecordBatch, StringArray};
+use arrow_schema::SchemaRef;
+use futures::StreamExt;
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+
+use crate::scan::ArrowRecordBatchStream;
+use crate::{Error, ErrorKind, Result};
+
+/// Field id that `validate_schema` requires on column 0 (the data file
+/// path) of a positional delete file. Evaluates to 2147483546.
+// NOTE(review): presumably the id the Iceberg spec reserves for `file_path` - confirm.
+pub(crate) const FIELD_ID_DELETE_FILE_PATH: i32 = i32::MAX - 101;
+/// Field id that `validate_schema` requires on column 1 (the row
+/// position) of a positional delete file. Evaluates to 2147483545.
+// NOTE(review): presumably the id the Iceberg spec reserves for `pos` - confirm.
+pub(crate) const FIELD_ID_DELETE_POS: i32 = i32::MAX - 102;
+
+/// A parsed delete file, held in a form that can be safely stored
+/// in the Object Cache.
+pub(crate) enum Deletes {
+    /// Positional deletes: a map from data file path to the row
+    /// positions deleted in that file.
+    ///
+    /// NOTE(review): this was described as a *sorted* list, but
+    /// `parse_positional_delete_file` pushes positions in the order
+    /// they are read and never sorts them - confirm before relying
+    /// on any ordering.
+    // TODO: The stored rows that may be present in positional delete
+    //   files are ignored for now. I think they are only used for statistics?
+    Positional(HashMap<String, Vec<u64>>),
+
+    /// Equality deletes: the raw, unprocessed `RecordBatch`es read
+    /// from the equality delete files.
+    ///
+    /// Keeping the batches is believed to be the best available option.
+    /// Equality deletes identify columns by field id rather than by
+    /// name, so storing a `Predicate` would break on a field rename.
+    /// A `BoundPredicate` looks unsuitable as well: column order can
+    /// differ between data files, so the accessor inside the bound
+    /// predicate could be invalid.
+    Equality(Vec<RecordBatch>),
+}
+
+/// Column layout of a positional delete file, as classified by
+/// `validate_schema`.
+enum PosDelSchema {
+    /// Three top-level columns: file path, position and the optional row column.
+    WithRow,
+    /// Exactly two top-level columns: file path and position.
+    WithoutRow,
+}
+
+/// Checks that `schema` has the shape of a positional delete file.
+///
+/// The schema must have two or three top-level columns. Column 0 must carry
+/// the Parquet field id `FIELD_ID_DELETE_FILE_PATH` and column 1 the field id
+/// `FIELD_ID_DELETE_POS`, both read from the `PARQUET_FIELD_ID_META_KEY`
+/// metadata entry of the field.
+///
+/// # Errors
+///
+/// Returns `ErrorKind::DataInvalid` when the column count is wrong, when a
+/// field id is missing or is not an `i32`, or when a field id has an
+/// unexpected value.
+fn validate_schema(schema: SchemaRef) -> Result<PosDelSchema> {
+    // Count top-level columns only. `flattened_fields()` also yields the
+    // children of nested fields, so a row struct with any child fields pushed
+    // the count past 3 and well-formed 3-column files were rejected.
+    let fields = schema.fields();
+    if !matches!(fields.len(), 2 | 3) {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Positional Delete file did not have the expected schema",
+        ));
+    }
+
+    // Both ids are looked up before either is parsed so that the reported
+    // error keeps the same precedence as before (missing beats unparseable).
+    let Some(path_field_id_raw) = fields[0].metadata().get(PARQUET_FIELD_ID_META_KEY) else {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Positional Delete file did not have the expected schema (missing field ID for field 0)",
+        ));
+    };
+
+    let Some(pos_field_id_raw) = fields[1].metadata().get(PARQUET_FIELD_ID_META_KEY) else {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Positional Delete file did not have the expected schema (missing field ID for field 1)",
+        ));
+    };
+
+    let Ok(path_field_id_val) = path_field_id_raw.parse::<i32>() else {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Positional Delete file did not have the expected schema (field ID 0 not parseable as an int)",
+        ));
+    };
+
+    let Ok(pos_field_id_val) = pos_field_id_raw.parse::<i32>() else {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Positional Delete file did not have the expected schema (field ID 1 not parseable as an int)",
+        ));
+    };
+
+    if path_field_id_val != FIELD_ID_DELETE_FILE_PATH
+        || pos_field_id_val != FIELD_ID_DELETE_POS
+    {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Positional Delete file did not have the expected schema (unexpected field ID)",
+        ));
+    }
+
+    if fields.len() == 2 {
+        Ok(PosDelSchema::WithoutRow)
+    } else {
+        // TODO: should we check that col 3 is of type Struct
+        //   and that it contains a subset of the table schema?
+        Ok(PosDelSchema::WithRow)
+    }
+}
+
+/// Reads every batch of a positional delete file and groups the deleted
+/// row positions by data file path.
+///
+/// Positions are appended in the order in which they are read from the
+/// stream; no sorting or de-duplication is performed here.
+///
+/// # Errors
+///
+/// Propagates any error yielded by `record_batch_stream`. Returns
+/// `ErrorKind::DataInvalid` when a batch fails `validate_schema`, has fewer
+/// than two columns, has columns that are not a `StringArray` followed by an
+/// `Int64Array`, or contains a null or negative value.
+pub(crate) async fn parse_positional_delete_file(
+    mut record_batch_stream: ArrowRecordBatchStream,
+) -> Result<Deletes> {
+    let mut result: HashMap<String, Vec<u64>> = HashMap::new();
+
+    while let Some(batch) = record_batch_stream.next().await {
+        let batch = batch?;
+
+        // Which schema variant it is does not matter at present, as the
+        // "row" column of 3-column files is ignored. Only validity matters.
+        validate_schema(batch.schema())?;
+
+        // A slice pattern instead of `columns[0]` / `columns[1]`, so that a
+        // batch with too few columns is reported as an error, not a panic.
+        let [path_column, pos_column, ..] = batch.columns() else {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Positional Delete file did not have the expected schema",
+            ));
+        };
+
+        let Some(file_paths) = path_column.as_any().downcast_ref::<StringArray>() else {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Could not downcast file paths array to StringArray",
+            ));
+        };
+        let Some(positions) = pos_column.as_any().downcast_ref::<Int64Array>() else {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Could not downcast positions array to Int64Array",
+            ));
+        };
+
+        for (file_path, pos) in file_paths.iter().zip(positions.iter()) {
+            let (Some(file_path), Some(pos)) = (file_path, pos) else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "null values in delete file",
+                ));
+            };
+
+            // `pos as u64` would silently wrap a negative position into a
+            // huge row index; reject it instead.
+            let pos = u64::try_from(pos).map_err(|_| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("negative position {pos} in delete file"),
+                )
+            })?;
+
+            result.entry(file_path.to_string()).or_default().push(pos);
+        }
+    }
+
+    Ok(Deletes::Positional(result))
+}
+
+pub(crate) async fn parse_equality_delete_file(

Review Comment:
   resolved, this change will be in a follow-up PR



##########
crates/iceberg/src/spec/delete_file.rs:
##########
@@ -0,0 +1,780 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use arrow_array::{Int64Array, RecordBatch, StringArray};
+use arrow_schema::SchemaRef;
+use futures::StreamExt;
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+
+use crate::scan::ArrowRecordBatchStream;
+use crate::{Error, ErrorKind, Result};
+
+/// Field id that `validate_schema` requires on column 0 (the data file
+/// path) of a positional delete file. Evaluates to 2147483546.
+// NOTE(review): presumably the id the Iceberg spec reserves for `file_path` - confirm.
+pub(crate) const FIELD_ID_DELETE_FILE_PATH: i32 = i32::MAX - 101;
+/// Field id that `validate_schema` requires on column 1 (the row
+/// position) of a positional delete file. Evaluates to 2147483545.
+// NOTE(review): presumably the id the Iceberg spec reserves for `pos` - confirm.
+pub(crate) const FIELD_ID_DELETE_POS: i32 = i32::MAX - 102;
+
+/// A parsed delete file, held in a form that can be safely stored
+/// in the Object Cache.
+pub(crate) enum Deletes {
+    /// Positional deletes: a map from data file path to the row
+    /// positions deleted in that file.
+    ///
+    /// NOTE(review): this was described as a *sorted* list, but
+    /// `parse_positional_delete_file` pushes positions in the order
+    /// they are read and never sorts them - confirm before relying
+    /// on any ordering.
+    // TODO: The stored rows that may be present in positional delete
+    //   files are ignored for now. I think they are only used for statistics?
+    Positional(HashMap<String, Vec<u64>>),
+
+    /// Equality deletes: the raw, unprocessed `RecordBatch`es read
+    /// from the equality delete files.
+    ///
+    /// Keeping the batches is believed to be the best available option.
+    /// Equality deletes identify columns by field id rather than by
+    /// name, so storing a `Predicate` would break on a field rename.
+    /// A `BoundPredicate` looks unsuitable as well: column order can
+    /// differ between data files, so the accessor inside the bound
+    /// predicate could be invalid.
+    Equality(Vec<RecordBatch>),
+}
+
+/// Column layout of a positional delete file, as classified by
+/// `validate_schema`.
+enum PosDelSchema {
+    /// Three top-level columns: file path, position and the optional row column.
+    WithRow,
+    /// Exactly two top-level columns: file path and position.
+    WithoutRow,
+}
+
+/// Checks that `schema` has the shape of a positional delete file.
+///
+/// The schema must have two or three top-level columns. Column 0 must carry
+/// the Parquet field id `FIELD_ID_DELETE_FILE_PATH` and column 1 the field id
+/// `FIELD_ID_DELETE_POS`, both read from the `PARQUET_FIELD_ID_META_KEY`
+/// metadata entry of the field.
+///
+/// # Errors
+///
+/// Returns `ErrorKind::DataInvalid` when the column count is wrong, when a
+/// field id is missing or is not an `i32`, or when a field id has an
+/// unexpected value.
+fn validate_schema(schema: SchemaRef) -> Result<PosDelSchema> {
+    // Count top-level columns only. `flattened_fields()` also yields the
+    // children of nested fields, so a row struct with any child fields pushed
+    // the count past 3 and well-formed 3-column files were rejected.
+    let fields = schema.fields();
+    if !matches!(fields.len(), 2 | 3) {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Positional Delete file did not have the expected schema",
+        ));
+    }
+
+    // Both ids are looked up before either is parsed so that the reported
+    // error keeps the same precedence as before (missing beats unparseable).
+    let Some(path_field_id_raw) = fields[0].metadata().get(PARQUET_FIELD_ID_META_KEY) else {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Positional Delete file did not have the expected schema (missing field ID for field 0)",
+        ));
+    };
+
+    let Some(pos_field_id_raw) = fields[1].metadata().get(PARQUET_FIELD_ID_META_KEY) else {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Positional Delete file did not have the expected schema (missing field ID for field 1)",
+        ));
+    };
+
+    let Ok(path_field_id_val) = path_field_id_raw.parse::<i32>() else {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Positional Delete file did not have the expected schema (field ID 0 not parseable as an int)",
+        ));
+    };
+
+    let Ok(pos_field_id_val) = pos_field_id_raw.parse::<i32>() else {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Positional Delete file did not have the expected schema (field ID 1 not parseable as an int)",
+        ));
+    };
+
+    if path_field_id_val != FIELD_ID_DELETE_FILE_PATH
+        || pos_field_id_val != FIELD_ID_DELETE_POS
+    {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Positional Delete file did not have the expected schema (unexpected field ID)",
+        ));
+    }
+
+    if fields.len() == 2 {
+        Ok(PosDelSchema::WithoutRow)
+    } else {
+        // TODO: should we check that col 3 is of type Struct
+        //   and that it contains a subset of the table schema?
+        Ok(PosDelSchema::WithRow)
+    }
+}
+
+pub(crate) async fn parse_positional_delete_file(

Review Comment:
   resolved, this change will be in a follow-up PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to