This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 51a6b14eb6c [refactor](merger) Simplify sort merger (#47689)
51a6b14eb6c is described below

commit 51a6b14eb6cf0e8edebc2d298b9549189ab891e9
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Tue Feb 11 14:13:20 2025 +0800

    [refactor](merger) Simplify sort merger (#47689)
    
    1. Delete the unused `_rhs_ordering_expr_ctxs` member from `VSortExecExprs` and rename `_lhs_ordering_expr_ctxs` to `_ordering_expr_ctxs`.
    2. Decouple the sort operator from the local exchange operator.
    3. Use the local exchange's profile to collect the sort merger's metrics
    instead of the sort operator's.
---
 be/src/pipeline/exec/exchange_source_operator.cpp  |  2 +-
 be/src/pipeline/exec/sort_source_operator.cpp      | 13 -----------
 be/src/pipeline/exec/sort_source_operator.h        |  5 ++--
 be/src/pipeline/local_exchange/local_exchanger.cpp | 10 ++++++--
 be/src/pipeline/local_exchange/local_exchanger.h   | 16 +++++++++----
 be/src/pipeline/pipeline_fragment_context.cpp      |  5 +++-
 be/src/vec/common/sort/heap_sorter.cpp             |  4 ++--
 be/src/vec/common/sort/sorter.cpp                  |  4 ++--
 be/src/vec/common/sort/vsort_exec_exprs.cpp        | 27 +++++-----------------
 be/src/vec/common/sort/vsort_exec_exprs.h          | 16 +++----------
 be/test/vec/exec/sort/sort_test.cpp                |  2 +-
 11 files changed, 40 insertions(+), 64 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 4f12a8ef38e..762e108fd48 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -151,7 +151,7 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* 
state, vectorized::Block
     if (_is_merging && !local_state.is_ready) {
         SCOPED_TIMER(local_state.create_merger_timer);
         RETURN_IF_ERROR(local_state.stream_recvr->create_merger(
-                local_state.vsort_exec_exprs.lhs_ordering_expr_ctxs(), 
_is_asc_order, _nulls_first,
+                local_state.vsort_exec_exprs.ordering_expr_ctxs(), 
_is_asc_order, _nulls_first,
                 state->batch_size(), _limit, _offset));
         local_state.is_ready = true;
         return Status::OK();
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp 
b/be/src/pipeline/exec/sort_source_operator.cpp
index 2fb09d7278f..808f6533d6e 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -67,18 +67,5 @@ const vectorized::SortDescription& 
SortSourceOperatorX::get_sort_description(
     return local_state._shared_state->sorter->get_sort_description();
 }
 
-Status SortSourceOperatorX::build_merger(RuntimeState* state,
-                                         
std::unique_ptr<vectorized::VSortedRunMerger>& merger,
-                                         RuntimeProfile* profile) {
-    // now only use in LocalMergeSortExchanger::get_block
-    vectorized::VSortExecExprs vsort_exec_exprs;
-    // clone vsort_exec_exprs in LocalMergeSortExchanger
-    RETURN_IF_ERROR(_vsort_exec_exprs.clone(state, vsort_exec_exprs));
-    merger = std::make_unique<vectorized::VSortedRunMerger>(
-            vsort_exec_exprs.lhs_ordering_expr_ctxs(), _is_asc_order, 
_nulls_first,
-            state->batch_size(), _limit, _offset, profile);
-    return Status::OK();
-}
-
 #include "common/compile_check_end.h"
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/sort_source_operator.h 
b/be/src/pipeline/exec/sort_source_operator.h
index a638b04b368..7902e4815bf 100644
--- a/be/src/pipeline/exec/sort_source_operator.h
+++ b/be/src/pipeline/exec/sort_source_operator.h
@@ -56,11 +56,10 @@ public:
     bool use_local_merge() const { return _merge_by_exchange; }
     const vectorized::SortDescription& get_sort_description(RuntimeState* 
state) const;
 
-    Status build_merger(RuntimeState* state, 
std::unique_ptr<vectorized::VSortedRunMerger>& merger,
-                        RuntimeProfile* profile);
-
 private:
+    friend class PipelineFragmentContext;
     friend class SortLocalState;
+
     const bool _merge_by_exchange;
     std::vector<bool> _is_asc_order;
     std::vector<bool> _nulls_first;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index c768668acc4..0fb4db625a5 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -20,7 +20,6 @@
 #include "common/cast_set.h"
 #include "common/status.h"
 #include "pipeline/exec/sort_sink_operator.h"
-#include "pipeline/exec/sort_source_operator.h"
 #include "pipeline/local_exchange/local_exchange_sink_operator.h"
 #include "pipeline/local_exchange/local_exchange_source_operator.h"
 #include "vec/runtime/partitioner.h"
@@ -410,7 +409,14 @@ void LocalMergeSortExchanger::finalize() {
 
 Status LocalMergeSortExchanger::build_merger(RuntimeState* state,
                                              LocalExchangeSourceLocalState* 
local_state) {
-    RETURN_IF_ERROR(_sort_source->build_merger(state, _merger, 
local_state->profile()));
+    vectorized::VExprContextSPtrs ordering_expr_ctxs;
+    ordering_expr_ctxs.resize(_merge_info.ordering_expr_ctxs.size());
+    for (size_t i = 0; i < ordering_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(_merge_info.ordering_expr_ctxs[i]->clone(state, 
ordering_expr_ctxs[i]));
+    }
+    _merger = std::make_unique<vectorized::VSortedRunMerger>(
+            ordering_expr_ctxs, _merge_info.is_asc_order, 
_merge_info.nulls_first,
+            state->batch_size(), _merge_info.limit, _merge_info.offset, 
local_state->profile());
     std::vector<vectorized::BlockSupplier> child_block_suppliers;
     for (int channel_id = 0; channel_id < _num_partitions; channel_id++) {
         vectorized::BlockSupplier block_supplier = [&, local_state, id = 
channel_id](
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h 
b/be/src/pipeline/local_exchange/local_exchanger.h
index 90edeca07e8..7f87289e413 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -29,7 +29,6 @@ namespace pipeline {
 class LocalExchangeSourceLocalState;
 class LocalExchangeSinkLocalState;
 class BlockWrapper;
-class SortSourceOperatorX;
 
 struct Profile {
     RuntimeProfile::Counter* compute_hash_value_timer = nullptr;
@@ -335,11 +334,18 @@ public:
 
 class LocalMergeSortExchanger final : public Exchanger<BlockWrapperSPtr> {
 public:
+    struct MergeInfo {
+        const std::vector<bool>& is_asc_order;
+        const std::vector<bool>& nulls_first;
+        const int64_t limit;
+        const int64_t offset;
+        const vectorized::VExprContextSPtrs& ordering_expr_ctxs;
+    };
     ENABLE_FACTORY_CREATOR(LocalMergeSortExchanger);
-    LocalMergeSortExchanger(std::shared_ptr<SortSourceOperatorX> sort_source,
-                            int running_sink_operators, int num_partitions, 
int free_block_limit)
+    LocalMergeSortExchanger(MergeInfo&& merge_info, int 
running_sink_operators, int num_partitions,
+                            int free_block_limit)
             : Exchanger<BlockWrapperSPtr>(running_sink_operators, 
num_partitions, free_block_limit),
-              _sort_source(std::move(sort_source)) {}
+              _merge_info(std::move(merge_info)) {}
     ~LocalMergeSortExchanger() override = default;
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, 
Profile&& profile,
                 SinkInfo&& sink_info) override;
@@ -355,7 +361,7 @@ public:
 
 private:
     std::unique_ptr<vectorized::VSortedRunMerger> _merger;
-    std::shared_ptr<SortSourceOperatorX> _sort_source;
+    MergeInfo _merge_info;
     std::vector<std::atomic_int64_t> _queues_mem_usege;
 };
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 30f1a437ff0..ac45128ad5f 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -818,7 +818,10 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
                     child_op->get_name());
         }
         shared_state->exchanger = LocalMergeSortExchanger::create_unique(
-                sort_source, cur_pipe->num_tasks(), _num_instances,
+                LocalMergeSortExchanger::MergeInfo {
+                        sort_source->_is_asc_order, sort_source->_nulls_first, 
sort_source->_limit,
+                        sort_source->_offset, 
sort_source->_vsort_exec_exprs.ordering_expr_ctxs()},
+                cur_pipe->num_tasks(), _num_instances,
                 
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                         ? cast_set<int>(
                                   
_runtime_state->query_options().local_exchange_free_blocks_limit)
diff --git a/be/src/vec/common/sort/heap_sorter.cpp 
b/be/src/vec/common/sort/heap_sorter.cpp
index 01db368e980..f9e3f28cd93 100644
--- a/be/src/vec/common/sort/heap_sorter.cpp
+++ b/be/src/vec/common/sort/heap_sorter.cpp
@@ -195,9 +195,9 @@ void HeapSorter::_do_filter(HeapSortCursorBlockView& 
block_view, size_t num_rows
 }
 
 Status HeapSorter::_prepare_sort_descs(Block* block) {
-    
_sort_description.resize(_vsort_exec_exprs.lhs_ordering_expr_ctxs().size());
+    _sort_description.resize(_vsort_exec_exprs.ordering_expr_ctxs().size());
     for (int i = 0; i < _sort_description.size(); i++) {
-        const auto& ordering_expr = 
_vsort_exec_exprs.lhs_ordering_expr_ctxs()[i];
+        const auto& ordering_expr = _vsort_exec_exprs.ordering_expr_ctxs()[i];
         RETURN_IF_ERROR(ordering_expr->execute(block, 
&_sort_description[i].column_number));
 
         _sort_description[i].direction = _is_asc_order[i] ? 1 : -1;
diff --git a/be/src/vec/common/sort/sorter.cpp 
b/be/src/vec/common/sort/sorter.cpp
index f491311a8f7..82b59cd6717 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -180,10 +180,10 @@ Status Sorter::partial_sort(Block& src_block, Block& 
dest_block) {
         dest_block.swap(new_block);
     }
 
-    
_sort_description.resize(_vsort_exec_exprs.lhs_ordering_expr_ctxs().size());
+    _sort_description.resize(_vsort_exec_exprs.ordering_expr_ctxs().size());
     Block* result_block = _materialize_sort_exprs ? &dest_block : &src_block;
     for (int i = 0; i < _sort_description.size(); i++) {
-        const auto& ordering_expr = 
_vsort_exec_exprs.lhs_ordering_expr_ctxs()[i];
+        const auto& ordering_expr = _vsort_exec_exprs.ordering_expr_ctxs()[i];
         RETURN_IF_ERROR(ordering_expr->execute(result_block, 
&_sort_description[i].column_number));
 
         _sort_description[i].direction = _is_asc_order[i] ? 1 : -1;
diff --git a/be/src/vec/common/sort/vsort_exec_exprs.cpp 
b/be/src/vec/common/sort/vsort_exec_exprs.cpp
index cb3aaa6d654..4f5e44a12bd 100644
--- a/be/src/vec/common/sort/vsort_exec_exprs.cpp
+++ b/be/src/vec/common/sort/vsort_exec_exprs.cpp
@@ -48,7 +48,7 @@ Status VSortExecExprs::init(const TSortInfo& sort_info, 
ObjectPool* pool) {
 
 Status VSortExecExprs::init(const std::vector<TExpr>& ordering_exprs,
                             const std::vector<TExpr>* sort_tuple_slot_exprs, 
ObjectPool* pool) {
-    RETURN_IF_ERROR(VExpr::create_expr_trees(ordering_exprs, 
_lhs_ordering_expr_ctxs));
+    RETURN_IF_ERROR(VExpr::create_expr_trees(ordering_exprs, 
_ordering_expr_ctxs));
     if (sort_tuple_slot_exprs != NULL) {
         _materialize_tuple = true;
         RETURN_IF_ERROR(
@@ -59,19 +59,12 @@ Status VSortExecExprs::init(const std::vector<TExpr>& 
ordering_exprs,
     return Status::OK();
 }
 
-Status VSortExecExprs::init(const VExprContextSPtrs& lhs_ordering_expr_ctxs,
-                            const VExprContextSPtrs& rhs_ordering_expr_ctxs) {
-    _lhs_ordering_expr_ctxs = lhs_ordering_expr_ctxs;
-    _rhs_ordering_expr_ctxs = rhs_ordering_expr_ctxs;
-    return Status::OK();
-}
-
 Status VSortExecExprs::prepare(RuntimeState* state, const RowDescriptor& 
child_row_desc,
                                const RowDescriptor& output_row_desc) {
     if (_materialize_tuple) {
         RETURN_IF_ERROR(VExpr::prepare(_sort_tuple_slot_expr_ctxs, state, 
child_row_desc));
     }
-    RETURN_IF_ERROR(VExpr::prepare(_lhs_ordering_expr_ctxs, state, 
output_row_desc));
+    RETURN_IF_ERROR(VExpr::prepare(_ordering_expr_ctxs, state, 
output_row_desc));
     return Status::OK();
 }
 
@@ -79,24 +72,16 @@ Status VSortExecExprs::open(RuntimeState* state) {
     if (_materialize_tuple) {
         RETURN_IF_ERROR(VExpr::open(_sort_tuple_slot_expr_ctxs, state));
     }
-    RETURN_IF_ERROR(VExpr::open(_lhs_ordering_expr_ctxs, state));
-    RETURN_IF_ERROR(
-            VExpr::clone_if_not_exists(_lhs_ordering_expr_ctxs, state, 
_rhs_ordering_expr_ctxs));
+    RETURN_IF_ERROR(VExpr::open(_ordering_expr_ctxs, state));
     return Status::OK();
 }
 
 void VSortExecExprs::close(RuntimeState* state) {}
 
 Status VSortExecExprs::clone(RuntimeState* state, VSortExecExprs& new_exprs) {
-    new_exprs._lhs_ordering_expr_ctxs.resize(_lhs_ordering_expr_ctxs.size());
-    new_exprs._rhs_ordering_expr_ctxs.resize(_rhs_ordering_expr_ctxs.size());
-    for (size_t i = 0; i < _lhs_ordering_expr_ctxs.size(); i++) {
-        RETURN_IF_ERROR(
-                _lhs_ordering_expr_ctxs[i]->clone(state, 
new_exprs._lhs_ordering_expr_ctxs[i]));
-    }
-    for (size_t i = 0; i < _rhs_ordering_expr_ctxs.size(); i++) {
-        RETURN_IF_ERROR(
-                _rhs_ordering_expr_ctxs[i]->clone(state, 
new_exprs._rhs_ordering_expr_ctxs[i]));
+    new_exprs._ordering_expr_ctxs.resize(_ordering_expr_ctxs.size());
+    for (size_t i = 0; i < _ordering_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(_ordering_expr_ctxs[i]->clone(state, 
new_exprs._ordering_expr_ctxs[i]));
     }
     
new_exprs._sort_tuple_slot_expr_ctxs.resize(_sort_tuple_slot_expr_ctxs.size());
     for (size_t i = 0; i < _sort_tuple_slot_expr_ctxs.size(); i++) {
diff --git a/be/src/vec/common/sort/vsort_exec_exprs.h 
b/be/src/vec/common/sort/vsort_exec_exprs.h
index 8b6a11f9a33..c92c20f9e7d 100644
--- a/be/src/vec/common/sort/vsort_exec_exprs.h
+++ b/be/src/vec/common/sort/vsort_exec_exprs.h
@@ -58,10 +58,7 @@ public:
     }
 
     // Can only be used after calling prepare()
-    const VExprContextSPtrs& lhs_ordering_expr_ctxs() const { return 
_lhs_ordering_expr_ctxs; }
-
-    // Can only be used after calling open()
-    const VExprContextSPtrs& rhs_ordering_expr_ctxs() const { return 
_rhs_ordering_expr_ctxs; }
+    const VExprContextSPtrs& ordering_expr_ctxs() const { return 
_ordering_expr_ctxs; }
 
     bool need_materialize_tuple() const { return _materialize_tuple; }
 
@@ -73,8 +70,7 @@ public:
 
 private:
     // Create two VExprContexts for evaluating over the TupleRows.
-    VExprContextSPtrs _lhs_ordering_expr_ctxs;
-    VExprContextSPtrs _rhs_ordering_expr_ctxs;
+    VExprContextSPtrs _ordering_expr_ctxs;
 
     // If true, the tuples to be sorted are materialized by
     // _sort_tuple_slot_exprs before the actual sort is performed.
@@ -85,16 +81,10 @@ private:
     // _materialize_tuple is true.
     VExprContextSPtrs _sort_tuple_slot_expr_ctxs;
 
-    // for some reason, _sort_tuple_slot_expr_ctxs is not-null but 
_lhs_ordering_expr_ctxs is nullable
+    // for some reason, _sort_tuple_slot_expr_ctxs is not-null but 
_ordering_expr_ctxs is nullable
     // this flag list would be used to convert column to nullable.
     std::vector<bool> _need_convert_to_nullable_flags;
 
-    // Initialize directly from already-created VExprContexts. Callers should 
manually call
-    // Prepare(), Open(), and Close() on input VExprContexts (instead of 
calling the
-    // analogous functions in this class). Used for testing.
-    Status init(const VExprContextSPtrs& lhs_ordering_expr_ctxs,
-                const VExprContextSPtrs& rhs_ordering_expr_ctxs);
-
     // Initialize the ordering and (optionally) materialization expressions 
from the thrift
     // TExprs into the specified pool. sort_tuple_slot_exprs is NULL if the 
tuple is not
     // materialized.
diff --git a/be/test/vec/exec/sort/sort_test.cpp 
b/be/test/vec/exec/sort/sort_test.cpp
index a9e06507f40..e774bcdb037 100644
--- a/be/test/vec/exec/sort/sort_test.cpp
+++ b/be/test/vec/exec/sort/sort_test.cpp
@@ -59,7 +59,7 @@ public:
 
         sort_exec_exprs._materialize_tuple = false;
 
-        sort_exec_exprs._lhs_ordering_expr_ctxs.push_back(
+        sort_exec_exprs._ordering_expr_ctxs.push_back(
                 VExprContext::create_shared(std::make_shared<MockSlotRef>(0)));
 
         switch (sort_type) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to