Fokko commented on code in PR #79:
URL: https://github.com/apache/iceberg-rust/pull/79#discussion_r1425045199


##########
crates/iceberg/src/spec/manifest.rs:
##########
@@ -0,0 +1,1864 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Manifest for Iceberg.
+use self::_const_schema::{manifest_schema_v1, manifest_schema_v2};
+
+use super::{
+    FieldSummary, FormatVersion, ManifestContentType, ManifestListEntry, 
PartitionSpec, Schema,
+    Struct,
+};
+use super::{Literal, UNASSIGNED_SEQUENCE_NUMBER};
+use crate::io::OutputFile;
+use crate::spec::PartitionField;
+use crate::{Error, ErrorKind};
+use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as 
AvroWriter};
+use futures::AsyncWriteExt;
+use serde_json::to_vec;
+use std::cmp::min;
+use std::collections::HashMap;
+use std::str::FromStr;
+
+/// A manifest contains metadata and a list of entries.
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub struct Manifest {
+    metadata: ManifestMetadata,
+    entries: Vec<ManifestEntry>,
+}
+
+impl Manifest {
+    /// Parse manifest from bytes of avro file.
+    pub fn parse_avro(bs: &[u8]) -> Result<Self, Error> {
+        let reader = AvroReader::new(bs)?;
+
+        // Parse manifest metadata
+        let meta = reader.user_metadata();
+        let metadata = ManifestMetadata::parse(meta)?;
+
+        // Parse manifest entries
+        let partition_type = 
metadata.partition_spec.partition_type(&metadata.schema)?;
+
+        let entries = match metadata.format_version {
+            FormatVersion::V1 => {
+                let schema = manifest_schema_v1(partition_type.clone())?;
+                let reader = AvroReader::with_schema(&schema, bs)?;
+                reader
+                    .into_iter()
+                    .map(|value| {
+                        from_value::<_serde::ManifestEntryV1>(&value?)?
+                            .try_into(&partition_type, &metadata.schema)
+                    })
+                    .collect::<Result<Vec<_>, Error>>()?
+            }
+            FormatVersion::V2 => {
+                let schema = manifest_schema_v2(partition_type.clone())?;
+                let reader = AvroReader::with_schema(&schema, bs)?;
+                reader
+                    .into_iter()
+                    .map(|value| {
+                        from_value::<_serde::ManifestEntryV2>(&value?)?
+                            .try_into(&partition_type, &metadata.schema)
+                    })
+                    .collect::<Result<Vec<_>, Error>>()?
+            }
+        };
+
+        Ok(Manifest { metadata, entries })
+    }
+}
+
+/// A manifest writer.
+pub struct ManifestWriter {
+    output: OutputFile,
+
+    snapshot_id: i64,
+
+    added_files: u32,
+    added_rows: u64,
+    existing_files: u32,
+    existing_rows: u64,
+    deleted_files: u32,
+    deleted_rows: u64,
+
+    min_seq_num: Option<i64>,
+
+    key_metadata: Vec<u8>,
+
+    field_summary: HashMap<i32, FieldSummary>,
+}
+
+impl ManifestWriter {
+    /// Create a new manifest writer.
+    pub fn new(output: OutputFile, snapshot_id: i64, key_metadata: Vec<u8>) -> 
Self {
+        Self {
+            output,
+            snapshot_id,
+            added_files: 0,
+            added_rows: 0,
+            existing_files: 0,
+            existing_rows: 0,
+            deleted_files: 0,
+            deleted_rows: 0,
+            min_seq_num: None,
+            key_metadata,
+            field_summary: HashMap::new(),
+        }
+    }
+
+    fn update_field_summary(&mut self, entry: &ManifestEntry) {
+        // Update field summary
+        for (&k, &v) in &entry.data_file.null_value_counts {
+            let field_summary = self.field_summary.entry(k).or_default();
+            if v > 0 {
+                field_summary.contains_null = true;
+            }
+        }
+
+        for (&k, &v) in &entry.data_file.nan_value_counts {
+            let field_summary = self.field_summary.entry(k).or_default();
+            if v > 0 {
+                field_summary.contains_nan = Some(true);
+            }
+            if v == 0 {
+                field_summary.contains_nan = Some(false);
+            }
+        }
+
+        for (&k, v) in &entry.data_file.lower_bounds {
+            let field_summary = self.field_summary.entry(k).or_default();
+            if let Some(cur) = &field_summary.lower_bound {
+                if v < cur {
+                    field_summary.lower_bound = Some(v.clone());
+                }
+            } else {
+                field_summary.lower_bound = Some(v.clone());
+            }
+        }
+
+        for (&k, v) in &entry.data_file.upper_bounds {
+            let field_summary = self.field_summary.entry(k).or_default();
+            if let Some(cur) = &field_summary.upper_bound {
+                if v > cur {
+                    field_summary.upper_bound = Some(v.clone());
+                }
+            } else {
+                field_summary.upper_bound = Some(v.clone());
+            }
+        }
+    }
+
+    fn get_field_summary_vec(&mut self, spec_fields: &[PartitionField]) -> 
Vec<FieldSummary> {
+        let mut partition_summary = 
Vec::with_capacity(self.field_summary.len());
+        for field in spec_fields {
+            let entry = self
+                .field_summary
+                .remove(&field.source_id)
+                .unwrap_or(FieldSummary::default());
+            partition_summary.push(entry);
+        }
+        partition_summary
+    }
+
+    /// Write a manifest entry.
+    pub async fn write(mut self, manifest: Manifest) -> 
Result<ManifestListEntry, Error> {
+        // Create the avro writer
+        let partition_type = manifest
+            .metadata
+            .partition_spec
+            .partition_type(&manifest.metadata.schema)?;
+        let table_schema = &manifest.metadata.schema;
+        let avro_schema = match manifest.metadata.format_version {
+            FormatVersion::V1 => manifest_schema_v1(partition_type.clone())?,
+            FormatVersion::V2 => manifest_schema_v2(partition_type.clone())?,
+        };
+        let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
+        avro_writer.add_user_metadata(
+            "schema".to_string(),
+            to_vec(table_schema).map_err(|err| {
+                Error::new(ErrorKind::DataInvalid, "Fail to serialize table 
schema")
+                    .with_source(err)
+            })?,
+        )?;
+        avro_writer.add_user_metadata(
+            "schema-id".to_string(),
+            table_schema.schema_id().to_string(),
+        )?;
+        avro_writer.add_user_metadata(
+            "partition-spec".to_string(),
+            to_vec(&manifest.metadata.partition_spec.fields).map_err(|err| {
+                Error::new(ErrorKind::DataInvalid, "Fail to serialize 
partition spec")
+                    .with_source(err)
+            })?,
+        )?;
+        avro_writer.add_user_metadata(
+            "partition-spec-id".to_string(),
+            manifest.metadata.partition_spec.spec_id.to_string(),
+        )?;
+        avro_writer.add_user_metadata(
+            "format-version".to_string(),
+            (manifest.metadata.format_version as u8).to_string(),
+        )?;
+        if manifest.metadata.format_version == FormatVersion::V2 {
+            avro_writer
+                .add_user_metadata("content".to_string(), 
manifest.metadata.content.to_string())?;
+        }
+
+        // Write manifest entries
+        for entry in manifest.entries {
+            if (entry.status == ManifestStatus::Deleted || entry.status == 
ManifestStatus::Existing)
+                && (entry.sequence_number.is_none() || 
entry.file_sequence_number.is_none())
+            {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Manifest entry with status Existing or Deleted should 
have sequence number",
+                ));
+            }
+
+            match entry.status {
+                ManifestStatus::Added => {
+                    self.added_files += 1;
+                    self.added_rows += entry.data_file.record_count;
+                }
+                ManifestStatus::Deleted => {
+                    self.deleted_files += 1;
+                    self.deleted_rows += entry.data_file.record_count;
+                }
+                ManifestStatus::Existing => {
+                    self.existing_files += 1;
+                    self.existing_rows += entry.data_file.record_count;
+                }
+            }
+
+            if entry.is_alive() {
+                if let Some(seq_num) = entry.sequence_number {
+                    self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, 
|v| min(v, seq_num)));
+                }
+            }
+
+            self.update_field_summary(&entry);
+
+            let value = match manifest.metadata.format_version {
+                FormatVersion::V1 => {
+                    to_value(_serde::ManifestEntryV1::try_from(entry, 
&partition_type)?)?
+                        .resolve(&avro_schema)?
+                }
+                FormatVersion::V2 => {
+                    to_value(_serde::ManifestEntryV2::try_from(entry, 
&partition_type)?)?
+                        .resolve(&avro_schema)?
+                }
+            };
+
+            avro_writer.append(value)?;
+        }
+
+        let length = avro_writer.flush()?;
+        let content = avro_writer.into_inner()?;
+        let mut writer = self.output.writer().await?;
+        writer.write_all(&content).await.map_err(|err| {
+            Error::new(ErrorKind::Unexpected, "Fail to write Manifest 
Entry").with_source(err)
+        })?;
+        writer.close().await.map_err(|err| {
+            Error::new(ErrorKind::Unexpected, "Fail to write Manifest 
Entry").with_source(err)
+        })?;
+
+        let partition_summary =
+            
self.get_field_summary_vec(&manifest.metadata.partition_spec.fields);
+
+        Ok(ManifestListEntry {
+            manifest_path: self.output.location().to_string(),
+            manifest_length: length as i64,
+            partition_spec_id: manifest.metadata.partition_spec.spec_id,
+            content: manifest.metadata.content,
+            // sequence_number and min_sequence_number with 
UNASSIGNED_SEQUENCE_NUMBER will be replace with
+            // real sequence number in `ManifestListWriter`.
+            sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
+            min_sequence_number: 
self.min_seq_num.unwrap_or(UNASSIGNED_SEQUENCE_NUMBER),
+            added_snapshot_id: self.snapshot_id,
+            added_data_files_count: Some(self.added_files),
+            existing_data_files_count: Some(self.existing_files),
+            deleted_data_files_count: Some(self.deleted_files),
+            added_rows_count: Some(self.added_rows),
+            existing_rows_count: Some(self.existing_rows),
+            deleted_rows_count: Some(self.deleted_rows),
+            partitions: partition_summary,
+            key_metadata: self.key_metadata,
+        })
+    }
+}
+
+/// This is a helper module that defines the schema field of the manifest list 
entry.
+mod _const_schema {
+    use std::sync::Arc;
+
+    use apache_avro::Schema as AvroSchema;
+    use once_cell::sync::Lazy;
+
+    use crate::{
+        avro::schema_to_avro_schema,
+        spec::{
+            ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, 
Schema, StructType, Type,
+        },
+        Error,
+    };
+
+    static STATUS: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                0,
+                "status",
+                Type::Primitive(PrimitiveType::Int),
+            ))
+        })
+    };
+
+    static SNAPSHOT_ID_V1: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                1,
+                "snapshot_id",
+                Type::Primitive(PrimitiveType::Long),
+            ))
+        })
+    };
+
+    static SNAPSHOT_ID_V2: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                1,
+                "snapshot_id",
+                Type::Primitive(PrimitiveType::Long),
+            ))
+        })
+    };
+
+    static SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                3,
+                "sequence_number",
+                Type::Primitive(PrimitiveType::Long),
+            ))
+        })
+    };
+
+    static FILE_SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                4,
+                "file_sequence_number",
+                Type::Primitive(PrimitiveType::Long),
+            ))
+        })
+    };
+
+    static CONTENT: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                134,
+                "content",
+                Type::Primitive(PrimitiveType::Int),
+            ))
+        })
+    };
+
+    static FILE_PATH: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                100,
+                "file_path",
+                Type::Primitive(PrimitiveType::String),
+            ))
+        })
+    };
+
+    static FILE_FORMAT: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                101,
+                "file_format",
+                Type::Primitive(PrimitiveType::String),
+            ))
+        })
+    };
+
+    static RECORD_COUNT: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                103,
+                "record_count",
+                Type::Primitive(PrimitiveType::Long),
+            ))
+        })
+    };
+
+    static FILE_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                104,
+                "file_size_in_bytes",
+                Type::Primitive(PrimitiveType::Long),
+            ))
+        })
+    };
+
+    // Deprecated. Always write a default in v1. Do not write in v2.
+    static BLOCK_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                105,
+                "block_size_in_bytes",
+                Type::Primitive(PrimitiveType::Long),
+            ))
+        })
+    };
+
+    static COLUMN_SIZES: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                108,
+                "column_sizes",
+                Type::Map(MapType {
+                    key_field: Arc::new(NestedField::required(
+                        117,
+                        "key",
+                        Type::Primitive(PrimitiveType::Int),
+                    )),
+                    value_field: Arc::new(NestedField::required(
+                        118,
+                        "value",
+                        Type::Primitive(PrimitiveType::Long),
+                    )),
+                }),
+            ))
+        })
+    };
+
+    static VALUE_COUNTS: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                109,
+                "value_counts",
+                Type::Map(MapType {
+                    key_field: Arc::new(NestedField::required(
+                        119,
+                        "key",
+                        Type::Primitive(PrimitiveType::Int),
+                    )),
+                    value_field: Arc::new(NestedField::required(
+                        120,
+                        "value",
+                        Type::Primitive(PrimitiveType::Long),
+                    )),
+                }),
+            ))
+        })
+    };
+
+    static NULL_VALUE_COUNTS: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                110,
+                "null_value_counts",
+                Type::Map(MapType {
+                    key_field: Arc::new(NestedField::required(
+                        121,
+                        "key",
+                        Type::Primitive(PrimitiveType::Int),
+                    )),
+                    value_field: Arc::new(NestedField::required(
+                        122,
+                        "value",
+                        Type::Primitive(PrimitiveType::Long),
+                    )),
+                }),
+            ))
+        })
+    };
+
+    static NAN_VALUE_COUNTS: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                137,
+                "nan_value_counts",
+                Type::Map(MapType {
+                    key_field: Arc::new(NestedField::required(
+                        138,
+                        "key",
+                        Type::Primitive(PrimitiveType::Int),
+                    )),
+                    value_field: Arc::new(NestedField::required(
+                        139,
+                        "value",
+                        Type::Primitive(PrimitiveType::Long),
+                    )),
+                }),
+            ))
+        })
+    };
+
+    static DISTINCT_COUNTS: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                111,
+                "distinct_counts",
+                Type::Map(MapType {
+                    key_field: Arc::new(NestedField::required(
+                        123,
+                        "key",
+                        Type::Primitive(PrimitiveType::Int),
+                    )),
+                    value_field: Arc::new(NestedField::required(
+                        124,
+                        "value",
+                        Type::Primitive(PrimitiveType::Long),
+                    )),
+                }),
+            ))
+        })
+    };

Review Comment:
   Neither Java nor PyIceberg writes `distinct_counts`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to