liurenjie1024 commented on code in PR #1657:
URL: https://github.com/apache/iceberg-rust/pull/1657#discussion_r2381465307


##########
crates/iceberg/src/writer/file_writer/rolling_writer.rs:
##########
@@ -15,67 +15,121 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::fmt::{Debug, Formatter};
+
 use arrow_array::RecordBatch;
 
-use crate::spec::DataFileBuilder;
+use crate::io::{FileIO, OutputFile};
+use crate::spec::{DataFileBuilder, 
PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, PartitionKey};
 use crate::writer::CurrentFileStatus;
+use crate::writer::file_writer::location_generator::{FileNameGenerator, 
LocationGenerator};
 use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
 use crate::{Error, ErrorKind, Result};
 
-/// Builder for creating a `RollingFileWriter` that rolls over to a new file
-/// when the data size exceeds a target threshold.
-#[derive(Clone)]
-pub struct RollingFileWriterBuilder<B: FileWriterBuilder> {
+/// A writer that automatically rolls over to a new file when the data size
+/// exceeds a target threshold.
+///
+/// This writer wraps another file writer that tracks the amount of data 
written.
+/// When the data size exceeds the target size, it closes the current file and
+/// starts writing to a new one.
+pub struct RollingFileWriter<B: FileWriterBuilder, L: LocationGenerator, F: 
FileNameGenerator> {
+    inner: Option<B::R>,
     inner_builder: B,
     target_file_size: usize,
+    data_file_builders: Vec<DataFileBuilder>,
+    file_io: FileIO,
+    location_generator: L,
+    file_name_generator: F,
+}
+
+impl<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> Clone

Review Comment:
   Why do we need to implement `Clone`? This is a stateful struct since it contains
generated data files, so the semantics of cloning it are unclear.



##########
crates/iceberg/src/writer/base_writer/data_file_writer.rs:
##########
@@ -20,74 +20,101 @@
 use arrow_array::RecordBatch;
 use itertools::Itertools;
 
-use crate::Result;
-use crate::spec::{DataContentType, DataFile, Struct};
-use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
+use crate::spec::{DEFAULT_PARTITION_SPEC_ID, DataContentType, DataFile, 
PartitionKey, Struct};
+use crate::writer::file_writer::FileWriterBuilder;
+use crate::writer::file_writer::location_generator::{FileNameGenerator, 
LocationGenerator};
+use crate::writer::file_writer::rolling_writer::RollingFileWriter;
 use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
+use crate::{Error, ErrorKind, Result};
 
 /// Builder for `DataFileWriter`.
 #[derive(Clone, Debug)]
-pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
-    inner: B,
-    partition_value: Option<Struct>,
-    partition_spec_id: i32,
+pub struct DataFileWriterBuilder<B: FileWriterBuilder, L: LocationGenerator, 
F: FileNameGenerator> {
+    inner_writer: RollingFileWriter<B, L, F>,
+    partition_key: Option<PartitionKey>,
 }
 
-impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
+impl<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator>
+    DataFileWriterBuilder<B, L, F>
+{
     /// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
-    pub fn new(inner: B, partition_value: Option<Struct>, partition_spec_id: 
i32) -> Self {
+    pub fn new(
+        inner_writer: RollingFileWriter<B, L, F>,
+        partition_key: Option<PartitionKey>,
+    ) -> Self {
         Self {
-            inner,
-            partition_value,
-            partition_spec_id,
+            inner_writer,
+            partition_key,
         }
     }
 }
 
 #[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
-    type R = DataFileWriter<B>;
+impl<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> 
IcebergWriterBuilder
+    for DataFileWriterBuilder<B, L, F>
+{
+    type R = DataFileWriter<B, L, F>;
 
     async fn build(self) -> Result<Self::R> {
         Ok(DataFileWriter {
-            inner_writer: Some(self.inner.clone().build().await?),
-            partition_value: self.partition_value.unwrap_or(Struct::empty()),
-            partition_spec_id: self.partition_spec_id,
+            inner_writer: Some(self.inner_writer),
+            partition_key: self.partition_key,
         })
     }
 }
 
 /// A writer write data is within one spec/partition.
 #[derive(Debug)]
-pub struct DataFileWriter<B: FileWriterBuilder> {
-    inner_writer: Option<B::R>,
-    partition_value: Struct,
-    partition_spec_id: i32,
+pub struct DataFileWriter<B: FileWriterBuilder, L: LocationGenerator, F: 
FileNameGenerator> {
+    inner_writer: Option<RollingFileWriter<B, L, F>>,
+    partition_key: Option<PartitionKey>,
 }
 
 #[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
+impl<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> 
IcebergWriter
+    for DataFileWriter<B, L, F>
+{
     async fn write(&mut self, batch: RecordBatch) -> Result<()> {
-        self.inner_writer.as_mut().unwrap().write(&batch).await
+        self.inner_writer
+            .as_mut()
+            .unwrap()

Review Comment:
   We should not use `unwrap` here, since it can panic. We could use
`if let Some(writer) = self.inner_writer.as_mut()` to make it easier to read.



##########
crates/iceberg/src/writer/base_writer/equality_delete_writer.rs:
##########
@@ -157,8 +168,16 @@ impl<B: FileWriterBuilder> IcebergWriter for 
EqualityDeleteFileWriter<B> {
                 .map(|mut res| {
                     res.content(crate::spec::DataContentType::EqualityDeletes);
                     
res.equality_ids(Some(self.equality_ids.iter().copied().collect_vec()));
-                    res.partition(self.partition_value.clone());
-                    res.partition_spec_id(self.partition_spec_id);
+                    res.partition(

Review Comment:
   Similar question.



##########
crates/iceberg/src/writer/base_writer/data_file_writer.rs:
##########
@@ -20,74 +20,101 @@
 use arrow_array::RecordBatch;
 use itertools::Itertools;
 
-use crate::Result;
-use crate::spec::{DataContentType, DataFile, Struct};
-use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
+use crate::spec::{DEFAULT_PARTITION_SPEC_ID, DataContentType, DataFile, 
PartitionKey, Struct};
+use crate::writer::file_writer::FileWriterBuilder;
+use crate::writer::file_writer::location_generator::{FileNameGenerator, 
LocationGenerator};
+use crate::writer::file_writer::rolling_writer::RollingFileWriter;
 use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
+use crate::{Error, ErrorKind, Result};
 
 /// Builder for `DataFileWriter`.
 #[derive(Clone, Debug)]
-pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
-    inner: B,
-    partition_value: Option<Struct>,
-    partition_spec_id: i32,
+pub struct DataFileWriterBuilder<B: FileWriterBuilder, L: LocationGenerator, 
F: FileNameGenerator> {
+    inner_writer: RollingFileWriter<B, L, F>,
+    partition_key: Option<PartitionKey>,
 }
 
-impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
+impl<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator>
+    DataFileWriterBuilder<B, L, F>
+{
     /// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
-    pub fn new(inner: B, partition_value: Option<Struct>, partition_spec_id: 
i32) -> Self {
+    pub fn new(
+        inner_writer: RollingFileWriter<B, L, F>,
+        partition_key: Option<PartitionKey>,
+    ) -> Self {
         Self {
-            inner,
-            partition_value,
-            partition_spec_id,
+            inner_writer,
+            partition_key,
         }
     }
 }
 
 #[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
-    type R = DataFileWriter<B>;
+impl<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> 
IcebergWriterBuilder
+    for DataFileWriterBuilder<B, L, F>
+{
+    type R = DataFileWriter<B, L, F>;
 
     async fn build(self) -> Result<Self::R> {
         Ok(DataFileWriter {
-            inner_writer: Some(self.inner.clone().build().await?),
-            partition_value: self.partition_value.unwrap_or(Struct::empty()),
-            partition_spec_id: self.partition_spec_id,
+            inner_writer: Some(self.inner_writer),
+            partition_key: self.partition_key,
         })
     }
 }
 
 /// A writer write data is within one spec/partition.
 #[derive(Debug)]
-pub struct DataFileWriter<B: FileWriterBuilder> {
-    inner_writer: Option<B::R>,
-    partition_value: Struct,
-    partition_spec_id: i32,
+pub struct DataFileWriter<B: FileWriterBuilder, L: LocationGenerator, F: 
FileNameGenerator> {
+    inner_writer: Option<RollingFileWriter<B, L, F>>,
+    partition_key: Option<PartitionKey>,
 }
 
 #[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
+impl<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> 
IcebergWriter
+    for DataFileWriter<B, L, F>
+{
     async fn write(&mut self, batch: RecordBatch) -> Result<()> {
-        self.inner_writer.as_mut().unwrap().write(&batch).await
+        self.inner_writer
+            .as_mut()
+            .unwrap()
+            .write(&self.partition_key, &batch)
+            .await
     }
 
     async fn close(&mut self) -> Result<Vec<DataFile>> {
-        let writer = self.inner_writer.take().unwrap();
-        Ok(writer
-            .close()
-            .await?
-            .into_iter()
-            .map(|mut res| {
-                res.content(DataContentType::Data);
-                res.partition(self.partition_value.clone());
-                res.partition_spec_id(self.partition_spec_id);
-                res.build().expect("Guaranteed to be valid")
-            })
-            .collect_vec())
+        if let Some(writer) = self.inner_writer.take() {
+            Ok(writer
+                .close()
+                .await?
+                .into_iter()
+                .map(|mut res| {
+                    res.content(DataContentType::Data);
+                    res.partition(
+                        self.partition_key
+                            .as_ref()
+                            .map_or(Struct::empty(), |pk| pk.data().clone()),
+                    );
+                    res.partition_spec_id(
+                        self.partition_key
+                            .as_ref()
+                            .map_or(DEFAULT_PARTITION_SPEC_ID, |pk| 
pk.spec().spec_id()),
+                    );

Review Comment:
   Do we have to do this? I assume we could skip setting the partition value and
spec id here for unpartitioned data?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to