This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b6cd23052d4 [fix](spill) disable partitioned agg when group by limit opt is set (#37873) b6cd23052d4 is described below commit b6cd23052d4d55daa296f45333938043faa03bef Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Thu Jul 18 10:16:24 2024 +0800 [fix](spill) disable partitioned agg when group by limit opt is set (#37873) ## Proposed changes Disable partitioned agg when the group by limit opt (#29641) is set --- be/src/pipeline/pipeline_fragment_context.cpp | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 39555d3614e..1ab40723641 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1182,11 +1182,19 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) + ": group by and output is empty"); } - if (tnode.agg_node.aggregate_functions.empty() && !_runtime_state->enable_agg_spill() && + + const bool group_by_limit_opt = + tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0; + + /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet. + /// If `group_by_limit_opt` is true, then it might not need to spill at all. 
+ const bool enable_spill = _runtime_state->enable_agg_spill() && + !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt; + + if (tnode.agg_node.aggregate_functions.empty() && !enable_spill && request.query_options.__isset.enable_distinct_streaming_aggregation && request.query_options.enable_distinct_streaming_aggregation && - !tnode.agg_node.grouping_exprs.empty() && - !tnode.agg_node.__isset.agg_sort_info_by_group_key) { + !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) { op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, _require_bucket_distribution)); _require_bucket_distribution = @@ -1198,7 +1206,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op.reset(new StreamingAggOperatorX(pool, next_operator_id(), tnode, descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); } else { - if (_runtime_state->enable_agg_spill() && !tnode.agg_node.grouping_exprs.empty()) { + if (enable_spill) { op.reset(new PartitionedAggSourceOperatorX(pool, tnode, next_operator_id(), descs)); } else { op.reset(new AggSourceOperatorX(pool, tnode, next_operator_id(), descs)); @@ -1213,7 +1221,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - if (_runtime_state->enable_agg_spill() && !tnode.agg_node.grouping_exprs.empty()) { + if (enable_spill) { sink.reset(new PartitionedAggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } else { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org