This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1ba9e4b568 [Improvement](sort) Reuse memory in sort node (#12921) 1ba9e4b568 is described below commit 1ba9e4b5684c59db248b9bf17ab0fe54680bf9ed Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Wed Sep 28 09:44:35 2022 +0800 [Improvement](sort) Reuse memory in sort node (#12921) --- .../aggregate_functions/aggregate_function_sort.h | 2 +- be/src/vec/common/sort/heap_sorter.cpp | 2 +- be/src/vec/common/sort/heap_sorter.h | 2 +- be/src/vec/common/sort/sorter.cpp | 50 ++++++++++++++-------- be/src/vec/common/sort/sorter.h | 24 ++++------- be/src/vec/common/sort/topn_sorter.cpp | 28 +++++------- be/src/vec/common/sort/topn_sorter.h | 5 ++- be/src/vec/core/sort_block.cpp | 24 ++++++----- be/src/vec/core/sort_block.h | 3 +- be/src/vec/exec/vsort_node.cpp | 15 ++++--- be/src/vec/exec/vsort_node.h | 2 + be/src/vec/utils/util.hpp | 12 ++++++ 12 files changed, 96 insertions(+), 73 deletions(-) diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h b/be/src/vec/aggregate_functions/aggregate_function_sort.h index 2db72a4c5c..201bd5df62 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_sort.h +++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h @@ -87,7 +87,7 @@ struct AggregateFunctionSortData { } } - void sort() { sort_block(block, sort_desc, block.rows()); } + void sort() { sort_block(block, block, sort_desc, block.rows()); } }; template <typename Data> diff --git a/be/src/vec/common/sort/heap_sorter.cpp b/be/src/vec/common/sort/heap_sorter.cpp index 795bd66941..6520b005a4 100644 --- a/be/src/vec/common/sort/heap_sorter.cpp +++ b/be/src/vec/common/sort/heap_sorter.cpp @@ -29,7 +29,7 @@ HeapSorter::HeapSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs _topn_filter_rows(0), _init_sort_descs(false) {} -Status HeapSorter::append_block(Block* block, bool* mem_reuse) { +Status HeapSorter::append_block(Block* block) { DCHECK(block->rows() > 0); { 
SCOPED_TIMER(_materialize_timer); diff --git a/be/src/vec/common/sort/heap_sorter.h b/be/src/vec/common/sort/heap_sorter.h index f725d585c2..6f644a9d92 100644 --- a/be/src/vec/common/sort/heap_sorter.h +++ b/be/src/vec/common/sort/heap_sorter.h @@ -63,7 +63,7 @@ public: _materialize_timer = ADD_TIMER(runtime_profile, "MaterializeTime"); } - Status append_block(Block* block, bool* mem_reuse) override; + Status append_block(Block* block) override; Status prepare_for_read() override; diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 9b5641075d..5de7499a2e 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -72,25 +72,27 @@ Status MergeSorterState::merge_sort_read(doris::RuntimeState* state, return Status::OK(); } -Status Sorter::partial_sort(Block& block) { - if (_vsort_exec_exprs.need_materialize_tuple()) { +Status Sorter::partial_sort(Block& src_block, Block& dest_block) { + size_t num_cols = src_block.columns(); + if (_materialize_sort_exprs) { auto output_tuple_expr_ctxs = _vsort_exec_exprs.sort_tuple_slot_expr_ctxs(); std::vector<int> valid_column_ids(output_tuple_expr_ctxs.size()); for (int i = 0; i < output_tuple_expr_ctxs.size(); ++i) { - RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(&block, &valid_column_ids[i])); + RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(&src_block, &valid_column_ids[i])); } Block new_block; for (auto column_id : valid_column_ids) { - new_block.insert(block.get_by_position(column_id)); + new_block.insert(src_block.get_by_position(column_id)); } - block.swap(new_block); + dest_block.swap(new_block); } _sort_description.resize(_vsort_exec_exprs.lhs_ordering_expr_ctxs().size()); + Block* result_block = _materialize_sort_exprs ? 
&dest_block : &src_block; for (int i = 0; i < _sort_description.size(); i++) { const auto& ordering_expr = _vsort_exec_exprs.lhs_ordering_expr_ctxs()[i]; - RETURN_IF_ERROR(ordering_expr->execute(&block, &_sort_description[i].column_number)); + RETURN_IF_ERROR(ordering_expr->execute(result_block, &_sort_description[i].column_number)); _sort_description[i].direction = _is_asc_order[i] ? 1 : -1; _sort_description[i].nulls_direction = @@ -99,7 +101,12 @@ Status Sorter::partial_sort(Block& block) { { SCOPED_TIMER(_partial_sort_timer); - sort_block(block, _sort_description, _offset + _limit); + if (_materialize_sort_exprs) { + sort_block(dest_block, dest_block, _sort_description, _offset + _limit); + } else { + sort_block(src_block, dest_block, _sort_description, _offset + _limit); + } + src_block.clear_column_data(num_cols); } return Status::OK(); @@ -111,11 +118,19 @@ FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first), _state(std::unique_ptr<MergeSorterState>(new MergeSorterState(row_desc, offset))) {} -Status FullSorter::append_block(Block* block, bool* mem_reuse) { +Status FullSorter::append_block(Block* block) { DCHECK(block->rows() > 0); { SCOPED_TIMER(_merge_block_timer); - _state->unsorted_block->merge(*block); + auto& data = _state->unsorted_block->get_columns_with_type_and_name(); + const auto& arrival_data = block->get_columns_with_type_and_name(); + auto sz = block->rows(); + for (int i = 0; i < data.size(); ++i) { + DCHECK(data[i].type->equals(*(arrival_data[i].type))); + data[i].column->assume_mutable()->insert_range_from( + *arrival_data[i].column->convert_to_full_column_if_const().get(), 0, sz); + } + block->clear_column_data(); } if (_reach_limit()) { RETURN_IF_ERROR(_do_sort()); @@ -147,8 +162,10 @@ Status FullSorter::get_next(RuntimeState* state, Block* block, bool* eos) { } Status FullSorter::_do_sort() { - Block block = 
_state->unsorted_block->to_block(0); - RETURN_IF_ERROR(partial_sort(block)); + Block* src_block = _state->unsorted_block.get(); + Block desc_block = src_block->clone_without_columns(); + RETURN_IF_ERROR(partial_sort(*src_block, desc_block)); + // dispose TOP-N logic if (_limit != -1) { // Here is a little opt to reduce the mem uasge, we build a max heap @@ -156,23 +173,22 @@ Status FullSorter::_do_sort() { // if one block totally greater the heap top of _block_priority_queue // we can throw the block data directly. if (_state->num_rows < _limit) { - _state->sorted_blocks.emplace_back(std::move(block)); - _state->num_rows += block.rows(); + _state->num_rows += desc_block.rows(); + _state->sorted_blocks.emplace_back(std::move(desc_block)); _block_priority_queue.emplace(_pool->add( new MergeSortCursorImpl(_state->sorted_blocks.back(), _sort_description))); } else { MergeSortBlockCursor block_cursor( - _pool->add(new MergeSortCursorImpl(block, _sort_description))); + _pool->add(new MergeSortCursorImpl(desc_block, _sort_description))); if (!block_cursor.totally_greater(_block_priority_queue.top())) { - _state->sorted_blocks.emplace_back(std::move(block)); + _state->sorted_blocks.emplace_back(std::move(desc_block)); _block_priority_queue.push(block_cursor); } } } else { // dispose normal sort logic - _state->sorted_blocks.emplace_back(std::move(block)); + _state->sorted_blocks.emplace_back(std::move(desc_block)); } - _state->reset_block(); return Status::OK(); } diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index 2e56f8012a..856d740dbe 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -31,31 +31,23 @@ namespace doris::vectorized { class MergeSorterState { public: MergeSorterState(const RowDescriptor& row_desc, int64_t offset) - : unsorted_block(new MutableBlock( - VectorizedUtils::create_empty_columnswithtypename(row_desc))), - _offset(offset), - _row_desc(row_desc) {} + : unsorted_block(new 
Block(VectorizedUtils::create_empty_block(row_desc))), + _offset(offset) {} ~MergeSorterState() = default; - void reset_block() { - unsorted_block.reset( - new MutableBlock(VectorizedUtils::create_empty_columnswithtypename(_row_desc))); - } - void build_merge_tree(SortDescription& sort_description); Status merge_sort_read(doris::RuntimeState* state, doris::vectorized::Block* block, bool* eos); std::priority_queue<MergeSortCursor> priority_queue; std::vector<MergeSortCursorImpl> cursors; - std::unique_ptr<MutableBlock> unsorted_block; + std::unique_ptr<Block> unsorted_block; std::vector<Block> sorted_blocks; uint64_t num_rows = 0; private: int64_t _offset; - const RowDescriptor& _row_desc; }; class Sorter { @@ -67,7 +59,8 @@ public: _offset(offset), _pool(pool), _is_asc_order(is_asc_order), - _nulls_first(nulls_first) {} + _nulls_first(nulls_first), + _materialize_sort_exprs(vsort_exec_exprs.need_materialize_tuple()) {} virtual ~Sorter() = default; @@ -76,14 +69,14 @@ public: _merge_block_timer = ADD_TIMER(runtime_profile, "MergeBlockTime"); }; - virtual Status append_block(Block* block, bool* mem_reuse) = 0; + virtual Status append_block(Block* block) = 0; virtual Status prepare_for_read() = 0; virtual Status get_next(RuntimeState* state, Block* block, bool* eos) = 0; protected: - Status partial_sort(Block& block); + Status partial_sort(Block& src_block, Block& dest_block); SortDescription _sort_description; VSortExecExprs& _vsort_exec_exprs; @@ -97,6 +90,7 @@ protected: RuntimeProfile::Counter* _merge_block_timer = nullptr; std::priority_queue<MergeSortBlockCursor> _block_priority_queue; + bool _materialize_sort_exprs; }; class FullSorter final : public Sorter { @@ -107,7 +101,7 @@ public: ~FullSorter() override = default; - Status append_block(Block* block, bool* mem_reuse) override; + Status append_block(Block* block) override; Status prepare_for_read() override; diff --git a/be/src/vec/common/sort/topn_sorter.cpp b/be/src/vec/common/sort/topn_sorter.cpp index 
4ed7af6d04..a12186fac5 100644 --- a/be/src/vec/common/sort/topn_sorter.cpp +++ b/be/src/vec/common/sort/topn_sorter.cpp @@ -23,11 +23,12 @@ TopNSorter::TopNSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs ObjectPool* pool, std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first, const RowDescriptor& row_desc) : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first), - _state(std::unique_ptr<MergeSorterState>(new MergeSorterState(row_desc, offset))) {} + _state(std::unique_ptr<MergeSorterState>(new MergeSorterState(row_desc, offset))), + _row_desc(row_desc) {} -Status TopNSorter::append_block(Block* block, bool* mem_reuse) { +Status TopNSorter::append_block(Block* block) { DCHECK(block->rows() > 0); - RETURN_IF_ERROR(_do_sort(block, mem_reuse)); + RETURN_IF_ERROR(_do_sort(block)); return Status::OK(); } @@ -51,9 +52,10 @@ Status TopNSorter::get_next(RuntimeState* state, Block* block, bool* eos) { return Status::OK(); } -Status TopNSorter::_do_sort(Block* block, bool* mem_reuse) { - *mem_reuse = false; - RETURN_IF_ERROR(partial_sort(*block)); +Status TopNSorter::_do_sort(Block* block) { + Block sorted_block = VectorizedUtils::create_empty_columnswithtypename(_row_desc); + RETURN_IF_ERROR(partial_sort(*block, sorted_block)); + // dispose TOP-N logic if (_limit != -1) { // Here is a little opt to reduce the mem uasge, we build a max heap @@ -61,30 +63,20 @@ Status TopNSorter::_do_sort(Block* block, bool* mem_reuse) { // if one block totally greater the heap top of _block_priority_queue // we can throw the block data directly. 
if (_state->num_rows < _limit) { - Block sorted_block; - sorted_block.swap(*block); - _state->sorted_blocks.emplace_back(std::move(sorted_block)); _state->num_rows += sorted_block.rows(); + _state->sorted_blocks.emplace_back(std::move(sorted_block)); _block_priority_queue.emplace(_pool->add( new MergeSortCursorImpl(_state->sorted_blocks.back(), _sort_description))); } else { - Block sorted_block; - sorted_block.swap(*block); MergeSortBlockCursor block_cursor( _pool->add(new MergeSortCursorImpl(sorted_block, _sort_description))); if (!block_cursor.totally_greater(_block_priority_queue.top())) { _state->sorted_blocks.emplace_back(std::move(sorted_block)); _block_priority_queue.push(block_cursor); - } else { - *mem_reuse = true; - block->clear_column_data(); } } } else { - Block sorted_block; - sorted_block.swap(*block); - // dispose normal sort logic - _state->sorted_blocks.emplace_back(std::move(sorted_block)); + return Status::InternalError("Should not reach TopN sorter for full sort query"); } return Status::OK(); } diff --git a/be/src/vec/common/sort/topn_sorter.h b/be/src/vec/common/sort/topn_sorter.h index 675442f5a1..51df1bea5b 100644 --- a/be/src/vec/common/sort/topn_sorter.h +++ b/be/src/vec/common/sort/topn_sorter.h @@ -30,7 +30,7 @@ public: ~TopNSorter() override = default; - Status append_block(Block* block, bool* mem_reuse) override; + Status append_block(Block* block) override; Status prepare_for_read() override; @@ -39,9 +39,10 @@ public: static constexpr size_t TOPN_SORT_THRESHOLD = 256; private: - Status _do_sort(Block* block, bool* mem_reuse); + Status _do_sort(Block* block); std::unique_ptr<MergeSorterState> _state; + const RowDescriptor& _row_desc; }; } // namespace doris::vectorized diff --git a/be/src/vec/core/sort_block.cpp b/be/src/vec/core/sort_block.cpp index 657fd58d23..bf7ca56d95 100644 --- a/be/src/vec/core/sort_block.cpp +++ b/be/src/vec/core/sort_block.cpp @@ -63,8 +63,9 @@ struct PartialSortingLess { } }; -void sort_block(Block& block, 
const SortDescription& description, UInt64 limit) { - if (!block) { +void sort_block(Block& src_block, Block& dest_block, const SortDescription& description, + UInt64 limit) { + if (!src_block) { return; } @@ -74,18 +75,19 @@ void sort_block(Block& block, const SortDescription& description, UInt64 limit) const IColumn* column = !description[0].column_name.empty() - ? block.get_by_name(description[0].column_name).column.get() - : block.safe_get_by_position(description[0].column_number).column.get(); + ? src_block.get_by_name(description[0].column_name).column.get() + : src_block.safe_get_by_position(description[0].column_number).column.get(); IColumn::Permutation perm; column->get_permutation(reverse, limit, description[0].nulls_direction, perm); - size_t columns = block.columns(); + size_t columns = src_block.columns(); for (size_t i = 0; i < columns; ++i) { - block.get_by_position(i).column = block.get_by_position(i).column->permute(perm, limit); + dest_block.replace_by_position( + i, src_block.get_by_position(i).column->permute(perm, limit)); } } else { - size_t size = block.rows(); + size_t size = src_block.rows(); IColumn::Permutation perm(size); for (size_t i = 0; i < size; ++i) { perm[i] = i; @@ -96,20 +98,22 @@ void sort_block(Block& block, const SortDescription& description, UInt64 limit) } ColumnsWithSortDescriptions columns_with_sort_desc = - get_columns_with_sort_description(block, description); + get_columns_with_sort_description(src_block, description); { EqualFlags flags(size, 1); EqualRange range {0, size}; + // TODO: ColumnSorter should be constructed only once. 
for (size_t i = 0; i < columns_with_sort_desc.size(); i++) { ColumnSorter sorter(columns_with_sort_desc[i], limit); sorter.operator()(flags, perm, range, i == columns_with_sort_desc.size() - 1); } } - size_t columns = block.columns(); + size_t columns = src_block.columns(); for (size_t i = 0; i < columns; ++i) { - block.get_by_position(i).column = block.get_by_position(i).column->permute(perm, limit); + dest_block.replace_by_position( + i, src_block.get_by_position(i).column->permute(perm, limit)); } } } diff --git a/be/src/vec/core/sort_block.h b/be/src/vec/core/sort_block.h index cc791881c1..c7ff4c70e4 100644 --- a/be/src/vec/core/sort_block.h +++ b/be/src/vec/core/sort_block.h @@ -28,7 +28,8 @@ namespace doris::vectorized { /// Sort one block by `description`. If limit != 0, then the partial sort of the first `limit` rows is produced. -void sort_block(Block& block, const SortDescription& description, UInt64 limit = 0); +void sort_block(Block& src_block, Block& dest_block, const SortDescription& description, + UInt64 limit = 0); /** Used only in StorageMergeTree to sort the data with INSERT. * Sorting is stable. This is important for keeping the order of rows in the CollapsingMergeTree engine diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index 54323c561c..cb1eb699ab 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -31,7 +31,8 @@ namespace doris::vectorized { VSortNode::VSortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), - _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0) {} + _offset(tnode.sort_node.__isset.offset ? 
tnode.sort_node.offset : 0), + _reuse_mem(true) {} Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); @@ -48,6 +49,7 @@ Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) { !row_desc.has_varlen_slots()) { _sorter.reset(new HeapSorter(_vsort_exec_exprs, _limit, _offset, _pool, _is_asc_order, _nulls_first, row_desc)); + _reuse_mem = false; } else if (_limit > 0 && row_desc.has_varlen_slots() && _limit > 0 && _limit + _offset < TopNSorter::TOPN_SORT_THRESHOLD) { _sorter.reset(new TopNSorter(_vsort_exec_exprs, _limit, _offset, _pool, _is_asc_order, @@ -84,19 +86,18 @@ Status VSortNode::open(RuntimeState* state) { // The child has been opened and the sorter created. Sort the input. // The final merge is done on-demand as rows are requested in get_next(). bool eos = false; - bool mem_reuse = false; - std::unique_ptr<Block> upstream_block; + std::unique_ptr<Block> upstream_block(new Block()); do { - if (!mem_reuse) { - upstream_block.reset(new Block()); - } RETURN_IF_ERROR_AND_CHECK_SPAN( child(0)->get_next_after_projects(state, upstream_block.get(), &eos), child(0)->get_next_span(), eos); if (upstream_block->rows() != 0) { - RETURN_IF_ERROR(_sorter->append_block(upstream_block.get(), &mem_reuse)); + RETURN_IF_ERROR(_sorter->append_block(upstream_block.get())); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("vsort, while sorting input.")); + if (!_reuse_mem) { + upstream_block.reset(new Block()); + } } } while (!eos); diff --git a/be/src/vec/exec/vsort_node.h b/be/src/vec/exec/vsort_node.h index ff640b6851..7b2834e4b4 100644 --- a/be/src/vec/exec/vsort_node.h +++ b/be/src/vec/exec/vsort_node.h @@ -63,6 +63,8 @@ private: std::vector<bool> _is_asc_order; std::vector<bool> _nulls_first; + bool _reuse_mem; + std::unique_ptr<Sorter> _sorter; static constexpr size_t ACCUMULATED_PARTIAL_SORT_THRESHOLD = 256; diff --git a/be/src/vec/utils/util.hpp b/be/src/vec/utils/util.hpp 
index 0df02ddd6a..da8e3d19af 100644 --- a/be/src/vec/utils/util.hpp +++ b/be/src/vec/utils/util.hpp @@ -45,6 +45,18 @@ public: return columns_with_type_and_name; } + static ColumnsWithTypeAndName create_empty_block(const RowDescriptor& row_desc) { + ColumnsWithTypeAndName columns_with_type_and_name; + for (const auto& tuple_desc : row_desc.tuple_descriptors()) { + for (const auto& slot_desc : tuple_desc->slots()) { + columns_with_type_and_name.emplace_back( + slot_desc->get_data_type_ptr()->create_column(), + slot_desc->get_data_type_ptr(), slot_desc->col_name()); + } + } + return columns_with_type_and_name; + } + static void update_null_map(NullMap& dst, const NullMap& src) { size_t size = dst.size(); auto* __restrict l = dst.data(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org