This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b6cd23052d4 [fix](spill) disable partitioned agg when group by limit opt is set (#37873)
b6cd23052d4 is described below

commit b6cd23052d4d55daa296f45333938043faa03bef
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Thu Jul 18 10:16:24 2024 +0800

    [fix](spill) disable partitioned agg when group by limit opt is set (#37873)
    
    ## Proposed changes
    
    Disable partitioned agg when the group-by limit optimization (#29641) is set.
---
 be/src/pipeline/pipeline_fragment_context.cpp | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 39555d3614e..1ab40723641 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1182,11 +1182,19 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             return Status::InternalError("Illegal aggregate node " + 
std::to_string(tnode.node_id) +
                                          ": group by and output is empty");
         }
-        if (tnode.agg_node.aggregate_functions.empty() && 
!_runtime_state->enable_agg_spill() &&
+
+        const bool group_by_limit_opt =
+                tnode.agg_node.__isset.agg_sort_info_by_group_key && 
tnode.limit > 0;
+
+        /// PartitionedAggSourceOperatorX does not support "group by limit 
opt(#29641)" yet.
+        /// If `group_by_limit_opt` is true, then it might not need to spill 
at all.
+        const bool enable_spill = _runtime_state->enable_agg_spill() &&
+                                  !tnode.agg_node.grouping_exprs.empty() && 
!group_by_limit_opt;
+
+        if (tnode.agg_node.aggregate_functions.empty() && !enable_spill &&
             
request.query_options.__isset.enable_distinct_streaming_aggregation &&
             request.query_options.enable_distinct_streaming_aggregation &&
-            !tnode.agg_node.grouping_exprs.empty() &&
-            !tnode.agg_node.__isset.agg_sort_info_by_group_key) {
+            !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) {
             op.reset(new DistinctStreamingAggOperatorX(pool, 
next_operator_id(), tnode, descs,
                                                        
_require_bucket_distribution));
             _require_bucket_distribution =
@@ -1198,7 +1206,7 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             op.reset(new StreamingAggOperatorX(pool, next_operator_id(), 
tnode, descs));
             RETURN_IF_ERROR(cur_pipe->add_operator(op));
         } else {
-            if (_runtime_state->enable_agg_spill() && 
!tnode.agg_node.grouping_exprs.empty()) {
+            if (enable_spill) {
                 op.reset(new PartitionedAggSourceOperatorX(pool, tnode, 
next_operator_id(), descs));
             } else {
                 op.reset(new AggSourceOperatorX(pool, tnode, 
next_operator_id(), descs));
@@ -1213,7 +1221,7 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
             DataSinkOperatorXPtr sink;
-            if (_runtime_state->enable_agg_spill() && 
!tnode.agg_node.grouping_exprs.empty()) {
+            if (enable_spill) {
                 sink.reset(new PartitionedAggSinkOperatorX(pool, 
next_sink_operator_id(), tnode,
                                                            descs, 
_require_bucket_distribution));
             } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to