c-thiel commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1852590840


##########
crates/iceberg/src/transaction.rs:
##########
@@ -96,6 +109,60 @@ impl<'a> Transaction<'a> {
         Ok(self)
     }
 
+    fn generate_unique_snapshot_id(&self) -> i64 {
+        let generate_random_id = || -> i64 {
+            let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
+            let snapshot_id = (lhs ^ rhs) as i64;
+            if snapshot_id < 0 {
+                -snapshot_id
+            } else {
+                snapshot_id
+            }
+        };
+        let mut snapshot_id = generate_random_id();
+        while self
+            .table
+            .metadata()
+            .snapshots()
+            .any(|s| s.snapshot_id() == snapshot_id)
+        {
+            snapshot_id = generate_random_id();
+        }
+        snapshot_id
+    }
+
+    /// Creates a fast append action.
+    pub fn fast_append(
+        self,
+        commit_uuid: Option<Uuid>,
+        key_metadata: Vec<u8>,
+    ) -> Result<FastAppendAction<'a>> {
+        let parent_snapshot_id = self
+            .table
+            .metadata()
+            .current_snapshot()
+            .map(|s| s.snapshot_id());
+        let snapshot_id = self.generate_unique_snapshot_id();
+        let schema = self.table.metadata().current_schema().as_ref().clone();
+        let schema_id = schema.schema_id();
+        let format_version = self.table.metadata().format_version();
+        let partition_spec = self.table.metadata().default_partition_spec().clone();

Review Comment:
   Is there an advantage to extracting `parent_snapshot_id`, `schema`, 
`schema_id`, `format_version` and `partition_spec` here? The `tx` is passed 
by value to the `FastAppendAction` and in turn holds `table`, so these values 
are stored redundantly. I think I would just calculate them when needed 
instead of duplicating them eagerly. This would make the 
`SnapshotProduceAction` a bit slimmer.
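
A minimal sketch of the on-demand approach suggested above. The types here are hypothetical, heavily simplified stand-ins for the crate's `Table`, `TableMetadata`, `Schema` and `Transaction` (in particular, the field names and the `u8` format version are assumptions made for illustration); only the shape of the idea is meant to carry over: the action keeps `tx` plus the values that are genuinely new, and derives everything else through accessors.

```rust
use std::sync::Arc;

// Hypothetical, simplified stand-ins for the real crate types.
struct Schema {
    schema_id: i32,
}

struct TableMetadata {
    current_schema: Arc<Schema>,
    current_snapshot_id: Option<i64>,
    format_version: u8,
}

struct Table {
    metadata: TableMetadata,
}

struct Transaction<'a> {
    table: &'a Table,
}

// The action stores only the transaction and the newly generated id.
struct SnapshotProduceAction<'a> {
    tx: Transaction<'a>,
    snapshot_id: i64,
}

impl<'a> SnapshotProduceAction<'a> {
    fn new(tx: Transaction<'a>, snapshot_id: i64) -> Self {
        Self { tx, snapshot_id }
    }

    fn snapshot_id(&self) -> i64 {
        self.snapshot_id
    }

    // Everything below is looked up when needed instead of being copied eagerly.
    fn parent_snapshot_id(&self) -> Option<i64> {
        self.tx.table.metadata.current_snapshot_id
    }

    fn schema(&self) -> &Schema {
        &self.tx.table.metadata.current_schema
    }

    // The id comes from the schema object, so it cannot drift out of sync.
    fn schema_id(&self) -> i32 {
        self.schema().schema_id
    }

    fn format_version(&self) -> u8 {
        self.tx.table.metadata.format_version
    }
}
```

With this shape the constructor takes two arguments instead of ten, and the `clippy::too_many_arguments` allowance would no longer be needed in the sketch.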



##########
crates/e2e_test/Cargo.toml:
##########
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iceberg-e2e_test"

Review Comment:
   Should we mix "-" and "_" here?
   As our main crate is "iceberg-catalog-rest", I would have opted for 
"iceberg-e2e-test".



##########
crates/iceberg/src/transaction.rs:
##########
@@ -122,6 +189,383 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+    snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,

Review Comment:
   And we have it again in `tx.table.schema` - see also my comment on the 
`fast_append` method.



##########
crates/iceberg/src/spec/manifest_list.rs:
##########
@@ -106,34 +106,38 @@ impl std::fmt::Debug for ManifestListWriter {
 
 impl ManifestListWriter {
     /// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
-    pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: i64) -> Self {
-        let metadata = HashMap::from_iter([
+    pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
+        let mut metadata = HashMap::from_iter([
             ("snapshot-id".to_string(), snapshot_id.to_string()),
-            (
-                "parent-snapshot-id".to_string(),
-                parent_snapshot_id.to_string(),
-            ),
             ("format-version".to_string(), "1".to_string()),
         ]);
+        if let Some(parent_snapshot_id) = parent_snapshot_id {
+            metadata.insert(
+                "parent-snapshot-id".to_string(),
+                parent_snapshot_id.to_string(),
+            );
+        }
         Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
     }
 
     /// Construct a v2 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
     pub fn v2(
         output_file: OutputFile,
         snapshot_id: i64,
-        parent_snapshot_id: i64,
+        parent_snapshot_id: Option<i64>,
         sequence_number: i64,
     ) -> Self {
-        let metadata = HashMap::from_iter([
+        let mut metadata = HashMap::from_iter([
             ("snapshot-id".to_string(), snapshot_id.to_string()),
-            (
-                "parent-snapshot-id".to_string(),
-                parent_snapshot_id.to_string(),
-            ),
             ("sequence-number".to_string(), sequence_number.to_string()),
             ("format-version".to_string(), "2".to_string()),
         ]);
+        if let Some(parent_snapshot_id) = parent_snapshot_id {

Review Comment:
   I think this is the correct way to do it.
   
   I checked the other implementations:
   * @Fokko in pyiceberg, if `parent_snapshot_id` is `None`, the header of the 
Avro file contains `parent-snapshot-idNone` - I think it should be at least 
`null`. We might not want to include it at all. Problem is here: 
https://github.com/apache/iceberg-python/blob/7a83695330518bea0dee589b5b513297c4d59b66/pyiceberg/manifest.py#L979
   * I think Java writes `null`, but I couldn't get the test to run. Comes from 
here: 
https://github.com/apache/iceberg/blob/c448a4b87007ae8aad57456e19d32db45de3a4f6/core/src/main/java/org/apache/iceberg/ManifestListWriter.java#L81
   
   Maybe it is worth adapting the other implementations, or settling on a 
specific value (JSON null?) for a missing parent snapshot id?
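
The two options discussed above can be sketched side by side. This is an illustration only: the function names are hypothetical, the header is modelled as a plain `HashMap<String, String>`, and the `"null"` string in the second variant is the value floated in the comment, not something confirmed by the spec or by the other implementations.

```rust
use std::collections::HashMap;

/// Option A (what this PR does): leave the key out when there is no parent.
fn header_omitting_absent_parent(
    snapshot_id: i64,
    parent_snapshot_id: Option<i64>,
) -> HashMap<String, String> {
    let mut metadata = HashMap::from([
        ("snapshot-id".to_string(), snapshot_id.to_string()),
        ("format-version".to_string(), "2".to_string()),
    ]);
    if let Some(parent) = parent_snapshot_id {
        metadata.insert("parent-snapshot-id".to_string(), parent.to_string());
    }
    metadata
}

/// Option B (assumption): always write the key and use the literal string
/// "null" when there is no parent.
fn header_with_explicit_null(
    snapshot_id: i64,
    parent_snapshot_id: Option<i64>,
) -> HashMap<String, String> {
    let parent = parent_snapshot_id
        .map_or_else(|| "null".to_string(), |id| id.to_string());
    HashMap::from([
        ("snapshot-id".to_string(), snapshot_id.to_string()),
        ("parent-snapshot-id".to_string(), parent),
        ("format-version".to_string(), "2".to_string()),
    ])
}
```

Readers of option A need to treat a missing key as "no parent", while readers of option B need to special-case the sentinel string; whichever is chosen, agreeing on one across implementations is the point of the comment.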



##########
crates/iceberg/src/transaction.rs:
##########
@@ -122,6 +189,383 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+    snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: Arc<BoundPartitionSpec>,
+        key_metadata: Vec<u8>,
+        commit_uuid: Uuid,
+        snapshot_properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        Ok(Self {
+            snapshot_produce_action: SnapshotProduceAction::new(
+                tx,
+                snapshot_id,
+                parent_snapshot_id,
+                schema_id,
+                format_version,
+                partition_spec,
+                schema,
+                key_metadata,
+                commit_uuid,
+                snapshot_properties,
+            )?,
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_files: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.snapshot_produce_action.add_data_files(data_files)?;
+        Ok(self)
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(self) -> Result<Transaction<'a>> {
+        self.snapshot_produce_action
+            .apply(FastAppendOperation, DefaultManifestProcess)
+            .await
+    }
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+    fn operation(&self) -> Operation {
+        Operation::Append
+    }
+
+    async fn delete_entries(
+        &self,
+        _snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestEntry>> {
+        Ok(vec![])
+    }
+
+    async fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestFile>> {
+        let Some(snapshot) = snapshot_produce
+            .parent_snapshot_id
+            .and_then(|id| snapshot_produce.tx.table.metadata().snapshot_by_id(id))
+        else {
+            return Ok(vec![]);
+        };
+
+        let manifest_list = snapshot
+            .load_manifest_list(
+                snapshot_produce.tx.table.file_io(),
+                &snapshot_produce.tx.table.metadata_ref(),
+            )
+            .await?;
+
+        Ok(manifest_list
+            .entries()
+            .iter()
+            .filter(|entry| entry.has_added_files() || entry.has_existing_files())
+            .cloned()
+            .collect())
+    }
+}
+
+trait SnapshotProduceOperation: Send + Sync {
+    fn operation(&self) -> Operation;
+    #[allow(unused)]
+    fn delete_entries(
+        &self,
+        snapshot_produce: &SnapshotProduceAction,
+    ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
+    fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProduceAction,
+    ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+}
+
+struct DefaultManifestProcess;
+
+impl ManifestProcess for DefaultManifestProcess {
+    fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
+        manifests
+    }
+}
+
+trait ManifestProcess: Send + Sync {
+    fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
+}
+
+struct SnapshotProduceAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema_id: i32,
+    format_version: FormatVersion,
+    partition_spec: Arc<BoundPartitionSpec>,
+    schema: Schema,
+    key_metadata: Vec<u8>,
+
+    commit_uuid: Uuid,
+
+    snapshot_properties: HashMap<String, String>,
+    added_data_files: Vec<DataFile>,
+
+    // A counter used to generate unique manifest file names.
+    // It starts from 0 and increments for each new manifest file.
+    // Note: This counter is limited to the range of (0..u64::MAX).
+    manifest_counter: RangeFrom<u64>,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        snapshot_id: i64,
+        parent_snapshot_id: Option<i64>,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: Arc<BoundPartitionSpec>,
+        schema: Schema,
+        key_metadata: Vec<u8>,
+        commit_uuid: Uuid,
+        snapshot_properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema_id,
+            format_version,
+            commit_uuid,
+            snapshot_properties,
+            added_data_files: vec![],
+            manifest_counter: (0..),
+            partition_spec,
+            schema,
+            key_metadata,
+        })
+    }
+
+    // Check if the partition value is compatible with the partition type.
+    fn validate_partition_value(

Review Comment:
   Should this be a method of `Struct` or `StructType`? I think a test would 
also be good.



##########
crates/iceberg/src/transaction.rs:
##########
@@ -122,6 +189,383 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+    snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,

Review Comment:
   We have a redundancy here - `schema` already carries its `schema_id`. Maybe 
we could remove `schema_id` and get the id from the object?



##########
crates/iceberg/src/transaction.rs:
##########
@@ -122,6 +189,383 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+    snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: Arc<BoundPartitionSpec>,
+        key_metadata: Vec<u8>,
+        commit_uuid: Uuid,
+        snapshot_properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        Ok(Self {
+            snapshot_produce_action: SnapshotProduceAction::new(
+                tx,
+                snapshot_id,
+                parent_snapshot_id,
+                schema_id,
+                format_version,
+                partition_spec,
+                schema,
+                key_metadata,
+                commit_uuid,
+                snapshot_properties,
+            )?,
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_files: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.snapshot_produce_action.add_data_files(data_files)?;
+        Ok(self)
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(self) -> Result<Transaction<'a>> {
+        self.snapshot_produce_action
+            .apply(FastAppendOperation, DefaultManifestProcess)
+            .await
+    }
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+    fn operation(&self) -> Operation {
+        Operation::Append
+    }
+
+    async fn delete_entries(
+        &self,
+        _snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestEntry>> {
+        Ok(vec![])
+    }
+
+    async fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestFile>> {
+        let Some(snapshot) = snapshot_produce
+            .parent_snapshot_id
+            .and_then(|id| snapshot_produce.tx.table.metadata().snapshot_by_id(id))
+        else {
+            return Ok(vec![]);
+        };
+
+        let manifest_list = snapshot
+            .load_manifest_list(
+                snapshot_produce.tx.table.file_io(),
+                &snapshot_produce.tx.table.metadata_ref(),
+            )
+            .await?;
+
+        Ok(manifest_list
+            .entries()
+            .iter()
+            .filter(|entry| entry.has_added_files() || entry.has_existing_files())
+            .cloned()
+            .collect())
+    }
+}
+
+trait SnapshotProduceOperation: Send + Sync {
+    fn operation(&self) -> Operation;
+    #[allow(unused)]
+    fn delete_entries(
+        &self,
+        snapshot_produce: &SnapshotProduceAction,
+    ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
+    fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProduceAction,
+    ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+}
+
+struct DefaultManifestProcess;
+
+impl ManifestProcess for DefaultManifestProcess {
+    fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
+        manifests
+    }
+}
+
+trait ManifestProcess: Send + Sync {
+    fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
+}
+
+struct SnapshotProduceAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema_id: i32,
+    format_version: FormatVersion,
+    partition_spec: Arc<BoundPartitionSpec>,
+    schema: Schema,
+    key_metadata: Vec<u8>,
+
+    commit_uuid: Uuid,
+
+    snapshot_properties: HashMap<String, String>,
+    added_data_files: Vec<DataFile>,
+
+    // A counter used to generate unique manifest file names.
+    // It starts from 0 and increments for each new manifest file.
+    // Note: This counter is limited to the range of (0..u64::MAX).
+    manifest_counter: RangeFrom<u64>,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        snapshot_id: i64,
+        parent_snapshot_id: Option<i64>,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: Arc<BoundPartitionSpec>,
+        schema: Schema,
+        key_metadata: Vec<u8>,
+        commit_uuid: Uuid,
+        snapshot_properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema_id,
+            format_version,
+            commit_uuid,
+            snapshot_properties,
+            added_data_files: vec![],
+            manifest_counter: (0..),
+            partition_spec,
+            schema,
+            key_metadata,
+        })
+    }
+
+    // Check if the partition value is compatible with the partition type.
+    fn validate_partition_value(
+        partition_value: &Struct,
+        partition_type: &StructType,
+    ) -> Result<()> {
+        if partition_value.fields().len() != partition_type.fields().len() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Partition value is not compatitable with partition type",
+            ));
+        }
+        if partition_value
+            .fields()
+            .iter()
+            .zip(partition_type.fields())
+            .any(|(field_from_value, field_from_type)| {
+                !field_from_type
+                    .field_type
+                    .as_primitive_type()
+                    .unwrap()
+                    .compatible(&field_from_value.as_primitive_literal().unwrap())
+            })
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Partition value is not compatitable partition type",
+            ));
+        }
+        Ok(())
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_files: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        let data_files: Vec<DataFile> = data_files.into_iter().collect();
+        for data_file in &data_files {
+            if data_file.content_type() != crate::spec::DataContentType::Data {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Only data content type is allowed for fast append",
+                ));
+            }
+            Self::validate_partition_value(
+                data_file.partition(),
+                self.partition_spec.partition_type(),
+            )?;
+        }
+        self.added_data_files.extend(data_files);
+        Ok(self)
+    }
+
+    fn new_manifest_output(&mut self) -> Result<OutputFile> {
+        let new_manifest_path = format!(
+            "{}/{}/{}-m{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            self.commit_uuid,
+            self.manifest_counter.next().unwrap(),
+            DataFileFormat::Avro
+        );
+        self.tx.table.file_io().new_output(new_manifest_path)
+    }
+
+    // Write manifest file for added data files and return the ManifestFile for ManifestList.
+    async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
+        let added_data_files = std::mem::take(&mut self.added_data_files);
+        let manifest_entries = added_data_files
+            .into_iter()
+            .map(|data_file| {
+                let builder = ManifestEntry::builder()
+                    .status(crate::spec::ManifestStatus::Added)
+                    .data_file(data_file);
+                if self.format_version as u8 == 1u8 {
+                    builder.snapshot_id(self.snapshot_id).build()
+                } else {
+                    // For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when
+                    // commit failed.
+                    builder.build()
+                }
+            })
+            .collect();
+        let manifest_meta = ManifestMetadata::builder()
+            .schema(self.schema.clone().into())
+            .schema_id(self.schema_id)
+            .format_version(self.format_version)
+            .partition_spec(self.partition_spec.as_ref().clone())
+            .content(crate::spec::ManifestContentType::Data)
+            .build();
+        let manifest = Manifest::new(manifest_meta, manifest_entries);
+        let writer = ManifestWriter::new(
+            self.new_manifest_output()?,
+            self.snapshot_id,
+            self.key_metadata.clone(),
+        );
+        writer.write(manifest).await
+    }
+
+    async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
+        &mut self,
+        snapshot_produce_operation: &OP,
+        manifest_process: &MP,
+    ) -> Result<Vec<ManifestFile>> {
+        let added_manifest = self.write_added_manifest().await?;
+        let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
+
+        let mut manifest_files = vec![added_manifest];
+        manifest_files.extend(existing_manifests);

Review Comment:
   I am not 100% sure what the unused `delete_entries` is for - I would presume 
it is meant to remove old manifests from the existing manifests?
   If so, I think we should add this logic here. It seems like an easy place to 
miss.



##########
crates/iceberg/src/transaction.rs:
##########
@@ -96,6 +109,60 @@ impl<'a> Transaction<'a> {
         Ok(self)
     }
 
+    fn generate_unique_snapshot_id(&self) -> i64 {
+        let generate_random_id = || -> i64 {
+            let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
+            let snapshot_id = (lhs ^ rhs) as i64;
+            if snapshot_id < 0 {
+                -snapshot_id
+            } else {
+                snapshot_id
+            }
+        };
+        let mut snapshot_id = generate_random_id();
+        while self
+            .table
+            .metadata()
+            .snapshots()
+            .any(|s| s.snapshot_id() == snapshot_id)
+        {
+            snapshot_id = generate_random_id();
+        }
+        snapshot_id
+    }
+
+    /// Creates a fast append action.
+    pub fn fast_append(
+        self,
+        commit_uuid: Option<Uuid>,
+        key_metadata: Vec<u8>,
+    ) -> Result<FastAppendAction<'a>> {
+        let parent_snapshot_id = self
+            .table
+            .metadata()
+            .current_snapshot()
+            .map(|s| s.snapshot_id());
+        let snapshot_id = self.generate_unique_snapshot_id();
+        let schema = self.table.metadata().current_schema().as_ref().clone();
+        let schema_id = schema.schema_id();
+        let format_version = self.table.metadata().format_version();
+        let partition_spec = self.table.metadata().default_partition_spec().clone();
+        let commit_uuid = commit_uuid.unwrap_or_else(Uuid::new_v4);

Review Comment:
   I know we use `new_v4` in most places.
   However, `now_v7` is much better suited to being stored in databases, as 
its ids begin with a timestamp and therefore sort roughly by creation time.
   Would switching to `now_v7` be an option for you? @liurenjie1024 any 
objections?
   ```suggestion
           let commit_uuid = commit_uuid.unwrap_or_else(Uuid::now_v7);
   ```
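
For context on the v4/v7 distinction, here is a minimal standard-library-only sketch of the UUID version 7 bit layout from RFC 9562 (48-bit Unix timestamp in milliseconds, 4-bit version, 12 random bits, 2-bit variant, 62 random bits). It is not the `uuid` crate's implementation: `uuid_v7_like` and `random_u64` are hypothetical helpers, and the randomness source is a convenience stand-in rather than a vetted RNG. It only shows why such ids compare in timestamp order.

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};

/// Returns 64 pseudo-random bits without external crates.
/// Stand-in for a real RNG; each `RandomState` is keyed differently,
/// so hashing empty input yields a different value per call.
fn random_u64() -> u64 {
    RandomState::new().build_hasher().finish()
}

/// Builds a 128-bit value laid out like a version 7 UUID.
/// The timestamp occupies the most significant 48 bits, so values
/// created at a later millisecond always compare greater.
fn uuid_v7_like(unix_ms: u64) -> u128 {
    let timestamp = (unix_ms & 0xFFFF_FFFF_FFFF) as u128; // 48 bits
    let rand_a = (random_u64() & 0x0FFF) as u128; // 12 bits
    let rand_b = (random_u64() & 0x3FFF_FFFF_FFFF_FFFF) as u128; // 62 bits

    (timestamp << 80)       // bits 127..80: Unix time in ms
        | (0x7u128 << 76)   // bits 79..76: version 7
        | (rand_a << 64)    // bits 75..64: random
        | (0b10u128 << 62)  // bits 63..62: RFC variant
        | rand_b            // bits 61..0: random
}
```

A version 4 UUID, by contrast, is random in all but its version and variant bits, so consecutive ids land at unrelated positions in an ordered index.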



##########
crates/iceberg/src/transaction.rs:
##########
@@ -122,6 +189,383 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+    snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: Arc<BoundPartitionSpec>,
+        key_metadata: Vec<u8>,
+        commit_uuid: Uuid,
+        snapshot_properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        Ok(Self {
+            snapshot_produce_action: SnapshotProduceAction::new(
+                tx,
+                snapshot_id,
+                parent_snapshot_id,
+                schema_id,
+                format_version,
+                partition_spec,
+                schema,
+                key_metadata,
+                commit_uuid,
+                snapshot_properties,
+            )?,
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_files: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.snapshot_produce_action.add_data_files(data_files)?;
+        Ok(self)
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(self) -> Result<Transaction<'a>> {
+        self.snapshot_produce_action
+            .apply(FastAppendOperation, DefaultManifestProcess)
+            .await
+    }
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+    fn operation(&self) -> Operation {
+        Operation::Append
+    }
+
+    async fn delete_entries(
+        &self,
+        _snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestEntry>> {
+        Ok(vec![])
+    }
+
+    async fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestFile>> {
+        let Some(snapshot) = snapshot_produce
+            .parent_snapshot_id
+            .and_then(|id| snapshot_produce.tx.table.metadata().snapshot_by_id(id))
+        else {
+            return Ok(vec![]);
+        };
+
+        let manifest_list = snapshot
+            .load_manifest_list(
+                snapshot_produce.tx.table.file_io(),
+                &snapshot_produce.tx.table.metadata_ref(),
+            )
+            .await?;
+
+        Ok(manifest_list
+            .entries()
+            .iter()
+            .filter(|entry| entry.has_added_files() || entry.has_existing_files())
+            .cloned()
+            .collect())
+    }
+}
+
+trait SnapshotProduceOperation: Send + Sync {
+    fn operation(&self) -> Operation;
+    #[allow(unused)]
+    fn delete_entries(
+        &self,
+        snapshot_produce: &SnapshotProduceAction,
+    ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
+    fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProduceAction,
+    ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+}
+
+struct DefaultManifestProcess;
+
+impl ManifestProcess for DefaultManifestProcess {
+    fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
+        manifests
+    }
+}
+
+trait ManifestProcess: Send + Sync {
+    fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
+}
+
+struct SnapshotProduceAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema_id: i32,
+    format_version: FormatVersion,
+    partition_spec: Arc<BoundPartitionSpec>,
+    schema: Schema,
+    key_metadata: Vec<u8>,
+
+    commit_uuid: Uuid,
+
+    snapshot_properties: HashMap<String, String>,
+    added_data_files: Vec<DataFile>,
+
+    // A counter used to generate unique manifest file names.
+    // It starts from 0 and increments for each new manifest file.
+    // Note: This counter is limited to the range of (0..u64::MAX).
+    manifest_counter: RangeFrom<u64>,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        snapshot_id: i64,
+        parent_snapshot_id: Option<i64>,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: Arc<BoundPartitionSpec>,
+        schema: Schema,
+        key_metadata: Vec<u8>,
+        commit_uuid: Uuid,
+        snapshot_properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema_id,
+            format_version,
+            commit_uuid,
+            snapshot_properties,
+            added_data_files: vec![],
+            manifest_counter: (0..),
+            partition_spec,
+            schema,
+            key_metadata,
+        })
+    }
+
+    // Check if the partition value is compatible with the partition type.
+    fn validate_partition_value(
+        partition_value: &Struct,
+        partition_type: &StructType,
+    ) -> Result<()> {
+        if partition_value.fields().len() != partition_type.fields().len() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Partition value is not compatitable with partition type",
+            ));
+        }
+        if partition_value
+            .fields()
+            .iter()
+            .zip(partition_type.fields())
+            .any(|(field_from_value, field_from_type)| {
+                !field_from_type
+                    .field_type
+                    .as_primitive_type()
+                    .unwrap()

Review Comment:
   Are we sure this never fails? Should we return an error instead?
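
One way to avoid the panic, sketched under simplifying assumptions: `Type`, `PrimitiveType` and `Literal` below are hypothetical miniature versions of the crate's types, values are passed as slices, and errors are plain `String`s instead of the crate's `Error`. The point is only that `ok_or_else` plus `?` turns a non-primitive partition field into an error the caller can handle. Written as a free function like this, it is also straightforward to unit-test.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum PrimitiveType {
    Int,
    Long,
}

#[derive(Debug)]
enum Type {
    Primitive(PrimitiveType),
    Struct, // stands in for any non-primitive type
}

#[derive(Debug)]
enum Literal {
    Int(i32),
    Long(i64),
}

impl Type {
    fn as_primitive_type(&self) -> Option<PrimitiveType> {
        match self {
            Type::Primitive(primitive) => Some(*primitive),
            Type::Struct => None,
        }
    }
}

impl PrimitiveType {
    fn compatible(&self, literal: &Literal) -> bool {
        matches!(
            (self, literal),
            (PrimitiveType::Int, Literal::Int(_)) | (PrimitiveType::Long, Literal::Long(_))
        )
    }
}

/// Checks each partition value against its declared type and reports
/// the first mismatch as an error instead of unwrapping.
fn validate_partition_value(values: &[Literal], types: &[Type]) -> Result<(), String> {
    if values.len() != types.len() {
        return Err(format!(
            "expected {} partition fields, got {}",
            types.len(),
            values.len()
        ));
    }
    for (index, (value, ty)) in values.iter().zip(types).enumerate() {
        let primitive = ty
            .as_primitive_type()
            .ok_or_else(|| format!("partition field {index} is not a primitive type"))?;
        if !primitive.compatible(value) {
            return Err(format!(
                "partition field {index}: {value:?} is not compatible with {primitive:?}"
            ));
        }
    }
    Ok(())
}
```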



##########
crates/iceberg/src/transaction.rs:
##########
@@ -122,6 +189,383 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+    snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: Arc<BoundPartitionSpec>,

Review Comment:
   ```suggestion
           partition_spec: BoundPartitionSpecRef,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org