sdd commented on code in PR #383:
URL: https://github.com/apache/iceberg-rust/pull/383#discussion_r1630721046
##########
crates/iceberg/src/writer/file_writer/parquet_writer.rs:
##########
@@ -78,106 +89,407 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
     type R = ParquetWriter;
 
     async fn build(self) -> crate::Result<Self::R> {
-        // Fetch field id from schema
-        let field_ids = self
-            .schema
-            .fields()
-            .iter()
-            .map(|field| {
-                field
-                    .metadata()
-                    .get(PARQUET_FIELD_ID_META_KEY)
-                    .ok_or_else(|| {
-                        Error::new(
-                            crate::ErrorKind::Unexpected,
-                            "Field id not found in arrow schema metadata.",
-                        )
-                    })?
-                    .parse::<i32>()
-                    .map_err(|err| {
-                        Error::new(crate::ErrorKind::Unexpected, "Failed to parse field id.")
-                            .with_source(err)
-                    })
-            })
-            .collect::<crate::Result<Vec<_>>>()?;
-
+        let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
         let written_size = Arc::new(AtomicI64::new(0));
         let out_file = self.file_io.new_output(
             self.location_generator
                 .generate_location(&self.file_name_generator.generate_file_name()),
         )?;
         let inner_writer = TrackWriter::new(out_file.writer().await?, written_size.clone());
         let async_writer = AsyncFileWriter::new(inner_writer);
-        let writer = AsyncArrowWriter::try_new(async_writer, self.schema.clone(), Some(self.props))
-            .map_err(|err| {
-                Error::new(
-                    crate::ErrorKind::Unexpected,
-                    "Failed to build parquet writer.",
-                )
-                .with_source(err)
-            })?;
+        let writer =
+            AsyncArrowWriter::try_new(async_writer, arrow_schema.clone(), Some(self.props))
+                .map_err(|err| {
+                    Error::new(
+                        crate::ErrorKind::Unexpected,
+                        "Failed to build parquet writer.",
+                    )
+                    .with_source(err)
+                })?;
 
         Ok(ParquetWriter {
+            schema: self.schema.clone(),
             writer,
             written_size,
             current_row_num: 0,
             out_file,
-            field_ids,
         })
     }
 }
 
+#[derive(Default)]
+struct IndexByParquetPathName {
+    name_to_id: HashMap<String, i32>,
+
+    field_names: Vec<String>,
+
+    field_id: i32,
+}
+
+impl IndexByParquetPathName {
+    pub fn indexes(self) -> HashMap<String, i32> {
+        self.name_to_id
+    }
+}
+
+impl SchemaVisitor for IndexByParquetPathName {
+    type T = ();
+
+    fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> {
+        self.field_names.push(field.name.to_string());
+        self.field_id = field.id;
+        Ok(())
+    }
+
+    fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
+        self.field_names.pop();
+        Ok(())
+    }
+
+    fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
+        self.field_names.push(format!("list.{}", field.name));
+        self.field_id = field.id;
+        Ok(())
+    }
+
+    fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
+        self.field_names.pop();
+        Ok(())
+    }
+
+    fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
+        self.field_names
+            .push(format!("{DEFAULT_MAP_FIELD_NAME}.key"));
+        self.field_id = field.id;
+        Ok(())
+    }
+
+    fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
+        self.field_names.pop();
+        Ok(())
+    }
+
+    fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
+        self.field_names
+            .push(format!("{DEFAULT_MAP_FIELD_NAME}.value"));
+        self.field_id = field.id;
+        Ok(())
+    }
+
+    fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
+        self.field_names.pop();
+        Ok(())
+    }
+
+    fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn field(&mut self, _field: &NestedFieldRef, _value: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn list(&mut self, _list: &ListType, _value: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn map(&mut self, _map: &MapType, _key_value: Self::T, _value: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
+        let full_name = self.field_names.iter().map(String::as_str).join(".");
+        let field_id = self.field_id;
+        if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) {
+            return Err(Error::new(ErrorKind::DataInvalid, format!("Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}")));
+        } else {
+            self.name_to_id.insert(full_name, field_id);
+        }
+
+        Ok(())
+    }
+}
+
 /// `ParquetWriter`` is used to write arrow data into parquet file on storage.
 pub struct ParquetWriter {
+    schema: SchemaRef,
     out_file: OutputFile,
     writer: AsyncArrowWriter<AsyncFileWriter<TrackWriter>>,
     written_size: Arc<AtomicI64>,
     current_row_num: usize,
-    field_ids: Vec<i32>,
+}
+
+/// Used to aggregate min and max value of each column.
+struct MinMaxColAggregator {
+    lower_bounds: HashMap<i32, Datum>,
+    upper_bounds: HashMap<i32, Datum>,
+    schema: SchemaRef,
+}
+
+impl MinMaxColAggregator {
+    fn new(schema: SchemaRef) -> Self {
+        Self {
+            lower_bounds: HashMap::new(),
+            upper_bounds: HashMap::new(),
+            schema,
+        }
+    }
+
+    fn update(&mut self, col_id: i32, value: Statistics) -> Result<()> {
+        let crate::spec::Type::Primitive(ty) = &self

Review Comment:
   Is there a reason why references to this are always fully-qualified, ie `crate::spec::Type::Primitive`, rather than having a `use crate::spec::Type::Primitive` statement at the top of the file and writing `let Primitive(ty) = &self`?
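
To make the contrast concrete, here is a minimal, self-contained sketch of the two styles discussed in the comment. The `spec` module below is a hypothetical stand-in so the snippet compiles on its own; it is not the actual iceberg-rust definition of `crate::spec::Type`.

```rust
// Hypothetical stand-in for `crate::spec`, for illustration only.
#[allow(dead_code)]
mod spec {
    pub enum PrimitiveType {
        Int,
        Long,
    }

    pub enum Type {
        Primitive(PrimitiveType),
        Struct,
    }
}

// Style used in the PR: fully qualify the variant at every use site.
fn is_primitive_qualified(ty: &spec::Type) -> bool {
    matches!(ty, spec::Type::Primitive(_))
}

// Style the comment suggests: import the variant once at the top of the
// file, then use the short name in patterns.
use crate::spec::Type::Primitive;

fn is_primitive_imported(ty: &spec::Type) -> bool {
    // `Primitive(..)` is a refutable pattern, so a plain `let` needs
    // `if let` or `let ... else` around it.
    if let Primitive(_inner) = ty {
        true
    } else {
        false
    }
}

fn main() {
    let ty = spec::Type::Primitive(spec::PrimitiveType::Int);
    assert!(is_primitive_qualified(&ty));
    assert!(is_primitive_imported(&ty));
}
```

Both styles are legal Rust. A common middle ground is to import the enum itself (`use crate::spec::Type;`) and write `Type::Primitive(ty)`, which shortens the path while keeping the variant's origin visible at the use site.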