liurenjie1024 commented on code in PR #960:
URL: https://github.com/apache/iceberg-rust/pull/960#discussion_r1970839596
##########
crates/iceberg/src/transaction.rs:
##########
@@ -169,6 +176,48 @@ impl<'a> Transaction<'a> {
         catalog.update_table(table_commit).await
     }
+
+    async fn parquet_files_to_data_files(
+        &self,
+        file_io: &FileIO,
+        file_paths: Vec<String>,
+        table_metadata: &TableMetadata,
+    ) -> Result<Vec<DataFile>> {
+        // TODO: support adding to partitioned table
+        let mut data_files: Vec<DataFile> = Vec::new();
+
+        if !table_metadata.default_spec.fields().is_empty() {

Review Comment:
   We should use [this method](https://github.com/apache/iceberg-rust/blob/821f8dda3c4ebb635994e6acb8ed7914452f235f/crates/iceberg/src/spec/partition.rs#L99) here.
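To illustrate what that suggestion amounts to — a minimal sketch, assuming the linked helper is `PartitionSpec::is_unpartitioned()` and that rejecting a partitioned table should surface as an error rather than a silent branch:

```rust
// Sketch only: replaces the hand-rolled `fields().is_empty()` check with the
// helper the reviewer links to, and fails fast while partitioned appends are
// still unsupported (see the TODO above).
if !table_metadata.default_spec.is_unpartitioned() {
    return Err(Error::new(
        ErrorKind::FeatureUnsupported,
        "Appending parquet files to a partitioned table is not yet supported",
    ));
}
```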
##########
crates/iceberg/src/transaction.rs:
##########
@@ -18,27 +18,34 @@
 //! This module contains transaction api.
 
 use std::cmp::Ordering;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::future::Future;
 use std::mem::discriminant;
 use std::ops::RangeFrom;
 
+use arrow_array::StringArray;
+use futures::TryStreamExt;
+use parquet::arrow::async_reader::AsyncFileReader;
 use uuid::Uuid;
 
+use crate::arrow::ArrowFileReader;
 use crate::error::Result;
-use crate::io::OutputFile;
+use crate::io::{FileIO, OutputFile};
 use crate::spec::{
     DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter,
     ManifestWriterBuilder, NullOrder, Operation, Snapshot, SnapshotReference, SnapshotRetention,
-    SortDirection, SortField, SortOrder, Struct, StructType, Summary, Transform, MAIN_BRANCH,
+    SortDirection, SortField, SortOrder, Struct, StructType, Summary, TableMetadata, Transform,
+    MAIN_BRANCH,
 };
 use crate::table::Table;
+use crate::writer::file_writer::ParquetWriter;
 use crate::TableUpdate::UpgradeFormatVersion;
 use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
 
 const META_ROOT_PATH: &str = "metadata";
 
 /// Table transaction.
+#[derive(Clone)]

Review Comment:
   Why do we need this?

##########
crates/iceberg/src/writer/file_writer/parquet_writer.rs:
##########
@@ -105,7 +106,8 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
     }
 }
 
-struct IndexByParquetPathName {
+/// A mapping from Parquet column path names to internal field id
+pub struct IndexByParquetPathName {

Review Comment:
   Do we need to make these public?

##########
crates/iceberg/src/transaction.rs:
##########
@@ -205,8 +254,71 @@ impl<'a> FastAppendAction<'a> {
         Ok(self)
     }
+    /// Adds existing parquet files
+    pub async fn add_parquet_files(
+        transaction: Transaction<'a>,
+        file_path: Vec<String>,
+    ) -> Result<Transaction<'a>> {
+        let table_metadata = transaction.table.metadata();
+
+        let data_file = Transaction::parquet_files_to_data_files(
+            &transaction,
+            transaction.table.file_io(),
+            file_path,
+            table_metadata,
+        )
+        .await?;
+
+        let mut fast_append_action =
+            Transaction::fast_append(&transaction, Some(Uuid::new_v4()), Vec::new())?;
+        fast_append_action.add_data_files(data_file)?;
+
+        fast_append_action.apply(transaction).await
+    }
+
     /// Finished building the action and apply it to the transaction.
-    pub async fn apply(self) -> Result<Transaction<'a>> {
+    pub async fn apply(self, transaction: Transaction<'a>) -> Result<Transaction<'a>> {
+        // Checks duplicate files
+        let new_files: HashSet<&str> = self
+            .snapshot_produce_action
+            .added_data_files
+            .iter()
+            .map(|df| df.file_path.as_str())
+            .collect();
+
+        let mut manifest_stream = transaction.table.inspect().manifests().scan().await?;
+        let mut referenced_files = Vec::new();
+
+        while let Some(batch) = manifest_stream.try_next().await? {
+            let file_path_array = batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "Failed to downcast file_path column to StringArray",
+                    )
+                })?;
+
+            for i in 0..batch.num_rows() {
+                let file_path = file_path_array.value(i);
+                if new_files.contains(file_path) {
+                    referenced_files.push(file_path.to_string());

Review Comment:
   I think we can do an early return here?
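A sketch of that early return, reusing only the names visible in the diff above (`batch`, `file_path_array`, `new_files`): instead of accumulating `referenced_files` and presumably checking the list afterwards, the loop can fail on the first duplicate it sees.

```rust
// Sketch only: bail out on the first file that is already referenced by the
// table, which makes the `referenced_files` accumulator unnecessary.
for i in 0..batch.num_rows() {
    let file_path = file_path_array.value(i);
    if new_files.contains(file_path) {
        return Err(Error::new(
            ErrorKind::DataInvalid,
            format!("Cannot add file {file_path}: it is already referenced by the table"),
        ));
    }
}
```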
##########
crates/iceberg/src/transaction.rs:
##########
@@ -205,8 +254,71 @@ impl<'a> FastAppendAction<'a> {
         Ok(self)
     }
+    /// Adds existing parquet files
+    pub async fn add_parquet_files(
+        transaction: Transaction<'a>,

Review Comment:
   ```suggestion
           &self
   ```

##########
crates/iceberg/src/scan.rs:
##########
@@ -1413,6 +1462,214 @@ pub mod tests {
             writer.close().unwrap();
         }
     }
+
+    pub async fn setup_unpartitioned_manifest_files(&mut self) {

Review Comment:
   It seems these are not used?

##########
crates/iceberg/src/transaction.rs:
##########
@@ -205,8 +254,71 @@ impl<'a> FastAppendAction<'a> {
         Ok(self)
     }
+    /// Adds existing parquet files
+    pub async fn add_parquet_files(
+        transaction: Transaction<'a>,
+        file_path: Vec<String>,
+    ) -> Result<Transaction<'a>> {
+        let table_metadata = transaction.table.metadata();
+
+        let data_file = Transaction::parquet_files_to_data_files(

Review Comment:
   ```suggestion
           let data_files = Transaction::parquet_files_to_data_files(
   ```

##########
crates/iceberg/src/transaction.rs:
##########
@@ -169,6 +176,48 @@ impl<'a> Transaction<'a> {
         catalog.update_table(table_commit).await
     }
+
+    async fn parquet_files_to_data_files(

Review Comment:
   Should we also move this method to the `parquet_writer` module?

##########
crates/iceberg/src/transaction.rs:
##########
@@ -205,8 +254,71 @@ impl<'a> FastAppendAction<'a> {
         Ok(self)
     }
+    /// Adds existing parquet files
+    pub async fn add_parquet_files(
+        transaction: Transaction<'a>,
+        file_path: Vec<String>,
+    ) -> Result<Transaction<'a>> {
+        let table_metadata = transaction.table.metadata();
+
+        let data_file = Transaction::parquet_files_to_data_files(
+            &transaction,
+            transaction.table.file_io(),
+            file_path,
+            table_metadata,
+        )
+        .await?;
+
+        let mut fast_append_action =
+            Transaction::fast_append(&transaction, Some(Uuid::new_v4()), Vec::new())?;
+        fast_append_action.add_data_files(data_file)?;
+
+        fast_append_action.apply(transaction).await
+    }
+
     /// Finished building the action and apply it to the transaction.
-    pub async fn apply(self) -> Result<Transaction<'a>> {
+    pub async fn apply(self, transaction: Transaction<'a>) -> Result<Transaction<'a>> {

Review Comment:
   ```suggestion
       pub async fn apply(self) -> Result<Transaction<'a>> {
   ```
   We already have a `Transaction` instance in `SnapshotProduceAction`.

##########
crates/iceberg/src/writer/file_writer/parquet_writer.rs:
##########
@@ -301,7 +313,8 @@ impl MinMaxColAggregator {
         Ok(())
     }
 
-    fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
+    /// Returns lower and upper bounds
+    pub fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {

Review Comment:
   Ditto.

##########
crates/iceberg/src/writer/file_writer/parquet_writer.rs:
##########
@@ -256,7 +267,8 @@ impl MinMaxColAggregator {
             .or_insert(datum);
     }
 
-    fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {
+    /// Update statistics
+    pub fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {

Review Comment:
   Ditto.

##########
crates/iceberg/src/transaction.rs:
##########
@@ -205,8 +254,71 @@ impl<'a> FastAppendAction<'a> {
         Ok(self)
     }
+    /// Adds existing parquet files
+    pub async fn add_parquet_files(
+        transaction: Transaction<'a>,
+        file_path: Vec<String>,
+    ) -> Result<Transaction<'a>> {
+        let table_metadata = transaction.table.metadata();
+
+        let data_file = Transaction::parquet_files_to_data_files(
+            &transaction,
+            transaction.table.file_io(),
+            file_path,
+            table_metadata,
+        )
+        .await?;
+
+        let mut fast_append_action =
+            Transaction::fast_append(&transaction, Some(Uuid::new_v4()), Vec::new())?;
+        fast_append_action.add_data_files(data_file)?;
+
+        fast_append_action.apply(transaction).await

Review Comment:
   ```suggestion
           self.add_data_files(data_file)?;
   ```

##########
crates/iceberg/src/transaction.rs:
##########
@@ -169,6 +176,48 @@ impl<'a> Transaction<'a> {
         catalog.update_table(table_commit).await
     }
+
+    async fn parquet_files_to_data_files(
+        &self,
+        file_io: &FileIO,
+        file_paths: Vec<String>,
+        table_metadata: &TableMetadata,
+    ) -> Result<Vec<DataFile>> {
+        // TODO: support adding to partitioned table
+        let mut data_files: Vec<DataFile> = Vec::new();
+
+        if !table_metadata.default_spec.fields().is_empty() {

Review Comment:
   It's weird to add this check here; we should check this at the entrance of `add_parquet_files`.

##########
crates/iceberg/src/writer/file_writer/parquet_writer.rs:
##########
@@ -226,7 +236,8 @@ struct MinMaxColAggregator {
 }
 
 impl MinMaxColAggregator {
-    fn new(schema: SchemaRef) -> Self {
+    /// Creates new and empty `MinMaxColAggregator`
+    pub fn new(schema: SchemaRef) -> Self {

Review Comment:
   Ditto.
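Taken together, the suggestions on `add_parquet_files` point at an instance method that reuses the transaction `SnapshotProduceAction` already holds. A rough sketch under explicit assumptions: the `tx` field name on `SnapshotProduceAction` is hypothetical, `is_unpartitioned` is assumed to be the linked partition-spec helper, and `parquet_files_to_data_files` is assumed to have moved onto `ParquetWriter` as proposed above.

```rust
/// Adds existing parquet files — sketch combining the suggestions above.
pub async fn add_parquet_files(&mut self, file_paths: Vec<String>) -> Result<&mut Self> {
    // `tx` is a hypothetical field name for the Transaction that
    // SnapshotProduceAction already holds, per the comment on `apply`.
    let table = &self.snapshot_produce_action.tx.table;
    let table_metadata = table.metadata();

    // Partitioned-table check moved to the entrance, per the review comment
    // (`is_unpartitioned` assumed to be the linked helper).
    if !table_metadata.default_spec.is_unpartitioned() {
        return Err(Error::new(
            ErrorKind::FeatureUnsupported,
            "Appending parquet files to a partitioned table is not yet supported",
        ));
    }

    // Assumes the conversion helper now lives in the parquet_writer module.
    let data_files =
        ParquetWriter::parquet_files_to_data_files(table.file_io(), file_paths, table_metadata)
            .await?;

    // Add to this action directly instead of spawning a second fast append.
    self.add_data_files(data_files)?;
    Ok(self)
}
```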