This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 50d21c012e1 branch-3.0: [opt](agg function) Signature verification of aggregate function. (#43191) 50d21c012e1 is described below commit 50d21c012e1fc9d448f5425eae8d0b5cfc4afe19 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Sat Nov 9 10:28:23 2024 +0800 branch-3.0: [opt](agg function) Signature verification of aggregate function. (#43191) Cherry-picked from #40682 Co-authored-by: zhiqiang <seuhezhiqi...@163.com> --- be/src/pipeline/exec/aggregation_sink_operator.cpp | 2 +- be/src/pipeline/exec/analytic_source_operator.cpp | 6 ++- .../distinct_streaming_aggregation_operator.cpp | 2 +- .../exec/streaming_aggregation_operator.cpp | 2 +- .../vec/aggregate_functions/aggregate_function.h | 44 ++++++++++++++++++++++ be/src/vec/exprs/vectorized_agg_fn.cpp | 17 +++++++-- be/src/vec/exprs/vectorized_agg_fn.h | 8 +++- 7 files changed, 71 insertions(+), 10 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 83d566fac9a..627dbf4ef41 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -741,7 +741,7 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(vectorized::AggFnEvaluator::create( _pool, tnode.agg_node.aggregate_functions[i], tnode.agg_node.__isset.agg_sort_infos ? 
tnode.agg_node.agg_sort_infos[i] : dummy, - &evaluator)); + tnode.agg_node.grouping_exprs.empty(), &evaluator)); _aggregate_evaluators.push_back(evaluator); } diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index c365726a60f..643dbe82185 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -502,11 +502,13 @@ Status AnalyticSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::init(tnode, state)); const TAnalyticNode& analytic_node = tnode.analytic_node; size_t agg_size = analytic_node.analytic_functions.size(); - for (int i = 0; i < agg_size; ++i) { vectorized::AggFnEvaluator* evaluator = nullptr; + // Window function treats all NullableAggregateFunction as AlwaysNullable. + // Its behavior is the same as when executed without a group by key. + // https://github.com/apache/doris/pull/40693 RETURN_IF_ERROR(vectorized::AggFnEvaluator::create( - _pool, analytic_node.analytic_functions[i], {}, &evaluator)); + _pool, analytic_node.analytic_functions[i], {}, /*without_key*/ true, &evaluator)); _agg_functions.emplace_back(evaluator); } diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 7cea16ad633..75b26c3ed18 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -362,7 +362,7 @@ Status DistinctStreamingAggOperatorX::init(const TPlanNode& tnode, RuntimeState* RETURN_IF_ERROR(vectorized::AggFnEvaluator::create( _pool, tnode.agg_node.aggregate_functions[i], tnode.agg_node.__isset.agg_sort_infos ? 
tnode.agg_node.agg_sort_infos[i] : dummy, - &evaluator)); + tnode.agg_node.grouping_exprs.empty(), &evaluator)); _aggregate_evaluators.push_back(evaluator); } diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index 780bd194ac8..9ead9c37b17 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -1148,7 +1148,7 @@ Status StreamingAggOperatorX::init(const TPlanNode& tnode, RuntimeState* state) RETURN_IF_ERROR(vectorized::AggFnEvaluator::create( _pool, tnode.agg_node.aggregate_functions[i], tnode.agg_node.__isset.agg_sort_infos ? tnode.agg_node.agg_sort_infos[i] : dummy, - &evaluator)); + tnode.agg_node.grouping_exprs.empty(), &evaluator)); _aggregate_evaluators.push_back(evaluator); } diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h index f67fe14fa42..39de0324d14 100644 --- a/be/src/vec/aggregate_functions/aggregate_function.h +++ b/be/src/vec/aggregate_functions/aggregate_function.h @@ -20,6 +20,8 @@ #pragma once +#include "common/exception.h" +#include "common/status.h" #include "util/defer_op.h" #include "vec/columns/column_complex.h" #include "vec/columns/column_string.h" @@ -30,6 +32,7 @@ #include "vec/core/column_numbers.h" #include "vec/core/field.h" #include "vec/core/types.h" +#include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_string.h" namespace doris::vectorized { @@ -222,6 +225,10 @@ public: virtual AggregateFunctionPtr transmit_to_stable() { return nullptr; } + /// Verify function signature + virtual Status verify_result_type(const bool without_key, const DataTypes& argument_types, + const DataTypePtr result_type) const = 0; + protected: DataTypes argument_types; int version {}; @@ -494,6 +501,43 @@ public: arena); assert_cast<const Derived*, TypeCheckOnRelease::DISABLE>(this)->merge(place, rhs, arena); 
} + + Status verify_result_type(const bool without_key, const DataTypes& argument_types_with_nullable, + const DataTypePtr result_type_with_nullable) const override { + DataTypePtr function_result_type = assert_cast<const Derived*>(this)->get_return_type(); + + if (function_result_type->equals(*result_type_with_nullable)) { + return Status::OK(); + } + + if (!remove_nullable(function_result_type) + ->equals(*remove_nullable(result_type_with_nullable))) { + return Status::InternalError( + "Result type of {} is not matched, planner expect {}, but get {}, with group " + "by: " + "{}", + get_name(), result_type_with_nullable->get_name(), + function_result_type->get_name(), !without_key); + } + + if (without_key == true) { + if (result_type_with_nullable->is_nullable()) { + // This branch is dedicated to NullableAggregateFunction. + // When they are executed without group by key, the result from planner will be AlwaysNullable + // since the planner does not know whether there is any invalid input at runtime; if so, the result + // should be Null, so the result type must be nullable. + // Backend will wrap a ColumnNullable in this situation. For example: AggLocalState::_get_without_key_result + return Status::OK(); + } + } + + // Executed with group by key, result type must be exactly the same as the return type from Planner. + return Status::InternalError( + "Result type of {} is not matched, planner expect {}, but get {}, with group by: " + "{}", + get_name(), result_type_with_nullable->get_name(), function_result_type->get_name(), + !without_key); + } }; /// Implements several methods for manipulation with data. T - type of structure with data for aggregation. 
diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp index 2d05632de3c..86f1b35e332 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.cpp +++ b/be/src/vec/exprs/vectorized_agg_fn.cpp @@ -44,6 +44,8 @@ #include "vec/exprs/vexpr_context.h" #include "vec/utils/util.hpp" +static constexpr int64_t BE_VERSION_THAT_SUPPORT_NULLABLE_CHECK = 8; + namespace doris { class RowDescriptor; namespace vectorized { @@ -63,9 +65,10 @@ AggregateFunctionPtr get_agg_state_function(const DataTypes& argument_types, argument_types, return_type); } -AggFnEvaluator::AggFnEvaluator(const TExprNode& desc) +AggFnEvaluator::AggFnEvaluator(const TExprNode& desc, const bool without_key) : _fn(desc.fn), _is_merge(desc.agg_expr.is_merge_agg), + _without_key(without_key), _return_type(TypeDescriptor::from_thrift(desc.fn.ret_type)) { bool nullable = true; if (desc.__isset.is_nullable) { @@ -83,8 +86,8 @@ AggFnEvaluator::AggFnEvaluator(const TExprNode& desc) } Status AggFnEvaluator::create(ObjectPool* pool, const TExpr& desc, const TSortInfo& sort_info, - AggFnEvaluator** result) { - *result = pool->add(AggFnEvaluator::create_unique(desc.nodes[0]).release()); + const bool without_key, AggFnEvaluator** result) { + *result = pool->add(AggFnEvaluator::create_unique(desc.nodes[0], without_key).release()); auto& agg_fn_evaluator = *result; int node_idx = 0; for (int i = 0; i < desc.nodes[0].num_children; ++i) { @@ -213,6 +216,13 @@ Status AggFnEvaluator::prepare(RuntimeState* state, const RowDescriptor& desc, _function = transform_to_sort_agg_function(_function, _argument_types_with_sort, _sort_description, state); } + + if (!AggregateFunctionSimpleFactory::is_foreach(_fn.name.function_name)) { + if (state->be_exec_version() >= BE_VERSION_THAT_SUPPORT_NULLABLE_CHECK) { + RETURN_IF_ERROR( + _function->verify_result_type(_without_key, argument_types, _data_type)); + } + } _expr_name = fmt::format("{}({})", _fn.name.function_name, child_expr_name); return Status::OK(); } 
@@ -320,6 +330,7 @@ AggFnEvaluator* AggFnEvaluator::clone(RuntimeState* state, ObjectPool* pool) { AggFnEvaluator::AggFnEvaluator(AggFnEvaluator& evaluator, RuntimeState* state) : _fn(evaluator._fn), _is_merge(evaluator._is_merge), + _without_key(evaluator._without_key), _argument_types_with_sort(evaluator._argument_types_with_sort), _real_argument_types(evaluator._real_argument_types), _return_type(evaluator._return_type), diff --git a/be/src/vec/exprs/vectorized_agg_fn.h b/be/src/vec/exprs/vectorized_agg_fn.h index 7dcd1b3e02b..30983795e42 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.h +++ b/be/src/vec/exprs/vectorized_agg_fn.h @@ -50,7 +50,7 @@ class AggFnEvaluator { public: static Status create(ObjectPool* pool, const TExpr& desc, const TSortInfo& sort_info, - AggFnEvaluator** result); + const bool without_key, AggFnEvaluator** result); Status prepare(RuntimeState* state, const RowDescriptor& desc, const SlotDescriptor* intermediate_slot_desc, @@ -109,8 +109,12 @@ private: const TFunction _fn; const bool _is_merge; + // We need this flag to distinguish between the two types of aggregation functions: + // 1. executed without group by key (agg function used with window function is also regarded as this type) + // 2. executed with group by key + const bool _without_key; - AggFnEvaluator(const TExprNode& desc); + AggFnEvaluator(const TExprNode& desc, const bool without_key); AggFnEvaluator(AggFnEvaluator& evaluator, RuntimeState* state); Status _calc_argument_columns(Block* block); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org