Jinchul81 commented on code in PR #335: URL: https://github.com/apache/iceberg-cpp/pull/335#discussion_r2553196780
########## src/iceberg/expression/aggregate.cc: ########## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/expression/aggregate.h" + +#include <format> +#include <optional> + +#include "iceberg/exception.h" +#include "iceberg/expression/binder.h" +#include "iceberg/expression/expression.h" +#include "iceberg/row/struct_like.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +std::string OperationToPrefix(Expression::Operation op) { Review Comment: Why don't we use `std::string_view` instead of `std::string` if the return values should be constant string literals? 
########## src/iceberg/expression/expressions.cc: ########## @@ -81,6 +82,63 @@ std::shared_ptr<UnboundTransform> Expressions::Transform( return unbound_transform; } +// Aggregates + +std::shared_ptr<CountAggregate> Expressions::Count(std::string name) { + return Count(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::Count( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::Count(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountNull(std::string name) { + return CountNull(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::CountNull( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::CountNull(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountNotNull(std::string name) { + return CountNotNull(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::CountNotNull( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::Count(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountStar() { + auto agg = CountAggregate::CountStar(); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<ValueAggregate> Expressions::Max(std::string name) { Review Comment: Same as above. ########## src/iceberg/expression/aggregate.cc: ########## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/expression/aggregate.h" + +#include <format> +#include <optional> + +#include "iceberg/exception.h" +#include "iceberg/expression/binder.h" +#include "iceberg/expression/expression.h" +#include "iceberg/row/struct_like.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +std::string OperationToPrefix(Expression::Operation op) { + switch (op) { + case Expression::Operation::kMax: + return "max"; + case Expression::Operation::kMin: + return "min"; + case Expression::Operation::kCount: + case Expression::Operation::kCountStar: + return "count"; + default: + break; + } + return "aggregate"; +} + +Result<std::shared_ptr<PrimitiveType>> GetPrimitiveType(const BoundTerm& term) { + auto primitive = std::dynamic_pointer_cast<PrimitiveType>(term.type()); + if (primitive == nullptr) { + return InvalidExpression("Aggregate requires primitive type, got {}", + term.type()->ToString()); + } + return primitive; +} + +} // namespace + +CountAggregate::CountAggregate(Expression::Operation op, Mode mode, + std::shared_ptr<UnboundTerm<BoundReference>> term, + std::shared_ptr<NamedReference> reference) + : UnboundAggregate(op), + mode_(mode), + term_(std::move(term)), + reference_(std::move(reference)) {} + +Result<std::unique_ptr<CountAggregate>> CountAggregate::Count( + 
std::shared_ptr<UnboundTerm<BoundReference>> term) { + auto ref = term->reference(); + return std::unique_ptr<CountAggregate>(new CountAggregate( + Expression::Operation::kCount, Mode::kNonNull, std::move(term), std::move(ref))); +} + +Result<std::unique_ptr<CountAggregate>> CountAggregate::CountNull( + std::shared_ptr<UnboundTerm<BoundReference>> term) { + auto ref = term->reference(); + return std::unique_ptr<CountAggregate>(new CountAggregate( + Expression::Operation::kCount, Mode::kNull, std::move(term), std::move(ref))); +} + +std::unique_ptr<CountAggregate> CountAggregate::CountStar() { + return std::unique_ptr<CountAggregate>(new CountAggregate( + Expression::Operation::kCountStar, Mode::kStar, nullptr, nullptr)); +} + +std::string CountAggregate::ToString() const { + if (mode_ == Mode::kStar) { + return "count(*)"; + } + ICEBERG_DCHECK(reference_ != nullptr, "Count aggregate should have reference"); + switch (mode_) { + case Mode::kNull: + return std::format("count_null({})", reference_->name()); + case Mode::kNonNull: + return std::format("count({})", reference_->name()); + case Mode::kStar: + break; + } + std::unreachable(); +} + +Result<std::shared_ptr<Expression>> CountAggregate::Bind(const Schema& schema, + bool case_sensitive) const { + std::shared_ptr<BoundTerm> bound_term; + if (term_ != nullptr) { + ICEBERG_ASSIGN_OR_THROW(auto bound, term_->Bind(schema, case_sensitive)); + bound_term = std::move(bound); + } + auto aggregate = + std::make_shared<BoundCountAggregate>(op(), mode_, std::move(bound_term)); + return aggregate; +} + +BoundCountAggregate::BoundCountAggregate(Expression::Operation op, + CountAggregate::Mode mode, + std::shared_ptr<BoundTerm> term) + : BoundAggregate(op, std::move(term)), mode_(mode) {} + +std::string BoundCountAggregate::ToString() const { + if (mode_ == CountAggregate::Mode::kStar) { + return "count(*)"; + } + ICEBERG_DCHECK(term() != nullptr, "Bound count aggregate should have term"); + switch (mode_) { + case 
CountAggregate::Mode::kNull: + return std::format("count_null({})", term()->reference()->name()); + case CountAggregate::Mode::kNonNull: + return std::format("count({})", term()->reference()->name()); + case CountAggregate::Mode::kStar: + break; + } + std::unreachable(); +} + +ValueAggregate::ValueAggregate(Expression::Operation op, + std::shared_ptr<UnboundTerm<BoundReference>> term, + std::shared_ptr<NamedReference> reference) + : UnboundAggregate(op), term_(std::move(term)), reference_(std::move(reference)) {} + +Result<std::unique_ptr<ValueAggregate>> ValueAggregate::Max( + std::shared_ptr<UnboundTerm<BoundReference>> term) { + auto ref = term->reference(); + return std::unique_ptr<ValueAggregate>( + new ValueAggregate(Expression::Operation::kMax, std::move(term), std::move(ref))); +} + +Result<std::unique_ptr<ValueAggregate>> ValueAggregate::Min( + std::shared_ptr<UnboundTerm<BoundReference>> term) { + auto ref = term->reference(); + return std::unique_ptr<ValueAggregate>( + new ValueAggregate(Expression::Operation::kMin, std::move(term), std::move(ref))); +} + +std::string ValueAggregate::ToString() const { + return std::format("{}({})", OperationToPrefix(op()), reference_->name()); +} + +Result<std::shared_ptr<Expression>> ValueAggregate::Bind(const Schema& schema, + bool case_sensitive) const { + ICEBERG_ASSIGN_OR_THROW(auto bound, term_->Bind(schema, case_sensitive)); + auto aggregate = std::make_shared<BoundValueAggregate>( + op(), std::shared_ptr<BoundTerm>(std::move(bound))); + return aggregate; +} + +BoundValueAggregate::BoundValueAggregate(Expression::Operation op, + std::shared_ptr<BoundTerm> term) + : BoundAggregate(op, std::move(term)) {} + +std::string BoundValueAggregate::ToString() const { + ICEBERG_DCHECK(term() != nullptr, "Bound value aggregate should have term"); + return std::format("{}({})", OperationToPrefix(op()), term()->reference()->name()); +} + +namespace { + +class CountEvaluator : public AggregateEvaluator { + public: + 
CountEvaluator(CountAggregate::Mode mode, std::shared_ptr<BoundTerm> term) + : mode_(mode), term_(std::move(term)) {} + + Status Add(const StructLike& row) override { + switch (mode_) { + case CountAggregate::Mode::kStar: + ++count_; + return {}; + case CountAggregate::Mode::kNonNull: { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term_->Evaluate(row)); + if (!literal.IsNull()) { + ++count_; + } + return {}; + } + case CountAggregate::Mode::kNull: { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term_->Evaluate(row)); + if (literal.IsNull()) { + ++count_; + } + return {}; + } + } + std::unreachable(); + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } + + private: + CountAggregate::Mode mode_; + std::shared_ptr<BoundTerm> term_; + int64_t count_ = 0; +}; + +class ValueAggregateEvaluator : public AggregateEvaluator { + public: + ValueAggregateEvaluator(Expression::Operation op, std::shared_ptr<BoundTerm> term, + std::shared_ptr<PrimitiveType> type) + : op_(op), term_(std::move(term)), type_(std::move(type)) {} + + Status Add(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term_->Evaluate(row)); + if (literal.IsNull()) { + return {}; + } + + if (!current_) { + current_ = std::move(literal); + return {}; + } + + auto ordering = literal <=> *current_; + if (ordering == std::partial_ordering::unordered) { + return InvalidExpression("Cannot compare literals of type {}", + literal.type()->ToString()); + } + + if (op_ == Expression::Operation::kMax) { Review Comment: Please add a comment explaining why `current_` should be overwritten here. 
########## src/iceberg/expression/expressions.cc: ########## @@ -81,6 +82,63 @@ std::shared_ptr<UnboundTransform> Expressions::Transform( return unbound_transform; } +// Aggregates + +std::shared_ptr<CountAggregate> Expressions::Count(std::string name) { + return Count(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::Count( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::Count(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountNull(std::string name) { + return CountNull(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::CountNull( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::CountNull(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); Review Comment: It looks like `return std::move(agg);` is enough. ########## src/iceberg/expression/aggregate.h: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/expression/aggregate.h +/// Aggregate expression definitions. 
+ +#include <memory> + +#include "iceberg/expression/expression.h" +#include "iceberg/expression/term.h" +#include "iceberg/result.h" + +namespace iceberg { + +class AggregateEvaluator; + +/// \brief Base class for aggregate expressions. +class ICEBERG_EXPORT Aggregate : public Expression { + public: + ~Aggregate() override = default; + + Expression::Operation op() const override { return operation_; } + + bool is_unbound_aggregate() const override { return false; } + bool is_bound_aggregate() const override { return false; } + + protected: + explicit Aggregate(Expression::Operation op) : operation_(op) {} + + private: + Expression::Operation operation_; +}; + +/// \brief Unbound aggregate with an optional term. +class ICEBERG_EXPORT UnboundAggregate : public Aggregate, public Unbound<Expression> { + public: + ~UnboundAggregate() override = default; + + bool is_unbound_aggregate() const override { return true; } + + /// \brief Returns the unbound reference if the aggregate has a term. + virtual std::shared_ptr<NamedReference> reference() override = 0; + + protected: + explicit UnboundAggregate(Expression::Operation op) : Aggregate(op) {} +}; + +/// \brief Bound aggregate with an optional term. +class ICEBERG_EXPORT BoundAggregate : public Aggregate { + public: + ~BoundAggregate() override = default; + + bool is_bound_aggregate() const override { return true; } + + const std::shared_ptr<BoundTerm>& term() const { return term_; } + + protected: + BoundAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term) + : Aggregate(op), term_(std::move(term)) {} + + private: + std::shared_ptr<BoundTerm> term_; +}; + +/// \brief COUNT aggregate variants. 
+class ICEBERG_EXPORT CountAggregate : public UnboundAggregate { + public: + enum class Mode { kNonNull, kNull, kStar }; + + static Result<std::unique_ptr<CountAggregate>> Count( + std::shared_ptr<UnboundTerm<BoundReference>> term); + + static Result<std::unique_ptr<CountAggregate>> CountNull( + std::shared_ptr<UnboundTerm<BoundReference>> term); + + static std::unique_ptr<CountAggregate> CountStar(); + + ~CountAggregate() override = default; + + Mode mode() const { return mode_; } + + const std::shared_ptr<UnboundTerm<BoundReference>>& term() const { return term_; } + + std::shared_ptr<NamedReference> reference() override { return reference_; } + + std::string ToString() const override; + + Result<std::shared_ptr<Expression>> Bind(const Schema& schema, + bool case_sensitive) const override; + + private: + CountAggregate(Expression::Operation op, Mode mode, Review Comment: Please add a comment explaining why this constructor is private (i.e., why a public constructor is not allowed). ########## src/iceberg/expression/aggregate.cc: ########## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/expression/aggregate.h" + +#include <format> +#include <optional> + +#include "iceberg/exception.h" +#include "iceberg/expression/binder.h" +#include "iceberg/expression/expression.h" +#include "iceberg/row/struct_like.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +std::string OperationToPrefix(Expression::Operation op) { + switch (op) { + case Expression::Operation::kMax: + return "max"; + case Expression::Operation::kMin: + return "min"; + case Expression::Operation::kCount: + case Expression::Operation::kCountStar: + return "count"; + default: + break; + } + return "aggregate"; +} + +Result<std::shared_ptr<PrimitiveType>> GetPrimitiveType(const BoundTerm& term) { + auto primitive = std::dynamic_pointer_cast<PrimitiveType>(term.type()); + if (primitive == nullptr) { + return InvalidExpression("Aggregate requires primitive type, got {}", + term.type()->ToString()); + } + return primitive; +} + +} // namespace + +CountAggregate::CountAggregate(Expression::Operation op, Mode mode, + std::shared_ptr<UnboundTerm<BoundReference>> term, + std::shared_ptr<NamedReference> reference) + : UnboundAggregate(op), + mode_(mode), + term_(std::move(term)), + reference_(std::move(reference)) {} + +Result<std::unique_ptr<CountAggregate>> CountAggregate::Count( + std::shared_ptr<UnboundTerm<BoundReference>> term) { + auto ref = term->reference(); + return std::unique_ptr<CountAggregate>(new CountAggregate( + Expression::Operation::kCount, Mode::kNonNull, std::move(term), std::move(ref))); +} + +Result<std::unique_ptr<CountAggregate>> CountAggregate::CountNull( + std::shared_ptr<UnboundTerm<BoundReference>> term) { + auto ref = term->reference(); + return std::unique_ptr<CountAggregate>(new CountAggregate( + Expression::Operation::kCount, Mode::kNull, std::move(term), std::move(ref))); +} + +std::unique_ptr<CountAggregate> CountAggregate::CountStar() { + return 
std::unique_ptr<CountAggregate>(new CountAggregate( + Expression::Operation::kCountStar, Mode::kStar, nullptr, nullptr)); +} + +std::string CountAggregate::ToString() const { + if (mode_ == Mode::kStar) { + return "count(*)"; + } + ICEBERG_DCHECK(reference_ != nullptr, "Count aggregate should have reference"); + switch (mode_) { + case Mode::kNull: + return std::format("count_null({})", reference_->name()); + case Mode::kNonNull: + return std::format("count({})", reference_->name()); + case Mode::kStar: + break; + } + std::unreachable(); +} + +Result<std::shared_ptr<Expression>> CountAggregate::Bind(const Schema& schema, + bool case_sensitive) const { + std::shared_ptr<BoundTerm> bound_term; + if (term_ != nullptr) { + ICEBERG_ASSIGN_OR_THROW(auto bound, term_->Bind(schema, case_sensitive)); + bound_term = std::move(bound); + } + auto aggregate = + std::make_shared<BoundCountAggregate>(op(), mode_, std::move(bound_term)); + return aggregate; +} + +BoundCountAggregate::BoundCountAggregate(Expression::Operation op, + CountAggregate::Mode mode, + std::shared_ptr<BoundTerm> term) + : BoundAggregate(op, std::move(term)), mode_(mode) {} + +std::string BoundCountAggregate::ToString() const { + if (mode_ == CountAggregate::Mode::kStar) { Review Comment: Why does `kStar` need special handling here? ########## src/iceberg/expression/expressions.cc: ########## @@ -81,6 +82,63 @@ std::shared_ptr<UnboundTransform> Expressions::Transform( return unbound_transform; } +// Aggregates + +std::shared_ptr<CountAggregate> Expressions::Count(std::string name) { + return Count(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::Count( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::Count(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountNull(std::string name) { Review Comment: Same as above. 
########## src/iceberg/expression/expressions.cc: ########## @@ -81,6 +82,63 @@ std::shared_ptr<UnboundTransform> Expressions::Transform( return unbound_transform; } +// Aggregates + +std::shared_ptr<CountAggregate> Expressions::Count(std::string name) { + return Count(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::Count( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::Count(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountNull(std::string name) { + return CountNull(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::CountNull( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::CountNull(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountNotNull(std::string name) { + return CountNotNull(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::CountNotNull( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::Count(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountStar() { + auto agg = CountAggregate::CountStar(); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<ValueAggregate> Expressions::Max(std::string name) { + return Max(Ref(std::move(name))); +} + +std::shared_ptr<ValueAggregate> Expressions::Max( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, ValueAggregate::Max(std::move(expr))); + return std::shared_ptr<ValueAggregate>(std::move(agg)); +} + +std::shared_ptr<ValueAggregate> Expressions::Min(std::string name) { + return Min(Ref(std::move(name))); +} + +std::shared_ptr<ValueAggregate> Expressions::Min( + 
std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, ValueAggregate::Min(std::move(expr))); + return std::shared_ptr<ValueAggregate>(std::move(agg)); Review Comment: It looks like `return std::move(agg);` is enough. ########## src/iceberg/expression/expressions.cc: ########## @@ -81,6 +82,63 @@ std::shared_ptr<UnboundTransform> Expressions::Transform( return unbound_transform; } +// Aggregates + +std::shared_ptr<CountAggregate> Expressions::Count(std::string name) { Review Comment: You can change the parameter to `std::string&&` to avoid an unnecessary copy and clearly express that the function intends to take ownership of the string. Using an rvalue reference makes the API’s intent clearer and can reduce overhead in performance-critical paths. ########## src/iceberg/expression/expressions.cc: ########## @@ -81,6 +82,63 @@ std::shared_ptr<UnboundTransform> Expressions::Transform( return unbound_transform; } +// Aggregates + +std::shared_ptr<CountAggregate> Expressions::Count(std::string name) { + return Count(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::Count( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::Count(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountNull(std::string name) { + return CountNull(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::CountNull( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::CountNull(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountNotNull(std::string name) { Review Comment: Same as above. ########## src/iceberg/expression/expression_visitor.h: ########## @@ -77,6 +78,22 @@ class ICEBERG_EXPORT ExpressionVisitor { /// \brief Visit an unbound predicate. 
/// \param pred The unbound predicate to visit virtual Result<R> Predicate(const std::shared_ptr<UnboundPredicate>& pred) = 0; + + /// \brief Visit a bound aggregate. + /// \param aggregate The bound aggregate to visit. + virtual Result<R> Aggregate(const std::shared_ptr<BoundAggregate>& aggregate) { + ICEBERG_DCHECK(aggregate != nullptr, "Bound aggregate cannot be null"); + return NotSupported("Bound aggregate is not supported by this visitor: {}", + aggregate->ToString()); + } + + /// \brief Visit an unbound aggregate. + /// \param aggregate The unbound aggregate to visit. + virtual Result<R> Aggregate(const std::shared_ptr<UnboundAggregate>& aggregate) { Review Comment: Please add a comment documenting the return value. ########## src/iceberg/expression/expression_visitor.h: ########## @@ -77,6 +78,22 @@ class ICEBERG_EXPORT ExpressionVisitor { /// \brief Visit an unbound predicate. /// \param pred The unbound predicate to visit virtual Result<R> Predicate(const std::shared_ptr<UnboundPredicate>& pred) = 0; + + /// \brief Visit a bound aggregate. + /// \param aggregate The bound aggregate to visit. Review Comment: Please add a comment documenting the return value. ########## src/iceberg/expression/aggregate.cc: ########## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/expression/aggregate.h" + +#include <format> +#include <optional> + +#include "iceberg/exception.h" +#include "iceberg/expression/binder.h" +#include "iceberg/expression/expression.h" +#include "iceberg/row/struct_like.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +std::string OperationToPrefix(Expression::Operation op) { + switch (op) { + case Expression::Operation::kMax: + return "max"; + case Expression::Operation::kMin: + return "min"; + case Expression::Operation::kCount: + case Expression::Operation::kCountStar: + return "count"; + default: + break; + } + return "aggregate"; +} + +Result<std::shared_ptr<PrimitiveType>> GetPrimitiveType(const BoundTerm& term) { + auto primitive = std::dynamic_pointer_cast<PrimitiveType>(term.type()); + if (primitive == nullptr) { + return InvalidExpression("Aggregate requires primitive type, got {}", + term.type()->ToString()); + } + return primitive; +} + +} // namespace + +CountAggregate::CountAggregate(Expression::Operation op, Mode mode, + std::shared_ptr<UnboundTerm<BoundReference>> term, + std::shared_ptr<NamedReference> reference) + : UnboundAggregate(op), + mode_(mode), + term_(std::move(term)), + reference_(std::move(reference)) {} + +Result<std::unique_ptr<CountAggregate>> CountAggregate::Count( + std::shared_ptr<UnboundTerm<BoundReference>> term) { + auto ref = term->reference(); + return std::unique_ptr<CountAggregate>(new CountAggregate( + Expression::Operation::kCount, Mode::kNonNull, std::move(term), std::move(ref))); +} + +Result<std::unique_ptr<CountAggregate>> CountAggregate::CountNull( + std::shared_ptr<UnboundTerm<BoundReference>> term) { + auto ref = term->reference(); + return std::unique_ptr<CountAggregate>(new CountAggregate( + Expression::Operation::kCount, Mode::kNull, 
std::move(term), std::move(ref))); +} + +std::unique_ptr<CountAggregate> CountAggregate::CountStar() { + return std::unique_ptr<CountAggregate>(new CountAggregate( + Expression::Operation::kCountStar, Mode::kStar, nullptr, nullptr)); +} + +std::string CountAggregate::ToString() const { + if (mode_ == Mode::kStar) { + return "count(*)"; + } + ICEBERG_DCHECK(reference_ != nullptr, "Count aggregate should have reference"); + switch (mode_) { + case Mode::kNull: + return std::format("count_null({})", reference_->name()); + case Mode::kNonNull: + return std::format("count({})", reference_->name()); + case Mode::kStar: + break; + } + std::unreachable(); +} + +Result<std::shared_ptr<Expression>> CountAggregate::Bind(const Schema& schema, + bool case_sensitive) const { + std::shared_ptr<BoundTerm> bound_term; + if (term_ != nullptr) { + ICEBERG_ASSIGN_OR_THROW(auto bound, term_->Bind(schema, case_sensitive)); + bound_term = std::move(bound); + } + auto aggregate = + std::make_shared<BoundCountAggregate>(op(), mode_, std::move(bound_term)); + return aggregate; +} + +BoundCountAggregate::BoundCountAggregate(Expression::Operation op, + CountAggregate::Mode mode, + std::shared_ptr<BoundTerm> term) + : BoundAggregate(op, std::move(term)), mode_(mode) {} + +std::string BoundCountAggregate::ToString() const { + if (mode_ == CountAggregate::Mode::kStar) { + return "count(*)"; + } + ICEBERG_DCHECK(term() != nullptr, "Bound count aggregate should have term"); + switch (mode_) { + case CountAggregate::Mode::kNull: + return std::format("count_null({})", term()->reference()->name()); + case CountAggregate::Mode::kNonNull: + return std::format("count({})", term()->reference()->name()); + case CountAggregate::Mode::kStar: + break; + } + std::unreachable(); +} + +ValueAggregate::ValueAggregate(Expression::Operation op, + std::shared_ptr<UnboundTerm<BoundReference>> term, + std::shared_ptr<NamedReference> reference) + : UnboundAggregate(op), term_(std::move(term)), 
reference_(std::move(reference)) {} + +Result<std::unique_ptr<ValueAggregate>> ValueAggregate::Max( + std::shared_ptr<UnboundTerm<BoundReference>> term) { + auto ref = term->reference(); + return std::unique_ptr<ValueAggregate>( + new ValueAggregate(Expression::Operation::kMax, std::move(term), std::move(ref))); +} + +Result<std::unique_ptr<ValueAggregate>> ValueAggregate::Min( + std::shared_ptr<UnboundTerm<BoundReference>> term) { + auto ref = term->reference(); + return std::unique_ptr<ValueAggregate>( + new ValueAggregate(Expression::Operation::kMin, std::move(term), std::move(ref))); +} + +std::string ValueAggregate::ToString() const { + return std::format("{}({})", OperationToPrefix(op()), reference_->name()); +} + +Result<std::shared_ptr<Expression>> ValueAggregate::Bind(const Schema& schema, + bool case_sensitive) const { + ICEBERG_ASSIGN_OR_THROW(auto bound, term_->Bind(schema, case_sensitive)); + auto aggregate = std::make_shared<BoundValueAggregate>( + op(), std::shared_ptr<BoundTerm>(std::move(bound))); + return aggregate; +} + +BoundValueAggregate::BoundValueAggregate(Expression::Operation op, + std::shared_ptr<BoundTerm> term) + : BoundAggregate(op, std::move(term)) {} + +std::string BoundValueAggregate::ToString() const { + ICEBERG_DCHECK(term() != nullptr, "Bound value aggregate should have term"); + return std::format("{}({})", OperationToPrefix(op()), term()->reference()->name()); +} + +namespace { + +class CountEvaluator : public AggregateEvaluator { + public: + CountEvaluator(CountAggregate::Mode mode, std::shared_ptr<BoundTerm> term) + : mode_(mode), term_(std::move(term)) {} + + Status Add(const StructLike& row) override { + switch (mode_) { + case CountAggregate::Mode::kStar: + ++count_; + return {}; + case CountAggregate::Mode::kNonNull: { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term_->Evaluate(row)); + if (!literal.IsNull()) { + ++count_; + } + return {}; + } + case CountAggregate::Mode::kNull: { + ICEBERG_ASSIGN_OR_RAISE(auto literal, 
term_->Evaluate(row)); + if (literal.IsNull()) { + ++count_; + } + return {}; + } + } + std::unreachable(); + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } + + private: + CountAggregate::Mode mode_; + std::shared_ptr<BoundTerm> term_; + int64_t count_ = 0; +}; + +class ValueAggregateEvaluator : public AggregateEvaluator { + public: + ValueAggregateEvaluator(Expression::Operation op, std::shared_ptr<BoundTerm> term, + std::shared_ptr<PrimitiveType> type) + : op_(op), term_(std::move(term)), type_(std::move(type)) {} + + Status Add(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term_->Evaluate(row)); + if (literal.IsNull()) { + return {}; + } + + if (!current_) { + current_ = std::move(literal); + return {}; + } + + auto ordering = literal <=> *current_; + if (ordering == std::partial_ordering::unordered) { Review Comment: Please pull up this condition. We can exit early if the conditions are not satisfied. ########## src/iceberg/expression/expressions.cc: ########## @@ -81,6 +82,63 @@ std::shared_ptr<UnboundTransform> Expressions::Transform( return unbound_transform; } +// Aggregates + +std::shared_ptr<CountAggregate> Expressions::Count(std::string name) { + return Count(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::Count( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::Count(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountNull(std::string name) { + return CountNull(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::CountNull( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::CountNull(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountNotNull(std::string name) { + return 
CountNotNull(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::CountNotNull( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::Count(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountStar() { + auto agg = CountAggregate::CountStar(); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<ValueAggregate> Expressions::Max(std::string name) { + return Max(Ref(std::move(name))); +} + +std::shared_ptr<ValueAggregate> Expressions::Max( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, ValueAggregate::Max(std::move(expr))); + return std::shared_ptr<ValueAggregate>(std::move(agg)); +} + +std::shared_ptr<ValueAggregate> Expressions::Min(std::string name) { Review Comment: Same above. ########## src/iceberg/test/aggregate_test.cc: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/expression/aggregate.h" + +#include <gtest/gtest.h> + +#include "iceberg/exception.h" +#include "iceberg/expression/binder.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/row/struct_like.h" +#include "iceberg/schema.h" +#include "iceberg/test/matchers.h" +#include "iceberg/type.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +class VectorStructLike : public StructLike { + public: + explicit VectorStructLike(std::vector<Scalar> fields) : fields_(std::move(fields)) {} + + Result<Scalar> GetField(size_t pos) const override { + if (pos >= fields_.size()) { + return InvalidArgument("Position {} out of range", pos); Review Comment: Please also show the number of fields to find out the cause easily. ########## src/iceberg/expression/expressions.cc: ########## @@ -81,6 +82,63 @@ std::shared_ptr<UnboundTransform> Expressions::Transform( return unbound_transform; } +// Aggregates + +std::shared_ptr<CountAggregate> Expressions::Count(std::string name) { + return Count(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::Count( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::Count(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountNull(std::string name) { + return CountNull(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::CountNull( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::CountNull(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountNotNull(std::string name) { + return CountNotNull(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::CountNotNull( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, 
CountAggregate::Count(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountStar() { + auto agg = CountAggregate::CountStar(); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<ValueAggregate> Expressions::Max(std::string name) { + return Max(Ref(std::move(name))); +} + +std::shared_ptr<ValueAggregate> Expressions::Max( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, ValueAggregate::Max(std::move(expr))); + return std::shared_ptr<ValueAggregate>(std::move(agg)); Review Comment: It looks like return `std::move(agg));` is enough. ########## src/iceberg/expression/aggregate.h: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/expression/aggregate.h +/// Aggregate expression definitions. + +#include <memory> + +#include "iceberg/expression/expression.h" +#include "iceberg/expression/term.h" +#include "iceberg/result.h" + +namespace iceberg { + +class AggregateEvaluator; + +/// \brief Base class for aggregate expressions. 
+class ICEBERG_EXPORT Aggregate : public Expression { + public: + ~Aggregate() override = default; + + Expression::Operation op() const override { return operation_; } + + bool is_unbound_aggregate() const override { return false; } + bool is_bound_aggregate() const override { return false; } + + protected: + explicit Aggregate(Expression::Operation op) : operation_(op) {} + + private: + Expression::Operation operation_; +}; + +/// \brief Unbound aggregate with an optional term. +class ICEBERG_EXPORT UnboundAggregate : public Aggregate, public Unbound<Expression> { + public: + ~UnboundAggregate() override = default; + + bool is_unbound_aggregate() const override { return true; } + + /// \brief Returns the unbound reference if the aggregate has a term. + virtual std::shared_ptr<NamedReference> reference() override = 0; + + protected: + explicit UnboundAggregate(Expression::Operation op) : Aggregate(op) {} +}; + +/// \brief Bound aggregate with an optional term. Review Comment: What's the definition of term here? Why does aggregate optionally require term? Please put this into the comment. ########## src/iceberg/expression/aggregate.h: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#pragma once + +/// \file iceberg/expression/aggregate.h +/// Aggregate expression definitions. + +#include <memory> + +#include "iceberg/expression/expression.h" +#include "iceberg/expression/term.h" +#include "iceberg/result.h" + +namespace iceberg { + +class AggregateEvaluator; + +/// \brief Base class for aggregate expressions. +class ICEBERG_EXPORT Aggregate : public Expression { + public: + ~Aggregate() override = default; + + Expression::Operation op() const override { return operation_; } + + bool is_unbound_aggregate() const override { return false; } + bool is_bound_aggregate() const override { return false; } + + protected: + explicit Aggregate(Expression::Operation op) : operation_(op) {} + + private: + Expression::Operation operation_; +}; + +/// \brief Unbound aggregate with an optional term. +class ICEBERG_EXPORT UnboundAggregate : public Aggregate, public Unbound<Expression> { + public: + ~UnboundAggregate() override = default; + + bool is_unbound_aggregate() const override { return true; } + + /// \brief Returns the unbound reference if the aggregate has a term. + virtual std::shared_ptr<NamedReference> reference() override = 0; + + protected: + explicit UnboundAggregate(Expression::Operation op) : Aggregate(op) {} +}; + +/// \brief Bound aggregate with an optional term. +class ICEBERG_EXPORT BoundAggregate : public Aggregate { + public: + ~BoundAggregate() override = default; + + bool is_bound_aggregate() const override { return true; } + + const std::shared_ptr<BoundTerm>& term() const { return term_; } + + protected: + BoundAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term) + : Aggregate(op), term_(std::move(term)) {} + + private: + std::shared_ptr<BoundTerm> term_; +}; + +/// \brief COUNT aggregate variants. +class ICEBERG_EXPORT CountAggregate : public UnboundAggregate { + public: + enum class Mode { kNonNull, kNull, kStar }; Review Comment: Please add a comment for the description for them. 
########## src/iceberg/expression/aggregate.cc: ########## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/expression/aggregate.h" + +#include <format> +#include <optional> + +#include "iceberg/exception.h" +#include "iceberg/expression/binder.h" +#include "iceberg/expression/expression.h" +#include "iceberg/row/struct_like.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +std::string OperationToPrefix(Expression::Operation op) { + switch (op) { + case Expression::Operation::kMax: + return "max"; + case Expression::Operation::kMin: + return "min"; + case Expression::Operation::kCount: + case Expression::Operation::kCountStar: + return "count"; + default: + break; + } + return "aggregate"; +} + +Result<std::shared_ptr<PrimitiveType>> GetPrimitiveType(const BoundTerm& term) { + auto primitive = std::dynamic_pointer_cast<PrimitiveType>(term.type()); + if (primitive == nullptr) { + return InvalidExpression("Aggregate requires primitive type, got {}", + term.type()->ToString()); + } + return primitive; +} + +} // namespace + +CountAggregate::CountAggregate(Expression::Operation op, 
Mode mode, + std::shared_ptr<UnboundTerm<BoundReference>> term, + std::shared_ptr<NamedReference> reference) + : UnboundAggregate(op), + mode_(mode), + term_(std::move(term)), + reference_(std::move(reference)) {} + +Result<std::unique_ptr<CountAggregate>> CountAggregate::Count( + std::shared_ptr<UnboundTerm<BoundReference>> term) { + auto ref = term->reference(); + return std::unique_ptr<CountAggregate>(new CountAggregate( + Expression::Operation::kCount, Mode::kNonNull, std::move(term), std::move(ref))); +} + +Result<std::unique_ptr<CountAggregate>> CountAggregate::CountNull( + std::shared_ptr<UnboundTerm<BoundReference>> term) { + auto ref = term->reference(); + return std::unique_ptr<CountAggregate>(new CountAggregate( + Expression::Operation::kCount, Mode::kNull, std::move(term), std::move(ref))); +} + +std::unique_ptr<CountAggregate> CountAggregate::CountStar() { Review Comment: Why does it return `std::unique_ptr<CountAggregate>` instead of `Result<std::unique_ptr<CountAggregate>>`? ########## src/iceberg/expression/expressions.cc: ########## @@ -81,6 +82,63 @@ std::shared_ptr<UnboundTransform> Expressions::Transform( return unbound_transform; } +// Aggregates + +std::shared_ptr<CountAggregate> Expressions::Count(std::string name) { + return Count(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::Count( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::Count(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); Review Comment: It looks like `return std::move(agg);` is enough.
########## src/iceberg/expression/expressions.cc: ########## @@ -81,6 +82,63 @@ std::shared_ptr<UnboundTransform> Expressions::Transform( return unbound_transform; } +// Aggregates + +std::shared_ptr<CountAggregate> Expressions::Count(std::string name) { + return Count(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::Count( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::Count(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountNull(std::string name) { + return CountNull(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::CountNull( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::CountNull(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountNotNull(std::string name) { + return CountNotNull(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::CountNotNull( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::Count(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountStar() { + auto agg = CountAggregate::CountStar(); + return std::shared_ptr<CountAggregate>(std::move(agg)); Review Comment: It looks like `return std::move(agg);` is enough.
########## src/iceberg/expression/expressions.cc: ########## @@ -81,6 +82,63 @@ std::shared_ptr<UnboundTransform> Expressions::Transform( return unbound_transform; } +// Aggregates + +std::shared_ptr<CountAggregate> Expressions::Count(std::string name) { + return Count(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::Count( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::Count(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountNull(std::string name) { + return CountNull(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::CountNull( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::CountNull(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); +} + +std::shared_ptr<CountAggregate> Expressions::CountNotNull(std::string name) { + return CountNotNull(Ref(std::move(name))); +} + +std::shared_ptr<CountAggregate> Expressions::CountNotNull( + std::shared_ptr<UnboundTerm<BoundReference>> expr) { + ICEBERG_ASSIGN_OR_THROW(auto agg, CountAggregate::Count(std::move(expr))); + return std::shared_ptr<CountAggregate>(std::move(agg)); Review Comment: It looks like `return std::move(agg);` is enough. ########## src/iceberg/expression/aggregate.cc: ########## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/expression/aggregate.h" + +#include <format> +#include <optional> + +#include "iceberg/exception.h" +#include "iceberg/expression/binder.h" +#include "iceberg/expression/expression.h" +#include "iceberg/row/struct_like.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +std::string OperationToPrefix(Expression::Operation op) { + switch (op) { + case Expression::Operation::kMax: + return "max"; + case Expression::Operation::kMin: + return "min"; + case Expression::Operation::kCount: + case Expression::Operation::kCountStar: + return "count"; + default: + break; + } + return "aggregate"; Review Comment: I don't think this is the right return value. Why do we have to map the others to `aggregate`? ########## src/iceberg/expression/aggregate.cc: ########## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/expression/aggregate.h" + +#include <format> +#include <optional> + +#include "iceberg/exception.h" +#include "iceberg/expression/binder.h" +#include "iceberg/expression/expression.h" +#include "iceberg/row/struct_like.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +std::string OperationToPrefix(Expression::Operation op) { + switch (op) { + case Expression::Operation::kMax: + return "max"; + case Expression::Operation::kMin: + return "min"; + case Expression::Operation::kCount: + case Expression::Operation::kCountStar: + return "count"; + default: + break; + } + return "aggregate"; +} + +Result<std::shared_ptr<PrimitiveType>> GetPrimitiveType(const BoundTerm& term) { + auto primitive = std::dynamic_pointer_cast<PrimitiveType>(term.type()); + if (primitive == nullptr) { + return InvalidExpression("Aggregate requires primitive type, got {}", + term.type()->ToString()); + } + return primitive; +} + +} // namespace + +CountAggregate::CountAggregate(Expression::Operation op, Mode mode, + std::shared_ptr<UnboundTerm<BoundReference>> term, + std::shared_ptr<NamedReference> reference) + : UnboundAggregate(op), + mode_(mode), + term_(std::move(term)), + reference_(std::move(reference)) {} + +Result<std::unique_ptr<CountAggregate>> CountAggregate::Count( + std::shared_ptr<UnboundTerm<BoundReference>> term) { + auto ref = term->reference(); + return std::unique_ptr<CountAggregate>(new CountAggregate( + Expression::Operation::kCount, 
Mode::kNonNull, std::move(term), std::move(ref))); +} + +Result<std::unique_ptr<CountAggregate>> CountAggregate::CountNull( + std::shared_ptr<UnboundTerm<BoundReference>> term) { + auto ref = term->reference(); + return std::unique_ptr<CountAggregate>(new CountAggregate( + Expression::Operation::kCount, Mode::kNull, std::move(term), std::move(ref))); +} + +std::unique_ptr<CountAggregate> CountAggregate::CountStar() { + return std::unique_ptr<CountAggregate>(new CountAggregate( + Expression::Operation::kCountStar, Mode::kStar, nullptr, nullptr)); +} + +std::string CountAggregate::ToString() const { + if (mode_ == Mode::kStar) { Review Comment: Why did you add special handling for `kStar`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
