This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bffc246959 [Improvement](pipeline) Use hash shuffle for 1-phase 
Agg/Analytic operator #34122
9bffc246959 is described below

commit 9bffc246959af69af494b90a6fdf581bea4e5df5
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Apr 26 13:47:43 2024 +0800

    [Improvement](pipeline) Use hash shuffle for 1-phase Agg/Analytic operator 
#34122
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp            | 11 +++++++----
 be/src/pipeline/exec/aggregation_sink_operator.h              |  2 +-
 .../pipeline/exec/partitioned_aggregation_sink_operator.cpp   |  6 ++++--
 be/src/pipeline/exec/partitioned_aggregation_sink_operator.h  |  2 +-
 be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp    | 10 ++++++++--
 be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h      |  2 ++
 6 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index fd88b0d1521..6c9d27e2a2b 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -616,7 +616,7 @@ void AggSinkLocalState::_init_hash_method(const 
vectorized::VExprContextSPtrs& p
 }
 
 AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const 
TPlanNode& tnode,
-                                   const DescriptorTbl& descs)
+                                   const DescriptorTbl& descs, bool 
require_bucket_distribution)
         : DataSinkOperatorX<AggSinkLocalState>(operator_id, tnode.node_id),
           _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
           _intermediate_tuple_desc(nullptr),
@@ -629,9 +629,12 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int 
operator_id, const TPla
           _limit(tnode.limit),
           _have_conjuncts((tnode.__isset.vconjunct && 
!tnode.vconjunct.nodes.empty()) ||
                           (tnode.__isset.conjuncts && 
!tnode.conjuncts.empty())),
-          _partition_exprs(tnode.__isset.distribute_expr_lists ? 
tnode.distribute_expr_lists[0]
-                                                               : 
std::vector<TExpr> {}),
-          _is_colocate(tnode.agg_node.__isset.is_colocate && 
tnode.agg_node.is_colocate),
+          _partition_exprs(require_bucket_distribution ? 
(tnode.__isset.distribute_expr_lists
+                                                                  ? 
tnode.distribute_expr_lists[0]
+                                                                  : 
std::vector<TExpr> {})
+                                                       : 
tnode.agg_node.grouping_exprs),
+          _is_colocate(tnode.agg_node.__isset.is_colocate && 
tnode.agg_node.is_colocate &&
+                       require_bucket_distribution),
           _agg_fn_output_row_descriptor(descs, tnode.row_tuples, 
tnode.nullable_tuples) {}
 
 Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index b3ffa19d6db..0c34acfd7df 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -143,7 +143,7 @@ protected:
 class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
 public:
     AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                     const DescriptorTbl& descs);
+                     const DescriptorTbl& descs, bool 
require_bucket_distribution);
     ~AggSinkOperatorX() override = default;
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 78079a0ddf8..7eb09555aa8 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -122,9 +122,11 @@ void 
PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile)
 
 PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int 
operator_id,
                                                          const TPlanNode& 
tnode,
-                                                         const DescriptorTbl& 
descs)
+                                                         const DescriptorTbl& 
descs,
+                                                         bool 
require_bucket_distribution)
         : DataSinkOperatorX<PartitionedAggSinkLocalState>(operator_id, 
tnode.node_id) {
-    _agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id, 
tnode, descs);
+    _agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id, 
tnode, descs,
+                                                            
require_bucket_distribution);
 }
 
 Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* 
state) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 1755cd866f2..1233f66b562 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -294,7 +294,7 @@ public:
 class PartitionedAggSinkOperatorX : public 
DataSinkOperatorX<PartitionedAggSinkLocalState> {
 public:
     PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, const 
TPlanNode& tnode,
-                                const DescriptorTbl& descs);
+                                const DescriptorTbl& descs, bool 
require_bucket_distribution);
     ~PartitionedAggSinkOperatorX() override = default;
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index bf2c255a127..fc0234c6290 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -1034,14 +1034,16 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             DataSinkOperatorXPtr sink;
             if (_runtime_state->enable_agg_spill() && 
!tnode.agg_node.grouping_exprs.empty()) {
                 sink.reset(new PartitionedAggSinkOperatorX(pool, 
next_sink_operator_id(), tnode,
-                                                           descs));
+                                                           descs, 
_require_bucket_distribution));
             } else {
-                sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), 
tnode, descs));
+                sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), 
tnode, descs,
+                                                _require_bucket_distribution));
             }
             sink->set_dests_id({op->operator_id()});
             RETURN_IF_ERROR(cur_pipe->set_sink(sink));
             RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, 
_runtime_state.get()));
         }
+        _require_bucket_distribution = true;
         break;
     }
     case TPlanNodeType::HASH_JOIN_NODE: {
@@ -1106,6 +1108,7 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             _pipeline_parent_map.push(op->node_id(), cur_pipe);
             _pipeline_parent_map.push(op->node_id(), build_side_pipe);
         }
+        _require_bucket_distribution = true;
         break;
     }
     case TPlanNodeType::CROSS_JOIN_NODE: {
@@ -1211,6 +1214,7 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(cur_pipe->set_sink(sink));
         RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
+        _require_bucket_distribution = true;
         break;
     }
     case TPlanNodeType::INTERSECT_NODE: {
@@ -1268,6 +1272,8 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
                                      print_plan_node_type(tnode.node_type));
     }
 
+    _require_bucket_distribution = true;
+
     return Status::OK();
 }
 // NOLINTEND(readability-function-cognitive-complexity)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 31febc0d8aa..c87f8f4f784 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -239,6 +239,8 @@ private:
 
     // Total instance num running on all BEs
     int _total_instances = -1;
+
+    bool _require_bucket_distribution = false;
 };
 
 } // namespace pipeline


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to