JanKaul commented on code in PR #56:
URL: https://github.com/apache/iceberg-rust/pull/56#discussion_r1321022229


##########
crates/iceberg/src/spec/manifest_list.rs:
##########
@@ -0,0 +1,881 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! ManifestList for Iceberg.
+
+use crate::{avro::schema_to_avro_schema, spec::Literal, Error};
+use apache_avro::{from_value, types::Value, Reader};
+use once_cell::sync::Lazy;
+use std::sync::Arc;
+
+use super::{FormatVersion, ListType, NestedField, NestedFieldRef, Schema, 
StructType};
+
+/// Snapshots are embedded in table metadata, but the list of manifests for a
+/// snapshot are stored in a separate manifest list file.
+///
+/// A new manifest list is written for each attempt to commit a snapshot
+/// because the list of manifests always changes to produce a new snapshot.
+/// When a manifest list is written, the (optimistic) sequence number of the
+/// snapshot is written for all new manifest files tracked by the list.
+///
+/// A manifest list includes summary metadata that can be used to avoid
+/// scanning all of the manifests in a snapshot when planning a table scan.
+/// This includes the number of added, existing, and deleted files, and a
+/// summary of values for each field of the partition spec used to write the
+/// manifest.
+#[derive(Debug, Clone)]
+pub struct ManifestList {
+    /// Entries in a manifest list.
+    entries: Vec<ManifestListEntry>,
+}
+
+impl ManifestList {
+    /// Parse manifest list from bytes.
+    ///
+    /// QUESTION: Will we have more than one manifest list in a single file?
+    pub fn parse_with_version(
+        bs: &[u8],
+        version: FormatVersion,
+        partition_type: &StructType,
+    ) -> Result<ManifestList, Error> {
+        match version {
+            FormatVersion::V2 => {
+                let schema = schema_to_avro_schema("manifest_list", 
&Self::v2_schema()).unwrap();
+                let reader = Reader::with_schema(&schema, bs)?;
+                let values = Value::Array(reader.collect::<Result<Vec<Value>, 
_>>()?);
+                
from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_type)
+            }
+            FormatVersion::V1 => {
+                let schema = schema_to_avro_schema("manifest_list", 
&Self::v1_schema()).unwrap();
+                let reader = Reader::with_schema(&schema, bs)?;
+                let values = Value::Array(reader.collect::<Result<Vec<Value>, 
_>>()?);
+                
from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_type)
+            }
+        }
+    }
+
+    /// Get the entries in the manifest list.
+    pub fn entries(&self) -> &[ManifestListEntry] {
+        &self.entries
+    }
+
+    const MANIFEST_PATH: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                500,
+                "manifest_path",
+                super::Type::Primitive(super::PrimitiveType::String),
+            ))
+        })
+    };
+    const MANIFEST_LENGTH: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                501,
+                "manifest_length",
+                super::Type::Primitive(super::PrimitiveType::Long),
+            ))
+        })
+    };
+    const PARTITION_SPEC_ID: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                502,
+                "partition_spec_id",
+                super::Type::Primitive(super::PrimitiveType::Int),
+            ))
+        })
+    };
+    const CONTENT: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                517,
+                "content",
+                super::Type::Primitive(super::PrimitiveType::Int),
+            ))
+        })
+    };
+    const SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                515,
+                "sequence_number",
+                super::Type::Primitive(super::PrimitiveType::Long),
+            ))
+        })
+    };
+    const MIN_SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                516,
+                "min_sequence_number",
+                super::Type::Primitive(super::PrimitiveType::Long),
+            ))
+        })
+    };
+    const ADDED_SNAPSHOT_ID: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                503,
+                "added_snapshot_id",
+                super::Type::Primitive(super::PrimitiveType::Long),
+            ))
+        })
+    };
+    const ADDED_FILES_COUNT_V2: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                504,
+                "added_data_files_count",
+                super::Type::Primitive(super::PrimitiveType::Int),
+            ))
+        })
+    };
+    const ADDED_FILES_COUNT_V1: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                504,
+                "added_data_files_count",
+                super::Type::Primitive(super::PrimitiveType::Int),
+            ))
+        })
+    };
+    const EXISTING_FILES_COUNT_V2: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                505,
+                "existing_data_files_count",
+                super::Type::Primitive(super::PrimitiveType::Int),
+            ))
+        })
+    };
+    const EXISTING_FILES_COUNT_V1: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                505,
+                "existing_data_files_count",
+                super::Type::Primitive(super::PrimitiveType::Int),
+            ))
+        })
+    };
+    const DELETED_FILES_COUNT_V2: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                506,
+                "deleted_data_files_count",
+                super::Type::Primitive(super::PrimitiveType::Int),
+            ))
+        })
+    };
+    const DELETED_FILES_COUNT_V1: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                506,
+                "deleted_data_files_count",
+                super::Type::Primitive(super::PrimitiveType::Int),
+            ))
+        })
+    };
+    const ADDED_ROWS_COUNT_V2: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                512,
+                "added_rows_count",
+                super::Type::Primitive(super::PrimitiveType::Long),
+            ))
+        })
+    };
+    const ADDED_ROWS_COUNT_V1: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                512,
+                "added_rows_count",
+                super::Type::Primitive(super::PrimitiveType::Long),
+            ))
+        })
+    };
+    const EXISTING_ROWS_COUNT_V2: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                513,
+                "existing_rows_count",
+                super::Type::Primitive(super::PrimitiveType::Long),
+            ))
+        })
+    };
+    const EXISTING_ROWS_COUNT_V1: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                513,
+                "existing_rows_count",
+                super::Type::Primitive(super::PrimitiveType::Long),
+            ))
+        })
+    };
+    const DELETED_ROWS_COUNT_V2: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                514,
+                "deleted_rows_count",
+                super::Type::Primitive(super::PrimitiveType::Long),
+            ))
+        })
+    };
+    const DELETED_ROWS_COUNT_V1: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                514,
+                "deleted_rows_count",
+                super::Type::Primitive(super::PrimitiveType::Long),
+            ))
+        })
+    };
+    const PARTITIONS: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            // element type
+            let fields = vec![
+                Arc::new(NestedField::required(
+                    509,
+                    "contains_null",
+                    super::Type::Primitive(super::PrimitiveType::Boolean),
+                )),
+                Arc::new(NestedField::optional(
+                    518,
+                    "contains_nan",
+                    super::Type::Primitive(super::PrimitiveType::Boolean),
+                )),
+                Arc::new(NestedField::optional(
+                    510,
+                    "lower_bound",
+                    super::Type::Primitive(super::PrimitiveType::Binary),
+                )),
+                Arc::new(NestedField::optional(
+                    511,
+                    "upper_bound",
+                    super::Type::Primitive(super::PrimitiveType::Binary),
+                )),
+            ];
+            let element_field = Arc::new(NestedField::required(
+                508,
+                "r_508",
+                super::Type::Struct(StructType::new(fields)),
+            ));
+            Arc::new(NestedField::optional(
+                507,
+                "partitions",
+                super::Type::List(ListType { element_field }),
+            ))
+        })
+    };
+    const KEY_METADATA: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                519,
+                "key_metadata",
+                super::Type::Primitive(super::PrimitiveType::Binary),
+            ))
+        })
+    };
+
+    /// Get the v2 schema of the manifest list entry.
+    pub(crate) fn v2_schema() -> Schema {
+        let fields = vec![
+            Self::MANIFEST_PATH.clone(),
+            Self::MANIFEST_LENGTH.clone(),
+            Self::PARTITION_SPEC_ID.clone(),
+            Self::CONTENT.clone(),
+            Self::SEQUENCE_NUMBER.clone(),
+            Self::MIN_SEQUENCE_NUMBER.clone(),
+            Self::ADDED_SNAPSHOT_ID.clone(),
+            Self::ADDED_FILES_COUNT_V2.clone(),
+            Self::EXISTING_FILES_COUNT_V2.clone(),
+            Self::DELETED_FILES_COUNT_V2.clone(),
+            Self::ADDED_ROWS_COUNT_V2.clone(),
+            Self::EXISTING_ROWS_COUNT_V2.clone(),
+            Self::DELETED_ROWS_COUNT_V2.clone(),
+            Self::PARTITIONS.clone(),
+            Self::KEY_METADATA.clone(),
+        ];
+        Schema::builder().with_fields(fields).build().unwrap()
+    }
+    /// Get the v1 schema of the manifest list entry.
+    pub(crate) fn v1_schema() -> Schema {
+        let fields = vec![
+            Self::MANIFEST_PATH.clone(),
+            Self::MANIFEST_LENGTH.clone(),
+            Self::PARTITION_SPEC_ID.clone(),
+            Self::ADDED_SNAPSHOT_ID.clone(),
+            Self::ADDED_FILES_COUNT_V1.clone().to_owned(),
+            Self::EXISTING_FILES_COUNT_V1.clone(),
+            Self::DELETED_FILES_COUNT_V1.clone(),
+            Self::ADDED_ROWS_COUNT_V1.clone(),
+            Self::EXISTING_ROWS_COUNT_V1.clone(),
+            Self::DELETED_ROWS_COUNT_V1.clone(),
+            Self::PARTITIONS.clone(),
+            Self::KEY_METADATA.clone(),
+        ];
+        Schema::builder().with_fields(fields).build().unwrap()
+    }
+}
+
+/// Entry in a manifest list.
+#[derive(Debug, PartialEq, Clone)]
+pub struct ManifestListEntry {
+    /// field: 500
+    ///
+    /// Location of the manifest file
+    manifest_path: String,
+    /// field: 501
+    ///
+    /// Length of the manifest file in bytes
+    manifest_length: i64,
+    /// field: 502
+    ///
+    /// ID of a partition spec used to write the manifest; must be listed
+    /// in table metadata partition-specs
+    partition_spec_id: i32,
+    /// field: 517
+    ///
+    /// The type of files tracked by the manifest, either data or delete
+    /// files; 0 for all v1 manifests
+    content: ManifestContentType,
+    /// field: 515
+    ///
+    /// The sequence number when the manifest was added to the table; use 0
+    /// when reading v1 manifest lists
+    sequence_number: i64,
+    /// field: 516
+    ///
+    /// The minimum data sequence number of all live data or delete files in
+    /// the manifest; use 0 when reading v1 manifest lists
+    min_sequence_number: i64,
+    /// field: 503
+    ///
+    /// ID of the snapshot where the manifest file was added
+    added_snapshot_id: i64,
+    /// field: 504
+    ///
+    /// Number of entries in the manifest that have status ADDED, when null
+    /// this is assumed to be non-zero
+    added_data_files_count: Option<i32>,
+    /// field: 505
+    ///
+    /// Number of entries in the manifest that have status EXISTING (0),
+    /// when null this is assumed to be non-zero
+    existing_data_files_count: Option<i32>,
+    /// field: 506
+    ///
+    /// Number of entries in the manifest that have status DELETED (2),
+    /// when null this is assumed to be non-zero
+    deleted_data_files_count: Option<i32>,
+    /// field: 512
+    ///
+    /// Number of rows in all of files in the manifest that have status
+    /// ADDED, when null this is assumed to be non-zero
+    added_rows_count: Option<i64>,
+    /// field: 513
+    ///
+    /// Number of rows in all of files in the manifest that have status
+    /// EXISTING, when null this is assumed to be non-zero
+    existing_rows_count: Option<i64>,
+    /// field: 514
+    ///
+    /// Number of rows in all of files in the manifest that have status
+    /// DELETED, when null this is assumed to be non-zero
+    deleted_rows_count: Option<i64>,
+    /// field: 507
+    /// element_field: 508
+    ///
+    /// A list of field summaries for each partition field in the spec. Each
+    /// field in the list corresponds to a field in the manifest file’s
+    /// partition spec.
+    partitions: Vec<FieldSummary>,
+    /// field: 519
+    ///
+    /// Implementation-specific key metadata for encryption
+    key_metadata: Vec<u8>,
+}
+
+/// The type of files tracked by the manifest, either data or delete files; 
Data(0) for all v1 manifests
+#[derive(Debug, PartialEq, Clone)]
+pub enum ManifestContentType {
+    /// The manifest content is data.
+    Data = 0,
+    /// The manifest content is deletes.
+    Deletes = 1,
+}

Review Comment:
   > > I think you have to distinguish between position deletes = 1 and 
equality deletes = 2. If we use the serde_repr crate we could directly 
serialize/deserialize it as follows:
   > 
   > Thanks for the reminder! I have a question:
   > 
   > In the [spec of data file](https://iceberg.apache.org/spec/#manifests:~:text=int%20with%20meaning%3A%200%3A%20DATA%2C%201%3A%20POSITION%20DELETES%2C%202%3A%20EQUALITY%20DELETES), it distinguishes position deletes and equality deletes. In the [spec of manifest_list](https://iceberg.apache.org/spec/#manifests:~:text=int%20with%20meaning%3A%200%3A%20data%2C%201%3A%20deletes), it only distinguishes data and deletes.
   > 
   > So is that just an inconsistency in the spec?
   
   Sorry, my bad. I thought it was the same struct as in `datafile`, but it looks like they are different. Maybe the information is not required in ManifestList. Forget my comment then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to