This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 51a6b14eb6c [refactor](merger) Simplify sort merger (#47689) 51a6b14eb6c is described below commit 51a6b14eb6cf0e8edebc2d298b9549189ab891e9 Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Tue Feb 11 14:13:20 2025 +0800 [refactor](merger) Simplify sort merger (#47689) 1. Delete unused variable in `VSortExecExprs`. 2. De-couple sort operator and local exchange operator. 3. Use local exchange's profile to collect sort merger's metrics instead of sort operator's. --- be/src/pipeline/exec/exchange_source_operator.cpp | 2 +- be/src/pipeline/exec/sort_source_operator.cpp | 13 ----------- be/src/pipeline/exec/sort_source_operator.h | 5 ++-- be/src/pipeline/local_exchange/local_exchanger.cpp | 10 ++++++-- be/src/pipeline/local_exchange/local_exchanger.h | 16 +++++++++---- be/src/pipeline/pipeline_fragment_context.cpp | 5 +++- be/src/vec/common/sort/heap_sorter.cpp | 4 ++-- be/src/vec/common/sort/sorter.cpp | 4 ++-- be/src/vec/common/sort/vsort_exec_exprs.cpp | 27 +++++----------------- be/src/vec/common/sort/vsort_exec_exprs.h | 16 +++---------- be/test/vec/exec/sort/sort_test.cpp | 2 +- 11 files changed, 40 insertions(+), 64 deletions(-) diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 4f12a8ef38e..762e108fd48 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -151,7 +151,7 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block if (_is_merging && !local_state.is_ready) { SCOPED_TIMER(local_state.create_merger_timer); RETURN_IF_ERROR(local_state.stream_recvr->create_merger( - local_state.vsort_exec_exprs.lhs_ordering_expr_ctxs(), _is_asc_order, _nulls_first, + local_state.vsort_exec_exprs.ordering_expr_ctxs(), _is_asc_order, _nulls_first, state->batch_size(), _limit, _offset)); local_state.is_ready = true; return
Status::OK(); diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp index 2fb09d7278f..808f6533d6e 100644 --- a/be/src/pipeline/exec/sort_source_operator.cpp +++ b/be/src/pipeline/exec/sort_source_operator.cpp @@ -67,18 +67,5 @@ const vectorized::SortDescription& SortSourceOperatorX::get_sort_description( return local_state._shared_state->sorter->get_sort_description(); } -Status SortSourceOperatorX::build_merger(RuntimeState* state, - std::unique_ptr<vectorized::VSortedRunMerger>& merger, - RuntimeProfile* profile) { - // now only use in LocalMergeSortExchanger::get_block - vectorized::VSortExecExprs vsort_exec_exprs; - // clone vsort_exec_exprs in LocalMergeSortExchanger - RETURN_IF_ERROR(_vsort_exec_exprs.clone(state, vsort_exec_exprs)); - merger = std::make_unique<vectorized::VSortedRunMerger>( - vsort_exec_exprs.lhs_ordering_expr_ctxs(), _is_asc_order, _nulls_first, - state->batch_size(), _limit, _offset, profile); - return Status::OK(); -} - #include "common/compile_check_end.h" } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/sort_source_operator.h b/be/src/pipeline/exec/sort_source_operator.h index a638b04b368..7902e4815bf 100644 --- a/be/src/pipeline/exec/sort_source_operator.h +++ b/be/src/pipeline/exec/sort_source_operator.h @@ -56,11 +56,10 @@ public: bool use_local_merge() const { return _merge_by_exchange; } const vectorized::SortDescription& get_sort_description(RuntimeState* state) const; - Status build_merger(RuntimeState* state, std::unique_ptr<vectorized::VSortedRunMerger>& merger, - RuntimeProfile* profile); - private: + friend class PipelineFragmentContext; friend class SortLocalState; + const bool _merge_by_exchange; std::vector<bool> _is_asc_order; std::vector<bool> _nulls_first; diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index c768668acc4..0fb4db625a5 100644 --- 
a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -20,7 +20,6 @@ #include "common/cast_set.h" #include "common/status.h" #include "pipeline/exec/sort_sink_operator.h" -#include "pipeline/exec/sort_source_operator.h" #include "pipeline/local_exchange/local_exchange_sink_operator.h" #include "pipeline/local_exchange/local_exchange_source_operator.h" #include "vec/runtime/partitioner.h" @@ -410,7 +409,14 @@ void LocalMergeSortExchanger::finalize() { Status LocalMergeSortExchanger::build_merger(RuntimeState* state, LocalExchangeSourceLocalState* local_state) { - RETURN_IF_ERROR(_sort_source->build_merger(state, _merger, local_state->profile())); + vectorized::VExprContextSPtrs ordering_expr_ctxs; + ordering_expr_ctxs.resize(_merge_info.ordering_expr_ctxs.size()); + for (size_t i = 0; i < ordering_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(_merge_info.ordering_expr_ctxs[i]->clone(state, ordering_expr_ctxs[i])); + } + _merger = std::make_unique<vectorized::VSortedRunMerger>( + ordering_expr_ctxs, _merge_info.is_asc_order, _merge_info.nulls_first, + state->batch_size(), _merge_info.limit, _merge_info.offset, local_state->profile()); std::vector<vectorized::BlockSupplier> child_block_suppliers; for (int channel_id = 0; channel_id < _num_partitions; channel_id++) { vectorized::BlockSupplier block_supplier = [&, local_state, id = channel_id]( diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index 90edeca07e8..7f87289e413 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -29,7 +29,6 @@ namespace pipeline { class LocalExchangeSourceLocalState; class LocalExchangeSinkLocalState; class BlockWrapper; -class SortSourceOperatorX; struct Profile { RuntimeProfile::Counter* compute_hash_value_timer = nullptr; @@ -335,11 +334,18 @@ public: class LocalMergeSortExchanger final : public 
Exchanger<BlockWrapperSPtr> { public: + struct MergeInfo { + const std::vector<bool>& is_asc_order; + const std::vector<bool>& nulls_first; + const int64_t limit; + const int64_t offset; + const vectorized::VExprContextSPtrs& ordering_expr_ctxs; + }; ENABLE_FACTORY_CREATOR(LocalMergeSortExchanger); - LocalMergeSortExchanger(std::shared_ptr<SortSourceOperatorX> sort_source, - int running_sink_operators, int num_partitions, int free_block_limit) + LocalMergeSortExchanger(MergeInfo&& merge_info, int running_sink_operators, int num_partitions, + int free_block_limit) : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, free_block_limit), - _sort_source(std::move(sort_source)) {} + _merge_info(std::move(merge_info)) {} ~LocalMergeSortExchanger() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile, SinkInfo&& sink_info) override; @@ -355,7 +361,7 @@ public: private: std::unique_ptr<vectorized::VSortedRunMerger> _merger; - std::shared_ptr<SortSourceOperatorX> _sort_source; + MergeInfo _merge_info; std::vector<std::atomic_int64_t> _queues_mem_usege; }; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 30f1a437ff0..ac45128ad5f 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -818,7 +818,10 @@ Status PipelineFragmentContext::_add_local_exchange_impl( child_op->get_name()); } shared_state->exchanger = LocalMergeSortExchanger::create_unique( - sort_source, cur_pipe->num_tasks(), _num_instances, + LocalMergeSortExchanger::MergeInfo { + sort_source->_is_asc_order, sort_source->_nulls_first, sort_source->_limit, + sort_source->_offset, sort_source->_vsort_exec_exprs.ordering_expr_ctxs()}, + cur_pipe->num_tasks(), _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit ? 
cast_set<int>( _runtime_state->query_options().local_exchange_free_blocks_limit) diff --git a/be/src/vec/common/sort/heap_sorter.cpp b/be/src/vec/common/sort/heap_sorter.cpp index 01db368e980..f9e3f28cd93 100644 --- a/be/src/vec/common/sort/heap_sorter.cpp +++ b/be/src/vec/common/sort/heap_sorter.cpp @@ -195,9 +195,9 @@ void HeapSorter::_do_filter(HeapSortCursorBlockView& block_view, size_t num_rows } Status HeapSorter::_prepare_sort_descs(Block* block) { - _sort_description.resize(_vsort_exec_exprs.lhs_ordering_expr_ctxs().size()); + _sort_description.resize(_vsort_exec_exprs.ordering_expr_ctxs().size()); for (int i = 0; i < _sort_description.size(); i++) { - const auto& ordering_expr = _vsort_exec_exprs.lhs_ordering_expr_ctxs()[i]; + const auto& ordering_expr = _vsort_exec_exprs.ordering_expr_ctxs()[i]; RETURN_IF_ERROR(ordering_expr->execute(block, &_sort_description[i].column_number)); _sort_description[i].direction = _is_asc_order[i] ? 1 : -1; diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index f491311a8f7..82b59cd6717 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -180,10 +180,10 @@ Status Sorter::partial_sort(Block& src_block, Block& dest_block) { dest_block.swap(new_block); } - _sort_description.resize(_vsort_exec_exprs.lhs_ordering_expr_ctxs().size()); + _sort_description.resize(_vsort_exec_exprs.ordering_expr_ctxs().size()); Block* result_block = _materialize_sort_exprs ? &dest_block : &src_block; for (int i = 0; i < _sort_description.size(); i++) { - const auto& ordering_expr = _vsort_exec_exprs.lhs_ordering_expr_ctxs()[i]; + const auto& ordering_expr = _vsort_exec_exprs.ordering_expr_ctxs()[i]; RETURN_IF_ERROR(ordering_expr->execute(result_block, &_sort_description[i].column_number)); _sort_description[i].direction = _is_asc_order[i] ? 
1 : -1; diff --git a/be/src/vec/common/sort/vsort_exec_exprs.cpp b/be/src/vec/common/sort/vsort_exec_exprs.cpp index cb3aaa6d654..4f5e44a12bd 100644 --- a/be/src/vec/common/sort/vsort_exec_exprs.cpp +++ b/be/src/vec/common/sort/vsort_exec_exprs.cpp @@ -48,7 +48,7 @@ Status VSortExecExprs::init(const TSortInfo& sort_info, ObjectPool* pool) { Status VSortExecExprs::init(const std::vector<TExpr>& ordering_exprs, const std::vector<TExpr>* sort_tuple_slot_exprs, ObjectPool* pool) { - RETURN_IF_ERROR(VExpr::create_expr_trees(ordering_exprs, _lhs_ordering_expr_ctxs)); + RETURN_IF_ERROR(VExpr::create_expr_trees(ordering_exprs, _ordering_expr_ctxs)); if (sort_tuple_slot_exprs != NULL) { _materialize_tuple = true; RETURN_IF_ERROR( @@ -59,19 +59,12 @@ Status VSortExecExprs::init(const std::vector<TExpr>& ordering_exprs, return Status::OK(); } -Status VSortExecExprs::init(const VExprContextSPtrs& lhs_ordering_expr_ctxs, - const VExprContextSPtrs& rhs_ordering_expr_ctxs) { - _lhs_ordering_expr_ctxs = lhs_ordering_expr_ctxs; - _rhs_ordering_expr_ctxs = rhs_ordering_expr_ctxs; - return Status::OK(); -} - Status VSortExecExprs::prepare(RuntimeState* state, const RowDescriptor& child_row_desc, const RowDescriptor& output_row_desc) { if (_materialize_tuple) { RETURN_IF_ERROR(VExpr::prepare(_sort_tuple_slot_expr_ctxs, state, child_row_desc)); } - RETURN_IF_ERROR(VExpr::prepare(_lhs_ordering_expr_ctxs, state, output_row_desc)); + RETURN_IF_ERROR(VExpr::prepare(_ordering_expr_ctxs, state, output_row_desc)); return Status::OK(); } @@ -79,24 +72,16 @@ Status VSortExecExprs::open(RuntimeState* state) { if (_materialize_tuple) { RETURN_IF_ERROR(VExpr::open(_sort_tuple_slot_expr_ctxs, state)); } - RETURN_IF_ERROR(VExpr::open(_lhs_ordering_expr_ctxs, state)); - RETURN_IF_ERROR( - VExpr::clone_if_not_exists(_lhs_ordering_expr_ctxs, state, _rhs_ordering_expr_ctxs)); + RETURN_IF_ERROR(VExpr::open(_ordering_expr_ctxs, state)); return Status::OK(); } void VSortExecExprs::close(RuntimeState* 
state) {} Status VSortExecExprs::clone(RuntimeState* state, VSortExecExprs& new_exprs) { - new_exprs._lhs_ordering_expr_ctxs.resize(_lhs_ordering_expr_ctxs.size()); - new_exprs._rhs_ordering_expr_ctxs.resize(_rhs_ordering_expr_ctxs.size()); - for (size_t i = 0; i < _lhs_ordering_expr_ctxs.size(); i++) { - RETURN_IF_ERROR( - _lhs_ordering_expr_ctxs[i]->clone(state, new_exprs._lhs_ordering_expr_ctxs[i])); - } - for (size_t i = 0; i < _rhs_ordering_expr_ctxs.size(); i++) { - RETURN_IF_ERROR( - _rhs_ordering_expr_ctxs[i]->clone(state, new_exprs._rhs_ordering_expr_ctxs[i])); + new_exprs._ordering_expr_ctxs.resize(_ordering_expr_ctxs.size()); + for (size_t i = 0; i < _ordering_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(_ordering_expr_ctxs[i]->clone(state, new_exprs._ordering_expr_ctxs[i])); } new_exprs._sort_tuple_slot_expr_ctxs.resize(_sort_tuple_slot_expr_ctxs.size()); for (size_t i = 0; i < _sort_tuple_slot_expr_ctxs.size(); i++) { diff --git a/be/src/vec/common/sort/vsort_exec_exprs.h b/be/src/vec/common/sort/vsort_exec_exprs.h index 8b6a11f9a33..c92c20f9e7d 100644 --- a/be/src/vec/common/sort/vsort_exec_exprs.h +++ b/be/src/vec/common/sort/vsort_exec_exprs.h @@ -58,10 +58,7 @@ public: } // Can only be used after calling prepare() - const VExprContextSPtrs& lhs_ordering_expr_ctxs() const { return _lhs_ordering_expr_ctxs; } - - // Can only be used after calling open() - const VExprContextSPtrs& rhs_ordering_expr_ctxs() const { return _rhs_ordering_expr_ctxs; } + const VExprContextSPtrs& ordering_expr_ctxs() const { return _ordering_expr_ctxs; } bool need_materialize_tuple() const { return _materialize_tuple; } @@ -73,8 +70,7 @@ public: private: // Create two VExprContexts for evaluating over the TupleRows. - VExprContextSPtrs _lhs_ordering_expr_ctxs; - VExprContextSPtrs _rhs_ordering_expr_ctxs; + VExprContextSPtrs _ordering_expr_ctxs; // If true, the tuples to be sorted are materialized by // _sort_tuple_slot_exprs before the actual sort is performed. 
@@ -85,16 +81,10 @@ private: // _materialize_tuple is true. VExprContextSPtrs _sort_tuple_slot_expr_ctxs; - // for some reason, _sort_tuple_slot_expr_ctxs is not-null but _lhs_ordering_expr_ctxs is nullable + // for some reason, _sort_tuple_slot_expr_ctxs is not-null but _ordering_expr_ctxs is nullable // this flag list would be used to convert column to nullable. std::vector<bool> _need_convert_to_nullable_flags; - // Initialize directly from already-created VExprContexts. Callers should manually call - // Prepare(), Open(), and Close() on input VExprContexts (instead of calling the - // analogous functions in this class). Used for testing. - Status init(const VExprContextSPtrs& lhs_ordering_expr_ctxs, - const VExprContextSPtrs& rhs_ordering_expr_ctxs); - // Initialize the ordering and (optionally) materialization expressions from the thrift // TExprs into the specified pool. sort_tuple_slot_exprs is NULL if the tuple is not // materialized. diff --git a/be/test/vec/exec/sort/sort_test.cpp b/be/test/vec/exec/sort/sort_test.cpp index a9e06507f40..e774bcdb037 100644 --- a/be/test/vec/exec/sort/sort_test.cpp +++ b/be/test/vec/exec/sort/sort_test.cpp @@ -59,7 +59,7 @@ public: sort_exec_exprs._materialize_tuple = false; - sort_exec_exprs._lhs_ordering_expr_ctxs.push_back( + sort_exec_exprs._ordering_expr_ctxs.push_back( VExprContext::create_shared(std::make_shared<MockSlotRef>(0))); switch (sort_type) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org