ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1580444775


##########
crates/iceberg/src/transaction.rs:
##########
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema: Schema,
+    schema_id: i32,
+    format_version: FormatVersion,
+    partition_spec: PartitionSpec,
+    key_metadata: Vec<u8>,
+
+    commit_uuid: String,
+    manifest_id: i64,
+
+    appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: PartitionSpec,
+        key_metadata: Vec<u8>,
+        commit_uuid: String,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema,
+            schema_id,
+            format_version,
+            partition_spec,
+            key_metadata,
+            commit_uuid,
+            manifest_id: 0,
+            appended_data_files: vec![],
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_file: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.appended_data_files.extend(data_file);
+        Ok(self)
+    }
+
+    fn generate_manifest_file_path(&mut self) -> String {
+        let manifest_id = self.manifest_id;
+        self.manifest_id += 1;
+        format!(
+            "{}/{}/{}-m{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            &self.commit_uuid,
+            manifest_id,
+            DataFileFormat::Avro
+        )
+    }
+
+    async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> 
{
+        if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+            let manifest_list = snapshot
+                .load_manifest_list(self.tx.table.file_io(), 
&self.tx.table.metadata_ref())
+                .await?;
+            let mut manifest_files = 
Vec::with_capacity(manifest_list.entries().len());
+            for entry in manifest_list.entries() {
+                // From: 
https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+                // Why we need this?
+                if entry.added_snapshot_id == self.snapshot_id {
+                    continue;
+                }
+                let manifest = 
entry.load_manifest(self.tx.table.file_io()).await?;
+                // Skip manifest with all delete entries.
+                if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+                    continue;
+                }
+                manifest_files.push(entry.clone());
+            }
+            Ok(manifest_files)
+        } else {
+            Ok(vec![])
+        }
+    }
+
+    // Write manifest file for added data files and return the ManifestFile 
for ManifestList.
+    async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+        let appended_data_files = std::mem::take(&mut 
self.appended_data_files);
+        let manifest_entries = appended_data_files
+            .into_iter()
+            .map(|data_file| {
+                ManifestEntry::builder()
+                    .status(crate::spec::ManifestStatus::Added)
+                    .snapshot_id(self.snapshot_id)
+                    .data_file(data_file)
+                    .build()
+            })
+            .collect();
+        let manifest_meta = ManifestMetadata::builder()
+            .schema(self.schema.clone())
+            .schema_id(self.schema_id)
+            .format_version(self.format_version)
+            .partition_spec(self.partition_spec.clone())
+            .content(crate::spec::ManifestContentType::Data)
+            .build();
+        let manifest = Manifest::new(manifest_meta, manifest_entries);
+        let writer = ManifestWriter::new(
+            self.tx
+                .table
+                .file_io()
+                .new_output(self.generate_manifest_file_path())?,
+            self.snapshot_id,
+            self.key_metadata.clone(),
+        );
+        writer.write(manifest).await
+    }
+
+    fn summary(&self) -> Summary {
+        Summary {
+            operation: crate::spec::Operation::Append,
+            other: HashMap::new(),
+        }
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(mut self) -> Result<Transaction<'a>> {
+        let summary = self.summary();
+        let manifest = self.manifest_for_data_file().await?;
+        let existing_manifest_files = 
self.manifest_from_parent_snapshot().await?;
+
+        let snapshot_produce_action = SnapshotProduceAction::new(
+            self.tx,
+            self.snapshot_id,
+            self.parent_snapshot_id,
+            self.schema_id,
+            self.format_version,
+            self.commit_uuid,
+        )?;
+
+        snapshot_produce_action
+            .apply(
+                vec![manifest]
+                    .into_iter()
+                    .chain(existing_manifest_files.into_iter()),
+                summary,
+            )
+            .await
+    }
+}
+
+struct SnapshotProduceAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema_id: i32,
+    format_version: FormatVersion,
+
+    commit_uuid: String,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        snapshot_id: i64,
+        parent_snapshot_id: Option<i64>,
+        schema_id: i32,
+        format_version: FormatVersion,
+        commit_uuid: String,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema_id,
+            format_version,
+            commit_uuid,
+        })
+    }
+
+    fn generate_manifest_list_file_path(&self, next_seq_num: i64) -> String {
+        format!(
+            "{}/{}/snap-{}-{}-{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            self.snapshot_id,
+            next_seq_num,
+            self.commit_uuid,
+            DataFileFormat::Avro
+        )
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(
+        mut self,
+        manifest_files: impl IntoIterator<Item = ManifestFile>,
+        summary: Summary,
+    ) -> Result<Transaction<'a>> {
+        let next_seq_num = if self.format_version as u8 > 1u8 {
+            self.tx.table.metadata().last_sequence_number() + 1
+        } else {
+            INITIAL_SEQUENCE_NUMBER
+        };
+        let commit_ts = chrono::Utc::now().timestamp_millis();
+        let manifest_list_path = 
self.generate_manifest_list_file_path(next_seq_num);
+
+        let mut manifest_list_writer = ManifestListWriter::v2(
+            self.tx
+                .table
+                .file_io()
+                .new_output(manifest_list_path.clone())?,
+            self.snapshot_id,
+            // # TODO
+            // Should we use `0` here for default parent snapshot id?
+            self.parent_snapshot_id.unwrap_or_default(),
+            next_seq_num,
+        );
+        manifest_list_writer.add_manifests(manifest_files.into_iter())?;
+        manifest_list_writer.close().await?;
+
+        let new_snapshot = Snapshot::builder()
+            .with_manifest_list(manifest_list_path)
+            .with_snapshot_id(self.snapshot_id)
+            .with_parent_snapshot_id(self.parent_snapshot_id)
+            .with_sequence_number(next_seq_num)
+            .with_summary(summary)
+            .with_schema_id(self.schema_id)
+            .with_timestamp_ms(commit_ts)
+            .build();
+
+        let new_snapshot_id = new_snapshot.snapshot_id();
+        self.tx.append_updates(vec![
+            TableUpdate::AddSnapshot {
+                snapshot: new_snapshot,
+            },
+            TableUpdate::SetSnapshotRef {
+                ref_name: MAIN_BRANCH.to_string(),
+                reference: SnapshotReference::new(
+                    new_snapshot_id,
+                    SnapshotRetention::branch(None, None, None),
+                ),
+            },
+        ])?;
+        self.tx.append_requirements(vec![
+            TableRequirement::UuidMatch {
+                uuid: self.tx.table.metadata().uuid(),
+            },
+            TableRequirement::RefSnapshotIdMatch {
+                r#ref: MAIN_BRANCH.to_string(),
+                snapshot_id: self.parent_snapshot_id,

Review Comment:
   > This will work for now but might get problematic later on. Just a heads up.
   
   I'm not sure whether my understanding of "problematic" is correct. Did you 
mean that the problem arises when we apply two append actions, like
   ```
   1. TableRequirement::UuidMatch
      TableRequirement::RefSnapshotIdMatch (parent_snapshot_id)
   2. TableRequirement::UuidMatch
      TableRequirement::RefSnapshotIdMatch (parent_snapshot_id)
   ```
   The requirement of the second one will fail, because the first update 
changes the snapshot id, so the second validation no longer matches. So when we 
apply, we should return a tx that reflects the snapshot update, so that we can 
stack the updates together. 🤔



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to