mapleFU commented on code in PR #177:
URL: https://github.com/apache/iceberg-cpp/pull/177#discussion_r2296122034


##########
src/iceberg/expression/predicate.cc:
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/predicate.h"
+
+#include <algorithm>
+#include <format>
+
+#include "iceberg/exception.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/result.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/formatter_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+// Predicate template implementations
+template <TermType T>
+Predicate<T>::Predicate(Expression::Operation op, std::shared_ptr<T> term)
+    : operation_(op), term_(std::move(term)) {}
+
+template <TermType T>
+Predicate<T>::~Predicate() = default;
+
+// UnboundPredicate template implementations
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term)
+    : BASE(op, std::move(term)) {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term, 
Literal value)
+    : BASE(op, std::move(term)), values_{std::move(value)} {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term,
+                                      std::vector<Literal> values)
+    : BASE(op, std::move(term)), values_(std::move(values)) {}
+
+template <typename B>
+UnboundPredicate<B>::~UnboundPredicate() = default;
+
+namespace {}
+
+template <typename B>
+std::string UnboundPredicate<B>::ToString() const {
+  auto invalid_predicate_string = [](Expression::Operation op) {
+    return std::format("Invalid predicate: operation = {}", op);
+  };
+
+  const auto& term = *BASE::term();
+  const auto op = BASE::op();
+
+  switch (op) {
+    case Expression::Operation::kIsNull:
+      return std::format("is_null({})", term);
+    case Expression::Operation::kNotNull:
+      return std::format("not_null({})", term);
+    case Expression::Operation::kIsNan:
+      return std::format("is_nan({})", term);
+    case Expression::Operation::kNotNan:
+      return std::format("not_nan({})", term);
+    case Expression::Operation::kLt:
+      return values_.size() == 1 ? std::format("{} < {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kLtEq:
+      return values_.size() == 1 ? std::format("{} <= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGt:
+      return values_.size() == 1 ? std::format("{} > {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGtEq:
+      return values_.size() == 1 ? std::format("{} >= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kEq:
+      return values_.size() == 1 ? std::format("{} == {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotEq:
+      return values_.size() == 1 ? std::format("{} != {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kStartsWith:
+      return values_.size() == 1 ? std::format("{} startsWith \"{}\"", term, 
values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotStartsWith:
+      return values_.size() == 1
+                 ? std::format("{} notStartsWith \"{}\"", term, values_[0])
+                 : invalid_predicate_string(op);
+    case Expression::Operation::kIn:
+      return std::format("{} in {}", term, values_);
+    case Expression::Operation::kNotIn:
+      return std::format("{} not in {}", term, values_);
+    default:
+      return invalid_predicate_string(op);
+  }
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Negate() const {
+  ICEBERG_ASSIGN_OR_RAISE(auto negated_op, ::iceberg::Negate(BASE::op()));
+  return std::make_shared<UnboundPredicate>(negated_op, BASE::term(), values_);
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Bind(const Schema& 
schema,

Review Comment:
   Would a `getType -> OperationType` helper be better in this case than 
`BASE::op() == Expression::Operation::kIn || BASE::op() == 
Expression::Operation::kNotIn`?



##########
src/iceberg/expression/expressions.cc:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/expressions.h"
+
+#include "iceberg/exception.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+// Logical operations
+
+std::shared_ptr<Expression> Expressions::And(std::shared_ptr<Expression> left,
+                                             std::shared_ptr<Expression> 
right) {
+  if (left->op() == Expression::Operation::kFalse ||
+      right->op() == Expression::Operation::kFalse) {
+    return AlwaysFalse();
+  }
+
+  if (left->op() == Expression::Operation::kTrue) {
+    return right;
+  }
+
+  if (right->op() == Expression::Operation::kTrue) {
+    return left;
+  }
+
+  return std::make_shared<::iceberg::And>(std::move(left), std::move(right));
+}
+
+std::shared_ptr<Expression> Expressions::Or(std::shared_ptr<Expression> left,
+                                            std::shared_ptr<Expression> right) 
{
+  if (left->op() == Expression::Operation::kTrue ||
+      right->op() == Expression::Operation::kTrue) {
+    return AlwaysTrue();
+  }
+
+  if (left->op() == Expression::Operation::kFalse) {
+    return right;
+  }
+
+  if (right->op() == Expression::Operation::kFalse) {
+    return left;
+  }
+
+  return std::make_shared<::iceberg::Or>(std::move(left), std::move(right));
+}
+
+// Transform functions

Review Comment:
   Can these be members of transform? A root expression might be too flexible.



##########
src/iceberg/expression/term.cc:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/term.h"
+
+#include <format>
+
+#include "iceberg/exception.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/transform.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Bound::~Bound() = default;
+
+BoundTerm::~BoundTerm() = default;
+
+Reference::~Reference() = default;
+
+template <typename B>
+Result<std::shared_ptr<B>> Unbound<B>::Bind(const Schema& schema) const {
+  return Bind(schema, /*case_sensitive=*/true);
+}
+
+// NamedReference implementation
+NamedReference::NamedReference(std::string field_name)
+    : field_name_(std::move(field_name)) {}
+
+NamedReference::~NamedReference() = default;
+
+Result<std::shared_ptr<BoundReference>> NamedReference::Bind(const Schema& 
schema,
+                                                             bool 
case_sensitive) const {
+  if (!case_sensitive) {
+    return NotImplemented("case-insensitive lookup is not implemented");
+  }
+
+  auto field_opt = schema.GetFieldByName(field_name_);
+  if (!field_opt.has_value()) {
+    return InvalidExpression("Cannot find field '{}' in struct: {}", 
field_name_,
+                             schema.ToString());
+  }
+
+  return std::make_shared<BoundReference>(field_opt->get());
+}
+
+std::string NamedReference::ToString() const {
+  return std::format("ref(name=\"{}\")", field_name_);
+}
+
+// BoundReference implementation
+BoundReference::BoundReference(SchemaField field) : field_(std::move(field)) {}
+
+BoundReference::~BoundReference() = default;
+
+std::string BoundReference::ToString() const {
+  return std::format("ref(id={}, type={})", field_.field_id(), 
field_.type()->ToString());
+}
+
+Result<Literal::Value> BoundReference::Evaluate(const StructLike& data) const {
+  return NotImplemented("BoundReference::Evaluate(StructLike) not 
implemented");
+}
+
+bool BoundReference::Equals(const BoundTerm& other) const {
+  if (other.kind() != Term::Kind::kReference) {
+    return false;
+  }
+
+  const auto& other_ref = internal::checked_cast<const BoundReference&>(other);
+  return field_.field_id() == other_ref.field_.field_id() &&
+         field_.optional() == other_ref.field_.optional() &&
+         *field_.type() == *other_ref.field_.type();
+}
+
+// UnboundTransform implementation
+UnboundTransform::UnboundTransform(std::shared_ptr<NamedReference> ref,
+                                   std::shared_ptr<Transform> transform)
+    : ref_(std::move(ref)), transform_(std::move(transform)) {}
+
+UnboundTransform::~UnboundTransform() = default;
+
+std::string UnboundTransform::ToString() const {
+  return std::format("{}({})", transform_->ToString(), ref_->ToString());
+}
+
+Result<std::shared_ptr<BoundTransform>> UnboundTransform::Bind(
+    const Schema& schema, bool case_sensitive) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto bound_ref, ref_->Bind(schema, case_sensitive));
+  ICEBERG_ASSIGN_OR_RAISE(auto transform_func, 
transform_->Bind(bound_ref->type()));
+  return std::make_shared<BoundTransform>(std::move(bound_ref), transform_,
+                                          std::move(transform_func));
+}
+
+// BoundTransform implementation
+BoundTransform::BoundTransform(std::shared_ptr<BoundReference> ref,
+                               std::shared_ptr<Transform> transform,
+                               std::shared_ptr<TransformFunction> 
transform_func)
+    : ref_(std::move(ref)),
+      transform_(std::move(transform)),
+      transform_func_(std::move(transform_func)) {}
+
+BoundTransform::~BoundTransform() = default;
+
+std::string BoundTransform::ToString() const {
+  return std::format("{}({})", transform_->ToString(), ref_->ToString());
+}
+
+Result<Literal::Value> BoundTransform::Evaluate(const StructLike& data) const {
+  throw IcebergError("BoundTransform::Evaluate(StructLike) not implemented");
+}
+
+bool BoundTransform::MayProduceNull() const {
+  // transforms must produce null for null input values
+  // transforms may produce null for non-null inputs when not order-preserving
+  // FIXME: add Transform::is_order_preserving()

Review Comment:
   Should this FIXME carry a name, like the `TODO(gangwu)` comments elsewhere?



##########
src/iceberg/expression/predicate.cc:
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/predicate.h"
+
+#include <algorithm>
+#include <format>
+
+#include "iceberg/exception.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/result.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/formatter_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+// Predicate template implementations
+template <TermType T>
+Predicate<T>::Predicate(Expression::Operation op, std::shared_ptr<T> term)
+    : operation_(op), term_(std::move(term)) {}
+
+template <TermType T>
+Predicate<T>::~Predicate() = default;
+
+// UnboundPredicate template implementations
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term)
+    : BASE(op, std::move(term)) {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term, 
Literal value)
+    : BASE(op, std::move(term)), values_{std::move(value)} {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term,
+                                      std::vector<Literal> values)
+    : BASE(op, std::move(term)), values_(std::move(values)) {}
+
+template <typename B>
+UnboundPredicate<B>::~UnboundPredicate() = default;
+
+namespace {}
+
+template <typename B>
+std::string UnboundPredicate<B>::ToString() const {
+  auto invalid_predicate_string = [](Expression::Operation op) {
+    return std::format("Invalid predicate: operation = {}", op);
+  };
+
+  const auto& term = *BASE::term();
+  const auto op = BASE::op();
+
+  switch (op) {
+    case Expression::Operation::kIsNull:
+      return std::format("is_null({})", term);
+    case Expression::Operation::kNotNull:
+      return std::format("not_null({})", term);
+    case Expression::Operation::kIsNan:
+      return std::format("is_nan({})", term);
+    case Expression::Operation::kNotNan:
+      return std::format("not_nan({})", term);
+    case Expression::Operation::kLt:
+      return values_.size() == 1 ? std::format("{} < {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kLtEq:
+      return values_.size() == 1 ? std::format("{} <= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGt:
+      return values_.size() == 1 ? std::format("{} > {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGtEq:
+      return values_.size() == 1 ? std::format("{} >= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kEq:
+      return values_.size() == 1 ? std::format("{} == {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotEq:
+      return values_.size() == 1 ? std::format("{} != {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kStartsWith:
+      return values_.size() == 1 ? std::format("{} startsWith \"{}\"", term, 
values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotStartsWith:
+      return values_.size() == 1
+                 ? std::format("{} notStartsWith \"{}\"", term, values_[0])
+                 : invalid_predicate_string(op);
+    case Expression::Operation::kIn:
+      return std::format("{} in {}", term, values_);
+    case Expression::Operation::kNotIn:
+      return std::format("{} not in {}", term, values_);
+    default:
+      return invalid_predicate_string(op);
+  }
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Negate() const {
+  ICEBERG_ASSIGN_OR_RAISE(auto negated_op, ::iceberg::Negate(BASE::op()));
+  return std::make_shared<UnboundPredicate>(negated_op, BASE::term(), values_);
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Bind(const Schema& 
schema,
+                                                              bool 
case_sensitive) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto bound_term, BASE::term()->Bind(schema, 
case_sensitive));
+
+  if (values_.empty()) {
+    return BindUnaryOperation(std::move(bound_term));
+  }
+
+  if (BASE::op() == Expression::Operation::kIn ||
+      BASE::op() == Expression::Operation::kNotIn) {
+    return BindInOperation(std::move(bound_term));
+  }
+
+  return BindLiteralOperation(std::move(bound_term));
+}
+
+namespace {
+
+bool IsFloatingType(TypeId type) {
+  return type == TypeId::kFloat || type == TypeId::kDouble;
+}
+
+}  // namespace
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindUnaryOperation(
+    std::shared_ptr<B> bound_term) const {
+  switch (BASE::op()) {
+    case Expression::Operation::kIsNull:
+      if (!bound_term->MayProduceNull()) {
+        return Expressions::AlwaysFalse();
+      }
+      // TODO(gangwu): deal with UnknownType
+      return 
std::make_shared<BoundUnaryPredicate>(Expression::Operation::kIsNull,
+                                                   std::move(bound_term));
+    case Expression::Operation::kNotNull:
+      if (!bound_term->MayProduceNull()) {
+        return Expressions::AlwaysTrue();
+      }
+      return 
std::make_shared<BoundUnaryPredicate>(Expression::Operation::kNotNull,
+                                                   std::move(bound_term));
+    case Expression::Operation::kIsNan:
+    case Expression::Operation::kNotNan:
+      if (!IsFloatingType(bound_term->type()->type_id())) {
+        return InvalidExpression("{} cannot be used with a non-floating-point 
column",
+                                 BASE::op());
+      }
+      return std::make_shared<BoundUnaryPredicate>(BASE::op(), 
std::move(bound_term));
+    default:
+      return InvalidExpression("Operation must be IS_NULL, NOT_NULL, IS_NAN, 
or NOT_NAN");
+  }
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindLiteralOperation(
+    std::shared_ptr<B> bound_term) const {
+  if (BASE::op() == Expression::Operation::kStartsWith ||
+      BASE::op() == Expression::Operation::kNotStartsWith) {
+    if (bound_term->type()->type_id() != TypeId::kString) {
+      return InvalidExpression(
+          "Term for STARTS_WITH or NOT_STARTS_WITH must produce a string: {}: 
{}",
+          *bound_term, *bound_term->type());
+    }
+  }
+
+  if (values_.size() != 1) {
+    return InvalidExpression("Literal operation requires a single value");
+  }
+
+  const auto& literal = values_[0];
+
+  if (literal.IsNull()) {
+    return InvalidExpression("Invalid value for conversion to type {}: {} 
({})",
+                             *bound_term->type(), literal.ToString(), 
*literal.type());
+  } else if (literal.IsAboveMax()) {
+    switch (BASE::op()) {
+      case Expression::Operation::kLt:
+      case Expression::Operation::kLtEq:
+      case Expression::Operation::kNotEq:
+        return Expressions::AlwaysTrue();
+      case Expression::Operation::kGt:
+      case Expression::Operation::kGtEq:
+      case Expression::Operation::kEq:
+        return Expressions::AlwaysFalse();
+      default:
+        break;
+    }
+  } else if (literal.IsBelowMin()) {
+    switch (BASE::op()) {
+      case Expression::Operation::kGt:
+      case Expression::Operation::kGtEq:
+      case Expression::Operation::kNotEq:
+        return Expressions::AlwaysTrue();
+      case Expression::Operation::kLt:
+      case Expression::Operation::kLtEq:
+      case Expression::Operation::kEq:
+        return Expressions::AlwaysFalse();
+      default:
+        break;
+    }
+  }
+
+  // TODO(gangwu): translate truncate(col) == value to startsWith(value)
+  return std::make_shared<BoundLiteralPredicate>(BASE::op(), 
std::move(bound_term),
+                                                 literal);
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindInOperation(
+    std::shared_ptr<B> bound_term) const {
+  std::vector<Literal> converted_literals;
+  for (const auto& literal : values_) {
+    auto primitive_type =

Review Comment:
   Should we check that the literal is not null before casting?



##########
src/iceberg/expression/predicate.cc:
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/predicate.h"
+
+#include <algorithm>
+#include <format>
+
+#include "iceberg/exception.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/result.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/formatter_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+// Predicate template implementations
+template <TermType T>
+Predicate<T>::Predicate(Expression::Operation op, std::shared_ptr<T> term)
+    : operation_(op), term_(std::move(term)) {}
+
+template <TermType T>
+Predicate<T>::~Predicate() = default;
+
+// UnboundPredicate template implementations
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term)
+    : BASE(op, std::move(term)) {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term, 
Literal value)
+    : BASE(op, std::move(term)), values_{std::move(value)} {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term,
+                                      std::vector<Literal> values)
+    : BASE(op, std::move(term)), values_(std::move(values)) {}
+
+template <typename B>
+UnboundPredicate<B>::~UnboundPredicate() = default;
+
+namespace {}
+
+template <typename B>
+std::string UnboundPredicate<B>::ToString() const {
+  auto invalid_predicate_string = [](Expression::Operation op) {
+    return std::format("Invalid predicate: operation = {}", op);
+  };
+
+  const auto& term = *BASE::term();
+  const auto op = BASE::op();
+
+  switch (op) {
+    case Expression::Operation::kIsNull:
+      return std::format("is_null({})", term);
+    case Expression::Operation::kNotNull:
+      return std::format("not_null({})", term);
+    case Expression::Operation::kIsNan:
+      return std::format("is_nan({})", term);
+    case Expression::Operation::kNotNan:
+      return std::format("not_nan({})", term);
+    case Expression::Operation::kLt:
+      return values_.size() == 1 ? std::format("{} < {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kLtEq:
+      return values_.size() == 1 ? std::format("{} <= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGt:
+      return values_.size() == 1 ? std::format("{} > {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGtEq:
+      return values_.size() == 1 ? std::format("{} >= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kEq:
+      return values_.size() == 1 ? std::format("{} == {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotEq:
+      return values_.size() == 1 ? std::format("{} != {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kStartsWith:
+      return values_.size() == 1 ? std::format("{} startsWith \"{}\"", term, 
values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotStartsWith:
+      return values_.size() == 1
+                 ? std::format("{} notStartsWith \"{}\"", term, values_[0])
+                 : invalid_predicate_string(op);
+    case Expression::Operation::kIn:
+      return std::format("{} in {}", term, values_);
+    case Expression::Operation::kNotIn:
+      return std::format("{} not in {}", term, values_);
+    default:
+      return invalid_predicate_string(op);
+  }
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Negate() const {
+  ICEBERG_ASSIGN_OR_RAISE(auto negated_op, ::iceberg::Negate(BASE::op()));
+  return std::make_shared<UnboundPredicate>(negated_op, BASE::term(), values_);
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Bind(const Schema& 
schema,
+                                                              bool 
case_sensitive) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto bound_term, BASE::term()->Bind(schema, 
case_sensitive));
+
+  if (values_.empty()) {
+    return BindUnaryOperation(std::move(bound_term));
+  }
+
+  if (BASE::op() == Expression::Operation::kIn ||
+      BASE::op() == Expression::Operation::kNotIn) {
+    return BindInOperation(std::move(bound_term));
+  }
+
+  return BindLiteralOperation(std::move(bound_term));
+}
+
+namespace {
+
+bool IsFloatingType(TypeId type) {
+  return type == TypeId::kFloat || type == TypeId::kDouble;
+}
+
+}  // namespace
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindUnaryOperation(
+    std::shared_ptr<B> bound_term) const {
+  switch (BASE::op()) {
+    case Expression::Operation::kIsNull:
+      if (!bound_term->MayProduceNull()) {
+        return Expressions::AlwaysFalse();
+      }
+      // TODO(gangwu): deal with UnknownType
+      return 
std::make_shared<BoundUnaryPredicate>(Expression::Operation::kIsNull,
+                                                   std::move(bound_term));
+    case Expression::Operation::kNotNull:
+      if (!bound_term->MayProduceNull()) {
+        return Expressions::AlwaysTrue();
+      }
+      return 
std::make_shared<BoundUnaryPredicate>(Expression::Operation::kNotNull,
+                                                   std::move(bound_term));
+    case Expression::Operation::kIsNan:
+    case Expression::Operation::kNotNan:
+      if (!IsFloatingType(bound_term->type()->type_id())) {
+        return InvalidExpression("{} cannot be used with a non-floating-point 
column",
+                                 BASE::op());
+      }
+      return std::make_shared<BoundUnaryPredicate>(BASE::op(), 
std::move(bound_term));
+    default:
+      return InvalidExpression("Operation must be IS_NULL, NOT_NULL, IS_NAN, 
or NOT_NAN");
+  }
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindLiteralOperation(
+    std::shared_ptr<B> bound_term) const {
+  if (BASE::op() == Expression::Operation::kStartsWith ||
+      BASE::op() == Expression::Operation::kNotStartsWith) {
+    if (bound_term->type()->type_id() != TypeId::kString) {
+      return InvalidExpression(
+          "Term for STARTS_WITH or NOT_STARTS_WITH must produce a string: {}: 
{}",
+          *bound_term, *bound_term->type());
+    }
+  }
+
+  if (values_.size() != 1) {
+    return InvalidExpression("Literal operation requires a single value");

Review Comment:
   Print the size of `values_` in the error message?



##########
src/iceberg/expression/expression.cc:
##########
@@ -87,4 +91,91 @@ bool Or::Equals(const Expression& expr) const {
   return false;
 }
 
+std::string_view ToString(Expression::Operation op) {
+  switch (op) {
+    case Expression::Operation::kAnd:

Review Comment:
   I just realized: isn't this a `Predicate::Operation` rather than an 
`Expression::Operation`?



##########
src/iceberg/expression/predicate.h:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/expression/predicate.h
+/// Predicate interface for boolean expressions that test terms.
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "iceberg/expression/expression.h"
+#include "iceberg/expression/term.h"
+
+namespace iceberg {
+
+/// \brief A predicate is a boolean expression that tests a term against some 
criteria.
+///
+/// \tparam TermType The type of the term being tested
+template <typename TermType>
+class ICEBERG_EXPORT Predicate : public Expression {
+ public:
+  /// \brief Create a predicate with an operation and term.
+  ///
+  /// \param op The operation this predicate performs
+  /// \param term The term this predicate tests
+  Predicate(Expression::Operation op, std::shared_ptr<TermType> term);
+
+  ~Predicate() override = default;
+
+  Expression::Operation op() const override { return operation_; }
+
+  /// \brief Returns the term this predicate tests.
+  const std::shared_ptr<TermType>& term() const { return term_; }
+
+ protected:
+  Expression::Operation operation_;
+  std::shared_ptr<TermType> term_;
+};
+
+/// \brief Unbound predicates contain unbound terms and must be bound to a 
concrete schema
+/// before they can be evaluated.
+///
+/// \tparam B The bound type this predicate produces when binding is successful
+template <typename B>
+class ICEBERG_EXPORT UnboundPredicate : public Predicate<UnboundTerm<B>>,

Review Comment:
   Can we give `UnboundPredicate<BoundReference>` an alias to aid 
understanding?



##########
src/iceberg/expression/predicate.cc:
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/predicate.h"
+
+#include <algorithm>
+#include <format>
+
+#include "iceberg/exception.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/result.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/formatter_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+// Predicate template implementations
+template <TermType T>
+Predicate<T>::Predicate(Expression::Operation op, std::shared_ptr<T> term)
+    : operation_(op), term_(std::move(term)) {}
+
+template <TermType T>
+Predicate<T>::~Predicate() = default;
+
+// UnboundPredicate template implementations
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term)
+    : BASE(op, std::move(term)) {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term, 
Literal value)
+    : BASE(op, std::move(term)), values_{std::move(value)} {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term,
+                                      std::vector<Literal> values)
+    : BASE(op, std::move(term)), values_(std::move(values)) {}
+
+template <typename B>
+UnboundPredicate<B>::~UnboundPredicate() = default;
+
+namespace {}
+
+template <typename B>
+std::string UnboundPredicate<B>::ToString() const {
+  auto invalid_predicate_string = [](Expression::Operation op) {
+    return std::format("Invalid predicate: operation = {}", op);
+  };
+
+  const auto& term = *BASE::term();
+  const auto op = BASE::op();
+
+  switch (op) {
+    case Expression::Operation::kIsNull:
+      return std::format("is_null({})", term);
+    case Expression::Operation::kNotNull:
+      return std::format("not_null({})", term);
+    case Expression::Operation::kIsNan:
+      return std::format("is_nan({})", term);
+    case Expression::Operation::kNotNan:
+      return std::format("not_nan({})", term);
+    case Expression::Operation::kLt:
+      return values_.size() == 1 ? std::format("{} < {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kLtEq:
+      return values_.size() == 1 ? std::format("{} <= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGt:
+      return values_.size() == 1 ? std::format("{} > {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGtEq:
+      return values_.size() == 1 ? std::format("{} >= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kEq:
+      return values_.size() == 1 ? std::format("{} == {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotEq:
+      return values_.size() == 1 ? std::format("{} != {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kStartsWith:
+      return values_.size() == 1 ? std::format("{} startsWith \"{}\"", term, 
values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotStartsWith:
+      return values_.size() == 1
+                 ? std::format("{} notStartsWith \"{}\"", term, values_[0])
+                 : invalid_predicate_string(op);
+    case Expression::Operation::kIn:
+      return std::format("{} in {}", term, values_);
+    case Expression::Operation::kNotIn:
+      return std::format("{} not in {}", term, values_);
+    default:
+      return invalid_predicate_string(op);
+  }
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Negate() const {
+  ICEBERG_ASSIGN_OR_RAISE(auto negated_op, ::iceberg::Negate(BASE::op()));
+  return std::make_shared<UnboundPredicate>(negated_op, BASE::term(), values_);
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Bind(const Schema& 
schema,
+                                                              bool 
case_sensitive) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto bound_term, BASE::term()->Bind(schema, 
case_sensitive));
+
+  if (values_.empty()) {
+    return BindUnaryOperation(std::move(bound_term));
+  }
+
+  if (BASE::op() == Expression::Operation::kIn ||
+      BASE::op() == Expression::Operation::kNotIn) {
+    return BindInOperation(std::move(bound_term));
+  }
+
+  return BindLiteralOperation(std::move(bound_term));
+}
+
+namespace {
+
+bool IsFloatingType(TypeId type) {
+  return type == TypeId::kFloat || type == TypeId::kDouble;
+}
+
+}  // namespace
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindUnaryOperation(
+    std::shared_ptr<B> bound_term) const {
+  switch (BASE::op()) {
+    case Expression::Operation::kIsNull:
+      if (!bound_term->MayProduceNull()) {
+        return Expressions::AlwaysFalse();
+      }
+      // TODO(gangwu): deal with UnknownType
+      return 
std::make_shared<BoundUnaryPredicate>(Expression::Operation::kIsNull,
+                                                   std::move(bound_term));
+    case Expression::Operation::kNotNull:
+      if (!bound_term->MayProduceNull()) {
+        return Expressions::AlwaysTrue();
+      }
+      return 
std::make_shared<BoundUnaryPredicate>(Expression::Operation::kNotNull,
+                                                   std::move(bound_term));
+    case Expression::Operation::kIsNan:
+    case Expression::Operation::kNotNan:
+      if (!IsFloatingType(bound_term->type()->type_id())) {
+        return InvalidExpression("{} cannot be used with a non-floating-point 
column",
+                                 BASE::op());
+      }

Review Comment:
   Would this hardly ever happen, since the base class might already handle this problem?



##########
src/iceberg/expression/predicate.cc:
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/predicate.h"
+
+#include <algorithm>
+#include <format>
+
+#include "iceberg/exception.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/result.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/formatter_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+// Predicate template implementations
+template <TermType T>
+Predicate<T>::Predicate(Expression::Operation op, std::shared_ptr<T> term)
+    : operation_(op), term_(std::move(term)) {}
+
+template <TermType T>
+Predicate<T>::~Predicate() = default;
+
+// UnboundPredicate template implementations
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term)
+    : BASE(op, std::move(term)) {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term, 
Literal value)
+    : BASE(op, std::move(term)), values_{std::move(value)} {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term,
+                                      std::vector<Literal> values)
+    : BASE(op, std::move(term)), values_(std::move(values)) {}
+
+template <typename B>
+UnboundPredicate<B>::~UnboundPredicate() = default;
+
+namespace {}
+
+template <typename B>
+std::string UnboundPredicate<B>::ToString() const {
+  auto invalid_predicate_string = [](Expression::Operation op) {
+    return std::format("Invalid predicate: operation = {}", op);
+  };
+
+  const auto& term = *BASE::term();
+  const auto op = BASE::op();
+
+  switch (op) {
+    case Expression::Operation::kIsNull:
+      return std::format("is_null({})", term);
+    case Expression::Operation::kNotNull:
+      return std::format("not_null({})", term);
+    case Expression::Operation::kIsNan:
+      return std::format("is_nan({})", term);
+    case Expression::Operation::kNotNan:
+      return std::format("not_nan({})", term);
+    case Expression::Operation::kLt:
+      return values_.size() == 1 ? std::format("{} < {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kLtEq:
+      return values_.size() == 1 ? std::format("{} <= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGt:
+      return values_.size() == 1 ? std::format("{} > {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGtEq:
+      return values_.size() == 1 ? std::format("{} >= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kEq:
+      return values_.size() == 1 ? std::format("{} == {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotEq:
+      return values_.size() == 1 ? std::format("{} != {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kStartsWith:
+      return values_.size() == 1 ? std::format("{} startsWith \"{}\"", term, 
values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotStartsWith:
+      return values_.size() == 1
+                 ? std::format("{} notStartsWith \"{}\"", term, values_[0])
+                 : invalid_predicate_string(op);
+    case Expression::Operation::kIn:
+      return std::format("{} in {}", term, values_);
+    case Expression::Operation::kNotIn:
+      return std::format("{} not in {}", term, values_);
+    default:
+      return invalid_predicate_string(op);
+  }
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Negate() const {
+  ICEBERG_ASSIGN_OR_RAISE(auto negated_op, ::iceberg::Negate(BASE::op()));
+  return std::make_shared<UnboundPredicate>(negated_op, BASE::term(), values_);
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Bind(const Schema& 
schema,
+                                                              bool 
case_sensitive) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto bound_term, BASE::term()->Bind(schema, 
case_sensitive));
+
+  if (values_.empty()) {
+    return BindUnaryOperation(std::move(bound_term));
+  }
+
+  if (BASE::op() == Expression::Operation::kIn ||
+      BASE::op() == Expression::Operation::kNotIn) {
+    return BindInOperation(std::move(bound_term));
+  }
+
+  return BindLiteralOperation(std::move(bound_term));
+}
+
+namespace {
+
+bool IsFloatingType(TypeId type) {
+  return type == TypeId::kFloat || type == TypeId::kDouble;
+}
+
+}  // namespace
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindUnaryOperation(
+    std::shared_ptr<B> bound_term) const {
+  switch (BASE::op()) {
+    case Expression::Operation::kIsNull:
+      if (!bound_term->MayProduceNull()) {
+        return Expressions::AlwaysFalse();
+      }
+      // TODO(gangwu): deal with UnknownType
+      return 
std::make_shared<BoundUnaryPredicate>(Expression::Operation::kIsNull,
+                                                   std::move(bound_term));
+    case Expression::Operation::kNotNull:
+      if (!bound_term->MayProduceNull()) {
+        return Expressions::AlwaysTrue();
+      }
+      return 
std::make_shared<BoundUnaryPredicate>(Expression::Operation::kNotNull,
+                                                   std::move(bound_term));
+    case Expression::Operation::kIsNan:
+    case Expression::Operation::kNotNan:
+      if (!IsFloatingType(bound_term->type()->type_id())) {
+        return InvalidExpression("{} cannot be used with a non-floating-point 
column",
+                                 BASE::op());
+      }
+      return std::make_shared<BoundUnaryPredicate>(BASE::op(), 
std::move(bound_term));
+    default:
+      return InvalidExpression("Operation must be IS_NULL, NOT_NULL, IS_NAN, 
or NOT_NAN");
+  }
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindLiteralOperation(
+    std::shared_ptr<B> bound_term) const {
+  if (BASE::op() == Expression::Operation::kStartsWith ||
+      BASE::op() == Expression::Operation::kNotStartsWith) {
+    if (bound_term->type()->type_id() != TypeId::kString) {
+      return InvalidExpression(
+          "Term for STARTS_WITH or NOT_STARTS_WITH must produce a string: {}: 
{}",
+          *bound_term, *bound_term->type());
+    }
+  }
+
+  if (values_.size() != 1) {
+    return InvalidExpression("Literal operation requires a single value");
+  }
+
+  const auto& literal = values_[0];
+
+  if (literal.IsNull()) {
+    return InvalidExpression("Invalid value for conversion to type {}: {} 
({})",
+                             *bound_term->type(), literal.ToString(), 
*literal.type());
+  } else if (literal.IsAboveMax()) {
+    switch (BASE::op()) {
+      case Expression::Operation::kLt:
+      case Expression::Operation::kLtEq:
+      case Expression::Operation::kNotEq:
+        return Expressions::AlwaysTrue();
+      case Expression::Operation::kGt:
+      case Expression::Operation::kGtEq:
+      case Expression::Operation::kEq:
+        return Expressions::AlwaysFalse();
+      default:
+        break;
+    }
+  } else if (literal.IsBelowMin()) {
+    switch (BASE::op()) {
+      case Expression::Operation::kGt:
+      case Expression::Operation::kGtEq:
+      case Expression::Operation::kNotEq:
+        return Expressions::AlwaysTrue();
+      case Expression::Operation::kLt:
+      case Expression::Operation::kLtEq:
+      case Expression::Operation::kEq:
+        return Expressions::AlwaysFalse();
+      default:
+        break;
+    }
+  }
+
+  // TODO(gangwu): translate truncate(col) == value to startsWith(value)
+  return std::make_shared<BoundLiteralPredicate>(BASE::op(), 
std::move(bound_term),
+                                                 literal);
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindInOperation(
+    std::shared_ptr<B> bound_term) const {
+  std::vector<Literal> converted_literals;
+  for (const auto& literal : values_) {
+    auto primitive_type =
+        internal::checked_pointer_cast<PrimitiveType>(bound_term->type());
+    ICEBERG_ASSIGN_OR_RAISE(auto converted, literal.CastTo(primitive_type));
+    if (converted.IsNull()) {
+      return InvalidExpression("Invalid value for conversion to type {}: {} 
({})",
+                               *bound_term->type(), literal.ToString(), 
*literal.type());
+    }
+    if (!converted.IsBelowMin() && !converted.IsAboveMax()) {

Review Comment:
   Could you add a comment here explaining the reason?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to