sdd commented on code in PR #367:
URL: https://github.com/apache/iceberg-rust/pull/367#discussion_r1596298705


##########
crates/iceberg/src/expr/visitors/manifest_evaluator.rs:
##########
@@ -98,103 +101,249 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> 
{
         Ok(self.field_summary_for_reference(reference).contains_null)
     }
 
+    #[allow(clippy::bool_comparison)]
     fn not_null(
         &mut self,
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+
+        // contains_null encodes whether at least one partition value is null,
+        // lowerBound is null if all partition values are null
+        let mut all_null: bool = field.contains_null && 
field.lower_bound.is_none();
+
+        if all_null && reference.field().field_type.is_floating_type() {
+            // floating point types may include NaN values, which we check 
separately.
+            // In case bounds don't include NaN value, contains_nan needs to 
be checked against.
+            all_null = field.contains_nan.is_some_and(|x| x == false);
+        }
+
+        Ok(!all_null)
     }
 
     fn is_nan(
         &mut self,
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        Ok(self
-            .field_summary_for_reference(reference)
-            .contains_nan
-            .is_some())
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(contains_nan) = field.contains_nan {
+            Ok(contains_nan)
+        } else {
+            ROWS_MIGHT_MATCH
+        }
     }
 
+    #[allow(clippy::bool_comparison)]
     fn not_nan(
         &mut self,
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if field.contains_nan.is_some_and(|x| x == true)
+            && !field.contains_null
+            && field.lower_bound.is_none()
+        {
+            ROWS_CANNOT_MATCH
+        } else {
+            ROWS_MIGHT_MATCH
+        }
     }
 
     fn less_than(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(lower_bound) = &field.lower_bound {
+            Ok(lower_bound < &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn less_than_or_eq(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(lower_bound) = &field.lower_bound {
+            Ok(lower_bound <= &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn greater_than(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(upper_bound) = &field.upper_bound {
+            Ok(upper_bound > &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn greater_than_or_eq(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(upper_bound) = &field.upper_bound {
+            Ok(upper_bound >= &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn eq(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        Ok(self.less_than_or_eq(reference, datum, _predicate)?
+            && self.greater_than_or_eq(reference, datum, _predicate)?)
     }
 
     fn not_eq(
         &mut self,
-        reference: &BoundReference,
-        literal: &Datum,
+        _reference: &BoundReference,
+        _datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        // because the bounds are not necessarily a min or max value, this 
cannot be answered using
+        // them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value 
in col.
+        ROWS_MIGHT_MATCH
     }
 
     fn starts_with(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+
+        if field.lower_bound.is_none() || field.upper_bound.is_none() {
+            return ROWS_CANNOT_MATCH;
+        }
+
+        let PrimitiveLiteral::String(prefix) = datum.literal() else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Cannot perform starts_with on non-string value",
+            ));
+        };
+
+        let prefix_len = prefix.chars().count();
+
+        if let Some(lower_bound) = &field.lower_bound {
+            let Literal::Primitive(PrimitiveLiteral::String(lower_bound)) = 
lower_bound else {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "Cannot perform starts_with on non-string lower bound",
+                ));
+            };
+
+            let min_len = lower_bound.chars().count().min(prefix_len);
+            let truncated_lower_bound = String::from(&lower_bound[..min_len]);
+            if prefix < &truncated_lower_bound {
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+
+        if let Some(upper_bound) = &field.upper_bound {
+            let Literal::Primitive(PrimitiveLiteral::String(upper_bound)) = 
upper_bound else {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "Cannot perform starts_with on non-string upper bound",
+                ));
+            };
+
+            let min_len = upper_bound.chars().count().min(prefix_len);
+            let truncated_upper_bound = String::from(&upper_bound[..min_len]);
+            if prefix > &truncated_upper_bound {
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+
+        ROWS_MIGHT_MATCH
     }
 
     fn not_starts_with(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+
+        if field.contains_null || field.lower_bound.is_none() || 
field.upper_bound.is_none() {
+            return ROWS_MIGHT_MATCH;
+        }
+
+        let PrimitiveLiteral::String(prefix) = datum.literal() else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Cannot perform not_starts_with on non-string value",
+            ));
+        };
+
+        let prefix_len = prefix.chars().count();
+
+        // not_starts_with will match unless all values must start with the 
prefix. This happens when
+        // the lower and upper bounds both start with the prefix.
+        if let Some(lower_bound) = &field.lower_bound {
+            let Literal::Primitive(PrimitiveLiteral::String(lower_bound)) = 
lower_bound else {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "Cannot perform not_starts_with on non-string lower bound",
+                ));
+            };
+
+            // if lower is shorter than the prefix then lower doesn't start 
with the prefix
+            if prefix_len > lower_bound.chars().count() {
+                return ROWS_MIGHT_MATCH;
+            }
+
+            let truncated_lower_bound = 
String::from(&lower_bound[..prefix_len]);
+            if prefix == &truncated_lower_bound {
+                if let Some(upper_bound) = &field.upper_bound {
+                    let 
Literal::Primitive(PrimitiveLiteral::String(upper_bound)) = upper_bound
+                    else {
+                        return Err(Error::new(
+                            ErrorKind::Unexpected,
+                            "Cannot perform not_starts_with on non-string 
upper bound",
+                        ));
+                    };
+
+                    // if upper is shorter than the prefix then upper can't 
start with the prefix
+                    if prefix_len > upper_bound.chars().count() {
+                        return ROWS_MIGHT_MATCH;
+                    }
+
+                    let truncated_upper_bound = 
String::from(&upper_bound[..prefix_len]);

Review Comment:
   You can't safely use `prefix_len` to index into `upper_bound`: `prefix_len` 
is a count of `char`s, but slicing a `String` uses byte offsets, so there is no 
guarantee that the index will fall on a valid UTF-8 character boundary, and if 
it doesn't then Rust will panic.
   
   See here for an example: 
https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=1e4950cb298bd87560be74fa48576184
   
   You need to do this instead:
   
   ```rust
   let truncated_upper_bound = 
upper_bound.chars().take(prefix_len).collect::<String>();
   ```
   
   (See here for working example: 
https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=86969f315ef1bdc4a0863bde9356c1f4)



##########
crates/iceberg/src/expr/visitors/manifest_evaluator.rs:
##########
@@ -98,103 +101,249 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> 
{
         Ok(self.field_summary_for_reference(reference).contains_null)
     }
 
+    #[allow(clippy::bool_comparison)]
     fn not_null(
         &mut self,
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+
+        // contains_null encodes whether at least one partition value is null,
+        // lowerBound is null if all partition values are null
+        let mut all_null: bool = field.contains_null && 
field.lower_bound.is_none();
+
+        if all_null && reference.field().field_type.is_floating_type() {
+            // floating point types may include NaN values, which we check 
separately.
+            // In case bounds don't include NaN value, contains_nan needs to 
be checked against.
+            all_null = field.contains_nan.is_some_and(|x| x == false);
+        }
+
+        Ok(!all_null)
     }
 
     fn is_nan(
         &mut self,
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        Ok(self
-            .field_summary_for_reference(reference)
-            .contains_nan
-            .is_some())
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(contains_nan) = field.contains_nan {
+            Ok(contains_nan)
+        } else {
+            ROWS_MIGHT_MATCH

Review Comment:
   The Java implementation also tests if all values are null at this point - 
see here: 
https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java#L151
   
   You could extract [some of the code in 
not_null](https://github.com/apache/iceberg-rust/pull/367/files?diff=unified&w=0#diff-f3185efcdea2022bdedc017f7859727f2e44ac0c004f08ad6a96fdfa5fa6f69aR112-R120)
 into a shared method, `contains_nulls_only`, and call that here.
   



##########
crates/iceberg/src/expr/visitors/manifest_evaluator.rs:
##########
@@ -203,16 +352,42 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> {
         literals: &FnvHashSet<Datum>,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if field.lower_bound.is_none() {
+            return ROWS_CANNOT_MATCH;
+        }
+
+        if literals.len() > IN_PREDICATE_LIMIT {
+            return ROWS_MIGHT_MATCH;
+        }
+
+        if literals
+            .iter()
+            .all(|datum| field.lower_bound.as_ref().unwrap() > 
&Literal::from(datum.clone()))

Review Comment:
   We should not have any `unwrap`s in here. We don't want to panic if we 
encounter an unexpected situation, we should return an `Err` instead.
   
   Also, you're cloning `datum` and creating a `Literal` every time you go 
through the loop here.
   
   I'd suggest something more like this:
   
   ```rust
   if let Some(lower_bound) = &field.lower_bound {
       let Literal::Primitive(PrimitiveLiteral::String(lower_bound)) = 
lower_bound else {
           return Err(Error::new(
               ErrorKind::Unexpected,
               "Cannot use StartsWith operator on non-string lower_bound value",
           ));
       };
   
       let lower_bound_lit = PrimitiveLiteral::String(lower_bound.to_string());
   
       let all_literals_less_than_lower_bound = literals
           .iter()
           .all(|lit| lit.literal().lt(&lower_bound_lit));
   
       if all_literals_less_than_lower_bound {
           return ROWS_CANNOT_MATCH;
       }
   }
   ```



##########
crates/iceberg/src/expr/visitors/manifest_evaluator.rs:
##########
@@ -98,103 +101,249 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> 
{
         Ok(self.field_summary_for_reference(reference).contains_null)
     }
 
+    #[allow(clippy::bool_comparison)]
     fn not_null(
         &mut self,
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+
+        // contains_null encodes whether at least one partition value is null,
+        // lowerBound is null if all partition values are null
+        let mut all_null: bool = field.contains_null && 
field.lower_bound.is_none();
+
+        if all_null && reference.field().field_type.is_floating_type() {
+            // floating point types may include NaN values, which we check 
separately.
+            // In case bounds don't include NaN value, contains_nan needs to 
be checked against.
+            all_null = field.contains_nan.is_some_and(|x| x == false);
+        }
+
+        Ok(!all_null)
     }
 
     fn is_nan(
         &mut self,
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        Ok(self
-            .field_summary_for_reference(reference)
-            .contains_nan
-            .is_some())
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(contains_nan) = field.contains_nan {
+            Ok(contains_nan)
+        } else {
+            ROWS_MIGHT_MATCH
+        }
     }
 
+    #[allow(clippy::bool_comparison)]
     fn not_nan(
         &mut self,
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if field.contains_nan.is_some_and(|x| x == true)
+            && !field.contains_null
+            && field.lower_bound.is_none()
+        {
+            ROWS_CANNOT_MATCH
+        } else {
+            ROWS_MIGHT_MATCH
+        }
     }
 
     fn less_than(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(lower_bound) = &field.lower_bound {
+            Ok(lower_bound < &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn less_than_or_eq(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(lower_bound) = &field.lower_bound {
+            Ok(lower_bound <= &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn greater_than(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(upper_bound) = &field.upper_bound {
+            Ok(upper_bound > &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn greater_than_or_eq(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(upper_bound) = &field.upper_bound {
+            Ok(upper_bound >= &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn eq(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        Ok(self.less_than_or_eq(reference, datum, _predicate)?
+            && self.greater_than_or_eq(reference, datum, _predicate)?)
     }
 
     fn not_eq(
         &mut self,
-        reference: &BoundReference,
-        literal: &Datum,
+        _reference: &BoundReference,
+        _datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        // because the bounds are not necessarily a min or max value, this 
cannot be answered using
+        // them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value 
in col.
+        ROWS_MIGHT_MATCH
     }
 
     fn starts_with(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+
+        if field.lower_bound.is_none() || field.upper_bound.is_none() {
+            return ROWS_CANNOT_MATCH;
+        }
+
+        let PrimitiveLiteral::String(prefix) = datum.literal() else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Cannot perform starts_with on non-string value",
+            ));
+        };
+
+        let prefix_len = prefix.chars().count();
+
+        if let Some(lower_bound) = &field.lower_bound {
+            let Literal::Primitive(PrimitiveLiteral::String(lower_bound)) = 
lower_bound else {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "Cannot perform starts_with on non-string lower bound",
+                ));
+            };
+
+            let min_len = lower_bound.chars().count().min(prefix_len);
+            let truncated_lower_bound = String::from(&lower_bound[..min_len]);
+            if prefix < &truncated_lower_bound {
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+
+        if let Some(upper_bound) = &field.upper_bound {
+            let Literal::Primitive(PrimitiveLiteral::String(upper_bound)) = 
upper_bound else {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "Cannot perform starts_with on non-string upper bound",
+                ));
+            };
+
+            let min_len = upper_bound.chars().count().min(prefix_len);
+            let truncated_upper_bound = String::from(&upper_bound[..min_len]);

Review Comment:
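   String indexing: `min_len` is a character count, but 
`&upper_bound[..min_len]` slices by byte offset and will panic if that offset 
is not on a UTF-8 character boundary. Use 
`upper_bound.chars().take(min_len).collect::<String>()` instead.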



##########
crates/iceberg/src/expr/visitors/manifest_evaluator.rs:
##########
@@ -98,103 +101,249 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> 
{
         Ok(self.field_summary_for_reference(reference).contains_null)
     }
 
+    #[allow(clippy::bool_comparison)]
     fn not_null(
         &mut self,
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+
+        // contains_null encodes whether at least one partition value is null,
+        // lowerBound is null if all partition values are null
+        let mut all_null: bool = field.contains_null && 
field.lower_bound.is_none();
+
+        if all_null && reference.field().field_type.is_floating_type() {
+            // floating point types may include NaN values, which we check 
separately.
+            // In case bounds don't include NaN value, contains_nan needs to 
be checked against.
+            all_null = field.contains_nan.is_some_and(|x| x == false);
+        }
+
+        Ok(!all_null)
     }
 
     fn is_nan(
         &mut self,
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        Ok(self
-            .field_summary_for_reference(reference)
-            .contains_nan
-            .is_some())
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(contains_nan) = field.contains_nan {
+            Ok(contains_nan)
+        } else {
+            ROWS_MIGHT_MATCH
+        }
     }
 
+    #[allow(clippy::bool_comparison)]
     fn not_nan(
         &mut self,
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if field.contains_nan.is_some_and(|x| x == true)
+            && !field.contains_null
+            && field.lower_bound.is_none()
+        {
+            ROWS_CANNOT_MATCH
+        } else {
+            ROWS_MIGHT_MATCH
+        }
     }
 
     fn less_than(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(lower_bound) = &field.lower_bound {
+            Ok(lower_bound < &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn less_than_or_eq(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(lower_bound) = &field.lower_bound {
+            Ok(lower_bound <= &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn greater_than(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(upper_bound) = &field.upper_bound {
+            Ok(upper_bound > &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn greater_than_or_eq(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(upper_bound) = &field.upper_bound {
+            Ok(upper_bound >= &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn eq(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        Ok(self.less_than_or_eq(reference, datum, _predicate)?
+            && self.greater_than_or_eq(reference, datum, _predicate)?)
     }
 
     fn not_eq(
         &mut self,
-        reference: &BoundReference,
-        literal: &Datum,
+        _reference: &BoundReference,
+        _datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        // because the bounds are not necessarily a min or max value, this 
cannot be answered using
+        // them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value 
in col.
+        ROWS_MIGHT_MATCH
     }
 
     fn starts_with(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+
+        if field.lower_bound.is_none() || field.upper_bound.is_none() {
+            return ROWS_CANNOT_MATCH;
+        }
+
+        let PrimitiveLiteral::String(prefix) = datum.literal() else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Cannot perform starts_with on non-string value",
+            ));
+        };
+
+        let prefix_len = prefix.chars().count();
+
+        if let Some(lower_bound) = &field.lower_bound {
+            let Literal::Primitive(PrimitiveLiteral::String(lower_bound)) = 
lower_bound else {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "Cannot perform starts_with on non-string lower bound",
+                ));
+            };
+
+            let min_len = lower_bound.chars().count().min(prefix_len);
+            let truncated_lower_bound = String::from(&lower_bound[..min_len]);

Review Comment:
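   As with the earlier comment, indexing into strings by a character count is 
not safe: `&lower_bound[..min_len]` slices by byte offset and will panic if the 
offset does not fall on a UTF-8 character boundary. Use 
`lower_bound.chars().take(min_len).collect::<String>()` instead.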



##########
crates/iceberg/src/expr/visitors/manifest_evaluator.rs:
##########
@@ -98,103 +101,249 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> 
{
         Ok(self.field_summary_for_reference(reference).contains_null)
     }
 
+    #[allow(clippy::bool_comparison)]
     fn not_null(
         &mut self,
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+
+        // contains_null encodes whether at least one partition value is null,
+        // lowerBound is null if all partition values are null
+        let mut all_null: bool = field.contains_null && 
field.lower_bound.is_none();
+
+        if all_null && reference.field().field_type.is_floating_type() {
+            // floating point types may include NaN values, which we check 
separately.
+            // In case bounds don't include NaN value, contains_nan needs to 
be checked against.
+            all_null = field.contains_nan.is_some_and(|x| x == false);
+        }
+
+        Ok(!all_null)
     }
 
     fn is_nan(
         &mut self,
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        Ok(self
-            .field_summary_for_reference(reference)
-            .contains_nan
-            .is_some())
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(contains_nan) = field.contains_nan {
+            Ok(contains_nan)
+        } else {
+            ROWS_MIGHT_MATCH
+        }
     }
 
+    #[allow(clippy::bool_comparison)]
     fn not_nan(
         &mut self,
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if field.contains_nan.is_some_and(|x| x == true)
+            && !field.contains_null
+            && field.lower_bound.is_none()
+        {
+            ROWS_CANNOT_MATCH
+        } else {
+            ROWS_MIGHT_MATCH
+        }
     }
 
     fn less_than(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(lower_bound) = &field.lower_bound {
+            Ok(lower_bound < &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn less_than_or_eq(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(lower_bound) = &field.lower_bound {
+            Ok(lower_bound <= &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn greater_than(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(upper_bound) = &field.upper_bound {
+            Ok(upper_bound > &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn greater_than_or_eq(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(upper_bound) = &field.upper_bound {
+            Ok(upper_bound >= &Literal::from(datum.clone()))

Review Comment:
   You can avoid the clone here, and in the other inequality methods:
   
   ```rust
   if let Some(Literal::Primitive(upper_bound)) = &field.upper_bound {
       Ok(upper_bound >= datum.literal())
   } else {
       ROWS_CANNOT_MATCH
   }
   ```



##########
crates/iceberg/src/expr/visitors/manifest_evaluator.rs:
##########
@@ -98,103 +101,249 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> 
{
         Ok(self.field_summary_for_reference(reference).contains_null)
     }
 
+    #[allow(clippy::bool_comparison)]
     fn not_null(
         &mut self,
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+
+        // contains_null encodes whether at least one partition value is null,
+        // lowerBound is null if all partition values are null
+        let mut all_null: bool = field.contains_null && 
field.lower_bound.is_none();
+
+        if all_null && reference.field().field_type.is_floating_type() {
+            // floating point types may include NaN values, which we check 
separately.
+            // In case bounds don't include NaN value, contains_nan needs to 
be checked against.
+            all_null = field.contains_nan.is_some_and(|x| x == false);
+        }
+
+        Ok(!all_null)
     }
 
     fn is_nan(
         &mut self,
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        Ok(self
-            .field_summary_for_reference(reference)
-            .contains_nan
-            .is_some())
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(contains_nan) = field.contains_nan {
+            Ok(contains_nan)
+        } else {
+            ROWS_MIGHT_MATCH
+        }
     }
 
+    #[allow(clippy::bool_comparison)]
     fn not_nan(
         &mut self,
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if field.contains_nan.is_some_and(|x| x == true)
+            && !field.contains_null
+            && field.lower_bound.is_none()
+        {
+            ROWS_CANNOT_MATCH
+        } else {
+            ROWS_MIGHT_MATCH
+        }
     }
 
     fn less_than(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(lower_bound) = &field.lower_bound {
+            Ok(lower_bound < &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn less_than_or_eq(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(lower_bound) = &field.lower_bound {
+            Ok(lower_bound <= &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn greater_than(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(upper_bound) = &field.upper_bound {
+            Ok(upper_bound > &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn greater_than_or_eq(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+        if let Some(upper_bound) = &field.upper_bound {
+            Ok(upper_bound >= &Literal::from(datum.clone()))
+        } else {
+            ROWS_CANNOT_MATCH
+        }
     }
 
     fn eq(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        Ok(self.less_than_or_eq(reference, datum, _predicate)?
+            && self.greater_than_or_eq(reference, datum, _predicate)?)
     }
 
     fn not_eq(
         &mut self,
-        reference: &BoundReference,
-        literal: &Datum,
+        _reference: &BoundReference,
+        _datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        // because the bounds are not necessarily a min or max value, this 
cannot be answered using
+        // them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value 
in col.
+        ROWS_MIGHT_MATCH
     }
 
     fn starts_with(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+
+        if field.lower_bound.is_none() || field.upper_bound.is_none() {
+            return ROWS_CANNOT_MATCH;
+        }
+
+        let PrimitiveLiteral::String(prefix) = datum.literal() else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Cannot perform starts_with on non-string value",
+            ));
+        };
+
+        let prefix_len = prefix.chars().count();
+
+        if let Some(lower_bound) = &field.lower_bound {
+            let Literal::Primitive(PrimitiveLiteral::String(lower_bound)) = 
lower_bound else {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "Cannot perform starts_with on non-string lower bound",
+                ));
+            };
+
+            let min_len = lower_bound.chars().count().min(prefix_len);
+            let truncated_lower_bound = String::from(&lower_bound[..min_len]);
+            if prefix < &truncated_lower_bound {
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+
+        if let Some(upper_bound) = &field.upper_bound {
+            let Literal::Primitive(PrimitiveLiteral::String(upper_bound)) = 
upper_bound else {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "Cannot perform starts_with on non-string upper bound",
+                ));
+            };
+
+            let min_len = upper_bound.chars().count().min(prefix_len);
+            let truncated_upper_bound = String::from(&upper_bound[..min_len]);
+            if prefix > &truncated_upper_bound {
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+
+        ROWS_MIGHT_MATCH
     }
 
     fn not_starts_with(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field: &FieldSummary = self.field_summary_for_reference(reference);
+
+        if field.contains_null || field.lower_bound.is_none() || 
field.upper_bound.is_none() {
+            return ROWS_MIGHT_MATCH;
+        }
+
+        let PrimitiveLiteral::String(prefix) = datum.literal() else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Cannot perform not_starts_with on non-string value",
+            ));
+        };
+
+        let prefix_len = prefix.chars().count();
+
+        // not_starts_with will match unless all values must start with the 
prefix. This happens when
+        // the lower and upper bounds both start with the prefix.
+        if let Some(lower_bound) = &field.lower_bound {
+            let Literal::Primitive(PrimitiveLiteral::String(lower_bound)) = 
lower_bound else {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "Cannot perform not_starts_with on non-string lower bound",
+                ));
+            };
+
+            // if lower is shorter than the prefix then lower doesn't start 
with the prefix
+            if prefix_len > lower_bound.chars().count() {
+                return ROWS_MIGHT_MATCH;
+            }
+
+            let truncated_lower_bound = 
String::from(&lower_bound[..prefix_len]);

Review Comment:
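   String indexing: `prefix_len` is a character count (`prefix.chars().count()`), but `&lower_bound[..prefix_len]` slices by byte offset. If the bound contains multi-byte UTF-8 characters, this either truncates to fewer characters than intended or panics when the offset does not fall on a char boundary. Truncating by characters avoids both, e.g. `lower_bound.chars().take(prefix_len).collect::<String>()`.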



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org


Reply via email to