This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new de8442bbef2 [refactor](pipeline) Refactor local exchange planning (#42482)
de8442bbef2 is described below

commit de8442bbef2e4ee91d2815f8bf2bca8886bf3235
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Tue Oct 29 14:04:20 2024 +0800

    [refactor](pipeline) Refactor local exchange planning (#42482)
---
 be/src/pipeline/exec/aggregation_sink_operator.h      |  1 -
 be/src/pipeline/exec/analytic_sink_operator.h         |  3 --
 be/src/pipeline/exec/datagen_operator.cpp             |  8 +--
 .../exec/distinct_streaming_aggregation_operator.h    |  3 --
 be/src/pipeline/exec/exchange_source_operator.cpp     |  4 +-
 be/src/pipeline/exec/exchange_source_operator.h       |  2 +-
 be/src/pipeline/exec/hashjoin_build_sink.h            |  7 +--
 be/src/pipeline/exec/hashjoin_probe_operator.h        |  3 --
 .../exec/nested_loop_join_build_operator.h            |  4 +-
 be/src/pipeline/exec/operator.cpp                     | 10 +++-
 be/src/pipeline/exec/operator.h                       | 21 +-------
 .../exec/partitioned_aggregation_sink_operator.h      |  3 --
 .../exec/partitioned_hash_join_probe_operator.h       |  3 --
 .../exec/partitioned_hash_join_sink_operator.h        |  3 --
 be/src/pipeline/exec/scan_operator.cpp                |  6 ++-
 be/src/pipeline/exec/scan_operator.h                  |  4 +-
 be/src/pipeline/exec/set_probe_sink_operator.h        |  2 -
 be/src/pipeline/exec/set_sink_operator.h              |  1 -
 be/src/pipeline/exec/sort_sink_operator.h             |  1 -
 .../local_exchange_source_operator.h                  |  3 --
 be/src/pipeline/local_exchange/local_exchanger.cpp    |  2 +-
 be/src/pipeline/local_exchange/local_exchanger.h      | 11 ++--
 be/src/pipeline/pipeline.cpp                          | 21 ++++----
 be/src/pipeline/pipeline.h                            |  2 +-
 be/src/pipeline/pipeline_fragment_context.cpp         | 62 +++++-----------------
 be/src/pipeline/pipeline_fragment_context.h           |  9 ++--
 .../org/apache/doris/planner/PlanFragment.java        |  1 -
 .../java/org/apache/doris/qe/SessionVariable.java     |  4 +-
 gensrc/thrift/Planner.thrift                          |  4 --
 .../distribute/local_shuffle.groovy                   |  2 +-
 30 files changed, 64 insertions(+), 146 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index 8271f1451b4..9ff3de99b22 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -152,7 +152,6 @@ public:
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
     }
     bool require_data_distribution() const override { return _is_colocate; }
-    bool require_shuffled_data_distribution() const override { return !_probe_expr_ctxs.empty(); }

     size_t get_revocable_mem_size(RuntimeState* state) const;
     AggregatedDataVariants* get_agg_data(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index 1a0a671cf9f..b35354107f6 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -88,9 +88,6 @@ public:
     }
     bool require_data_distribution() const override { return true; }
-    bool require_shuffled_data_distribution() const override {
-        return !_partition_by_eq_expr_ctxs.empty();
-    }

 private:
     Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr,
diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp
index faa6359e874..965092b7eef 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -36,7 +36,9 @@ DataGenSourceOperatorX::DataGenSourceOperatorX(ObjectPool* pool, const TPlanNode
         : OperatorX<DataGenLocalState>(pool, tnode, operator_id, descs),
           _tuple_id(tnode.data_gen_scan_node.tuple_id),
           _tuple_desc(nullptr),
-          _runtime_filter_descs(tnode.runtime_filters) {}
+          _runtime_filter_descs(tnode.runtime_filters) {
+    _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
+}

 Status DataGenSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(OperatorX<DataGenLocalState>::init(tnode, state));
@@ -87,8 +89,8 @@ Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     // TODO: use runtime filter to filte result block, maybe this node need derive from vscan_node.
     for (const auto& filter_desc : p._runtime_filter_descs) {
         std::shared_ptr<IRuntimeFilter> runtime_filter;
-        RETURN_IF_ERROR(state->register_consumer_runtime_filter(
-                filter_desc, p.ignore_data_distribution(), p.node_id(), &runtime_filter));
+        RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc, p.is_serial_operator(),
+                                                                p.node_id(), &runtime_filter));
         runtime_filter->init_profile(_runtime_profile.get());
     }
     return Status::OK();
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index 1f7a21190ad..4c5fcd5efa7 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -116,9 +116,6 @@ public:
     }
     bool require_data_distribution() const override { return _is_colocate; }
-    bool require_shuffled_data_distribution() const override {
-        return _needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg);
-    }

 private:
     friend class DistinctStreamingAggLocalState;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index 844e6decd64..c9eebc5d2e4 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -105,7 +105,9 @@ ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo
           std::vector<bool>(tnode.nullable_tuples.begin(),
                             tnode.nullable_tuples.begin() + tnode.exchange_node.input_row_tuples.size())),
-          _offset(tnode.exchange_node.__isset.offset ? tnode.exchange_node.offset : 0) {}
+          _offset(tnode.exchange_node.__isset.offset ? tnode.exchange_node.offset : 0) {
+    _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
+}

 Status ExchangeSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::init(tnode, state));
diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h
index 0fe3dcbb590..c8ef674d269 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -81,7 +81,7 @@ public:
     [[nodiscard]] bool is_merging() const { return _is_merging; }

     DataDistribution required_data_distribution() const override {
-        if (OperatorX<ExchangeLocalState>::ignore_data_distribution()) {
+        if (OperatorX<ExchangeLocalState>::is_serial_operator()) {
             return {ExchangeType::NOOP};
         }
         return _partition_type == TPartitionType::HASH_PARTITIONED
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 69aa6843b84..83755d7f730 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -130,8 +130,8 @@ public:
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             return {ExchangeType::NOOP};
         } else if (_is_broadcast_join) {
-            return _child->ignore_data_distribution() ? DataDistribution(ExchangeType::PASS_TO_ONE)
-                                                      : DataDistribution(ExchangeType::NOOP);
+            return _child->is_serial_operator() ? DataDistribution(ExchangeType::PASS_TO_ONE)
+                                                : DataDistribution(ExchangeType::NOOP);
         }
         return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
                                _join_distribution == TJoinDistributionType::COLOCATE
@@ -139,9 +139,6 @@ public:
                        : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
     }
-    bool require_shuffled_data_distribution() const override {
-        return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
-    }
     bool is_shuffled_operator() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 917c2692b44..7da7a3b238d 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -152,9 +152,6 @@ public:
                         : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
     }
-    bool require_shuffled_data_distribution() const override {
-        return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
-    }
     bool is_shuffled_operator() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index f2ca259754b..d6e72799f97 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -76,8 +76,8 @@ public:
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             return {ExchangeType::NOOP};
         }
-        return _child->ignore_data_distribution() ? DataDistribution(ExchangeType::BROADCAST)
-                                                  : DataDistribution(ExchangeType::NOOP);
+        return _child->is_serial_operator() ? DataDistribution(ExchangeType::BROADCAST)
+                                            : DataDistribution(ExchangeType::NOOP);
     }

 private:
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 6e3099db748..fb2dd828c39 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -74,6 +74,7 @@
 #include "pipeline/exec/union_source_operator.h"
 #include "pipeline/local_exchange/local_exchange_sink_operator.h"
 #include "pipeline/local_exchange/local_exchange_source_operator.h"
+#include "pipeline/pipeline.h"
 #include "util/debug_util.h"
 #include "util/runtime_profile.h"
 #include "util/string_util.h"
@@ -116,11 +117,16 @@ std::string PipelineXSinkLocalState<SharedStateArg>::name_suffix() {
            }() + ")";
 }

-DataDistribution DataSinkOperatorXBase::required_data_distribution() const {
-    return _child && _child->ignore_data_distribution()
+DataDistribution OperatorBase::required_data_distribution() const {
+    return _child && _child->is_serial_operator() && !is_source()
                    ? DataDistribution(ExchangeType::PASSTHROUGH)
                    : DataDistribution(ExchangeType::NOOP);
 }
+
+bool OperatorBase::require_shuffled_data_distribution() const {
+    return Pipeline::is_hash_exchange(required_data_distribution().distribution_type);
+}
+
 const RowDescriptor& OperatorBase::row_desc() const {
     return _child->row_desc();
 }
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 5df0a19498f..2a2b3fdd3b9 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -118,7 +118,8 @@ public:
         _followed_by_shuffled_operator = followed_by_shuffled_operator;
     }
     [[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
-    [[nodiscard]] virtual bool require_shuffled_data_distribution() const { return false; }
+    [[nodiscard]] virtual DataDistribution required_data_distribution() const;
+    [[nodiscard]] virtual bool require_shuffled_data_distribution() const;

 protected:
     OperatorPtr _child = nullptr;
@@ -483,7 +484,6 @@ public:
     }

     [[nodiscard]] virtual std::shared_ptr<BasicSharedState> create_shared_state() const = 0;
-    [[nodiscard]] virtual DataDistribution required_data_distribution() const;

     Status close(RuntimeState* state) override {
         return Status::InternalError("Should not reach here!");
@@ -496,8 +496,6 @@ public:
     [[nodiscard]] bool is_sink() const override { return true; }

-    [[nodiscard]] bool is_source() const override { return false; }
-
     static Status close(RuntimeState* state, Status exec_status) {
         auto result = state->get_sink_local_state_result();
         if (!result) {
@@ -652,19 +650,7 @@ public:
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name);
     }
     [[nodiscard]] std::string get_name() const override { return _op_name; }
-    [[nodiscard]] virtual DataDistribution required_data_distribution() const {
-        return _child && _child->ignore_data_distribution() && !is_source()
-                       ? DataDistribution(ExchangeType::PASSTHROUGH)
-                       : DataDistribution(ExchangeType::NOOP);
-    }
-    [[nodiscard]] virtual bool ignore_data_distribution() const {
-        return _child ? _child->ignore_data_distribution() : _ignore_data_distribution;
-    }
-    [[nodiscard]] bool ignore_data_hash_distribution() const {
-        return _child ? _child->ignore_data_hash_distribution() : _ignore_data_distribution;
-    }
     [[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const { return true; }
-    void set_ignore_data_distribution() { _ignore_data_distribution = true; }

     Status open(RuntimeState* state) override;
@@ -735,8 +721,6 @@ public:
     bool has_output_row_desc() const { return _output_row_descriptor != nullptr; }

-    [[nodiscard]] bool is_source() const override { return false; }
-
     [[nodiscard]] virtual Status get_block_after_projects(RuntimeState* state,
                                                           vectorized::Block* block, bool* eos);
@@ -779,7 +763,6 @@ protected:
     uint32_t _debug_point_count = 0;

     std::string _op_name;
-    bool _ignore_data_distribution = false;
     int _parallel_tasks = 0;

     //_keep_origin is used to avoid copying during projection,
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 6b3a74c83df..15f6b22387a 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -309,9 +309,6 @@ public:
     bool require_data_distribution() const override {
         return _agg_sink_operator->require_data_distribution();
     }
-    bool require_shuffled_data_distribution() const override {
-        return _agg_sink_operator->require_shuffled_data_distribution();
-    }

     Status set_child(OperatorPtr child) override {
         RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::set_child(child));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 3aab11f62d8..f8fc0780b6f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -165,9 +165,6 @@ public:
                                         _distribution_partition_exprs));
     }

-    bool require_shuffled_data_distribution() const override {
-        return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
-    }
     bool is_shuffled_operator() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index c768d7518b9..8e89763b50a 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -115,9 +115,6 @@ public:
                                        _distribution_partition_exprs);
     }

-    bool require_shuffled_data_distribution() const override {
-        return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
-    }
     bool is_shuffled_operator() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 4f3c97bab71..be940e8c89c 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -73,7 +73,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<typename Derived::Parent>();
-    RETURN_IF_ERROR(RuntimeFilterConsumer::init(state, p.ignore_data_distribution()));
+    RETURN_IF_ERROR(RuntimeFilterConsumer::init(state, p.is_serial_operator()));
     // init profile for runtime filter
     RuntimeFilterConsumer::_init_profile(profile());
     init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), p.node_id(),
@@ -990,7 +990,7 @@ Status ScanLocalState<Derived>::_start_scanners(
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx = vectorized::ScannerContext::create_shared(
             state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
-            _scan_dependency, p.ignore_data_distribution());
+            _scan_dependency, p.is_serial_operator());
     return Status::OK();
 }
@@ -1145,6 +1145,8 @@ ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool* pool, const TPlanNode&
         : OperatorX<LocalStateType>(pool, tnode, operator_id, descs),
           _runtime_filter_descs(tnode.runtime_filters),
           _parallel_tasks(parallel_tasks) {
+    OperatorX<LocalStateType>::_is_serial_operator =
+            tnode.__isset.is_serial_operator && tnode.is_serial_operator;
     if (tnode.__isset.push_down_count) {
         _push_down_count = tnode.push_down_count;
     }
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index bf650cb8495..e4f8a828c6e 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -383,8 +383,8 @@ public:
     TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }

     DataDistribution required_data_distribution() const override {
-        if (OperatorX<LocalStateType>::ignore_data_distribution()) {
-            // `ignore_data_distribution()` returns true means we ignore the distribution.
+        if (OperatorX<LocalStateType>::is_serial_operator()) {
+            // `is_serial_operator()` returns true means we ignore the distribution.
             return {ExchangeType::NOOP};
         }
         return {ExchangeType::BUCKET_HASH_SHUFFLE};
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
index ab53f5358c2..f320c8e89cd 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -96,8 +96,6 @@ public:
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
     }

-    bool require_shuffled_data_distribution() const override { return true; }
-
     std::shared_ptr<BasicSharedState> create_shared_state() const override { return nullptr; }

 private:
diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h
index 65c33795e5d..8e3c264f267 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -94,7 +94,6 @@ public:
         return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                             : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
     }
-    bool require_shuffled_data_distribution() const override { return true; }

 private:
     template <class HashTableContext, bool is_intersected>
diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h
index 0829c38b40f..a5a24e37163 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -73,7 +73,6 @@ public:
             return {ExchangeType::NOOP};
         }
     }
-    bool require_shuffled_data_distribution() const override { return _is_analytic_sort; }
     bool require_data_distribution() const override { return _is_colocate; }

     size_t get_revocable_mem_size(RuntimeState* state) const;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index c0da5c8120c..3c706d50182 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -81,9 +81,6 @@ public:
     bool is_source() const override { return true; }

-    // If input data distribution is ignored by this fragment, this first local exchange source in this fragment will re-assign all data.
-    bool ignore_data_distribution() const override { return false; }
-
 private:
     friend class LocalExchangeSourceLocalState;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp
index da27a39772d..c5f99ca5d6a 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -226,7 +226,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
                 new_block_wrapper->unref(local_state._shared_state, local_state._channel_id);
             }
         }
-    } else if (_num_senders != _num_sources || _ignore_source_data_distribution) {
+    } else if (_num_senders != _num_sources) {
         // In this branch, data just should be distributed equally into all instances.
         new_block_wrapper->ref(_num_partitions);
         for (size_t i = 0; i < _num_partitions; i++) {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h
index b3731638cb3..bf052ac3b92 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -218,24 +218,21 @@ public:

 protected:
     ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
-                     bool ignore_source_data_distribution, int free_block_limit)
+                     int free_block_limit)
             : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, num_partitions,
-                                          free_block_limit),
-              _ignore_source_data_distribution(ignore_source_data_distribution) {
+                                          free_block_limit) {
         _data_queue.resize(num_partitions);
     }

     Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
                        vectorized::Block* block, LocalExchangeSinkLocalState& local_state);
-
-    const bool _ignore_source_data_distribution = false;
 };

 class BucketShuffleExchanger final : public ShuffleExchanger {
     ENABLE_FACTORY_CREATOR(BucketShuffleExchanger);
     BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
-                           bool ignore_source_data_distribution, int free_block_limit)
+                           int free_block_limit)
             : ShuffleExchanger(running_sink_operators, num_sources, num_partitions,
-                               ignore_source_data_distribution, free_block_limit) {}
+                               free_block_limit) {}
     ~BucketShuffleExchanger() override = default;
     ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; }
 };
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 5b93fbdf1f8..96da754daa5 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -39,6 +39,7 @@ bool Pipeline::need_to_local_exchange(const DataDistribution target_data_distrib
                     [&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) {
         return false;
     }
+    // If all operators are serial and sink is not serial, we should improve parallelism for sink.
     if (std::all_of(_operators.begin(), _operators.end(),
                     [&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) {
         if (!_sink->is_serial_operator()) {
@@ -46,21 +47,22 @@ bool Pipeline::need_to_local_exchange(const DataDistribution target_data_distrib
         }
     } else if (std::any_of(_operators.begin(), _operators.end(),
                            [&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) {
+        // If non-serial operators exist, we should improve parallelism for those.
         return true;
     }
     if (target_data_distribution.distribution_type != ExchangeType::BUCKET_HASH_SHUFFLE &&
         target_data_distribution.distribution_type != ExchangeType::HASH_SHUFFLE) {
+        // Always do local exchange if non-hash-partition exchanger is required.
+        // For example, `PASSTHROUGH` exchanger is always required to distribute data evenly.
         return true;
-    } else if (_operators.front()->ignore_data_hash_distribution()) {
-        if (_data_distribution.distribution_type == target_data_distribution.distribution_type &&
-            (_data_distribution.partition_exprs.empty() ||
-             target_data_distribution.partition_exprs.empty())) {
-            return true;
-        }
-        return _data_distribution.distribution_type != target_data_distribution.distribution_type &&
-               !(is_hash_exchange(_data_distribution.distribution_type) &&
-                 is_hash_exchange(target_data_distribution.distribution_type));
+    } else if (_operators.front()->is_serial_operator()) {
+        DCHECK(std::all_of(_operators.begin(), _operators.end(),
+                           [&](OperatorPtr op) -> bool { return op->is_serial_operator(); }) &&
+               _sink->is_serial_operator())
+                << debug_string();
+        // All operators and sink are serial in this path.
+        return false;
     } else {
         return _data_distribution.distribution_type != target_data_distribution.distribution_type &&
                !(is_hash_exchange(_data_distribution.distribution_type) &&
@@ -71,7 +73,6 @@ Status Pipeline::add_operator(OperatorPtr& op, const int parallelism) {
     if (parallelism > 0 && op->is_serial_operator()) {
         set_num_tasks(parallelism);
-        op->set_ignore_data_distribution();
     }
     op->set_parallel_tasks(num_tasks());
     _operators.emplace_back(op);
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 9554537ca16..98e52ec5271 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -115,7 +115,7 @@ public:
     int num_tasks() const { return _num_tasks; }
     bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; }

-    std::string debug_string() {
+    std::string debug_string() const {
         fmt::memory_buffer debug_string_buffer;
         fmt::format_to(debug_string_buffer,
                        "Pipeline [id: {}, _num_tasks: {}, _num_tasks_created: {}]", _pipeline_id,
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index ef856da5135..bd45016adf5 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -236,8 +236,6 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     if (request.__isset.query_options && request.query_options.__isset.execution_timeout) {
         _timeout = request.query_options.execution_timeout;
     }
-    _use_serial_source =
-            request.fragment.__isset.use_serial_source && request.fragment.use_serial_source;

     _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
     _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
@@ -704,6 +702,9 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
             (followed_by_shuffled_operator || op->is_shuffled_operator()) &&
             require_shuffled_data_distribution;

+    if (num_children == 0) {
+        _use_serial_source = op->is_serial_operator();
+    }
     // rely on that tnodes is preorder of the plan
     for (int i = 0; i < num_children; i++) {
         ++*node_idx;
@@ -736,8 +737,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
         int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
         DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
         const std::map<int, int>& bucket_seq_to_instance_idx,
-        const std::map<int, int>& shuffle_idx_to_instance_idx,
-        const bool ignore_data_hash_distribution) {
+        const std::map<int, int>& shuffle_idx_to_instance_idx) {
     auto& operators = cur_pipe->operators();
     const auto downstream_pipeline_id = cur_pipe->id();
     auto local_exchange_id = next_operator_id();
@@ -785,7 +785,6 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
     case ExchangeType::BUCKET_HASH_SHUFFLE:
         shared_state->exchanger = BucketShuffleExchanger::create_unique(
                 std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
-                ignore_data_hash_distribution,
                 _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                         ? cast_set<int>(
                                   _runtime_state->query_options().local_exchange_free_blocks_limit)
@@ -922,8 +921,7 @@ Status PipelineFragmentContext::_add_local_exchange(
         int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
         DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
         const std::map<int, int>& bucket_seq_to_instance_idx,
-        const std::map<int, int>& shuffle_idx_to_instance_idx,
-        const bool ignore_data_distribution) {
+        const std::map<int, int>& shuffle_idx_to_instance_idx) {
     if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
         return Status::OK();
     }
@@ -938,7 +936,7 @@ Status PipelineFragmentContext::_add_local_exchange(
     auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
     RETURN_IF_ERROR(_add_local_exchange_impl(
             idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
-            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, ignore_data_distribution));
+            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));

     CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size())
             << "total_op_num: " << total_op_num
@@ -952,7 +950,7 @@ Status PipelineFragmentContext::_add_local_exchange(
                 cast_set<int>(new_pip->operators().size()), pool, new_pip,
                 add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH),
                 do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
-                shuffle_idx_to_instance_idx, ignore_data_distribution));
+                shuffle_idx_to_instance_idx));
     }
     return Status::OK();
 }
@@ -978,13 +976,8 @@ Status PipelineFragmentContext::_plan_local_exchange(
             // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
             // still keep colocate plan after local shuffle
             RETURN_IF_ERROR(_plan_local_exchange(
-                    _pipelines[pip_idx]->operators().front()->ignore_data_hash_distribution() ||
-                                    num_buckets == 0
-                            ? _num_instances
-                            : num_buckets,
-                    pip_idx, _pipelines[pip_idx], bucket_seq_to_instance_idx,
-                    shuffle_idx_to_instance_idx,
-                    _pipelines[pip_idx]->operators().front()->ignore_data_hash_distribution()));
+                    _use_serial_source || num_buckets == 0 ? _num_instances : num_buckets, pip_idx,
+                    _pipelines[pip_idx], bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
     }
     return Status::OK();
 }
@@ -992,8 +985,7 @@ Status PipelineFragmentContext::_plan_local_exchange(
 Status PipelineFragmentContext::_plan_local_exchange(
         int num_buckets, int pip_idx, PipelinePtr pip,
         const std::map<int, int>& bucket_seq_to_instance_idx,
-        const std::map<int, int>& shuffle_idx_to_instance_idx,
-        const bool ignore_data_hash_distribution) {
+        const std::map<int, int>& shuffle_idx_to_instance_idx) {
     int idx = 1;
     bool do_local_exchange = false;
     do {
@@ -1005,8 +997,7 @@ Status PipelineFragmentContext::_plan_local_exchange(
             RETURN_IF_ERROR(_add_local_exchange(
                     pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
                     ops[idx]->required_data_distribution(), &do_local_exchange, num_buckets,
-                    bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
-                    ignore_data_hash_distribution));
+                    bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
         }
         if (do_local_exchange) {
             // If local exchange is needed for current operator, we will split this pipeline to
@@ -1023,8 +1014,7 @@ Status PipelineFragmentContext::_plan_local_exchange(
         RETURN_IF_ERROR(_add_local_exchange(
                 pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
                 pip->sink()->required_data_distribution(), &do_local_exchange, num_buckets,
-                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
-                ignore_data_hash_distribution));
+                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
     }
     return Status::OK();
 }
@@ -1215,10 +1205,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                 enable_query_cache ? request.fragment.query_cache_param : TQueryCacheParam {}));
         RETURN_IF_ERROR(cur_pipe->add_operator(
                 op, request.__isset.parallel_instances ? request.parallel_instances : 0));
-        if (request.__isset.parallel_instances) {
-            cur_pipe->set_num_tasks(request.parallel_instances);
-            op->set_ignore_data_distribution();
-        }
         break;
     }
     case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
@@ -1229,10 +1215,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
         RETURN_IF_ERROR(cur_pipe->add_operator(
                 op, request.__isset.parallel_instances ? request.parallel_instances : 0));
-        if (request.__isset.parallel_instances) {
-            cur_pipe->set_num_tasks(request.parallel_instances);
-            op->set_ignore_data_distribution();
-        }
         break;
     }
     case doris::TPlanNodeType::JDBC_SCAN_NODE: {
@@ -1245,20 +1227,12 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                     "Jdbc scan node is disabled, you can change be config enable_java_support "
                     "to true and restart be.");
         }
-        if (request.__isset.parallel_instances) {
-            cur_pipe->set_num_tasks(request.parallel_instances);
-            op->set_ignore_data_distribution();
-        }
         break;
     }
     case doris::TPlanNodeType::FILE_SCAN_NODE: {
        op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
         RETURN_IF_ERROR(cur_pipe->add_operator(
                 op, request.__isset.parallel_instances ? request.parallel_instances : 0));
-        if (request.__isset.parallel_instances) {
-            cur_pipe->set_num_tasks(request.parallel_instances);
-            op->set_ignore_data_distribution();
-        }
         break;
     }
     case TPlanNodeType::ES_SCAN_NODE:
@@ -1266,10 +1240,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
         RETURN_IF_ERROR(cur_pipe->add_operator(
                 op, request.__isset.parallel_instances ? request.parallel_instances : 0));
-        if (request.__isset.parallel_instances) {
-            cur_pipe->set_num_tasks(request.parallel_instances);
-            op->set_ignore_data_distribution();
-        }
         break;
     }
     case TPlanNodeType::EXCHANGE_NODE: {
@@ -1278,10 +1248,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(), descs, num_senders));
         RETURN_IF_ERROR(cur_pipe->add_operator(
                 op, request.__isset.parallel_instances ? request.parallel_instances : 0));
-        if (request.__isset.parallel_instances) {
-            op->set_ignore_data_distribution();
-            cur_pipe->set_num_tasks(request.parallel_instances);
-        }
         break;
     }
     case TPlanNodeType::AGGREGATION_NODE: {
@@ -1643,10 +1609,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs));
         RETURN_IF_ERROR(cur_pipe->add_operator(
                 op, request.__isset.parallel_instances ? request.parallel_instances : 0));
-        if (request.__isset.parallel_instances) {
-            cur_pipe->set_num_tasks(request.parallel_instances);
-            op->set_ignore_data_distribution();
-        }
         break;
     }
     case TPlanNodeType::SCHEMA_SCAN_NODE: {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 6caa0e5c106..289f5c82365 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -153,22 +153,19 @@ private:
                                const std::map<int, int>& shuffle_idx_to_instance_idx);
     Status _plan_local_exchange(int num_buckets, int pip_idx, PipelinePtr pip,
                                 const std::map<int, int>& bucket_seq_to_instance_idx,
-                                const std::map<int, int>& shuffle_idx_to_instance_idx,
-                                const bool ignore_data_distribution);
+                                const std::map<int, int>& shuffle_idx_to_instance_idx);
     void _inherit_pipeline_properties(const DataDistribution& data_distribution,
                                       PipelinePtr pipe_with_source, PipelinePtr pipe_with_sink);

     Status _add_local_exchange(int pip_idx, int idx, int node_id, ObjectPool* pool,
                                PipelinePtr cur_pipe, DataDistribution data_distribution,
                                bool* do_local_exchange, int num_buckets,
                                const std::map<int, int>& bucket_seq_to_instance_idx,
-                               const std::map<int, int>& shuffle_idx_to_instance_idx,
-                               const bool ignore_data_distribution);
+                               const std::map<int, int>& shuffle_idx_to_instance_idx);

     Status _add_local_exchange_impl(int idx, ObjectPool* pool, PipelinePtr cur_pipe,
                                     PipelinePtr new_pip, DataDistribution data_distribution,
                                     bool* do_local_exchange, int num_buckets,
                                     const std::map<int, int>& bucket_seq_to_instance_idx,
-                                    const std::map<int, int>& shuffle_idx_to_instance_idx,
-                                    const bool ignore_data_hash_distribution);
+                                    const std::map<int, int>& shuffle_idx_to_instance_idx);

     Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
                                  ThreadPool* thread_pool);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index c5a6ec55f63..0ebd023ed41 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -342,7 +342,6 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         // TODO chenhao , calculated by cost
         result.setMinReservationBytes(0);
         result.setInitialReservationTotalClaims(0);
-        result.setUseSerialSource(useSerialSource(ConnectContext.get()));
         return result;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 0c755b9aae9..52ea334a142 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -4351,7 +4351,7 @@ public class SessionVariable implements Serializable, Writable {
     }

     public boolean isIgnoreStorageDataDistribution() {
-        return ignoreStorageDataDistribution && enableLocalShuffle;
+        return ignoreStorageDataDistribution && enableLocalShuffle && enableNereidsPlanner;
     }

     public void setIgnoreStorageDataDistribution(boolean ignoreStorageDataDistribution) {
@@ -4389,7 +4389,7 @@ public class SessionVariable implements Serializable, Writable {
     }

     public boolean isForceToLocalShuffle() {
-        return enableLocalShuffle && forceToLocalShuffle;
+        return enableLocalShuffle && forceToLocalShuffle && enableNereidsPlanner;
     }

     public void setForceToLocalShuffle(boolean forceToLocalShuffle) {
diff --git a/gensrc/thrift/Planner.thrift b/gensrc/thrift/Planner.thrift
index ffcc33638db..866d8d45320 100644
--- a/gensrc/thrift/Planner.thrift
+++ b/gensrc/thrift/Planner.thrift
@@ -64,10 +64,6 @@ struct TPlanFragment {
   8: optional i64 initial_reservation_total_claims

   9: optional QueryCache.TQueryCacheParam query_cache_param
-
-  // Using serial source means a serial source operator will be used in this fragment (e.g. data will be shuffled to
-  // only 1 exchange operator) and then splitted by followed local exchanger
-  10: optional bool use_serial_source
 }

 // location information for a single scan range
diff --git a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
index 950b6171c7c..d701ad890d6 100644
--- a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
+++ b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
@@ -45,7 +45,7 @@ suite("local_shuffle") {
             insert into test_local_shuffle1 values (1, 1), (2, 2);
             insert into test_local_shuffle2 values (2, 2), (3, 3);

-            set enable_nereids_distribute_planner=true;
+            // set enable_nereids_distribute_planner=true;
             set enable_pipeline_x_engine=true;
             set disable_join_reorder=true;
             set enable_local_shuffle=true;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org