This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 9bffc246959 [Improvement](pipeline) Use hash shuffle for 1-phase Agg/Analytic operator #34122
9bffc246959 is described below

commit 9bffc246959af69af494b90a6fdf581bea4e5df5
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Apr 26 13:47:43 2024 +0800

    [Improvement](pipeline) Use hash shuffle for 1-phase Agg/Analytic operator #34122
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp            | 11 +++++++----
 be/src/pipeline/exec/aggregation_sink_operator.h              |  2 +-
 .../pipeline/exec/partitioned_aggregation_sink_operator.cpp   |  6 ++++--
 be/src/pipeline/exec/partitioned_aggregation_sink_operator.h  |  2 +-
 be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp    | 10 ++++++++--
 be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h      |  2 ++
 6 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index fd88b0d1521..6c9d27e2a2b 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -616,7 +616,7 @@ void AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& p
 }
 
 AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                                   const DescriptorTbl& descs)
+                                   const DescriptorTbl& descs, bool require_bucket_distribution)
         : DataSinkOperatorX<AggSinkLocalState>(operator_id, tnode.node_id),
           _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
           _intermediate_tuple_desc(nullptr),
@@ -629,9 +629,12 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla
           _limit(tnode.limit),
           _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) ||
                           (tnode.__isset.conjuncts && !tnode.conjuncts.empty())),
-          _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
-                                                               : std::vector<TExpr> {}),
-          _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
+          _partition_exprs(require_bucket_distribution ? (tnode.__isset.distribute_expr_lists
+                                                                  ? tnode.distribute_expr_lists[0]
+                                                                  : std::vector<TExpr> {})
+                                                        : tnode.agg_node.grouping_exprs),
+          _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate &&
+                       require_bucket_distribution),
           _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
 
 Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
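For readers skimming the hunk above: the new require_bucket_distribution argument only changes how _partition_exprs and _is_colocate are initialized. Below is a minimal, self-contained sketch of that selection logic; the struct definitions are simplified stand-ins for the generated Thrift plan types (the real tnode.__isset is a nested struct, flattened here into plain booleans), so treat it as an illustration rather than Doris code.

    #include <string>
    #include <vector>

    // Hypothetical, simplified stand-ins for the Thrift plan structures; the
    // field names mirror the diff, the types here are illustrative only.
    struct TExpr {
        std::string repr; // placeholder payload
    };

    struct TAggNode {
        bool __isset_is_colocate = false;
        bool is_colocate = false;
        std::vector<TExpr> grouping_exprs;
    };

    struct TPlanNode {
        bool __isset_distribute_expr_lists = false;
        std::vector<std::vector<TExpr>> distribute_expr_lists;
        TAggNode agg_node;
    };

    // Mirrors the member-initializer change in AggSinkOperatorX: when the
    // fragment requires bucket distribution, keep the planner-provided
    // distribute expressions (if any); otherwise partition by the aggregation's
    // own grouping expressions so a plain hash shuffle can feed the 1-phase agg.
    std::vector<TExpr> choose_partition_exprs(const TPlanNode& tnode,
                                              bool require_bucket_distribution) {
        if (!require_bucket_distribution) {
            return tnode.agg_node.grouping_exprs;
        }
        return tnode.__isset_distribute_expr_lists ? tnode.distribute_expr_lists[0]
                                                   : std::vector<TExpr> {};
    }

    // Colocate execution is only honored when bucket distribution is actually required.
    bool choose_is_colocate(const TPlanNode& tnode, bool require_bucket_distribution) {
        return tnode.agg_node.__isset_is_colocate && tnode.agg_node.is_colocate &&
               require_bucket_distribution;
    }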
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index b3ffa19d6db..0c34acfd7df 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -143,7 +143,7 @@ protected:
 class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
 public:
     AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                     const DescriptorTbl& descs);
+                     const DescriptorTbl& descs, bool require_bucket_distribution);
     ~AggSinkOperatorX() override = default;
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 78079a0ddf8..7eb09555aa8 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -122,9 +122,11 @@ void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile)
 
 PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id,
                                                          const TPlanNode& tnode,
-                                                         const DescriptorTbl& descs)
+                                                         const DescriptorTbl& descs,
+                                                         bool require_bucket_distribution)
         : DataSinkOperatorX<PartitionedAggSinkLocalState>(operator_id, tnode.node_id) {
-    _agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id, tnode, descs);
+    _agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id, tnode, descs,
+                                                            require_bucket_distribution);
 }
 
 Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 1755cd866f2..1233f66b562 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -294,7 +294,7 @@ public:
 class PartitionedAggSinkOperatorX : public DataSinkOperatorX<PartitionedAggSinkLocalState> {
 public:
     PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                                const DescriptorTbl& descs);
+                                const DescriptorTbl& descs, bool require_bucket_distribution);
     ~PartitionedAggSinkOperatorX() override = default;
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
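The pipeline_x_fragment_context hunks that follow thread a per-fragment _require_bucket_distribution flag through PipelineXFragmentContext::_create_operator: it starts out false (see the new header member), is passed by value into the agg sink being built, and is then raised to true after the operator has been created, so sinks built later in the same fragment see the raised flag. Below is a hedged sketch of that life cycle; both class shells are invented for illustration, only the member and flag names are taken from the diff.

    #include <memory>

    // Stand-in for AggSinkOperatorX: it only records which distribution it was
    // built with (false -> hash shuffle on grouping exprs, true -> keep the
    // planner's bucket/distribute exprs, per the constructor change above).
    class AggSinkSketch {
    public:
        explicit AggSinkSketch(bool require_bucket_distribution)
                : _require_bucket_distribution(require_bucket_distribution) {}
        bool requires_bucket_distribution() const { return _require_bucket_distribution; }

    private:
        bool _require_bucket_distribution;
    };

    // Stand-in for the fragment context: the member name and default mirror the
    // new field in pipeline_x_fragment_context.h; the method is hypothetical.
    class FragmentContextSketch {
    public:
        std::unique_ptr<AggSinkSketch> create_agg_sink() {
            // The first sink built in the fragment sees the default `false`;
            // afterwards the flag is raised, so operators created later keep
            // the bucket-distribution requirement.
            auto sink = std::make_unique<AggSinkSketch>(_require_bucket_distribution);
            _require_bucket_distribution = true;
            return sink;
        }

    private:
        bool _require_bucket_distribution = false;
    };

Under that reading, a 1-phase aggregation or analytic operator built while the flag is still false can be fed by a plain hash shuffle on its grouping keys instead of forcing bucket distribution, which is what the commit title refers to.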
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index bf2c255a127..fc0234c6290 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -1034,14 +1034,16 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             DataSinkOperatorXPtr sink;
             if (_runtime_state->enable_agg_spill() && !tnode.agg_node.grouping_exprs.empty()) {
                 sink.reset(new PartitionedAggSinkOperatorX(pool, next_sink_operator_id(), tnode,
-                                                           descs));
+                                                           descs, _require_bucket_distribution));
             } else {
-                sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
+                sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
+                                                _require_bucket_distribution));
             }
             sink->set_dests_id({op->operator_id()});
             RETURN_IF_ERROR(cur_pipe->set_sink(sink));
             RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
         }
+        _require_bucket_distribution = true;
         break;
     }
     case TPlanNodeType::HASH_JOIN_NODE: {
@@ -1106,6 +1108,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             _pipeline_parent_map.push(op->node_id(), cur_pipe);
             _pipeline_parent_map.push(op->node_id(), build_side_pipe);
         }
+        _require_bucket_distribution = true;
         break;
     }
     case TPlanNodeType::CROSS_JOIN_NODE: {
@@ -1211,6 +1214,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(cur_pipe->set_sink(sink));
         RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
+        _require_bucket_distribution = true;
         break;
     }
     case TPlanNodeType::INTERSECT_NODE: {
@@ -1268,6 +1272,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
                                      print_plan_node_type(tnode.node_type));
     }
 
+    _require_bucket_distribution = true;
+
     return Status::OK();
 }
 // NOLINTEND(readability-function-cognitive-complexity)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 31febc0d8aa..c87f8f4f784 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -239,6 +239,8 @@ private:
 
     // Total instance num running on all BEs
     int _total_instances = -1;
+
+    bool _require_bucket_distribution = false;
 };
 
 } // namespace pipeline

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org