jonathanc-n commented on code in PR #960:
URL: https://github.com/apache/iceberg-rust/pull/960#discussion_r1957989529
##########
crates/iceberg/src/transaction.rs:
##########
@@ -169,6 +177,180 @@ impl<'a> Transaction<'a> {
         catalog.update_table(table_commit).await
     }
+
+    /// Adds existing parquet files
+    pub async fn add_parquet_files(
+        self,
+        file_paths: Vec<String>,
+        check_duplicate_files: bool,
+    ) -> Result<Transaction<'a>> {
+        if check_duplicate_files {
+            let new_files: HashSet<&str> = file_paths.iter().map(|s| s.as_str()).collect();
+
+            let mut manifest_stream = self.table.inspect().manifests().scan().await?;
+            let mut referenced_files = Vec::new();
+
+            while let Some(batch) = manifest_stream.try_next().await? {
+                let file_path_array = batch
+                    .column(1)
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .ok_or_else(|| {
+                        Error::new(
+                            ErrorKind::DataInvalid,
+                            "Failed to downcast file_path column to StringArray",
+                        )
+                    })?;
+
+                for i in 0..batch.num_rows() {
+                    let file_path = file_path_array.value(i);
+                    if new_files.contains(file_path) {
+                        referenced_files.push(file_path.to_string());
+                    }
+                }
+            }
+
+            if !referenced_files.is_empty() {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Cannot add files that are already referenced by table, files: {}",
+                        referenced_files.join(", ")
+                    ),
+                ));
+            }
+        }
+
+        let table_metadata = self.table.metadata();
+
+        let data_files = Transaction::parquet_files_to_data_files(
+            &self,
+            self.table.file_io(),
+            file_paths,
+            table_metadata,
+        )
+        .await?;
+
+        let mut fast_append_action = self.fast_append(Some(Uuid::new_v4()), Vec::new())?;
+        fast_append_action.add_data_files(data_files)?;
+
+        fast_append_action.apply().await
+    }
+
+    async fn parquet_files_to_data_files(
+        &self,
+        file_io: &FileIO,
+        file_paths: Vec<String>,
+        table_metadata: &TableMetadata,
+    ) -> Result<Vec<DataFile>> {
+        let mut data_files: Vec<DataFile> = Vec::new();
+
+        // TODO: support adding to partitioned table
+        if !table_metadata.default_spec.fields().is_empty() {
+            return Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Appending to partitioned tables is not supported",
+            ));
+        }
+
+        for file_path in file_paths {
+            let input_file = file_io.new_input(&file_path)?;
+            let file_metadata = input_file.metadata().await?;
+            let file_size_in_bytes = file_metadata.size as usize;
+            let reader = input_file.reader().await?;
+
+            let mut parquet_reader = ArrowFileReader::new(file_metadata, reader);
+            let parquet_metadata = parquet_reader.get_metadata().await.map_err(|err| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Error reading Parquet metadata: {}", err),
+                )
+            })?;
+            let builder = self.parquet_to_data_file_builder(
+                table_metadata.current_schema().clone(),
+                parquet_metadata,
+                file_size_in_bytes,
+                file_path,
+            )?;
+            let data_file = builder.build().unwrap();
+            data_files.push(data_file);
+        }
+        Ok(data_files)
+    }
+
+    /// `ParquetMetadata` to data file builder
+    pub fn parquet_to_data_file_builder(

Review Comment:

I think I mentioned this problem earlier about reusing the function here: the writer returns raw metadata, which is what the original `ParquetWriter::to_data_file_builder` takes. In this case we are doing a read with `ArrowFileReader`, which can only return the parsed metadata. This could be fixed if there were a conversion function from raw to parsed metadata, but I haven't been able to find one 🤔
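For context on the raw-vs-parsed distinction, here is a minimal sketch of pulling the statistics a data-file builder needs directly from the *parsed* `ParquetMetaData` that `ArrowFileReader::get_metadata` yields, rather than the raw thrift form the writer-side helper consumes. The accessors are from the `parquet` crate; `collect_stats` is a hypothetical helper, not part of this PR, and the mapping from column paths to Iceberg field IDs is elided:

```rust
use std::collections::HashMap;

use parquet::file::metadata::ParquetMetaData;

/// Hypothetical helper (not in this PR): gather the per-file statistics a
/// data-file builder needs from parsed `ParquetMetaData`.
fn collect_stats(metadata: &ParquetMetaData) -> (i64, HashMap<String, i64>) {
    // Total record count comes from the parsed file-level metadata.
    let record_count = metadata.file_metadata().num_rows();

    // Sum compressed sizes per column path across all row groups.
    let mut column_sizes: HashMap<String, i64> = HashMap::new();
    for row_group in metadata.row_groups() {
        for column in row_group.columns() {
            *column_sizes
                .entry(column.column_path().string())
                .or_insert(0) += column.compressed_size();
        }
    }
    (record_count, column_sizes)
}
```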
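For reviewers trying the change out, a hedged usage sketch of the new API inside an async context with `?` error propagation; `table` and `catalog` are assumed to be constructed elsewhere, and the `Transaction::new`/`commit` flow is taken from the rest of `transaction.rs`:

```rust
// Hypothetical call site for the API added in this PR.
let tx = Transaction::new(&table);
let tx = tx
    .add_parquet_files(
        vec!["s3://bucket/data/00000-0-data.parquet".to_string()],
        true, // check_duplicate_files: reject paths already referenced by the table
    )
    .await?;
let _updated_table = tx.commit(&catalog).await?;
```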