liurenjie1024 commented on code in PR #79: URL: https://github.com/apache/iceberg-rust/pull/79#discussion_r1424901931
########## crates/iceberg/src/spec/partition.rs: ########## @@ -69,6 +71,30 @@ impl PartitionSpec { .iter() .all(|f| matches!(f.transform, Transform::Void)) } + + /// Returns the partition type of this partition spec. + pub fn partition_type(&self, schema: &Schema) -> Result<StructType, Error> { Review Comment: Could we add some unit tests for this? What should the return type be if there are no partition fields? ########## crates/iceberg/src/spec/manifest_list.rs: ########## @@ -156,12 +171,36 @@ impl ManifestListWriter { match self.format_version { FormatVersion::V1 => { for manifest_entry in manifest_entries { - let manifest_entry: ManifestListEntryV1 = manifest_entry.into(); + let manifest_entry: ManifestListEntryV1 = manifest_entry.try_into()?; self.avro_writer.append_ser(manifest_entry)?; } } FormatVersion::V2 => { - for manifest_entry in manifest_entries { + for mut manifest_entry in manifest_entries { + if manifest_entry.sequence_number == UNASSIGNED_SEQUENCE_NUMBER { + if manifest_entry.added_snapshot_id != self.snapshot_id { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Found unassigned sequence number for a manifest from snapshot {}.", + manifest_entry.added_snapshot_id + ), + )); + } + manifest_entry.sequence_number = self.sequence_number; Review Comment: This assignment seems meaningless to me: this manifest entry is never returned to the user. 
########## crates/iceberg/src/spec/manifest_list.rs: ########## @@ -156,12 +171,36 @@ impl ManifestListWriter { match self.format_version { FormatVersion::V1 => { for manifest_entry in manifest_entries { - let manifest_entry: ManifestListEntryV1 = manifest_entry.into(); + let manifest_entry: ManifestListEntryV1 = manifest_entry.try_into()?; self.avro_writer.append_ser(manifest_entry)?; } } FormatVersion::V2 => { - for manifest_entry in manifest_entries { + for mut manifest_entry in manifest_entries { + if manifest_entry.sequence_number == UNASSIGNED_SEQUENCE_NUMBER { Review Comment: It seems weird to me to check this in a writer. I think this is supposed to be done in the transaction API. ########## crates/iceberg/src/spec/manifest.rs: ########## @@ -0,0 +1,1847 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Manifest for Iceberg. 
+use self::_const_schema::{manifest_schema_v1, manifest_schema_v2}; + +use super::{ + FieldSummary, FormatVersion, ManifestContentType, ManifestListEntry, PartitionSpec, Schema, + Struct, +}; +use super::{Literal, UNASSIGNED_SEQUENCE_NUMBER}; +use crate::io::OutputFile; +use crate::spec::PartitionField; +use crate::{Error, ErrorKind}; +use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as AvroWriter}; +use futures::AsyncWriteExt; +use serde_json::to_vec; +use std::cmp::min; +use std::collections::HashMap; +use std::str::FromStr; + +/// A manifest contains metadata and a list of entries. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct Manifest { + metadata: ManifestMetadata, + entries: Vec<ManifestEntry>, +} + +impl Manifest { + /// Parse manifest from bytes of avro file. + pub fn parse_avro(bs: &[u8]) -> Result<Self, Error> { + let reader = AvroReader::new(bs)?; + + // Parse manifest metadata + let meta = reader.user_metadata(); + let metadata = ManifestMetadata::parse(meta)?; + + // Parse manifest entries + let partition_type = metadata.partition_spec.partition_type(&metadata.schema)?; + + let entries = match metadata.format_version { + FormatVersion::V1 => { + let schema = manifest_schema_v1(partition_type.clone())?; + let reader = AvroReader::with_schema(&schema, bs)?; + reader + .into_iter() + .map(|value| { + from_value::<_serde::ManifestEntryV1>(&value?)? + .try_into(&partition_type, &metadata.schema) + }) + .collect::<Result<Vec<_>, Error>>()? + } + FormatVersion::V2 => { + let schema = manifest_schema_v2(partition_type.clone())?; + let reader = AvroReader::with_schema(&schema, bs)?; + reader + .into_iter() + .map(|value| { + from_value::<_serde::ManifestEntryV2>(&value?)? + .try_into(&partition_type, &metadata.schema) + }) + .collect::<Result<Vec<_>, Error>>()? + } + }; + + Ok(Manifest { metadata, entries }) + } +} + +/// A manifest writer. 
+pub struct ManifestWriter { + output: OutputFile, + + snapshot_id: i64, + + added_files: u32, + added_rows: u64, + existing_files: u32, + existing_rows: u64, + deleted_files: u32, + deleted_rows: u64, + + min_seq_num: Option<i64>, + + key_metadata: Option<Vec<u8>>, + + field_summary: HashMap<i32, FieldSummary>, +} + +impl ManifestWriter { + /// Create a new manifest writer. + pub fn new(output: OutputFile, snapshot_id: i64, key_metadata: Option<Vec<u8>>) -> Self { + Self { + output, + snapshot_id, + added_files: 0, + added_rows: 0, + existing_files: 0, + existing_rows: 0, + deleted_files: 0, + deleted_rows: 0, + min_seq_num: None, + key_metadata, + field_summary: HashMap::new(), + } + } + + fn update_field_summary(&mut self, entry: &ManifestEntry) { + // Update field summary + if let Some(null) = &entry.data_file.null_value_counts { + for (&k, &v) in null { + let field_summary = self.field_summary.entry(k).or_default(); + if v > 0 { + field_summary.contains_null = true; + } + } + } + if let Some(nan) = &entry.data_file.nan_value_counts { + for (&k, &v) in nan { + let field_summary = self.field_summary.entry(k).or_default(); + if v > 0 { + field_summary.contains_nan = Some(true); + } + if v == 0 { + field_summary.contains_nan = Some(false); + } + } + } + if let Some(lower_bound) = &entry.data_file.lower_bounds { + for (&k, v) in lower_bound { + let field_summary = self.field_summary.entry(k).or_default(); + if let Some(cur) = &field_summary.lower_bound { + if v < cur { + field_summary.lower_bound = Some(v.clone()); + } + } else { + field_summary.lower_bound = Some(v.clone()); + } + } + } + if let Some(upper_bound) = &entry.data_file.upper_bounds { + for (&k, v) in upper_bound { + let field_summary = self.field_summary.entry(k).or_default(); + if let Some(cur) = &field_summary.upper_bound { + if v > cur { + field_summary.upper_bound = Some(v.clone()); + } + } else { + field_summary.upper_bound = Some(v.clone()); + } + } + } + } + + fn get_field_summary_vec(&mut 
self, spec_fields: &[PartitionField]) -> Vec<FieldSummary> { + let mut partition_summary = Vec::with_capacity(self.field_summary.len()); + for field in spec_fields { + let entry = self + .field_summary + .remove(&field.source_id) + .unwrap_or(FieldSummary::default()); + partition_summary.push(entry); + } + partition_summary + } + + /// Write a manifest entry. + pub async fn write(mut self, manifest: Manifest) -> Result<ManifestListEntry, Error> { + // Create the avro writer + let partition_type = manifest + .metadata + .partition_spec + .partition_type(&manifest.metadata.schema)?; + let table_schema = &manifest.metadata.schema; + let avro_schema = match manifest.metadata.format_version { + FormatVersion::V1 => manifest_schema_v1(partition_type.clone())?, + FormatVersion::V2 => manifest_schema_v2(partition_type.clone())?, + }; + let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new()); + avro_writer.add_user_metadata( + "schema".to_string(), + to_vec(table_schema).map_err(|err| { + Error::new(ErrorKind::DataInvalid, "Fail to serialize table schema") + .with_source(err) + })?, + )?; + avro_writer.add_user_metadata( + "schema-id".to_string(), + table_schema.schema_id().to_string(), + )?; + avro_writer.add_user_metadata( + "partition-spec".to_string(), + to_vec(&manifest.metadata.partition_spec.fields).map_err(|err| { + Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec") + .with_source(err) + })?, + )?; + avro_writer.add_user_metadata( + "partition-spec-id".to_string(), + manifest.metadata.partition_spec.spec_id.to_string(), + )?; + avro_writer.add_user_metadata( + "format-version".to_string(), + (manifest.metadata.format_version as u8).to_string(), + )?; + if manifest.metadata.format_version == FormatVersion::V2 { + avro_writer + .add_user_metadata("content".to_string(), manifest.metadata.content.to_string())?; + } + + // Write manifest entries + for entry in manifest.entries { + if (entry.status == ManifestStatus::Deleted || entry.status == 
ManifestStatus::Existing) + && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none()) + { + return Err(Error::new( + ErrorKind::DataInvalid, + "Manifest entry with status Existing or Deleted should have sequence number", + )); + } + + match entry.status { + ManifestStatus::Added => { + self.added_files += 1; + self.added_rows += entry.data_file.record_count; + } + ManifestStatus::Deleted => { + self.deleted_files += 1; + self.deleted_rows += entry.data_file.record_count; + } + ManifestStatus::Existing => { + self.existing_files += 1; + self.existing_rows += entry.data_file.record_count; + } + } + + if entry.is_alive() { + if let Some(seq_num) = entry.sequence_number { + self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, |v| min(v, seq_num))); + } + } + + self.update_field_summary(&entry); + + let value = match manifest.metadata.format_version { + FormatVersion::V1 => { + to_value(_serde::ManifestEntryV1::try_from(entry, &partition_type)?)? + .resolve(&avro_schema)? + } + FormatVersion::V2 => { + to_value(_serde::ManifestEntryV2::try_from(entry, &partition_type)?)? + .resolve(&avro_schema)? 
+ } + }; + + avro_writer.append(value)?; + } + + let length = avro_writer.flush()?; + let content = avro_writer.into_inner()?; + let mut writer = self.output.writer().await?; + writer.write_all(&content).await.map_err(|err| { + Error::new(ErrorKind::Unexpected, "Fail to write Manifest Entry").with_source(err) + })?; + writer.close().await.map_err(|err| { + Error::new(ErrorKind::Unexpected, "Fail to write Manifest Entry").with_source(err) + })?; + + let partition_summary = + self.get_field_summary_vec(&manifest.metadata.partition_spec.fields); + + Ok(ManifestListEntry { + manifest_path: self.output.location().to_string(), + manifest_length: length as i64, + partition_spec_id: manifest.metadata.partition_spec.spec_id, + content: manifest.metadata.content, + // sequence_number and min_sequence_number with UNASSIGNED_SEQUENCE_NUMBER will be replace with + // real sequence number in `ManifestListWriter`. + sequence_number: UNASSIGNED_SEQUENCE_NUMBER, + min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQUENCE_NUMBER), + added_snapshot_id: self.snapshot_id, + added_data_files_count: Some(self.added_files), + existing_data_files_count: Some(self.existing_files), + deleted_data_files_count: Some(self.deleted_files), + added_rows_count: Some(self.added_rows), + existing_rows_count: Some(self.existing_rows), + deleted_rows_count: Some(self.deleted_rows), + partitions: partition_summary, + key_metadata: self.key_metadata.unwrap_or_default(), + }) + } +} + +/// This is a helper module that defines the schema field of the manifest list entry. 
+mod _const_schema { + use std::sync::Arc; + + use apache_avro::Schema as AvroSchema; + use once_cell::sync::Lazy; + + use crate::{ + avro::schema_to_avro_schema, + spec::{ + ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type, + }, + Error, + }; + + static STATUS: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::required( + 0, + "status", + Type::Primitive(PrimitiveType::Int), + )) + }) + }; + + static SNAPSHOT_ID_V1: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::required( + 1, + "snapshot_id", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + + static SNAPSHOT_ID_V2: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 1, + "snapshot_id", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + + static SEQUENCE_NUMBER: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 3, + "sequence_number", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + + static FILE_SEQUENCE_NUMBER: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 4, + "file_sequence_number", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + + static CONTENT: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::required( + 134, + "content", + Type::Primitive(PrimitiveType::Int), + )) + }) + }; + + static FILE_PATH: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::required( + 100, + "file_path", + Type::Primitive(PrimitiveType::String), + )) + }) + }; + + static FILE_FORMAT: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::required( + 101, + "file_format", + Type::Primitive(PrimitiveType::String), + )) + }) + }; + + static RECORD_COUNT: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::required( + 103, + "record_count", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + + static FILE_SIZE_IN_BYTES: Lazy<NestedFieldRef> = { + Lazy::new(|| { + 
Arc::new(NestedField::required( + 104, + "file_size_in_bytes", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + + // Deprecated. Always write a default in v1. Do not write in v2. + static BLOCK_SIZE_IN_BYTES: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::required( + 105, + "block_size_in_bytes", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + + static COLUMN_SIZES: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 108, + "column_sizes", + Type::Map(MapType { + key_field: Arc::new(NestedField::required( + 117, + "key", + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::required( + 118, + "value", + Type::Primitive(PrimitiveType::Long), + )), + }), + )) + }) + }; + + static VALUE_COUNTS: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 109, + "value_counts", + Type::Map(MapType { + key_field: Arc::new(NestedField::required( + 119, + "key", + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::required( + 120, + "value", + Type::Primitive(PrimitiveType::Long), + )), + }), + )) + }) + }; + + static NULL_VALUE_COUNTS: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 110, + "null_value_counts", + Type::Map(MapType { + key_field: Arc::new(NestedField::required( + 121, + "key", + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::required( + 122, + "value", + Type::Primitive(PrimitiveType::Long), + )), + }), + )) + }) + }; + + static NAN_VALUE_COUNTS: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 137, + "nan_value_counts", + Type::Map(MapType { + key_field: Arc::new(NestedField::required( + 138, + "key", + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::required( + 139, + "value", + Type::Primitive(PrimitiveType::Long), + )), + }), + )) + }) + }; + + static DISTINCT_COUNTS: Lazy<NestedFieldRef> = { + 
Lazy::new(|| { + Arc::new(NestedField::optional( + 111, + "distinct_counts", + Type::Map(MapType { + key_field: Arc::new(NestedField::required( + 123, + "key", + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::required( + 124, + "value", + Type::Primitive(PrimitiveType::Long), + )), + }), + )) + }) + }; + + static LOWER_BOUNDS: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 125, + "lower_bounds", + Type::Map(MapType { + key_field: Arc::new(NestedField::required( + 126, + "key", + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::required( + 127, + "value", + Type::Primitive(PrimitiveType::Binary), + )), + }), + )) + }) + }; + + static UPPER_BOUNDS: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 128, + "upper_bounds", + Type::Map(MapType { + key_field: Arc::new(NestedField::required( + 129, + "key", + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::required( + 130, + "value", + Type::Primitive(PrimitiveType::Binary), + )), + }), + )) + }) + }; + + static KEY_METADATA: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 131, + "key_metadata", + Type::Primitive(PrimitiveType::Binary), + )) + }) + }; + + static SPLIT_OFFSETS: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 132, + "split_offsets", + Type::List(ListType { + element_field: Arc::new(NestedField::required( + 133, + "element", + Type::Primitive(PrimitiveType::Long), + )), + }), + )) + }) + }; + + static EQUALITY_IDS: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 135, + "equality_ids", + Type::List(ListType { + element_field: Arc::new(NestedField::required( + 136, + "element", + Type::Primitive(PrimitiveType::Int), + )), + }), + )) + }) + }; + + static SORT_ORDER_ID: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 140, + "sort_order_id", + 
Type::Primitive(PrimitiveType::Int), + )) + }) + }; + + pub fn manifest_schema_v2(partition_type: StructType) -> Result<AvroSchema, Error> { + let fields = vec![ + STATUS.clone(), + SNAPSHOT_ID_V2.clone(), + SEQUENCE_NUMBER.clone(), + FILE_SEQUENCE_NUMBER.clone(), + Arc::new(NestedField::required( + 2, + "data_file", + Type::Struct(StructType::new(vec![ + CONTENT.clone(), + FILE_PATH.clone(), + FILE_FORMAT.clone(), + Arc::new(NestedField::required( + 102, + "partition", + Type::Struct(partition_type), + )), + RECORD_COUNT.clone(), + FILE_SIZE_IN_BYTES.clone(), + COLUMN_SIZES.clone(), + VALUE_COUNTS.clone(), + NULL_VALUE_COUNTS.clone(), + NAN_VALUE_COUNTS.clone(), + DISTINCT_COUNTS.clone(), + LOWER_BOUNDS.clone(), + UPPER_BOUNDS.clone(), + KEY_METADATA.clone(), + SPLIT_OFFSETS.clone(), + EQUALITY_IDS.clone(), + SORT_ORDER_ID.clone(), + ])), + )), + ]; + let schema = Schema::builder().with_fields(fields).build().unwrap(); + schema_to_avro_schema("manifest", &schema) + } + + pub fn manifest_schema_v1(partition_type: StructType) -> Result<AvroSchema, Error> { + let fields = vec![ + STATUS.clone(), + SNAPSHOT_ID_V1.clone(), + Arc::new(NestedField::required( + 2, + "data_file", + Type::Struct(StructType::new(vec![ + FILE_PATH.clone(), + FILE_FORMAT.clone(), + Arc::new(NestedField::required( + 102, + "partition", + Type::Struct(partition_type), + )), + RECORD_COUNT.clone(), + FILE_SIZE_IN_BYTES.clone(), + BLOCK_SIZE_IN_BYTES.clone(), + COLUMN_SIZES.clone(), + VALUE_COUNTS.clone(), + NULL_VALUE_COUNTS.clone(), + NAN_VALUE_COUNTS.clone(), + DISTINCT_COUNTS.clone(), + LOWER_BOUNDS.clone(), + UPPER_BOUNDS.clone(), + KEY_METADATA.clone(), + SPLIT_OFFSETS.clone(), + SORT_ORDER_ID.clone(), + ])), + )), + ]; + let schema = Schema::builder().with_fields(fields).build().unwrap(); + schema_to_avro_schema("manifest", &schema) + } +} + +/// Meta data of a manifest that is stored in the key-value metadata of the Avro file +#[derive(Debug, PartialEq, Clone, Eq)] +pub struct 
ManifestMetadata { + /// The table schema at the time the manifest + /// was written + schema: Schema, + /// ID of the schema used to write the manifest as a string + schema_id: i32, + /// The partition spec used to write the manifest + partition_spec: PartitionSpec, + /// Table format version number of the manifest as a string + format_version: FormatVersion, + /// Type of content files tracked by the manifest: “data” or “deletes” + content: ManifestContentType, +} + +impl ManifestMetadata { + /// Parse from metadata in avro file. + pub fn parse(meta: &HashMap<String, Vec<u8>>) -> Result<Self, Error> { + let schema = { + let bs = meta.get("schema").ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "schema is required in manifest metadata but not found", + ) + })?; + serde_json::from_slice::<Schema>(bs).map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + "Fail to parse schema in manifest metadata", + ) + .with_source(err) + })? + }; + let schema_id: i32 = meta + .get("schema-id") + .map(|bs| { + String::from_utf8_lossy(bs).parse().map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + "Fail to parse schema id in manifest metadata", + ) + .with_source(err) + }) + }) + .transpose()? + .unwrap_or(0); + let partition_spec = { + let fields = { + let bs = meta.get("partition-spec").ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "partition-spec is required in manifest metadata but not found", + ) + })?; + serde_json::from_slice::<Vec<PartitionField>>(bs).map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + "Fail to parse partition spec in manifest metadata", + ) + .with_source(err) + })? + }; + let spec_id = meta + .get("partition-spec-id") + .map(|bs| { + String::from_utf8_lossy(bs).parse().map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + "Fail to parse partition spec id in manifest metadata", + ) + .with_source(err) + }) + }) + .transpose()? 
+ .unwrap_or(0); + PartitionSpec { spec_id, fields } + }; + let format_version = if let Some(bs) = meta.get("format-version") { + serde_json::from_slice::<FormatVersion>(bs).map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + "Fail to parse format version in manifest metadata", + ) + .with_source(err) + })? + } else { + FormatVersion::V1 + }; + let content = if let Some(v) = meta.get("content") { + let v = String::from_utf8_lossy(v); + v.parse()? + } else { + ManifestContentType::Data + }; + Ok(ManifestMetadata { + schema, + schema_id, + partition_spec, + format_version, + content, + }) + } +} + +/// A manifest is an immutable Avro file that lists data files or delete +/// files, along with each file’s partition data tuple, metrics, and tracking +/// information. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct ManifestEntry { + /// field: 0 + /// + /// Used to track additions and deletions. + status: ManifestStatus, + /// field id: 1 + /// + /// Snapshot id where the file was added, or deleted if status is 2. + /// Inherited when null. + snapshot_id: Option<i64>, + /// field id: 3 + /// + /// Data sequence number of the file. + /// Inherited when null and status is 1 (added). + sequence_number: Option<i64>, + /// field id: 4 + /// + /// File sequence number indicating when the file was added. + /// Inherited when null and status is 1 (added). + file_sequence_number: Option<i64>, + /// field id: 2 + /// + /// File path, partition tuple, metrics, … + data_file: DataFile, +} + +impl ManifestEntry { + /// Check if this manifest entry is deleted. + pub fn is_alive(&self) -> bool { + matches!( + self.status, + ManifestStatus::Added | ManifestStatus::Existing + ) + } +} + +/// Used to track additions and deletions in ManifestEntry. +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum ManifestStatus { + /// Value: 0 + Existing = 0, + /// Value: 1 + Added = 1, + /// Value: 2 + /// + /// Deletes are informational only and not used in scans. 
+ Deleted = 2, +} + +impl TryFrom<i32> for ManifestStatus { + type Error = Error; + + fn try_from(v: i32) -> Result<ManifestStatus, Error> { + match v { + 0 => Ok(ManifestStatus::Existing), + 1 => Ok(ManifestStatus::Added), + 2 => Ok(ManifestStatus::Deleted), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!("manifest status {} is invalid", v), + )), + } + } +} + +/// Data file carries data file path, partition tuple, metrics, … +#[derive(Debug, PartialEq, Clone, Eq)] +pub struct DataFile { + /// field id: 134 + /// + /// Type of content stored by the data file: data, equality deletes, + /// or position deletes (all v1 files are data files) + content: DataContentType, + /// field id: 100 + /// + /// Full URI for the file with FS scheme + file_path: String, + /// field id: 101 + /// + /// String file format name, avro, orc or parquet + file_format: DataFileFormat, + /// field id: 102 + /// + /// Partition data tuple, schema based on the partition spec output using + /// partition field ids for the struct field ids + partition: Struct, + /// field id: 103 + /// + /// Number of records in this file + record_count: u64, + /// field id: 104 + /// + /// Total file size in bytes + file_size_in_bytes: u64, + /// field id: 108 + /// key field id: 117 + /// value field id: 118 + /// + /// Map from column id to the total size on disk of all regions that + /// store the column. Does not include bytes necessary to read other + /// columns, like footers. Leave null for row-oriented formats (Avro) + column_sizes: Option<HashMap<i32, u64>>, Review Comment: Use `HashMap` directly here? Same as below. ########## crates/iceberg/src/spec/manifest.rs: ########## @@ -0,0 +1,1847 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Manifest for Iceberg. +use self::_const_schema::{manifest_schema_v1, manifest_schema_v2}; + +use super::{ + FieldSummary, FormatVersion, ManifestContentType, ManifestListEntry, PartitionSpec, Schema, + Struct, +}; +use super::{Literal, UNASSIGNED_SEQUENCE_NUMBER}; +use crate::io::OutputFile; +use crate::spec::PartitionField; +use crate::{Error, ErrorKind}; +use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as AvroWriter}; +use futures::AsyncWriteExt; +use serde_json::to_vec; +use std::cmp::min; +use std::collections::HashMap; +use std::str::FromStr; + +/// A manifest contains metadata and a list of entries. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct Manifest { + metadata: ManifestMetadata, + entries: Vec<ManifestEntry>, +} + +impl Manifest { + /// Parse manifest from bytes of avro file. 
+ pub fn parse_avro(bs: &[u8]) -> Result<Self, Error> { + let reader = AvroReader::new(bs)?; + + // Parse manifest metadata + let meta = reader.user_metadata(); + let metadata = ManifestMetadata::parse(meta)?; + + // Parse manifest entries + let partition_type = metadata.partition_spec.partition_type(&metadata.schema)?; + + let entries = match metadata.format_version { + FormatVersion::V1 => { + let schema = manifest_schema_v1(partition_type.clone())?; + let reader = AvroReader::with_schema(&schema, bs)?; + reader + .into_iter() + .map(|value| { + from_value::<_serde::ManifestEntryV1>(&value?)? + .try_into(&partition_type, &metadata.schema) + }) + .collect::<Result<Vec<_>, Error>>()? + } + FormatVersion::V2 => { + let schema = manifest_schema_v2(partition_type.clone())?; + let reader = AvroReader::with_schema(&schema, bs)?; + reader + .into_iter() + .map(|value| { + from_value::<_serde::ManifestEntryV2>(&value?)? + .try_into(&partition_type, &metadata.schema) + }) + .collect::<Result<Vec<_>, Error>>()? + } + }; + + Ok(Manifest { metadata, entries }) + } +} + +/// A manifest writer. +pub struct ManifestWriter { + output: OutputFile, + + snapshot_id: i64, + + added_files: u32, + added_rows: u64, + existing_files: u32, + existing_rows: u64, + deleted_files: u32, + deleted_rows: u64, + + min_seq_num: Option<i64>, + + key_metadata: Option<Vec<u8>>, Review Comment: Use `Vec` directly? ########## crates/iceberg/src/spec/manifest.rs: ########## @@ -0,0 +1,1847 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Manifest for Iceberg. +use self::_const_schema::{manifest_schema_v1, manifest_schema_v2}; + +use super::{ + FieldSummary, FormatVersion, ManifestContentType, ManifestListEntry, PartitionSpec, Schema, + Struct, +}; +use super::{Literal, UNASSIGNED_SEQUENCE_NUMBER}; +use crate::io::OutputFile; +use crate::spec::PartitionField; +use crate::{Error, ErrorKind}; +use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as AvroWriter}; +use futures::AsyncWriteExt; +use serde_json::to_vec; +use std::cmp::min; +use std::collections::HashMap; +use std::str::FromStr; + +/// A manifest contains metadata and a list of entries. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct Manifest { + metadata: ManifestMetadata, + entries: Vec<ManifestEntry>, +} + +impl Manifest { + /// Parse manifest from bytes of avro file. + pub fn parse_avro(bs: &[u8]) -> Result<Self, Error> { + let reader = AvroReader::new(bs)?; + + // Parse manifest metadata + let meta = reader.user_metadata(); + let metadata = ManifestMetadata::parse(meta)?; + + // Parse manifest entries + let partition_type = metadata.partition_spec.partition_type(&metadata.schema)?; + + let entries = match metadata.format_version { + FormatVersion::V1 => { + let schema = manifest_schema_v1(partition_type.clone())?; + let reader = AvroReader::with_schema(&schema, bs)?; + reader + .into_iter() + .map(|value| { + from_value::<_serde::ManifestEntryV1>(&value?)? + .try_into(&partition_type, &metadata.schema) + }) + .collect::<Result<Vec<_>, Error>>()? 
+ } + FormatVersion::V2 => { + let schema = manifest_schema_v2(partition_type.clone())?; + let reader = AvroReader::with_schema(&schema, bs)?; + reader + .into_iter() + .map(|value| { + from_value::<_serde::ManifestEntryV2>(&value?)? + .try_into(&partition_type, &metadata.schema) + }) + .collect::<Result<Vec<_>, Error>>()? + } + }; + + Ok(Manifest { metadata, entries }) + } +} + +/// A manifest writer. +pub struct ManifestWriter { + output: OutputFile, + + snapshot_id: i64, + + added_files: u32, + added_rows: u64, + existing_files: u32, + existing_rows: u64, + deleted_files: u32, + deleted_rows: u64, + + min_seq_num: Option<i64>, + + key_metadata: Option<Vec<u8>>, + + field_summary: HashMap<i32, FieldSummary>, +} + +impl ManifestWriter { + /// Create a new manifest writer. + pub fn new(output: OutputFile, snapshot_id: i64, key_metadata: Option<Vec<u8>>) -> Self { + Self { + output, + snapshot_id, + added_files: 0, + added_rows: 0, + existing_files: 0, + existing_rows: 0, + deleted_files: 0, + deleted_rows: 0, + min_seq_num: None, + key_metadata, + field_summary: HashMap::new(), + } + } + + fn update_field_summary(&mut self, entry: &ManifestEntry) { + // Update field summary + if let Some(null) = &entry.data_file.null_value_counts { + for (&k, &v) in null { + let field_summary = self.field_summary.entry(k).or_default(); + if v > 0 { + field_summary.contains_null = true; + } + } + } + if let Some(nan) = &entry.data_file.nan_value_counts { + for (&k, &v) in nan { + let field_summary = self.field_summary.entry(k).or_default(); + if v > 0 { + field_summary.contains_nan = Some(true); + } + if v == 0 { + field_summary.contains_nan = Some(false); + } + } + } + if let Some(lower_bound) = &entry.data_file.lower_bounds { + for (&k, v) in lower_bound { + let field_summary = self.field_summary.entry(k).or_default(); + if let Some(cur) = &field_summary.lower_bound { + if v < cur { + field_summary.lower_bound = Some(v.clone()); + } + } else { + field_summary.lower_bound = 
Some(v.clone()); + } + } + } + if let Some(upper_bound) = &entry.data_file.upper_bounds { + for (&k, v) in upper_bound { + let field_summary = self.field_summary.entry(k).or_default(); + if let Some(cur) = &field_summary.upper_bound { + if v > cur { + field_summary.upper_bound = Some(v.clone()); + } + } else { + field_summary.upper_bound = Some(v.clone()); + } + } + } + } + + fn get_field_summary_vec(&mut self, spec_fields: &[PartitionField]) -> Vec<FieldSummary> { + let mut partition_summary = Vec::with_capacity(self.field_summary.len()); + for field in spec_fields { + let entry = self + .field_summary + .remove(&field.source_id) + .unwrap_or(FieldSummary::default()); + partition_summary.push(entry); + } + partition_summary + } + + /// Write a manifest entry. + pub async fn write(mut self, manifest: Manifest) -> Result<ManifestListEntry, Error> { + // Create the avro writer + let partition_type = manifest + .metadata + .partition_spec + .partition_type(&manifest.metadata.schema)?; + let table_schema = &manifest.metadata.schema; + let avro_schema = match manifest.metadata.format_version { + FormatVersion::V1 => manifest_schema_v1(partition_type.clone())?, + FormatVersion::V2 => manifest_schema_v2(partition_type.clone())?, + }; + let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new()); + avro_writer.add_user_metadata( + "schema".to_string(), + to_vec(table_schema).map_err(|err| { + Error::new(ErrorKind::DataInvalid, "Fail to serialize table schema") + .with_source(err) + })?, + )?; + avro_writer.add_user_metadata( + "schema-id".to_string(), + table_schema.schema_id().to_string(), + )?; + avro_writer.add_user_metadata( + "partition-spec".to_string(), + to_vec(&manifest.metadata.partition_spec.fields).map_err(|err| { + Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec") + .with_source(err) + })?, + )?; + avro_writer.add_user_metadata( + "partition-spec-id".to_string(), + manifest.metadata.partition_spec.spec_id.to_string(), + )?; + 
avro_writer.add_user_metadata( + "format-version".to_string(), + (manifest.metadata.format_version as u8).to_string(), + )?; + if manifest.metadata.format_version == FormatVersion::V2 { + avro_writer + .add_user_metadata("content".to_string(), manifest.metadata.content.to_string())?; + } + + // Write manifest entries + for entry in manifest.entries { + if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing) + && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none()) + { + return Err(Error::new( + ErrorKind::DataInvalid, + "Manifest entry with status Existing or Deleted should have sequence number", + )); + } + + match entry.status { + ManifestStatus::Added => { + self.added_files += 1; + self.added_rows += entry.data_file.record_count; + } + ManifestStatus::Deleted => { + self.deleted_files += 1; + self.deleted_rows += entry.data_file.record_count; + } + ManifestStatus::Existing => { + self.existing_files += 1; + self.existing_rows += entry.data_file.record_count; + } + } + + if entry.is_alive() { + if let Some(seq_num) = entry.sequence_number { + self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, |v| min(v, seq_num))); + } + } + + self.update_field_summary(&entry); + + let value = match manifest.metadata.format_version { + FormatVersion::V1 => { + to_value(_serde::ManifestEntryV1::try_from(entry, &partition_type)?)? + .resolve(&avro_schema)? + } + FormatVersion::V2 => { + to_value(_serde::ManifestEntryV2::try_from(entry, &partition_type)?)? + .resolve(&avro_schema)? 
+ } + }; + + avro_writer.append(value)?; + } + + let length = avro_writer.flush()?; + let content = avro_writer.into_inner()?; + let mut writer = self.output.writer().await?; + writer.write_all(&content).await.map_err(|err| { + Error::new(ErrorKind::Unexpected, "Fail to write Manifest Entry").with_source(err) + })?; + writer.close().await.map_err(|err| { + Error::new(ErrorKind::Unexpected, "Fail to write Manifest Entry").with_source(err) + })?; + + let partition_summary = + self.get_field_summary_vec(&manifest.metadata.partition_spec.fields); + + Ok(ManifestListEntry { + manifest_path: self.output.location().to_string(), + manifest_length: length as i64, + partition_spec_id: manifest.metadata.partition_spec.spec_id, + content: manifest.metadata.content, + // sequence_number and min_sequence_number with UNASSIGNED_SEQUENCE_NUMBER will be replaced with + // real sequence number in `ManifestListWriter`. + sequence_number: UNASSIGNED_SEQUENCE_NUMBER, + min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQUENCE_NUMBER), + added_snapshot_id: self.snapshot_id, + added_data_files_count: Some(self.added_files), + existing_data_files_count: Some(self.existing_files), + deleted_data_files_count: Some(self.deleted_files), + added_rows_count: Some(self.added_rows), + existing_rows_count: Some(self.existing_rows), + deleted_rows_count: Some(self.deleted_rows), + partitions: partition_summary, + key_metadata: self.key_metadata.unwrap_or_default(), + }) + } +} + +/// This is a helper module that defines the schema field of the manifest list entry.
+mod _const_schema { + use std::sync::Arc; + + use apache_avro::Schema as AvroSchema; + use once_cell::sync::Lazy; + + use crate::{ + avro::schema_to_avro_schema, + spec::{ + ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type, + }, + Error, + }; + + static STATUS: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::required( + 0, + "status", + Type::Primitive(PrimitiveType::Int), + )) + }) + }; + + static SNAPSHOT_ID_V1: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::required( + 1, + "snapshot_id", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + + static SNAPSHOT_ID_V2: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 1, + "snapshot_id", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + + static SEQUENCE_NUMBER: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 3, + "sequence_number", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + + static FILE_SEQUENCE_NUMBER: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 4, + "file_sequence_number", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + + static CONTENT: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::required( + 134, + "content", + Type::Primitive(PrimitiveType::Int), + )) + }) + }; + + static FILE_PATH: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::required( + 100, + "file_path", + Type::Primitive(PrimitiveType::String), + )) + }) + }; + + static FILE_FORMAT: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::required( + 101, + "file_format", + Type::Primitive(PrimitiveType::String), + )) + }) + }; + + static RECORD_COUNT: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::required( + 103, + "record_count", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + + static FILE_SIZE_IN_BYTES: Lazy<NestedFieldRef> = { + Lazy::new(|| { + 
Arc::new(NestedField::required( + 104, + "file_size_in_bytes", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + + // Deprecated. Always write a default in v1. Do not write in v2. + static BLOCK_SIZE_IN_BYTES: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::required( + 105, + "block_size_in_bytes", + Type::Primitive(PrimitiveType::Long), + )) + }) + }; + + static COLUMN_SIZES: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 108, + "column_sizes", + Type::Map(MapType { + key_field: Arc::new(NestedField::required( + 117, + "key", + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::required( + 118, + "value", + Type::Primitive(PrimitiveType::Long), + )), + }), + )) + }) + }; + + static VALUE_COUNTS: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 109, + "value_counts", + Type::Map(MapType { + key_field: Arc::new(NestedField::required( + 119, + "key", + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::required( + 120, + "value", + Type::Primitive(PrimitiveType::Long), + )), + }), + )) + }) + }; + + static NULL_VALUE_COUNTS: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 110, + "null_value_counts", + Type::Map(MapType { + key_field: Arc::new(NestedField::required( + 121, + "key", + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::required( + 122, + "value", + Type::Primitive(PrimitiveType::Long), + )), + }), + )) + }) + }; + + static NAN_VALUE_COUNTS: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 137, + "nan_value_counts", + Type::Map(MapType { + key_field: Arc::new(NestedField::required( + 138, + "key", + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::required( + 139, + "value", + Type::Primitive(PrimitiveType::Long), + )), + }), + )) + }) + }; + + static DISTINCT_COUNTS: Lazy<NestedFieldRef> = { + 
Lazy::new(|| { + Arc::new(NestedField::optional( + 111, + "distinct_counts", + Type::Map(MapType { + key_field: Arc::new(NestedField::required( + 123, + "key", + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::required( + 124, + "value", + Type::Primitive(PrimitiveType::Long), + )), + }), + )) + }) + }; + + static LOWER_BOUNDS: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 125, + "lower_bounds", + Type::Map(MapType { + key_field: Arc::new(NestedField::required( + 126, + "key", + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::required( + 127, + "value", + Type::Primitive(PrimitiveType::Binary), + )), + }), + )) + }) + }; + + static UPPER_BOUNDS: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 128, + "upper_bounds", + Type::Map(MapType { + key_field: Arc::new(NestedField::required( + 129, + "key", + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::required( + 130, + "value", + Type::Primitive(PrimitiveType::Binary), + )), + }), + )) + }) + }; + + static KEY_METADATA: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 131, + "key_metadata", + Type::Primitive(PrimitiveType::Binary), + )) + }) + }; + + static SPLIT_OFFSETS: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 132, + "split_offsets", + Type::List(ListType { + element_field: Arc::new(NestedField::required( + 133, + "element", + Type::Primitive(PrimitiveType::Long), + )), + }), + )) + }) + }; + + static EQUALITY_IDS: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 135, + "equality_ids", + Type::List(ListType { + element_field: Arc::new(NestedField::required( + 136, + "element", + Type::Primitive(PrimitiveType::Int), + )), + }), + )) + }) + }; + + static SORT_ORDER_ID: Lazy<NestedFieldRef> = { + Lazy::new(|| { + Arc::new(NestedField::optional( + 140, + "sort_order_id", + 
Type::Primitive(PrimitiveType::Int), + )) + }) + }; + + pub fn manifest_schema_v2(partition_type: StructType) -> Result<AvroSchema, Error> { Review Comment: ```suggestion pub(super) fn manifest_schema_v2(partition_type: StructType) -> Result<AvroSchema, Error> { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org