CTTY commented on code in PR #1657:
URL: https://github.com/apache/iceberg-rust/pull/1657#discussion_r2384579825


##########
crates/iceberg/src/writer/base_writer/data_file_writer.rs:
##########
@@ -20,74 +20,101 @@
 use arrow_array::RecordBatch;
 use itertools::Itertools;
 
-use crate::Result;
-use crate::spec::{DataContentType, DataFile, Struct};
-use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
+use crate::spec::{DEFAULT_PARTITION_SPEC_ID, DataContentType, DataFile, 
PartitionKey, Struct};
+use crate::writer::file_writer::FileWriterBuilder;
+use crate::writer::file_writer::location_generator::{FileNameGenerator, 
LocationGenerator};
+use crate::writer::file_writer::rolling_writer::RollingFileWriter;
 use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
+use crate::{Error, ErrorKind, Result};
 
 /// Builder for `DataFileWriter`.
 #[derive(Clone, Debug)]
-pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
-    inner: B,
-    partition_value: Option<Struct>,
-    partition_spec_id: i32,
+pub struct DataFileWriterBuilder<B: FileWriterBuilder, L: LocationGenerator, 
F: FileNameGenerator> {
+    inner_writer: RollingFileWriter<B, L, F>,
+    partition_key: Option<PartitionKey>,
 }
 
-impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
+impl<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator>
+    DataFileWriterBuilder<B, L, F>
+{
     /// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
-    pub fn new(inner: B, partition_value: Option<Struct>, partition_spec_id: 
i32) -> Self {
+    pub fn new(
+        inner_writer: RollingFileWriter<B, L, F>,
+        partition_key: Option<PartitionKey>,
+    ) -> Self {
         Self {
-            inner,
-            partition_value,
-            partition_spec_id,
+            inner_writer,
+            partition_key,
         }
     }
 }
 
 #[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
-    type R = DataFileWriter<B>;
+impl<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> 
IcebergWriterBuilder
+    for DataFileWriterBuilder<B, L, F>
+{
+    type R = DataFileWriter<B, L, F>;
 
     async fn build(self) -> Result<Self::R> {
         Ok(DataFileWriter {
-            inner_writer: Some(self.inner.clone().build().await?),
-            partition_value: self.partition_value.unwrap_or(Struct::empty()),
-            partition_spec_id: self.partition_spec_id,
+            inner_writer: Some(self.inner_writer),
+            partition_key: self.partition_key,
         })
     }
 }
 
 /// A writer write data is within one spec/partition.
 #[derive(Debug)]
-pub struct DataFileWriter<B: FileWriterBuilder> {
-    inner_writer: Option<B::R>,
-    partition_value: Struct,
-    partition_spec_id: i32,
+pub struct DataFileWriter<B: FileWriterBuilder, L: LocationGenerator, F: 
FileNameGenerator> {
+    inner_writer: Option<RollingFileWriter<B, L, F>>,
+    partition_key: Option<PartitionKey>,
 }
 
 #[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
+impl<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> 
IcebergWriter
+    for DataFileWriter<B, L, F>
+{
     async fn write(&mut self, batch: RecordBatch) -> Result<()> {
-        self.inner_writer.as_mut().unwrap().write(&batch).await
+        self.inner_writer
+            .as_mut()
+            .unwrap()
+            .write(&self.partition_key, &batch)
+            .await
     }
 
     async fn close(&mut self) -> Result<Vec<DataFile>> {
-        let writer = self.inner_writer.take().unwrap();
-        Ok(writer
-            .close()
-            .await?
-            .into_iter()
-            .map(|mut res| {
-                res.content(DataContentType::Data);
-                res.partition(self.partition_value.clone());
-                res.partition_spec_id(self.partition_spec_id);
-                res.build().expect("Guaranteed to be valid")
-            })
-            .collect_vec())
+        if let Some(writer) = self.inner_writer.take() {
+            Ok(writer
+                .close()
+                .await?
+                .into_iter()
+                .map(|mut res| {
+                    res.content(DataContentType::Data);
+                    res.partition(
+                        self.partition_key
+                            .as_ref()
+                            .map_or(Struct::empty(), |pk| pk.data().clone()),
+                    );
+                    res.partition_spec_id(
+                        self.partition_key
+                            .as_ref()
+                            .map_or(DEFAULT_PARTITION_SPEC_ID, |pk| 
pk.spec().spec_id()),
+                    );

Review Comment:
   I've added 
   
   ```
    #[builder(default = "Struct::empty()")]
       pub(crate) partition: Struct,
   ```
   and 
   ```
       #[builder(default = "DEFAULT_PARTITION_SPEC_ID")]
       pub(crate) partition_spec_id: i32,
   ```
   to make building data files easier; it follows the same logic, though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to