sdd commented on code in PR #383:
URL: https://github.com/apache/iceberg-rust/pull/383#discussion_r1630738655


##########
crates/iceberg/src/writer/file_writer/parquet_writer.rs:
##########
@@ -78,106 +89,407 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
     type R = ParquetWriter;
 
     async fn build(self) -> crate::Result<Self::R> {
-        // Fetch field id from schema
-        let field_ids = self
-            .schema
-            .fields()
-            .iter()
-            .map(|field| {
-                field
-                    .metadata()
-                    .get(PARQUET_FIELD_ID_META_KEY)
-                    .ok_or_else(|| {
-                        Error::new(
-                            crate::ErrorKind::Unexpected,
-                            "Field id not found in arrow schema metadata.",
-                        )
-                    })?
-                    .parse::<i32>()
-                    .map_err(|err| {
-                        Error::new(crate::ErrorKind::Unexpected, "Failed to parse field id.")
-                            .with_source(err)
-                    })
-            })
-            .collect::<crate::Result<Vec<_>>>()?;
-
+        let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
         let written_size = Arc::new(AtomicI64::new(0));
         let out_file = self.file_io.new_output(
             self.location_generator
                 .generate_location(&self.file_name_generator.generate_file_name()),
         )?;
         let inner_writer = TrackWriter::new(out_file.writer().await?, written_size.clone());
         let async_writer = AsyncFileWriter::new(inner_writer);
-        let writer = AsyncArrowWriter::try_new(async_writer, self.schema.clone(), Some(self.props))
-            .map_err(|err| {
-                Error::new(
-                    crate::ErrorKind::Unexpected,
-                    "Failed to build parquet writer.",
-                )
-                .with_source(err)
-            })?;
+        let writer =
+            AsyncArrowWriter::try_new(async_writer, arrow_schema.clone(), Some(self.props))
+                .map_err(|err| {
+                    Error::new(
+                        crate::ErrorKind::Unexpected,
+                        "Failed to build parquet writer.",
+                    )
+                    .with_source(err)
+                })?;
 
         Ok(ParquetWriter {
+            schema: self.schema.clone(),
             writer,
             written_size,
             current_row_num: 0,
             out_file,
-            field_ids,
         })
     }
 }
 
+#[derive(Default)]
+struct IndexByParquetPathName {
+    name_to_id: HashMap<String, i32>,
+
+    field_names: Vec<String>,
+
+    field_id: i32,
+}
+
+impl IndexByParquetPathName {
+    pub fn indexes(self) -> HashMap<String, i32> {
+        self.name_to_id
+    }
+}
+
+impl SchemaVisitor for IndexByParquetPathName {
+    type T = ();
+
+    fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> {
+        self.field_names.push(field.name.to_string());
+        self.field_id = field.id;
+        Ok(())
+    }
+
+    fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
+        self.field_names.pop();
+        Ok(())
+    }
+
+    fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
+        self.field_names.push(format!("list.{}", field.name));
+        self.field_id = field.id;
+        Ok(())
+    }
+
+    fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
+        self.field_names.pop();
+        Ok(())
+    }
+
+    fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
+        self.field_names
+            .push(format!("{DEFAULT_MAP_FIELD_NAME}.key"));
+        self.field_id = field.id;
+        Ok(())
+    }
+
+    fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
+        self.field_names.pop();
+        Ok(())
+    }
+
+    fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
+        self.field_names
+            .push(format!("{DEFAULT_MAP_FIELD_NAME}.value"));
+        self.field_id = field.id;
+        Ok(())
+    }
+
+    fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
+        self.field_names.pop();
+        Ok(())
+    }
+
+    fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn field(&mut self, _field: &NestedFieldRef, _value: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn list(&mut self, _list: &ListType, _value: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn map(&mut self, _map: &MapType, _key_value: Self::T, _value: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
+        let full_name = self.field_names.iter().map(String::as_str).join(".");
+        let field_id = self.field_id;
+        if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) {
+            return Err(Error::new(ErrorKind::DataInvalid, format!("Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}")));
+        } else {
+            self.name_to_id.insert(full_name, field_id);
+        }
+
+        Ok(())
+    }
+}
+
 /// `ParquetWriter` is used to write arrow data into a parquet file on storage.
 pub struct ParquetWriter {
+    schema: SchemaRef,
     out_file: OutputFile,
     writer: AsyncArrowWriter<AsyncFileWriter<TrackWriter>>,
     written_size: Arc<AtomicI64>,
     current_row_num: usize,
-    field_ids: Vec<i32>,
+}
+
+/// Used to aggregate min and max value of each column.
+struct MinMaxColAggregator {
+    lower_bounds: HashMap<i32, Datum>,
+    upper_bounds: HashMap<i32, Datum>,
+    schema: SchemaRef,
+}
+
+impl MinMaxColAggregator {
+    fn new(schema: SchemaRef) -> Self {
+        Self {
+            lower_bounds: HashMap::new(),
+            upper_bounds: HashMap::new(),
+            schema,
+        }
+    }
+
+    fn update(&mut self, col_id: i32, value: Statistics) -> Result<()> {
+        let crate::spec::Type::Primitive(ty) = &self
+            .schema
+            .field_by_id(col_id)
+            .ok_or_else(|| {
+                Error::new(
+                    crate::ErrorKind::Unexpected,
+                    "Failed to get field by id in schema.",
+                )
+            })?
+            .field_type
+            .as_ref()
+        else {
+            return Err(Error::new(
+                crate::ErrorKind::Unexpected,
+                "Composed type is not supported for min max aggregation.",
+            ));
+        };
+
+        macro_rules! update_stat {
+            ($self:ident, $stat:ident, $convert_func:expr) => {
+                if $stat.min_is_exact() {
+                    let val = $convert_func($stat.min().clone())?;
+                    match $self.lower_bounds.entry(col_id) {

Review Comment:
   Is there no way to do something similar to this instead, which would be a bit neater? I've not tried it, so I may be wrong 😅
   
   ```rust
   $self.lower_bounds.entry(col_id)
       .and_modify(|e| { if val < *e { *e = val.clone() } })
       .or_insert(val);
   ```
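   
   For reference, here's a minimal standalone sketch of the same entry-API pattern. It uses a plain `HashMap<i32, i64>` in place of the real `Datum` bounds so it compiles on its own; the function and map names are illustrative only, and the real code would additionally need `Datum` comparison and cloning, which is where this might not translate directly:
   
   ```rust
   use std::collections::HashMap;
   
   /// Keep a running minimum and maximum per column id using the entry API,
   /// instead of matching on `Entry::Occupied` / `Entry::Vacant` explicitly.
   fn update_bounds(
       lower_bounds: &mut HashMap<i32, i64>,
       upper_bounds: &mut HashMap<i32, i64>,
       col_id: i32,
       val: i64,
   ) {
       // Lower bound: keep the smaller of the existing and incoming value.
       lower_bounds
           .entry(col_id)
           .and_modify(|e| *e = (*e).min(val))
           .or_insert(val);
   
       // Upper bound: keep the larger of the existing and incoming value.
       upper_bounds
           .entry(col_id)
           .and_modify(|e| *e = (*e).max(val))
           .or_insert(val);
   }
   
   fn main() {
       let mut lower = HashMap::new();
       let mut upper = HashMap::new();
       for v in [3, -1, 7] {
           update_bounds(&mut lower, &mut upper, 1, v);
       }
       assert_eq!(lower[&1], -1);
       assert_eq!(upper[&1], 7);
   }
   ```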



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

