JanKaul commented on code in PR #79:
URL: https://github.com/apache/iceberg-rust/pull/79#discussion_r1362007623


##########
crates/iceberg/src/spec/manifest.rs:
##########
@@ -0,0 +1,671 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Manifest for Iceberg.
+use super::{FormatVersion, ManifestContentType, PartitionSpec, Schema, Struct};
+use super::{Literal, Type};
+use crate::{Error, ErrorKind};
+use apache_avro::{from_value, Reader as AvroReader, Schema as AvroSchema};
+use std::collections::HashMap;
+use std::str::FromStr;
+
+/// A manifest contains metadata and a list of entries.
+pub struct Manifest {
+    metadata: ManifestMetadata,
+    entries: Vec<ManifestEntry>,
+}
+
+impl Manifest {
+    /// Parse manifest from bytes of avro file.
+    pub fn parse_avro(bs: &[u8]) -> Result<Self, Error> {
+        let reader = AvroReader::new(bs)?;
+
+        // Parse manifest metadata
+        let meta = reader.user_metadata();
+        let metadata = ManifestMetadata::parse(meta)?;
+
+        // Parse manifest entries
+        let partition_type =
+            Type::Struct(metadata.partition_spec.partition_type(&metadata.schema)?);
+        let mut entries = Vec::<ManifestEntry>::new();
+
+        match metadata.format_version {
+            FormatVersion::V1 => {
+                let reader = AvroReader::with_schema(Self::v1_schema(), bs)?;
+                for value in reader {
+                    entries.push(
+                        from_value::<_serde::ManifestEntryV1>(&value?)?
+                            .try_into(&partition_type, &metadata.schema)?,
+                    );
+                }
+            }
+            FormatVersion::V2 => {
+                let reader = AvroReader::with_schema(Self::v2_schema(), bs)?;
+                for value in reader {
+                    entries.push(
+                        from_value::<_serde::ManifestEntryV2>(&value?)?
+                            .try_into(&partition_type, &metadata.schema)?,
+                    );
+                }
+            }
+        };
+
+        Ok(Manifest { metadata, entries })
+    }
+
+    fn v2_schema() -> &'static AvroSchema {
+        todo!()
+    }
+
+    fn v1_schema() -> &'static AvroSchema {
+        todo!()
+    }
+}
+
+/// Meta data of a manifest.
+#[derive(Debug, PartialEq, Clone)]
+pub struct ManifestMetadata {
+    /// The table schema at the time the manifest
+    /// was written
+    schema: Schema,
+    /// ID of the schema used to write the manifest as a string
+    schema_id: i32,
+    /// The partition spec used  to write the manifest
+    partition_spec: PartitionSpec,
+    /// ID of the partition spec used to write the manifest as a string
+    partition_spec_id: i32,
+    /// Table format version number of the manifest as a string
+    format_version: FormatVersion,
+    /// Type of content files tracked by the manifest: “data” or “deletes”
+    content: ManifestContentType,
+}
+
+impl ManifestMetadata {
+    /// Parse from metadata in avro file.
+    pub fn parse(meta: &HashMap<String, Vec<u8>>) -> Result<Self, Error> {
+        let schema = {
+            let bs = meta.get("schema").ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "schema is required in manifest metadata but not found",
+                )
+            })?;
+            serde_json::from_slice::<Schema>(bs).map_err(|err| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Fail to parse schema in manifest metadata",
+                )
+                .with_source(err)
+            })?
+        };
+        let schema_id: i32 = meta
+            .get("schema-id")
+            .and_then(|bs| {
+                Some(String::from_utf8_lossy(bs).parse().map_err(|err| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "Fail to parse schema id in manifest metadata",
+                    )
+                    .with_source(err)
+                }))
+            })
+            .transpose()?
+            .unwrap_or(0);
+        let partition_spec = {
+            let bs = meta.get("partition-spec").ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "partition-spec is required in manifest metadata but not found",
+                )
+            })?;
+            serde_json::from_slice::<PartitionSpec>(bs).map_err(|err| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Fail to parse partition spec in manifest metadata",
+                )
+                .with_source(err)
+            })?
+        };
+        let partition_spec_id = meta
+            .get("partition-spec-id")
+            .and_then(|bs| {
+                Some(String::from_utf8_lossy(bs).parse().map_err(|err| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "Fail to parse partition spec id in manifest metadata",
+                    )
+                    .with_source(err)
+                }))
+            })
+            .transpose()?
+            .unwrap_or(0);
+        let format_version = if let Some(bs) = meta.get("format-version") {
+            serde_json::from_slice::<FormatVersion>(bs).map_err(|err| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Fail to parse format version in manifest metadata",
+                )
+                .with_source(err)
+            })?
+        } else {
+            FormatVersion::V1
+        };
+        let content = if let Some(v) = meta.get("content") {
+            let v = String::from_utf8_lossy(v);
+            v.parse()?
+        } else {
+            ManifestContentType::Data
+        };
+        Ok(ManifestMetadata {
+            schema,
+            schema_id,
+            partition_spec,
+            partition_spec_id,
+            format_version,
+            content,
+        })
+    }
+}
+
+/// A manifest is an immutable Avro file that lists data files or delete
+/// files, along with each file’s partition data tuple, metrics, and tracking
+/// information.
+pub struct ManifestEntry {
+    /// field: 0
+    ///
+    /// Used to track additions and deletions.
+    status: ManifestStatus,
+    /// field id: 1
+    ///
+    /// Snapshot id where the file was added, or deleted if status is 2.
+    /// Inherited when null.
+    snapshot_id: Option<i64>,
+    /// field id: 3
+    ///
+    /// Data sequence number of the file.
+    /// Inherited when null and status is 1 (added).
+    sequence_number: Option<i64>,
+    /// field id: 4
+    ///
+    /// File sequence number indicating when the file was added.
+    /// Inherited when null and status is 1 (added).
+    file_sequence_number: Option<i64>,
+    /// field id: 2
+    ///
+    /// File path, partition tuple, metrics, …
+    data_file: DataFile,
+}
+
+/// Used to track additions and deletions in ManifestEntry.
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+pub enum ManifestStatus {
+    /// Value: 0
+    Existing = 0,
+    /// Value: 1
+    Added = 1,
+    /// Value: 2
+    ///
+    /// Deletes are informational only and not used in scans.
+    Deleted = 2,
+}
+
+impl TryFrom<i32> for ManifestStatus {
+    type Error = Error;
+
+    fn try_from(v: i32) -> Result<ManifestStatus, Error> {
+        match v {
+            0 => Ok(ManifestStatus::Existing),
+            1 => Ok(ManifestStatus::Added),
+            2 => Ok(ManifestStatus::Deleted),
+            _ => Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("manifest status {} is invalid", v),
+            )),
+        }
+    }
+}
+
+/// Data file carries data file path, partition tuple, metrics, …
+#[derive(Debug, PartialEq, Clone)]
+pub struct DataFile {
+    /// field id: 134
+    ///
+    /// Type of content stored by the data file: data, equality deletes,
+    /// or position deletes (all v1 files are data files)
+    content: DataContentType,
+    /// field id: 100
+    ///
+    /// Full URI for the file with FS scheme
+    file_path: String,
+    /// field id: 101
+    ///
+    /// String file format name, avro, orc or parquet
+    file_format: DataFileFormat,
+    /// field id: 102
+    ///
+    /// Partition data tuple, schema based on the partition spec output using
+    /// partition field ids for the struct field ids
+    partition: Struct,
+    /// field id: 103
+    ///
+    /// Number of records in this file
+    record_count: i64,
+    /// field id: 104
+    ///
+    /// Total file size in bytes
+    file_size_in_bytes: i64,
+    /// field id: 108
+    /// key field id: 117
+    /// value field id: 118
+    ///
+    /// Map from column id to the total size on disk of all regions that
+    /// store the column. Does not include bytes necessary to read other
+    /// columns, like footers. Leave null for row-oriented formats (Avro)
+    column_sizes: Option<HashMap<i32, i64>>,
+    /// field id: 109
+    /// key field id: 119
+    /// value field id: 120
+    ///
+    /// Map from column id to number of values in the column (including null
+    /// and NaN values)
+    value_counts: Option<HashMap<i32, i64>>,
+    /// field id: 110
+    /// key field id: 121
+    /// value field id: 122
+    ///
+    /// Map from column id to number of null values in the column
+    null_value_counts: Option<HashMap<i32, i64>>,
+    /// field id: 137
+    /// key field id: 138
+    /// value field id: 139
+    ///
+    /// Map from column id to number of NaN values in the column
+    nan_value_counts: Option<HashMap<i32, i64>>,
+    /// field id: 111
+    /// key field id: 123
+    /// value field id: 124
+    ///
+    /// Map from column id to number of distinct values in the column;
+    /// distinct counts must be derived using values in the file by counting
+    /// or using sketches, but not using methods like merging existing
+    /// distinct counts
+    distinct_counts: Option<HashMap<i32, i64>>,
+    /// field id: 125
+    /// key field id: 126
+    /// value field id: 127
+    ///
+    /// Map from column id to lower bound in the column serialized as binary.
+    /// Each value must be less than or equal to all non-null, non-NaN values
+    /// in the column for the file.
+    ///
+    /// Reference:
+    ///
+    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
+    lower_bounds: Option<HashMap<i32, Literal>>,
+    /// field id: 128
+    /// key field id: 129
+    /// value field id: 130
+    ///
+    /// Map from column id to upper bound in the column serialized as binary.
+    /// Each value must be greater than or equal to all non-null, non-Nan
+    /// values in the column for the file.
+    ///
+    /// Reference:
+    ///
+    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
+    upper_bounds: Option<HashMap<i32, Literal>>,
+    /// field id: 131
+    ///
+    /// Implementation-specific key metadata for encryption
+    key_metadata: Option<Vec<u8>>,
+    /// field id: 132
+    /// element field id: 133
+    ///
+    /// Split offsets for the data file. For example, all row group offsets
+    /// in a Parquet file. Must be sorted ascending
+    split_offsets: Option<Vec<i64>>,
+    /// field id: 135
+    /// element field id: 136
+    ///
+    /// Field ids used to determine row equality in equality delete files.
+    /// Required when content is EqualityDeletes and should be null
+    /// otherwise. Fields with ids listed in this column must be present
+    /// in the delete file
+    equality_ids: Option<Vec<i32>>,
+    /// field id: 140
+    ///
+    /// ID representing sort order for this file.
+    ///
+    /// If sort order ID is missing or unknown, then the order is assumed to
+    /// be unsorted. Only data files and equality delete files should be
+    /// written with a non-null order id. Position deletes are required to be
+    /// sorted by file and position, not a table order, and should set sort
+    /// order id to null. Readers must ignore sort order id for position
+    /// delete files.
+    sort_order_id: Option<i32>,
+}
+
+/// Type of content stored by the data file: data, equality deletes, or
+/// position deletes (all v1 files are data files)
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+pub enum DataContentType {
+    /// value: 0
+    Data = 0,
+    /// value: 1
+    PostionDeletes = 1,
+    /// value: 2
+    EqualityDeletes = 2,
+}
+
+impl TryFrom<i32> for DataContentType {
+    type Error = Error;
+
+    fn try_from(v: i32) -> Result<DataContentType, Error> {
+        match v {
+            0 => Ok(DataContentType::Data),
+            1 => Ok(DataContentType::PostionDeletes),
+            2 => Ok(DataContentType::EqualityDeletes),
+            _ => Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("data content type {} is invalid", v),
+            )),
+        }
+    }
+}
+
+/// Format of this data.
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+pub enum DataFileFormat {
+    /// Avro file format: <https://avro.apache.org/>
+    Avro,
+    /// Orc file format: <https://orc.apache.org/>
+    Orc,
+    /// Parquet file format: <https://parquet.apache.org/>
+    Parquet,
+}
+
+impl FromStr for DataFileFormat {
+    type Err = Error;
+
+    fn from_str(s: &str) -> Result<Self, Error> {
+        match s.to_lowercase().as_str() {
+            "avro" => Ok(Self::Avro),
+            "orc" => Ok(Self::Orc),
+            "parquet" => Ok(Self::Parquet),
+            _ => Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Unsupported data file format: {}", s),
+            )),
+        }
+    }
+}
+
+impl ToString for DataFileFormat {
+    fn to_string(&self) -> String {
+        match self {
+            DataFileFormat::Avro => "avro",
+            DataFileFormat::Orc => "orc",
+            DataFileFormat::Parquet => "parquet",
+        }
+        .to_string()
+    }
+}
+
+mod _serde {
+    use std::collections::HashMap;
+
+    use serde_bytes::ByteBuf;
+    use serde_derive::{Deserialize, Serialize};
+    use serde_with::serde_as;
+    use serde_with::Bytes;
+
+    use crate::spec::Literal;
+    use crate::spec::RawLiteral;
+    use crate::spec::Schema;
+    use crate::spec::Type;
+    use crate::Error;
+    use crate::ErrorKind;
+
+    use super::ManifestEntry;
+
+    #[derive(Serialize, Deserialize)]
+    pub(super) struct ManifestEntryV2 {
+        status: i32,
+        snapshot_id: Option<i64>,
+        sequence_number: Option<i64>,
+        file_sequence_number: Option<i64>,
+        data_file: DataFile,
+    }
+
+    impl ManifestEntryV2 {
+        pub fn try_from(value: ManifestEntry, partition_type: &Type) -> Result<Self, Error> {
+            Ok(Self {
+                status: value.status as i32,
+                snapshot_id: value.snapshot_id,
+                sequence_number: value.sequence_number,
+                file_sequence_number: value.file_sequence_number,
+                data_file: DataFile::try_from(value.data_file, partition_type)?,
+            })
+        }
+
+        pub fn try_into(
+            self,
+            partition_type: &Type,
+            schema: &Schema,
+        ) -> Result<ManifestEntry, Error> {
+            Ok(ManifestEntry {
+                status: self.status.try_into()?,
+                snapshot_id: self.snapshot_id,
+                sequence_number: self.sequence_number,
+                file_sequence_number: self.file_sequence_number,
+                data_file: self.data_file.try_into(partition_type, schema)?,
+            })
+        }
+    }
+
+    #[derive(Serialize, Deserialize)]
+    pub(super) struct ManifestEntryV1 {
+        status: i32,
+        snapshot_id: i64,
+        data_file: DataFile,
+    }
+
+    impl ManifestEntryV1 {
+        pub fn try_from(value: ManifestEntry, partition_type: &Type) -> Result<Self, Error> {
+            Ok(Self {
+                status: value.status as i32,
+                snapshot_id: value.snapshot_id.unwrap_or_default(),
+                data_file: DataFile::try_from(value.data_file, &partition_type)?,
+            })
+        }
+
+        pub fn try_into(
+            self,
+            partition_type: &Type,
+            schema: &Schema,
+        ) -> Result<ManifestEntry, Error> {
+            Ok(ManifestEntry {
+                status: self.status.try_into()?,
+                snapshot_id: Some(self.snapshot_id),
+                sequence_number: None,
+                file_sequence_number: None,
+                data_file: self.data_file.try_into(&partition_type, schema)?,
+            })
+        }
+    }
+
+    #[serde_as]
+    #[derive(Serialize, Deserialize)]
+    pub(super) struct DataFile {
+        #[serde(default)]
+        content: i32,
+        file_path: String,
+        file_format: String,
+        partition: RawLiteral,
+        record_count: i64,
+        file_size_in_bytes: i64,
+        column_sizes: Option<Vec<I64Entry>>,
+        value_counts: Option<Vec<I64Entry>>,
+        null_value_counts: Option<Vec<I64Entry>>,
+        nan_value_counts: Option<Vec<I64Entry>>,
+        distinct_counts: Option<Vec<I64Entry>>,
+        lower_bounds: Option<Vec<BytesEntry>>,
+        upper_bounds: Option<Vec<BytesEntry>>,
+        #[serde_as(as = "Option<Bytes>")]
+        key_metadata: Option<Vec<u8>>,
+        split_offsets: Option<Vec<i64>>,
+        #[serde(default)]
+        equality_ids: Option<Vec<i32>>,
+        sort_order_id: Option<i32>,
+    }
+
+    impl DataFile {
+        pub fn try_from(value: super::DataFile, partition_type: &Type) -> Result<Self, Error> {
+            Ok(Self {
+                content: value.content as i32,
+                file_path: value.file_path,
+                file_format: value.file_format.to_string(),
+                partition: RawLiteral::try_from(Literal::Struct(value.partition), partition_type)?,
+                record_count: value.record_count,
+                file_size_in_bytes: value.file_size_in_bytes,
+                column_sizes: value.column_sizes.map(to_i64_entry),
+                value_counts: value.value_counts.map(to_i64_entry),
+                null_value_counts: value.null_value_counts.map(to_i64_entry),
+                nan_value_counts: value.nan_value_counts.map(to_i64_entry),
+                distinct_counts: value.distinct_counts.map(to_i64_entry),
+                lower_bounds: value.lower_bounds.map(to_bytes_entry),
+                upper_bounds: value.upper_bounds.map(to_bytes_entry),
+                key_metadata: value.key_metadata,
+                split_offsets: value.split_offsets,
+                equality_ids: value.equality_ids,
+                sort_order_id: value.sort_order_id,
+            })
+        }
+        pub fn try_into(
+            self,
+            partition_type: &Type,
+            schema: &Schema,
+        ) -> Result<super::DataFile, Error> {
+            let partition = self
+                .partition
+                .try_into(&partition_type)?
+                .and_then(|v| {
+                    if let Literal::Struct(v) = v {
+                        Some(Ok(v))
+                    } else {
+                        Some(Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            "partition value is not a struct",
+                        )))
+                    }
+                })
+                .transpose()?
+                .unwrap_or_default();
+            Ok(super::DataFile {
+                content: self.content.try_into()?,
+                file_path: self.file_path,
+                file_format: self.file_format.parse()?,
+                partition,
+                record_count: self.record_count,
+                file_size_in_bytes: self.file_size_in_bytes,
+                column_sizes: self.column_sizes.map(parse_i64_entry),
+                value_counts: self.value_counts.map(parse_i64_entry),
+                null_value_counts: self.null_value_counts.map(parse_i64_entry),
+                nan_value_counts: self.nan_value_counts.map(parse_i64_entry),
+                distinct_counts: self.distinct_counts.map(parse_i64_entry),
+                lower_bounds: self
+                    .lower_bounds
+                    .map(|v| parse_bytes_entry(v, schema))
+                    .transpose()?,
+                upper_bounds: self
+                    .upper_bounds
+                    .map(|v| parse_bytes_entry(v, schema))
+                    .transpose()?,
+                key_metadata: self.key_metadata,
+                split_offsets: self.split_offsets,
+                equality_ids: self.equality_ids,
+                sort_order_id: self.sort_order_id,
+            })
+        }
+    }
+
+    #[serde_as]
+    #[derive(Serialize, Deserialize)]
+    #[cfg_attr(test, derive(Debug, PartialEq, Eq))]
+    struct BytesEntry {
+        key: i32,
+        #[serde_as(as = "Bytes")]
+        value: Vec<u8>,

Review Comment:
   Would it make sense to use `serde_bytes::ByteBuf` here? It is supposed to 
have an optimized serialization/deserialization.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to