liurenjie1024 commented on code in PR #738: URL: https://github.com/apache/iceberg-rust/pull/738#discussion_r1920149166
########## crates/iceberg/src/spec/manifest.rs: ########## @@ -210,38 +284,207 @@ impl ManifestWriter { deleted_rows: 0, min_seq_num: None, key_metadata, - partitions: vec![], + manifset_entries: Vec::new(), + metadata, } } fn construct_partition_summaries( &mut self, partition_type: &StructType, ) -> Result<Vec<FieldSummary>> { - let partitions = std::mem::take(&mut self.partitions); let mut field_stats: Vec<_> = partition_type .fields() .iter() .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone())) .collect(); - for partition in partitions { - for (literal, stat) in partition.into_iter().zip_eq(field_stats.iter_mut()) { + for partition in self.manifset_entries.iter().map(|e| &e.data_file.partition) { + for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) { let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap()); stat.update(primitive_literal)?; } } Ok(field_stats.into_iter().map(|stat| stat.finish()).collect()) } - /// Write a manifest. - pub async fn write(mut self, manifest: Manifest) -> Result<ManifestFile> { + fn check_data_file(&self, data_file: &DataFile) -> Result<()> { + match self.metadata.content { + ManifestContentType::Data => { + if data_file.content != DataContentType::Data { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Content type of entry {:?} should have DataContentType::Data", + data_file.content + ), + )); + } + } + ManifestContentType::Deletes => { + if data_file.content != DataContentType::EqualityDeletes + && data_file.content != DataContentType::PositionDeletes + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Content type of entry {:?} should have DataContentType::EqualityDeletes or DataContentType::PositionDeletes", data_file.content), + )); + } + } + } + Ok(()) + } + + /// Add a new manifest entry. 
This method will update following status of the entry: + /// - Update the entry status to `Added` + /// - Set the snapshot id to the current snapshot id + /// - Set the sequence number to `None` if it is invalid(smaller than 0) + /// - Set the file sequence number to `None` + pub(crate) fn add(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_data_file(&entry.data_file)?; + if entry.sequence_number().is_some_and(|n| n >= 0) { + entry.status = ManifestStatus::Added; + entry.snapshot_id = self.snapshot_id; + entry.file_sequence_number = None; + } else { + entry.status = ManifestStatus::Added; + entry.snapshot_id = self.snapshot_id; + entry.sequence_number = None; + entry.file_sequence_number = None; + }; + self.add_entry(entry)?; + Ok(()) + } + + /// Add an added entry for a file with a specific sequence number. The entry's snapshot ID will be this manifest's snapshot ID. The entry's data sequence + /// number will be the provided data sequence number. The entry's file sequence number will be + /// assigned at commit. + pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> Result<()> { + self.check_data_file(&data_file)?; + let entry = ManifestEntry { + status: ManifestStatus::Added, + snapshot_id: self.snapshot_id, + sequence_number: (sequence_number >= 0).then_some(sequence_number), + file_sequence_number: None, + data_file, + }; + self.add_entry(entry)?; + Ok(()) + } + + /// Add a delete manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Deleted` + /// - Set the snapshot id to the current snapshot id + /// + /// # TODO + /// Remove this allow later + #[allow(dead_code)] + pub(crate) fn delete(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_data_file(&entry.data_file)?; + entry.status = ManifestStatus::Deleted; + entry.snapshot_id = self.snapshot_id; + self.add_entry(entry)?; + Ok(()) + } + + /// Add a delete manifest entry. 
The entry's snapshot ID will be this manifest's snapshot ID. + /// However, the original data and file sequence numbers of the file must be preserved when + /// the file is marked as deleted. + pub fn delete_file( + &mut self, + data_file: DataFile, + sequence_number: i64, + file_sequence_number: i64, + ) -> Result<()> { + self.check_data_file(&data_file)?; + let entry = ManifestEntry { + status: ManifestStatus::Deleted, + snapshot_id: self.snapshot_id, + sequence_number: Some(sequence_number), + file_sequence_number: Some(file_sequence_number), + data_file, + }; + self.add_entry(entry)?; + Ok(()) + } + + /// Add an existing manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Existing` + /// + /// # TODO + /// Remove this allow later + #[allow(dead_code)] + pub(crate) fn existing(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_data_file(&entry.data_file)?; + entry.status = ManifestStatus::Existing; + self.add_entry(entry)?; + Ok(()) + } + + /// Add an existing manifest entry. The original data and file sequence numbers, snapshot ID, + /// which were assigned at commit, must be preserved when adding an existing entry. + pub fn existing_file( + &mut self, + data_file: DataFile, + snapshot_id: i64, + sequence_number: i64, + file_sequence_number: i64, Review Comment: This should be optional, because `file_sequence_number` can be inherited from the snapshot.
########## crates/iceberg/src/spec/manifest.rs: ########## @@ -210,38 +284,207 @@ impl ManifestWriter { deleted_rows: 0, min_seq_num: None, key_metadata, - partitions: vec![], + manifset_entries: Vec::new(), + metadata, } } fn construct_partition_summaries( &mut self, partition_type: &StructType, ) -> Result<Vec<FieldSummary>> { - let partitions = std::mem::take(&mut self.partitions); let mut field_stats: Vec<_> = partition_type .fields() .iter() .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone())) .collect(); - for partition in partitions { - for (literal, stat) in partition.into_iter().zip_eq(field_stats.iter_mut()) { + for partition in self.manifset_entries.iter().map(|e| &e.data_file.partition) { + for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) { let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap()); stat.update(primitive_literal)?; } } Ok(field_stats.into_iter().map(|stat| stat.finish()).collect()) } - /// Write a manifest. - pub async fn write(mut self, manifest: Manifest) -> Result<ManifestFile> { + fn check_data_file(&self, data_file: &DataFile) -> Result<()> { + match self.metadata.content { + ManifestContentType::Data => { + if data_file.content != DataContentType::Data { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Content type of entry {:?} should have DataContentType::Data", + data_file.content + ), + )); + } + } + ManifestContentType::Deletes => { + if data_file.content != DataContentType::EqualityDeletes + && data_file.content != DataContentType::PositionDeletes + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Content type of entry {:?} should have DataContentType::EqualityDeletes or DataContentType::PositionDeletes", data_file.content), + )); + } + } + } + Ok(()) + } + + /// Add a new manifest entry. 
This method will update following status of the entry: + /// - Update the entry status to `Added` + /// - Set the snapshot id to the current snapshot id + /// - Set the sequence number to `None` if it is invalid(smaller than 0) + /// - Set the file sequence number to `None` + pub(crate) fn add(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_data_file(&entry.data_file)?; + if entry.sequence_number().is_some_and(|n| n >= 0) { + entry.status = ManifestStatus::Added; + entry.snapshot_id = self.snapshot_id; + entry.file_sequence_number = None; + } else { + entry.status = ManifestStatus::Added; + entry.snapshot_id = self.snapshot_id; + entry.sequence_number = None; + entry.file_sequence_number = None; + }; + self.add_entry(entry)?; + Ok(()) + } + + /// Add an added entry for a file with a specific sequence number. The entry's snapshot ID will be this manifest's snapshot ID. The entry's data sequence + /// number will be the provided data sequence number. The entry's file sequence number will be + /// assigned at commit. + pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> Result<()> { + self.check_data_file(&data_file)?; + let entry = ManifestEntry { + status: ManifestStatus::Added, + snapshot_id: self.snapshot_id, + sequence_number: (sequence_number >= 0).then_some(sequence_number), + file_sequence_number: None, + data_file, + }; + self.add_entry(entry)?; + Ok(()) + } + + /// Add a delete manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Deleted` + /// - Set the snapshot id to the current snapshot id + /// + /// # TODO + /// Remove this allow later + #[allow(dead_code)] + pub(crate) fn delete(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_data_file(&entry.data_file)?; + entry.status = ManifestStatus::Deleted; + entry.snapshot_id = self.snapshot_id; + self.add_entry(entry)?; + Ok(()) + } + + /// Add a delete manifest entry. 
The entry's snapshot ID will be this manifest's snapshot ID. + /// However, the original data and file sequence numbers of the file must be preserved when + /// the file is marked as deleted. + pub fn delete_file( + &mut self, + data_file: DataFile, + sequence_number: i64, + file_sequence_number: i64, Review Comment: This should be optional, because `file_sequence_number` can be inherited from the snapshot. ########## crates/iceberg/src/spec/manifest.rs: ########## @@ -41,6 +41,9 @@ use crate::io::OutputFile; use crate::spec::PartitionField; use crate::{Error, ErrorKind}; +/// Placeholder for snapshot ID. The field with this value must be replaced with the actual snapshot ID before it is committed. +pub const UNASSIGNED_SNAPSHOT_ID: i64 = -1; Review Comment: We should move this to the `snapshot` module. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org