liurenjie1024 commented on code in PR #738:
URL: https://github.com/apache/iceberg-rust/pull/738#discussion_r1920149166


##########
crates/iceberg/src/spec/manifest.rs:
##########
@@ -210,38 +284,207 @@ impl ManifestWriter {
             deleted_rows: 0,
             min_seq_num: None,
             key_metadata,
-            partitions: vec![],
+            manifset_entries: Vec::new(),
+            metadata,
         }
     }
 
     fn construct_partition_summaries(
         &mut self,
         partition_type: &StructType,
     ) -> Result<Vec<FieldSummary>> {
-        let partitions = std::mem::take(&mut self.partitions);
         let mut field_stats: Vec<_> = partition_type
             .fields()
             .iter()
             .map(|f| 
PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
             .collect();
-        for partition in partitions {
-            for (literal, stat) in 
partition.into_iter().zip_eq(field_stats.iter_mut()) {
+        for partition in self.manifset_entries.iter().map(|e| 
&e.data_file.partition) {
+            for (literal, stat) in 
partition.iter().zip_eq(field_stats.iter_mut()) {
                 let primitive_literal = literal.map(|v| 
v.as_primitive_literal().unwrap());
                 stat.update(primitive_literal)?;
             }
         }
         Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
     }
 
-    /// Write a manifest.
-    pub async fn write(mut self, manifest: Manifest) -> Result<ManifestFile> {
+    fn check_data_file(&self, data_file: &DataFile) -> Result<()> {
+        match self.metadata.content {
+            ManifestContentType::Data => {
+                if data_file.content != DataContentType::Data {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Content type of entry {:?} should have 
DataContentType::Data",
+                            data_file.content
+                        ),
+                    ));
+                }
+            }
+            ManifestContentType::Deletes => {
+                if data_file.content != DataContentType::EqualityDeletes
+                    && data_file.content != DataContentType::PositionDeletes
+                {
+                    return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Content type of entry {:?} should have 
DataContentType::EqualityDeletes or DataContentType::PositionDeletes", 
data_file.content),
+                ));
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Add a new manifest entry. This method will update following status of 
the entry:
+    /// - Update the entry status to `Added`
+    /// - Set the snapshot id to the current snapshot id
+    /// - Set the sequence number to `None` if it is invalid(smaller than 0)
+    /// - Set the file sequence number to `None`
+    pub(crate) fn add(&mut self, mut entry: ManifestEntry) -> Result<()> {
+        self.check_data_file(&entry.data_file)?;
+        if entry.sequence_number().is_some_and(|n| n >= 0) {
+            entry.status = ManifestStatus::Added;
+            entry.snapshot_id = self.snapshot_id;
+            entry.file_sequence_number = None;
+        } else {
+            entry.status = ManifestStatus::Added;
+            entry.snapshot_id = self.snapshot_id;
+            entry.sequence_number = None;
+            entry.file_sequence_number = None;
+        };
+        self.add_entry(entry)?;
+        Ok(())
+    }
+
+    /// Add an added entry for a file with a specific sequence number. The 
entry's snapshot ID will be this manifest's snapshot ID. The entry's data 
sequence
+    /// number will be the provided data sequence number. The entry's file 
sequence number will be
+    /// assigned at commit.
+    pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> 
Result<()> {
+        self.check_data_file(&data_file)?;
+        let entry = ManifestEntry {
+            status: ManifestStatus::Added,
+            snapshot_id: self.snapshot_id,
+            sequence_number: (sequence_number >= 0).then_some(sequence_number),
+            file_sequence_number: None,
+            data_file,
+        };
+        self.add_entry(entry)?;
+        Ok(())
+    }
+
+    /// Add a delete manifest entry. This method will update following status 
of the entry:
+    /// - Update the entry status to `Deleted`
+    /// - Set the snapshot id to the current snapshot id
+    ///
+    /// # TODO
+    /// Remove this allow later
+    #[allow(dead_code)]
+    pub(crate) fn delete(&mut self, mut entry: ManifestEntry) -> Result<()> {
+        self.check_data_file(&entry.data_file)?;
+        entry.status = ManifestStatus::Deleted;
+        entry.snapshot_id = self.snapshot_id;
+        self.add_entry(entry)?;
+        Ok(())
+    }
+
+    /// Add a delete manifest entry. The entry's snapshot ID will be this 
manifest's snapshot ID.
+    /// However, the original data and file sequence numbers of the file must 
be preserved when
+    /// the file is marked as deleted.
+    pub fn delete_file(
+        &mut self,
+        data_file: DataFile,
+        sequence_number: i64,
+        file_sequence_number: i64,
+    ) -> Result<()> {
+        self.check_data_file(&data_file)?;
+        let entry = ManifestEntry {
+            status: ManifestStatus::Deleted,
+            snapshot_id: self.snapshot_id,
+            sequence_number: Some(sequence_number),
+            file_sequence_number: Some(file_sequence_number),
+            data_file,
+        };
+        self.add_entry(entry)?;
+        Ok(())
+    }
+
+    /// Add an existing manifest entry. This method will update following 
status of the entry:
+    /// - Update the entry status to `Existing`
+    ///
+    /// # TODO
+    /// Remove this allow later
+    #[allow(dead_code)]
+    pub(crate) fn existing(&mut self, mut entry: ManifestEntry) -> Result<()> {
+        self.check_data_file(&entry.data_file)?;
+        entry.status = ManifestStatus::Existing;
+        self.add_entry(entry)?;
+        Ok(())
+    }
+
+    /// Add an existing manifest entry. The original data and file sequence 
numbers, snapshot ID,
+    /// which were assigned at commit, must be preserved when adding an 
existing entry.
+    pub fn existing_file(
+        &mut self,
+        data_file: DataFile,
+        snapshot_id: i64,
+        sequence_number: i64,
+        file_sequence_number: i64,

Review Comment:
   This should be optional; `file_sequence_number` could be inherited from the
snapshot.



##########
crates/iceberg/src/spec/manifest.rs:
##########
@@ -210,38 +284,207 @@ impl ManifestWriter {
             deleted_rows: 0,
             min_seq_num: None,
             key_metadata,
-            partitions: vec![],
+            manifset_entries: Vec::new(),
+            metadata,
         }
     }
 
     fn construct_partition_summaries(
         &mut self,
         partition_type: &StructType,
     ) -> Result<Vec<FieldSummary>> {
-        let partitions = std::mem::take(&mut self.partitions);
         let mut field_stats: Vec<_> = partition_type
             .fields()
             .iter()
             .map(|f| 
PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
             .collect();
-        for partition in partitions {
-            for (literal, stat) in 
partition.into_iter().zip_eq(field_stats.iter_mut()) {
+        for partition in self.manifset_entries.iter().map(|e| 
&e.data_file.partition) {
+            for (literal, stat) in 
partition.iter().zip_eq(field_stats.iter_mut()) {
                 let primitive_literal = literal.map(|v| 
v.as_primitive_literal().unwrap());
                 stat.update(primitive_literal)?;
             }
         }
         Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
     }
 
-    /// Write a manifest.
-    pub async fn write(mut self, manifest: Manifest) -> Result<ManifestFile> {
+    fn check_data_file(&self, data_file: &DataFile) -> Result<()> {
+        match self.metadata.content {
+            ManifestContentType::Data => {
+                if data_file.content != DataContentType::Data {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Content type of entry {:?} should have 
DataContentType::Data",
+                            data_file.content
+                        ),
+                    ));
+                }
+            }
+            ManifestContentType::Deletes => {
+                if data_file.content != DataContentType::EqualityDeletes
+                    && data_file.content != DataContentType::PositionDeletes
+                {
+                    return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Content type of entry {:?} should have 
DataContentType::EqualityDeletes or DataContentType::PositionDeletes", 
data_file.content),
+                ));
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Add a new manifest entry. This method will update following status of 
the entry:
+    /// - Update the entry status to `Added`
+    /// - Set the snapshot id to the current snapshot id
+    /// - Set the sequence number to `None` if it is invalid(smaller than 0)
+    /// - Set the file sequence number to `None`
+    pub(crate) fn add(&mut self, mut entry: ManifestEntry) -> Result<()> {
+        self.check_data_file(&entry.data_file)?;
+        if entry.sequence_number().is_some_and(|n| n >= 0) {
+            entry.status = ManifestStatus::Added;
+            entry.snapshot_id = self.snapshot_id;
+            entry.file_sequence_number = None;
+        } else {
+            entry.status = ManifestStatus::Added;
+            entry.snapshot_id = self.snapshot_id;
+            entry.sequence_number = None;
+            entry.file_sequence_number = None;
+        };
+        self.add_entry(entry)?;
+        Ok(())
+    }
+
+    /// Add an added entry for a file with a specific sequence number. The 
entry's snapshot ID will be this manifest's snapshot ID. The entry's data 
sequence
+    /// number will be the provided data sequence number. The entry's file 
sequence number will be
+    /// assigned at commit.
+    pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> 
Result<()> {
+        self.check_data_file(&data_file)?;
+        let entry = ManifestEntry {
+            status: ManifestStatus::Added,
+            snapshot_id: self.snapshot_id,
+            sequence_number: (sequence_number >= 0).then_some(sequence_number),
+            file_sequence_number: None,
+            data_file,
+        };
+        self.add_entry(entry)?;
+        Ok(())
+    }
+
+    /// Add a delete manifest entry. This method will update following status 
of the entry:
+    /// - Update the entry status to `Deleted`
+    /// - Set the snapshot id to the current snapshot id
+    ///
+    /// # TODO
+    /// Remove this allow later
+    #[allow(dead_code)]
+    pub(crate) fn delete(&mut self, mut entry: ManifestEntry) -> Result<()> {
+        self.check_data_file(&entry.data_file)?;
+        entry.status = ManifestStatus::Deleted;
+        entry.snapshot_id = self.snapshot_id;
+        self.add_entry(entry)?;
+        Ok(())
+    }
+
+    /// Add a delete manifest entry. The entry's snapshot ID will be this 
manifest's snapshot ID.
+    /// However, the original data and file sequence numbers of the file must 
be preserved when
+    /// the file is marked as deleted.
+    pub fn delete_file(
+        &mut self,
+        data_file: DataFile,
+        sequence_number: i64,
+        file_sequence_number: i64,

Review Comment:
   This should be optional; `file_sequence_number` could be inherited from the
snapshot.



##########
crates/iceberg/src/spec/manifest.rs:
##########
@@ -41,6 +41,9 @@ use crate::io::OutputFile;
 use crate::spec::PartitionField;
 use crate::{Error, ErrorKind};
 
+/// Placeholder for snapshot ID. The field with this value must be replaced 
with the actual snapshot ID before it is committed.
+pub const UNASSIGNED_SNAPSHOT_ID: i64 = -1;

Review Comment:
   We should move this to the `snapshot` module.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to