This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new e4ee0e83798 [fix](topn) Fix wrong rows returned by TOPN sorter (#40243) e4ee0e83798 is described below commit e4ee0e83798c9dfd7db4d65ebe19e05631054450 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Mon Sep 2 14:34:53 2024 +0800 [fix](topn) Fix wrong rows returned by TOPN sorter (#40243) ## Proposed changes pick #40241 <!--Describe your changes.--> --- be/src/vec/common/sort/partition_sorter.cpp | 42 ++++---- be/src/vec/common/sort/partition_sorter.h | 4 +- be/src/vec/common/sort/sorter.cpp | 75 +++++++------- be/src/vec/common/sort/sorter.h | 12 +-- be/src/vec/common/sort/topn_sorter.cpp | 17 ++-- be/src/vec/core/sort_cursor.h | 68 +++++-------- be/src/vec/runtime/vsorted_run_merger.cpp | 34 ++----- be/src/vec/runtime/vsorted_run_merger.h | 13 +-- .../data/query_p0/operator/test_sort_operator.out | 12 +++ .../query_p0/operator/test_sort_operator.groovy | 112 +++++++++++++++++++++ 10 files changed, 239 insertions(+), 150 deletions(-) diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp index 1ea7c6de6a8..c363a41d1c7 100644 --- a/be/src/vec/common/sort/partition_sorter.cpp +++ b/be/src/vec/common/sort/partition_sorter.cpp @@ -58,20 +58,17 @@ Status PartitionSorter::append_block(Block* input_block) { Block sorted_block = VectorizedUtils::create_empty_columnswithtypename(_row_desc); DCHECK(input_block->columns() == sorted_block.columns()); RETURN_IF_ERROR(partial_sort(*input_block, sorted_block)); - RETURN_IF_ERROR(_state->add_sorted_block(sorted_block)); + _state->add_sorted_block(Block::create_shared(std::move(sorted_block))); return Status::OK(); } Status PartitionSorter::prepare_for_read() { - auto& cursors = _state->get_cursors(); auto& blocks = _state->get_sorted_block(); auto& priority_queue = _state->get_priority_queue(); for (auto& block : blocks) { - cursors.emplace_back(block, _sort_description); - } - for (auto& cursor : cursors) { - priority_queue.push(MergeSortCursor(&cursor)); + priority_queue.push(MergeSortCursorImpl::create_shared(block, _sort_description)); } + blocks.clear(); return Status::OK(); } @@ -84,29 +81,30 @@ void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) { } Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) { - if (_state->get_sorted_block().empty()) { + if (_state->get_priority_queue().empty()) { + *eos = true; + } else if (_state->get_priority_queue().size() == 1 && _has_global_limit) { + block->swap(*_state->get_priority_queue().top().impl->block); + block->set_num_rows(_partition_inner_limit); *eos = true; } else { - if (_state->get_sorted_block().size() == 1 && _has_global_limit) { - auto& sorted_block = _state->get_sorted_block()[0]; - block->swap(sorted_block); - block->set_num_rows(_partition_inner_limit); - *eos = true; - } else { - RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size())); - } + RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size())); } return Status::OK(); } Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) { - const auto& sorted_block = _state->get_sorted_block()[0]; - size_t num_columns = sorted_block.columns(); + auto& priority_queue = _state->get_priority_queue(); + if (priority_queue.empty()) { + *eos = true; + return Status::OK(); + } + const auto& sorted_block = priority_queue.top().impl->block; + size_t num_columns = sorted_block->columns(); MutableBlock m_block = - VectorizedUtils::build_mutable_mem_reuse_block(output_block, sorted_block); + VectorizedUtils::build_mutable_mem_reuse_block(output_block, *sorted_block); MutableColumns& merged_columns = m_block.mutable_columns(); size_t current_output_rows = 0; - auto& priority_queue = _state->get_priority_queue(); bool get_enough_data = false; while (!priority_queue.empty()) { @@ -121,7 +119,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int //1 row_number no need to check distinct, just output partition_inner_limit row if ((current_output_rows + _output_total_rows) < _partition_inner_limit) { for (size_t i = 0; i < num_columns; ++i) { - merged_columns[i]->insert_from(*current->all_columns[i], current->pos); + merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); } } else { //rows has get enough @@ -155,7 +153,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int } } for (size_t i = 0; i < num_columns; ++i) { - merged_columns[i]->insert_from(*current->all_columns[i], current->pos); + merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); } break; } @@ -180,7 +178,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int *_previous_row = current; } for (size_t i = 0; i < num_columns; ++i) { - merged_columns[i]->insert_from(*current->all_columns[i], current->pos); + merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); } current_output_rows++; break; diff --git a/be/src/vec/common/sort/partition_sorter.h b/be/src/vec/common/sort/partition_sorter.h index 77dcb683711..01e009d200d 100644 --- a/be/src/vec/common/sort/partition_sorter.h +++ b/be/src/vec/common/sort/partition_sorter.h @@ -50,7 +50,7 @@ public: SortCursorCmp(const MergeSortCursor& cursor) : row(cursor->pos), impl(cursor.impl) {} void reset() { - impl = nullptr; + impl->reset(); row = 0; } bool compare_two_rows(const MergeSortCursor& rhs) const { @@ -67,7 +67,7 @@ public: return true; } int row = 0; - MergeSortCursorImpl* impl = nullptr; + std::shared_ptr<MergeSortCursorImpl> impl = nullptr; }; class PartitionSorter final : public Sorter { diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index eca7e15626b..72bf35f3cba 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -59,48 +59,46 @@ namespace doris::vectorized { void MergeSorterState::reset() { auto empty_queue = std::priority_queue<MergeSortCursor>(); priority_queue_.swap(empty_queue); - std::vector<MergeSortCursorImpl> empty_cursors(0); - cursors_.swap(empty_cursors); - std::vector<Block> empty_blocks(0); + std::vector<std::shared_ptr<MergeSortCursorImpl>> empty_cursors(0); + std::vector<std::shared_ptr<Block>> empty_blocks(0); sorted_blocks_.swap(empty_blocks); unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty()); in_mem_sorted_bocks_size_ = 0; } -Status MergeSorterState::add_sorted_block(Block& block) { - auto rows = block.rows(); +void MergeSorterState::add_sorted_block(std::shared_ptr<Block> block) { + auto rows = block->rows(); if (0 == rows) { - return Status::OK(); + return; } - in_mem_sorted_bocks_size_ += block.bytes(); - sorted_blocks_.emplace_back(std::move(block)); + in_mem_sorted_bocks_size_ += block->bytes(); + sorted_blocks_.emplace_back(block); num_rows_ += rows; - return Status::OK(); } Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) { for (auto& block : sorted_blocks_) { - cursors_.emplace_back(block, sort_description); - } - - if (sorted_blocks_.size() > 1) { - for (auto& cursor : cursors_) { - priority_queue_.emplace(&cursor); - } + priority_queue_.emplace( + MergeSortCursorImpl::create_shared(std::move(block), sort_description)); } + sorted_blocks_.clear(); return Status::OK(); } Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int batch_size, bool* eos) { - if (sorted_blocks_.empty()) { + DCHECK(sorted_blocks_.empty()); + DCHECK(unsorted_block_->empty()); + if (priority_queue_.empty()) { *eos = true; - } else if (sorted_blocks_.size() == 1) { - if (offset_ != 0) { - sorted_blocks_[0].skip_num_rows(offset_); + } else if (priority_queue_.size() == 1) { + if (offset_ != 0 || priority_queue_.top()->pos != 0) { + // Skip rows already returned or need to be ignored + int64_t offset = offset_ + (int64_t)priority_queue_.top()->pos; + priority_queue_.top().impl->block->skip_num_rows(offset); } - block->swap(sorted_blocks_[0]); + block->swap(*priority_queue_.top().impl->block); *eos = true; } else { RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos)); @@ -110,9 +108,14 @@ Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int ba Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos) { - size_t num_columns = sorted_blocks_[0].columns(); + if (priority_queue_.empty()) { + *eos = true; + return Status::OK(); + } + size_t num_columns = priority_queue_.top().impl->block->columns(); - MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]); + MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block( + block, *priority_queue_.top().impl->block); MutableColumns& merged_columns = m_block.mutable_columns(); /// Take rows from queue in right order and push to 'merged'. @@ -123,7 +126,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized if (offset_ == 0) { for (size_t i = 0; i < num_columns; ++i) - merged_columns[i]->insert_from(*current->all_columns[i], current->pos); + merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); ++merged_rows; } else { offset_--; @@ -134,7 +137,9 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized priority_queue_.push(current); } - if (merged_rows == batch_size) break; + if (merged_rows == batch_size) { + break; + } } block->set_columns(std::move(merged_columns)); @@ -261,22 +266,22 @@ Status FullSorter::_do_sort() { // if one block totally greater the heap top of _block_priority_queue // we can throw the block data directly. if (_state->num_rows() < _offset + _limit) { - static_cast<void>(_state->add_sorted_block(desc_block)); - _block_priority_queue.emplace(_pool->add( - new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description))); + _state->add_sorted_block(Block::create_shared(std::move(desc_block))); + _block_priority_queue.emplace(MergeSortCursorImpl::create_shared( + _state->last_sorted_block(), _sort_description)); } else { - auto tmp_cursor_impl = - std::make_unique<MergeSortCursorImpl>(desc_block, _sort_description); - MergeSortBlockCursor block_cursor(tmp_cursor_impl.get()); + auto tmp_cursor_impl = MergeSortCursorImpl::create_shared( + Block::create_shared(std::move(desc_block)), _sort_description); + MergeSortBlockCursor block_cursor(tmp_cursor_impl); if (!block_cursor.totally_greater(_block_priority_queue.top())) { - static_cast<void>(_state->add_sorted_block(desc_block)); - _block_priority_queue.emplace(_pool->add( - new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description))); + _state->add_sorted_block(tmp_cursor_impl->block); + _block_priority_queue.emplace(MergeSortCursorImpl::create_shared( + _state->last_sorted_block(), _sort_description)); } } } else { // dispose normal sort logic - static_cast<void>(_state->add_sorted_block(desc_block)); + _state->add_sorted_block(Block::create_shared(std::move(desc_block))); } return Status::OK(); } diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index 2525ca8c0c1..daa871f5d48 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -59,7 +59,7 @@ public: ~MergeSorterState() = default; - Status add_sorted_block(Block& block); + void add_sorted_block(std::shared_ptr<Block> block); Status build_merge_tree(const SortDescription& sort_description); @@ -72,23 +72,19 @@ public: uint64_t num_rows() const { return num_rows_; } - Block& last_sorted_block() { return sorted_blocks_.back(); } + std::shared_ptr<Block> last_sorted_block() { return sorted_blocks_.back(); } - std::vector<Block>& get_sorted_block() { return sorted_blocks_; } + std::vector<std::shared_ptr<Block>>& get_sorted_block() { return sorted_blocks_; } std::priority_queue<MergeSortCursor>& get_priority_queue() { return priority_queue_; } - std::vector<MergeSortCursorImpl>& get_cursors() { return cursors_; } void reset(); std::unique_ptr<Block> unsorted_block_; private: - int _calc_spill_blocks_to_merge() const; - Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos); std::priority_queue<MergeSortCursor> priority_queue_; - std::vector<MergeSortCursorImpl> cursors_; - std::vector<Block> sorted_blocks_; + std::vector<std::shared_ptr<Block>> sorted_blocks_; size_t in_mem_sorted_bocks_size_ = 0; uint64_t num_rows_ = 0; diff --git a/be/src/vec/common/sort/topn_sorter.cpp b/be/src/vec/common/sort/topn_sorter.cpp index 58c3cd2dd0c..1f24fb14c95 100644 --- a/be/src/vec/common/sort/topn_sorter.cpp +++ b/be/src/vec/common/sort/topn_sorter.cpp @@ -72,17 +72,16 @@ Status TopNSorter::_do_sort(Block* block) { // if one block totally greater the heap top of _block_priority_queue // we can throw the block data directly. if (_state->num_rows() < _offset + _limit) { - RETURN_IF_ERROR(_state->add_sorted_block(sorted_block)); - _block_priority_queue.emplace(_pool->add( - new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description))); + _state->add_sorted_block(Block::create_shared(std::move(sorted_block))); + _block_priority_queue.emplace(MergeSortCursorImpl::create_shared( + _state->last_sorted_block(), _sort_description)); } else { - auto tmp_cursor_impl = - std::make_unique<MergeSortCursorImpl>(sorted_block, _sort_description); - MergeSortBlockCursor block_cursor(tmp_cursor_impl.get()); + auto tmp_cursor_impl = MergeSortCursorImpl::create_shared( + Block::create_shared(std::move(sorted_block)), _sort_description); + MergeSortBlockCursor block_cursor(tmp_cursor_impl); if (!block_cursor.totally_greater(_block_priority_queue.top())) { - RETURN_IF_ERROR(_state->add_sorted_block(sorted_block)); - _block_priority_queue.emplace(_pool->add( - new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description))); + _state->add_sorted_block(block_cursor.impl->block); + _block_priority_queue.emplace(tmp_cursor_impl); } } } else { diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index e565819c9d6..8b627f50af7 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -120,7 +120,8 @@ private: * It is used in priority queue. */ struct MergeSortCursorImpl { - ColumnRawPtrs all_columns; + ENABLE_FACTORY_CREATOR(MergeSortCursorImpl); + std::shared_ptr<Block> block; ColumnRawPtrs sort_columns; SortDescription desc; size_t sort_columns_size = 0; @@ -130,37 +131,30 @@ struct MergeSortCursorImpl { MergeSortCursorImpl() = default; virtual ~MergeSortCursorImpl() = default; - MergeSortCursorImpl(Block& block, const SortDescription& desc_) - : desc(desc_), sort_columns_size(desc.size()) { - reset(block); + MergeSortCursorImpl(std::shared_ptr<Block> block_, const SortDescription& desc_) + : block(block_), desc(desc_), sort_columns_size(desc.size()) { + reset(); } MergeSortCursorImpl(const SortDescription& desc_) - : desc(desc_), sort_columns_size(desc.size()) {} + : block(Block::create_shared()), desc(desc_), sort_columns_size(desc.size()) {} bool empty() const { return rows == 0; } /// Set the cursor to the beginning of the new block. - void reset(Block& block) { - all_columns.clear(); + void reset() { sort_columns.clear(); - auto columns = block.get_columns_and_convert(); - size_t num_columns = columns.size(); - - for (size_t j = 0; j < num_columns; ++j) { - all_columns.push_back(columns[j].get()); - } - + auto columns = block->get_columns_and_convert(); for (size_t j = 0, size = desc.size(); j < size; ++j) { auto& column_desc = desc[j]; size_t column_number = !column_desc.column_name.empty() - ? block.get_position_by_name(column_desc.column_name) + ? block->get_position_by_name(column_desc.column_name) : column_desc.column_number; sort_columns.push_back(columns[column_number].get()); } pos = 0; - rows = all_columns[0]->size(); + rows = block->rows(); } bool is_first() const { return pos == 0; } @@ -174,11 +168,13 @@ struct MergeSortCursorImpl { using BlockSupplier = std::function<Status(Block*, bool* eos)>; struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { + ENABLE_FACTORY_CREATOR(BlockSupplierSortCursorImpl); BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier, const VExprContextSPtrs& ordering_expr, const std::vector<bool>& is_asc_order, const std::vector<bool>& nulls_first) : _ordering_expr(ordering_expr), _block_supplier(block_supplier) { + block = Block::create_shared(); sort_columns_size = ordering_expr.size(); desc.resize(ordering_expr.size()); @@ -195,21 +191,21 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { } bool has_next_block() override { - _block.clear(); + block->clear(); Status status; do { - status = _block_supplier(&_block, &_is_eof); - } while (_block.empty() && !_is_eof && status.ok()); + status = _block_supplier(block.get(), &_is_eof); + } while (block->empty() && !_is_eof && status.ok()); // If status not ok, upper callers could not detect whether it is eof or error. // So that fatal here, and should throw exception in the future. - if (status.ok() && !_block.empty()) { + if (status.ok() && !block->empty()) { if (_ordering_expr.size() > 0) { for (int i = 0; status.ok() && i < desc.size(); ++i) { // TODO yiguolei: throw exception if status not ok in the future - status = _ordering_expr[i]->execute(&_block, &desc[i].column_number); + status = _ordering_expr[i]->execute(block.get(), &desc[i].column_number); } } - MergeSortCursorImpl::reset(_block); + MergeSortCursorImpl::reset(); return status.ok(); } else if (!status.ok()) { throw std::runtime_error(status.msg()); @@ -221,32 +217,21 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { if (_is_eof) { return nullptr; } - return &_block; - } - - size_t columns_num() const { return all_columns.size(); } - - Block create_empty_blocks() const { - size_t num_columns = columns_num(); - MutableColumns columns(num_columns); - for (size_t i = 0; i < num_columns; ++i) { - columns[i] = all_columns[i]->clone_empty(); - } - return _block.clone_with_columns(std::move(columns)); + return block.get(); } VExprContextSPtrs _ordering_expr; - Block _block; BlockSupplier _block_supplier {}; bool _is_eof = false; }; /// For easy copying. struct MergeSortCursor { - MergeSortCursorImpl* impl; + ENABLE_FACTORY_CREATOR(MergeSortCursor); + std::shared_ptr<MergeSortCursorImpl> impl; - MergeSortCursor(MergeSortCursorImpl* impl_) : impl(impl_) {} - MergeSortCursorImpl* operator->() const { return impl; } + MergeSortCursor(std::shared_ptr<MergeSortCursorImpl> impl_) : impl(impl_) {} + MergeSortCursorImpl* operator->() const { return impl.get(); } /// The specified row of this cursor is greater than the specified row of another cursor. int8_t greater_at(const MergeSortCursor& rhs, size_t lhs_pos, size_t rhs_pos) const { @@ -286,10 +271,11 @@ struct MergeSortCursor { /// For easy copying. struct MergeSortBlockCursor { - MergeSortCursorImpl* impl = nullptr; + ENABLE_FACTORY_CREATOR(MergeSortBlockCursor); + std::shared_ptr<MergeSortCursorImpl> impl = nullptr; - MergeSortBlockCursor(MergeSortCursorImpl* impl_) : impl(impl_) {} - MergeSortCursorImpl* operator->() const { return impl; } + MergeSortBlockCursor(std::shared_ptr<MergeSortCursorImpl> impl_) : impl(impl_) {} + MergeSortCursorImpl* operator->() const { return impl.get(); } /// The specified row of this cursor is greater than the specified row of another cursor. int8_t less_at(const MergeSortBlockCursor& rhs, int rows) const { diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index 3b17f957deb..f321622012f 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -28,14 +28,6 @@ #include "vec/core/column_with_type_and_name.h" #include "vec/utils/util.hpp" -namespace doris { -namespace vectorized { -class VExprContext; -} // namespace vectorized -} // namespace doris - -using std::vector; - namespace doris::vectorized { VSortedRunMerger::VSortedRunMerger(const VExprContextSPtrs& ordering_expr, @@ -68,13 +60,14 @@ void VSortedRunMerger::init_timers(RuntimeProfile* profile) { _get_next_block_timer = ADD_TIMER(profile, "MergeGetNextBlock"); } -Status VSortedRunMerger::prepare(const vector<BlockSupplier>& input_runs) { +Status VSortedRunMerger::prepare(const std::vector<BlockSupplier>& input_runs) { try { for (const auto& supplier : input_runs) { if (_use_sort_desc) { - _cursors.emplace_back(supplier, _desc); + _cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared(supplier, _desc)); } else { - _cursors.emplace_back(supplier, _ordering_expr, _is_asc_order, _nulls_first); + _cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared( + supplier, _ordering_expr, _is_asc_order, _nulls_first)); } } } catch (const std::exception& e) { @@ -82,15 +75,8 @@ Status VSortedRunMerger::prepare(const vector<BlockSupplier>& input_runs) { } for (auto& _cursor : _cursors) { - if (!_cursor._is_eof) { - _priority_queue.push(MergeSortCursor(&_cursor)); - } - } - - for (const auto& cursor : _cursors) { - if (!cursor._is_eof) { - _empty_block = cursor.create_empty_blocks(); - break; + if (!_cursor->_is_eof) { + _priority_queue.push(MergeSortCursor(_cursor)); } } @@ -145,7 +131,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { } } else { if (current->block_ptr() != nullptr) { - for (int i = 0; i < current->all_columns.size(); i++) { + for (int i = 0; i < current->block->columns(); i++) { auto& column_with_type = current->block_ptr()->get_by_position(i); column_with_type.column = column_with_type.column->cut( current->pos, current->rows - current->pos); @@ -162,9 +148,9 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { } } } else { - size_t num_columns = _empty_block.columns(); - MutableBlock m_block = - VectorizedUtils::build_mutable_mem_reuse_block(output_block, _empty_block); + size_t num_columns = _priority_queue.top().impl->block->columns(); + MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block( + output_block, *_priority_queue.top().impl->block); MutableColumns& merged_columns = m_block.mutable_columns(); if (num_columns != merged_columns.size()) { diff --git a/be/src/vec/runtime/vsorted_run_merger.h b/be/src/vec/runtime/vsorted_run_merger.h index 943956d8c38..844704fd130 100644 --- a/be/src/vec/runtime/vsorted_run_merger.h +++ b/be/src/vec/runtime/vsorted_run_merger.h @@ -30,9 +30,7 @@ #include "vec/core/sort_description.h" #include "vec/exprs/vexpr_fwd.h" -namespace doris { - -namespace vectorized { +namespace doris::vectorized { // VSortedRunMerger is used to merge multiple sorted runs of blocks. A run is a sorted // sequence of blocks, which are fetched from a BlockSupplier function object. @@ -78,14 +76,12 @@ protected: bool _pipeline_engine_enabled = false; - std::vector<BlockSupplierSortCursorImpl> _cursors; + std::vector<std::shared_ptr<BlockSupplierSortCursorImpl>> _cursors; std::priority_queue<MergeSortCursor> _priority_queue; /// In pipeline engine, if a cursor needs to read one more block from supplier, /// we make it as a pending cursor until the supplier is readable. - MergeSortCursorImpl* _pending_cursor = nullptr; - - Block _empty_block; + std::shared_ptr<MergeSortCursorImpl> _pending_cursor = nullptr; // Times calls to get_next(). RuntimeProfile::Counter* _get_next_timer = nullptr; @@ -105,5 +101,4 @@ private: bool has_next_block(MergeSortCursor& current); }; -} // namespace vectorized -} // namespace doris +} // namespace doris::vectorized diff --git a/regression-test/data/query_p0/operator/test_sort_operator.out b/regression-test/data/query_p0/operator/test_sort_operator.out new file mode 100644 index 00000000000..b3bd14633d2 --- /dev/null +++ b/regression-test/data/query_p0/operator/test_sort_operator.out @@ -0,0 +1,12 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select -- +100174 \N +100271 \N +100271 \N +100271 \N +100471 \N +100471 \N +100471 \N +100567 \N +100567 \N + diff --git a/regression-test/suites/query_p0/operator/test_sort_operator.groovy b/regression-test/suites/query_p0/operator/test_sort_operator.groovy new file mode 100644 index 00000000000..ae58b45df69 --- /dev/null +++ b/regression-test/suites/query_p0/operator/test_sort_operator.groovy @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_sort_operator", "query,p0,arrow_flight_sql") { + + sql """ + DROP TABLE IF EXISTS dim_org_ful; + """ + + sql """ + CREATE TABLE `dim_org_ful` ( + `org_id` int(11) NOT NULL COMMENT '', + `start_dt` date NOT NULL COMMENT '', + `end_dt` date REPLACE_IF_NOT_NULL NULL COMMENT '' + ) ENGINE=OLAP + AGGREGATE KEY(`org_id`, `start_dt`) + COMMENT '网点' + DISTRIBUTED BY HASH(`org_id`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "is_being_synced" = "false", + "storage_format" = "V2", + "light_schema_change" = "true", + "disable_auto_compaction" = "false", + "enable_single_replica_compaction" = "false" + ); + """ + + sql """ + DROP TABLE IF EXISTS dim_day; + """ + + sql """ + CREATE TABLE `dim_day` ( + `day_key` varchar(80) NULL, + `day_date` date NULL + ) ENGINE=OLAP + DUPLICATE KEY(`day_key`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`day_key`, `day_date`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "is_being_synced" = "false", + "storage_format" = "V2", + "light_schema_change" = "true", + "disable_auto_compaction" = "false" + ); + """ + + sql """ + INSERT INTO `dim_day` VALUES + ('20231006','2023-10-06'), + ('20231010','2023-10-10'), + ('20230822','2023-08-22'), + ('20230829','2023-08-29'), + ('20230925','2023-09-25'), + ('20230731','2023-07-31'), + ('20230928','2023-09-28'), + ('20230727','2023-07-27'), + ('20230801','2023-08-01'), + ('20231017','2023-10-17'); + """ + + sql """INSERT INTO `dim_org_ful` VALUES + (20,'2023-08-02','3000-12-31'), + (100174,'2023-07-31','2023-08-01'), + (100174,'2023-08-01','3000-12-31'), + (100271,'2023-07-26','3000-12-31'), + (100424,'2023-08-02','3000-12-31'), + (100471,'2023-07-26','3000-12-31'), + (100567,'2023-07-29','2023-07-30'), + (100567,'2023-07-30','2023-07-31'), + (100567,'2023-07-31','3000-12-31'), + (100723,'2023-07-30','2023-07-31');""" + + sql """ + set batch_size = 9; + """ + sql """ + set parallel_pipeline_task_num=1; + """ + + order_qt_select """ + with `dim_org` AS( + SELECT + `t0`.`day_date` AS `ds`, + `org_id` AS `org_id` + FROM + `dim_day` t0 + INNER JOIN `dim_org_ful` t1 ON `t0`.`day_date` BETWEEN `t1`.`start_dt` + AND `t1`.`end_dt` - 1.0 + WHERE + `t0`.`day_date` BETWEEN '2021-01-01 00:00:00' + AND '2023-08-07' + ) + select org_id,null from dim_org order by 1,2 limit 1,10 + """ +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org