sdd commented on code in PR #383:
URL: https://github.com/apache/iceberg-rust/pull/383#discussion_r1630753095


##########
crates/iceberg/src/writer/file_writer/parquet_writer.rs:
##########
@@ -78,106 +89,407 @@ impl<T: LocationGenerator, F: FileNameGenerator> 
FileWriterBuilder for ParquetWr
     type R = ParquetWriter;
 
     async fn build(self) -> crate::Result<Self::R> {
-        // Fetch field id from schema
-        let field_ids = self
-            .schema
-            .fields()
-            .iter()
-            .map(|field| {
-                field
-                    .metadata()
-                    .get(PARQUET_FIELD_ID_META_KEY)
-                    .ok_or_else(|| {
-                        Error::new(
-                            crate::ErrorKind::Unexpected,
-                            "Field id not found in arrow schema metadata.",
-                        )
-                    })?
-                    .parse::<i32>()
-                    .map_err(|err| {
-                        Error::new(crate::ErrorKind::Unexpected, "Failed to 
parse field id.")
-                            .with_source(err)
-                    })
-            })
-            .collect::<crate::Result<Vec<_>>>()?;
-
+        let arrow_schema: ArrowSchemaRef = 
Arc::new(self.schema.as_ref().try_into()?);
         let written_size = Arc::new(AtomicI64::new(0));
         let out_file = self.file_io.new_output(
             self.location_generator
                 
.generate_location(&self.file_name_generator.generate_file_name()),
         )?;
         let inner_writer = TrackWriter::new(out_file.writer().await?, 
written_size.clone());
         let async_writer = AsyncFileWriter::new(inner_writer);
-        let writer = AsyncArrowWriter::try_new(async_writer, 
self.schema.clone(), Some(self.props))
-            .map_err(|err| {
-                Error::new(
-                    crate::ErrorKind::Unexpected,
-                    "Failed to build parquet writer.",
-                )
-                .with_source(err)
-            })?;
+        let writer =
+            AsyncArrowWriter::try_new(async_writer, arrow_schema.clone(), 
Some(self.props))
+                .map_err(|err| {
+                    Error::new(
+                        crate::ErrorKind::Unexpected,
+                        "Failed to build parquet writer.",
+                    )
+                    .with_source(err)
+                })?;
 
         Ok(ParquetWriter {
+            schema: self.schema.clone(),
             writer,
             written_size,
             current_row_num: 0,
             out_file,
-            field_ids,
         })
     }
 }
 
+#[derive(Default)]
+struct IndexByParquetPathName {
+    name_to_id: HashMap<String, i32>,
+
+    field_names: Vec<String>,
+
+    field_id: i32,
+}
+
+impl IndexByParquetPathName {
+    pub fn indexes(self) -> HashMap<String, i32> {
+        self.name_to_id
+    }
+}
+
+impl SchemaVisitor for IndexByParquetPathName {
+    type T = ();
+
+    fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> {
+        self.field_names.push(field.name.to_string());
+        self.field_id = field.id;
+        Ok(())
+    }
+
+    fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
+        self.field_names.pop();
+        Ok(())
+    }
+
+    fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
+        self.field_names.push(format!("list.{}", field.name));
+        self.field_id = field.id;
+        Ok(())
+    }
+
+    fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
+        self.field_names.pop();
+        Ok(())
+    }
+
+    fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
+        self.field_names
+            .push(format!("{DEFAULT_MAP_FIELD_NAME}.key"));
+        self.field_id = field.id;
+        Ok(())
+    }
+
+    fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
+        self.field_names.pop();
+        Ok(())
+    }
+
+    fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
+        self.field_names
+            .push(format!("{DEFAULT_MAP_FIELD_NAME}.value"));
+        self.field_id = field.id;
+        Ok(())
+    }
+
+    fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
+        self.field_names.pop();
+        Ok(())
+    }
+
+    fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T> 
{
+        Ok(())
+    }
+
+    fn field(&mut self, _field: &NestedFieldRef, _value: Self::T) -> 
Result<Self::T> {
+        Ok(())
+    }
+
+    fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) -> 
Result<Self::T> {
+        Ok(())
+    }
+
+    fn list(&mut self, _list: &ListType, _value: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn map(&mut self, _map: &MapType, _key_value: Self::T, _value: Self::T) -> 
Result<Self::T> {
+        Ok(())
+    }
+
+    fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
+        let full_name = self.field_names.iter().map(String::as_str).join(".");
+        let field_id = self.field_id;
+        if let Some(existing_field_id) = 
self.name_to_id.get(full_name.as_str()) {
+            return Err(Error::new(ErrorKind::DataInvalid, format!("Invalid 
schema: multiple fields for name {full_name}: {field_id} and 
{existing_field_id}")));
+        } else {
+            self.name_to_id.insert(full_name, field_id);
+        }
+
+        Ok(())
+    }
+}
+
 /// `ParquetWriter`` is used to write arrow data into parquet file on storage.
 pub struct ParquetWriter {
+    schema: SchemaRef,
     out_file: OutputFile,
     writer: AsyncArrowWriter<AsyncFileWriter<TrackWriter>>,
     written_size: Arc<AtomicI64>,
     current_row_num: usize,
-    field_ids: Vec<i32>,
+}
+
+/// Used to aggregate min and max value of each column.
+struct MinMaxColAggregator {
+    lower_bounds: HashMap<i32, Datum>,
+    upper_bounds: HashMap<i32, Datum>,
+    schema: SchemaRef,
+}
+
+impl MinMaxColAggregator {
+    fn new(schema: SchemaRef) -> Self {
+        Self {
+            lower_bounds: HashMap::new(),
+            upper_bounds: HashMap::new(),
+            schema,
+        }
+    }
+
+    fn update(&mut self, col_id: i32, value: Statistics) -> Result<()> {
+        let crate::spec::Type::Primitive(ty) = &self
+            .schema
+            .field_by_id(col_id)
+            .ok_or_else(|| {
+                Error::new(
+                    crate::ErrorKind::Unexpected,
+                    "Failed to get field by id in schema.",
+                )
+            })?
+            .field_type
+            .as_ref()
+        else {
+            return Err(Error::new(
+                crate::ErrorKind::Unexpected,
+                "Composed type is not supported for min max aggregation.",
+            ));
+        };
+
+        macro_rules! update_stat {
+            ($self:ident, $stat:ident, $convert_func:expr) => {
+                if $stat.min_is_exact() {
+                    let val = $convert_func($stat.min().clone())?;
+                    match $self.lower_bounds.entry(col_id) {
+                        std::collections::hash_map::Entry::Occupied(mut entry) 
=> {
+                            if entry.get() > &val {
+                                entry.insert(val);
+                            }
+                        }
+                        std::collections::hash_map::Entry::Vacant(entry) => {
+                            entry.insert(val);
+                        }
+                    }
+                }
+                if $stat.max_is_exact() {
+                    let val = $convert_func($stat.max().clone())?;
+                    match $self.upper_bounds.entry(col_id) {
+                        std::collections::hash_map::Entry::Occupied(mut entry) 
=> {
+                            if entry.get() < &val {
+                                entry.insert(val);
+                            }
+                        }
+                        std::collections::hash_map::Entry::Vacant(entry) => {
+                            entry.insert(val);
+                        }
+                    }
+                }
+            };
+        }
+
+        match (ty, value) {
+            (PrimitiveType::Boolean, Statistics::Boolean(stat)) => {
+                let convert_func = |v: bool| 
Result::<Datum>::Ok(Datum::bool(v));
+                update_stat!(self, stat, convert_func);
+            }
+            (PrimitiveType::Int, Statistics::Int32(stat)) => {
+                let convert_func = |v: i32| Result::<Datum>::Ok(Datum::int(v));
+                update_stat!(self, stat, convert_func);
+            }
+            (PrimitiveType::Long, Statistics::Int64(stat)) => {
+                let convert_func = |v: i64| 
Result::<Datum>::Ok(Datum::long(v));
+                update_stat!(self, stat, convert_func);
+            }
+            (PrimitiveType::Float, Statistics::Float(stat)) => {
+                let convert_func = |v: f32| 
Result::<Datum>::Ok(Datum::float(v));
+                update_stat!(self, stat, convert_func);
+            }
+            (PrimitiveType::Double, Statistics::Double(stat)) => {
+                let convert_func = |v: f64| 
Result::<Datum>::Ok(Datum::double(v));
+                update_stat!(self, stat, convert_func);
+            }
+            (PrimitiveType::String, Statistics::ByteArray(stat)) => {
+                let convert_func = |v: ByteArray| {
+                    Result::<Datum>::Ok(Datum::string(
+                        String::from_utf8(v.data().to_vec()).unwrap(),
+                    ))
+                };
+                update_stat!(self, stat, convert_func);
+            }
+            (PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
+                let convert_func =
+                    |v: ByteArray| 
Result::<Datum>::Ok(Datum::binary(v.data().to_vec()));
+                update_stat!(self, stat, convert_func);
+            }
+            (PrimitiveType::Date, Statistics::Int32(stat)) => {
+                let convert_func = |v: i32| 
Result::<Datum>::Ok(Datum::date(v));
+                update_stat!(self, stat, convert_func);
+            }
+            (PrimitiveType::Time, Statistics::Int64(stat)) => {
+                let convert_func = |v: i64| Datum::time_micros(v);
+                update_stat!(self, stat, convert_func);
+            }
+            (PrimitiveType::Timestamp, Statistics::Int64(stat)) => {
+                let convert_func = |v: i64| 
Result::<Datum>::Ok(Datum::timestamp_micros(v));
+                update_stat!(self, stat, convert_func);
+            }
+            (PrimitiveType::Timestamptz, Statistics::Int64(stat)) => {
+                let convert_func = |v: i64| 
Result::<Datum>::Ok(Datum::timestamptz_micros(v));
+                update_stat!(self, stat, convert_func);
+            }
+            (
+                PrimitiveType::Decimal {
+                    precision: _,
+                    scale: _,
+                },
+                Statistics::ByteArray(stat),
+            ) => {
+                let convert_func = |v: ByteArray| -> Result<Datum> {
+                    Result::<Datum>::Ok(Datum::new(
+                        ty.clone(),
+                        PrimitiveLiteral::Decimal(i128::from_le_bytes(
+                            v.data().try_into().unwrap(),
+                        )),
+                    ))
+                };
+                update_stat!(self, stat, convert_func);
+            }
+            (
+                PrimitiveType::Decimal {
+                    precision: _,
+                    scale: _,
+                },
+                Statistics::Int32(stat),
+            ) => {
+                let convert_func = |v: i32| {
+                    Result::<Datum>::Ok(Datum::new(
+                        ty.clone(),
+                        PrimitiveLiteral::Decimal(i128::from(v)),
+                    ))
+                };
+                update_stat!(self, stat, convert_func);
+            }
+            (
+                PrimitiveType::Decimal {
+                    precision: _,
+                    scale: _,
+                },
+                Statistics::Int64(stat),
+            ) => {
+                let convert_func = |v: i64| {
+                    Result::<Datum>::Ok(Datum::new(
+                        ty.clone(),
+                        PrimitiveLiteral::Decimal(i128::from(v)),
+                    ))
+                };
+                update_stat!(self, stat, convert_func);
+            }
+            (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stat)) => {
+                let convert_func = |v: FixedLenByteArray| {
+                    if v.len() != 16 {
+                        return Err(Error::new(
+                            crate::ErrorKind::Unexpected,
+                            "Invalid length of uuid bytes.",
+                        ));
+                    }
+                    Ok(Datum::uuid(Uuid::from_bytes(
+                        v.data()[..16].try_into().unwrap(),
+                    )))
+                };
+                update_stat!(self, stat, convert_func);
+            }
+            (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) 
=> {
+                let convert_func = |v: FixedLenByteArray| {
+                    if v.len() != *len as usize {
+                        return Err(Error::new(
+                            crate::ErrorKind::Unexpected,
+                            "Invalid length of fixed bytes.",
+                        ));
+                    }
+                    Ok(Datum::fixed(v.data().to_vec()))
+                };
+                update_stat!(self, stat, convert_func);
+            }
+            (ty, value) => {
+                return Err(Error::new(
+                    crate::ErrorKind::Unexpected,
+                    format!("Statistics {} is not match with field type {}.", 
value, ty),
+                ))
+            }
+        }
+        Ok(())
+    }
+
+    fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
+        (self.lower_bounds, self.upper_bounds)
+    }
 }
 
 impl ParquetWriter {
     fn to_data_file_builder(
-        field_ids: &[i32],
+        schema: SchemaRef,
         metadata: FileMetaData,
         written_size: usize,
         file_path: String,
     ) -> Result<DataFileBuilder> {
-        // Only enter here when the file is not empty.
-        assert!(!metadata.row_groups.is_empty());
-        if field_ids.len() != metadata.row_groups[0].columns.len() {
-            return Err(Error::new(
-                crate::ErrorKind::Unexpected,
-                "Len of field id is not match with len of columns in parquet 
metadata.",
-            ));
-        }
+        let index_by_parquet_path = {
+            let mut visitor = IndexByParquetPathName::default();
+            visit_schema(&schema, &mut visitor)?;
+            visitor.indexes()
+        };
 
-        let (column_sizes, value_counts, null_value_counts) =
-            {
-                let mut per_col_size: HashMap<i32, u64> = HashMap::new();
-                let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
-                let mut per_col_null_val_num: HashMap<i32, u64> = 
HashMap::new();
-                metadata.row_groups.iter().for_each(|group| {
-                    group.columns.iter().zip(field_ids.iter()).for_each(
-                        |(column_chunk, &field_id)| {
-                            if let Some(column_chunk_metadata) = 
&column_chunk.meta_data {
-                                *per_col_size.entry(field_id).or_insert(0) +=
-                                    
column_chunk_metadata.total_compressed_size as u64;
-                                *per_col_val_num.entry(field_id).or_insert(0) 
+=
-                                    column_chunk_metadata.num_values as u64;
-                                
*per_col_null_val_num.entry(field_id).or_insert(0_u64) +=
-                                    column_chunk_metadata
-                                        .statistics
-                                        .as_ref()
-                                        .map(|s| s.null_count)
-                                        .unwrap_or(None)
-                                        .unwrap_or(0) as u64;
-                            }
-                        },
-                    )
-                });
-                (per_col_size, per_col_val_num, per_col_null_val_num)
-            };
+        let (column_sizes, value_counts, null_value_counts, (lower_bounds, 
upper_bounds)) = {
+            let mut per_col_size: HashMap<i32, u64> = HashMap::new();
+            let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
+            let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
+            let mut min_max_agg = MinMaxColAggregator::new(schema);
+
+            for row_group in &metadata.row_groups {
+                for column_chunk in row_group.columns.iter() {
+                    if let Some(column_chunk_metadata) = 
&column_chunk.meta_data {

Review Comment:
   We could use `let Some(column_chunk_metadata) = &column_chunk.meta_data else 
{ continue; };` here instead to remove a level of indentation, as this code is 
already quite deeply indented?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to