viirya commented on code in PR #295: URL: https://github.com/apache/iceberg-rust/pull/295#discussion_r1548810072
########## crates/iceberg/src/arrow.rs: ########## @@ -113,6 +143,405 @@ impl ArrowReader { // TODO: full implementation ProjectionMask::all() } + + fn get_row_filter(&self, parquet_schema: &SchemaDescriptor) -> Result<Option<RowFilter>> { + if let Some(predicates) = &self.predicates { + let field_id_map = self.build_field_id_map(parquet_schema)?; + + // Collect Parquet column indices from field ids + let column_indices = predicates + .iter() + .map(|predicate| { + let mut collector = CollectFieldIdVisitor { field_ids: vec![] }; + collector.visit_predicate(predicate).unwrap(); + collector + .field_ids + .iter() + .map(|field_id| { + field_id_map.get(field_id).cloned().ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "Field id not found in schema") + }) + }) + .collect::<Result<Vec<_>>>() + }) + .collect::<Result<Vec<_>>>()?; + + // Convert BoundPredicates to ArrowPredicates + let mut arrow_predicates = vec![]; + for (predicate, columns) in predicates.iter().zip(column_indices.iter()) { + let mut converter = PredicateConverter { + columns, + projection_mask: ProjectionMask::leaves(parquet_schema, columns.clone()), + parquet_schema, + column_map: &field_id_map, + }; + let arrow_predicate = converter.visit_predicate(predicate)?; + arrow_predicates.push(arrow_predicate); + } + Ok(Some(RowFilter::new(arrow_predicates))) + } else { + Ok(None) + } + } + + /// Build the map of field id to Parquet column index in the schema. + fn build_field_id_map(&self, parquet_schema: &SchemaDescriptor) -> Result<HashMap<i32, usize>> { + let mut column_map = HashMap::new(); + for (idx, field) in parquet_schema.columns().iter().enumerate() { + let field_type = field.self_type(); + match field_type { + ParquetType::PrimitiveType { basic_info, .. 
} => { + if !basic_info.has_id() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Leave column {:?} in schema doesn't have field id", + field_type + ), + )); + } + column_map.insert(basic_info.id(), idx); + } + ParquetType::GroupType { .. } => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Leave column in schema should be primitive type but got {:?}", + field_type + ), + )); + } + }; + } + + Ok(column_map) + } +} + +/// A visitor to collect field ids from bound predicates. +struct CollectFieldIdVisitor { Review Comment: Okay ########## crates/iceberg/src/arrow.rs: ########## @@ -113,6 +143,405 @@ impl ArrowReader { // TODO: full implementation ProjectionMask::all() } + + fn get_row_filter(&self, parquet_schema: &SchemaDescriptor) -> Result<Option<RowFilter>> { + if let Some(predicates) = &self.predicates { + let field_id_map = self.build_field_id_map(parquet_schema)?; + + // Collect Parquet column indices from field ids + let column_indices = predicates + .iter() + .map(|predicate| { + let mut collector = CollectFieldIdVisitor { field_ids: vec![] }; + collector.visit_predicate(predicate).unwrap(); + collector + .field_ids + .iter() + .map(|field_id| { + field_id_map.get(field_id).cloned().ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "Field id not found in schema") + }) + }) + .collect::<Result<Vec<_>>>() + }) + .collect::<Result<Vec<_>>>()?; + + // Convert BoundPredicates to ArrowPredicates + let mut arrow_predicates = vec![]; + for (predicate, columns) in predicates.iter().zip(column_indices.iter()) { + let mut converter = PredicateConverter { + columns, + projection_mask: ProjectionMask::leaves(parquet_schema, columns.clone()), + parquet_schema, + column_map: &field_id_map, + }; + let arrow_predicate = converter.visit_predicate(predicate)?; + arrow_predicates.push(arrow_predicate); + } + Ok(Some(RowFilter::new(arrow_predicates))) + } else { + Ok(None) + } + } + + /// Build the map of field id to Parquet column index 
in the schema. + fn build_field_id_map(&self, parquet_schema: &SchemaDescriptor) -> Result<HashMap<i32, usize>> { + let mut column_map = HashMap::new(); + for (idx, field) in parquet_schema.columns().iter().enumerate() { + let field_type = field.self_type(); + match field_type { + ParquetType::PrimitiveType { basic_info, .. } => { + if !basic_info.has_id() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Leave column {:?} in schema doesn't have field id", + field_type + ), + )); + } + column_map.insert(basic_info.id(), idx); + } + ParquetType::GroupType { .. } => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Leave column in schema should be primitive type but got {:?}", + field_type + ), + )); + } + }; + } + + Ok(column_map) + } +} + +/// A visitor to collect field ids from bound predicates. +struct CollectFieldIdVisitor { Review Comment: Okay -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org