liurenjie1024 commented on code in PR #738:
URL: https://github.com/apache/iceberg-rust/pull/738#discussion_r1918041864


##########
crates/iceberg/src/spec/manifest.rs:
##########
@@ -210,38 +281,141 @@ impl ManifestWriter {
             deleted_rows: 0,
             min_seq_num: None,
             key_metadata,
-            partitions: vec![],
+            manifset_entries: Vec::new(),
+            metadata,
         }
     }
 
     fn construct_partition_summaries(
         &mut self,
         partition_type: &StructType,
     ) -> Result<Vec<FieldSummary>> {
-        let partitions = std::mem::take(&mut self.partitions);
         let mut field_stats: Vec<_> = partition_type
             .fields()
             .iter()
             .map(|f| 
PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
             .collect();
-        for partition in partitions {
-            for (literal, stat) in 
partition.into_iter().zip_eq(field_stats.iter_mut()) {
+        for partition in self.manifset_entries.iter().map(|e| 
&e.data_file.partition) {
+            for (literal, stat) in 
partition.iter().zip_eq(field_stats.iter_mut()) {
                 let primitive_literal = literal.map(|v| 
v.as_primitive_literal().unwrap());
                 stat.update(primitive_literal)?;
             }
         }
         Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
     }
 
-    /// Write a manifest.
-    pub async fn write(mut self, manifest: Manifest) -> Result<ManifestFile> {
+    fn check_entry(&self, entry: &ManifestEntry) -> Result<()> {
+        match self.metadata.content {
+            ManifestContentType::Data => {
+                if entry.data_file.content != DataContentType::Data {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Content type of entry {:?} should have 
DataContentType::Data",
+                            entry.data_file.content
+                        ),
+                    ));
+                }
+            }
+            ManifestContentType::Deletes => {
+                if entry.data_file.content != DataContentType::EqualityDeletes
+                    && entry.data_file.content != 
DataContentType::PositionDeletes
+                {
+                    return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Content type of entry {:?} should have 
DataContentType::EqualityDeletes or DataContentType::PositionDeletes", 
entry.data_file.content),
+                ));
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Add a new manifest entry. This method will update following status of 
the entry:
+    /// - Update the entry status to `Added`
+    /// - Set the snapshot id to the current snapshot id
+    /// - Set the sequence number to `None` if it is invalid(smaller than 0)
+    /// - Set the file sequence number to `None`
+    pub fn add(&mut self, mut entry: ManifestEntry) -> Result<()> {
+        self.check_entry(&entry)?;
+        if entry.sequence_number().is_some_and(|n| n >= 0) {
+            entry.status = ManifestStatus::Added;
+            entry.snapshot_id = Some(self.snapshot_id);
+            entry.file_sequence_number = None;
+        } else {
+            entry.status = ManifestStatus::Added;
+            entry.snapshot_id = Some(self.snapshot_id);
+            entry.sequence_number = None;
+            entry.file_sequence_number = None;
+        };
+        self.add_entry(entry)?;
+        Ok(())
+    }
+
+    /// Add a delete manifest entry. This method will update following status 
of the entry:
+    /// - Update the entry status to `Deleted`
+    /// - Set the snapshot id to the current snapshot id
+    pub fn delete(&mut self, mut entry: ManifestEntry) -> Result<()> {
+        self.check_entry(&entry)?;
+        entry.status = ManifestStatus::Deleted;
+        entry.snapshot_id = Some(self.snapshot_id);
+        self.add_entry(entry)?;
+        Ok(())
+    }
+
+    /// Add an existing manifest entry. This method will update following 
status of the entry:
+    /// - Update the entry status to `Existing`
+    pub fn existing(&mut self, mut entry: ManifestEntry) -> Result<()> {

Review Comment:
   This is incorrect: an existing entry requires the user to provide the snapshot id
   and the data sequence number, which are optional in `ManifestEntry`.



##########
crates/iceberg/src/spec/manifest.rs:
##########
@@ -210,38 +281,141 @@ impl ManifestWriter {
             deleted_rows: 0,
             min_seq_num: None,
             key_metadata,
-            partitions: vec![],
+            manifset_entries: Vec::new(),
+            metadata,
         }
     }
 
     fn construct_partition_summaries(
         &mut self,
         partition_type: &StructType,
     ) -> Result<Vec<FieldSummary>> {
-        let partitions = std::mem::take(&mut self.partitions);
         let mut field_stats: Vec<_> = partition_type
             .fields()
             .iter()
             .map(|f| 
PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
             .collect();
-        for partition in partitions {
-            for (literal, stat) in 
partition.into_iter().zip_eq(field_stats.iter_mut()) {
+        for partition in self.manifset_entries.iter().map(|e| 
&e.data_file.partition) {
+            for (literal, stat) in 
partition.iter().zip_eq(field_stats.iter_mut()) {
                 let primitive_literal = literal.map(|v| 
v.as_primitive_literal().unwrap());
                 stat.update(primitive_literal)?;
             }
         }
         Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
     }
 
-    /// Write a manifest.
-    pub async fn write(mut self, manifest: Manifest) -> Result<ManifestFile> {
+    fn check_entry(&self, entry: &ManifestEntry) -> Result<()> {
+        match self.metadata.content {
+            ManifestContentType::Data => {
+                if entry.data_file.content != DataContentType::Data {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Content type of entry {:?} should have 
DataContentType::Data",
+                            entry.data_file.content
+                        ),
+                    ));
+                }
+            }
+            ManifestContentType::Deletes => {
+                if entry.data_file.content != DataContentType::EqualityDeletes
+                    && entry.data_file.content != 
DataContentType::PositionDeletes
+                {
+                    return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Content type of entry {:?} should have 
DataContentType::EqualityDeletes or DataContentType::PositionDeletes", 
entry.data_file.content),
+                ));
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Add a new manifest entry. This method will update following status of 
the entry:
+    /// - Update the entry status to `Added`
+    /// - Set the snapshot id to the current snapshot id
+    /// - Set the sequence number to `None` if it is invalid(smaller than 0)
+    /// - Set the file sequence number to `None`
+    pub fn add(&mut self, mut entry: ManifestEntry) -> Result<()> {
+        self.check_entry(&entry)?;
+        if entry.sequence_number().is_some_and(|n| n >= 0) {
+            entry.status = ManifestStatus::Added;
+            entry.snapshot_id = Some(self.snapshot_id);
+            entry.file_sequence_number = None;
+        } else {
+            entry.status = ManifestStatus::Added;
+            entry.snapshot_id = Some(self.snapshot_id);
+            entry.sequence_number = None;
+            entry.file_sequence_number = None;
+        };
+        self.add_entry(entry)?;
+        Ok(())
+    }
+
+    /// Add a delete manifest entry. This method will update following status 
of the entry:
+    /// - Update the entry status to `Deleted`
+    /// - Set the snapshot id to the current snapshot id
+    pub fn delete(&mut self, mut entry: ManifestEntry) -> Result<()> {

Review Comment:
   This is also incorrect. The sequence number must be provided.



##########
crates/iceberg/src/spec/manifest.rs:
##########
@@ -114,6 +114,70 @@ impl Manifest {
     }
 }
 
+/// The builder used to create a [`ManifestWriter`].
+pub struct ManifestWriterBuilder {
+    output: OutputFile,
+    snapshot_id: i64,
+    key_metadata: Vec<u8>,
+    schema: SchemaRef,
+    partition_spec: PartitionSpec,
+}
+
+impl ManifestWriterBuilder {
+    /// Create a new builder.
+    pub fn new(
+        output: OutputFile,
+        snapshot_id: i64,

Review Comment:
   This should be optional.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to