liurenjie1024 commented on code in PR #176:
URL: https://github.com/apache/iceberg-rust/pull/176#discussion_r1470518241


##########
crates/iceberg/src/writer/file_writer/parquet_writer.rs:
##########


Review Comment:
   I'm thinking of moving this to another crate, `iceberg-parquet`, so that the 
core crate only contains the `FileWriter` trait. WDYT? cc @ZENOTME @Xuanwo @Fokko 



##########
crates/iceberg/src/spec/manifest.rs:
##########
@@ -924,7 +924,7 @@ impl TryFrom<i32> for ManifestStatus {
 }
 
 /// Data file carries data file path, partition tuple, metrics, …
-#[derive(Debug, PartialEq, Clone, Eq, TypedBuilder)]
+#[derive(Debug, PartialEq, Clone, Eq, Builder)]

Review Comment:
   Why this change? `TypedBuilder` checks it at compile time, while `Builder` 
checks the result at runtime.



##########
crates/iceberg/src/io.rs:
##########
@@ -278,7 +288,7 @@ impl OutputFile {
     }
 
     /// Creates output file for writing.
-    pub async fn writer(&self) -> Result<impl FileWrite> {
+    pub async fn writer(&self) -> Result<Writer> {

Review Comment:
   We can't make it a concrete `Writer` here since we can't expose the opendal 
API to users. You can store `Box<dyn FileWriter>` in the parquet writer.



##########
crates/iceberg/src/io.rs:
##########
@@ -268,6 +268,16 @@ impl OutputFile {
             .await?)
     }
 
+    /// Delete file.
+    pub async fn delete(&self) -> Result<()> {

Review Comment:
   This seems unsound to me. If it's deleted, we should consume it, so its 
argument should be `self` rather than `&self`.



##########
crates/iceberg/src/writer/file_writer/parquet_writer.rs:
##########
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The module contains the file writer for parquet file format.
+
+use std::{
+    collections::HashMap,
+    sync::{atomic::AtomicI64, Arc},
+};
+
+use crate::{
+    io::OutputFile,
+    spec::{DataFileBuilder, DataFileFormat},
+    writer::CurrentFileStatus,
+    Error,
+};
+use arrow_schema::SchemaRef;
+use opendal::Writer;
+use parquet::file::properties::WriterProperties;
+use parquet::{arrow::AsyncArrowWriter, format::FileMetaData};
+
+use super::{track_writer::TrackWriter, FileWriter, FileWriterBuilder};
+
+/// ParquetWriterBuilder is used to builder a [`ParquetWriter`]
+#[derive(Clone)]
+pub struct ParquetWriterBuilder {
+    out_file: OutputFile,
+    /// `buffer_size` determines the initial size of the intermediate buffer.
+    /// The intermediate buffer will automatically be resized if necessary
+    init_buffer_size: usize,
+    props: WriterProperties,
+    schema: SchemaRef,
+}
+
+impl ParquetWriterBuilder {
+    /// Create a new `ParquetWriterBuilder`
+    pub fn new(
+        out_file: OutputFile,
+        init_buffer_size: usize,
+        props: WriterProperties,
+        schema: SchemaRef,
+    ) -> Self {
+        Self {
+            out_file,
+            init_buffer_size,
+            props,
+            schema,
+        }
+    }
+}
+
+impl FileWriterBuilder for ParquetWriterBuilder {
+    type R = ParquetWriter;
+
+    async fn build(self) -> crate::Result<Self::R> {
+        let written_size = Arc::new(AtomicI64::new(0));
+        let inner_writer = TrackWriter::new(self.out_file.writer().await?, 
written_size.clone());
+        let writer = AsyncArrowWriter::try_new(
+            inner_writer,
+            self.schema,
+            self.init_buffer_size,
+            Some(self.props),
+        )
+        .map_err(|err| {
+            Error::new(
+                crate::ErrorKind::Unexpected,
+                "build error from parquet writer",
+            )
+            .with_source(err)
+        })?;
+        Ok(ParquetWriter {
+            out_file: self.out_file,
+            writer,
+            written_size,
+            current_row_num: 0,
+        })
+    }
+}
+
+/// `ParquetWriter`` is used to write arrow data into parquet file on storage.
+pub struct ParquetWriter {
+    out_file: OutputFile,
+    writer: AsyncArrowWriter<TrackWriter<Writer>>,
+    written_size: Arc<AtomicI64>,
+    current_row_num: usize,
+}
+
+impl ParquetWriter {
+    fn to_data_file_builder(
+        metadata: FileMetaData,
+        written_size: usize,
+        file_path: String,
+    ) -> DataFileBuilder {
+        let (column_sizes, value_counts, null_value_counts) = {
+            let mut per_col_size: HashMap<i32, u64> = HashMap::new();
+            let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
+            let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
+            metadata.row_groups.iter().for_each(|group| {
+                group
+                    .columns
+                    .iter()
+                    .enumerate()
+                    .for_each(|(column_id, column_chunk)| {
+                        if let Some(column_chunk_metadata) = 
&column_chunk.meta_data {
+                            *per_col_size.entry(column_id as i32).or_insert(0) 
+=
+                                column_chunk_metadata.total_compressed_size as 
u64;
+                            *per_col_val_num.entry(column_id as 
i32).or_insert(0) +=
+                                column_chunk_metadata.num_values as u64;
+                            *per_col_null_val_num
+                                .entry(column_id as i32)
+                                .or_insert(0_u64) += column_chunk_metadata
+                                .statistics
+                                .as_ref()
+                                .map(|s| s.null_count)
+                                .unwrap_or(None)
+                                .unwrap_or(0)
+                                as u64;
+                        }
+                    })
+            });
+            (per_col_size, per_col_val_num, per_col_null_val_num)
+        };
+
+        let mut builder = DataFileBuilder::default();
+        builder
+            .file_format(DataFileFormat::Parquet)
+            .column_sizes(column_sizes)
+            .value_counts(value_counts)
+            .null_value_counts(null_value_counts)
+            .file_size_in_bytes(written_size as u64)
+            .record_count(metadata.num_rows as u64)
+            
.key_metadata(metadata.footer_signing_key_metadata.unwrap_or_default())
+            .file_path(file_path)
+            .split_offsets(
+                metadata
+                    .row_groups
+                    .iter()
+                    .filter_map(|group| group.file_offset)
+                    .collect(),
+            );
+        builder
+    }
+}
+
+impl FileWriter for ParquetWriter {
+    async fn write(&mut self, batch: &arrow_array::RecordBatch) -> 
crate::Result<()> {
+        self.current_row_num += batch.num_rows();
+        self.writer.write(batch).await.map_err(|err| {
+            Error::new(
+                crate::ErrorKind::Unexpected,
+                "write error from parquet writer",
+            )
+            .with_source(err)
+        })?;
+        Ok(())
+    }
+
+    async fn close(self) -> crate::Result<Vec<crate::spec::DataFileBuilder>> {
+        let metadata = self.writer.close().await.map_err(|err| {
+            Error::new(
+                crate::ErrorKind::Unexpected,
+                "close error from parquet writer",
+            )
+            .with_source(err)
+        })?;
+        if self.current_row_num == 0 {
+            self.out_file.delete().await?;
+            return Ok(vec![]);
+        }
+        let written_size = 
self.written_size.load(std::sync::atomic::Ordering::Relaxed);
+
+        Ok(vec![Self::to_data_file_builder(
+            metadata,
+            written_size as usize,
+            self.out_file.location().to_string(),
+        )])
+    }
+}
+
+impl CurrentFileStatus for ParquetWriter {
+    fn current_file_path(&self) -> String {
+        self.out_file.location().to_string()
+    }
+
+    fn current_row_num(&self) -> usize {
+        self.current_row_num
+    }
+
+    fn current_written_size(&self) -> usize {
+        self.written_size.load(std::sync::atomic::Ordering::Relaxed) as usize
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use anyhow::Result;
+    use arrow_array::ArrayRef;
+    use arrow_array::Int64Array;
+    use arrow_array::RecordBatch;
+    use bytes::Bytes;
+    use futures::AsyncReadExt;
+    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+    use tempfile::TempDir;
+
+    use super::*;
+    use crate::io::FileIOBuilder;
+
+    #[derive(Clone)]
+    struct TestLocationGen;
+
+    #[tokio::test]
+    async fn test_parquet_writer() -> Result<()> {
+        // create output file
+        let tmp_dir = TempDir::new().unwrap();
+        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), 
"test.parquet");
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+        let output_file = file_io.new_output(&full_path).unwrap();
+
+        // prepare data
+        let col = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as 
ArrayRef;
+        let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+
+        // write data
+        let mut pw = ParquetWriterBuilder::new(
+            output_file.clone(),
+            0,
+            WriterProperties::builder().build(),
+            to_write.schema(),
+        )
+        .build()
+        .await?;
+        pw.write(&to_write).await?;
+        pw.write(&to_write).await?;
+        pw.close().await?;

Review Comment:
   We should also add some checks for data file fields.



##########
crates/iceberg/src/writer/file_writer/parquet_writer.rs:
##########
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The module contains the file writer for parquet file format.
+
+use std::{
+    collections::HashMap,
+    sync::{atomic::AtomicI64, Arc},
+};
+
+use crate::{
+    io::OutputFile,
+    spec::{DataFileBuilder, DataFileFormat},
+    writer::CurrentFileStatus,
+    Error,
+};
+use arrow_schema::SchemaRef;
+use opendal::Writer;
+use parquet::file::properties::WriterProperties;
+use parquet::{arrow::AsyncArrowWriter, format::FileMetaData};
+
+use super::{track_writer::TrackWriter, FileWriter, FileWriterBuilder};
+
+/// ParquetWriterBuilder is used to builder a [`ParquetWriter`]
+#[derive(Clone)]
+pub struct ParquetWriterBuilder {
+    out_file: OutputFile,
+    /// `buffer_size` determines the initial size of the intermediate buffer.
+    /// The intermediate buffer will automatically be resized if necessary
+    init_buffer_size: usize,
+    props: WriterProperties,
+    schema: SchemaRef,
+}
+
+impl ParquetWriterBuilder {
+    /// Create a new `ParquetWriterBuilder`
+    pub fn new(
+        out_file: OutputFile,
+        init_buffer_size: usize,
+        props: WriterProperties,
+        schema: SchemaRef,
+    ) -> Self {
+        Self {
+            out_file,
+            init_buffer_size,
+            props,
+            schema,
+        }
+    }
+}
+
+impl FileWriterBuilder for ParquetWriterBuilder {
+    type R = ParquetWriter;
+
+    async fn build(self) -> crate::Result<Self::R> {
+        let written_size = Arc::new(AtomicI64::new(0));
+        let inner_writer = TrackWriter::new(self.out_file.writer().await?, 
written_size.clone());
+        let writer = AsyncArrowWriter::try_new(
+            inner_writer,
+            self.schema,
+            self.init_buffer_size,
+            Some(self.props),
+        )
+        .map_err(|err| {
+            Error::new(
+                crate::ErrorKind::Unexpected,
+                "build error from parquet writer",
+            )
+            .with_source(err)
+        })?;
+        Ok(ParquetWriter {
+            out_file: self.out_file,
+            writer,
+            written_size,
+            current_row_num: 0,
+        })
+    }
+}
+
+/// `ParquetWriter`` is used to write arrow data into parquet file on storage.
+pub struct ParquetWriter {
+    out_file: OutputFile,
+    writer: AsyncArrowWriter<TrackWriter<Writer>>,
+    written_size: Arc<AtomicI64>,
+    current_row_num: usize,
+}
+
+impl ParquetWriter {
+    fn to_data_file_builder(
+        metadata: FileMetaData,
+        written_size: usize,
+        file_path: String,
+    ) -> DataFileBuilder {
+        let (column_sizes, value_counts, null_value_counts) = {
+            let mut per_col_size: HashMap<i32, u64> = HashMap::new();
+            let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
+            let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
+            metadata.row_groups.iter().for_each(|group| {
+                group
+                    .columns
+                    .iter()
+                    .enumerate()
+                    .for_each(|(column_id, column_chunk)| {

Review Comment:
   This is incorrect; the `column_id` should be the `field_id` from the schema.



##########
crates/iceberg/src/io.rs:
##########
@@ -245,7 +245,7 @@ pub trait FileWrite: AsyncWrite {}
 impl<T> FileWrite for T where T: AsyncWrite {}
 
 /// Output file is used for writing to files..
-#[derive(Debug)]
+#[derive(Debug, Clone)]

Review Comment:
   Is it possible to remove `Clone`? It seems unsound to me to clone an 
`OutputFile`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to