ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1855682480


##########
crates/iceberg/src/transaction.rs:
##########
@@ -122,6 +189,383 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+    snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: Arc<BoundPartitionSpec>,
+        key_metadata: Vec<u8>,
+        commit_uuid: Uuid,
+        snapshot_properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        Ok(Self {
+            snapshot_produce_action: SnapshotProduceAction::new(
+                tx,
+                snapshot_id,
+                parent_snapshot_id,
+                schema_id,
+                format_version,
+                partition_spec,
+                schema,
+                key_metadata,
+                commit_uuid,
+                snapshot_properties,
+            )?,
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_files: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.snapshot_produce_action.add_data_files(data_files)?;
+        Ok(self)
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(self) -> Result<Transaction<'a>> {
+        self.snapshot_produce_action
+            .apply(FastAppendOperation, DefaultManifestProcess)
+            .await
+    }
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+    fn operation(&self) -> Operation {
+        Operation::Append
+    }
+
+    async fn delete_entries(
+        &self,
+        _snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestEntry>> {
+        Ok(vec![])
+    }
+
+    async fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestFile>> {
+        let Some(snapshot) = snapshot_produce
+            .parent_snapshot_id
+            .and_then(|id| 
snapshot_produce.tx.table.metadata().snapshot_by_id(id))
+        else {
+            return Ok(vec![]);
+        };
+
+        let manifest_list = snapshot
+            .load_manifest_list(
+                snapshot_produce.tx.table.file_io(),
+                &snapshot_produce.tx.table.metadata_ref(),
+            )
+            .await?;
+
+        Ok(manifest_list
+            .entries()
+            .iter()
+            .filter(|entry| entry.has_added_files() || 
entry.has_existing_files())
+            .cloned()
+            .collect())
+    }
+}
+
+trait SnapshotProduceOperation: Send + Sync {
+    fn operation(&self) -> Operation;
+    #[allow(unused)]
+    fn delete_entries(
+        &self,
+        snapshot_produce: &SnapshotProduceAction,
+    ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
+    fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProduceAction,
+    ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+}
+
+struct DefaultManifestProcess;
+
+impl ManifestProcess for DefaultManifestProcess {
+    fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> 
Vec<ManifestFile> {
+        manifests
+    }
+}
+
+trait ManifestProcess: Send + Sync {
+    fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> 
Vec<ManifestFile>;
+}
+
+struct SnapshotProduceAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema_id: i32,
+    format_version: FormatVersion,
+    partition_spec: Arc<BoundPartitionSpec>,
+    schema: Schema,
+    key_metadata: Vec<u8>,
+
+    commit_uuid: Uuid,
+
+    snapshot_properties: HashMap<String, String>,
+    added_data_files: Vec<DataFile>,
+
+    // A counter used to generate unique manifest file names.
+    // It starts from 0 and increments for each new manifest file.
+    // Note: This counter is limited to the range of (0..u64::MAX).
+    manifest_counter: RangeFrom<u64>,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        snapshot_id: i64,
+        parent_snapshot_id: Option<i64>,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: Arc<BoundPartitionSpec>,
+        schema: Schema,
+        key_metadata: Vec<u8>,
+        commit_uuid: Uuid,
+        snapshot_properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema_id,
+            format_version,
+            commit_uuid,
+            snapshot_properties,
+            added_data_files: vec![],
+            manifest_counter: (0..),
+            partition_spec,
+            schema,
+            key_metadata,
+        })
+    }
+
+    // Check if the partition value is compatible with the partition type.
+    fn validate_partition_value(
+        partition_value: &Struct,
+        partition_type: &StructType,
+    ) -> Result<()> {
+        if partition_value.fields().len() != partition_type.fields().len() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Partition value is not compatitable with partition type",
+            ));
+        }
+        if partition_value
+            .fields()
+            .iter()
+            .zip(partition_type.fields())
+            .any(|(field_from_value, field_from_type)| {
+                !field_from_type
+                    .field_type
+                    .as_primitive_type()
+                    .unwrap()

Review Comment:
   As https://iceberg.apache.org/spec/#partitioning, the field in the partition 
must be a primitive type. The struct here from 
`self.tx.table.metadata().default_partition_spec().partition_type()`. I think 
we should guarantee this in 
https://github.com/apache/iceberg-rust/blob/6e0bcf56028e0d20d5ceeedf87dbb3db7c929ee3/crates/iceberg/src/spec/partition.rs#L486
 so that we can just sure the struct return by partition_type() is a valid 
partition type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to