liurenjie1024 commented on code in PR #960:
URL: https://github.com/apache/iceberg-rust/pull/960#discussion_r1957494635


##########
crates/iceberg/src/transaction.rs:
##########
@@ -169,6 +177,180 @@ impl<'a> Transaction<'a> {
 
         catalog.update_table(table_commit).await
     }
+
+    /// Adds existing parquet files
+    pub async fn add_parquet_files(

Review Comment:
   As I suggested, I think this should be part `FastAppendAction` api. I mean 
we should add a `add_parquet_file` api in `FastAppendAction`



##########
crates/iceberg/src/transaction.rs:
##########
@@ -169,6 +177,180 @@ impl<'a> Transaction<'a> {
 
         catalog.update_table(table_commit).await
     }
+
+    /// Adds existing parquet files
+    pub async fn add_parquet_files(
+        self,
+        file_paths: Vec<String>,
+        check_duplicate_files: bool,

Review Comment:
   I don't think this should be optional, instead we should always check it. 



##########
crates/iceberg/src/transaction.rs:
##########
@@ -169,6 +177,180 @@ impl<'a> Transaction<'a> {
 
         catalog.update_table(table_commit).await
     }
+
+    /// Adds existing parquet files
+    pub async fn add_parquet_files(
+        self,
+        file_paths: Vec<String>,
+        check_duplicate_files: bool,
+    ) -> Result<Transaction<'a>> {
+        if check_duplicate_files {
+            let new_files: HashSet<&str> = file_paths.iter().map(|s| 
s.as_str()).collect();
+
+            let mut manifest_stream = 
self.table.inspect().manifests().scan().await?;
+            let mut referenced_files = Vec::new();
+
+            while let Some(batch) = manifest_stream.try_next().await? {
+                let file_path_array = batch
+                    .column(1)
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .ok_or_else(|| {
+                        Error::new(
+                            ErrorKind::DataInvalid,
+                            "Failed to downcast file_path column to 
StringArray",
+                        )
+                    })?;
+
+                for i in 0..batch.num_rows() {
+                    let file_path = file_path_array.value(i);
+                    if new_files.contains(file_path) {
+                        referenced_files.push(file_path.to_string());
+                    }
+                }
+            }
+
+            if !referenced_files.is_empty() {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Cannot add files that are already referenced by 
table, files: {}",
+                        referenced_files.join(", ")
+                    ),
+                ));
+            }
+        }
+
+        let table_metadata = self.table.metadata();
+
+        let data_files = Transaction::parquet_files_to_data_files(
+            &self,
+            self.table.file_io(),
+            file_paths,
+            table_metadata,
+        )
+        .await?;
+
+        let mut fast_append_action = self.fast_append(Some(Uuid::new_v4()), 
Vec::new())?;
+        fast_append_action.add_data_files(data_files)?;
+
+        fast_append_action.apply().await
+    }
+
+    async fn parquet_files_to_data_files(
+        &self,
+        file_io: &FileIO,
+        file_paths: Vec<String>,
+        table_metadata: &TableMetadata,
+    ) -> Result<Vec<DataFile>> {
+        let mut data_files: Vec<DataFile> = Vec::new();
+
+        // TODO: support adding to partitioned table
+        if !table_metadata.default_spec.fields().is_empty() {
+            return Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Appending to partitioned tables is not supported",
+            ));
+        }
+
+        for file_path in file_paths {
+            let input_file = file_io.new_input(&file_path)?;
+            let file_metadata = input_file.metadata().await?;
+            let file_size_in_bytes = file_metadata.size as usize;
+            let reader = input_file.reader().await?;
+
+            let mut parquet_reader = ArrowFileReader::new(file_metadata, 
reader);
+            let parquet_metadata = 
parquet_reader.get_metadata().await.map_err(|err| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Error reading Parquet metadata: {}", err),
+                )
+            })?;
+            let builder = self.parquet_to_data_file_builder(
+                table_metadata.current_schema().clone(),
+                parquet_metadata,
+                file_size_in_bytes,
+                file_path,
+            )?;
+            let data_file = builder.build().unwrap();
+            data_files.push(data_file);
+        }
+        Ok(data_files)
+    }
+
+    /// `ParquetMetadata` to data file builder
+    pub fn parquet_to_data_file_builder(

Review Comment:
   I have some suggestion for this method:
   1. It has a lot of duplicated with parquet file writer, we should reuse them.
   2. This method should not be part of `Transaction`, it should be parquet 
module.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to