ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1844241268


##########
crates/iceberg/src/transaction.rs:
##########
@@ -122,6 +189,387 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+    snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: Arc<BoundPartitionSpec>,
+        key_metadata: Vec<u8>,
+        commit_uuid: String,
+        snapshot_properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        Ok(Self {
+            snapshot_produce_action: SnapshotProduceAction::new(
+                tx,
+                snapshot_id,
+                parent_snapshot_id,
+                schema_id,
+                format_version,
+                partition_spec,
+                schema,
+                key_metadata,
+                commit_uuid,
+                snapshot_properties,
+            )?,
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_files: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.snapshot_produce_action.add_data_files(data_files)?;
+        Ok(self)
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(self) -> Result<Transaction<'a>> {
+        self.snapshot_produce_action
+            .apply(FastAppendOperation, DefaultManifestProcess)
+            .await
+    }
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+    fn operation(&self) -> Operation {
+        Operation::Append
+    }
+
+    async fn delete_entries(
+        &self,
+        _snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestEntry>> {
+        Ok(vec![])
+    }
+
+    async fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestFile>> {
+        let Some(snapshot) = snapshot_produce
+            .parent_snapshot_id
+            .and_then(|id| 
snapshot_produce.tx.table.metadata().snapshot_by_id(id))
+        else {
+            return Ok(vec![]);
+        };
+
+        let manifest_list = snapshot
+            .load_manifest_list(
+                snapshot_produce.tx.table.file_io(),
+                &snapshot_produce.tx.table.metadata_ref(),
+            )
+            .await?;
+
+        Ok(manifest_list
+            .entries()
+            .iter()
+            .filter(|entry| {
+                entry.has_added_files()
+                    || entry.has_existing_files()
+                    || entry.added_snapshot_id == snapshot_produce.snapshot_id

Review Comment:
   For this check, the explanation comes from @Fokko:
   
   > We've copied this from the Java code: 
[https://github.com/apache/iceberg/blob/307593ffd99752b2d62cc91f4928285fc0c62b75/co[…]e/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java](https://github.com/apache/iceberg/blob/307593ffd99752b2d62cc91f4928285fc0c62b75/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L934-L939)
   > I don't think this is used in PyIceberg today. In Java it is possible to 
add existing manifests to a commit, while in PyIceberg we only accept 
datafiles. This is on purpose since the ability to add manifests also comes 
with problems of its own.
   
   I think this check is also unnecessary for iceberg-rust right now, because 
we do not yet support adding existing manifests. To avoid confusion, maybe we 
should either:
   1. Add a comment explaining why the check is there, or
   2. Remove it for now and add it back once we support adding existing 
manifests like Java does.



##########
crates/iceberg/src/transaction.rs:
##########
@@ -122,6 +189,387 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+    snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: Arc<BoundPartitionSpec>,
+        key_metadata: Vec<u8>,
+        commit_uuid: String,
+        snapshot_properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        Ok(Self {
+            snapshot_produce_action: SnapshotProduceAction::new(
+                tx,
+                snapshot_id,
+                parent_snapshot_id,
+                schema_id,
+                format_version,
+                partition_spec,
+                schema,
+                key_metadata,
+                commit_uuid,
+                snapshot_properties,
+            )?,
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_files: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.snapshot_produce_action.add_data_files(data_files)?;
+        Ok(self)
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(self) -> Result<Transaction<'a>> {
+        self.snapshot_produce_action
+            .apply(FastAppendOperation, DefaultManifestProcess)
+            .await
+    }
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+    fn operation(&self) -> Operation {
+        Operation::Append
+    }
+
+    async fn delete_entries(
+        &self,
+        _snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestEntry>> {
+        Ok(vec![])
+    }
+
+    async fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestFile>> {
+        let Some(snapshot) = snapshot_produce
+            .parent_snapshot_id
+            .and_then(|id| 
snapshot_produce.tx.table.metadata().snapshot_by_id(id))
+        else {
+            return Ok(vec![]);
+        };
+
+        let manifest_list = snapshot
+            .load_manifest_list(
+                snapshot_produce.tx.table.file_io(),
+                &snapshot_produce.tx.table.metadata_ref(),
+            )
+            .await?;
+
+        Ok(manifest_list
+            .entries()
+            .iter()
+            .filter(|entry| {
+                entry.has_added_files()
+                    || entry.has_existing_files()
+                    || entry.added_snapshot_id == snapshot_produce.snapshot_id

Review Comment:
   For this check, the explanation comes from @Fokko:
   
   > We've copied this from the Java code: 
[https://github.com/apache/iceberg/blob/307593ffd99752b2d62cc91f4928285fc0c62b75/co[…]e/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java](https://github.com/apache/iceberg/blob/307593ffd99752b2d62cc91f4928285fc0c62b75/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L934-L939)
   > I don't think this is used in PyIceberg today. In Java it is possible to 
add existing manifests to a commit, while in PyIceberg we only accept 
datafiles. This is on purpose since the ability to add manifests also comes 
with problems of its own.
   
   I think this check is also unnecessary for iceberg-rust right now, because 
we do not yet support adding existing manifests. To avoid confusion, maybe we 
should either:
   1. Add a comment explaining why the check is there, or
   2. Remove it for now and add it back once we support adding existing 
manifests like Java does.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to