Xuanwo commented on code in PR #738: URL: https://github.com/apache/iceberg-rust/pull/738#discussion_r1920298893
########## crates/iceberg/src/spec/manifest.rs: ########## @@ -210,38 +284,207 @@ impl ManifestWriter { deleted_rows: 0, min_seq_num: None, key_metadata, - partitions: vec![], + manifset_entries: Vec::new(), + metadata, } } fn construct_partition_summaries( &mut self, partition_type: &StructType, ) -> Result<Vec<FieldSummary>> { - let partitions = std::mem::take(&mut self.partitions); let mut field_stats: Vec<_> = partition_type .fields() .iter() .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone())) .collect(); - for partition in partitions { - for (literal, stat) in partition.into_iter().zip_eq(field_stats.iter_mut()) { + for partition in self.manifset_entries.iter().map(|e| &e.data_file.partition) { + for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) { let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap()); stat.update(primitive_literal)?; } } Ok(field_stats.into_iter().map(|stat| stat.finish()).collect()) } - /// Write a manifest. - pub async fn write(mut self, manifest: Manifest) -> Result<ManifestFile> { + fn check_data_file(&self, data_file: &DataFile) -> Result<()> { + match self.metadata.content { + ManifestContentType::Data => { + if data_file.content != DataContentType::Data { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Content type of entry {:?} should have DataContentType::Data", + data_file.content + ), + )); + } + } + ManifestContentType::Deletes => { + if data_file.content != DataContentType::EqualityDeletes + && data_file.content != DataContentType::PositionDeletes + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Content type of entry {:?} should have DataContentType::EqualityDeletes or DataContentType::PositionDeletes", data_file.content), + )); + } + } + } + Ok(()) + } + + /// Add a new manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Added` + /// - Set the snapshot id to the current snapshot id + /// - Set the sequence number to `None` if it is invalid(smaller than 0) + /// - Set the file sequence number to `None` + pub(crate) fn add(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_data_file(&entry.data_file)?; + if entry.sequence_number().is_some_and(|n| n >= 0) { + entry.status = ManifestStatus::Added; + entry.snapshot_id = self.snapshot_id; + entry.file_sequence_number = None; + } else { + entry.status = ManifestStatus::Added; + entry.snapshot_id = self.snapshot_id; + entry.sequence_number = None; + entry.file_sequence_number = None; + }; + self.add_entry(entry)?; + Ok(()) + } + + /// Add an added entry for a file with a specific sequence number. The entry's snapshot ID will be this manifest's snapshot ID. The entry's data sequence + /// number will be the provided data sequence number. The entry's file sequence number will be + /// assigned at commit. + pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> Result<()> { + self.check_data_file(&data_file)?; + let entry = ManifestEntry { + status: ManifestStatus::Added, + snapshot_id: self.snapshot_id, + sequence_number: (sequence_number >= 0).then_some(sequence_number), + file_sequence_number: None, + data_file, + }; + self.add_entry(entry)?; + Ok(()) + } + + /// Add a delete manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Deleted` + /// - Set the snapshot id to the current snapshot id + /// + /// # TODO + /// Remove this allow later + #[allow(dead_code)] + pub(crate) fn delete(&mut self, mut entry: ManifestEntry) -> Result<()> { Review Comment: Maybe `delete_entry`? ########## crates/iceberg/src/spec/manifest.rs: ########## @@ -210,38 +284,207 @@ impl ManifestWriter { deleted_rows: 0, min_seq_num: None, key_metadata, - partitions: vec![], + manifset_entries: Vec::new(), + metadata, } } fn construct_partition_summaries( &mut self, partition_type: &StructType, ) -> Result<Vec<FieldSummary>> { - let partitions = std::mem::take(&mut self.partitions); let mut field_stats: Vec<_> = partition_type .fields() .iter() .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone())) .collect(); - for partition in partitions { - for (literal, stat) in partition.into_iter().zip_eq(field_stats.iter_mut()) { + for partition in self.manifset_entries.iter().map(|e| &e.data_file.partition) { + for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) { let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap()); stat.update(primitive_literal)?; } } Ok(field_stats.into_iter().map(|stat| stat.finish()).collect()) } - /// Write a manifest. - pub async fn write(mut self, manifest: Manifest) -> Result<ManifestFile> { + fn check_data_file(&self, data_file: &DataFile) -> Result<()> { + match self.metadata.content { + ManifestContentType::Data => { + if data_file.content != DataContentType::Data { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Content type of entry {:?} should have DataContentType::Data", + data_file.content + ), + )); + } + } + ManifestContentType::Deletes => { + if data_file.content != DataContentType::EqualityDeletes + && data_file.content != DataContentType::PositionDeletes + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Content type of entry {:?} should have DataContentType::EqualityDeletes or DataContentType::PositionDeletes", data_file.content), + )); + } + } + } + Ok(()) + } + + /// Add a new manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Added` + /// - Set the snapshot id to the current snapshot id + /// - Set the sequence number to `None` if it is invalid(smaller than 0) + /// - Set the file sequence number to `None` + pub(crate) fn add(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_data_file(&entry.data_file)?; + if entry.sequence_number().is_some_and(|n| n >= 0) { + entry.status = ManifestStatus::Added; + entry.snapshot_id = self.snapshot_id; + entry.file_sequence_number = None; + } else { + entry.status = ManifestStatus::Added; + entry.snapshot_id = self.snapshot_id; + entry.sequence_number = None; + entry.file_sequence_number = None; + }; + self.add_entry(entry)?; + Ok(()) + } + + /// Add an added entry for a file with a specific sequence number. The entry's snapshot ID will be this manifest's snapshot ID. The entry's data sequence + /// number will be the provided data sequence number. The entry's file sequence number will be + /// assigned at commit. + pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> Result<()> { + self.check_data_file(&data_file)?; + let entry = ManifestEntry { + status: ManifestStatus::Added, + snapshot_id: self.snapshot_id, + sequence_number: (sequence_number >= 0).then_some(sequence_number), + file_sequence_number: None, + data_file, + }; + self.add_entry(entry)?; + Ok(()) + } + + /// Add a delete manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Deleted` + /// - Set the snapshot id to the current snapshot id + /// + /// # TODO + /// Remove this allow later + #[allow(dead_code)] + pub(crate) fn delete(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_data_file(&entry.data_file)?; + entry.status = ManifestStatus::Deleted; + entry.snapshot_id = self.snapshot_id; + self.add_entry(entry)?; + Ok(()) + } + + /// Add a delete manifest entry. The entry's snapshot ID will be this manifest's snapshot ID. + /// However, the original data and file sequence numbers of the file must be preserved when + /// the file is marked as deleted. + pub fn delete_file( + &mut self, + data_file: DataFile, + sequence_number: i64, + file_sequence_number: i64, + ) -> Result<()> { + self.check_data_file(&data_file)?; + let entry = ManifestEntry { + status: ManifestStatus::Deleted, + snapshot_id: self.snapshot_id, + sequence_number: Some(sequence_number), + file_sequence_number: Some(file_sequence_number), + data_file, + }; + self.add_entry(entry)?; + Ok(()) + } + + /// Add an existing manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Existing` + /// + /// # TODO + /// Remove this allow later + #[allow(dead_code)] + pub(crate) fn existing(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_data_file(&entry.data_file)?; + entry.status = ManifestStatus::Existing; + self.add_entry(entry)?; + Ok(()) + } + + /// Add an existing manifest entry. The original data and file sequence numbers, snapshot ID, + /// which were assigned at commit, must be preserved when adding an existing entry. + pub fn existing_file( Review Comment: How about `add_existing_file`? ########## crates/iceberg/src/spec/manifest.rs: ########## @@ -210,38 +284,207 @@ impl ManifestWriter { deleted_rows: 0, min_seq_num: None, key_metadata, - partitions: vec![], + manifset_entries: Vec::new(), + metadata, } } fn construct_partition_summaries( &mut self, partition_type: &StructType, ) -> Result<Vec<FieldSummary>> { - let partitions = std::mem::take(&mut self.partitions); let mut field_stats: Vec<_> = partition_type .fields() .iter() .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone())) .collect(); - for partition in partitions { - for (literal, stat) in partition.into_iter().zip_eq(field_stats.iter_mut()) { + for partition in self.manifset_entries.iter().map(|e| &e.data_file.partition) { + for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) { let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap()); stat.update(primitive_literal)?; } } Ok(field_stats.into_iter().map(|stat| stat.finish()).collect()) } - /// Write a manifest. - pub async fn write(mut self, manifest: Manifest) -> Result<ManifestFile> { + fn check_data_file(&self, data_file: &DataFile) -> Result<()> { + match self.metadata.content { + ManifestContentType::Data => { + if data_file.content != DataContentType::Data { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Content type of entry {:?} should have DataContentType::Data", + data_file.content + ), + )); + } + } + ManifestContentType::Deletes => { + if data_file.content != DataContentType::EqualityDeletes + && data_file.content != DataContentType::PositionDeletes + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Content type of entry {:?} should have DataContentType::EqualityDeletes or DataContentType::PositionDeletes", data_file.content), + )); + } + } + } + Ok(()) + } + + /// Add a new manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Added` + /// - Set the snapshot id to the current snapshot id + /// - Set the sequence number to `None` if it is invalid(smaller than 0) + /// - Set the file sequence number to `None` + pub(crate) fn add(&mut self, mut entry: ManifestEntry) -> Result<()> { Review Comment: Maybe `add_entry`? ########## crates/iceberg/src/spec/manifest.rs: ########## @@ -210,38 +284,207 @@ impl ManifestWriter { deleted_rows: 0, min_seq_num: None, key_metadata, - partitions: vec![], + manifset_entries: Vec::new(), + metadata, } } fn construct_partition_summaries( &mut self, partition_type: &StructType, ) -> Result<Vec<FieldSummary>> { - let partitions = std::mem::take(&mut self.partitions); let mut field_stats: Vec<_> = partition_type .fields() .iter() .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone())) .collect(); - for partition in partitions { - for (literal, stat) in partition.into_iter().zip_eq(field_stats.iter_mut()) { + for partition in self.manifset_entries.iter().map(|e| &e.data_file.partition) { + for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) { let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap()); stat.update(primitive_literal)?; } } Ok(field_stats.into_iter().map(|stat| stat.finish()).collect()) } - /// Write a manifest. - pub async fn write(mut self, manifest: Manifest) -> Result<ManifestFile> { + fn check_data_file(&self, data_file: &DataFile) -> Result<()> { + match self.metadata.content { + ManifestContentType::Data => { + if data_file.content != DataContentType::Data { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Content type of entry {:?} should have DataContentType::Data", + data_file.content + ), + )); + } + } + ManifestContentType::Deletes => { + if data_file.content != DataContentType::EqualityDeletes + && data_file.content != DataContentType::PositionDeletes + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Content type of entry {:?} should have DataContentType::EqualityDeletes or DataContentType::PositionDeletes", data_file.content), + )); + } + } + } + Ok(()) + } + + /// Add a new manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Added` + /// - Set the snapshot id to the current snapshot id + /// - Set the sequence number to `None` if it is invalid(smaller than 0) + /// - Set the file sequence number to `None` + pub(crate) fn add(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_data_file(&entry.data_file)?; + if entry.sequence_number().is_some_and(|n| n >= 0) { + entry.status = ManifestStatus::Added; + entry.snapshot_id = self.snapshot_id; + entry.file_sequence_number = None; + } else { + entry.status = ManifestStatus::Added; + entry.snapshot_id = self.snapshot_id; + entry.sequence_number = None; + entry.file_sequence_number = None; + }; + self.add_entry(entry)?; + Ok(()) + } + + /// Add an added entry for a file with a specific sequence number. The entry's snapshot ID will be this manifest's snapshot ID. The entry's data sequence + /// number will be the provided data sequence number. The entry's file sequence number will be + /// assigned at commit. + pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> Result<()> { + self.check_data_file(&data_file)?; + let entry = ManifestEntry { + status: ManifestStatus::Added, + snapshot_id: self.snapshot_id, + sequence_number: (sequence_number >= 0).then_some(sequence_number), + file_sequence_number: None, + data_file, + }; + self.add_entry(entry)?; + Ok(()) + } + + /// Add a delete manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Deleted` + /// - Set the snapshot id to the current snapshot id + /// + /// # TODO + /// Remove this allow later + #[allow(dead_code)] + pub(crate) fn delete(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_data_file(&entry.data_file)?; + entry.status = ManifestStatus::Deleted; + entry.snapshot_id = self.snapshot_id; + self.add_entry(entry)?; + Ok(()) + } + + /// Add a delete manifest entry. The entry's snapshot ID will be this manifest's snapshot ID. + /// However, the original data and file sequence numbers of the file must be preserved when + /// the file is marked as deleted. + pub fn delete_file( + &mut self, + data_file: DataFile, + sequence_number: i64, + file_sequence_number: i64, + ) -> Result<()> { + self.check_data_file(&data_file)?; + let entry = ManifestEntry { + status: ManifestStatus::Deleted, + snapshot_id: self.snapshot_id, + sequence_number: Some(sequence_number), + file_sequence_number: Some(file_sequence_number), + data_file, + }; + self.add_entry(entry)?; + Ok(()) + } + + /// Add an existing manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Existing` + /// + /// # TODO + /// Remove this allow later + #[allow(dead_code)] + pub(crate) fn existing(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_data_file(&entry.data_file)?; + entry.status = ManifestStatus::Existing; + self.add_entry(entry)?; + Ok(()) + } + + /// Add an existing manifest entry. The original data and file sequence numbers, snapshot ID, + /// which were assigned at commit, must be preserved when adding an existing entry. + pub fn existing_file( + &mut self, + data_file: DataFile, + snapshot_id: i64, + sequence_number: i64, + file_sequence_number: i64, + ) -> Result<()> { + self.check_data_file(&data_file)?; + let entry = ManifestEntry { + status: ManifestStatus::Existing, + snapshot_id: Some(snapshot_id), + sequence_number: Some(sequence_number), + file_sequence_number: Some(file_sequence_number), + data_file, + }; + self.add_entry(entry)?; + Ok(()) + } + + fn add_entry(&mut self, entry: ManifestEntry) -> Result<()> { + // Check if the entry has sequence number + if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing) + && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none()) + { + return Err(Error::new( + ErrorKind::DataInvalid, + "Manifest entry with status Existing or Deleted should have sequence number", + )); + } + + // Update the statistics + match entry.status { + ManifestStatus::Added => { + self.added_files += 1; + self.added_rows += entry.data_file.record_count; + } + ManifestStatus::Deleted => { + self.deleted_files += 1; + self.deleted_rows += entry.data_file.record_count; + } + ManifestStatus::Existing => { + self.existing_files += 1; + self.existing_rows += entry.data_file.record_count; + } + } + if entry.is_alive() { + if let Some(seq_num) = entry.sequence_number { + self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, |v| min(v, seq_num))); + } + } + self.manifset_entries.push(entry); + Ok(()) + } + + /// Write manifest file and return it. + pub async fn to_manifest_file(mut self) -> Result<ManifestFile> { Review Comment: `to_manifest_file` sounds more like a conversion function, but it actually involves heavy I/O operations. How about using the name suggested in the comments: `write_manifest_file`? ########## crates/iceberg/src/spec/manifest.rs: ########## @@ -210,38 +284,207 @@ impl ManifestWriter { deleted_rows: 0, min_seq_num: None, key_metadata, - partitions: vec![], + manifset_entries: Vec::new(), + metadata, } } fn construct_partition_summaries( &mut self, partition_type: &StructType, ) -> Result<Vec<FieldSummary>> { - let partitions = std::mem::take(&mut self.partitions); let mut field_stats: Vec<_> = partition_type .fields() .iter() .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone())) .collect(); - for partition in partitions { - for (literal, stat) in partition.into_iter().zip_eq(field_stats.iter_mut()) { + for partition in self.manifset_entries.iter().map(|e| &e.data_file.partition) { + for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) { let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap()); stat.update(primitive_literal)?; } } Ok(field_stats.into_iter().map(|stat| stat.finish()).collect()) } - /// Write a manifest. - pub async fn write(mut self, manifest: Manifest) -> Result<ManifestFile> { + fn check_data_file(&self, data_file: &DataFile) -> Result<()> { + match self.metadata.content { + ManifestContentType::Data => { + if data_file.content != DataContentType::Data { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Content type of entry {:?} should have DataContentType::Data", + data_file.content + ), + )); + } + } + ManifestContentType::Deletes => { + if data_file.content != DataContentType::EqualityDeletes + && data_file.content != DataContentType::PositionDeletes + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Content type of entry {:?} should have DataContentType::EqualityDeletes or DataContentType::PositionDeletes", data_file.content), + )); + } + } + } + Ok(()) + } + + /// Add a new manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Added` + /// - Set the snapshot id to the current snapshot id + /// - Set the sequence number to `None` if it is invalid(smaller than 0) + /// - Set the file sequence number to `None` + pub(crate) fn add(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_data_file(&entry.data_file)?; + if entry.sequence_number().is_some_and(|n| n >= 0) { + entry.status = ManifestStatus::Added; + entry.snapshot_id = self.snapshot_id; + entry.file_sequence_number = None; + } else { + entry.status = ManifestStatus::Added; + entry.snapshot_id = self.snapshot_id; + entry.sequence_number = None; + entry.file_sequence_number = None; + }; + self.add_entry(entry)?; + Ok(()) + } + + /// Add an added entry for a file with a specific sequence number. The entry's snapshot ID will be this manifest's snapshot ID. The entry's data sequence + /// number will be the provided data sequence number. The entry's file sequence number will be + /// assigned at commit. + pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> Result<()> { + self.check_data_file(&data_file)?; + let entry = ManifestEntry { + status: ManifestStatus::Added, + snapshot_id: self.snapshot_id, + sequence_number: (sequence_number >= 0).then_some(sequence_number), + file_sequence_number: None, + data_file, + }; + self.add_entry(entry)?; + Ok(()) + } + + /// Add a delete manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Deleted` + /// - Set the snapshot id to the current snapshot id + /// + /// # TODO + /// Remove this allow later + #[allow(dead_code)] + pub(crate) fn delete(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_data_file(&entry.data_file)?; + entry.status = ManifestStatus::Deleted; + entry.snapshot_id = self.snapshot_id; + self.add_entry(entry)?; + Ok(()) + } + + /// Add a delete manifest entry. The entry's snapshot ID will be this manifest's snapshot ID. + /// However, the original data and file sequence numbers of the file must be preserved when + /// the file is marked as deleted. + pub fn delete_file( + &mut self, + data_file: DataFile, + sequence_number: i64, + file_sequence_number: i64, + ) -> Result<()> { + self.check_data_file(&data_file)?; + let entry = ManifestEntry { + status: ManifestStatus::Deleted, + snapshot_id: self.snapshot_id, + sequence_number: Some(sequence_number), + file_sequence_number: Some(file_sequence_number), + data_file, + }; + self.add_entry(entry)?; + Ok(()) + } + + /// Add an existing manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Existing` + /// + /// # TODO + /// Remove this allow later + #[allow(dead_code)] + pub(crate) fn existing(&mut self, mut entry: ManifestEntry) -> Result<()> { Review Comment: Hi, the API naming seems a bit unclear. If we are "adding an existing manifest entry," how about naming this API `add_existing_entry`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org