ZENOTME commented on code in PR #738: URL: https://github.com/apache/iceberg-rust/pull/738#discussion_r1918221289
########## crates/iceberg/src/spec/manifest.rs: ########## @@ -210,38 +281,141 @@ impl ManifestWriter { deleted_rows: 0, min_seq_num: None, key_metadata, - partitions: vec![], + manifset_entries: Vec::new(), + metadata, } } fn construct_partition_summaries( &mut self, partition_type: &StructType, ) -> Result<Vec<FieldSummary>> { - let partitions = std::mem::take(&mut self.partitions); let mut field_stats: Vec<_> = partition_type .fields() .iter() .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone())) .collect(); - for partition in partitions { - for (literal, stat) in partition.into_iter().zip_eq(field_stats.iter_mut()) { + for partition in self.manifset_entries.iter().map(|e| &e.data_file.partition) { + for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) { let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap()); stat.update(primitive_literal)?; } } Ok(field_stats.into_iter().map(|stat| stat.finish()).collect()) } - /// Write a manifest. - pub async fn write(mut self, manifest: Manifest) -> Result<ManifestFile> { + fn check_entry(&self, entry: &ManifestEntry) -> Result<()> { + match self.metadata.content { + ManifestContentType::Data => { + if entry.data_file.content != DataContentType::Data { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Content type of entry {:?} should have DataContentType::Data", + entry.data_file.content + ), + )); + } + } + ManifestContentType::Deletes => { + if entry.data_file.content != DataContentType::EqualityDeletes + && entry.data_file.content != DataContentType::PositionDeletes + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Content type of entry {:?} should have DataContentType::EqualityDeletes or DataContentType::PositionDeletes", entry.data_file.content), + )); + } + } + } + Ok(()) + } + + /// Add a new manifest entry. 
This method will update following status of the entry: + /// - Update the entry status to `Added` + /// - Set the snapshot id to the current snapshot id + /// - Set the sequence number to `None` if it is invalid(smaller than 0) + /// - Set the file sequence number to `None` + pub fn add(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_entry(&entry)?; + if entry.sequence_number().is_some_and(|n| n >= 0) { + entry.status = ManifestStatus::Added; + entry.snapshot_id = Some(self.snapshot_id); + entry.file_sequence_number = None; + } else { + entry.status = ManifestStatus::Added; + entry.snapshot_id = Some(self.snapshot_id); + entry.sequence_number = None; + entry.file_sequence_number = None; + }; + self.add_entry(entry)?; + Ok(()) + } + + /// Add a delete manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Deleted` + /// - Set the snapshot id to the current snapshot id + pub fn delete(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_entry(&entry)?; + entry.status = ManifestStatus::Deleted; + entry.snapshot_id = Some(self.snapshot_id); + self.add_entry(entry)?; + Ok(()) + } + + /// Add an existing manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Existing` + pub fn existing(&mut self, mut entry: ManifestEntry) -> Result<()> { Review Comment: Oh, I got you. I think the interface here is package-private. I forgot to mark these methods as `pub(crate)` to avoid confusion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org