Fokko commented on code in PR #349: URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1611283989
########## crates/iceberg/src/transaction.rs: ########## @@ -121,6 +190,313 @@ impl<'a> Transaction<'a> { } } +/// FastAppendAction is a transaction action for fast append data files to the table. +pub struct FastAppendAction<'a> { + tx: Transaction<'a>, + + parent_snapshot_id: Option<i64>, + snapshot_id: i64, + schema: Schema, + schema_id: i32, + format_version: FormatVersion, + partition_spec: PartitionSpec, + key_metadata: Vec<u8>, + + commit_uuid: String, + manifest_id: i64, + + appended_data_files: Vec<DataFile>, +} + +impl<'a> FastAppendAction<'a> { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + tx: Transaction<'a>, + parent_snapshot_id: Option<i64>, + snapshot_id: i64, + schema: Schema, + schema_id: i32, + format_version: FormatVersion, + partition_spec: PartitionSpec, + key_metadata: Vec<u8>, + commit_uuid: String, + ) -> Result<Self> { + Ok(Self { + tx, + parent_snapshot_id, + snapshot_id, + schema, + schema_id, + format_version, + partition_spec, + key_metadata, + commit_uuid, + manifest_id: 0, + appended_data_files: vec![], + }) + } + + // Check if the partition value is compatible with the partition type. + fn validate_partition_value( + partition_value: &Struct, + partition_type: &StructType, + ) -> Result<()> { + if partition_value.fields().len() != partition_type.fields().len() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Partition value is not compatitable with partition type", + )); + } + if partition_value + .fields() + .iter() + .zip(partition_type.fields()) + .any(|(field, field_type)| !field_type.field_type.compatible(field)) + { + return Err(Error::new( + ErrorKind::DataInvalid, + "Partition value is not compatitable partition type", + )); + } + Ok(()) + } + + /// Add data files to the snapshot. 
+ pub fn add_data_files( + &mut self, + data_files: impl IntoIterator<Item = DataFile>, + ) -> Result<&mut Self> { + let data_files: Vec<DataFile> = data_files.into_iter().collect(); + for data_file in &data_files { + if data_file.content_type() != crate::spec::DataContentType::Data { + return Err(Error::new( + ErrorKind::DataInvalid, + "Only data content type is allowed for fast append", + )); + } + Self::validate_partition_value( + data_file.partition(), + &self.partition_spec.partition_type(&self.schema)?, + )?; + } + self.appended_data_files.extend(data_files); + Ok(self) + } + + fn generate_manifest_file_path(&mut self) -> String { + let manifest_id = self.manifest_id; + self.manifest_id += 1; + format!( + "{}/{}/{}-m{}.{}", + self.tx.table.metadata().location(), + META_ROOT_PATH, + &self.commit_uuid, + manifest_id, + DataFileFormat::Avro + ) + } + + async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> { + if let Some(snapshot) = self.tx.table.metadata().current_snapshot() { + let manifest_list = snapshot + .load_manifest_list(self.tx.table.file_io(), &self.tx.table.metadata_ref()) + .await?; + let mut manifest_files = Vec::with_capacity(manifest_list.entries().len()); + for entry in manifest_list.entries() { + // From: https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921 + // Why we need this? + if entry.added_snapshot_id == self.snapshot_id { + continue; + } + let manifest = entry.load_manifest(self.tx.table.file_io()).await?; + // Skip manifest with all delete entries. + if manifest.entries().iter().all(|entry| !entry.is_alive()) { + continue; + } + manifest_files.push(entry.clone()); + } + Ok(manifest_files) + } else { + Ok(vec![]) + } + } + + // Write manifest file for added data files and return the ManifestFile for ManifestList. 
+ async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> { + let appended_data_files = std::mem::take(&mut self.appended_data_files); + let manifest_entries = appended_data_files + .into_iter() + .map(|data_file| { + let builder = ManifestEntry::builder() + .status(crate::spec::ManifestStatus::Added) + .data_file(data_file); + if self.format_version as u8 == 1u8 { + builder.snapshot_id(self.snapshot_id).build() + } else { + // For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when + // commit failed. + builder.build() + } + }) + .collect(); + let manifest_meta = ManifestMetadata::builder() + .schema(self.schema.clone()) + .schema_id(self.schema_id) + .format_version(self.format_version) + .partition_spec(self.partition_spec.clone()) + .content(crate::spec::ManifestContentType::Data) + .build(); + let manifest = Manifest::new(manifest_meta, manifest_entries); + let writer = ManifestWriter::new( + self.tx + .table + .file_io() + .new_output(self.generate_manifest_file_path())?, + self.snapshot_id, + self.key_metadata.clone(), + ); + writer.write(manifest).await + } + + // # TODO: + // Complete the summary. + fn summary(&self) -> Summary { + Summary { + operation: crate::spec::Operation::Append, + other: HashMap::new(), + } + } + + /// Finished building the action and apply it to the transaction. 
+ pub async fn apply(mut self) -> Result<Transaction<'a>> { + let summary = self.summary(); + let manifest = self.manifest_for_data_file().await?; + let existing_manifest_files = self.manifest_from_parent_snapshot().await?; + + let snapshot_produce_action = SnapshotProduceAction::new( + self.tx, + self.snapshot_id, + self.parent_snapshot_id, + self.schema_id, + self.format_version, + self.commit_uuid, + )?; + + snapshot_produce_action + .apply( + vec![manifest] + .into_iter() + .chain(existing_manifest_files.into_iter()), + summary, + ) + .await + } +} + +struct SnapshotProduceAction<'a> { + tx: Transaction<'a>, + + parent_snapshot_id: Option<i64>, + snapshot_id: i64, + schema_id: i32, + format_version: FormatVersion, + + commit_uuid: String, +} + +impl<'a> SnapshotProduceAction<'a> { + pub(crate) fn new( + tx: Transaction<'a>, + snapshot_id: i64, + parent_snapshot_id: Option<i64>, + schema_id: i32, + format_version: FormatVersion, + commit_uuid: String, + ) -> Result<Self> { + Ok(Self { + tx, + parent_snapshot_id, + snapshot_id, + schema_id, + format_version, + commit_uuid, + }) + } + + fn generate_manifest_list_file_path(&self, next_seq_num: i64) -> String { + format!( + "{}/{}/snap-{}-{}-{}.{}", + self.tx.table.metadata().location(), + META_ROOT_PATH, + self.snapshot_id, + next_seq_num, + self.commit_uuid, + DataFileFormat::Avro + ) + } + + /// Finished building the action and apply it to the transaction. + pub async fn apply( + mut self, + manifest_files: impl IntoIterator<Item = ManifestFile>, + summary: Summary, + ) -> Result<Transaction<'a>> { + let next_seq_num = if self.format_version as u8 > 1u8 { Review Comment: This is incorrect. Here we differentiate between V1 and V2 tables, but on line 452 we create a V2 writer. In the case of a V1 table, we would always write 0 as the sequence number 😱 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org