This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new d0d9a6669c5 [Enchancement](sort) optimize for heap sort (#51368)
d0d9a6669c5 is described below

commit d0d9a6669c54133ff2b7d8c423f986a74a6bb635
Author: Pxl <x...@selectdb.com>
AuthorDate: Mon Jun 16 20:13:59 2025 +0800

    [Enchancement](sort) optimize for heap sort (#51368)

    use MergeSorterQueue to replace priority_queue.

    ```sql
    SELECT COUNT(UserAgentMajor), COUNT(ResolutionHeight), COUNT(URLCategoryID),
           COUNT(OS), COUNT(EventTime), COUNT(CounterID), COUNT(EventDate),
           COUNT(UserID), COUNT(UTMSource), COUNT(UTMMedium), COUNT(UTMCampaign),
           COUNT(UTMContent)
    FROM (
        SELECT UserAgentMajor, ResolutionHeight, URLCategoryID, OS, EventTime,
               CounterID, EventDate, UserID, UTMSource, UTMMedium, UTMCampaign,
               UTMContent
        FROM hits_100m
        ORDER BY UserAgentMajor, ResolutionHeight, URLCategoryID, OS, EventTime L [...]

    before:
    - AppendBlockTime: 10sec77ms
    - ExecTime: 13sec646ms

    after:
    - AppendBlockTime: 1sec811ms
    - ExecTime: 2sec31ms
    ```
---
 be/src/vec/common/sort/heap_sorter.cpp | 190 ++++---------------------
 be/src/vec/common/sort/heap_sorter.h | 78 +--------
 be/src/vec/common/sort/partition_sorter.cpp | 4 +-
 be/src/vec/common/sort/sorter.cpp | 16 +-
 be/src/vec/common/sort/sorter.h | 5 +-
 be/src/vec/common/sort/topn_sorter.cpp | 2 +-
 be/src/vec/core/sort_cursor.h | 120 ++++---------
 be/src/vec/runtime/vsorted_run_merger.cpp | 2 +-
 be/test/pipeline/operator/sort_operator_test.cpp | 33 +++-
 be/test/vec/exec/sort/heap_sorter_test.cpp | 36 ++--
 be/test/vec/exec/sort/merge_sorter_state.cpp | 2 +-
 .../data/variant_p0/test_sub_path_pruning.out | Bin 5855 -> 5855 bytes
 .../data/variant_p0/topn_opt_read_by_rowids.out | Bin 9886 -> 9886 bytes
 .../suites/variant_p0/test_sub_path_pruning.groovy | 32 ++--
 .../variant_p0/topn_opt_read_by_rowids.groovy | 8 +-
 15 files changed, 152 insertions(+), 376 deletions(-)

diff --git a/be/src/vec/common/sort/heap_sorter.cpp b/be/src/vec/common/sort/heap_sorter.cpp index
dfabd9e436a..aed775cb6d8 100644 --- a/be/src/vec/common/sort/heap_sorter.cpp +++ b/be/src/vec/common/sort/heap_sorter.cpp @@ -17,199 +17,67 @@ #include "vec/common/sort/heap_sorter.h" -#include <glog/logging.h> - -#include <algorithm> - -#include "runtime/primitive_type.h" -#include "runtime/thread_context.h" -#include "util/defer_op.h" -#include "vec/columns/column.h" -#include "vec/columns/column_nullable.h" -#include "vec/common/sort/vsort_exec_exprs.h" -#include "vec/core/column_with_type_and_name.h" -#include "vec/core/sort_description.h" -#include "vec/data_types/data_type_nullable.h" -#include "vec/exprs/vexpr_context.h" - -namespace doris { +namespace doris::vectorized { #include "common/compile_check_begin.h" -class ObjectPool; -class RowDescriptor; -class RuntimeState; -} // namespace doris -namespace doris::vectorized { HeapSorter::HeapSorter(VSortExecExprs& vsort_exec_exprs, int64_t limit, int64_t offset, ObjectPool* pool, std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first, const RowDescriptor& row_desc) : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first), - _data_size(0), _heap_size(limit + offset), - _heap(SortingHeap::create_unique()), - _topn_filter_rows(0), - _init_sort_descs(false) {} + _state(MergeSorterState::create_unique(row_desc, offset)) {} Status HeapSorter::append_block(Block* block) { - DCHECK(block->rows() > 0); - { - SCOPED_TIMER(_materialize_timer); - if (_vsort_exec_exprs.need_materialize_tuple()) { - auto output_tuple_expr_ctxs = _vsort_exec_exprs.sort_tuple_slot_expr_ctxs(); - std::vector<int> valid_column_ids(output_tuple_expr_ctxs.size()); - for (int i = 0; i < output_tuple_expr_ctxs.size(); ++i) { - RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(block, &valid_column_ids[i])); - } - - Block new_block; - int i = 0; - const auto& convert_nullable_flags = _vsort_exec_exprs.get_convert_nullable_flags(); - for (auto column_id : valid_column_ids) { - if (column_id < 0) { - continue; - } - if (i < 
convert_nullable_flags.size() && convert_nullable_flags[i]) { - auto column_ptr = make_nullable(block->get_by_position(column_id).column); - new_block.insert({column_ptr, - make_nullable(block->get_by_position(column_id).type), ""}); - } else { - new_block.insert(block->get_by_position(column_id)); - } - i++; - } - block->swap(new_block); + auto tmp_block = std::make_shared<Block>(block->clone_empty()); + RETURN_IF_ERROR(partial_sort(*block, *tmp_block, true)); + _queue.push( + MergeSortCursor(std::make_shared<MergeSortCursorImpl>(tmp_block, _sort_description))); + _queue_row_num += tmp_block->rows(); + _data_size += tmp_block->allocated_bytes(); + + while (_queue.is_valid() && _queue_row_num > _heap_size) { + auto [current, current_rows] = _queue.current(); + current_rows = std::min(current_rows, _queue_row_num - _heap_size); + + if (!current->impl->is_last(current_rows)) { + _queue.next(current_rows); + } else { + _queue.remove_top(); + _data_size -= current->impl->block->allocated_bytes(); } + _queue_row_num -= current_rows; } - if (!_init_sort_descs) { - RETURN_IF_ERROR(_prepare_sort_descs(block)); - } - Block tmp_block = block->clone_empty(); - tmp_block.swap(*block); - size_t num_rows = tmp_block.rows(); - auto block_view = - std::make_shared<HeapSortCursorBlockView>(std::move(tmp_block), _sort_description); - bool filtered = false; - if (_heap_size == _heap->size()) { - { - SCOPED_TIMER(_topn_filter_timer); - _do_filter(*block_view, num_rows); - } - size_t remain_rows = block_view->block.rows(); - _topn_filter_rows += (num_rows - remain_rows); - COUNTER_SET(_topn_filter_rows_counter, _topn_filter_rows); - filtered = remain_rows == 0; - for (size_t i = 0; i < remain_rows; ++i) { - HeapSortCursorImpl cursor(i, block_view); - _heap->replace_top_if_less(std::move(cursor)); - } - } else { - size_t free_slots = std::min<size_t>(_heap_size - _heap->size(), num_rows); - - size_t i = 0; - for (; i < free_slots; ++i) { - HeapSortCursorImpl cursor(i, block_view); - 
_heap->push(std::move(cursor)); - } - for (; i < num_rows; ++i) { - HeapSortCursorImpl cursor(i, block_view); - _heap->replace_top_if_less(std::move(cursor)); - } - } - if (!filtered) { - _data_size += block_view->block.allocated_bytes(); - } return Status::OK(); } Status HeapSorter::prepare_for_read() { - if (!_heap->empty() && _heap->size() > _offset) { - const auto& top = _heap->top(); - size_t num_columns = top.block()->columns(); - MutableColumns result_columns = top.block()->clone_empty_columns(); - - size_t init_size = std::min((size_t)_limit, _heap->size()); - result_columns.reserve(init_size); - - DCHECK(_heap->size() <= _heap_size); - // Use a vector to reverse elements in heap - std::vector<HeapSortCursorImpl> vector_to_reverse; - vector_to_reverse.reserve(init_size); - size_t capacity = 0; - while (!_heap->empty()) { - auto current = _heap->top(); - _heap->pop(); - vector_to_reverse.emplace_back(std::move(current)); - capacity++; - if (_offset != 0 && _heap->size() == _offset) { - break; - } - } - for (int64_t i = capacity - 1; i >= 0; i--) { - auto rid = vector_to_reverse[i].row_id(); - const auto cur_block = vector_to_reverse[i].block(); - Columns columns = cur_block->get_columns(); - for (size_t j = 0; j < num_columns; ++j) { - result_columns[j]->insert_from(*(columns[j]), rid); - } + while (_queue.is_valid()) { + auto [current, current_rows] = _queue.current(); + if (current_rows) { + current->impl->reverse(); + _state->get_queue().push(MergeSortCursor(current->impl)); } - _return_block = vector_to_reverse[0].block()->clone_with_columns(std::move(result_columns)); + _queue.remove_top(); } return Status::OK(); } Status HeapSorter::get_next(RuntimeState* state, Block* block, bool* eos) { - _return_block.swap(*block); - *eos = true; - return Status::OK(); + return _state->merge_sort_read(block, state->batch_size(), eos); } Field HeapSorter::get_top_value() { Field field {PrimitiveType::TYPE_NULL}; // get field from first sort column of top row - if 
(_heap->size() >= _heap_size) { - auto& top = _heap->top(); - top.sort_columns()[0]->get(top.row_id(), field); + if (_queue_row_num >= _heap_size) { + auto [current, current_rows] = _queue.current(); + field = current->get_top_value(); } return field; } -// need exception safety -void HeapSorter::_do_filter(HeapSortCursorBlockView& block_view, size_t num_rows) { - const auto& top_cursor = _heap->top(); - const auto cursor_rid = top_cursor.row_id(); - - IColumn::Filter filter(num_rows); - for (size_t i = 0; i < num_rows; ++i) { - filter[i] = 0; - } - - std::vector<uint8_t> cmp_res(num_rows, 0); - - for (size_t col_id = 0; col_id < _sort_description.size(); ++col_id) { - block_view.sort_columns[col_id]->compare_internal( - cursor_rid, *top_cursor.sort_columns()[col_id], - _sort_description[col_id].nulls_direction, _sort_description[col_id].direction, - cmp_res, filter.data()); - } - block_view.filter_block(filter); -} - -Status HeapSorter::_prepare_sort_descs(Block* block) { - _sort_description.resize(_vsort_exec_exprs.ordering_expr_ctxs().size()); - for (int i = 0; i < _sort_description.size(); i++) { - const auto& ordering_expr = _vsort_exec_exprs.ordering_expr_ctxs()[i]; - RETURN_IF_ERROR(ordering_expr->execute(block, &_sort_description[i].column_number)); - - _sort_description[i].direction = _is_asc_order[i] ? 1 : -1; - _sort_description[i].nulls_direction = - _nulls_first[i] ? -_sort_description[i].direction : _sort_description[i].direction; - } - _init_sort_descs = true; - return Status::OK(); -} - size_t HeapSorter::data_size() const { return _data_size; } diff --git a/be/src/vec/common/sort/heap_sorter.h b/be/src/vec/common/sort/heap_sorter.h index b36ef28af70..16a1e7d5d08 100644 --- a/be/src/vec/common/sort/heap_sorter.h +++ b/be/src/vec/common/sort/heap_sorter.h @@ -16,62 +16,11 @@ // under the License. 
#pragma once -#include <gen_cpp/Metrics_types.h> -#include <stddef.h> -#include <stdint.h> -#include <memory> -#include <queue> -#include <utility> -#include <vector> - -#include "common/status.h" -#include "util/runtime_profile.h" #include "vec/common/sort/sorter.h" -#include "vec/core/block.h" -#include "vec/core/field.h" -#include "vec/core/sort_cursor.h" - -namespace doris { -#include "common/compile_check_begin.h" -class ObjectPool; -class RowDescriptor; -class RuntimeState; -namespace vectorized { -class VSortExecExprs; -} // namespace vectorized -} // namespace doris namespace doris::vectorized { - -class SortingHeap { - ENABLE_FACTORY_CREATOR(SortingHeap); - -public: - const HeapSortCursorImpl& top() { return _queue.top(); } - - size_t size() { return _queue.size(); } - - bool empty() { return _queue.empty(); } - - void pop() { _queue.pop(); } - - void replace_top(HeapSortCursorImpl&& top) { - _queue.pop(); - _queue.push(std::move(top)); - } - - void push(HeapSortCursorImpl&& cursor) { _queue.push(std::move(cursor)); } - - void replace_top_if_less(HeapSortCursorImpl&& val) { - if (val < top()) { - replace_top(std::move(val)); - } - } - -private: - std::priority_queue<HeapSortCursorImpl> _queue; -}; +#include "common/compile_check_begin.h" class HeapSorter final : public Sorter { ENABLE_FACTORY_CREATOR(HeapSorter); @@ -83,12 +32,6 @@ public: ~HeapSorter() override = default; - void init_profile(RuntimeProfile* runtime_profile) override { - _topn_filter_timer = ADD_TIMER(runtime_profile, "TopNFilterTime"); - _topn_filter_rows_counter = ADD_COUNTER(runtime_profile, "TopNFilterRows", TUnit::UNIT); - _materialize_timer = ADD_TIMER(runtime_profile, "MaterializeTime"); - } - Status append_block(Block* block) override; Status prepare_for_read() override; @@ -99,23 +42,14 @@ public: Field get_top_value() override; - static constexpr size_t HEAP_SORT_THRESHOLD = 1024; - private: - void _do_filter(HeapSortCursorBlockView& block_view, size_t num_rows); - Status 
_prepare_sort_descs(Block* block); - size_t _data_size; - size_t _heap_size; - std::unique_ptr<SortingHeap> _heap; - Block _return_block; - int64_t _topn_filter_rows; - bool _init_sort_descs; - - RuntimeProfile::Counter* _topn_filter_timer = nullptr; - RuntimeProfile::Counter* _topn_filter_rows_counter = nullptr; - RuntimeProfile::Counter* _materialize_timer = nullptr; + size_t _data_size = 0; + size_t _heap_size = 0; + size_t _queue_row_num = 0; + MergeSorterQueue _queue; + std::unique_ptr<MergeSorterState> _state; }; #include "common/compile_check_end.h" diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp index 09c45e83c6a..e1b27e790a4 100644 --- a/be/src/vec/common/sort/partition_sorter.cpp +++ b/be/src/vec/common/sort/partition_sorter.cpp @@ -47,7 +47,7 @@ PartitionSorter::PartitionSorter(VSortExecExprs& vsort_exec_exprs, int64_t limit bool has_global_limit, int64_t partition_inner_limit, TopNAlgorithm::type top_n_algorithm, SortCursorCmp* previous_row) : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first), - _state(MergeSorterState::create_unique(row_desc, offset, limit, state, profile)), + _state(MergeSorterState::create_unique(row_desc, offset)), _row_desc(row_desc), _partition_inner_limit(partition_inner_limit), _top_n_algorithm( @@ -81,7 +81,7 @@ Status PartitionSorter::prepare_for_read() { void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) { std::priority_queue<MergeSortBlockCursor> empty_queue; std::swap(_block_priority_queue, empty_queue); - _state = MergeSorterState::create_unique(_row_desc, _offset, _limit, runtime_state, nullptr); + _state = MergeSorterState::create_unique(_row_desc, _offset); // _previous_row->impl inited at partition_sort_read function, // but maybe call get_next after do_partition_topn_sort() function, and running into else if branch at line 92L // so _previous_row->impl == nullptr and no need reset. 
diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 1df2d2bdc62..a2a69552192 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -152,7 +152,7 @@ Status Sorter::merge_sort_read_for_spill(RuntimeState* state, doris::vectorized: return get_next(state, block, eos); } -Status Sorter::partial_sort(Block& src_block, Block& dest_block) { +Status Sorter::partial_sort(Block& src_block, Block& dest_block, bool reversed) { size_t num_cols = src_block.columns(); if (_materialize_sort_exprs) { auto output_tuple_expr_ctxs = _vsort_exec_exprs.sort_tuple_slot_expr_ctxs(); @@ -189,18 +189,18 @@ Status Sorter::partial_sort(Block& src_block, Block& dest_block) { _sort_description[i].direction = _is_asc_order[i] ? 1 : -1; _sort_description[i].nulls_direction = _nulls_first[i] ? -_sort_description[i].direction : _sort_description[i].direction; + if (reversed) { + _sort_description[i].direction *= -1; + } } { SCOPED_TIMER(_partial_sort_timer); - if (_materialize_sort_exprs) { - sort_block(dest_block, dest_block, _sort_description, _offset + _limit); - } else { - sort_block(src_block, dest_block, _sort_description, _offset + _limit); - } - src_block.clear_column_data(num_cols); + uint64_t limit = reversed ? 
0 : (_offset + _limit); + sort_block(*result_block, dest_block, _sort_description, limit); } + src_block.clear_column_data(num_cols); return Status::OK(); } @@ -209,7 +209,7 @@ FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, int64_t limit, int64_t std::vector<bool>& nulls_first, const RowDescriptor& row_desc, RuntimeState* state, RuntimeProfile* profile) : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first), - _state(MergeSorterState::create_unique(row_desc, offset, limit, state, profile)) {} + _state(MergeSorterState::create_unique(row_desc, offset)) {} // check whether the unsorted block can hold more data from input block and no need to alloc new memory bool FullSorter::has_enough_capacity(Block* input_block, Block* unsorted_block) const { diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index 37e74947ccc..cd56af60775 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -52,8 +52,7 @@ class MergeSorterState { ENABLE_FACTORY_CREATOR(MergeSorterState); public: - MergeSorterState(const RowDescriptor& row_desc, int64_t offset, int64_t limit, - RuntimeState* state, RuntimeProfile* profile) + MergeSorterState(const RowDescriptor& row_desc, int64_t offset) // create_empty_block should ignore invalid slots, unsorted_block // should be same structure with arrival block from child node // since block from child node may ignored these slots @@ -152,7 +151,7 @@ public: void set_enable_spill() { _enable_spill = true; } protected: - Status partial_sort(Block& src_block, Block& dest_block); + Status partial_sort(Block& src_block, Block& dest_block, bool reversed = false); bool _enable_spill = false; SortDescription _sort_description; diff --git a/be/src/vec/common/sort/topn_sorter.cpp b/be/src/vec/common/sort/topn_sorter.cpp index 3e0bc5ee688..fe3cecca5cd 100644 --- a/be/src/vec/common/sort/topn_sorter.cpp +++ b/be/src/vec/common/sort/topn_sorter.cpp @@ -44,7 +44,7 @@ 
TopNSorter::TopNSorter(VSortExecExprs& vsort_exec_exprs, int64_t limit, int64_t std::vector<bool>& nulls_first, const RowDescriptor& row_desc, RuntimeState* state, RuntimeProfile* profile) : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first), - _state(MergeSorterState::create_unique(row_desc, offset, limit, state, profile)), + _state(MergeSorterState::create_unique(row_desc, offset)), _row_desc(row_desc) {} Status TopNSorter::append_block(Block* block) { diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index 9331508c376..3e3baf6a9e0 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -24,98 +24,12 @@ #include "vec/columns/column.h" #include "vec/core/block.h" +#include "vec/core/field.h" #include "vec/core/sort_description.h" #include "vec/exprs/vexpr_context.h" namespace doris::vectorized { -struct HeapSortCursorBlockView { -public: - Block block; - ColumnRawPtrs sort_columns; - SortDescription& desc; - - HeapSortCursorBlockView(Block&& cur_block, SortDescription& sort_desc) - : block(cur_block), desc(sort_desc) { - _reset(); - } - - // need exception safety - void filter_block(IColumn::Filter& filter) { - Block::filter_block_internal(&block, filter, block.columns()); - _reset(); - } - -private: - void _reset() { - sort_columns.clear(); - auto columns = block.get_columns_and_convert(); - for (auto& column_desc : desc) { - size_t column_number = !column_desc.column_name.empty() - ? 
block.get_position_by_name(column_desc.column_name) - : column_desc.column_number; - sort_columns.push_back(columns[column_number].get()); - } - } -}; - -using HeapSortCursorBlockSPtr = std::shared_ptr<HeapSortCursorBlockView>; - -struct HeapSortCursorImpl { -public: - HeapSortCursorImpl(size_t row_id, HeapSortCursorBlockSPtr block_view) - : _row_id(row_id), _block_view(std::move(block_view)) {} - - HeapSortCursorImpl(const HeapSortCursorImpl& other) { - _row_id = other._row_id; - _block_view = other._block_view; - } - - HeapSortCursorImpl(HeapSortCursorImpl&& other) { - _row_id = other._row_id; - _block_view = other._block_view; - other._block_view = nullptr; - } - - HeapSortCursorImpl& operator=(HeapSortCursorImpl&& other) { - std::swap(_row_id, other._row_id); - std::swap(_block_view, other._block_view); - return *this; - } - - ~HeapSortCursorImpl() = default; - - size_t row_id() const { return _row_id; } - - const ColumnRawPtrs& sort_columns() const { return _block_view->sort_columns; } - - const Block* block() const { return &_block_view->block; } - - const SortDescription& sort_desc() const { return _block_view->desc; } - - bool operator<(const HeapSortCursorImpl& rhs) const { - for (size_t i = 0; i < sort_desc().size(); ++i) { - int direction = sort_desc()[i].direction; - int nulls_direction = sort_desc()[i].nulls_direction; - int res = direction * sort_columns()[i]->compare_at(row_id(), rhs.row_id(), - *(rhs.sort_columns()[i]), - nulls_direction); - // ASC: direction == 1. If bigger, res > 0. So we return true. - if (res < 0) { - return true; - } - if (res > 0) { - return false; - } - } - return false; - } - -private: - size_t _row_id; - HeapSortCursorBlockSPtr _block_view; -}; - /** Cursor allows to compare rows in different blocks (and parts). * Cursor moves inside single block. * It is used in priority queue. 
@@ -127,8 +41,8 @@ struct MergeSortCursorImpl { ColumnRawPtrs columns; SortDescription desc; size_t sort_columns_size = 0; - size_t pos = 0; - size_t rows = 0; + int pos = 0; + int rows = 0; MergeSortCursorImpl() = default; virtual ~MergeSortCursorImpl() = default; @@ -145,6 +59,22 @@ struct MergeSortCursorImpl { bool empty() const { return rows == 0; } + void reverse() { + MutableColumns columns_reversed; + for (auto& column : columns) { + auto col_reversed = column->clone_empty(); + for (int j = rows - 1; j >= pos; j--) { + col_reversed->insert_from(*column, j); + } + columns_reversed.push_back(std::move(col_reversed)); + } + block->set_columns(std::move(columns_reversed)); + for (auto& column_desc : desc) { + column_desc.direction *= -1; + } + reset(); + } + /// Set the cursor to the beginning of the new block. void reset() { sort_columns.clear(); @@ -174,6 +104,12 @@ struct MergeSortCursorImpl { virtual void process_next() {} virtual Block* block_ptr() { return nullptr; } virtual bool eof() const { return false; } + + Field get_top_value() const { + Field field {PrimitiveType::TYPE_NULL}; + sort_columns[0]->get(pos, field); + return field; + } }; using BlockSupplier = std::function<Status(Block*, bool* eos)>; @@ -287,6 +223,8 @@ struct MergeSortCursor { /// Inverted so that the priority queue elements are removed in ascending order. bool operator<(const MergeSortCursor& rhs) const { return greater(rhs); } + + Field get_top_value() const { return impl->get_top_value(); } }; /// For easy copying. 
@@ -423,8 +361,8 @@ public: } } - void push(MergeSortCursorImpl& cursor) { - _queue.emplace_back(&cursor); + void push(MergeSortCursor cursor) { + _queue.emplace_back(std::move(cursor)); std::push_heap(_queue.begin(), _queue.end()); next_child_idx = 0; diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index 25376f9216b..8483e78f25b 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -119,7 +119,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { DCHECK(!current->eof()); DCHECK(current->block_ptr() != nullptr); while (_offset != 0) { - auto process_rows = std::min(current->rows - current->pos, _offset); + auto process_rows = std::min(current->rows - current->pos, (int)_offset); current->next(process_rows); _offset -= process_rows; if (current->is_last(0)) { diff --git a/be/test/pipeline/operator/sort_operator_test.cpp b/be/test/pipeline/operator/sort_operator_test.cpp index cd1a4c35d85..0cab5a95bd9 100644 --- a/be/test/pipeline/operator/sort_operator_test.cpp +++ b/be/test/pipeline/operator/sort_operator_test.cpp @@ -110,9 +110,9 @@ struct SortOperatorTest : public ::testing::Test { state->emplace_local_state(source->operator_id(), std::move(source_local_state_uptr)); } - { EXPECT_TRUE(sink_local_state->open(state.get()).ok()); } + EXPECT_TRUE(sink_local_state->open(state.get()).ok()); - { EXPECT_TRUE(source_local_state->open(state.get()).ok()); } + EXPECT_TRUE(source_local_state->open(state.get()).ok()); } bool is_block(std::vector<Dependency*> deps) { @@ -167,11 +167,17 @@ TEST_F(SortOperatorTest, test) { bool eos = false; auto st = source->get_block(state.get(), &block, &eos); EXPECT_TRUE(st.ok()) << st.msg(); - EXPECT_TRUE(eos); + EXPECT_FALSE(eos); EXPECT_EQ(block.rows(), 3); std::cout << block.dump_data() << std::endl; EXPECT_TRUE(ColumnHelper::block_equal( block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3}))); + + block.clear(); 
+ st = source->get_block(state.get(), &block, &eos); + EXPECT_TRUE(st.ok()) << st.msg(); + EXPECT_TRUE(eos); + EXPECT_EQ(block.rows(), 0); } } @@ -200,11 +206,26 @@ TEST_F(SortOperatorTest, test_dep) { bool eos = false; auto st = source->get_block(state.get(), &block, &eos); EXPECT_TRUE(st.ok()) << st.msg(); - EXPECT_TRUE(eos); - EXPECT_EQ(block.rows(), 6); + EXPECT_FALSE(eos); + EXPECT_EQ(block.rows(), 3); std::cout << block.dump_data() << std::endl; EXPECT_TRUE(ColumnHelper::block_equal( - block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 5, 6}))); + block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3}))); + + block.clear(); + st = source->get_block(state.get(), &block, &eos); + EXPECT_TRUE(st.ok()) << st.msg(); + EXPECT_FALSE(eos); + EXPECT_EQ(block.rows(), 3); + std::cout << block.dump_data() << std::endl; + EXPECT_TRUE(ColumnHelper::block_equal( + block, ColumnHelper::create_block<DataTypeInt64>({4, 5, 6}))); + + block.clear(); + st = source->get_block(state.get(), &block, &eos); + EXPECT_TRUE(st.ok()) << st.msg(); + EXPECT_TRUE(eos); + EXPECT_EQ(block.rows(), 0); } } diff --git a/be/test/vec/exec/sort/heap_sorter_test.cpp b/be/test/vec/exec/sort/heap_sorter_test.cpp index b1e13b4e150..24735190fc8 100644 --- a/be/test/vec/exec/sort/heap_sorter_test.cpp +++ b/be/test/vec/exec/sort/heap_sorter_test.cpp @@ -98,32 +98,48 @@ TEST_F(HeapSorterTest, test_topn_sorter1) { EXPECT_TRUE(st.ok()); } - EXPECT_EQ(sorter->_heap->size(), 6); + EXPECT_EQ(sorter->_queue_row_num, 6); { Block block = ColumnHelper::create_block<DataTypeInt64>({6}, {6}); auto st = sorter->append_block(&block); EXPECT_TRUE(st.ok()); - } - EXPECT_EQ(sorter->_heap->size(), 6); + EXPECT_EQ(sorter->_queue_row_num, 6); - static_cast<void>(sorter->get_top_value()); + auto value = sorter->get_top_value(); + Field real; + block.get_by_position(0).column->get(0, real); + EXPECT_EQ(value, real); + } EXPECT_TRUE(sorter->prepare_for_read()); { Block block; - bool eos; + bool eos = false; 
EXPECT_TRUE(sorter->get_next(&_state, &block, &eos)); - std::cout << block.dump_data() << std::endl; - EXPECT_EQ(block.rows(), 6); - + EXPECT_EQ(block.rows(), 5); + EXPECT_EQ(eos, false); EXPECT_TRUE(ColumnHelper::block_equal( block, Block {ColumnHelper::create_nullable_column_with_name<DataTypeInt64>( - {1, 2, 3, 4, 5, 6}, {false, false, false, false, false, false}), - ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2, 3, 4, 5, 6})})); + {1, 2, 3, 4, 5}, {false, false, false, false, false}), + ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2, 3, 4, 5})})); + + block.clear(); + EXPECT_TRUE(sorter->get_next(&_state, &block, &eos)); + EXPECT_EQ(block.rows(), 1); + EXPECT_EQ(eos, false); + EXPECT_TRUE(ColumnHelper::block_equal( + block, + Block {ColumnHelper::create_nullable_column_with_name<DataTypeInt64>({6}, {false}), + ColumnHelper::create_column_with_name<DataTypeInt64>({6})})); + + block.clear(); + EXPECT_TRUE(sorter->get_next(&_state, &block, &eos)); + EXPECT_EQ(block.rows(), 0); + EXPECT_EQ(eos, true); } } diff --git a/be/test/vec/exec/sort/merge_sorter_state.cpp b/be/test/vec/exec/sort/merge_sorter_state.cpp index 34223fd1f3a..0bc4da12d9c 100644 --- a/be/test/vec/exec/sort/merge_sorter_state.cpp +++ b/be/test/vec/exec/sort/merge_sorter_state.cpp @@ -62,7 +62,7 @@ std::shared_ptr<Block> create_block(std::vector<int64_t> data) { } TEST_F(MergeSorterStateTest, test1) { - state.reset(new MergeSorterState(*row_desc, 0, -1, &_state, &_profile)); + state.reset(new MergeSorterState(*row_desc, 0)); state->add_sorted_block(create_block({1, 2, 3})); state->add_sorted_block(create_block({4, 5, 6})); state->add_sorted_block(create_block({})); diff --git a/regression-test/data/variant_p0/test_sub_path_pruning.out b/regression-test/data/variant_p0/test_sub_path_pruning.out index 16328739167..f8f9e3f3894 100644 Binary files a/regression-test/data/variant_p0/test_sub_path_pruning.out and b/regression-test/data/variant_p0/test_sub_path_pruning.out differ 
diff --git a/regression-test/data/variant_p0/topn_opt_read_by_rowids.out b/regression-test/data/variant_p0/topn_opt_read_by_rowids.out index 6ee3844f7ef..93ae6f7c5a3 100644 Binary files a/regression-test/data/variant_p0/topn_opt_read_by_rowids.out and b/regression-test/data/variant_p0/topn_opt_read_by_rowids.out differ diff --git a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy index 2ca4ab06683..1de622047a4 100644 --- a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy +++ b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy @@ -161,20 +161,20 @@ suite("variant_sub_path_pruning", "variant_type"){ order_qt_sql """select c1['c']['d'] from (select dt['a']['b'] as c1 from pruning_test union all select dt['a'] as c1 from pruning_test union all select dt as c1 from pruning_test) v1;""" // one table + one const list - order_qt_sql """select id, cast(c1['a'] as text) from (select cast('{"a":1}' as variant) as c1, 1 as id union all select dt as c1, id from pruning_test) tmp order by id limit 100;""" - order_qt_sql """select c1['a'] from (select id, c1 from (select cast('{"a":1}' as variant) as c1, 1 as id union all select dt as c1, id from pruning_test) tmp order by id limit 100) tmp;""" - order_qt_sql """select c2['b'] from (select id, cast(c1['a'] as text) as c2 from (select cast('{"a":{"b":1}}' as variant) as c1, 1 as id union all select dt as c1, id from pruning_test) tmp order by id limit 100) tmp;""" - // order_qt_sql """select c2['a']['b'] from (select id, c1 as c2 from (select cast('1' as variant) as c1, 1 as id union all select dt as c1, id from pruning_test) tmp order by id limit 100) tmp;""" - order_qt_sql """select id, cast(c1['c'] as text) from (select cast('{"c":1}' as variant) as c1, 1 as id union all select dt['a']['b'] as c1, id from pruning_test) tmp order by 1, 2 limit 100;""" - order_qt_sql """select c1['c'] from (select id, c1 from (select 
cast('{"c":1}' as variant) as c1, 1 as id union all select dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp;""" - order_qt_sql """select cast(c2['d'] as text) from (select id, c1['a'] as c2 from (select cast('{"c":{"d":1}}' as variant) as c1, 1 as id union all select dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp;""" - // order_qt_sql """select c2['c']['d'] from (select id, c1 as c2 from (select cast('{"c":{"d":1}}' as variant) as c1, 1 as id union all select dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp;""" + order_qt_sql """select id, cast(c1['a'] as text) from (select cast('{"a":1}' as variant) as c1, 0 as id union all select dt as c1, id from pruning_test) tmp order by id limit 100;""" + order_qt_sql """select c1['a'] from (select id, c1 from (select cast('{"a":1}' as variant) as c1, 0 as id union all select dt as c1, id from pruning_test) tmp order by id limit 100) tmp order by id;""" + order_qt_sql """select c2['b'] from (select id, cast(c1['a'] as text) as c2 from (select cast('{"a":{"b":1}}' as variant) as c1, 0 as id union all select dt as c1, id from pruning_test) tmp order by id limit 100) tmp order by id;""" + // order_qt_sql """select c2['a']['b'] from (select id, c1 as c2 from (select cast('1' as variant) as c1, 0 as id union all select dt as c1, id from pruning_test) tmp order by id limit 100) tmp;""" + order_qt_sql """select id, cast(c1['c'] as text) from (select cast('{"c":1}' as variant) as c1, 0 as id union all select dt['a']['b'] as c1, id from pruning_test) tmp order by 1, 2 limit 100;""" + order_qt_sql """select c1['c'] from (select id, c1 from (select cast('{"c":1}' as variant) as c1, 0 as id union all select dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp order by id;""" + order_qt_sql """select cast(c2['d'] as text) from (select id, c1['a'] as c2 from (select cast('{"c":{"d":1}}' as variant) as c1, 0 as id union all select dt['a']['b'] as 
c1, id from pruning_test) tmp order by id limit 100) tmp order by id;""" + // order_qt_sql """select c2['c']['d'] from (select id, c1 as c2 from (select cast('{"c":{"d":1}}' as variant) as c1, 0 as id union all select dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp;""" // two const list - order_qt_sql """select id, cast(c1['a'] as text) from (select cast('{"a":1}' as variant) as c1, 1 as id union all select cast('{"a":1}' as variant) as c1, 2 as id) tmp order by id limit 100;""" - order_qt_sql """select c1['a'] from (select id, c1 from (select cast('{"a":1}' as variant) as c1, 1 as id union all select cast('{"a":1}' as variant) as c1, 2 as id) tmp order by id limit 100) tmp;""" - order_qt_sql """select cast(c2['b'] as text) from (select id, c1['a'] as c2 from (select cast('{"a":{"b":1}}' as variant) as c1, 1 as id union all select cast('{"a":{"b":1}}' as variant) as c1, 2 as id) tmp order by id limit 100) tmp;""" - order_qt_sql """select c2['a']['b'] from (select id, c1 as c2 from (select cast('{"a":{"b":1}}' as variant) as c1, 1 as id union all select cast('{"a":{"b":1}}' as variant) as c1, 2 as id) tmp order by id limit 100) tmp;""" + order_qt_sql """select id, cast(c1['a'] as text) from (select cast('{"a":1}' as variant) as c1, 0 as id union all select cast('{"a":1}' as variant) as c1, 2 as id) tmp order by id limit 100;""" + order_qt_sql """select c1['a'] from (select id, c1 from (select cast('{"a":1}' as variant) as c1, 0 as id union all select cast('{"a":1}' as variant) as c1, 2 as id) tmp order by id limit 100) tmp order by id;""" + order_qt_sql """select cast(c2['b'] as text) from (select id, c1['a'] as c2 from (select cast('{"a":{"b":1}}' as variant) as c1, 0 as id union all select cast('{"a":{"b":1}}' as variant) as c1, 2 as id) tmp order by id limit 100) tmp order by id;""" + order_qt_sql """select c2['a']['b'] from (select id, c1 as c2 from (select cast('{"a":{"b":1}}' as variant) as c1, 0 as id union all select 
cast('{"a":{"b":1}}' as variant) as c1, 2 as id) tmp order by id limit 100) tmp order by id;""" // join @@ -212,8 +212,8 @@ suite("variant_sub_path_pruning", "variant_type"){ order_qt_sql """select c1['a'] as c2, c1['b'] as c3 from (select id, cast('{"b":{"a":1, "b":2}}' as variant)["b"] as c1 from pruning_test order by id limit 100) tmp;""" // varaint in one row relation - order_qt_sql """select c1['a'] from (select 1 as id, cast('{"a":1}' as variant) as c1 order by id limit 100) tmp;""" - order_qt_sql """select c1['a'] as c2, c1['b'] as c3 from (select 1 as id, cast('{"a":1, "b":2}' as variant) as c1 order by id limit 100) tmp;""" - order_qt_sql """select c1['a'] from (select 1 as id, cast('{"b":{"a":1}}' as variant)["b"] as c1 order by id limit 100) tmp;""" - order_qt_sql """select c1['a'] as c2, c1['b'] as c3 from (select 1 as id, cast('{"b":{"a":1, "b":2}}' as variant)["b"] as c1 order by id limit 100) tmp;""" + order_qt_sql """select c1['a'] from (select 0 as id, cast('{"a":1}' as variant) as c1 order by id limit 100) tmp;""" + order_qt_sql """select c1['a'] as c2, c1['b'] as c3 from (select 0 as id, cast('{"a":1, "b":2}' as variant) as c1 order by id limit 100) tmp;""" + order_qt_sql """select c1['a'] from (select 0 as id, cast('{"b":{"a":1}}' as variant)["b"] as c1 order by id limit 100) tmp;""" + order_qt_sql """select c1['a'] as c2, c1['b'] as c3 from (select 0 as id, cast('{"b":{"a":1, "b":2}}' as variant)["b"] as c1 order by id limit 100) tmp;""" } \ No newline at end of file diff --git a/regression-test/suites/variant_p0/topn_opt_read_by_rowids.groovy b/regression-test/suites/variant_p0/topn_opt_read_by_rowids.groovy index dd597755a6d..15594775c40 100644 --- a/regression-test/suites/variant_p0/topn_opt_read_by_rowids.groovy +++ b/regression-test/suites/variant_p0/topn_opt_read_by_rowids.groovy @@ -77,14 +77,14 @@ PROPERTIES ( INSERT INTO `test_web_log` VALUES ('2024-04-09 09:01:39', '', '', '', '', '1712624474952', '0004.00', '740lp', 
'sit-iniwork-lcd-designer.qm.cn', '', NULL, NULL, '', '630beed604d0513c', '0000.0', NULL, '', '', '', '', 'https://sit-iniwork-lcd-designer.qm.cn/designer', '1544512389907021826', 'https://sit-iniwork-lcd-designer.qm.cn/designer?caseName=44&caseCode=d4&appId=3fec598d32b10d26958d1d9119519c64&token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiI3NDBscCIsImNyZWF0ZWQiOjE3MTI1NDEzMTU0MzUsIm [...] """ sql "set topn_opt_limit_threshold = 1024" - qt_sql """SELECT + order_qt_sql """SELECT * FROM test_web_log WHERE ts >= '1712480940849' AND ts <= '1712805483291' ORDER BY - ts DESC + ts DESC LIMIT 10""" sql """ INSERT INTO `test_web_log` VALUES ('2024-04-09 09:02:31', '', '', '', '', '1712624495211', '004.00', '740lp', 'sit-iniwork-lcd-designer.qm.cn', '', NULL, NULL, '', '630beed604d0513c', '0000.0', NULL, '', '', '', '', 'https://sit-iniwork-lcd-designer.qm.cn/designer', '1544512389907021826', 'https://sit-iniwork-lcd-designer.qm.cn/designer?caseName=44&caseCode=d4&appId=3fec598d32b10d26958d1d9119519c64&token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiI3NDBscCIsImNyZWF0ZWQiOjE3MTI1NDEzMTU0MzUsIml [...] @@ -95,10 +95,10 @@ PROPERTIES ( sql """ INSERT INTO `test_web_log` VALUES ('2024-04-09 09:04:33', '', '', '', '', '1712624474959', '0024.00', '740lp', 'sit-iniwork-lcd-designer.qm.cn', '', NULL, NULL, '', '630beed604d0513c', '0000.0', NULL, '', '', '', '', 'https://sit-iniwork-lcd-designer.qm.cn/designer', '1544512389907021826', 'https://sit-iniwork-lcd-designer.qm.cn/designer?caseName=44&caseCode=d4&appId=3fec598d32b10d26958d1d9119519c64&token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiI3NDBscCIsImNyZWF0ZWQiOjE3MTI1NDEzMTU0MzUsIm [...] """ - qt_sql """SELECT + order_qt_sql """SELECT * FROM test_web_log ORDER BY - ts DESC + ts DESC LIMIT 10""" } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org