liurenjie1024 commented on code in PR #295: URL: https://github.com/apache/iceberg-rust/pull/295#discussion_r1567468384
########## crates/iceberg/src/arrow/reader.rs: ########## @@ -186,4 +216,399 @@ impl ArrowReader { Ok(ProjectionMask::leaves(parquet_schema, indices)) } } + + fn get_row_filter(&self, parquet_schema: &SchemaDescriptor) -> Result<Option<RowFilter>> { + if let Some(predicates) = &self.predicates { + let field_id_map = self.build_field_id_map(parquet_schema)?; + + // Collect Parquet column indices from field ids + let mut collector = CollectFieldIdVisitor { field_ids: vec![] }; + visit_predicate(&mut collector, predicates).unwrap(); Review Comment: We don't need to do this for every file? ########## crates/iceberg/src/arrow/reader.rs: ########## @@ -186,4 +216,399 @@ impl ArrowReader { Ok(ProjectionMask::leaves(parquet_schema, indices)) } } + + fn get_row_filter(&self, parquet_schema: &SchemaDescriptor) -> Result<Option<RowFilter>> { + if let Some(predicates) = &self.predicates { + let field_id_map = self.build_field_id_map(parquet_schema)?; + + // Collect Parquet column indices from field ids + let mut collector = CollectFieldIdVisitor { field_ids: vec![] }; + visit_predicate(&mut collector, predicates).unwrap(); + let column_indices = collector + .field_ids + .iter() + .map(|field_id| { + field_id_map.get(field_id).cloned().ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "Field id not found in schema") + }) + }) + .collect::<Result<Vec<_>>>()?; + + // Convert BoundPredicates to ArrowPredicates + let mut converter = PredicateConverter { + columns: &column_indices, + projection_mask: ProjectionMask::leaves(parquet_schema, column_indices.clone()), + parquet_schema, + column_map: &field_id_map, + }; + let arrow_predicate = visit_predicate(&mut converter, predicates)?; + Ok(Some(RowFilter::new(vec![arrow_predicate]))) + } else { + Ok(None) + } + } + + /// Build the map of field id to Parquet column index in the schema. 
+ fn build_field_id_map(&self, parquet_schema: &SchemaDescriptor) -> Result<HashMap<i32, usize>> { Review Comment: ```suggestion fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<HashMap<i32, usize>> { ``` I don't think we need the `self` parameter here.
} => { + if !basic_info.has_id() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Leave column {:?} in schema doesn't have field id", + field_type + ), + )); + } + column_map.insert(basic_info.id(), idx); + } + ParquetType::GroupType { .. } => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Leave column in schema should be primitive type but got {:?}", + field_type + ), + )); + } + }; + } + + Ok(column_map) + } +} + +/// A visitor to collect field ids from bound predicates. +struct CollectFieldIdVisitor { + field_ids: Vec<i32>, +} + +impl BoundPredicateVisitor for CollectFieldIdVisitor { Review Comment: #334 has got merged, do you mind to rewrite this? ########## crates/iceberg/src/arrow/reader.rs: ########## @@ -186,4 +216,399 @@ impl ArrowReader { Ok(ProjectionMask::leaves(parquet_schema, indices)) } } + + fn get_row_filter(&self, parquet_schema: &SchemaDescriptor) -> Result<Option<RowFilter>> { + if let Some(predicates) = &self.predicates { + let field_id_map = self.build_field_id_map(parquet_schema)?; + + // Collect Parquet column indices from field ids + let mut collector = CollectFieldIdVisitor { field_ids: vec![] }; + visit_predicate(&mut collector, predicates).unwrap(); + let column_indices = collector + .field_ids + .iter() + .map(|field_id| { + field_id_map.get(field_id).cloned().ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "Field id not found in schema") + }) + }) + .collect::<Result<Vec<_>>>()?; + + // Convert BoundPredicates to ArrowPredicates + let mut converter = PredicateConverter { + columns: &column_indices, + projection_mask: ProjectionMask::leaves(parquet_schema, column_indices.clone()), + parquet_schema, + column_map: &field_id_map, + }; + let arrow_predicate = visit_predicate(&mut converter, predicates)?; + Ok(Some(RowFilter::new(vec![arrow_predicate]))) + } else { + Ok(None) + } + } + + /// Build the map of field id to Parquet column index in the schema. 
+ fn build_field_id_map(&self, parquet_schema: &SchemaDescriptor) -> Result<HashMap<i32, usize>> { + let mut column_map = HashMap::new(); + for (idx, field) in parquet_schema.columns().iter().enumerate() { + let field_type = field.self_type(); + match field_type { + ParquetType::PrimitiveType { basic_info, .. } => { + if !basic_info.has_id() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Leave column {:?} in schema doesn't have field id", + field_type + ), + )); + } + column_map.insert(basic_info.id(), idx); + } + ParquetType::GroupType { .. } => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Leave column in schema should be primitive type but got {:?}", + field_type + ), + )); + } + }; + } + + Ok(column_map) + } +} + +/// A visitor to collect field ids from bound predicates. +struct CollectFieldIdVisitor { + field_ids: Vec<i32>, +} + +impl BoundPredicateVisitor for CollectFieldIdVisitor { + type T = (); + type U = (); + + fn and(&mut self, _predicates: Vec<Self::T>) -> Result<Self::T> { + Ok(()) + } + + fn or(&mut self, _predicates: Vec<Self::T>) -> Result<Self::T> { + Ok(()) + } + + fn not(&mut self, _predicate: Self::T) -> Result<Self::T> { + Ok(()) + } + + fn visit_always_true(&mut self) -> Result<Self::T> { + Ok(()) + } + + fn visit_always_false(&mut self) -> Result<Self::T> { + Ok(()) + } + + fn visit_unary(&mut self, predicate: &UnaryExpression<BoundReference>) -> Result<Self::T> { + self.bound_reference(predicate.term())?; + Ok(()) + } + + fn visit_binary(&mut self, predicate: &BinaryExpression<BoundReference>) -> Result<Self::T> { + self.bound_reference(predicate.term())?; + Ok(()) + } + + fn visit_set(&mut self, predicate: &SetExpression<BoundReference>) -> Result<Self::T> { + self.bound_reference(predicate.term())?; + Ok(()) + } + + fn bound_reference(&mut self, reference: &BoundReference) -> Result<Self::T> { + self.field_ids.push(reference.field().id); + Ok(()) + } +} + +/// A visitor to convert Iceberg bound 
predicates to Arrow predicates. +struct PredicateConverter<'a> { + /// The leaf column indices used in the predicates. + pub columns: &'a Vec<usize>, + /// The projection mask for the Arrow predicates. + pub projection_mask: ProjectionMask, + /// The Parquet schema descriptor. + pub parquet_schema: &'a SchemaDescriptor, + /// The map between field id and leaf column index in Parquet schema. + pub column_map: &'a HashMap<i32, usize>, +} + +fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send>> { + match datum.literal() { + PrimitiveLiteral::Boolean(value) => Ok(Box::new(BooleanArray::new_scalar(*value))), + PrimitiveLiteral::Int(value) => Ok(Box::new(Int32Array::new_scalar(*value))), + PrimitiveLiteral::Long(value) => Ok(Box::new(Int64Array::new_scalar(*value))), + PrimitiveLiteral::Float(value) => Ok(Box::new(Float32Array::new_scalar(value.as_f32()))), + PrimitiveLiteral::Double(value) => Ok(Box::new(Float64Array::new_scalar(value.as_f64()))), + l => Err(Error::new( + ErrorKind::DataInvalid, + format!("Unsupported literal type: {:?}", l), + )), + } +} + +/// Recursively get the leaf column from the record batch. Assume that the nested columns in +/// struct is projected to a single column. +fn get_leaf_column(column: &ArrayRef) -> std::result::Result<ArrayRef, ArrowError> { + match column.data_type() { + DataType::Struct(fields) => { + if fields.len() != 1 { + return Err(ArrowError::SchemaError( + "Struct column should have only one field after projection" + .parse() + .unwrap(), + )); + } + let struct_array = column.as_any().downcast_ref::<StructArray>().unwrap(); + get_leaf_column(struct_array.column(0)) + } + _ => Ok(column.clone()), + } +} + +impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { Review Comment: Do you mind to rewrite it using new api? 
########## crates/iceberg/src/arrow/reader.rs: ########## @@ -186,4 +216,399 @@ impl ArrowReader { Ok(ProjectionMask::leaves(parquet_schema, indices)) } } + + fn get_row_filter(&self, parquet_schema: &SchemaDescriptor) -> Result<Option<RowFilter>> { + if let Some(predicates) = &self.predicates { + let field_id_map = self.build_field_id_map(parquet_schema)?; + + // Collect Parquet column indices from field ids + let mut collector = CollectFieldIdVisitor { field_ids: vec![] }; + visit_predicate(&mut collector, predicates).unwrap(); + let column_indices = collector + .field_ids + .iter() + .map(|field_id| { + field_id_map.get(field_id).cloned().ok_or_else(|| { Review Comment: This doesn't have to be an error? Missing some field may be valid due to schema evolution. ########## crates/iceberg/src/arrow/reader.rs: ########## @@ -186,4 +216,399 @@ impl ArrowReader { Ok(ProjectionMask::leaves(parquet_schema, indices)) } } + + fn get_row_filter(&self, parquet_schema: &SchemaDescriptor) -> Result<Option<RowFilter>> { + if let Some(predicates) = &self.predicates { + let field_id_map = self.build_field_id_map(parquet_schema)?; + + // Collect Parquet column indices from field ids + let mut collector = CollectFieldIdVisitor { field_ids: vec![] }; + visit_predicate(&mut collector, predicates).unwrap(); + let column_indices = collector + .field_ids + .iter() + .map(|field_id| { + field_id_map.get(field_id).cloned().ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "Field id not found in schema") + }) + }) + .collect::<Result<Vec<_>>>()?; + + // Convert BoundPredicates to ArrowPredicates + let mut converter = PredicateConverter { + columns: &column_indices, + projection_mask: ProjectionMask::leaves(parquet_schema, column_indices.clone()), + parquet_schema, + column_map: &field_id_map, + }; + let arrow_predicate = visit_predicate(&mut converter, predicates)?; + Ok(Some(RowFilter::new(vec![arrow_predicate]))) + } else { + Ok(None) + } + } + + /// Build the map of field id to
Parquet column index in the schema. + fn build_field_id_map(&self, parquet_schema: &SchemaDescriptor) -> Result<HashMap<i32, usize>> { + let mut column_map = HashMap::new(); + for (idx, field) in parquet_schema.columns().iter().enumerate() { + let field_type = field.self_type(); + match field_type { + ParquetType::PrimitiveType { basic_info, .. } => { + if !basic_info.has_id() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Leave column {:?} in schema doesn't have field id", + field_type + ), + )); + } + column_map.insert(basic_info.id(), idx); + } + ParquetType::GroupType { .. } => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Leave column in schema should be primitive type but got {:?}", + field_type + ), + )); + } + }; + } + + Ok(column_map) + } +} + +/// A visitor to collect field ids from bound predicates. +struct CollectFieldIdVisitor { + field_ids: Vec<i32>, +} + +impl BoundPredicateVisitor for CollectFieldIdVisitor { + type T = (); + type U = (); + + fn and(&mut self, _predicates: Vec<Self::T>) -> Result<Self::T> { + Ok(()) + } + + fn or(&mut self, _predicates: Vec<Self::T>) -> Result<Self::T> { + Ok(()) + } + + fn not(&mut self, _predicate: Self::T) -> Result<Self::T> { + Ok(()) + } + + fn visit_always_true(&mut self) -> Result<Self::T> { + Ok(()) + } + + fn visit_always_false(&mut self) -> Result<Self::T> { + Ok(()) + } + + fn visit_unary(&mut self, predicate: &UnaryExpression<BoundReference>) -> Result<Self::T> { + self.bound_reference(predicate.term())?; + Ok(()) + } + + fn visit_binary(&mut self, predicate: &BinaryExpression<BoundReference>) -> Result<Self::T> { + self.bound_reference(predicate.term())?; + Ok(()) + } + + fn visit_set(&mut self, predicate: &SetExpression<BoundReference>) -> Result<Self::T> { + self.bound_reference(predicate.term())?; + Ok(()) + } + + fn bound_reference(&mut self, reference: &BoundReference) -> Result<Self::T> { + self.field_ids.push(reference.field().id); + Ok(()) + } +} + +/// A 
visitor to convert Iceberg bound predicates to Arrow predicates. +struct PredicateConverter<'a> { + /// The leaf column indices used in the predicates. + pub columns: &'a Vec<usize>, + /// The projection mask for the Arrow predicates. + pub projection_mask: ProjectionMask, + /// The Parquet schema descriptor. + pub parquet_schema: &'a SchemaDescriptor, + /// The map between field id and leaf column index in Parquet schema. + pub column_map: &'a HashMap<i32, usize>, +} + +fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send>> { + match datum.literal() { + PrimitiveLiteral::Boolean(value) => Ok(Box::new(BooleanArray::new_scalar(*value))), + PrimitiveLiteral::Int(value) => Ok(Box::new(Int32Array::new_scalar(*value))), + PrimitiveLiteral::Long(value) => Ok(Box::new(Int64Array::new_scalar(*value))), + PrimitiveLiteral::Float(value) => Ok(Box::new(Float32Array::new_scalar(value.as_f32()))), + PrimitiveLiteral::Double(value) => Ok(Box::new(Float64Array::new_scalar(value.as_f64()))), + l => Err(Error::new( + ErrorKind::DataInvalid, + format!("Unsupported literal type: {:?}", l), + )), + } +} + +/// Recursively get the leaf column from the record batch. Assume that the nested columns in +/// struct is projected to a single column. +fn get_leaf_column(column: &ArrayRef) -> std::result::Result<ArrayRef, ArrowError> { + match column.data_type() { + DataType::Struct(fields) => { + if fields.len() != 1 { + return Err(ArrowError::SchemaError( + "Struct column should have only one field after projection" + .parse() + .unwrap(), + )); + } + let struct_array = column.as_any().downcast_ref::<StructArray>().unwrap(); + get_leaf_column(struct_array.column(0)) + } + _ => Ok(column.clone()), Review Comment: I also think it doesn't support it now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org