liurenjie1024 commented on code in PR #295:
URL: https://github.com/apache/iceberg-rust/pull/295#discussion_r1583008556


##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -186,4 +219,634 @@ impl ArrowReader {
             Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
+
+    fn get_row_filter(
+        &self,
+        parquet_schema: &SchemaDescriptor,
+        collector: &CollectFieldIdVisitor,
+    ) -> Result<Option<RowFilter>> {
+        if let Some(predicates) = &self.predicates {
+            let field_id_map = build_field_id_map(parquet_schema)?;
+
+            // Collect Parquet column indices from field ids.
+            // If the field id is not found in Parquet schema, it will be 
ignored due to schema evolution.
+            let column_indices = collector
+                .field_ids
+                .iter()
+                .map(|field_id| field_id_map.get(field_id).cloned())
+                .flatten()
+                .collect::<Vec<_>>();
+
+            // Convert BoundPredicates to ArrowPredicates
+            let mut converter = PredicateConverter {
+                projection_mask: ProjectionMask::leaves(parquet_schema, 
column_indices),
+                parquet_schema,
+                column_map: &field_id_map,
+            };
+            let arrow_predicate = visit(&mut converter, predicates)?;
+            Ok(Some(RowFilter::new(vec![arrow_predicate])))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Build the map of field id to Parquet column index in the schema.
+fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> 
Result<HashMap<i32, usize>> {
+    let mut column_map = HashMap::new();
+    for (idx, field) in parquet_schema.columns().iter().enumerate() {
+        let field_type = field.self_type();
+        match field_type {
+            ParquetType::PrimitiveType { basic_info, .. } => {
+                if !basic_info.has_id() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Leave column idx: {}, name: {}, type {:?} in 
schema doesn't have field id",
+                            idx,
+                            basic_info.name(),
+                            field_type
+                        ),
+                    ));
+                }
+                column_map.insert(basic_info.id(), idx);
+            }
+            ParquetType::GroupType { .. } => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Leave column in schema should be primitive type but 
got {:?}",
+                        field_type
+                    ),
+                ));
+            }
+        };
+    }
+
+    Ok(column_map)
+}
+
+/// A visitor to collect field ids from bound predicates.
+struct CollectFieldIdVisitor {
+    field_ids: HashSet<i32>,
+}
+
+impl BoundPredicateVisitor for CollectFieldIdVisitor {
+    type T = ();
+
+    fn always_true(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    fn always_false(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn not(&mut self, _inner: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn is_null(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_null(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn is_nan(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_nan(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn r#in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+}
+
+/// A visitor to convert Iceberg bound predicates to Arrow predicates.
+struct PredicateConverter<'a> {
+    /// The projection mask for the Arrow predicates.
+    pub projection_mask: ProjectionMask,
+    /// The Parquet schema descriptor.
+    pub parquet_schema: &'a SchemaDescriptor,
+    /// The map between field id and leaf column index in Parquet schema.
+    pub column_map: &'a HashMap<i32, usize>,
+}
+
+impl PredicateConverter<'_> {
+    /// When visiting a bound reference, we return the projection mask for the 
leaf column
+    /// which is used to project the column in the record batch. Return None 
if the field id
+    /// is not found in the column map, which is possible due to schema 
evolution.
+    fn bound_reference(&mut self, reference: &BoundReference) -> 
Option<ProjectionMask> {
+        // The leaf column's index in Parquet schema.
+        let column_idx = self.column_map.get(&reference.field().id)?;
+
+        Some(ProjectionMask::leaves(
+            self.parquet_schema,
+            vec![*column_idx],
+        ))
+    }
+
+    /// Build an Arrow predicate that always returns true.
+    fn build_always_true(&self) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])),
+        )))
+    }
+}
+
+/// Recursively get the leaf column from the record batch. Assume that the 
nested columns in
+/// struct is projected to a single column.
+fn get_leaf_column(column: &ArrayRef) -> std::result::Result<ArrayRef, 
ArrowError> {
+    match column.data_type() {
+        DataType::Struct(fields) => {
+            if fields.len() != 1 {
+                return Err(ArrowError::SchemaError(
+                    "Struct column should have only one field after projection"
+                        .parse()
+                        .unwrap(),
+                ));
+            }
+            let struct_array = 
column.as_any().downcast_ref::<StructArray>().unwrap();
+            get_leaf_column(struct_array.column(0))
+        }
+        _ => Ok(column.clone()),
+    }
+}
+
+impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
+    type T = Box<dyn ArrowPredicate>;
+
+    fn always_true(&mut self) -> Result<Box<dyn ArrowPredicate>> {
+        self.build_always_true()
+    }
+
+    fn always_false(&mut self) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            |batch| Ok(BooleanArray::from(vec![false; batch.num_rows()])),
+        )))
+    }
+
+    fn and(
+        &mut self,
+        mut lhs: Box<dyn ArrowPredicate>,
+        mut rhs: Box<dyn ArrowPredicate>,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            move |batch| {
+                let left = lhs.evaluate(batch.clone())?;
+                let right = rhs.evaluate(batch)?;
+                and(&left, &right)
+            },
+        )))
+    }
+
+    fn or(
+        &mut self,
+        mut lhs: Box<dyn ArrowPredicate>,
+        mut rhs: Box<dyn ArrowPredicate>,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            move |batch| {
+                let left = lhs.evaluate(batch.clone())?;
+                let right = rhs.evaluate(batch)?;
+                or(&left, &right)
+            },
+        )))
+    }
+
+    fn not(&mut self, mut inner: Box<dyn ArrowPredicate>) -> Result<Box<dyn 
ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            move |batch| {
+                let pred_ret = inner.evaluate(batch)?;
+                not(&pred_ret)
+            },
+        )))
+    }
+
+    fn is_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let column = get_leaf_column(batch.column(0))?;
+                    is_null(&column)
+                },
+            )))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn not_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let column = get_leaf_column(batch.column(0))?;
+                    is_not_null(&column)
+                },
+            )))
+        } else {
+            self.build_always_true()

Review Comment:
   When a column is missing from the Parquet file, I think we should treat it 
as all null, so the `not_null` predicate should evaluate to false here rather 
than always true?



##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -186,4 +219,634 @@ impl ArrowReader {
             Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
+
+    fn get_row_filter(
+        &self,
+        parquet_schema: &SchemaDescriptor,
+        collector: &CollectFieldIdVisitor,
+    ) -> Result<Option<RowFilter>> {
+        if let Some(predicates) = &self.predicates {
+            let field_id_map = build_field_id_map(parquet_schema)?;
+
+            // Collect Parquet column indices from field ids.
+            // If the field id is not found in Parquet schema, it will be 
ignored due to schema evolution.
+            let column_indices = collector
+                .field_ids
+                .iter()
+                .map(|field_id| field_id_map.get(field_id).cloned())
+                .flatten()
+                .collect::<Vec<_>>();
+
+            // Convert BoundPredicates to ArrowPredicates
+            let mut converter = PredicateConverter {
+                projection_mask: ProjectionMask::leaves(parquet_schema, 
column_indices),
+                parquet_schema,
+                column_map: &field_id_map,
+            };
+            let arrow_predicate = visit(&mut converter, predicates)?;
+            Ok(Some(RowFilter::new(vec![arrow_predicate])))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Build the map of field id to Parquet column index in the schema.
+fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> 
Result<HashMap<i32, usize>> {
+    let mut column_map = HashMap::new();
+    for (idx, field) in parquet_schema.columns().iter().enumerate() {
+        let field_type = field.self_type();
+        match field_type {
+            ParquetType::PrimitiveType { basic_info, .. } => {
+                if !basic_info.has_id() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Leave column idx: {}, name: {}, type {:?} in 
schema doesn't have field id",
+                            idx,
+                            basic_info.name(),
+                            field_type
+                        ),
+                    ));
+                }
+                column_map.insert(basic_info.id(), idx);
+            }
+            ParquetType::GroupType { .. } => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Leave column in schema should be primitive type but 
got {:?}",
+                        field_type
+                    ),
+                ));
+            }
+        };
+    }
+
+    Ok(column_map)
+}
+
+/// A visitor to collect field ids from bound predicates.
+struct CollectFieldIdVisitor {
+    field_ids: HashSet<i32>,
+}
+
+impl BoundPredicateVisitor for CollectFieldIdVisitor {
+    type T = ();
+
+    fn always_true(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    fn always_false(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn not(&mut self, _inner: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn is_null(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_null(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn is_nan(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_nan(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn r#in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+}
+
+/// A visitor to convert Iceberg bound predicates to Arrow predicates.
+struct PredicateConverter<'a> {
+    /// The projection mask for the Arrow predicates.
+    pub projection_mask: ProjectionMask,
+    /// The Parquet schema descriptor.
+    pub parquet_schema: &'a SchemaDescriptor,
+    /// The map between field id and leaf column index in Parquet schema.
+    pub column_map: &'a HashMap<i32, usize>,
+}
+
+impl PredicateConverter<'_> {
+    /// When visiting a bound reference, we return the projection mask for the 
leaf column
+    /// which is used to project the column in the record batch. Return None 
if the field id
+    /// is not found in the column map, which is possible due to schema 
evolution.
+    fn bound_reference(&mut self, reference: &BoundReference) -> 
Option<ProjectionMask> {
+        // The leaf column's index in Parquet schema.
+        let column_idx = self.column_map.get(&reference.field().id)?;
+
+        Some(ProjectionMask::leaves(
+            self.parquet_schema,
+            vec![*column_idx],
+        ))
+    }
+
+    /// Build an Arrow predicate that always returns true.
+    fn build_always_true(&self) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])),
+        )))
+    }
+}
+
+/// Recursively get the leaf column from the record batch. Assume that the 
nested columns in
+/// struct is projected to a single column.
+fn get_leaf_column(column: &ArrayRef) -> std::result::Result<ArrayRef, 
ArrowError> {
+    match column.data_type() {
+        DataType::Struct(fields) => {
+            if fields.len() != 1 {
+                return Err(ArrowError::SchemaError(
+                    "Struct column should have only one field after projection"
+                        .parse()
+                        .unwrap(),
+                ));
+            }
+            let struct_array = 
column.as_any().downcast_ref::<StructArray>().unwrap();
+            get_leaf_column(struct_array.column(0))
+        }
+        _ => Ok(column.clone()),
+    }
+}
+
+impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
+    type T = Box<dyn ArrowPredicate>;
+
+    fn always_true(&mut self) -> Result<Box<dyn ArrowPredicate>> {
+        self.build_always_true()
+    }
+
+    fn always_false(&mut self) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            |batch| Ok(BooleanArray::from(vec![false; batch.num_rows()])),
+        )))
+    }
+
+    fn and(
+        &mut self,
+        mut lhs: Box<dyn ArrowPredicate>,
+        mut rhs: Box<dyn ArrowPredicate>,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            move |batch| {
+                let left = lhs.evaluate(batch.clone())?;
+                let right = rhs.evaluate(batch)?;
+                and(&left, &right)
+            },
+        )))
+    }
+
+    fn or(
+        &mut self,
+        mut lhs: Box<dyn ArrowPredicate>,
+        mut rhs: Box<dyn ArrowPredicate>,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            move |batch| {
+                let left = lhs.evaluate(batch.clone())?;
+                let right = rhs.evaluate(batch)?;
+                or(&left, &right)
+            },
+        )))
+    }
+
+    fn not(&mut self, mut inner: Box<dyn ArrowPredicate>) -> Result<Box<dyn 
ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            move |batch| {
+                let pred_ret = inner.evaluate(batch)?;
+                not(&pred_ret)
+            },
+        )))
+    }
+
+    fn is_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let column = get_leaf_column(batch.column(0))?;
+                    is_null(&column)
+                },
+            )))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn not_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let column = get_leaf_column(batch.column(0))?;
+                    is_not_null(&column)
+                },
+            )))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn is_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| {
+                Ok(BooleanArray::from(vec![true; batch.num_rows()]))
+            })))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn not_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| {
+                Ok(BooleanArray::from(vec![true; batch.num_rows()]))
+            })))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn less_than(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            let literal = get_arrow_datum(literal)?;
+
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let left = get_leaf_column(batch.column(0))?;
+                    lt(&left, literal.as_ref())
+                },
+            )))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            let literal = get_arrow_datum(literal)?;
+
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let left = get_leaf_column(batch.column(0))?;
+                    lt_eq(&left, literal.as_ref())
+                },
+            )))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn greater_than(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            let literal = get_arrow_datum(literal)?;
+
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let left = get_leaf_column(batch.column(0))?;
+                    gt(&left, literal.as_ref())
+                },
+            )))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            let literal = get_arrow_datum(literal)?;
+
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let left = get_leaf_column(batch.column(0))?;
+                    gt_eq(&left, literal.as_ref())
+                },
+            )))
+        } else {
+            self.build_always_true()

Review Comment:
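   As above: when the column is missing from the Parquet file, I think we should 
treat it as null, so this should be false rather than always true?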



##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -186,4 +219,634 @@ impl ArrowReader {
             Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
+
+    fn get_row_filter(
+        &self,
+        parquet_schema: &SchemaDescriptor,
+        collector: &CollectFieldIdVisitor,
+    ) -> Result<Option<RowFilter>> {
+        if let Some(predicates) = &self.predicates {
+            let field_id_map = build_field_id_map(parquet_schema)?;
+
+            // Collect Parquet column indices from field ids.
+            // If the field id is not found in Parquet schema, it will be 
ignored due to schema evolution.
+            let column_indices = collector
+                .field_ids
+                .iter()
+                .map(|field_id| field_id_map.get(field_id).cloned())
+                .flatten()
+                .collect::<Vec<_>>();
+
+            // Convert BoundPredicates to ArrowPredicates
+            let mut converter = PredicateConverter {
+                projection_mask: ProjectionMask::leaves(parquet_schema, 
column_indices),
+                parquet_schema,
+                column_map: &field_id_map,
+            };
+            let arrow_predicate = visit(&mut converter, predicates)?;
+            Ok(Some(RowFilter::new(vec![arrow_predicate])))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Build the map of field id to Parquet column index in the schema.
+fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> 
Result<HashMap<i32, usize>> {
+    let mut column_map = HashMap::new();
+    for (idx, field) in parquet_schema.columns().iter().enumerate() {
+        let field_type = field.self_type();
+        match field_type {
+            ParquetType::PrimitiveType { basic_info, .. } => {
+                if !basic_info.has_id() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Leave column idx: {}, name: {}, type {:?} in 
schema doesn't have field id",
+                            idx,
+                            basic_info.name(),
+                            field_type
+                        ),
+                    ));
+                }
+                column_map.insert(basic_info.id(), idx);
+            }
+            ParquetType::GroupType { .. } => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Leave column in schema should be primitive type but 
got {:?}",
+                        field_type
+                    ),
+                ));
+            }
+        };
+    }
+
+    Ok(column_map)
+}
+
+/// A visitor to collect field ids from bound predicates.
+struct CollectFieldIdVisitor {
+    field_ids: HashSet<i32>,
+}
+
+impl BoundPredicateVisitor for CollectFieldIdVisitor {
+    type T = ();
+
+    fn always_true(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    fn always_false(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn not(&mut self, _inner: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn is_null(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_null(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn is_nan(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_nan(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn r#in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+}
+
+/// A visitor to convert Iceberg bound predicates to Arrow predicates.
+struct PredicateConverter<'a> {
+    /// The projection mask for the Arrow predicates.
+    pub projection_mask: ProjectionMask,
+    /// The Parquet schema descriptor.
+    pub parquet_schema: &'a SchemaDescriptor,
+    /// The map between field id and leaf column index in Parquet schema.
+    pub column_map: &'a HashMap<i32, usize>,
+}
+
+impl PredicateConverter<'_> {
+    /// When visiting a bound reference, we return the projection mask for the 
leaf column
+    /// which is used to project the column in the record batch. Return None 
if the field id
+    /// is not found in the column map, which is possible due to schema 
evolution.
+    fn bound_reference(&mut self, reference: &BoundReference) -> 
Option<ProjectionMask> {
+        // The leaf column's index in Parquet schema.
+        let column_idx = self.column_map.get(&reference.field().id)?;
+
+        Some(ProjectionMask::leaves(
+            self.parquet_schema,
+            vec![*column_idx],
+        ))
+    }
+
+    /// Build an Arrow predicate that always returns true.
+    fn build_always_true(&self) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])),
+        )))
+    }
+}
+
+/// Recursively get the leaf column from the record batch. Assume that the 
nested columns in
+/// struct is projected to a single column.
+fn get_leaf_column(column: &ArrayRef) -> std::result::Result<ArrayRef, 
ArrowError> {
+    match column.data_type() {
+        DataType::Struct(fields) => {
+            if fields.len() != 1 {
+                return Err(ArrowError::SchemaError(
+                    "Struct column should have only one field after projection"
+                        .parse()
+                        .unwrap(),
+                ));
+            }
+            let struct_array = 
column.as_any().downcast_ref::<StructArray>().unwrap();
+            get_leaf_column(struct_array.column(0))
+        }
+        _ => Ok(column.clone()),
+    }
+}
+
+impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
+    type T = Box<dyn ArrowPredicate>;
+
+    fn always_true(&mut self) -> Result<Box<dyn ArrowPredicate>> {
+        self.build_always_true()
+    }
+
+    fn always_false(&mut self) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            |batch| Ok(BooleanArray::from(vec![false; batch.num_rows()])),
+        )))
+    }
+
+    fn and(
+        &mut self,
+        mut lhs: Box<dyn ArrowPredicate>,
+        mut rhs: Box<dyn ArrowPredicate>,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            move |batch| {
+                let left = lhs.evaluate(batch.clone())?;
+                let right = rhs.evaluate(batch)?;
+                and(&left, &right)
+            },
+        )))
+    }
+
+    fn or(
+        &mut self,
+        mut lhs: Box<dyn ArrowPredicate>,
+        mut rhs: Box<dyn ArrowPredicate>,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            move |batch| {
+                let left = lhs.evaluate(batch.clone())?;
+                let right = rhs.evaluate(batch)?;
+                or(&left, &right)
+            },
+        )))
+    }
+
+    fn not(&mut self, mut inner: Box<dyn ArrowPredicate>) -> Result<Box<dyn 
ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            move |batch| {
+                let pred_ret = inner.evaluate(batch)?;
+                not(&pred_ret)
+            },
+        )))
+    }
+
+    fn is_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let column = get_leaf_column(batch.column(0))?;
+                    is_null(&column)
+                },
+            )))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn not_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let column = get_leaf_column(batch.column(0))?;
+                    is_not_null(&column)
+                },
+            )))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn is_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| {
+                Ok(BooleanArray::from(vec![true; batch.num_rows()]))
+            })))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn not_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| {
+                Ok(BooleanArray::from(vec![true; batch.num_rows()]))
+            })))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn less_than(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            let literal = get_arrow_datum(literal)?;
+
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let left = get_leaf_column(batch.column(0))?;
+                    lt(&left, literal.as_ref())
+                },
+            )))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            let literal = get_arrow_datum(literal)?;
+
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let left = get_leaf_column(batch.column(0))?;
+                    lt_eq(&left, literal.as_ref())
+                },
+            )))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn greater_than(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            let literal = get_arrow_datum(literal)?;
+
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let left = get_leaf_column(batch.column(0))?;
+                    gt(&left, literal.as_ref())
+                },
+            )))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            let literal = get_arrow_datum(literal)?;
+
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let left = get_leaf_column(batch.column(0))?;
+                    gt_eq(&left, literal.as_ref())
+                },
+            )))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn eq(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            let literal = get_arrow_datum(literal)?;
+
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let left = get_leaf_column(batch.column(0))?;
+                    eq(&left, literal.as_ref())
+                },
+            )))
+        } else {
+            self.build_always_true()

Review Comment:
   Ditto: when the referenced column is missing from the Parquet file, should we return false here instead of always true?



##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -186,4 +219,634 @@ impl ArrowReader {
             Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
+
+    fn get_row_filter(
+        &self,
+        parquet_schema: &SchemaDescriptor,
+        collector: &CollectFieldIdVisitor,
+    ) -> Result<Option<RowFilter>> {
+        if let Some(predicates) = &self.predicates {
+            let field_id_map = build_field_id_map(parquet_schema)?;
+
+            // Collect Parquet column indices from field ids.
+            // If the field id is not found in Parquet schema, it will be 
ignored due to schema evolution.
+            let column_indices = collector
+                .field_ids
+                .iter()
+                .map(|field_id| field_id_map.get(field_id).cloned())
+                .flatten()
+                .collect::<Vec<_>>();
+
+            // Convert BoundPredicates to ArrowPredicates
+            let mut converter = PredicateConverter {
+                projection_mask: ProjectionMask::leaves(parquet_schema, 
column_indices),
+                parquet_schema,
+                column_map: &field_id_map,
+            };
+            let arrow_predicate = visit(&mut converter, predicates)?;
+            Ok(Some(RowFilter::new(vec![arrow_predicate])))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Build the map of field id to Parquet column index in the schema.
+fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> 
Result<HashMap<i32, usize>> {
+    let mut column_map = HashMap::new();
+    for (idx, field) in parquet_schema.columns().iter().enumerate() {
+        let field_type = field.self_type();
+        match field_type {
+            ParquetType::PrimitiveType { basic_info, .. } => {
+                if !basic_info.has_id() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Leave column idx: {}, name: {}, type {:?} in 
schema doesn't have field id",
+                            idx,
+                            basic_info.name(),
+                            field_type
+                        ),
+                    ));
+                }
+                column_map.insert(basic_info.id(), idx);
+            }
+            ParquetType::GroupType { .. } => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Leave column in schema should be primitive type but 
got {:?}",
+                        field_type
+                    ),
+                ));
+            }
+        };
+    }
+
+    Ok(column_map)
+}
+
+/// A visitor to collect field ids from bound predicates.
+struct CollectFieldIdVisitor {
+    field_ids: HashSet<i32>,
+}
+
+impl BoundPredicateVisitor for CollectFieldIdVisitor {
+    type T = ();
+
+    fn always_true(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    fn always_false(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn not(&mut self, _inner: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn is_null(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_null(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn is_nan(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_nan(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn r#in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+}
+
+/// A visitor to convert Iceberg bound predicates to Arrow predicates.
+struct PredicateConverter<'a> {
+    /// The projection mask for the Arrow predicates.
+    pub projection_mask: ProjectionMask,
+    /// The Parquet schema descriptor.
+    pub parquet_schema: &'a SchemaDescriptor,
+    /// The map between field id and leaf column index in Parquet schema.
+    pub column_map: &'a HashMap<i32, usize>,
+}
+
+impl PredicateConverter<'_> {
+    /// When visiting a bound reference, we return the projection mask for the 
leaf column
+    /// which is used to project the column in the record batch. Return None 
if the field id
+    /// is not found in the column map, which is possible due to schema 
evolution.
+    fn bound_reference(&mut self, reference: &BoundReference) -> 
Option<ProjectionMask> {
+        // The leaf column's index in Parquet schema.
+        let column_idx = self.column_map.get(&reference.field().id)?;
+
+        Some(ProjectionMask::leaves(
+            self.parquet_schema,
+            vec![*column_idx],
+        ))
+    }
+
+    /// Build an Arrow predicate that always returns true.
+    fn build_always_true(&self) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])),
+        )))
+    }
+}
+
+/// Recursively get the leaf column from the record batch. Assume that the 
nested columns in
+/// struct is projected to a single column.
+fn get_leaf_column(column: &ArrayRef) -> std::result::Result<ArrayRef, 
ArrowError> {
+    match column.data_type() {
+        DataType::Struct(fields) => {
+            if fields.len() != 1 {
+                return Err(ArrowError::SchemaError(
+                    "Struct column should have only one field after projection"
+                        .parse()
+                        .unwrap(),
+                ));
+            }
+            let struct_array = 
column.as_any().downcast_ref::<StructArray>().unwrap();
+            get_leaf_column(struct_array.column(0))
+        }
+        _ => Ok(column.clone()),
+    }
+}
+
+impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
+    type T = Box<dyn ArrowPredicate>;
+
+    fn always_true(&mut self) -> Result<Box<dyn ArrowPredicate>> {
+        self.build_always_true()
+    }
+
+    fn always_false(&mut self) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            |batch| Ok(BooleanArray::from(vec![false; batch.num_rows()])),
+        )))
+    }
+
+    fn and(
+        &mut self,
+        mut lhs: Box<dyn ArrowPredicate>,
+        mut rhs: Box<dyn ArrowPredicate>,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            move |batch| {
+                let left = lhs.evaluate(batch.clone())?;
+                let right = rhs.evaluate(batch)?;
+                and(&left, &right)
+            },
+        )))
+    }
+
+    fn or(
+        &mut self,
+        mut lhs: Box<dyn ArrowPredicate>,
+        mut rhs: Box<dyn ArrowPredicate>,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            move |batch| {
+                let left = lhs.evaluate(batch.clone())?;
+                let right = rhs.evaluate(batch)?;
+                or(&left, &right)
+            },
+        )))
+    }
+
+    fn not(&mut self, mut inner: Box<dyn ArrowPredicate>) -> Result<Box<dyn 
ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            move |batch| {
+                let pred_ret = inner.evaluate(batch)?;
+                not(&pred_ret)
+            },
+        )))
+    }
+
+    fn is_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let column = get_leaf_column(batch.column(0))?;
+                    is_null(&column)
+                },
+            )))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn not_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let column = get_leaf_column(batch.column(0))?;
+                    is_not_null(&column)
+                },
+            )))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn is_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| {
+                Ok(BooleanArray::from(vec![true; batch.num_rows()]))
+            })))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn not_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| {
+                Ok(BooleanArray::from(vec![true; batch.num_rows()]))
+            })))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn less_than(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            let literal = get_arrow_datum(literal)?;
+
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let left = get_leaf_column(batch.column(0))?;
+                    lt(&left, literal.as_ref())
+                },
+            )))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            let literal = get_arrow_datum(literal)?;
+
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let left = get_leaf_column(batch.column(0))?;
+                    lt_eq(&left, literal.as_ref())
+                },
+            )))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn greater_than(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            let literal = get_arrow_datum(literal)?;
+
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let left = get_leaf_column(batch.column(0))?;
+                    gt(&left, literal.as_ref())
+                },
+            )))
+        } else {
+            self.build_always_true()

Review Comment:
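   The Java implementation's comparator orders nulls first: 
https://github.com/apache/iceberg/blob/c07f2aabc0a1d02f068ecf1514d2479c0fbdd3b0/api/src/main/java/org/apache/iceberg/expressions/Literals.java#L616
   
   So a column that is missing from the file (i.e. all nulls) can never be greater than the literal. I think we should return false here?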



##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -186,4 +219,634 @@ impl ArrowReader {
             Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
+
+    fn get_row_filter(
+        &self,
+        parquet_schema: &SchemaDescriptor,
+        collector: &CollectFieldIdVisitor,
+    ) -> Result<Option<RowFilter>> {
+        if let Some(predicates) = &self.predicates {
+            let field_id_map = build_field_id_map(parquet_schema)?;
+
+            // Collect Parquet column indices from field ids.
+            // If the field id is not found in Parquet schema, it will be 
ignored due to schema evolution.
+            let column_indices = collector
+                .field_ids
+                .iter()
+                .map(|field_id| field_id_map.get(field_id).cloned())
+                .flatten()
+                .collect::<Vec<_>>();
+
+            // Convert BoundPredicates to ArrowPredicates
+            let mut converter = PredicateConverter {
+                projection_mask: ProjectionMask::leaves(parquet_schema, 
column_indices),
+                parquet_schema,
+                column_map: &field_id_map,
+            };
+            let arrow_predicate = visit(&mut converter, predicates)?;
+            Ok(Some(RowFilter::new(vec![arrow_predicate])))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Build the map of field id to Parquet column index in the schema.
+fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> 
Result<HashMap<i32, usize>> {
+    let mut column_map = HashMap::new();
+    for (idx, field) in parquet_schema.columns().iter().enumerate() {
+        let field_type = field.self_type();
+        match field_type {
+            ParquetType::PrimitiveType { basic_info, .. } => {
+                if !basic_info.has_id() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Leave column idx: {}, name: {}, type {:?} in 
schema doesn't have field id",
+                            idx,
+                            basic_info.name(),
+                            field_type
+                        ),
+                    ));
+                }
+                column_map.insert(basic_info.id(), idx);
+            }
+            ParquetType::GroupType { .. } => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Leave column in schema should be primitive type but 
got {:?}",
+                        field_type
+                    ),
+                ));
+            }
+        };
+    }
+
+    Ok(column_map)
+}
+
+/// A visitor to collect field ids from bound predicates.
+struct CollectFieldIdVisitor {
+    field_ids: HashSet<i32>,
+}
+
+impl BoundPredicateVisitor for CollectFieldIdVisitor {
+    type T = ();
+
+    fn always_true(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    fn always_false(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn not(&mut self, _inner: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn is_null(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_null(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn is_nan(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_nan(&mut self, reference: &BoundReference, _predicate: 
&BoundPredicate) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn r#in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+
+    fn not_in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(reference.field().id);
+        Ok(())
+    }
+}
+
+/// A visitor to convert Iceberg bound predicates to Arrow predicates.
+struct PredicateConverter<'a> {
+    /// The projection mask for the Arrow predicates.
+    pub projection_mask: ProjectionMask,
+    /// The Parquet schema descriptor.
+    pub parquet_schema: &'a SchemaDescriptor,
+    /// The map between field id and leaf column index in Parquet schema.
+    pub column_map: &'a HashMap<i32, usize>,
+}
+
+impl PredicateConverter<'_> {
+    /// When visiting a bound reference, we return the projection mask for the 
leaf column
+    /// which is used to project the column in the record batch. Return None 
if the field id
+    /// is not found in the column map, which is possible due to schema 
evolution.
+    fn bound_reference(&mut self, reference: &BoundReference) -> 
Option<ProjectionMask> {
+        // The leaf column's index in Parquet schema.
+        let column_idx = self.column_map.get(&reference.field().id)?;
+
+        Some(ProjectionMask::leaves(
+            self.parquet_schema,
+            vec![*column_idx],
+        ))
+    }
+
+    /// Build an Arrow predicate that always returns true.
+    fn build_always_true(&self) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])),
+        )))
+    }
+}
+
+/// Recursively get the leaf column from the record batch. Assume that the 
nested columns in
+/// struct is projected to a single column.
+fn get_leaf_column(column: &ArrayRef) -> std::result::Result<ArrayRef, 
ArrowError> {
+    match column.data_type() {
+        DataType::Struct(fields) => {
+            if fields.len() != 1 {
+                return Err(ArrowError::SchemaError(
+                    "Struct column should have only one field after projection"
+                        .parse()
+                        .unwrap(),
+                ));
+            }
+            let struct_array = 
column.as_any().downcast_ref::<StructArray>().unwrap();
+            get_leaf_column(struct_array.column(0))
+        }
+        _ => Ok(column.clone()),
+    }
+}
+
+impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
+    type T = Box<dyn ArrowPredicate>;
+
+    fn always_true(&mut self) -> Result<Box<dyn ArrowPredicate>> {
+        self.build_always_true()
+    }
+
+    fn always_false(&mut self) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            |batch| Ok(BooleanArray::from(vec![false; batch.num_rows()])),
+        )))
+    }
+
+    fn and(
+        &mut self,
+        mut lhs: Box<dyn ArrowPredicate>,
+        mut rhs: Box<dyn ArrowPredicate>,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            move |batch| {
+                let left = lhs.evaluate(batch.clone())?;
+                let right = rhs.evaluate(batch)?;
+                and(&left, &right)
+            },
+        )))
+    }
+
+    fn or(
+        &mut self,
+        mut lhs: Box<dyn ArrowPredicate>,
+        mut rhs: Box<dyn ArrowPredicate>,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            move |batch| {
+                let left = lhs.evaluate(batch.clone())?;
+                let right = rhs.evaluate(batch)?;
+                or(&left, &right)
+            },
+        )))
+    }
+
+    fn not(&mut self, mut inner: Box<dyn ArrowPredicate>) -> Result<Box<dyn 
ArrowPredicate>> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            move |batch| {
+                let pred_ret = inner.evaluate(batch)?;
+                not(&pred_ret)
+            },
+        )))
+    }
+
+    fn is_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let column = get_leaf_column(batch.column(0))?;
+                    is_null(&column)
+                },
+            )))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn not_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            Ok(Box::new(ArrowPredicateFn::new(
+                projected_mask,
+                move |batch| {
+                    let column = get_leaf_column(batch.column(0))?;
+                    is_not_null(&column)
+                },
+            )))
+        } else {
+            self.build_always_true()
+        }
+    }
+
+    fn is_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Box<dyn ArrowPredicate>> {
+        if let Some(projected_mask) = self.bound_reference(reference) {
+            Ok(Box::new(ArrowPredicateFn::new(projected_mask, |batch| {
+                Ok(BooleanArray::from(vec![true; batch.num_rows()]))
+            })))
+        } else {
+            self.build_always_true()

Review Comment:
   The Iceberg spec does not define `NULL is_nan`, but the Java implementation 
defines it as false: 
https://github.com/apache/iceberg/blob/c07f2aabc0a1d02f068ecf1514d2479c0fbdd3b0/api/src/main/java/org/apache/iceberg/util/NaNUtil.java#L25



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to