ZENOTME commented on code in PR #902:
URL: https://github.com/apache/iceberg-rust/pull/902#discussion_r1992906249


##########
crates/iceberg/src/transaction.rs:
##########
@@ -267,13 +376,174 @@ trait SnapshotProduceOperation: Send + Sync {
 struct DefaultManifestProcess;
 
 impl ManifestProcess for DefaultManifestProcess {
-    fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> 
Vec<ManifestFile> {
-        manifests
+    async fn process_manifeset<'a>(
+        &self,
+        _snapshot_producer: &mut SnapshotProduceAction<'a>,
+        manifests: Vec<ManifestFile>,
+    ) -> Result<Vec<ManifestFile>> {
+        Ok(manifests)
+    }
+}
+
+struct MergeManifsetProcess {
+    target_size_bytes: u32,
+    min_count_to_merge: u32,
+}
+
+impl MergeManifsetProcess {
+    pub fn new(target_size_bytes: u32, min_count_to_merge: u32) -> Self {
+        Self {
+            target_size_bytes,
+            min_count_to_merge,
+        }
+    }
+
+    fn group_by_spec(&self, manifests: Vec<ManifestFile>) -> BTreeMap<i32, 
Vec<ManifestFile>> {
+        let mut grouped_manifests = BTreeMap::new();
+        for manifest in manifests {
+            grouped_manifests
+                .entry(manifest.partition_spec_id)
+                .or_insert_with(Vec::new)
+                .push(manifest);
+        }
+        grouped_manifests
+    }
+
+    async fn merge_bin(
+        &self,
+        snapshot_id: i64,
+        file_io: FileIO,
+        manifest_bin: Vec<ManifestFile>,
+        mut writer: ManifestWriter,
+    ) -> Result<ManifestFile> {
+        for manifset_file in manifest_bin {
+            let manifest_file = manifset_file.load_manifest(&file_io).await?;
+            for manifest_entry in manifest_file.entries() {
+                if manifest_entry.status() == ManifestStatus::Deleted
+                    && manifest_entry
+                        .snapshot_id()
+                        .is_some_and(|id| id == snapshot_id)
+                {
+                    //only files deleted by this snapshot should be added to 
the new manifest
+                    writer.add_delete_entry(manifest_entry.as_ref().clone())?;
+                } else if manifest_entry.status() == ManifestStatus::Added
+                    && manifest_entry
+                        .snapshot_id()
+                        .is_some_and(|id| id == snapshot_id)
+                {
+                    //added entries from this snapshot are still added, 
otherwise they should be existing
+                    writer.add_entry(manifest_entry.as_ref().clone())?;
+                } else if manifest_entry.status() != ManifestStatus::Deleted {
+                    // add all non-deleted files from the old manifest as 
existing files
+                    
writer.add_existing_entry(manifest_entry.as_ref().clone())?;
+                }
+            }
+        }
+
+        writer.write_manifest_file().await
+    }
+
+    async fn merge_group<'a>(
+        &self,
+        snapshot_produce: &mut SnapshotProduceAction<'a>,
+        first_manifest: &ManifestFile,
+        group_manifests: Vec<ManifestFile>,
+    ) -> Result<Vec<ManifestFile>> {
+        let packer: ListPacker<ManifestFile> = 
ListPacker::new(self.target_size_bytes);
+        let manifest_bins =
+            packer.pack(group_manifests, |manifest| manifest.manifest_length 
as u32);
+
+        let manifest_merge_futures = manifest_bins
+            .into_iter()
+            .map(|manifest_bin| {
+                if manifest_bin.len() == 1 {
+                    Ok(Box::pin(async { Ok(manifest_bin) })
+                        as Pin<
+                            Box<dyn Future<Output = Result<Vec<ManifestFile>>> 
+ Send>,
+                        >)
+                }
+                //  if the bin has the first manifest (the new data files or 
an appended manifest file) then only
+                //  merge it if the number of manifests is above the minimum 
count. this is applied only to bins
+                //  with an in-memory manifest so that large manifests don't 
prevent merging older groups.
+                else if manifest_bin
+                    .iter()
+                    .any(|manifest| manifest == first_manifest)
+                    && manifest_bin.len() < self.min_count_to_merge as usize
+                {
+                    Ok(Box::pin(async { Ok(manifest_bin) })
+                        as Pin<
+                            Box<dyn Future<Output = Result<Vec<ManifestFile>>> 
+ Send>,
+                        >)
+                } else {
+                    let writer = snapshot_produce.new_manifest_writer()?;
+                    let snapshot_id = snapshot_produce.snapshot_id;
+                    let file_io = snapshot_produce.tx.table.file_io().clone();
+                    Ok((Box::pin(async move {
+                        Ok(vec![
+                            self.merge_bin(
+                                snapshot_id,
+                                file_io,
+                                manifest_bin,
+                                writer,
+                            )
+                            .await?,
+                        ])
+                    }))
+                        as Pin<Box<dyn Future<Output = 
Result<Vec<ManifestFile>>> + Send>>)
+                }
+            })
+            .collect::<Result<Vec<Pin<Box<dyn Future<Output = 
Result<Vec<ManifestFile>>> + Send>>>>>()?;
+
+        let merged_bins: Vec<Vec<ManifestFile>> =
+            futures::future::join_all(manifest_merge_futures.into_iter())
+                .await
+                .into_iter()
+                .collect::<Result<Vec<_>>>()?;
+
+        Ok(merged_bins.into_iter().flatten().collect())
+    }
+
+    async fn merge_manifeset<'a>(
+        &self,
+        snapshot_produce: &mut SnapshotProduceAction<'a>,
+        manifests: Vec<ManifestFile>,
+    ) -> Result<Vec<ManifestFile>> {
+        if manifests.is_empty() {
+            return Ok(manifests);
+        }
+
+        let first_manifest = manifests[0].clone();

Review Comment:
   ```
   async fn merge_group<'a>(
           &self,
           snapshot_produce: &mut SnapshotProduceAction<'a>,
           first_manifest: &ManifestFile,
           group_manifests: Vec<ManifestFile>,
       )
   ```
   `merge_group` takes ownership of the manifests, so we have to clone the 
first one out before calling it. This looks hard to avoid; any suggestions for 
avoiding the clone are welcome.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to