Fokko commented on code in PR #349: URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1846645067
########## crates/iceberg/src/transaction.rs: ########## @@ -122,6 +189,387 @@ impl<'a> Transaction<'a> { } } +/// FastAppendAction is a transaction action for fast append data files to the table. +pub struct FastAppendAction<'a> { + snapshot_produce_action: SnapshotProduceAction<'a>, +} + +impl<'a> FastAppendAction<'a> { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + tx: Transaction<'a>, + parent_snapshot_id: Option<i64>, + snapshot_id: i64, + schema: Schema, + schema_id: i32, + format_version: FormatVersion, + partition_spec: Arc<BoundPartitionSpec>, + key_metadata: Vec<u8>, + commit_uuid: String, + snapshot_properties: HashMap<String, String>, + ) -> Result<Self> { + Ok(Self { + snapshot_produce_action: SnapshotProduceAction::new( + tx, + snapshot_id, + parent_snapshot_id, + schema_id, + format_version, + partition_spec, + schema, + key_metadata, + commit_uuid, + snapshot_properties, + )?, + }) + } + + /// Add data files to the snapshot. + pub fn add_data_files( + &mut self, + data_files: impl IntoIterator<Item = DataFile>, + ) -> Result<&mut Self> { + self.snapshot_produce_action.add_data_files(data_files)?; + Ok(self) + } + + /// Finished building the action and apply it to the transaction. + pub async fn apply(self) -> Result<Transaction<'a>> { + self.snapshot_produce_action + .apply(FastAppendOperation, DefaultManifestProcess) + .await + } +} + +struct FastAppendOperation; + +impl SnapshotProduceOperation for FastAppendOperation { + fn operation(&self) -> Operation { + Operation::Append + } + + async fn delete_entries( + &self, + _snapshot_produce: &SnapshotProduceAction<'_>, + ) -> Result<Vec<ManifestEntry>> { + Ok(vec![]) + } + + async fn existing_manifest( + &self, + snapshot_produce: &SnapshotProduceAction<'_>, + ) -> Result<Vec<ManifestFile>> { + let Some(snapshot) = snapshot_produce + .parent_snapshot_id + .and_then(|id| snapshot_produce.tx.table.metadata().snapshot_by_id(id)) + else { + return Ok(vec![]); + }; + + let manifest_list = snapshot + .load_manifest_list( + snapshot_produce.tx.table.file_io(), + &snapshot_produce.tx.table.metadata_ref(), + ) + .await?; + + Ok(manifest_list + .entries() + .iter() + .filter(|entry| { + entry.has_added_files() + || entry.has_existing_files() + || entry.added_snapshot_id == snapshot_produce.snapshot_id Review Comment: Thanks for checking that, I would be in favor of leaving it out for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org