Xuanwo commented on code in PR #1074: URL: https://github.com/apache/iceberg-rust/pull/1074#discussion_r1994104147
########## crates/iceberg/src/writer/file_writer/parquet_writer.rs: ########## @@ -352,89 +354,25 @@ impl ParquetWriter { Ok(data_files) } - fn to_data_file_builder( - schema: SchemaRef, - metadata: FileMetaData, - written_size: usize, - file_path: String, - ) -> Result<DataFileBuilder> { - let index_by_parquet_path = { - let mut visitor = IndexByParquetPathName::new(); - visit_schema(&schema, &mut visitor)?; - visitor - }; - - let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = { - let mut per_col_size: HashMap<i32, u64> = HashMap::new(); - let mut per_col_val_num: HashMap<i32, u64> = HashMap::new(); - let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new(); - let mut min_max_agg = MinMaxColAggregator::new(schema); - - for row_group in &metadata.row_groups { - for column_chunk in row_group.columns.iter() { - let Some(column_chunk_metadata) = &column_chunk.meta_data else { - continue; - }; - let physical_type = column_chunk_metadata.type_; - let Some(&field_id) = - index_by_parquet_path.get(&column_chunk_metadata.path_in_schema.join(".")) - else { - // Following java implementation: https://github.com/apache/iceberg/blob/29a2c456353a6120b8c882ed2ab544975b168d7b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L163 - // Ignore the field if it is not in schema. - continue; - }; - *per_col_size.entry(field_id).or_insert(0) += - column_chunk_metadata.total_compressed_size as u64; - *per_col_val_num.entry(field_id).or_insert(0) += - column_chunk_metadata.num_values as u64; - if let Some(null_count) = column_chunk_metadata - .statistics - .as_ref() - .and_then(|s| s.null_count) - { - *per_col_null_val_num.entry(field_id).or_insert(0_u64) += null_count as u64; - } - if let Some(statistics) = &column_chunk_metadata.statistics { - min_max_agg.update( - field_id, - from_thrift(physical_type.try_into()?, Some(statistics.clone()))? 
- .unwrap(), - )?; - } - } - } + fn thrift_to_parquet_metadata(&self, file_metadata: FileMetaData) -> Result<ParquetMetaData> { + let mut buffer = Vec::new(); + { + let mut protocol = TCompactOutputProtocol::new(&mut buffer); + file_metadata + .write_to_out_protocol(&mut protocol) + .map_err(|err| { + Error::new(ErrorKind::Unexpected, "Failed to write parquet metadata") + .with_source(err) + })?; + + protocol.flush().map_err(|err| { + Error::new(ErrorKind::Unexpected, "Failed to flush protocol").with_source(err) + })?; + } - ( - per_col_size, - per_col_val_num, - per_col_null_val_num, - min_max_agg.produce(), - ) - }; + let parquet_metadata = ParquetMetaDataReader::decode_metadata(&buffer).unwrap(); Review Comment: How about returning an error instead? ########## crates/iceberg/src/writer/file_writer/parquet_writer.rs: ########## @@ -551,19 +489,23 @@ impl FileWriter for ParquetWriter { Ok(()) } - async fn close(self) -> crate::Result<Vec<crate::spec::DataFileBuilder>> { - let Some(writer) = self.inner_writer else { - return Ok(vec![]); + async fn close(mut self) -> crate::Result<Vec<crate::spec::DataFileBuilder>> { + let writer = match self.inner_writer.take() { + Some(writer) => writer, + None => return Ok(vec![]), }; + let metadata = writer.close().await.map_err(|err| { Error::new(ErrorKind::Unexpected, "Failed to close parquet writer.").with_source(err) })?; let written_size = self.written_size.load(std::sync::atomic::Ordering::Relaxed); - Ok(vec![Self::to_data_file_builder( + let parquet_metadata = Arc::new(self.thrift_to_parquet_metadata(metadata).unwrap()); Review Comment: The same. ########## Cargo.toml: ########## @@ -94,6 +94,7 @@ serde_json = "1.0.138" serde_repr = "0.1.16" serde_with = "3.4" tempfile = "3.18" +thrift = "0.17.0" Review Comment: We need `thrift` here since `parquet` is using it. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org For additional commands, e-mail: issues-help@iceberg.apache.org