This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 96c4fcfb206 [improve](node) refactor partition sort node to reduce 
memory use
96c4fcfb206 is described below

commit 96c4fcfb2064bc6ddc3d0f79d674ae07b3de252a
Author: zhangstar333 <2561612...@qq.com>
AuthorDate: Fri Jan 26 14:21:18 2024 +0800

    [improve](node) refactor partition sort node to reduce memory use
    
    pipelineX
---
 .../pipeline/exec/partition_sort_sink_operator.cpp | 13 +++-
 .../pipeline/exec/partition_sort_sink_operator.h   |  1 +
 be/src/vec/exec/vpartition_sort_node.cpp           | 77 +++++++++++++++++----
 be/src/vec/exec/vpartition_sort_node.h             | 80 +++++++++++++++++++---
 4 files changed, 148 insertions(+), 23 deletions(-)

diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp 
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index c09a6a90b95..1fe25ea1910 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -44,6 +44,10 @@ Status PartitionSortSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
     _build_timer = ADD_TIMER(_profile, "HashTableBuildTime");
     _selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
     _emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime");
+    _partition_sort_info = std::make_shared<vectorized::PartitionSortInfo>(
+            &_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, 
p._nulls_first,
+            p._child_x->row_desc(), state, _profile, p._has_global_limit, 
p._partition_inner_limit,
+            p._top_n_algorithm, _shared_state->previous_row.get(), 
p._topn_phase);
     _init_hash_method();
     return Status::OK();
 }
@@ -100,7 +104,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
         local_state.child_input_rows = local_state.child_input_rows + 
current_rows;
         if (UNLIKELY(_partition_exprs_num == 0)) {
             if (UNLIKELY(local_state._value_places.empty())) {
-                local_state._value_places.push_back(_pool->add(new 
vectorized::PartitionBlocks()));
+                local_state._value_places.push_back(_pool->add(new 
vectorized::PartitionBlocks(
+                        local_state._partition_sort_info, 
local_state._value_places.empty())));
             }
             //no partition key
             local_state._value_places[0]->append_whole_block(input_block, 
_child_x->row_desc());
@@ -187,13 +192,15 @@ void PartitionSortSinkOperatorX::_emplace_into_hash_table(
 
                 auto creator = [&](const auto& ctor, auto& key, auto& origin) {
                     HashMethodType::try_presis_key(key, origin, 
*local_state._agg_arena_pool);
-                    auto* aggregate_data = _pool->add(new 
vectorized::PartitionBlocks());
+                    auto* aggregate_data = _pool->add(new 
vectorized::PartitionBlocks(
+                            local_state._partition_sort_info, 
local_state._value_places.empty()));
                     local_state._value_places.push_back(aggregate_data);
                     ctor(key, aggregate_data);
                     local_state._num_partition++;
                 };
                 auto creator_for_null_key = [&](auto& mapped) {
-                    mapped = _pool->add(new vectorized::PartitionBlocks());
+                    mapped = _pool->add(new vectorized::PartitionBlocks(
+                            local_state._partition_sort_info, 
local_state._value_places.empty()));
                     local_state._value_places.push_back(mapped);
                     local_state._num_partition++;
                 };
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h 
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index a46dc46433c..21145189fc7 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -81,6 +81,7 @@ private:
     std::unique_ptr<vectorized::PartitionedHashMapVariants> _partitioned_data;
     std::unique_ptr<vectorized::Arena> _agg_arena_pool;
     int _partition_exprs_num = 0;
+    std::shared_ptr<vectorized::PartitionSortInfo> _partition_sort_info = 
nullptr;
 
     RuntimeProfile::Counter* _build_timer = nullptr;
     RuntimeProfile::Counter* _emplace_key_timer = nullptr;
diff --git a/be/src/vec/exec/vpartition_sort_node.cpp 
b/be/src/vec/exec/vpartition_sort_node.cpp
index 95c0abd72a8..694aa99a6cd 100644
--- a/be/src/vec/exec/vpartition_sort_node.cpp
+++ b/be/src/vec/exec/vpartition_sort_node.cpp
@@ -33,10 +33,50 @@
 #include "vec/common/hash_table/hash_map_context_creator.h"
 #include "vec/common/hash_table/hash_set.h"
 #include "vec/common/hash_table/partitioned_hash_map.h"
+#include "vec/core/block.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
 
 namespace doris::vectorized {
+Status PartitionBlocks::do_partition_topn_sort() {
+    if (_partition_topn_sorter == nullptr) {
+        _partition_topn_sorter = PartitionSorter::create_unique(
+                *_partition_sort_info->_vsort_exec_exprs, 
_partition_sort_info->_limit,
+                _partition_sort_info->_offset, _partition_sort_info->_pool,
+                _partition_sort_info->_is_asc_order, 
_partition_sort_info->_nulls_first,
+                _partition_sort_info->_row_desc, 
_partition_sort_info->_runtime_state, nullptr,
+                _partition_sort_info->_has_global_limit,
+                _partition_sort_info->_partition_inner_limit,
+                _partition_sort_info->_top_n_algorithm, 
_partition_sort_info->_previous_row);
+    }
+
+    for (const auto& block : blocks) {
+        RETURN_IF_ERROR(_partition_topn_sorter->append_block(block.get()));
+    }
+    blocks.clear();
+    
_partition_topn_sorter->init_profile(_partition_sort_info->_runtime_profile);
+    RETURN_IF_ERROR(_partition_topn_sorter->prepare_for_read());
+    bool current_eos = false;
+    size_t current_output_rows = 0;
+    while (!current_eos) {
+        // TODO: the way output_block is produced here may need improvement.
+        auto output_block = Block::create_unique(
+                
VectorizedUtils::create_empty_block(_partition_sort_info->_row_desc));
+        
RETURN_IF_ERROR(_partition_topn_sorter->get_next(_partition_sort_info->_runtime_state,
+                                                         output_block.get(), 
&current_eos));
+        auto rows = output_block->rows();
+        if (rows > 0) {
+            current_output_rows += rows;
+            blocks.emplace_back(std::move(output_block));
+        }
+    }
+
+    _topn_filter_rows += (_current_input_rows - current_output_rows);
+    _partition_sort_info->_previous_row->reset();
+    _partition_topn_sorter.reset(nullptr);
+
+    return Status::OK();
+}
 
 VPartitionSortNode::VPartitionSortNode(ObjectPool* pool, const TPlanNode& 
tnode,
                                        const DescriptorTbl& descs)
@@ -79,6 +119,7 @@ Status VPartitionSortNode::prepare(RuntimeState* state) {
     _get_sorted_timer = ADD_TIMER(runtime_profile(), "GetSortedTime");
     _selector_block_timer = ADD_TIMER(runtime_profile(), "SelectorBlockTime");
     _emplace_key_timer = ADD_TIMER(runtime_profile(), "EmplaceKeyTime");
+    runtime_profile()->add_info_string("CurrentTopNPhase", 
std::to_string(_topn_phase));
 
     RETURN_IF_ERROR(ExecNode::prepare(state));
     SCOPED_TIMER(_exec_timer);
@@ -86,6 +127,10 @@ Status VPartitionSortNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, 
child(0)->row_desc()));
     _init_hash_method();
 
+    _partition_sort_info = std::make_shared<PartitionSortInfo>(
+            &_vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first, 
child(0)->row_desc(),
+            state, _runtime_profile.get(), _has_global_limit, 
_partition_inner_limit,
+            _top_n_algorithm, _previous_row.get(), _topn_phase);
     return Status::OK();
 }
 
@@ -116,13 +161,15 @@ void VPartitionSortNode::_emplace_into_hash_table(const 
ColumnRawPtrs& key_colum
 
                 auto creator = [&](const auto& ctor, auto& key, auto& origin) {
                     HashMethodType::try_presis_key(key, origin, 
*_agg_arena_pool);
-                    auto* aggregate_data = _pool->add(new PartitionBlocks());
+                    auto* aggregate_data = _pool->add(
+                            new PartitionBlocks(_partition_sort_info, 
_value_places.empty()));
                     _value_places.push_back(aggregate_data);
                     ctor(key, aggregate_data);
                     _num_partition++;
                 };
                 auto creator_for_null_key = [&](auto& mapped) {
-                    mapped = _pool->add(new PartitionBlocks());
+                    mapped = _pool->add(
+                            new PartitionBlocks(_partition_sort_info, 
_value_places.empty()));
                     _value_places.push_back(mapped);
                     _num_partition++;
                 };
@@ -135,7 +182,7 @@ void VPartitionSortNode::_emplace_into_hash_table(const 
ColumnRawPtrs& key_colum
                 }
 
                 SCOPED_TIMER(_selector_block_timer);
-                for (auto place : _value_places) {
+                for (auto* place : _value_places) {
                     place->append_block_by_selector(input_block, 
child(0)->row_desc(),
                                                     _has_global_limit, 
_partition_inner_limit,
                                                     batch_size);
@@ -151,7 +198,8 @@ Status VPartitionSortNode::sink(RuntimeState* state, 
vectorized::Block* input_bl
         child_input_rows = child_input_rows + current_rows;
         if (UNLIKELY(_partition_exprs_num == 0)) {
             if (UNLIKELY(_value_places.empty())) {
-                _value_places.push_back(_pool->add(new PartitionBlocks()));
+                _value_places.push_back(_pool->add(
+                        new PartitionBlocks(_partition_sort_info, 
_value_places.empty())));
             }
             //no partition key
             _value_places[0]->append_whole_block(input_block, 
child(0)->row_desc());
@@ -195,9 +243,7 @@ Status VPartitionSortNode::sink(RuntimeState* state, 
vectorized::Block* input_bl
             RETURN_IF_ERROR(sorter->prepare_for_read());
             _partition_sorts.push_back(std::move(sorter));
         }
-        if (state->enable_profile()) {
-            debug_profile();
-        }
+
         COUNTER_SET(_hash_table_size_counter, int64_t(_num_partition));
         //so all data from child have sink completed
         _can_read = true;
@@ -312,14 +358,15 @@ Status VPartitionSortNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
-    if (state->enable_profile()) {
-        debug_profile();
-    }
+
     return ExecNode::close(state);
 }
 
 void VPartitionSortNode::release_resource(RuntimeState* state) {
     _vsort_exec_exprs.close(state);
+    if (state->enable_profile()) {
+        debug_profile();
+    }
     ExecNode::release_resource(state);
 }
 
@@ -389,19 +436,25 @@ void VPartitionSortNode::_init_hash_method() {
 }
 
 void VPartitionSortNode::debug_profile() {
-    fmt::memory_buffer partition_rows_read, partition_blocks_read;
+    fmt::memory_buffer partition_rows_read, partition_blocks_read, 
partition_filter_rows;
     fmt::format_to(partition_rows_read, "[");
     fmt::format_to(partition_blocks_read, "[");
-    for (auto place : _value_places) {
+    fmt::format_to(partition_filter_rows, "[");
+
+    for (auto* place : _value_places) {
         fmt::format_to(partition_rows_read, "{}, ", place->get_total_rows());
+        fmt::format_to(partition_filter_rows, "{}, ", 
place->get_topn_filter_rows());
         fmt::format_to(partition_blocks_read, "{}, ", place->blocks.size());
     }
     fmt::format_to(partition_rows_read, "]");
     fmt::format_to(partition_blocks_read, "]");
+    fmt::format_to(partition_filter_rows, "]");
 
     runtime_profile()->add_info_string("PerPartitionBlocksRead",
                                        fmt::to_string(partition_blocks_read));
     runtime_profile()->add_info_string("PerPartitionRowsRead", 
fmt::to_string(partition_rows_read));
+    runtime_profile()->add_info_string("PerPartitionFilterRows",
+                                       fmt::to_string(partition_filter_rows));
     fmt::memory_buffer partition_output_rows;
     fmt::format_to(partition_output_rows, "[");
     for (auto row : partition_profile_output_rows) {
diff --git a/be/src/vec/exec/vpartition_sort_node.h 
b/be/src/vec/exec/vpartition_sort_node.h
index d369846df59..1ee7ffe26cb 100644
--- a/be/src/vec/exec/vpartition_sort_node.h
+++ b/be/src/vec/exec/vpartition_sort_node.h
@@ -23,6 +23,7 @@
 #include <memory>
 #include <mutex>
 
+#include "common/status.h"
 #include "exec/exec_node.h"
 #include "vec/columns/column.h"
 #include "vec/common/columns_hashing.h"
@@ -37,10 +38,55 @@
 namespace doris {
 namespace vectorized {
 static constexpr size_t INITIAL_BUFFERED_BLOCK_BYTES = 64 << 20;
+static constexpr size_t PARTITION_SORT_ROWS_THRESHOLD = 20000;
+
+struct PartitionSortInfo {
+    ~PartitionSortInfo() = default;
+
+    PartitionSortInfo(VSortExecExprs* vsort_exec_exprs, int64_t limit, int64_t 
offset,
+                      ObjectPool* pool, const std::vector<bool>& is_asc_order,
+                      const std::vector<bool>& nulls_first, const 
RowDescriptor& row_desc,
+                      RuntimeState* runtime_state, RuntimeProfile* 
runtime_profile,
+                      bool has_global_limit, int64_t partition_inner_limit,
+                      TopNAlgorithm::type top_n_algorithm, SortCursorCmp* 
previous_row,
+                      TPartTopNPhase::type topn_phase)
+            : _vsort_exec_exprs(vsort_exec_exprs),
+              _limit(limit),
+              _offset(offset),
+              _pool(pool),
+              _is_asc_order(is_asc_order),
+              _nulls_first(nulls_first),
+              _row_desc(row_desc),
+              _runtime_state(runtime_state),
+              _runtime_profile(runtime_profile),
+              _has_global_limit(has_global_limit),
+              _partition_inner_limit(partition_inner_limit),
+              _top_n_algorithm(top_n_algorithm),
+              _previous_row(previous_row),
+              _topn_phase(topn_phase) {}
+
+public:
+    VSortExecExprs* _vsort_exec_exprs = nullptr;
+    int64_t _limit = -1;
+    int64_t _offset = -1;
+    ObjectPool* _pool = nullptr;
+    std::vector<bool> _is_asc_order;
+    std::vector<bool> _nulls_first;
+    const RowDescriptor& _row_desc;
+    RuntimeState* _runtime_state = nullptr;
+    RuntimeProfile* _runtime_profile = nullptr;
+    bool _has_global_limit = false;
+    int64_t _partition_inner_limit = 0;
+    TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER;
+    SortCursorCmp* _previous_row = nullptr;
+    TPartTopNPhase::type _topn_phase = TPartTopNPhase::TWO_PHASE_GLOBAL;
+};
 
 struct PartitionBlocks {
 public:
-    PartitionBlocks() = default;
+    PartitionBlocks() = default; // TODO: should be fixed in pipelineX
+    PartitionBlocks(std::shared_ptr<PartitionSortInfo> partition_sort_info, 
bool is_first_sorter)
+            : _is_first_sorter(is_first_sorter), 
_partition_sort_info(partition_sort_info) {}
     ~PartitionBlocks() = default;
 
     void add_row_idx(size_t row) { selector.push_back(row); }
@@ -49,7 +95,7 @@ public:
                                   const RowDescriptor& row_desc, bool is_limit,
                                   int64_t partition_inner_limit, int 
batch_size) {
         if (blocks.empty() || reach_limit()) {
-            init_rows = batch_size;
+            _init_rows = batch_size;
             
blocks.push_back(Block::create_unique(VectorizedUtils::create_empty_block(row_desc)));
         }
         auto columns = input_block->get_columns();
@@ -59,11 +105,21 @@ public:
             columns[i]->append_data_by_selector(mutable_columns[i], selector);
         }
         blocks.back()->set_columns(std::move(mutable_columns));
-        init_rows = init_rows - selector.size();
-        total_rows = total_rows + selector.size();
+        auto selector_rows = selector.size();
+        _init_rows = _init_rows - selector_rows;
+        _total_rows = _total_rows + selector_rows;
+        _current_input_rows = _current_input_rows + selector_rows;
         selector.clear();
+        // TODO: it may be better to let the user configure PARTITION_SORT_ROWS_THRESHOLD.
+        if (_current_input_rows >= PARTITION_SORT_ROWS_THRESHOLD &&
+            _partition_sort_info->_topn_phase != 
TPartTopNPhase::TWO_PHASE_GLOBAL) {
+            static_cast<void>(do_partition_topn_sort()); // FIXME: should 
return the status
+            _current_input_rows = 0;                     // reset record
+        }
     }
 
+    Status do_partition_topn_sort();
+
     void append_whole_block(vectorized::Block* input_block, const 
RowDescriptor& row_desc) {
         auto empty_block = 
Block::create_unique(VectorizedUtils::create_empty_block(row_desc));
         empty_block->swap(*input_block);
@@ -71,15 +127,22 @@ public:
     }
 
     bool reach_limit() {
-        return init_rows <= 0 || blocks.back()->bytes() > 
INITIAL_BUFFERED_BLOCK_BYTES;
+        return _init_rows <= 0 || blocks.back()->bytes() > 
INITIAL_BUFFERED_BLOCK_BYTES;
     }
 
-    size_t get_total_rows() const { return total_rows; }
+    size_t get_total_rows() const { return _total_rows; }
+    size_t get_topn_filter_rows() const { return _topn_filter_rows; }
 
     IColumn::Selector selector;
     std::vector<std::unique_ptr<Block>> blocks;
-    size_t total_rows = 0;
-    int init_rows = 4096;
+    size_t _total_rows = 0;
+    size_t _current_input_rows = 0;
+    size_t _topn_filter_rows = 0;
+    int _init_rows = 4096;
+    bool _is_first_sorter = false;
+
+    std::unique_ptr<PartitionSorter> _partition_topn_sorter = nullptr;
+    std::shared_ptr<PartitionSortInfo> _partition_sort_info = nullptr;
 };
 
 using PartitionDataPtr = PartitionBlocks*;
@@ -213,6 +276,7 @@ private:
 
     std::vector<std::unique_ptr<PartitionSorter>> _partition_sorts;
     std::vector<PartitionDataPtr> _value_places;
+    std::shared_ptr<PartitionSortInfo> _partition_sort_info = nullptr;
     // Expressions and parameters used for build _sort_description
     VSortExecExprs _vsort_exec_exprs;
     std::vector<bool> _is_asc_order;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to