This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3cfaae0031 [Improvement](sort) Use heap sort to optimize sort node (#12700) 3cfaae0031 is described below commit 3cfaae0031f2a4260528cd4d1b1855af82bf0c3b Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Wed Sep 21 10:01:52 2022 +0800 [Improvement](sort) Use heap sort to optimize sort node (#12700) --- be/src/util/simd/bits.h | 2 +- be/src/vec/CMakeLists.txt | 2 + be/src/vec/columns/column.cpp | 21 ++++ be/src/vec/columns/column.h | 10 ++ be/src/vec/columns/column_decimal.cpp | 26 +++++ be/src/vec/columns/column_decimal.h | 4 + be/src/vec/columns/column_string.cpp | 24 +++++ be/src/vec/columns/column_string.h | 4 + be/src/vec/columns/column_vector.cpp | 25 +++++ be/src/vec/columns/column_vector.h | 4 + be/src/vec/common/sort/heap_sorter.cpp | 165 ++++++++++++++++++++++++++++ be/src/vec/common/sort/heap_sorter.h | 90 ++++++++++++++++ be/src/vec/common/sort/sorter.cpp | 85 ++------------- be/src/vec/common/sort/sorter.h | 47 +++----- be/src/vec/common/sort/topn_sorter.cpp | 92 ++++++++++++++++ be/src/vec/common/sort/topn_sorter.h | 47 ++++++++ be/src/vec/core/sort_block.h | 2 +- be/src/vec/core/sort_cursor.h | 172 +++++++++++++++++++++++++----- be/src/vec/exec/vsort_node.cpp | 36 ++++--- be/src/vec/exec/vsort_node.h | 2 - be/src/vec/runtime/vsorted_run_merger.cpp | 6 +- be/src/vec/runtime/vsorted_run_merger.h | 6 +- 22 files changed, 704 insertions(+), 168 deletions(-) diff --git a/be/src/util/simd/bits.h b/be/src/util/simd/bits.h index 8edbe72f4a..df91e63c61 100644 --- a/be/src/util/simd/bits.h +++ b/be/src/util/simd/bits.h @@ -112,7 +112,7 @@ inline static size_t find_byte(const std::vector<T>& vec, size_t start, T byte) return (T*)p - vec.data(); } -inline size_t find_nonzero(const std::vector<uint8_t>& vec, size_t start) { +inline size_t find_one(const std::vector<uint8_t>& vec, size_t start) { return find_byte<uint8_t>(vec, start, 1); } diff --git a/be/src/vec/CMakeLists.txt 
b/be/src/vec/CMakeLists.txt index 01f20742e6..0d0497add1 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -54,7 +54,9 @@ set(VEC_FILES common/exception.cpp common/mremap.cpp common/pod_array.cpp + common/sort/heap_sorter.cpp common/sort/sorter.cpp + common/sort/topn_sorter.cpp common/sort/vsort_exec_exprs.cpp common/string_utils/string_utils.cpp core/block.cpp diff --git a/be/src/vec/columns/column.cpp b/be/src/vec/columns/column.cpp index 845600bc1b..44521cc98c 100644 --- a/be/src/vec/columns/column.cpp +++ b/be/src/vec/columns/column.cpp @@ -52,6 +52,27 @@ void IColumn::sort_column(const ColumnSorter* sorter, EqualFlags& flags, sorter->sort_column(static_cast<const IColumn&>(*this), flags, perms, range, last_column); } +void IColumn::compare_internal(size_t rhs_row_id, const IColumn& rhs, int nan_direction_hint, + int direction, std::vector<uint8>& cmp_res, + uint8* __restrict filter) const { + auto sz = this->size(); + DCHECK(cmp_res.size() == sz); + size_t begin = simd::find_zero(cmp_res, 0); + while (begin < sz) { + size_t end = simd::find_one(cmp_res, begin + 1); + for (size_t row_id = begin; row_id < end; row_id++) { + int res = this->compare_at(row_id, rhs_row_id, rhs, nan_direction_hint); + if (res * direction < 0) { + filter[row_id] = 1; + cmp_res[row_id] = 1; + } else if (res * direction > 0) { + cmp_res[row_id] = 1; + } + } + begin = simd::find_zero(cmp_res, end + 1); + } +} + bool is_column_nullable(const IColumn& column) { return check_column<ColumnNullable>(column); } diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index f58cfa8c0e..94b39ba1e0 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -402,6 +402,16 @@ public: virtual int compare_at(size_t n, size_t m, const IColumn& rhs, int nan_direction_hint) const = 0; + /** + * To compare all rows in this column with another row (with row_id = rhs_row_id in column rhs) + * @param nan_direction_hint and direction indicates the 
ordering. + * @param cmp_res if we already has a comparison result for row i, e.g. cmp_res[i] = 1, we can skip row i + * @param filter this stores comparison results for all rows. filter[i] = 1 means row i is less than row rhs_row_id in rhs + */ + virtual void compare_internal(size_t rhs_row_id, const IColumn& rhs, int nan_direction_hint, + int direction, std::vector<uint8>& cmp_res, + uint8* __restrict filter) const; + /** Returns a permutation that sorts elements of this column, * i.e. perm[i]-th element of source column should be i-th element of sorted column. * reverse - reverse ordering (acsending). diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp index 690714b2ba..ca7260b16c 100644 --- a/be/src/vec/columns/column_decimal.cpp +++ b/be/src/vec/columns/column_decimal.cpp @@ -381,6 +381,32 @@ void ColumnDecimal<T>::sort_column(const ColumnSorter* sorter, EqualFlags& flags sorter->template sort_column(static_cast<const Self&>(*this), flags, perms, range, last_column); } +template <typename T> +void ColumnDecimal<T>::compare_internal(size_t rhs_row_id, const IColumn& rhs, + int nan_direction_hint, int direction, + std::vector<uint8>& cmp_res, + uint8* __restrict filter) const { + auto sz = this->size(); + DCHECK(cmp_res.size() == sz); + const auto& cmp_base = assert_cast<const ColumnDecimal<T>&>(rhs).get_data()[rhs_row_id]; + + size_t begin = simd::find_zero(cmp_res, 0); + while (begin < sz) { + size_t end = simd::find_one(cmp_res, begin + 1); + for (size_t row_id = begin; row_id < end; row_id++) { + auto value_a = get_data()[row_id]; + int res = value_a > cmp_base ? 1 : (value_a < cmp_base ? 
-1 : 0); + if (res * direction < 0) { + filter[row_id] = 1; + cmp_res[row_id] = 1; + } else if (res * direction > 0) { + cmp_res[row_id] = 1; + } + } + begin = simd::find_zero(cmp_res, end + 1); + } +} + template <> Decimal32 ColumnDecimal<Decimal32>::get_scale_multiplier() const { return common::exp10_i32(scale); diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index 9957742c0d..bd1d90e63c 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -233,6 +233,10 @@ public: void sort_column(const ColumnSorter* sorter, EqualFlags& flags, IColumn::Permutation& perms, EqualRange& range, bool last_column) const override; + void compare_internal(size_t rhs_row_id, const IColumn& rhs, int nan_direction_hint, + int direction, std::vector<uint8>& cmp_res, + uint8* __restrict filter) const override; + UInt32 get_scale() const { return scale; } T get_scale_multiplier() const; diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index 87fa8da649..d988791ca0 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -487,4 +487,28 @@ void ColumnString::protect() { get_offsets().protect(); } +void ColumnString::compare_internal(size_t rhs_row_id, const IColumn& rhs, int nan_direction_hint, + int direction, std::vector<uint8>& cmp_res, + uint8* __restrict filter) const { + auto sz = this->size(); + DCHECK(cmp_res.size() == sz); + const auto& cmp_base = assert_cast<const ColumnString&>(rhs).get_data_at(rhs_row_id); + size_t begin = simd::find_zero(cmp_res, 0); + while (begin < sz) { + size_t end = simd::find_one(cmp_res, begin + 1); + for (size_t row_id = begin; row_id < end; row_id++) { + auto value_a = get_data_at(row_id); + int res = memcmp_small_allow_overflow15(value_a.data, value_a.size, cmp_base.data, + cmp_base.size); + if (res * direction < 0) { + filter[row_id] = 1; + cmp_res[row_id] = 1; + } else if (res * direction > 0) { + 
cmp_res[row_id] = 1; + } + } + begin = simd::find_zero(cmp_res, end + 1); + } +} + } // namespace doris::vectorized diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index 56344a85e9..783d79cf07 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -371,6 +371,10 @@ public: offsets[self_row] = offsets[self_row - 1]; } } + + void compare_internal(size_t rhs_row_id, const IColumn& rhs, int nan_direction_hint, + int direction, std::vector<uint8>& cmp_res, + uint8* __restrict filter) const override; }; } // namespace doris::vectorized diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index e80b4009c6..2ca35f6948 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -128,6 +128,31 @@ void ColumnVector<T>::sort_column(const ColumnSorter* sorter, EqualFlags& flags, sorter->template sort_column(static_cast<const Self&>(*this), flags, perms, range, last_column); } +template <typename T> +void ColumnVector<T>::compare_internal(size_t rhs_row_id, const IColumn& rhs, + int nan_direction_hint, int direction, + std::vector<uint8>& cmp_res, + uint8* __restrict filter) const { + auto sz = this->size(); + DCHECK(cmp_res.size() == sz); + const auto& cmp_base = assert_cast<const ColumnVector<T>&>(rhs).get_data()[rhs_row_id]; + size_t begin = simd::find_zero(cmp_res, 0); + while (begin < sz) { + size_t end = simd::find_one(cmp_res, begin + 1); + for (size_t row_id = begin; row_id < end; row_id++) { + auto value_a = get_data()[row_id]; + int res = value_a > cmp_base ? 1 : (value_a < cmp_base ? 
-1 : 0); + if (res * direction < 0) { + filter[row_id] = 1; + cmp_res[row_id] = 1; + } else if (res * direction > 0) { + cmp_res[row_id] = 1; + } + } + begin = simd::find_zero(cmp_res, end + 1); + } +} + template <typename T> void ColumnVector<T>::update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type, const uint8_t* __restrict null_data) const { diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index 2867d64c65..d22e8f4b69 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -393,6 +393,10 @@ public: void sort_column(const ColumnSorter* sorter, EqualFlags& flags, IColumn::Permutation& perms, EqualRange& range, bool last_column) const override; + void compare_internal(size_t rhs_row_id, const IColumn& rhs, int nan_direction_hint, + int direction, std::vector<uint8>& cmp_res, + uint8* __restrict filter) const override; + protected: Container data; }; diff --git a/be/src/vec/common/sort/heap_sorter.cpp b/be/src/vec/common/sort/heap_sorter.cpp new file mode 100644 index 0000000000..795bd66941 --- /dev/null +++ b/be/src/vec/common/sort/heap_sorter.cpp @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/common/sort/heap_sorter.h" + +#include "util/defer_op.h" + +namespace doris::vectorized { +HeapSorter::HeapSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, + ObjectPool* pool, std::vector<bool>& is_asc_order, + std::vector<bool>& nulls_first, const RowDescriptor& row_desc) + : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first), + _heap_size(limit + offset), + _heap(std::make_unique<SortingHeap>()), + _topn_filter_rows(0), + _init_sort_descs(false) {} + +Status HeapSorter::append_block(Block* block, bool* mem_reuse) { + DCHECK(block->rows() > 0); + { + SCOPED_TIMER(_materialize_timer); + if (_vsort_exec_exprs.need_materialize_tuple()) { + auto output_tuple_expr_ctxs = _vsort_exec_exprs.sort_tuple_slot_expr_ctxs(); + std::vector<int> valid_column_ids(output_tuple_expr_ctxs.size()); + for (int i = 0; i < output_tuple_expr_ctxs.size(); ++i) { + RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(block, &valid_column_ids[i])); + } + + Block new_block; + for (auto column_id : valid_column_ids) { + new_block.insert(block->get_by_position(column_id)); + } + block->swap(new_block); + } + } + if (!_init_sort_descs) { + RETURN_IF_ERROR(_prepare_sort_descs(block)); + } + Block tmp_block = block->clone_empty(); + tmp_block.swap(*block); + HeapSortCursorBlockView block_view_val(std::move(tmp_block), _sort_description); + SharedHeapSortCursorBlockView* block_view = + new SharedHeapSortCursorBlockView(std::move(block_view_val)); + block_view->ref(); + Defer defer([&] { block_view->unref(); }); + size_t num_rows = tmp_block.rows(); + if (_heap_size == _heap->size()) { + { + SCOPED_TIMER(_topn_filter_timer); + _do_filter(block_view->value(), num_rows); + } + size_t remain_rows = block_view->value().block.rows(); + _topn_filter_rows += (num_rows - remain_rows); + COUNTER_SET(_topn_filter_rows_counter, _topn_filter_rows); + for (size_t i = 0; i < remain_rows; ++i) { + HeapSortCursorImpl cursor(i, block_view); + 
_heap->replace_top_if_less(std::move(cursor)); + } + } else { + size_t free_slots = std::min<size_t>(_heap_size - _heap->size(), num_rows); + + size_t i = 0; + for (; i < free_slots; ++i) { + HeapSortCursorImpl cursor(i, block_view); + _heap->push(std::move(cursor)); + } + + for (; i < num_rows; ++i) { + HeapSortCursorImpl cursor(i, block_view); + _heap->replace_top_if_less(std::move(cursor)); + } + } + return Status::OK(); +} + +Status HeapSorter::prepare_for_read() { + if (!_heap->empty() && _heap->size() > _offset) { + const auto& top = _heap->top(); + size_t num_columns = top.block()->columns(); + MutableColumns result_columns = top.block()->clone_empty_columns(); + + size_t init_size = std::min((size_t)_limit, _heap->size()); + result_columns.reserve(init_size); + + DCHECK(_heap->size() <= _heap_size); + // Use a vector to reverse elements in heap + std::vector<HeapSortCursorImpl> vector_to_reverse; + vector_to_reverse.reserve(init_size); + size_t capacity = 0; + while (!_heap->empty()) { + auto current = _heap->top(); + _heap->pop(); + vector_to_reverse.emplace_back(std::move(current)); + capacity++; + if (_offset != 0 && _heap->size() == _offset) { + break; + } + } + for (int i = capacity - 1; i >= 0; i--) { + auto rid = vector_to_reverse[i].row_id(); + const auto cur_block = vector_to_reverse[i].block(); + for (size_t j = 0; j < num_columns; ++j) { + result_columns[j]->insert_from(*(cur_block->get_columns()[j]), rid); + } + } + _return_block = vector_to_reverse[0].block()->clone_with_columns(std::move(result_columns)); + } + return Status::OK(); +} + +Status HeapSorter::get_next(RuntimeState* state, Block* block, bool* eos) { + _return_block.swap(*block); + *eos = true; + return Status::OK(); +} + +void HeapSorter::_do_filter(HeapSortCursorBlockView& block_view, size_t num_rows) { + const auto& top_cursor = _heap->top(); + const int cursor_rid = top_cursor.row_id(); + + IColumn::Filter filter(num_rows); + for (size_t i = 0; i < num_rows; ++i) { + filter[i] 
= 0; + } + + std::vector<uint8_t> cmp_res(num_rows, 0); + + for (size_t col_id = 0; col_id < _sort_description.size(); ++col_id) { + block_view.sort_columns[col_id]->compare_internal( + cursor_rid, *top_cursor.sort_columns()[col_id], + _sort_description[col_id].nulls_direction, _sort_description[col_id].direction, + cmp_res, filter.data()); + } + block_view.filter_block(filter); +} + +Status HeapSorter::_prepare_sort_descs(Block* block) { + _sort_description.resize(_vsort_exec_exprs.lhs_ordering_expr_ctxs().size()); + for (int i = 0; i < _sort_description.size(); i++) { + const auto& ordering_expr = _vsort_exec_exprs.lhs_ordering_expr_ctxs()[i]; + RETURN_IF_ERROR(ordering_expr->execute(block, &_sort_description[i].column_number)); + + _sort_description[i].direction = _is_asc_order[i] ? 1 : -1; + _sort_description[i].nulls_direction = + _nulls_first[i] ? -_sort_description[i].direction : _sort_description[i].direction; + } + _init_sort_descs = true; + return Status::OK(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/common/sort/heap_sorter.h b/be/src/vec/common/sort/heap_sorter.h new file mode 100644 index 0000000000..f725d585c2 --- /dev/null +++ b/be/src/vec/common/sort/heap_sorter.h @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#include <queue> + +#include "vec/common/sort/sorter.h" + +namespace doris::vectorized { + +class SortingHeap { +public: + const HeapSortCursorImpl& top() { return _queue.top(); } + + size_t size() { return _queue.size(); } + + bool empty() { return _queue.empty(); } + + void pop() { _queue.pop(); } + + void replace_top(HeapSortCursorImpl&& top) { + _queue.pop(); + _queue.push(std::move(top)); + } + + void push(HeapSortCursorImpl&& cursor) { _queue.push(std::move(cursor)); } + + void replace_top_if_less(HeapSortCursorImpl&& val) { + if (val < top()) { + replace_top(std::move(val)); + } + } + +private: + std::priority_queue<HeapSortCursorImpl> _queue; +}; + +class HeapSorter final : public Sorter { +public: + HeapSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, ObjectPool* pool, + std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first, + const RowDescriptor& row_desc); + + ~HeapSorter() override = default; + + void init_profile(RuntimeProfile* runtime_profile) override { + _topn_filter_timer = ADD_TIMER(runtime_profile, "TopNFilterTime"); + _topn_filter_rows_counter = ADD_COUNTER(runtime_profile, "TopNFilterRows", TUnit::UNIT); + _materialize_timer = ADD_TIMER(runtime_profile, "MaterializeTime"); + } + + Status append_block(Block* block, bool* mem_reuse) override; + + Status prepare_for_read() override; + + Status get_next(RuntimeState* state, Block* block, bool* eos) override; + + static constexpr size_t HEAP_SORT_THRESHOLD = 1024; + +private: + void _do_filter(HeapSortCursorBlockView& block_view, size_t num_rows); + + Status _prepare_sort_descs(Block* block); + + size_t _heap_size; + std::unique_ptr<SortingHeap> _heap; + Block _return_block; + int64_t _topn_filter_rows; + bool _init_sort_descs; + + RuntimeProfile::Counter* _topn_filter_timer = nullptr; + RuntimeProfile::Counter* _topn_filter_rows_counter = nullptr; 
+ RuntimeProfile::Counter* _materialize_timer = nullptr; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 86de482333..9b5641075d 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -25,7 +25,7 @@ void MergeSorterState::build_merge_tree(SortDescription& sort_description) { } if (sorted_blocks.size() > 1) { - for (auto& cursor : cursors) priority_queue.push(SortCursor(&cursor)); + for (auto& cursor : cursors) priority_queue.push(MergeSortCursor(&cursor)); } } @@ -105,11 +105,10 @@ Status Sorter::partial_sort(Block& block) { return Status::OK(); } -FullSorter::FullSorter(SortDescription& sort_description, VSortExecExprs& vsort_exec_exprs, - int limit, int64_t offset, ObjectPool* pool, std::vector<bool>& is_asc_order, +FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, + ObjectPool* pool, std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first, const RowDescriptor& row_desc) - : Sorter(sort_description, vsort_exec_exprs, limit, offset, pool, is_asc_order, - nulls_first), + : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first), _state(std::unique_ptr<MergeSorterState>(new MergeSorterState(row_desc, offset))) {} Status FullSorter::append_block(Block* block, bool* mem_reuse) { @@ -160,9 +159,10 @@ Status FullSorter::_do_sort() { _state->sorted_blocks.emplace_back(std::move(block)); _state->num_rows += block.rows(); _block_priority_queue.emplace(_pool->add( - new SortCursorImpl(_state->sorted_blocks.back(), _sort_description))); + new MergeSortCursorImpl(_state->sorted_blocks.back(), _sort_description))); } else { - SortBlockCursor block_cursor(_pool->add(new SortCursorImpl(block, _sort_description))); + MergeSortBlockCursor block_cursor( + _pool->add(new MergeSortCursorImpl(block, _sort_description))); if (!block_cursor.totally_greater(_block_priority_queue.top())) { 
_state->sorted_blocks.emplace_back(std::move(block)); _block_priority_queue.push(block_cursor); @@ -176,75 +176,4 @@ Status FullSorter::_do_sort() { return Status::OK(); } -TopNSorter::TopNSorter(SortDescription& sort_description, VSortExecExprs& vsort_exec_exprs, - int limit, int64_t offset, ObjectPool* pool, std::vector<bool>& is_asc_order, - std::vector<bool>& nulls_first, const RowDescriptor& row_desc) - : Sorter(sort_description, vsort_exec_exprs, limit, offset, pool, is_asc_order, - nulls_first), - _state(std::unique_ptr<MergeSorterState>(new MergeSorterState(row_desc, offset))) {} - -Status TopNSorter::append_block(Block* block, bool* mem_reuse) { - DCHECK(block->rows() > 0); - RETURN_IF_ERROR(_do_sort(block, mem_reuse)); - return Status::OK(); -} - -Status TopNSorter::prepare_for_read() { - _state->build_merge_tree(_sort_description); - return Status::OK(); -} - -Status TopNSorter::get_next(RuntimeState* state, Block* block, bool* eos) { - if (_state->sorted_blocks.empty()) { - *eos = true; - } else if (_state->sorted_blocks.size() == 1) { - if (_offset != 0) { - _state->sorted_blocks[0].skip_num_rows(_offset); - } - block->swap(_state->sorted_blocks[0]); - *eos = true; - } else { - RETURN_IF_ERROR(_state->merge_sort_read(state, block, eos)); - } - return Status::OK(); -} - -Status TopNSorter::_do_sort(Block* block, bool* mem_reuse) { - *mem_reuse = false; - RETURN_IF_ERROR(partial_sort(*block)); - // dispose TOP-N logic - if (_limit != -1) { - // Here is a little opt to reduce the mem uasge, we build a max heap - // to order the block in _block_priority_queue. - // if one block totally greater the heap top of _block_priority_queue - // we can throw the block data directly. 
- if (_state->num_rows < _limit) { - Block sorted_block; - sorted_block.swap(*block); - _state->sorted_blocks.emplace_back(std::move(sorted_block)); - _state->num_rows += sorted_block.rows(); - _block_priority_queue.emplace(_pool->add( - new SortCursorImpl(_state->sorted_blocks.back(), _sort_description))); - } else { - Block sorted_block; - sorted_block.swap(*block); - SortBlockCursor block_cursor( - _pool->add(new SortCursorImpl(sorted_block, _sort_description))); - if (!block_cursor.totally_greater(_block_priority_queue.top())) { - _state->sorted_blocks.emplace_back(std::move(sorted_block)); - _block_priority_queue.push(block_cursor); - } else { - *mem_reuse = true; - block->clear_column_data(); - } - } - } else { - Block sorted_block; - sorted_block.swap(*block); - // dispose normal sort logic - _state->sorted_blocks.emplace_back(std::move(sorted_block)); - } - return Status::OK(); -} - } // namespace doris::vectorized diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index 9d881538a0..2e56f8012a 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -47,8 +47,8 @@ public: Status merge_sort_read(doris::RuntimeState* state, doris::vectorized::Block* block, bool* eos); - std::priority_queue<SortCursor> priority_queue; - std::vector<SortCursorImpl> cursors; + std::priority_queue<MergeSortCursor> priority_queue; + std::vector<MergeSortCursorImpl> cursors; std::unique_ptr<MutableBlock> unsorted_block; std::vector<Block> sorted_blocks; uint64_t num_rows = 0; @@ -60,11 +60,9 @@ private: class Sorter { public: - Sorter(SortDescription& sort_description, VSortExecExprs& vsort_exec_exprs, int limit, - int64_t offset, ObjectPool* pool, std::vector<bool>& is_asc_order, - std::vector<bool>& nulls_first) - : _sort_description(sort_description), - _vsort_exec_exprs(vsort_exec_exprs), + Sorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, ObjectPool* pool, + std::vector<bool>& is_asc_order, 
std::vector<bool>& nulls_first) + : _vsort_exec_exprs(vsort_exec_exprs), _limit(limit), _offset(offset), _pool(pool), @@ -73,10 +71,10 @@ public: virtual ~Sorter() = default; - void init_profile(RuntimeProfile* runtime_profile) { + virtual void init_profile(RuntimeProfile* runtime_profile) { _partial_sort_timer = ADD_TIMER(runtime_profile, "PartialSortTime"); _merge_block_timer = ADD_TIMER(runtime_profile, "MergeBlockTime"); - } + }; virtual Status append_block(Block* block, bool* mem_reuse) = 0; @@ -87,7 +85,7 @@ public: protected: Status partial_sort(Block& block); - SortDescription& _sort_description; + SortDescription _sort_description; VSortExecExprs& _vsort_exec_exprs; int _limit; int64_t _offset; @@ -95,16 +93,17 @@ protected: std::vector<bool>& _is_asc_order; std::vector<bool>& _nulls_first; - std::priority_queue<SortBlockCursor> _block_priority_queue; RuntimeProfile::Counter* _partial_sort_timer = nullptr; RuntimeProfile::Counter* _merge_block_timer = nullptr; + + std::priority_queue<MergeSortBlockCursor> _block_priority_queue; }; class FullSorter final : public Sorter { public: - FullSorter(SortDescription& sort_description, VSortExecExprs& vsort_exec_exprs, int limit, - int64_t offset, ObjectPool* pool, std::vector<bool>& is_asc_order, - std::vector<bool>& nulls_first, const RowDescriptor& row_desc); + FullSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, ObjectPool* pool, + std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first, + const RowDescriptor& row_desc); ~FullSorter() override = default; @@ -128,24 +127,4 @@ private: static constexpr size_t BUFFERED_BLOCK_BYTES = 16 << 20; }; -class TopNSorter final : public Sorter { -public: - TopNSorter(SortDescription& sort_description, VSortExecExprs& vsort_exec_exprs, int limit, - int64_t offset, ObjectPool* pool, std::vector<bool>& is_asc_order, - std::vector<bool>& nulls_first, const RowDescriptor& row_desc); - - ~TopNSorter() override = default; - - Status append_block(Block* 
block, bool* mem_reuse) override; - - Status prepare_for_read() override; - - Status get_next(RuntimeState* state, Block* block, bool* eos) override; - -private: - Status _do_sort(Block* block, bool* mem_reuse); - - std::unique_ptr<MergeSorterState> _state; -}; - } // namespace doris::vectorized diff --git a/be/src/vec/common/sort/topn_sorter.cpp b/be/src/vec/common/sort/topn_sorter.cpp new file mode 100644 index 0000000000..4ed7af6d04 --- /dev/null +++ b/be/src/vec/common/sort/topn_sorter.cpp @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/common/sort/topn_sorter.h" + +namespace doris::vectorized { + +TopNSorter::TopNSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, + ObjectPool* pool, std::vector<bool>& is_asc_order, + std::vector<bool>& nulls_first, const RowDescriptor& row_desc) + : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first), + _state(std::unique_ptr<MergeSorterState>(new MergeSorterState(row_desc, offset))) {} + +Status TopNSorter::append_block(Block* block, bool* mem_reuse) { + DCHECK(block->rows() > 0); + RETURN_IF_ERROR(_do_sort(block, mem_reuse)); + return Status::OK(); +} + +Status TopNSorter::prepare_for_read() { + _state->build_merge_tree(_sort_description); + return Status::OK(); +} + +Status TopNSorter::get_next(RuntimeState* state, Block* block, bool* eos) { + if (_state->sorted_blocks.empty()) { + *eos = true; + } else if (_state->sorted_blocks.size() == 1) { + if (_offset != 0) { + _state->sorted_blocks[0].skip_num_rows(_offset); + } + block->swap(_state->sorted_blocks[0]); + *eos = true; + } else { + RETURN_IF_ERROR(_state->merge_sort_read(state, block, eos)); + } + return Status::OK(); +} + +Status TopNSorter::_do_sort(Block* block, bool* mem_reuse) { + *mem_reuse = false; + RETURN_IF_ERROR(partial_sort(*block)); + // dispose TOP-N logic + if (_limit != -1) { + // Here is a little opt to reduce the mem uasge, we build a max heap + // to order the block in _block_priority_queue. + // if one block totally greater the heap top of _block_priority_queue + // we can throw the block data directly. 
+ if (_state->num_rows < _limit) { + Block sorted_block; + sorted_block.swap(*block); + _state->sorted_blocks.emplace_back(std::move(sorted_block)); + _state->num_rows += sorted_block.rows(); + _block_priority_queue.emplace(_pool->add( + new MergeSortCursorImpl(_state->sorted_blocks.back(), _sort_description))); + } else { + Block sorted_block; + sorted_block.swap(*block); + MergeSortBlockCursor block_cursor( + _pool->add(new MergeSortCursorImpl(sorted_block, _sort_description))); + if (!block_cursor.totally_greater(_block_priority_queue.top())) { + _state->sorted_blocks.emplace_back(std::move(sorted_block)); + _block_priority_queue.push(block_cursor); + } else { + *mem_reuse = true; + block->clear_column_data(); + } + } + } else { + Block sorted_block; + sorted_block.swap(*block); + // dispose normal sort logic + _state->sorted_blocks.emplace_back(std::move(sorted_block)); + } + return Status::OK(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/common/sort/topn_sorter.h b/be/src/vec/common/sort/topn_sorter.h new file mode 100644 index 0000000000..675442f5a1 --- /dev/null +++ b/be/src/vec/common/sort/topn_sorter.h @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once +#include <queue> + +#include "vec/common/sort/sorter.h" + +namespace doris::vectorized { + +class TopNSorter final : public Sorter { +public: + TopNSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, ObjectPool* pool, + std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first, + const RowDescriptor& row_desc); + + ~TopNSorter() override = default; + + Status append_block(Block* block, bool* mem_reuse) override; + + Status prepare_for_read() override; + + Status get_next(RuntimeState* state, Block* block, bool* eos) override; + + static constexpr size_t TOPN_SORT_THRESHOLD = 256; + +private: + Status _do_sort(Block* block, bool* mem_reuse); + + std::unique_ptr<MergeSorterState> _state; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/core/sort_block.h b/be/src/vec/core/sort_block.h index 2a2babf46f..cc791881c1 100644 --- a/be/src/vec/core/sort_block.h +++ b/be/src/vec/core/sort_block.h @@ -77,7 +77,7 @@ struct EqualRangeIterator { // should continue to sort this row according to current column. 
Using the first non-zero // value and first zero value after first non-zero value as two bounds, we can get an equal range here if (!(_cur_range_begin == 0) || !(_flags[_cur_range_begin] == 1)) { - _cur_range_begin = simd::find_nonzero(_flags, _cur_range_begin + 1); + _cur_range_begin = simd::find_one(_flags, _cur_range_begin + 1); if (_cur_range_begin >= _end) { return false; } diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index d6c8613bb1..b13316fe40 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -21,22 +21,133 @@ #pragma once #include "vec/columns/column.h" -#include "vec/columns/column_string.h" -#include "vec/common/assert_cast.h" -#include "vec/common/typeid_cast.h" #include "vec/core/block.h" -#include "vec/core/column_numbers.h" #include "vec/core/sort_description.h" #include "vec/exprs/vexpr_context.h" -#include "vec/runtime/vdata_stream_recvr.h" namespace doris::vectorized { +struct HeapSortCursorBlockView { +public: + Block block; + ColumnRawPtrs sort_columns; + SortDescription& desc; + + HeapSortCursorBlockView(Block&& cur_block, SortDescription& sort_desc) + : block(cur_block), desc(sort_desc) { + _reset(); + } + + void filter_block(IColumn::Filter& filter) { + Block::filter_block_internal(&block, filter, block.columns()); + _reset(); + } + +private: + void _reset() { + sort_columns.clear(); + auto columns = block.get_columns(); + for (size_t j = 0, size = desc.size(); j < size; ++j) { + auto& column_desc = desc[j]; + size_t column_number = !column_desc.column_name.empty() + ? 
block.get_position_by_name(column_desc.column_name) + : column_desc.column_number; + sort_columns.push_back(columns[column_number].get()); + } + } +}; + +// Use `SharedHeapSortCursorBlockView` for `HeapSortCursorBlockView` instead of shared_ptr because there will be no +// concurrent operation for `HeapSortCursorBlockView` and we don't need the lock inside shared_ptr +class SharedHeapSortCursorBlockView { +public: + SharedHeapSortCursorBlockView(HeapSortCursorBlockView&& reference) + : _ref_count(0), _reference(std::move(reference)) {} + SharedHeapSortCursorBlockView(const SharedHeapSortCursorBlockView&) = delete; + void unref() noexcept { + DCHECK_GT(_ref_count, 0); + _ref_count--; + if (_ref_count == 0) { + delete this; + } + } + void ref() noexcept { _ref_count++; } + + HeapSortCursorBlockView& value() { return _reference; } + +private: + ~SharedHeapSortCursorBlockView() noexcept = default; + int _ref_count; + HeapSortCursorBlockView _reference; +}; + +struct HeapSortCursorImpl { +public: + HeapSortCursorImpl(int row_id, SharedHeapSortCursorBlockView* block_view) + : _row_id(row_id), _block_view(block_view) { + block_view->ref(); + } + + HeapSortCursorImpl(const HeapSortCursorImpl& other) { + _row_id = other._row_id; + _block_view = other._block_view; + _block_view->ref(); + } + + HeapSortCursorImpl(HeapSortCursorImpl&& other) { + _row_id = other._row_id; + _block_view = other._block_view; + other._block_view = nullptr; + } + + HeapSortCursorImpl& operator=(HeapSortCursorImpl&& other) { + std::swap(_row_id, other._row_id); + std::swap(_block_view, other._block_view); + return *this; + } + + ~HeapSortCursorImpl() { + if (_block_view) { + _block_view->unref(); + } + }; + + const size_t row_id() const { return _row_id; } + + const ColumnRawPtrs& sort_columns() const { return _block_view->value().sort_columns; } + + const Block* block() const { return &_block_view->value().block; } + + const SortDescription& sort_desc() const { return _block_view->value().desc; } + 
+ bool operator<(const HeapSortCursorImpl& rhs) const { + for (size_t i = 0; i < sort_desc().size(); ++i) { + int direction = sort_desc()[i].direction; + int nulls_direction = sort_desc()[i].nulls_direction; + int res = direction * sort_columns()[i]->compare_at(row_id(), rhs.row_id(), + *(rhs.sort_columns()[i]), + nulls_direction); + // ASC: direction == 1. If this row sorts before rhs, res < 0. So we return true. + if (res < 0) { + return true; + } + if (res > 0) { + return false; + } + } + return false; + } + +private: + size_t _row_id; + SharedHeapSortCursorBlockView* _block_view; +}; + /** Cursor allows to compare rows in different blocks (and parts). * Cursor moves inside single block. * It is used in priority queue. */ -struct SortCursorImpl { +struct MergeSortCursorImpl { ColumnRawPtrs all_columns; ColumnRawPtrs sort_columns; SortDescription desc; @@ -44,20 +155,21 @@ struct SortCursorImpl { size_t pos = 0; size_t rows = 0; - SortCursorImpl() = default; - virtual ~SortCursorImpl() = default; + MergeSortCursorImpl() = default; + virtual ~MergeSortCursorImpl() = default; - SortCursorImpl(const Block& block, const SortDescription& desc_) + MergeSortCursorImpl(const Block& block, const SortDescription& desc_) : desc(desc_), sort_columns_size(desc.size()) { reset(block); } - SortCursorImpl(const Columns& columns, const SortDescription& desc_) + MergeSortCursorImpl(const Columns& columns, const SortDescription& desc_) : desc(desc_), sort_columns_size(desc.size()) { for (auto& column_desc : desc) { if (!column_desc.column_name.empty()) { - LOG(FATAL) << "SortDesctiption should contain column position if SortCursor was " - "used without header."; + LOG(FATAL) + << "SortDesctiption should contain column position if MergeSortCursor was " + "used without header."; } } reset(columns, {}); @@ -101,7 +213,7 @@ struct SortCursorImpl { using BlockSupplier = std::function<Status(Block**)>; -struct ReceiveQueueSortCursorImpl : public SortCursorImpl { +struct ReceiveQueueSortCursorImpl : public
MergeSortCursorImpl { ReceiveQueueSortCursorImpl(const BlockSupplier& block_supplier, const std::vector<VExprContext*>& ordering_expr, const std::vector<bool>& is_asc_order, @@ -123,7 +235,7 @@ struct ReceiveQueueSortCursorImpl : public SortCursorImpl { for (int i = 0; i < desc.size(); ++i) { _ordering_expr[i]->execute(_block_ptr, &desc[i].column_number); } - SortCursorImpl::reset(*_block_ptr); + MergeSortCursorImpl::reset(*_block_ptr); return true; } _block_ptr = nullptr; @@ -150,14 +262,14 @@ struct ReceiveQueueSortCursorImpl : public SortCursorImpl { }; /// For easy copying. -struct SortCursor { - SortCursorImpl* impl; +struct MergeSortCursor { + MergeSortCursorImpl* impl; - SortCursor(SortCursorImpl* impl_) : impl(impl_) {} - SortCursorImpl* operator->() const { return impl; } + MergeSortCursor(MergeSortCursorImpl* impl_) : impl(impl_) {} + MergeSortCursorImpl* operator->() const { return impl; } /// The specified row of this cursor is greater than the specified row of another cursor. - int8_t greater_at(const SortCursor& rhs, size_t lhs_pos, size_t rhs_pos) const { + int8_t greater_at(const MergeSortCursor& rhs, size_t lhs_pos, size_t rhs_pos) const { for (size_t i = 0; i < impl->sort_columns_size; ++i) { int direction = impl->desc[i].direction; int nulls_direction = impl->desc[i].nulls_direction; @@ -175,7 +287,7 @@ struct SortCursor { } /// Checks that all rows in the current block of this cursor are less than or equal to all the rows of the current block of another cursor. 
- bool totally_less(const SortCursor& rhs) const { + bool totally_less(const MergeSortCursor& rhs) const { if (impl->rows == 0 || rhs.impl->rows == 0) { return false; } @@ -184,23 +296,23 @@ struct SortCursor { return greater_at(rhs, impl->rows - 1, 0) == -1; } - bool greater(const SortCursor& rhs) const { + bool greater(const MergeSortCursor& rhs) const { return !impl->empty() && greater_at(rhs, impl->pos, rhs.impl->pos) > 0; } /// Inverted so that the priority queue elements are removed in ascending order. - bool operator<(const SortCursor& rhs) const { return greater(rhs); } + bool operator<(const MergeSortCursor& rhs) const { return greater(rhs); } }; /// For easy copying. -struct SortBlockCursor { - SortCursorImpl* impl; +struct MergeSortBlockCursor { + MergeSortCursorImpl* impl; - SortBlockCursor(SortCursorImpl* impl_) : impl(impl_) {} - SortCursorImpl* operator->() const { return impl; } + MergeSortBlockCursor(MergeSortCursorImpl* impl_) : impl(impl_) {} + MergeSortCursorImpl* operator->() const { return impl; } /// The specified row of this cursor is greater than the specified row of another cursor. - int8_t less_at(const SortBlockCursor& rhs, int rows) const { + int8_t less_at(const MergeSortBlockCursor& rhs, int rows) const { for (size_t i = 0; i < impl->sort_columns_size; ++i) { int direction = impl->desc[i].direction; int nulls_direction = impl->desc[i].nulls_direction; @@ -218,7 +330,7 @@ struct SortBlockCursor { } /// Checks that all rows in the current block of this cursor are less than or equal to all the rows of the current block of another cursor. - bool totally_greater(const SortBlockCursor& rhs) const { + bool totally_greater(const MergeSortBlockCursor& rhs) const { if (impl->rows == 0 || rhs.impl->rows == 0) { return false; } @@ -228,7 +340,9 @@ struct SortBlockCursor { } /// Inverted so that the priority queue elements are removed in ascending order. 
- bool operator<(const SortBlockCursor& rhs) const { return less_at(rhs, impl->rows - 1) == 1; } + bool operator<(const MergeSortBlockCursor& rhs) const { + return less_at(rhs, impl->rows - 1) == 1; + } }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index c090efa961..54323c561c 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -17,10 +17,13 @@ #include "vec/exec/vsort_node.h" +#include "common/config.h" #include "exec/sort_exec_exprs.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "util/debug_util.h" +#include "vec/common/sort/heap_sorter.h" +#include "vec/common/sort/topn_sorter.h" #include "vec/core/sort_block.h" #include "vec/utils/util.hpp" @@ -35,24 +38,23 @@ Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.sort_node.sort_info, _pool)); _is_asc_order = tnode.sort_node.sort_info.is_asc_order; _nulls_first = tnode.sort_node.sort_info.nulls_first; - bool has_string_slot = false; - for (const auto& tuple_desc : child(0)->row_desc().tuple_descriptors()) { - for (const auto& slot : tuple_desc->slots()) { - if (slot->type().is_string_type()) { - has_string_slot = true; - break; - } - } - if (has_string_slot) { - break; - } - } - if (has_string_slot && _limit > 0 && _limit < ACCUMULATED_PARTIAL_SORT_THRESHOLD) { - _sorter.reset(new TopNSorter(_sort_description, _vsort_exec_exprs, _limit, _offset, _pool, - _is_asc_order, _nulls_first, child(0)->row_desc())); + const auto& row_desc = child(0)->row_desc(); + // If `limit + offset` is smaller than HEAP_SORT_THRESHOLD, we prefer to use heap sort. + // To do heap sorting, each incoming block will be filtered by the heap-top row. There will be some + // `memcpy` operations.
To ensure heap sort does not cause a performance regression, we should + // exclude cases where incoming blocks have string columns, which are sensitive to operations like + // `filter` and `memcpy` + if (_limit > 0 && _limit + _offset < HeapSorter::HEAP_SORT_THRESHOLD && + !row_desc.has_varlen_slots()) { + _sorter.reset(new HeapSorter(_vsort_exec_exprs, _limit, _offset, _pool, _is_asc_order, + _nulls_first, row_desc)); + } else if (_limit > 0 && row_desc.has_varlen_slots() && _limit > 0 && + _limit + _offset < TopNSorter::TOPN_SORT_THRESHOLD) { + _sorter.reset(new TopNSorter(_vsort_exec_exprs, _limit, _offset, _pool, _is_asc_order, + _nulls_first, row_desc)); } else { - _sorter.reset(new FullSorter(_sort_description, _vsort_exec_exprs, _limit, _offset, _pool, - _is_asc_order, _nulls_first, child(0)->row_desc())); + _sorter.reset(new FullSorter(_vsort_exec_exprs, _limit, _offset, _pool, _is_asc_order, + _nulls_first, row_desc)); } _sorter->init_profile(_runtime_profile.get()); diff --git a/be/src/vec/exec/vsort_node.h b/be/src/vec/exec/vsort_node.h index e7cb12b437..ff640b6851 100644 --- a/be/src/vec/exec/vsort_node.h +++ b/be/src/vec/exec/vsort_node.h @@ -63,8 +63,6 @@ private: std::vector<bool> _is_asc_order; std::vector<bool> _nulls_first; - SortDescription _sort_description; - std::unique_ptr<Sorter> _sorter; static constexpr size_t ACCUMULATED_PARTIAL_SORT_THRESHOLD = 256; diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index f7bd965299..937d743780 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -50,7 +50,7 @@ Status VSortedRunMerger::prepare(const vector<BlockSupplier>& input_runs, bool p } for (auto& _cursor : _cursors) { - if (!_cursor._is_eof) _priority_queue.push(SortCursor(&_cursor)); + if (!_cursor._is_eof) _priority_queue.push(MergeSortCursor(&_cursor)); } for (const auto& cursor : _cursors) { @@ -142,7 +142,7 @@ Status VSortedRunMerger::get_next(Block*
output_block, bool* eos) { return Status::OK(); } -void VSortedRunMerger::next_heap(SortCursor& current) { +void VSortedRunMerger::next_heap(MergeSortCursor& current) { if (!current->isLast()) { current->next(); _priority_queue.push(current); @@ -151,7 +151,7 @@ void VSortedRunMerger::next_heap(SortCursor& current) { } } -inline bool VSortedRunMerger::has_next_block(doris::vectorized::SortCursor& current) { +inline bool VSortedRunMerger::has_next_block(doris::vectorized::MergeSortCursor& current) { ScopedTimer<MonotonicStopWatch> timer(_get_next_block_timer); return current->has_next_block(); } diff --git a/be/src/vec/runtime/vsorted_run_merger.h b/be/src/vec/runtime/vsorted_run_merger.h index 05c463fec3..e374f2cdc0 100644 --- a/be/src/vec/runtime/vsorted_run_merger.h +++ b/be/src/vec/runtime/vsorted_run_merger.h @@ -72,7 +72,7 @@ protected: size_t _offset = 0; std::vector<ReceiveQueueSortCursorImpl> _cursors; - std::priority_queue<SortCursor> _priority_queue; + std::priority_queue<MergeSortCursor> _priority_queue; Block _empty_block; @@ -83,8 +83,8 @@ protected: RuntimeProfile::Counter* _get_next_block_timer; private: - void next_heap(SortCursor& current); - bool has_next_block(SortCursor& current); + void next_heap(MergeSortCursor& current); + bool has_next_block(MergeSortCursor& current); }; } // namespace vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org