xanderbailey commented on code in PR #2367:
URL: https://github.com/apache/iceberg-rust/pull/2367#discussion_r3199947397
##########
crates/iceberg/src/transaction/append.rs:
##########
@@ -84,13 +84,13 @@ impl FastAppendAction {
#[async_trait]
impl TransactionAction for FastAppendAction {
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
- let snapshot_producer = SnapshotProducer::new(
- table,
- self.commit_uuid.unwrap_or_else(Uuid::now_v7),
- self.key_metadata.clone(),
- self.snapshot_properties.clone(),
- self.added_data_files.clone(),
- );
+ let snapshot_producer = SnapshotProducer::builder()
+ .with_table(table)
+ .with_commit_uuid(self.commit_uuid.unwrap_or_else(Uuid::now_v7))
+ .with_key_metadata(self.key_metadata.clone())
+ .with_snapshot_properties(self.snapshot_properties.clone())
+ .with_added_data_files(self.added_data_files.clone())
+ .build();
Review Comment:
Nice change
##########
crates/iceberg/src/transaction/snapshot.rs:
##########
@@ -240,8 +242,12 @@ impl<'a> SnapshotProducer<'a> {
self.table.metadata().current_schema().clone(),
self.table
.metadata()
- .default_partition_spec()
- .as_ref()
+ .partition_spec_by_id(spec_id)
+ .ok_or(Error::new(
Review Comment:
`ok_or` evaluates its argument eagerly, so the error is constructed (and allocated)
even on the success path; would `ok_or_else` be better here?
##########
crates/iceberg/src/transaction/append.rs:
##########
@@ -122,7 +122,7 @@ impl SnapshotProduceOperation for FastAppendOperation {
async fn existing_manifest(
&self,
- snapshot_produce: &SnapshotProducer<'_>,
+ snapshot_produce: &mut SnapshotProducer<'_>,
Review Comment:
Does this need to be mut?
##########
crates/iceberg/src/spec/snapshot.rs:
##########
@@ -36,7 +36,7 @@ pub const MAIN_BRANCH: &str = "main";
/// Reference to [`Snapshot`].
pub type SnapshotRef = Arc<Snapshot>;
-#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone, Hash)]
Review Comment:
Why does this need to be `Hash` now?
##########
crates/iceberg/src/transaction/snapshot.rs:
##########
Review Comment:
Do we need to include these changes in the summary here?
##########
crates/iceberg/src/transaction/snapshot.rs:
##########
@@ -312,13 +325,69 @@ impl<'a> SnapshotProducer<'a> {
builder.build()
}
});
- let mut writer = self.new_manifest_writer(ManifestContentType::Data)?;
+ let mut writer = self.new_manifest_writer(
+ content_type,
+ self.table.metadata().default_partition_spec_id(),
+ )?;
for entry in manifest_entries {
writer.add_entry(entry)?;
}
writer.write_manifest_file().await
}
+ async fn write_deleted_manifest(
+ &mut self,
+ deleted_entries: Vec<ManifestEntry>,
+ ) -> Result<Vec<ManifestFile>> {
+ if deleted_entries.is_empty() {
+ Ok(Vec::new())
+ } else {
Review Comment:
nit: can we early return here to avoid the nesting?
```suggestion
if deleted_entries.is_empty() {
return Ok(Vec::new());
}
```
##########
crates/iceberg/src/transaction/snapshot.rs:
##########
@@ -289,9 +295,16 @@ impl<'a> SnapshotProducer<'a> {
}
// Write manifest file for added data files and return the ManifestFile for ManifestList.
- async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
- let added_data_files = std::mem::take(&mut self.added_data_files);
- if added_data_files.is_empty() {
+ async fn write_added_manifest(
+ &mut self,
+ content_type: ManifestContentType,
+ ) -> Result<ManifestFile> {
+ let added_files = match content_type {
+ ManifestContentType::Data => std::mem::take(&mut self.added_data_files),
+ ManifestContentType::Deletes => std::mem::take(&mut self.added_delete_files),
+ };
+
+ if added_files.is_empty() {
return Err(Error::new(
ErrorKind::PreconditionFailed,
"No added data files found when write an added manifest file",
Review Comment:
Should this error message be updated to mention added data / delete files, now that this function also handles delete files?
##########
crates/iceberg/src/transaction/snapshot.rs:
##########
@@ -107,41 +107,39 @@ pub(crate) trait ManifestProcess: Send + Sync {
) -> Vec<ManifestFile>;
}
+#[derive(TypedBuilder)]
+#[builder(field_defaults(setter(prefix = "with_")))]
pub(crate) struct SnapshotProducer<'a> {
pub(crate) table: &'a Table,
+ #[builder(
+ setter(skip),
+ default_code = "SnapshotProducer::generate_unique_snapshot_id(table)"
+ )]
snapshot_id: i64,
commit_uuid: Uuid,
+ #[builder(default)]
key_metadata: Option<Vec<u8>>,
+ #[builder(default)]
snapshot_properties: HashMap<String, String>,
+ #[builder(default)]
added_data_files: Vec<DataFile>,
+ #[builder(default)]
+ added_delete_files: Vec<DataFile>,
+ #[builder(default)]
+ removed_data_files: Vec<DataFile>,
+ #[builder(default)]
+ removed_delete_files: Vec<DataFile>,
// A counter used to generate unique manifest file names.
// It starts from 0 and increments for each new manifest file.
// Note: This counter is limited to the range of (0..u64::MAX).
+ #[builder(setter(skip), default_code = "(0..)")]
manifest_counter: RangeFrom<u64>,
}
impl<'a> SnapshotProducer<'a> {
- pub(crate) fn new(
- table: &'a Table,
- commit_uuid: Uuid,
- key_metadata: Option<Vec<u8>>,
- snapshot_properties: HashMap<String, String>,
- added_data_files: Vec<DataFile>,
- ) -> Self {
- Self {
- table,
- snapshot_id: Self::generate_unique_snapshot_id(table),
- commit_uuid,
- key_metadata,
- snapshot_properties,
- added_data_files,
- manifest_counter: (0..),
- }
- }
-
pub(crate) fn validate_added_data_files(&self) -> Result<()> {
Review Comment:
Java has a `validateDeleteFileForVersion` check; would it be worth adding an
equivalent here?
https://github.com/apache/iceberg/blob/0f657edf12dc29f8487a679bfdd4210e9588d014/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L295-L316
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]