jonathanc-n commented on code in PR #902: URL: https://github.com/apache/iceberg-rust/pull/902#discussion_r1955391542
########## crates/integration_tests/tests/merge_append_test.rs: ########## @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for rest catalog. + +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; +use iceberg::spec::{ + DataFile, ManifestEntry, ManifestStatus, NestedField, PrimitiveType, Schema, Type, +}; +use iceberg::table::Table; +use iceberg::transaction::{Transaction, MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT}; +use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; +use iceberg::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, +}; +use iceberg::writer::file_writer::ParquetWriterBuilder; +use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; +use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation}; +use iceberg_integration_tests::set_test_fixture; +use parquet::file::properties::WriterProperties; + +async fn write_new_data_file(table: &Table) -> Vec<DataFile> { + let schema: Arc<arrow_schema::Schema> = Arc::new( + table + .metadata() + .current_schema() + .as_ref() + .try_into() + .unwrap(), + ); + let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap(); + let file_name_generator = DefaultFileNameGenerator::new( + "test".to_string(), + None, + iceberg::spec::DataFileFormat::Parquet, + ); + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::default(), + table.metadata().current_schema().clone(), + table.file_io().clone(), + location_generator.clone(), + file_name_generator.clone(), + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None); + let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); + let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); + let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); + let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(col1) as ArrayRef, + Arc::new(col2) as ArrayRef, + Arc::new(col3) as ArrayRef, + ]) + .unwrap(); + data_file_writer.write(batch.clone()).await.unwrap(); + data_file_writer.close().await.unwrap() +} + +#[tokio::test] +async fn test_append_data_file() { + let fixture = set_test_fixture("test_create_table").await; + + // Create table + let ns = Namespace::with_properties( + NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), + HashMap::from([ + ("owner".to_string(), "ray".to_string()), + ("community".to_string(), "apache".to_string()), + ]), + ); + fixture + .rest_catalog + .create_namespace(ns.name(), ns.properties().clone()) + .await + .unwrap(); + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .with_fields(vec![ + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + let mut table = fixture + .rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + // Enable merge append for table + let tx = Transaction::new(&table); + table = tx + .set_properties(HashMap::from([ Review Comment: Should we also try adding a test here for the `MANIFEST_TARGET_SIZE_BYTES` property? ########## crates/iceberg/src/transaction.rs: ########## @@ -267,13 +376,174 @@ trait SnapshotProduceOperation: Send + Sync { struct DefaultManifestProcess; impl ManifestProcess for DefaultManifestProcess { - fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> { - manifests + async fn process_manifeset<'a>( + &self, + _snapshot_producer: &mut SnapshotProduceAction<'a>, + manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + Ok(manifests) + } +} + +struct MergeManifsetProcess { + target_size_bytes: u32, + min_count_to_merge: u32, +} + +impl MergeManifsetProcess { + pub fn new(target_size_bytes: u32, min_count_to_merge: u32) -> Self { + Self { + target_size_bytes, + min_count_to_merge, + } + } + + fn group_by_spec(&self, manifests: Vec<ManifestFile>) -> BTreeMap<i32, Vec<ManifestFile>> { + let mut grouped_manifests = BTreeMap::new(); + for manifest in manifests { + grouped_manifests + .entry(manifest.partition_spec_id) + .or_insert_with(Vec::new) + .push(manifest); + } + grouped_manifests + } + + async fn merge_bin( + &self, + snapshot_id: i64, + file_io: FileIO, + manifest_bin: Vec<ManifestFile>, + mut writer: ManifestWriter, + ) -> Result<ManifestFile> { + for manifset_file in manifest_bin { + let manifest_file = manifset_file.load_manifest(&file_io).await?; + for manifest_entry in manifest_file.entries() { + if manifest_entry.status() == ManifestStatus::Deleted + && manifest_entry + .snapshot_id() + .is_some_and(|id| id == snapshot_id) + { + //only files deleted by this snapshot should be added to the new manifest + writer.add_delete_entry(manifest_entry.as_ref().clone())?; + } else if manifest_entry.status() == ManifestStatus::Added + && manifest_entry + .snapshot_id() + .is_some_and(|id| id == snapshot_id) + { + //added entries from this snapshot are still added, otherwise they should be existing + writer.add_entry(manifest_entry.as_ref().clone())?; + } else if manifest_entry.status() != ManifestStatus::Deleted { + // add all non-deleted files from the old manifest as existing files + writer.add_existing_entry(manifest_entry.as_ref().clone())?; + } + } + } + + writer.write_manifest_file().await + } + + async fn merge_group<'a>( + &self, + snapshot_produce: &mut SnapshotProduceAction<'a>, + first_manifest: &ManifestFile, + group_manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + let packer: ListPacker<ManifestFile> = ListPacker::new(self.target_size_bytes); + let manifest_bins = + packer.pack(group_manifests, |manifest| manifest.manifest_length as u32); + + let manifest_merge_futures = manifest_bins + .into_iter() + .map(|manifest_bin| { + if manifest_bin.len() == 1 { + Ok(Box::pin(async { Ok(manifest_bin) }) + as Pin< + Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>, + >) + } + // if the bin has the first manifest (the new data files or an appended manifest file) then only + // merge it if the number of manifests is above the minimum count. this is applied only to bins + // with an in-memory manifest so that large manifests don't prevent merging older groups. + else if manifest_bin + .iter() + .any(|manifest| manifest == first_manifest) + && manifest_bin.len() < self.min_count_to_merge as usize + { + Ok(Box::pin(async { Ok(manifest_bin) }) + as Pin< + Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>, + >) + } else { + let writer = snapshot_produce.new_manifest_writer()?; + let snapshot_id = snapshot_produce.snapshot_id; + let file_io = snapshot_produce.tx.table.file_io().clone(); + Ok((Box::pin(async move { + Ok(vec![ + self.merge_bin( + snapshot_id, + file_io, + manifest_bin, + writer, + ) + .await?, + ]) + })) + as Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>) + } + }) + .collect::<Result<Vec<Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>>>>()?; + + let merged_bins: Vec<Vec<ManifestFile>> = + futures::future::join_all(manifest_merge_futures.into_iter()) + .await + .into_iter() + .collect::<Result<Vec<_>>>()?; + + Ok(merged_bins.into_iter().flatten().collect()) + } + + async fn merge_manifeset<'a>( + &self, + snapshot_produce: &mut SnapshotProduceAction<'a>, + manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + if manifests.is_empty() { + return Ok(manifests); + } + + let first_manifest = manifests[0].clone(); + + let group_manifests = self.group_by_spec(manifests); + + let mut merge_manifests = vec![]; + for (_spec_id, manifests) in group_manifests.into_iter().rev() { + merge_manifests.extend( + self.merge_group(snapshot_produce, &first_manifest, manifests) + .await?, + ); + } + + Ok(merge_manifests) + } +} + +impl ManifestProcess for MergeManifsetProcess { + async fn process_manifeset<'a>( + &self, + snapshot_produce: &mut SnapshotProduceAction<'a>, + manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + self.merge_manifeset(snapshot_produce, manifests).await Review Comment: ```suggestion async fn process_manifest<'a>( &self, snapshot_produce: &mut SnapshotProduceAction<'a>, manifests: Vec<ManifestFile>, ) -> Result<Vec<ManifestFile>> { self.merge_manifest(snapshot_produce, manifests).await ``` ########## crates/iceberg/src/transaction.rs: ########## @@ -267,13 +376,174 @@ trait SnapshotProduceOperation: Send + Sync { struct DefaultManifestProcess; impl ManifestProcess for DefaultManifestProcess { - fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> { - manifests + async fn process_manifeset<'a>( + &self, + _snapshot_producer: &mut SnapshotProduceAction<'a>, + manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + Ok(manifests) + } +} + +struct MergeManifsetProcess { + target_size_bytes: u32, + min_count_to_merge: u32, +} + +impl MergeManifsetProcess { Review Comment: ```suggestion struct MergeManifestProcess { target_size_bytes: u32, min_count_to_merge: u32, } impl MergeManifestProcess { ``` ########## crates/iceberg/src/transaction.rs: ########## @@ -267,13 +376,174 @@ trait SnapshotProduceOperation: Send + Sync { struct DefaultManifestProcess; impl ManifestProcess for DefaultManifestProcess { - fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> { - manifests + async fn process_manifeset<'a>( + &self, + _snapshot_producer: &mut SnapshotProduceAction<'a>, + manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + Ok(manifests) + } +} + +struct MergeManifsetProcess { + target_size_bytes: u32, + min_count_to_merge: u32, +} + +impl MergeManifsetProcess { + pub fn new(target_size_bytes: u32, min_count_to_merge: u32) -> Self { + Self { + target_size_bytes, + min_count_to_merge, + } + } + + fn group_by_spec(&self, manifests: Vec<ManifestFile>) -> BTreeMap<i32, Vec<ManifestFile>> { + let mut grouped_manifests = BTreeMap::new(); + for manifest in manifests { + grouped_manifests + .entry(manifest.partition_spec_id) + .or_insert_with(Vec::new) + .push(manifest); + } + grouped_manifests + } + + async fn merge_bin( + &self, + snapshot_id: i64, + file_io: FileIO, + manifest_bin: Vec<ManifestFile>, + mut writer: ManifestWriter, + ) -> Result<ManifestFile> { + for manifset_file in manifest_bin { + let manifest_file = manifset_file.load_manifest(&file_io).await?; + for manifest_entry in manifest_file.entries() { + if manifest_entry.status() == ManifestStatus::Deleted + && manifest_entry + .snapshot_id() + .is_some_and(|id| id == snapshot_id) + { + //only files deleted by this snapshot should be added to the new manifest + writer.add_delete_entry(manifest_entry.as_ref().clone())?; + } else if manifest_entry.status() == ManifestStatus::Added + && manifest_entry + .snapshot_id() + .is_some_and(|id| id == snapshot_id) + { + //added entries from this snapshot are still added, otherwise they should be existing + writer.add_entry(manifest_entry.as_ref().clone())?; + } else if manifest_entry.status() != ManifestStatus::Deleted { + // add all non-deleted files from the old manifest as existing files + writer.add_existing_entry(manifest_entry.as_ref().clone())?; + } + } + } + + writer.write_manifest_file().await + } + + async fn merge_group<'a>( + &self, + snapshot_produce: &mut SnapshotProduceAction<'a>, + first_manifest: &ManifestFile, + group_manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + let packer: ListPacker<ManifestFile> = ListPacker::new(self.target_size_bytes); + let manifest_bins = + packer.pack(group_manifests, |manifest| manifest.manifest_length as u32); + + let manifest_merge_futures = manifest_bins + .into_iter() + .map(|manifest_bin| { + if manifest_bin.len() == 1 { + Ok(Box::pin(async { Ok(manifest_bin) }) + as Pin< + Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>, + >) + } + // if the bin has the first manifest (the new data files or an appended manifest file) then only + // merge it if the number of manifests is above the minimum count. this is applied only to bins + // with an in-memory manifest so that large manifests don't prevent merging older groups. + else if manifest_bin + .iter() + .any(|manifest| manifest == first_manifest) + && manifest_bin.len() < self.min_count_to_merge as usize + { + Ok(Box::pin(async { Ok(manifest_bin) }) + as Pin< + Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>, + >) + } else { + let writer = snapshot_produce.new_manifest_writer()?; + let snapshot_id = snapshot_produce.snapshot_id; + let file_io = snapshot_produce.tx.table.file_io().clone(); + Ok((Box::pin(async move { + Ok(vec![ + self.merge_bin( + snapshot_id, + file_io, + manifest_bin, + writer, + ) + .await?, + ]) + })) + as Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>) + } + }) + .collect::<Result<Vec<Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>>>>()?; + + let merged_bins: Vec<Vec<ManifestFile>> = + futures::future::join_all(manifest_merge_futures.into_iter()) + .await + .into_iter() + .collect::<Result<Vec<_>>>()?; + + Ok(merged_bins.into_iter().flatten().collect()) + } + + async fn merge_manifeset<'a>( + &self, + snapshot_produce: &mut SnapshotProduceAction<'a>, + manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + if manifests.is_empty() { + return Ok(manifests); + } + + let first_manifest = manifests[0].clone(); + + let group_manifests = self.group_by_spec(manifests); + + let mut merge_manifests = vec![]; + for (_spec_id, manifests) in group_manifests.into_iter().rev() { + merge_manifests.extend( + self.merge_group(snapshot_produce, &first_manifest, manifests) + .await?, + ); + } + + Ok(merge_manifests) + } +} + +impl ManifestProcess for MergeManifsetProcess { Review Comment: ```suggestion impl ManifestProcess for MergeManifestProcess { ``` ########## crates/iceberg/src/transaction.rs: ########## @@ -213,6 +242,86 @@ impl<'a> FastAppendAction<'a> { } } +/// MergeAppendAction is a transaction action similar to fast append except that it will merge manifests +/// based on the target size. +pub struct MergeAppendAction<'a> { + snapshot_produce_action: SnapshotProduceAction<'a>, + target_size_bytes: u32, + min_count_to_merge: u32, + merge_enabled: bool, +} + +impl<'a> MergeAppendAction<'a> { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + tx: Transaction<'a>, + snapshot_id: i64, + commit_uuid: Uuid, + key_metadata: Vec<u8>, + snapshot_properties: HashMap<String, String>, + ) -> Result<Self> { + let target_size_bytes: u32 = tx + .table + .metadata() + .properties() + .get(MANIFEST_TARGET_SIZE_BYTES) + .and_then(|s| s.parse().ok()) + .unwrap_or(MANIFEST_TARGET_SIZE_BYTES_DEFAULT); + let min_count_to_merge: u32 = tx + .table + .metadata() + .properties() + .get(MANIFEST_MIN_MERGE_COUNT) + .and_then(|s| s.parse().ok()) + .unwrap_or(MANIFEST_MIN_MERGE_COUNT_DEFAULT); + let merge_enabled = tx + .table + .metadata() + .properties() + .get(MANIFEST_MERGE_ENABLED) + .and_then(|s| s.parse().ok()) + .unwrap_or(MANIFEST_MERGE_ENABLED_DEFAULT); + Ok(Self { + snapshot_produce_action: SnapshotProduceAction::new( + tx, + snapshot_id, + key_metadata, + commit_uuid, + snapshot_properties, + )?, + target_size_bytes, + min_count_to_merge, + merge_enabled, + }) + } + + /// Add data files to the snapshot. + pub fn add_data_files( + &mut self, + data_files: impl IntoIterator<Item = DataFile>, + ) -> Result<&mut Self> { + self.snapshot_produce_action.add_data_files(data_files)?; + Ok(self) + } + + /// Finished building the action and apply it to the transaction. + pub async fn apply(self) -> Result<Transaction<'a>> { + if self.merge_enabled { + let process = MergeManifsetProcess { Review Comment: ```suggestion let process = MergeManifestProcess { ``` ########## crates/iceberg/src/transaction.rs: ########## @@ -427,8 +696,9 @@ impl<'a> SnapshotProduceAction<'a> { let mut manifest_files = vec![added_manifest]; manifest_files.extend(existing_manifests); - let manifest_files = manifest_process.process_manifeset(manifest_files); - Ok(manifest_files) + manifest_process + .process_manifeset(self, manifest_files) Review Comment: ```suggestion .process_manifest(self, manifest_files) ``` ########## crates/iceberg/src/transaction.rs: ########## @@ -267,13 +376,174 @@ trait SnapshotProduceOperation: Send + Sync { struct DefaultManifestProcess; impl ManifestProcess for DefaultManifestProcess { - fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> { - manifests + async fn process_manifeset<'a>( + &self, + _snapshot_producer: &mut SnapshotProduceAction<'a>, + manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + Ok(manifests) + } +} + +struct MergeManifsetProcess { + target_size_bytes: u32, + min_count_to_merge: u32, +} + +impl MergeManifsetProcess { + pub fn new(target_size_bytes: u32, min_count_to_merge: u32) -> Self { + Self { + target_size_bytes, + min_count_to_merge, + } + } + + fn group_by_spec(&self, manifests: Vec<ManifestFile>) -> BTreeMap<i32, Vec<ManifestFile>> { + let mut grouped_manifests = BTreeMap::new(); + for manifest in manifests { + grouped_manifests + .entry(manifest.partition_spec_id) + .or_insert_with(Vec::new) + .push(manifest); + } + grouped_manifests + } + + async fn merge_bin( + &self, + snapshot_id: i64, + file_io: FileIO, + manifest_bin: Vec<ManifestFile>, + mut writer: ManifestWriter, + ) -> Result<ManifestFile> { + for manifset_file in manifest_bin { + let manifest_file = manifset_file.load_manifest(&file_io).await?; + for manifest_entry in manifest_file.entries() { + if manifest_entry.status() == ManifestStatus::Deleted + && manifest_entry + .snapshot_id() + .is_some_and(|id| id == snapshot_id) + { + //only files deleted by this snapshot should be added to the new manifest + writer.add_delete_entry(manifest_entry.as_ref().clone())?; + } else if manifest_entry.status() == ManifestStatus::Added + && manifest_entry + .snapshot_id() + .is_some_and(|id| id == snapshot_id) + { + //added entries from this snapshot are still added, otherwise they should be existing + writer.add_entry(manifest_entry.as_ref().clone())?; + } else if manifest_entry.status() != ManifestStatus::Deleted { + // add all non-deleted files from the old manifest as existing files + writer.add_existing_entry(manifest_entry.as_ref().clone())?; + } + } + } + + writer.write_manifest_file().await + } + + async fn merge_group<'a>( + &self, + snapshot_produce: &mut SnapshotProduceAction<'a>, + first_manifest: &ManifestFile, + group_manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + let packer: ListPacker<ManifestFile> = ListPacker::new(self.target_size_bytes); + let manifest_bins = + packer.pack(group_manifests, |manifest| manifest.manifest_length as u32); + + let manifest_merge_futures = manifest_bins + .into_iter() + .map(|manifest_bin| { + if manifest_bin.len() == 1 { + Ok(Box::pin(async { Ok(manifest_bin) }) + as Pin< + Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>, + >) + } + // if the bin has the first manifest (the new data files or an appended manifest file) then only + // merge it if the number of manifests is above the minimum count. this is applied only to bins + // with an in-memory manifest so that large manifests don't prevent merging older groups. + else if manifest_bin + .iter() + .any(|manifest| manifest == first_manifest) + && manifest_bin.len() < self.min_count_to_merge as usize + { + Ok(Box::pin(async { Ok(manifest_bin) }) + as Pin< + Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>, + >) + } else { + let writer = snapshot_produce.new_manifest_writer()?; + let snapshot_id = snapshot_produce.snapshot_id; + let file_io = snapshot_produce.tx.table.file_io().clone(); + Ok((Box::pin(async move { + Ok(vec![ + self.merge_bin( + snapshot_id, + file_io, + manifest_bin, + writer, + ) + .await?, + ]) + })) + as Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>) + } + }) + .collect::<Result<Vec<Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>>>>()?; + + let merged_bins: Vec<Vec<ManifestFile>> = + futures::future::join_all(manifest_merge_futures.into_iter()) + .await + .into_iter() + .collect::<Result<Vec<_>>>()?; + + Ok(merged_bins.into_iter().flatten().collect()) + } + + async fn merge_manifeset<'a>( + &self, + snapshot_produce: &mut SnapshotProduceAction<'a>, + manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + if manifests.is_empty() { + return Ok(manifests); + } + + let first_manifest = manifests[0].clone(); + + let group_manifests = self.group_by_spec(manifests); + + let mut merge_manifests = vec![]; + for (_spec_id, manifests) in group_manifests.into_iter().rev() { + merge_manifests.extend( + self.merge_group(snapshot_produce, &first_manifest, manifests) + .await?, + ); + } + + Ok(merge_manifests) + } +} + +impl ManifestProcess for MergeManifsetProcess { + async fn process_manifeset<'a>( + &self, + snapshot_produce: &mut SnapshotProduceAction<'a>, + manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + self.merge_manifeset(snapshot_produce, manifests).await } } trait ManifestProcess: Send + Sync { - fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>; + fn process_manifeset<'a>( Review Comment: ```suggestion fn process_manifest<'a>( ``` ########## crates/iceberg/src/utils.rs: ########## @@ -40,3 +40,148 @@ pub(crate) fn available_parallelism() -> NonZeroUsize { NonZeroUsize::new(DEFAULT_PARALLELISM).unwrap() }) } + +pub mod bin { Review Comment: This implementation feels a bit slow if stressed, we can enhance it in a follow up pr (ex. adding the lookback window similar to the java implementation). What are your thoughts @ZENOTME? ########## crates/iceberg/src/transaction.rs: ########## @@ -267,13 +376,174 @@ trait SnapshotProduceOperation: Send + Sync { struct DefaultManifestProcess; impl ManifestProcess for DefaultManifestProcess { - fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> { - manifests + async fn process_manifeset<'a>( + &self, + _snapshot_producer: &mut SnapshotProduceAction<'a>, + manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + Ok(manifests) + } +} + +struct MergeManifsetProcess { + target_size_bytes: u32, + min_count_to_merge: u32, +} + +impl MergeManifsetProcess { + pub fn new(target_size_bytes: u32, min_count_to_merge: u32) -> Self { + Self { + target_size_bytes, + min_count_to_merge, + } + } + + fn group_by_spec(&self, manifests: Vec<ManifestFile>) -> BTreeMap<i32, Vec<ManifestFile>> { + let mut grouped_manifests = BTreeMap::new(); + for manifest in manifests { + grouped_manifests + .entry(manifest.partition_spec_id) + .or_insert_with(Vec::new) + .push(manifest); + } + grouped_manifests + } + + async fn merge_bin( + &self, + snapshot_id: i64, + file_io: FileIO, + manifest_bin: Vec<ManifestFile>, + mut writer: ManifestWriter, + ) -> Result<ManifestFile> { + for manifset_file in manifest_bin { + let manifest_file = manifset_file.load_manifest(&file_io).await?; Review Comment: ```suggestion for manifest_file in manifest_bin { let manifest_file = manifest_file.load_manifest(&file_io).await?; ``` ########## crates/iceberg/src/transaction.rs: ########## @@ -267,13 +376,174 @@ trait SnapshotProduceOperation: Send + Sync { struct DefaultManifestProcess; impl ManifestProcess for DefaultManifestProcess { - fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> { - manifests + async fn process_manifeset<'a>( Review Comment: ```suggestion async fn process_manifest<'a>( ``` ########## crates/iceberg/src/transaction.rs: ########## @@ -267,13 +376,174 @@ trait SnapshotProduceOperation: Send + Sync { struct DefaultManifestProcess; impl ManifestProcess for DefaultManifestProcess { - fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> { - manifests + async fn process_manifeset<'a>( + &self, + _snapshot_producer: &mut SnapshotProduceAction<'a>, + manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + Ok(manifests) + } +} + +struct MergeManifsetProcess { + target_size_bytes: u32, + min_count_to_merge: u32, +} + +impl MergeManifsetProcess { + pub fn new(target_size_bytes: u32, min_count_to_merge: u32) -> Self { + Self { + target_size_bytes, + min_count_to_merge, + } + } + + fn group_by_spec(&self, manifests: Vec<ManifestFile>) -> BTreeMap<i32, Vec<ManifestFile>> { + let mut grouped_manifests = BTreeMap::new(); + for manifest in manifests { + grouped_manifests + .entry(manifest.partition_spec_id) + .or_insert_with(Vec::new) + .push(manifest); + } + grouped_manifests + } + + async fn merge_bin( + &self, + snapshot_id: i64, + file_io: FileIO, + manifest_bin: Vec<ManifestFile>, + mut writer: ManifestWriter, + ) -> Result<ManifestFile> { + for manifset_file in manifest_bin { + let manifest_file = manifset_file.load_manifest(&file_io).await?; + for manifest_entry in manifest_file.entries() { + if manifest_entry.status() == ManifestStatus::Deleted + && manifest_entry + .snapshot_id() + .is_some_and(|id| id == snapshot_id) + { + //only files deleted by this snapshot should be added to the new manifest + writer.add_delete_entry(manifest_entry.as_ref().clone())?; + } else if manifest_entry.status() == ManifestStatus::Added + && manifest_entry + .snapshot_id() + .is_some_and(|id| id == snapshot_id) + { + //added entries from this snapshot are still added, otherwise they should be existing + writer.add_entry(manifest_entry.as_ref().clone())?; + } else if manifest_entry.status() != ManifestStatus::Deleted { + // add all non-deleted files from the old manifest as existing files + writer.add_existing_entry(manifest_entry.as_ref().clone())?; + } + } + } + + writer.write_manifest_file().await + } + + async fn merge_group<'a>( + &self, + snapshot_produce: &mut SnapshotProduceAction<'a>, + first_manifest: &ManifestFile, + group_manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + let packer: ListPacker<ManifestFile> = ListPacker::new(self.target_size_bytes); + let manifest_bins = + packer.pack(group_manifests, |manifest| manifest.manifest_length as u32); + + let manifest_merge_futures = manifest_bins + .into_iter() + .map(|manifest_bin| { + if manifest_bin.len() == 1 { + Ok(Box::pin(async { Ok(manifest_bin) }) + as Pin< + Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>, + >) + } + // if the bin has the first manifest (the new data files or an appended manifest file) then only + // merge it if the number of manifests is above the minimum count. this is applied only to bins + // with an in-memory manifest so that large manifests don't prevent merging older groups. + else if manifest_bin + .iter() + .any(|manifest| manifest == first_manifest) + && manifest_bin.len() < self.min_count_to_merge as usize + { + Ok(Box::pin(async { Ok(manifest_bin) }) + as Pin< + Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>, + >) + } else { + let writer = snapshot_produce.new_manifest_writer()?; + let snapshot_id = snapshot_produce.snapshot_id; + let file_io = snapshot_produce.tx.table.file_io().clone(); + Ok((Box::pin(async move { + Ok(vec![ + self.merge_bin( + snapshot_id, + file_io, + manifest_bin, + writer, + ) + .await?, + ]) + })) + as Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>) + } + }) + .collect::<Result<Vec<Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>>>>()?; + + let merged_bins: Vec<Vec<ManifestFile>> = + futures::future::join_all(manifest_merge_futures.into_iter()) + .await + .into_iter() + .collect::<Result<Vec<_>>>()?; + + Ok(merged_bins.into_iter().flatten().collect()) + } + + async fn merge_manifeset<'a>( + &self, + snapshot_produce: &mut SnapshotProduceAction<'a>, + manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + if manifests.is_empty() { + return Ok(manifests); + } + + let first_manifest = manifests[0].clone(); Review Comment: Would this be an expensive clone (not wrapped in arc)? is there a way to avoid? ########## crates/iceberg/src/transaction.rs: ########## @@ -267,13 +376,174 @@ trait SnapshotProduceOperation: Send + Sync { struct DefaultManifestProcess; impl ManifestProcess for DefaultManifestProcess { - fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> { - manifests + async fn process_manifeset<'a>( + &self, + _snapshot_producer: &mut SnapshotProduceAction<'a>, + manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + Ok(manifests) + } +} + +struct MergeManifsetProcess { + target_size_bytes: u32, + min_count_to_merge: u32, +} + +impl MergeManifsetProcess { + pub fn new(target_size_bytes: u32, min_count_to_merge: u32) -> Self { + Self { + target_size_bytes, + min_count_to_merge, + } + } + + fn group_by_spec(&self, manifests: Vec<ManifestFile>) -> BTreeMap<i32, Vec<ManifestFile>> { + let mut grouped_manifests = BTreeMap::new(); + for manifest in manifests { + grouped_manifests + .entry(manifest.partition_spec_id) + .or_insert_with(Vec::new) + .push(manifest); + } + grouped_manifests + } + + async fn merge_bin( + &self, + snapshot_id: i64, + file_io: FileIO, + manifest_bin: Vec<ManifestFile>, + mut writer: ManifestWriter, + ) -> Result<ManifestFile> { + for manifset_file in manifest_bin { + let manifest_file = manifset_file.load_manifest(&file_io).await?; + for manifest_entry in manifest_file.entries() { + if manifest_entry.status() == ManifestStatus::Deleted + && manifest_entry + .snapshot_id() + .is_some_and(|id| id == snapshot_id) + { + //only files deleted by this snapshot should be added to the new manifest + writer.add_delete_entry(manifest_entry.as_ref().clone())?; + } else if manifest_entry.status() == ManifestStatus::Added + && manifest_entry + .snapshot_id() + .is_some_and(|id| id == snapshot_id) + { + //added entries from this snapshot are still added, otherwise they should be existing + writer.add_entry(manifest_entry.as_ref().clone())?; + } else if manifest_entry.status() != ManifestStatus::Deleted { + // add all non-deleted files from the old manifest as existing files + writer.add_existing_entry(manifest_entry.as_ref().clone())?; + } + } + } + + writer.write_manifest_file().await + } + + async fn merge_group<'a>( + &self, + snapshot_produce: &mut SnapshotProduceAction<'a>, + first_manifest: &ManifestFile, + group_manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + let packer: ListPacker<ManifestFile> = ListPacker::new(self.target_size_bytes); + let manifest_bins = + packer.pack(group_manifests, |manifest| manifest.manifest_length as u32); + + let manifest_merge_futures = manifest_bins + .into_iter() + .map(|manifest_bin| { + if manifest_bin.len() == 1 { + Ok(Box::pin(async { Ok(manifest_bin) }) + as Pin< + Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>, + >) + } + // if the bin has the first manifest (the new data files or an appended manifest file) then only + // merge it if the number of manifests is above the minimum count. this is applied only to bins + // with an in-memory manifest so that large manifests don't prevent merging older groups. + else if manifest_bin + .iter() + .any(|manifest| manifest == first_manifest) + && manifest_bin.len() < self.min_count_to_merge as usize + { + Ok(Box::pin(async { Ok(manifest_bin) }) + as Pin< + Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>, + >) + } else { + let writer = snapshot_produce.new_manifest_writer()?; + let snapshot_id = snapshot_produce.snapshot_id; + let file_io = snapshot_produce.tx.table.file_io().clone(); + Ok((Box::pin(async move { + Ok(vec![ + self.merge_bin( + snapshot_id, + file_io, + manifest_bin, + writer, + ) + .await?, + ]) + })) + as Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>) + } + }) + .collect::<Result<Vec<Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>>>>()?; + + let merged_bins: Vec<Vec<ManifestFile>> = + futures::future::join_all(manifest_merge_futures.into_iter()) + .await + .into_iter() + .collect::<Result<Vec<_>>>()?; + + Ok(merged_bins.into_iter().flatten().collect()) + } + + async fn merge_manifeset<'a>( Review Comment: ```suggestion async fn merge_manifest<'a>( ``` ########## crates/iceberg/src/transaction.rs: ########## @@ -267,13 +376,174 @@ trait SnapshotProduceOperation: Send + Sync { struct DefaultManifestProcess; impl ManifestProcess for DefaultManifestProcess { - fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> { - manifests + async fn process_manifeset<'a>( + &self, + _snapshot_producer: &mut SnapshotProduceAction<'a>, + manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + Ok(manifests) + } +} + +struct MergeManifsetProcess { + target_size_bytes: u32, + min_count_to_merge: u32, +} + +impl MergeManifsetProcess { + pub fn new(target_size_bytes: u32, min_count_to_merge: u32) -> Self { + Self { + target_size_bytes, + min_count_to_merge, + } + } + + fn group_by_spec(&self, manifests: Vec<ManifestFile>) -> BTreeMap<i32, Vec<ManifestFile>> { + let mut grouped_manifests = BTreeMap::new(); + for manifest in manifests { + grouped_manifests + .entry(manifest.partition_spec_id) + .or_insert_with(Vec::new) + .push(manifest); + } + grouped_manifests + } + + async fn merge_bin( + &self, + snapshot_id: i64, + file_io: FileIO, + manifest_bin: Vec<ManifestFile>, + mut writer: ManifestWriter, + ) -> Result<ManifestFile> { + for manifset_file in manifest_bin { + let manifest_file = manifset_file.load_manifest(&file_io).await?; + for manifest_entry in manifest_file.entries() { + if manifest_entry.status() == ManifestStatus::Deleted + && manifest_entry + .snapshot_id() + .is_some_and(|id| id == snapshot_id) + { + //only files deleted by this snapshot should be added to the new manifest + writer.add_delete_entry(manifest_entry.as_ref().clone())?; + } else if manifest_entry.status() == ManifestStatus::Added + && manifest_entry + .snapshot_id() + .is_some_and(|id| id == snapshot_id) + { + //added entries from this snapshot are still added, otherwise they should be existing + writer.add_entry(manifest_entry.as_ref().clone())?; + } else if manifest_entry.status() != ManifestStatus::Deleted { + // add all non-deleted files from the old manifest as existing files + writer.add_existing_entry(manifest_entry.as_ref().clone())?; + } + } + } + + writer.write_manifest_file().await + } + + async fn merge_group<'a>( + &self, + snapshot_produce: &mut SnapshotProduceAction<'a>, + first_manifest: &ManifestFile, + group_manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + let packer: ListPacker<ManifestFile> = ListPacker::new(self.target_size_bytes); + let manifest_bins = + packer.pack(group_manifests, |manifest| manifest.manifest_length as u32); + + let manifest_merge_futures = manifest_bins + .into_iter() + .map(|manifest_bin| { + if manifest_bin.len() == 1 { + Ok(Box::pin(async { Ok(manifest_bin) }) + as Pin< + Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>, + >) + } + // if the bin has the first manifest (the new data files or an appended manifest file) then only + // merge it if the number of manifests is above the minimum count. this is applied only to bins + // with an in-memory manifest so that large manifests don't prevent merging older groups. + else if manifest_bin + .iter() + .any(|manifest| manifest == first_manifest) + && manifest_bin.len() < self.min_count_to_merge as usize + { + Ok(Box::pin(async { Ok(manifest_bin) }) + as Pin< + Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>, + >) + } else { + let writer = snapshot_produce.new_manifest_writer()?; + let snapshot_id = snapshot_produce.snapshot_id; + let file_io = snapshot_produce.tx.table.file_io().clone(); + Ok((Box::pin(async move { + Ok(vec![ + self.merge_bin( + snapshot_id, + file_io, + manifest_bin, + writer, + ) + .await?, + ]) + })) + as Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>) + } + }) + .collect::<Result<Vec<Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>>>>()?; + + let merged_bins: Vec<Vec<ManifestFile>> = + futures::future::join_all(manifest_merge_futures.into_iter()) + .await + .into_iter() + .collect::<Result<Vec<_>>>()?; + + Ok(merged_bins.into_iter().flatten().collect()) + } + + async fn merge_manifeset<'a>( + &self, + snapshot_produce: &mut SnapshotProduceAction<'a>, + manifests: Vec<ManifestFile>, + ) -> Result<Vec<ManifestFile>> { + if manifests.is_empty() { + return Ok(manifests); + } + + let first_manifest = manifests[0].clone(); + + let group_manifests = self.group_by_spec(manifests); + + let mut merge_manifests = vec![]; + for (_spec_id, manifests) in group_manifests.into_iter().rev() { + merge_manifests.extend( + self.merge_group(snapshot_produce, &first_manifest, manifests) + .await?, + ); + } + + Ok(merge_manifests) + } +} + +impl ManifestProcess for MergeManifsetProcess { + async fn process_manifeset<'a>( Review Comment: ```suggestion async fn process_manifest<'a>( ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org