jonathanc-n commented on code in PR #960:
URL: https://github.com/apache/iceberg-rust/pull/960#discussion_r1955169524


##########
crates/iceberg/src/transaction.rs:
##########
@@ -169,6 +175,172 @@ impl<'a> Transaction<'a> {
 
         catalog.update_table(table_commit).await
     }
+
+    /// Adds existing Parquet files to the table via a fast append.
+    pub async fn add_parquet_files(
+        self,
+        file_paths: Vec<String>,
+        check_duplicate_files: bool,
+    ) -> Result<Transaction<'a>> {
+        if check_duplicate_files {
+            let unique_paths: HashSet<_> = file_paths.iter().collect();
+            if unique_paths.len() != file_paths.len() {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Duplicate file paths provided",
+                ));
+            }
+        }
+        let table_metadata = self.table.metadata();
+
+        let data_files = self
+            .parquet_files_to_data_files(self.table.file_io(), file_paths, table_metadata)
+            .await?;
+
+        let mut fast_append_action = self.fast_append(Some(Uuid::new_v4()), Vec::new())?;
+        fast_append_action.add_data_files(data_files)?;
+
+        fast_append_action.apply().await
+    }
+
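+    /// Reads each Parquet file's footer and converts it into a [`DataFile`] with per-column metrics.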
+    async fn parquet_files_to_data_files(
+        &self,
+        file_io: &FileIO,
+        file_paths: Vec<String>,
+        table_metadata: &TableMetadata,
+    ) -> Result<Vec<DataFile>> {
+        let mut data_files: Vec<DataFile> = Vec::new();
+        let partition_value =
+            self.create_default_partition_value(&table_metadata.default_partition_type)?;
+
+        for file_path in file_paths {
+            let input_file = file_io.new_input(&file_path)?;
+            if !input_file.exists().await? {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "File does not exist".to_string(),
+                ));
+            }
+            let file_metadata = input_file.metadata().await?;
+            let file_size_in_bytes = file_metadata.size as usize;
+            let reader = input_file.reader().await?;
+
+            let mut parquet_reader = ArrowFileReader::new(file_metadata, reader);
+            let parquet_metadata = parquet_reader.get_metadata().await.map_err(|err| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Error reading Parquet metadata: {}", err),
+                )
+            })?;
+            let builder = self.parquet_to_data_file_builder(
+                table_metadata.current_schema().clone(),
+                parquet_metadata,
+                &partition_value,
+                file_size_in_bytes,
+                file_path,
+            )?;
+            let data_file = builder.build().map_err(|err| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Failed to build data file: {}", err),
+                )
+            })?;
+            data_files.push(data_file);
+        }
+        Ok(data_files)
+    }
+
+    /// Converts a file's `ParquetMetaData` into a [`DataFileBuilder`], deriving per-column metrics from its row groups.
+    pub fn parquet_to_data_file_builder(
+        &self,
+        schema: SchemaRef,
+        metadata: Arc<ParquetMetaData>,
+        partition: &Struct,
+        written_size: usize,
+        file_path: String,
+    ) -> Result<DataFileBuilder> {
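+        // Map each Parquet column path to its Iceberg field id so that
+        // per-column metrics can be keyed by field id below.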
+        let index_by_parquet_path = {
+            let mut visitor = IndexByParquetPathName::new();
+            visit_schema(&schema, &mut visitor)?;
+            visitor
+        };
+
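+        // Aggregate compressed sizes, value counts, null counts, and min/max
+        // bounds per field id across all row groups in the file.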
+        let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
+            let mut per_col_size: HashMap<i32, u64> = HashMap::new();
+            let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
+            let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
+            let mut min_max_agg = MinMaxColAggregator::new(schema);
+
+            for row_group in metadata.row_groups() {
+                for column_chunk_metadata in row_group.columns() {
+                    let parquet_path = column_chunk_metadata.column_descr().path().string();
+
+                    let Some(&field_id) = index_by_parquet_path.get(&parquet_path) else {
+                        continue;
+                    };
+
+                    *per_col_size.entry(field_id).or_insert(0) +=
+                        column_chunk_metadata.compressed_size() as u64;
+                    *per_col_val_num.entry(field_id).or_insert(0) +=
+                        column_chunk_metadata.num_values() as u64;
+
+                    if let Some(statistics) = column_chunk_metadata.statistics() {
+                        if let Some(null_count) = statistics.null_count_opt() {
+                            *per_col_null_val_num.entry(field_id).or_insert(0) += null_count;
+                        }
+
+                        min_max_agg.update(field_id, statistics.clone())?;
+                    }
+                }
+            }
+            (
+                per_col_size,
+                per_col_val_num,
+                per_col_null_val_num,
+                min_max_agg.produce(),
+            )
+        };
+
+        let mut builder = DataFileBuilder::default();
+        builder
+            .content(DataContentType::Data)
+            .file_path(file_path)
+            .file_format(DataFileFormat::Parquet)
+            .partition(partition.clone())
+            .record_count(metadata.file_metadata().num_rows() as u64)
+            .file_size_in_bytes(written_size as u64)
+            .column_sizes(column_sizes)
+            .value_counts(value_counts)
+            .null_value_counts(null_value_counts)
+            .lower_bounds(lower_bounds)
+            .upper_bounds(upper_bounds)
+            .split_offsets(
+                metadata
+                    .row_groups()
+                    .iter()
+                    .filter_map(|group| group.file_offset())
+                    .collect(),
+            );
+
+        Ok(builder)
+    }
+
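+    /// Creates a default partition value for the given partition struct type.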
+    fn create_default_partition_value(&self, partition_type: &StructType) -> Result<Struct> {
+        let literals = partition_type
+            .fields()
+            .iter()
+            .map(|field| {
+                let primitive_type = field.field_type.as_primitive_type().ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "Partition field should only be a primitive type.",
+                    )
+                })?;
+                Ok(Some(primitive_type.type_to_literal()))

Review Comment:
   Yeah, this makes sense. We can go with the first option and add a TODO for the check on whether the partition exists.
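   
   To make that concrete, here is a minimal sketch of where such a TODO could sit in `parquet_files_to_data_files` (placement and wording are only a suggestion, not part of this diff):
   
   ```rust
   // TODO: verify that the partition value exists in the table before
   // appending (the "check if partition exists" discussed above).
   let partition_value =
       self.create_default_partition_value(&table_metadata.default_partition_type)?;
   ```
   
   And for context, a hypothetical end-to-end use of the new API, assuming the usual `Transaction::new`/`commit` plumbing and caller-provided `table` and `catalog`:
   
   ```rust
   use iceberg::table::Table;
   use iceberg::transaction::Transaction;
   use iceberg::{Catalog, Result};
   
   // Hypothetical helper: registers two pre-existing Parquet files with the table.
   async fn import_parquet(table: Table, catalog: &impl Catalog) -> Result<()> {
       let tx = Transaction::new(&table);
       // `true` enables the duplicate-path check introduced in this PR.
       let tx = tx
           .add_parquet_files(
               vec![
                   "s3://bucket/data/part-00000.parquet".to_string(),
                   "s3://bucket/data/part-00001.parquet".to_string(),
               ],
               true,
           )
           .await?;
       tx.commit(catalog).await?;
       Ok(())
   }
   ```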


