This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 96c4fcfb206 [improve](node) refactor partition sort node to reduce memory use 96c4fcfb206 is described below commit 96c4fcfb2064bc6ddc3d0f79d674ae07b3de252a Author: zhangstar333 <2561612...@qq.com> AuthorDate: Fri Jan 26 14:21:18 2024 +0800 [improve](node) refactor partition sort node to reduce memory use pipelineX --- .../pipeline/exec/partition_sort_sink_operator.cpp | 13 +++- .../pipeline/exec/partition_sort_sink_operator.h | 1 + be/src/vec/exec/vpartition_sort_node.cpp | 77 +++++++++++++++++---- be/src/vec/exec/vpartition_sort_node.h | 80 +++++++++++++++++++--- 4 files changed, 148 insertions(+), 23 deletions(-) diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index c09a6a90b95..1fe25ea1910 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -44,6 +44,10 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _build_timer = ADD_TIMER(_profile, "HashTableBuildTime"); _selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime"); _emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime"); + _partition_sort_info = std::make_shared<vectorized::PartitionSortInfo>( + &_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first, + p._child_x->row_desc(), state, _profile, p._has_global_limit, p._partition_inner_limit, + p._top_n_algorithm, _shared_state->previous_row.get(), p._topn_phase); _init_hash_method(); return Status::OK(); } @@ -100,7 +104,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state.child_input_rows = local_state.child_input_rows + current_rows; if (UNLIKELY(_partition_exprs_num == 0)) { if (UNLIKELY(local_state._value_places.empty())) { - local_state._value_places.push_back(_pool->add(new vectorized::PartitionBlocks())); + 
local_state._value_places.push_back(_pool->add(new vectorized::PartitionBlocks( + local_state._partition_sort_info, local_state._value_places.empty()))); } //no partition key local_state._value_places[0]->append_whole_block(input_block, _child_x->row_desc()); @@ -187,13 +192,15 @@ void PartitionSortSinkOperatorX::_emplace_into_hash_table( auto creator = [&](const auto& ctor, auto& key, auto& origin) { HashMethodType::try_presis_key(key, origin, *local_state._agg_arena_pool); - auto* aggregate_data = _pool->add(new vectorized::PartitionBlocks()); + auto* aggregate_data = _pool->add(new vectorized::PartitionBlocks( + local_state._partition_sort_info, local_state._value_places.empty())); local_state._value_places.push_back(aggregate_data); ctor(key, aggregate_data); local_state._num_partition++; }; auto creator_for_null_key = [&](auto& mapped) { - mapped = _pool->add(new vectorized::PartitionBlocks()); + mapped = _pool->add(new vectorized::PartitionBlocks( + local_state._partition_sort_info, local_state._value_places.empty())); local_state._value_places.push_back(mapped); local_state._num_partition++; }; diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index a46dc46433c..21145189fc7 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -81,6 +81,7 @@ private: std::unique_ptr<vectorized::PartitionedHashMapVariants> _partitioned_data; std::unique_ptr<vectorized::Arena> _agg_arena_pool; int _partition_exprs_num = 0; + std::shared_ptr<vectorized::PartitionSortInfo> _partition_sort_info = nullptr; RuntimeProfile::Counter* _build_timer = nullptr; RuntimeProfile::Counter* _emplace_key_timer = nullptr; diff --git a/be/src/vec/exec/vpartition_sort_node.cpp b/be/src/vec/exec/vpartition_sort_node.cpp index 95c0abd72a8..694aa99a6cd 100644 --- a/be/src/vec/exec/vpartition_sort_node.cpp +++ b/be/src/vec/exec/vpartition_sort_node.cpp @@ -33,10 
+33,50 @@ #include "vec/common/hash_table/hash_map_context_creator.h" #include "vec/common/hash_table/hash_set.h" #include "vec/common/hash_table/partitioned_hash_map.h" +#include "vec/core/block.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" namespace doris::vectorized { +Status PartitionBlocks::do_partition_topn_sort() { + if (_partition_topn_sorter == nullptr) { + _partition_topn_sorter = PartitionSorter::create_unique( + *_partition_sort_info->_vsort_exec_exprs, _partition_sort_info->_limit, + _partition_sort_info->_offset, _partition_sort_info->_pool, + _partition_sort_info->_is_asc_order, _partition_sort_info->_nulls_first, + _partition_sort_info->_row_desc, _partition_sort_info->_runtime_state, nullptr, + _partition_sort_info->_has_global_limit, + _partition_sort_info->_partition_inner_limit, + _partition_sort_info->_top_n_algorithm, _partition_sort_info->_previous_row); + } + + for (const auto& block : blocks) { + RETURN_IF_ERROR(_partition_topn_sorter->append_block(block.get())); + } + blocks.clear(); + _partition_topn_sorter->init_profile(_partition_sort_info->_runtime_profile); + RETURN_IF_ERROR(_partition_topn_sorter->prepare_for_read()); + bool current_eos = false; + size_t current_output_rows = 0; + while (!current_eos) { + // output_block maybe need better way + auto output_block = Block::create_unique( + VectorizedUtils::create_empty_block(_partition_sort_info->_row_desc)); + RETURN_IF_ERROR(_partition_topn_sorter->get_next(_partition_sort_info->_runtime_state, + output_block.get(), &current_eos)); + auto rows = output_block->rows(); + if (rows > 0) { + current_output_rows += rows; + blocks.emplace_back(std::move(output_block)); + } + } + + _topn_filter_rows += (_current_input_rows - current_output_rows); + _partition_sort_info->_previous_row->reset(); + _partition_topn_sorter.reset(nullptr); + + return Status::OK(); +} VPartitionSortNode::VPartitionSortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) @@ 
-79,6 +119,7 @@ Status VPartitionSortNode::prepare(RuntimeState* state) { _get_sorted_timer = ADD_TIMER(runtime_profile(), "GetSortedTime"); _selector_block_timer = ADD_TIMER(runtime_profile(), "SelectorBlockTime"); _emplace_key_timer = ADD_TIMER(runtime_profile(), "EmplaceKeyTime"); + runtime_profile()->add_info_string("CurrentTopNPhase", std::to_string(_topn_phase)); RETURN_IF_ERROR(ExecNode::prepare(state)); SCOPED_TIMER(_exec_timer); @@ -86,6 +127,10 @@ Status VPartitionSortNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, child(0)->row_desc())); _init_hash_method(); + _partition_sort_info = std::make_shared<PartitionSortInfo>( + &_vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first, child(0)->row_desc(), + state, _runtime_profile.get(), _has_global_limit, _partition_inner_limit, + _top_n_algorithm, _previous_row.get(), _topn_phase); return Status::OK(); } @@ -116,13 +161,15 @@ void VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& key_colum auto creator = [&](const auto& ctor, auto& key, auto& origin) { HashMethodType::try_presis_key(key, origin, *_agg_arena_pool); - auto* aggregate_data = _pool->add(new PartitionBlocks()); + auto* aggregate_data = _pool->add( + new PartitionBlocks(_partition_sort_info, _value_places.empty())); _value_places.push_back(aggregate_data); ctor(key, aggregate_data); _num_partition++; }; auto creator_for_null_key = [&](auto& mapped) { - mapped = _pool->add(new PartitionBlocks()); + mapped = _pool->add( + new PartitionBlocks(_partition_sort_info, _value_places.empty())); _value_places.push_back(mapped); _num_partition++; }; @@ -135,7 +182,7 @@ void VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& key_colum } SCOPED_TIMER(_selector_block_timer); - for (auto place : _value_places) { + for (auto* place : _value_places) { place->append_block_by_selector(input_block, child(0)->row_desc(), _has_global_limit, _partition_inner_limit, batch_size); @@ 
-151,7 +198,8 @@ Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_bl child_input_rows = child_input_rows + current_rows; if (UNLIKELY(_partition_exprs_num == 0)) { if (UNLIKELY(_value_places.empty())) { - _value_places.push_back(_pool->add(new PartitionBlocks())); + _value_places.push_back(_pool->add( + new PartitionBlocks(_partition_sort_info, _value_places.empty()))); } //no partition key _value_places[0]->append_whole_block(input_block, child(0)->row_desc()); @@ -195,9 +243,7 @@ Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_bl RETURN_IF_ERROR(sorter->prepare_for_read()); _partition_sorts.push_back(std::move(sorter)); } - if (state->enable_profile()) { - debug_profile(); - } + COUNTER_SET(_hash_table_size_counter, int64_t(_num_partition)); //so all data from child have sink completed _can_read = true; @@ -312,14 +358,15 @@ Status VPartitionSortNode::close(RuntimeState* state) { if (is_closed()) { return Status::OK(); } - if (state->enable_profile()) { - debug_profile(); - } + return ExecNode::close(state); } void VPartitionSortNode::release_resource(RuntimeState* state) { _vsort_exec_exprs.close(state); + if (state->enable_profile()) { + debug_profile(); + } ExecNode::release_resource(state); } @@ -389,19 +436,25 @@ void VPartitionSortNode::_init_hash_method() { } void VPartitionSortNode::debug_profile() { - fmt::memory_buffer partition_rows_read, partition_blocks_read; + fmt::memory_buffer partition_rows_read, partition_blocks_read, partition_filter_rows; fmt::format_to(partition_rows_read, "["); fmt::format_to(partition_blocks_read, "["); - for (auto place : _value_places) { + fmt::format_to(partition_filter_rows, "["); + + for (auto* place : _value_places) { fmt::format_to(partition_rows_read, "{}, ", place->get_total_rows()); + fmt::format_to(partition_filter_rows, "{}, ", place->get_topn_filter_rows()); fmt::format_to(partition_blocks_read, "{}, ", place->blocks.size()); } 
fmt::format_to(partition_rows_read, "]"); fmt::format_to(partition_blocks_read, "]"); + fmt::format_to(partition_filter_rows, "]"); runtime_profile()->add_info_string("PerPartitionBlocksRead", fmt::to_string(partition_blocks_read)); runtime_profile()->add_info_string("PerPartitionRowsRead", fmt::to_string(partition_rows_read)); + runtime_profile()->add_info_string("PerPartitionFilterRows", + fmt::to_string(partition_filter_rows)); fmt::memory_buffer partition_output_rows; fmt::format_to(partition_output_rows, "["); for (auto row : partition_profile_output_rows) { diff --git a/be/src/vec/exec/vpartition_sort_node.h b/be/src/vec/exec/vpartition_sort_node.h index d369846df59..1ee7ffe26cb 100644 --- a/be/src/vec/exec/vpartition_sort_node.h +++ b/be/src/vec/exec/vpartition_sort_node.h @@ -23,6 +23,7 @@ #include <memory> #include <mutex> +#include "common/status.h" #include "exec/exec_node.h" #include "vec/columns/column.h" #include "vec/common/columns_hashing.h" @@ -37,10 +38,55 @@ namespace doris { namespace vectorized { static constexpr size_t INITIAL_BUFFERED_BLOCK_BYTES = 64 << 20; +static constexpr size_t PARTITION_SORT_ROWS_THRESHOLD = 20000; + +struct PartitionSortInfo { + ~PartitionSortInfo() = default; + + PartitionSortInfo(VSortExecExprs* vsort_exec_exprs, int64_t limit, int64_t offset, + ObjectPool* pool, const std::vector<bool>& is_asc_order, + const std::vector<bool>& nulls_first, const RowDescriptor& row_desc, + RuntimeState* runtime_state, RuntimeProfile* runtime_profile, + bool has_global_limit, int64_t partition_inner_limit, + TopNAlgorithm::type top_n_algorithm, SortCursorCmp* previous_row, + TPartTopNPhase::type topn_phase) + : _vsort_exec_exprs(vsort_exec_exprs), + _limit(limit), + _offset(offset), + _pool(pool), + _is_asc_order(is_asc_order), + _nulls_first(nulls_first), + _row_desc(row_desc), + _runtime_state(runtime_state), + _runtime_profile(runtime_profile), + _has_global_limit(has_global_limit), + _partition_inner_limit(partition_inner_limit), 
+ _top_n_algorithm(top_n_algorithm), + _previous_row(previous_row), + _topn_phase(topn_phase) {} + +public: + VSortExecExprs* _vsort_exec_exprs = nullptr; + int64_t _limit = -1; + int64_t _offset = -1; + ObjectPool* _pool = nullptr; + std::vector<bool> _is_asc_order; + std::vector<bool> _nulls_first; + const RowDescriptor& _row_desc; + RuntimeState* _runtime_state = nullptr; + RuntimeProfile* _runtime_profile = nullptr; + bool _has_global_limit = false; + int64_t _partition_inner_limit = 0; + TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER; + SortCursorCmp* _previous_row = nullptr; + TPartTopNPhase::type _topn_phase = TPartTopNPhase::TWO_PHASE_GLOBAL; +}; struct PartitionBlocks { public: - PartitionBlocks() = default; + PartitionBlocks() = default; //should fixed in pipelineX + PartitionBlocks(std::shared_ptr<PartitionSortInfo> partition_sort_info, bool is_first_sorter) + : _is_first_sorter(is_first_sorter), _partition_sort_info(partition_sort_info) {} ~PartitionBlocks() = default; void add_row_idx(size_t row) { selector.push_back(row); } @@ -49,7 +95,7 @@ public: const RowDescriptor& row_desc, bool is_limit, int64_t partition_inner_limit, int batch_size) { if (blocks.empty() || reach_limit()) { - init_rows = batch_size; + _init_rows = batch_size; blocks.push_back(Block::create_unique(VectorizedUtils::create_empty_block(row_desc))); } auto columns = input_block->get_columns(); @@ -59,11 +105,21 @@ public: columns[i]->append_data_by_selector(mutable_columns[i], selector); } blocks.back()->set_columns(std::move(mutable_columns)); - init_rows = init_rows - selector.size(); - total_rows = total_rows + selector.size(); + auto selector_rows = selector.size(); + _init_rows = _init_rows - selector_rows; + _total_rows = _total_rows + selector_rows; + _current_input_rows = _current_input_rows + selector_rows; selector.clear(); + // maybe better could change by user PARTITION_SORT_ROWS_THRESHOLD + if (_current_input_rows >= PARTITION_SORT_ROWS_THRESHOLD && + 
_partition_sort_info->_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL) { + static_cast<void>(do_partition_topn_sort()); // fixed : should return status + _current_input_rows = 0; // reset record + } } + Status do_partition_topn_sort(); + void append_whole_block(vectorized::Block* input_block, const RowDescriptor& row_desc) { auto empty_block = Block::create_unique(VectorizedUtils::create_empty_block(row_desc)); empty_block->swap(*input_block); @@ -71,15 +127,22 @@ public: } bool reach_limit() { - return init_rows <= 0 || blocks.back()->bytes() > INITIAL_BUFFERED_BLOCK_BYTES; + return _init_rows <= 0 || blocks.back()->bytes() > INITIAL_BUFFERED_BLOCK_BYTES; } - size_t get_total_rows() const { return total_rows; } + size_t get_total_rows() const { return _total_rows; } + size_t get_topn_filter_rows() const { return _topn_filter_rows; } IColumn::Selector selector; std::vector<std::unique_ptr<Block>> blocks; - size_t total_rows = 0; - int init_rows = 4096; + size_t _total_rows = 0; + size_t _current_input_rows = 0; + size_t _topn_filter_rows = 0; + int _init_rows = 4096; + bool _is_first_sorter = false; + + std::unique_ptr<PartitionSorter> _partition_topn_sorter = nullptr; + std::shared_ptr<PartitionSortInfo> _partition_sort_info = nullptr; }; using PartitionDataPtr = PartitionBlocks*; @@ -213,6 +276,7 @@ private: std::vector<std::unique_ptr<PartitionSorter>> _partition_sorts; std::vector<PartitionDataPtr> _value_places; + std::shared_ptr<PartitionSortInfo> _partition_sort_info = nullptr; // Expressions and parameters used for build _sort_description VSortExecExprs _vsort_exec_exprs; std::vector<bool> _is_asc_order; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org