liurenjie1024 commented on code in PR #349: URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1865150243
########## crates/iceberg/src/transaction.rs: ########## @@ -122,6 +172,365 @@ impl<'a> Transaction<'a> { } } +/// FastAppendAction is a transaction action for fast append data files to the table. +pub struct FastAppendAction<'a> { + snapshot_produce_action: SnapshotProduceAction<'a>, +} + +impl<'a> FastAppendAction<'a> { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + tx: Transaction<'a>, + snapshot_id: i64, + commit_uuid: Uuid, + key_metadata: Vec<u8>, + snapshot_properties: HashMap<String, String>, + ) -> Result<Self> { + Ok(Self { + snapshot_produce_action: SnapshotProduceAction::new( + tx, + snapshot_id, + key_metadata, + commit_uuid, + snapshot_properties, + )?, + }) + } + + /// Add data files to the snapshot. + pub fn add_data_files( + &mut self, + data_files: impl IntoIterator<Item = DataFile>, + ) -> Result<&mut Self> { + self.snapshot_produce_action.add_data_files(data_files)?; + Ok(self) + } + + /// Finished building the action and apply it to the transaction. + pub async fn apply(self) -> Result<Transaction<'a>> { + self.snapshot_produce_action + .apply(FastAppendOperation, DefaultManifestProcess) + .await + } +} + +struct FastAppendOperation; + +impl SnapshotProduceOperation for FastAppendOperation { + fn operation(&self) -> Operation { + Operation::Append + } + + async fn delete_entries( + &self, + _snapshot_produce: &SnapshotProduceAction<'_>, + ) -> Result<Vec<ManifestEntry>> { + Ok(vec![]) + } + + async fn existing_manifest( + &self, + snapshot_produce: &SnapshotProduceAction<'_>, + ) -> Result<Vec<ManifestFile>> { + let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else { + return Ok(vec![]); + }; + + let manifest_list = snapshot + .load_manifest_list( + snapshot_produce.tx.table.file_io(), + &snapshot_produce.tx.table.metadata_ref(), + ) + .await?; + + Ok(manifest_list + .entries() + .iter() + .filter(|entry| entry.has_added_files() || entry.has_existing_files()) + .cloned() + .collect()) + } +} + +trait SnapshotProduceOperation: Send + Sync { + fn operation(&self) -> Operation; + #[allow(unused)] + fn delete_entries( + &self, + snapshot_produce: &SnapshotProduceAction, + ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send; + fn existing_manifest( + &self, + snapshot_produce: &SnapshotProduceAction, + ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send; +} + +struct DefaultManifestProcess; + +impl ManifestProcess for DefaultManifestProcess { + fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> { + manifests + } +} + +trait ManifestProcess: Send + Sync { + fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>; +} + +struct SnapshotProduceAction<'a> { Review Comment: Usually an action is a transaction action, I think naming it `SnapshotProducer` would be enough? ########## crates/iceberg/src/transaction.rs: ########## @@ -122,6 +172,365 @@ impl<'a> Transaction<'a> { } } +/// FastAppendAction is a transaction action for fast append data files to the table. +pub struct FastAppendAction<'a> { Review Comment: It would be better to move this part to a standalone module under transaction module. ########## crates/iceberg/src/transaction.rs: ########## @@ -122,6 +172,365 @@ impl<'a> Transaction<'a> { } } +/// FastAppendAction is a transaction action for fast append data files to the table. +pub struct FastAppendAction<'a> { + snapshot_produce_action: SnapshotProduceAction<'a>, +} + +impl<'a> FastAppendAction<'a> { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + tx: Transaction<'a>, + snapshot_id: i64, + commit_uuid: Uuid, + key_metadata: Vec<u8>, + snapshot_properties: HashMap<String, String>, + ) -> Result<Self> { + Ok(Self { + snapshot_produce_action: SnapshotProduceAction::new( + tx, + snapshot_id, + key_metadata, + commit_uuid, + snapshot_properties, + )?, + }) + } + + /// Add data files to the snapshot. + pub fn add_data_files( + &mut self, + data_files: impl IntoIterator<Item = DataFile>, + ) -> Result<&mut Self> { + self.snapshot_produce_action.add_data_files(data_files)?; + Ok(self) + } + + /// Finished building the action and apply it to the transaction. + pub async fn apply(self) -> Result<Transaction<'a>> { + self.snapshot_produce_action + .apply(FastAppendOperation, DefaultManifestProcess) + .await + } +} + +struct FastAppendOperation; + +impl SnapshotProduceOperation for FastAppendOperation { + fn operation(&self) -> Operation { + Operation::Append + } + + async fn delete_entries( + &self, + _snapshot_produce: &SnapshotProduceAction<'_>, + ) -> Result<Vec<ManifestEntry>> { + Ok(vec![]) + } + + async fn existing_manifest( + &self, + snapshot_produce: &SnapshotProduceAction<'_>, + ) -> Result<Vec<ManifestFile>> { + let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else { + return Ok(vec![]); + }; + + let manifest_list = snapshot + .load_manifest_list( + snapshot_produce.tx.table.file_io(), + &snapshot_produce.tx.table.metadata_ref(), + ) + .await?; + + Ok(manifest_list + .entries() + .iter() + .filter(|entry| entry.has_added_files() || entry.has_existing_files()) + .cloned() + .collect()) + } +} + +trait SnapshotProduceOperation: Send + Sync { + fn operation(&self) -> Operation; + #[allow(unused)] + fn delete_entries( + &self, + snapshot_produce: &SnapshotProduceAction, + ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send; + fn existing_manifest( + &self, + snapshot_produce: &SnapshotProduceAction, + ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send; +} + +struct DefaultManifestProcess; + +impl ManifestProcess for DefaultManifestProcess { + fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> { + manifests + } +} + +trait ManifestProcess: Send + Sync { + fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>; +} + +struct SnapshotProduceAction<'a> { + tx: Transaction<'a>, + snapshot_id: i64, + key_metadata: Vec<u8>, + commit_uuid: Uuid, + snapshot_properties: HashMap<String, String>, + added_data_files: Vec<DataFile>, + // A counter used to generate unique manifest file names. + // It starts from 0 and increments for each new manifest file. + // Note: This counter is limited to the range of (0..u64::MAX). + manifest_counter: RangeFrom<u64>, +} + +impl<'a> SnapshotProduceAction<'a> { + pub(crate) fn new( + tx: Transaction<'a>, + snapshot_id: i64, + key_metadata: Vec<u8>, + commit_uuid: Uuid, + snapshot_properties: HashMap<String, String>, + ) -> Result<Self> { + Ok(Self { + tx, + snapshot_id, + commit_uuid, + snapshot_properties, + added_data_files: vec![], + manifest_counter: (0..), + key_metadata, + }) + } + + // Check if the partition value is compatible with the partition type. + fn validate_partition_value( + partition_value: &Struct, + partition_type: &StructType, + ) -> Result<()> { + if partition_value.fields().len() != partition_type.fields().len() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Partition value is not compatitable with partition type", + )); + } + for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) { + if !field + .field_type + .as_primitive_type() + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "Partition field should only be primitive type.", + ) + })? + .compatible(&value.as_primitive_literal().unwrap()) + { + return Err(Error::new( + ErrorKind::DataInvalid, + "Partition value is not compatitable partition type", + )); + } + } + Ok(()) + } + + /// Add data files to the snapshot. + pub fn add_data_files( + &mut self, + data_files: impl IntoIterator<Item = DataFile>, + ) -> Result<&mut Self> { + let data_files: Vec<DataFile> = data_files.into_iter().collect(); + for data_file in &data_files { + if data_file.content_type() != crate::spec::DataContentType::Data { + return Err(Error::new( + ErrorKind::DataInvalid, + "Only data content type is allowed for fast append", + )); + } + Self::validate_partition_value( + data_file.partition(), + self.tx + .table + .metadata() + .default_partition_spec() + .partition_type(), + )?; + } + self.added_data_files.extend(data_files); + Ok(self) + } + + fn new_manifest_output(&mut self) -> Result<OutputFile> { + let new_manifest_path = format!( + "{}/{}/{}-m{}.{}", + self.tx.table.metadata().location(), + META_ROOT_PATH, + self.commit_uuid, + self.manifest_counter.next().unwrap(), + DataFileFormat::Avro + ); + self.tx.table.file_io().new_output(new_manifest_path) + } + + // Write manifest file for added data files and return the ManifestFile for ManifestList. + async fn write_added_manifest(&mut self) -> Result<ManifestFile> { + let added_data_files = std::mem::take(&mut self.added_data_files); + let manifest_entries = added_data_files + .into_iter() + .map(|data_file| { + let builder = ManifestEntry::builder() + .status(crate::spec::ManifestStatus::Added) + .data_file(data_file); + if self.tx.table.metadata().format_version() == FormatVersion::V1 { + builder.snapshot_id(self.snapshot_id).build() + } else { + // For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when + // commit failed. + builder.build() + } + }) + .collect(); + let schema = self.tx.table.metadata().current_schema(); + let manifest_meta = ManifestMetadata::builder() + .schema(schema.clone()) + .schema_id(schema.schema_id()) + .format_version(self.tx.table.metadata().format_version()) + .partition_spec( + self.tx + .table + .metadata() + .default_partition_spec() + .as_ref() + .clone(), + ) + .content(crate::spec::ManifestContentType::Data) + .build(); + let manifest = Manifest::new(manifest_meta, manifest_entries); + let writer = ManifestWriter::new( + self.new_manifest_output()?, + self.snapshot_id, + self.key_metadata.clone(), + ); + writer.write(manifest).await + } + + async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>( + &mut self, + snapshot_produce_operation: &OP, + manifest_process: &MP, + ) -> Result<Vec<ManifestFile>> { + let added_manifest = self.write_added_manifest().await?; + let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?; + // # TODO + // Support process delete entries. + + let mut manifest_files = vec![added_manifest]; + manifest_files.extend(existing_manifests); + let manifest_files = manifest_process.process_manifeset(manifest_files); + Ok(manifest_files) + } + + // # TODO + // Fulfill this function + fn summary<OP: SnapshotProduceOperation>(&self, snapshot_produce_operation: &OP) -> Summary { + Summary { Review Comment: I think we need snapshot summary builder. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org