This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9e0a2e861cf [pipelineX](refactor) rename functions (#28846) 9e0a2e861cf is described below commit 9e0a2e861cf05fcb7cb22fbcfe95955b16ab7f73 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Fri Dec 22 17:24:39 2023 +0800 [pipelineX](refactor) rename functions (#28846) --- be/src/pipeline/exec/aggregation_sink_operator.h | 4 ++-- be/src/pipeline/exec/analytic_sink_operator.h | 4 ++-- be/src/pipeline/exec/assert_num_rows_operator.h | 2 +- be/src/pipeline/exec/exchange_source_operator.h | 2 +- be/src/pipeline/exec/hashjoin_build_sink.h | 2 +- be/src/pipeline/exec/hashjoin_probe_operator.h | 2 +- be/src/pipeline/exec/nested_loop_join_build_operator.h | 2 +- be/src/pipeline/exec/nested_loop_join_probe_operator.h | 2 +- be/src/pipeline/exec/partition_sort_sink_operator.h | 4 ++-- be/src/pipeline/exec/scan_operator.h | 2 +- be/src/pipeline/exec/set_probe_sink_operator.h | 2 +- be/src/pipeline/exec/set_sink_operator.h | 2 +- be/src/pipeline/exec/sort_sink_operator.h | 4 ++-- be/src/pipeline/exec/streaming_aggregation_sink_operator.h | 2 +- be/src/pipeline/pipeline.h | 2 +- be/src/pipeline/pipeline_x/operator.h | 4 ++-- be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 8 ++++---- 17 files changed, 25 insertions(+), 25 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 2cd6ef50939..97be9dcd6a3 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -366,12 +366,12 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (_probe_expr_ctxs.empty()) { return _needs_finalize || DataSinkOperatorX<LocalStateType>::_child_x ->ignore_data_distribution() ? DataDistribution(ExchangeType::PASSTHROUGH) - : DataSinkOperatorX<LocalStateType>::get_local_exchange_type(); + : DataSinkOperatorX<LocalStateType>::required_data_distribution(); } return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 14ed8c815b1..3e0eb85f76d 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -107,7 +107,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (_partition_by_eq_expr_ctxs.empty()) { return {ExchangeType::PASSTHROUGH}; } else if (_order_by_eq_expr_ctxs.empty()) { @@ -115,7 +115,7 @@ public: ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } - return DataSinkOperatorX<AnalyticSinkLocalState>::get_local_exchange_type(); + return DataSinkOperatorX<AnalyticSinkLocalState>::required_data_distribution(); } private: diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h b/be/src/pipeline/exec/assert_num_rows_operator.h index 1e796b622dc..bb5e65168b6 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.h +++ b/be/src/pipeline/exec/assert_num_rows_operator.h @@ -57,7 +57,7 @@ public: [[nodiscard]] bool is_source() const override { return false; } - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { return {ExchangeType::PASSTHROUGH}; } diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 221a43779a1..b621da38072 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -117,7 +117,7 @@ public: return _sub_plan_query_statistics_recvr; } - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (OperatorX<ExchangeLocalState>::ignore_data_distribution()) { return {ExchangeType::NOOP}; } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index ecf0a4a3122..24faa4115dd 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -156,7 +156,7 @@ public: ._should_build_hash_table; } - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { return {ExchangeType::NOOP}; } else if (_is_broadcast_join) { diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 16b455e4f6c..5dde597ec76 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -163,7 +163,7 @@ public: SourceState& source_state) const override; bool need_more_input_data(RuntimeState* state) const override; - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { return {ExchangeType::NOOP}; } diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index daa976b4e78..ea0820253cc 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -102,7 +102,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { return {ExchangeType::NOOP}; } diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index 5e57399eae8..bc8913f5d08 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -227,7 +227,7 @@ public: return _old_version_flag ? _row_descriptor : *_intermediate_row_desc; } - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { return {ExchangeType::NOOP}; } diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index 1a47e0fa133..486e7056213 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -105,9 +105,9 @@ public: Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) { - return DataSinkOperatorX<PartitionSortSinkLocalState>::get_local_exchange_type(); + return DataSinkOperatorX<PartitionSortSinkLocalState>::required_data_distribution(); } return {ExchangeType::PASSTHROUGH}; } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 9bc42453c79..3690e9eb39c 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -434,7 +434,7 @@ public: TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; } - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (_col_distribute_ids.empty() || OperatorX<LocalStateType>::ignore_data_distribution()) { // 1. `_col_distribute_ids` is empty means storage distribution is not effective, so we prefer to do local shuffle. // 2. `ignore_data_distribution()` returns true means we ignore the distribution. diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index a86bf491721..6f453ff31fc 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -144,7 +144,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 375906b5aa3..635d1ee8675 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -129,7 +129,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index 3146e915eef..2f5512e108b 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -93,12 +93,12 @@ public: Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (_merge_by_exchange) { // The current sort node is used for the ORDER BY return {ExchangeType::PASSTHROUGH}; } - return DataSinkOperatorX<SortSinkLocalState>::get_local_exchange_type(); + return DataSinkOperatorX<SortSinkLocalState>::required_data_distribution(); } private: diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h index ef7f71b7e29..a7fcdcf847b 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h @@ -120,7 +120,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { return {ExchangeType::PASSTHROUGH}; } }; diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 305676856a0..2775c45019e 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -148,7 +148,7 @@ public: } } void init_data_distribution() { - set_data_distribution(operatorXs.front()->get_local_exchange_type()); + set_data_distribution(operatorXs.front()->required_data_distribution()); } void set_data_distribution(const DataDistribution& data_distribution) { _data_distribution = data_distribution; diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index fc95785924b..da52706b56c 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -181,7 +181,7 @@ public: } [[nodiscard]] std::string get_name() const override { return _op_name; } [[nodiscard]] virtual DependencySPtr get_dependency(QueryContext* ctx) = 0; - [[nodiscard]] virtual DataDistribution get_local_exchange_type() const { + [[nodiscard]] virtual DataDistribution required_data_distribution() const { return _child_x && _child_x->ignore_data_distribution() && !is_source() ? DataDistribution(ExchangeType::PASSTHROUGH) : DataDistribution(ExchangeType::NOOP); @@ -481,7 +481,7 @@ public: } virtual void get_dependency(std::vector<DependencySPtr>& dependency, QueryContext* ctx) = 0; - virtual DataDistribution get_local_exchange_type() const { + virtual DataDistribution required_data_distribution() const { return _child_x && _child_x->ignore_data_distribution() ? DataDistribution(ExchangeType::PASSTHROUGH) : DataDistribution(ExchangeType::NOOP); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index fe7388735ed..7efe476c6de 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -288,10 +288,10 @@ Status PipelineXFragmentContext::_plan_local_exchange( do_local_exchange = false; // Plan local exchange for each operator. for (; idx < ops.size();) { - if (ops[idx]->get_local_exchange_type().need_local_exchange()) { + if (ops[idx]->required_data_distribution().need_local_exchange()) { RETURN_IF_ERROR(_add_local_exchange( pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip, - ops[idx]->get_local_exchange_type(), &do_local_exchange, num_buckets, + ops[idx]->required_data_distribution(), &do_local_exchange, num_buckets, bucket_seq_to_instance_idx, ignore_data_hash_distribution)); } if (do_local_exchange) { @@ -305,10 +305,10 @@ Status PipelineXFragmentContext::_plan_local_exchange( idx++; } } while (do_local_exchange); - if (pip->sink_x()->get_local_exchange_type().need_local_exchange()) { + if (pip->sink_x()->required_data_distribution().need_local_exchange()) { RETURN_IF_ERROR(_add_local_exchange( pip_idx, idx, pip->sink_x()->node_id(), _runtime_state->obj_pool(), pip, - pip->sink_x()->get_local_exchange_type(), &do_local_exchange, num_buckets, + pip->sink_x()->required_data_distribution(), &do_local_exchange, num_buckets, bucket_seq_to_instance_idx, ignore_data_hash_distribution)); } return Status::OK(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org