This is an automated email from the ASF dual-hosted git repository. zhangstar333 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a3279a26044 [refine](exchange) Use is_merge from FE for judgment instead of relying on the operator in BE. (#45592) a3279a26044 is described below commit a3279a260441692cd0494c0eb8a93e5f97aa2945 Author: Mryange <yanxuech...@selectdb.com> AuthorDate: Thu Dec 19 17:04:03 2024 +0800 [refine](exchange) Use is_merge from FE for judgment instead of relying on the operator in BE. (#45592) ### What problem does this PR solve? Previously, determining whether the receiver is a merge exchange relied on checking if the specific operator was a sort node. However, this approach is incorrect because there are many types of sort operators: regular sort, partitioned sort, and spill sort. --- be/src/pipeline/exec/exchange_sink_operator.cpp | 10 ++-------- be/src/pipeline/exec/exchange_sink_operator.h | 4 +++- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index e7fed76be8f..cc789f6e25b 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -32,7 +32,6 @@ #include "pipeline/exec/operator.h" #include "pipeline/exec/sort_source_operator.h" #include "pipeline/local_exchange/local_exchange_sink_operator.h" -#include "pipeline/local_exchange/local_exchange_source_operator.h" #include "pipeline/pipeline_fragment_context.h" #include "util/runtime_profile.h" #include "util/uid_util.h" @@ -279,6 +278,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( _tablet_sink_txn_id(sink.tablet_sink_txn_id), _t_tablet_sink_exprs(&sink.tablet_sink_exprs), _enable_local_merge_sort(state->enable_local_merge_sort()), + _dest_is_merge(sink.__isset.is_merge && sink.is_merge), _fragment_instance_ids(fragment_instance_ids) { DCHECK_GT(destinations.size(), 0); DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED || @@ -571,19 +571,13 @@ std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::_create_buffer( // Therefore, a shared sink buffer is used here to limit the number of concurrent RPCs. // (Note: This does not reduce the total number of RPCs.) // In a merge sort scenario, there are only n RPCs, so a shared sink buffer is not needed. -/// TODO: Modify this to let FE handle the judgment instead of BE. std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer( InstanceLoId sender_ins_id) { - if (!_child) { - throw doris::Exception(ErrorCode::INTERNAL_ERROR, - "ExchangeSinkOperatorX did not correctly set the child."); - } // When the child is SortSourceOperatorX or LocalExchangeSourceOperatorX, // it is an order-by scenario. // In this case, there is only one target instance, and no n * n RPC concurrency will occur. // Therefore, sharing a sink buffer is not necessary. - if (std::dynamic_pointer_cast<SortSourceOperatorX>(_child) || - std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(_child)) { + if (_dest_is_merge) { return _create_buffer({sender_ins_id}); } if (_state->enable_shared_exchange_sink_buffer()) { diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 85575beb9f7..3d6eeb4b39e 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -205,7 +205,6 @@ public: // Therefore, a shared sink buffer is used here to limit the number of concurrent RPCs. // (Note: This does not reduce the total number of RPCs.) // In a merge sort scenario, there are only n RPCs, so a shared sink buffer is not needed. - /// TODO: Modify this to let FE handle the judgment instead of BE. std::shared_ptr<ExchangeSinkBuffer> get_sink_buffer(InstanceLoId sender_ins_id); vectorized::VExprContextSPtrs& tablet_sink_expr_ctxs() { return _tablet_sink_expr_ctxs; } @@ -260,6 +259,9 @@ private: size_t _data_processed = 0; int _writer_count = 1; const bool _enable_local_merge_sort; + // If dest_is_merge is true, it indicates that the corresponding receiver is a VMERGING-EXCHANGE. + // The receiver will sort the collected data, so the sender must ensure that the data sent is ordered. + const bool _dest_is_merge; const std::vector<TUniqueId>& _fragment_instance_ids; }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org