This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new e4ee0e83798 [fix](topn) Fix wrong rows returned by TOPN sorter (#40243)
e4ee0e83798 is described below

commit e4ee0e83798c9dfd7db4d65ebe19e05631054450
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Mon Sep 2 14:34:53 2024 +0800

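    [fix](topn) Fix wrong rows returned by TOPN sorter (#40243)
    
    ## Proposed changes
    
    Cherry-pick of #40241.
    
    Sort cursors (MergeSortCursorImpl) now share ownership of the block they
    read from via std::shared_ptr<Block>, instead of being referenced through
    raw pointers and holding raw column pointers into the sorter's block vector.
    In addition, the single-cursor fast path of MergeSorterState::merge_sort_read
    now skips the rows the cursor has already consumed (its current pos) on top
    of the requested offset, so previously returned rows are not emitted again.
    A regression test (test_sort_operator) covering ORDER BY ... LIMIT with an
    offset and a small batch_size is added.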
---
 be/src/vec/common/sort/partition_sorter.cpp        |  42 ++++----
 be/src/vec/common/sort/partition_sorter.h          |   4 +-
 be/src/vec/common/sort/sorter.cpp                  |  75 +++++++-------
 be/src/vec/common/sort/sorter.h                    |  12 +--
 be/src/vec/common/sort/topn_sorter.cpp             |  17 ++--
 be/src/vec/core/sort_cursor.h                      |  68 +++++--------
 be/src/vec/runtime/vsorted_run_merger.cpp          |  34 ++-----
 be/src/vec/runtime/vsorted_run_merger.h            |  13 +--
 .../data/query_p0/operator/test_sort_operator.out  |  12 +++
 .../query_p0/operator/test_sort_operator.groovy    | 112 +++++++++++++++++++++
 10 files changed, 239 insertions(+), 150 deletions(-)

diff --git a/be/src/vec/common/sort/partition_sorter.cpp 
b/be/src/vec/common/sort/partition_sorter.cpp
index 1ea7c6de6a8..c363a41d1c7 100644
--- a/be/src/vec/common/sort/partition_sorter.cpp
+++ b/be/src/vec/common/sort/partition_sorter.cpp
@@ -58,20 +58,17 @@ Status PartitionSorter::append_block(Block* input_block) {
     Block sorted_block = 
VectorizedUtils::create_empty_columnswithtypename(_row_desc);
     DCHECK(input_block->columns() == sorted_block.columns());
     RETURN_IF_ERROR(partial_sort(*input_block, sorted_block));
-    RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
+    _state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
     return Status::OK();
 }
 
 Status PartitionSorter::prepare_for_read() {
-    auto& cursors = _state->get_cursors();
     auto& blocks = _state->get_sorted_block();
     auto& priority_queue = _state->get_priority_queue();
     for (auto& block : blocks) {
-        cursors.emplace_back(block, _sort_description);
-    }
-    for (auto& cursor : cursors) {
-        priority_queue.push(MergeSortCursor(&cursor));
+        priority_queue.push(MergeSortCursorImpl::create_shared(block, 
_sort_description));
     }
+    blocks.clear();
     return Status::OK();
 }
 
@@ -84,29 +81,30 @@ void PartitionSorter::reset_sorter_state(RuntimeState* 
runtime_state) {
 }
 
 Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) 
{
-    if (_state->get_sorted_block().empty()) {
+    if (_state->get_priority_queue().empty()) {
+        *eos = true;
+    } else if (_state->get_priority_queue().size() == 1 && _has_global_limit) {
+        block->swap(*_state->get_priority_queue().top().impl->block);
+        block->set_num_rows(_partition_inner_limit);
         *eos = true;
     } else {
-        if (_state->get_sorted_block().size() == 1 && _has_global_limit) {
-            auto& sorted_block = _state->get_sorted_block()[0];
-            block->swap(sorted_block);
-            block->set_num_rows(_partition_inner_limit);
-            *eos = true;
-        } else {
-            RETURN_IF_ERROR(partition_sort_read(block, eos, 
state->batch_size()));
-        }
+        RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
     }
     return Status::OK();
 }
 
 Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, 
int batch_size) {
-    const auto& sorted_block = _state->get_sorted_block()[0];
-    size_t num_columns = sorted_block.columns();
+    auto& priority_queue = _state->get_priority_queue();
+    if (priority_queue.empty()) {
+        *eos = true;
+        return Status::OK();
+    }
+    const auto& sorted_block = priority_queue.top().impl->block;
+    size_t num_columns = sorted_block->columns();
     MutableBlock m_block =
-            VectorizedUtils::build_mutable_mem_reuse_block(output_block, 
sorted_block);
+            VectorizedUtils::build_mutable_mem_reuse_block(output_block, 
*sorted_block);
     MutableColumns& merged_columns = m_block.mutable_columns();
     size_t current_output_rows = 0;
-    auto& priority_queue = _state->get_priority_queue();
 
     bool get_enough_data = false;
     while (!priority_queue.empty()) {
@@ -121,7 +119,7 @@ Status PartitionSorter::partition_sort_read(Block* 
output_block, bool* eos, int
             //1 row_number no need to check distinct, just output 
partition_inner_limit row
             if ((current_output_rows + _output_total_rows) < 
_partition_inner_limit) {
                 for (size_t i = 0; i < num_columns; ++i) {
-                    merged_columns[i]->insert_from(*current->all_columns[i], 
current->pos);
+                    
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
                 }
             } else {
                 //rows has get enough
@@ -155,7 +153,7 @@ Status PartitionSorter::partition_sort_read(Block* 
output_block, bool* eos, int
                 }
             }
             for (size_t i = 0; i < num_columns; ++i) {
-                merged_columns[i]->insert_from(*current->all_columns[i], 
current->pos);
+                
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
             }
             break;
         }
@@ -180,7 +178,7 @@ Status PartitionSorter::partition_sort_read(Block* 
output_block, bool* eos, int
                 *_previous_row = current;
             }
             for (size_t i = 0; i < num_columns; ++i) {
-                merged_columns[i]->insert_from(*current->all_columns[i], 
current->pos);
+                
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
             }
             current_output_rows++;
             break;
diff --git a/be/src/vec/common/sort/partition_sorter.h 
b/be/src/vec/common/sort/partition_sorter.h
index 77dcb683711..01e009d200d 100644
--- a/be/src/vec/common/sort/partition_sorter.h
+++ b/be/src/vec/common/sort/partition_sorter.h
@@ -50,7 +50,7 @@ public:
     SortCursorCmp(const MergeSortCursor& cursor) : row(cursor->pos), 
impl(cursor.impl) {}
 
     void reset() {
-        impl = nullptr;
+        impl->reset();
         row = 0;
     }
     bool compare_two_rows(const MergeSortCursor& rhs) const {
@@ -67,7 +67,7 @@ public:
         return true;
     }
     int row = 0;
-    MergeSortCursorImpl* impl = nullptr;
+    std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
 };
 
 class PartitionSorter final : public Sorter {
diff --git a/be/src/vec/common/sort/sorter.cpp 
b/be/src/vec/common/sort/sorter.cpp
index eca7e15626b..72bf35f3cba 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -59,48 +59,46 @@ namespace doris::vectorized {
 void MergeSorterState::reset() {
     auto empty_queue = std::priority_queue<MergeSortCursor>();
     priority_queue_.swap(empty_queue);
-    std::vector<MergeSortCursorImpl> empty_cursors(0);
-    cursors_.swap(empty_cursors);
-    std::vector<Block> empty_blocks(0);
+    std::vector<std::shared_ptr<MergeSortCursorImpl>> empty_cursors(0);
+    std::vector<std::shared_ptr<Block>> empty_blocks(0);
     sorted_blocks_.swap(empty_blocks);
     unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty());
     in_mem_sorted_bocks_size_ = 0;
 }
 
-Status MergeSorterState::add_sorted_block(Block& block) {
-    auto rows = block.rows();
+void MergeSorterState::add_sorted_block(std::shared_ptr<Block> block) {
+    auto rows = block->rows();
     if (0 == rows) {
-        return Status::OK();
+        return;
     }
-    in_mem_sorted_bocks_size_ += block.bytes();
-    sorted_blocks_.emplace_back(std::move(block));
+    in_mem_sorted_bocks_size_ += block->bytes();
+    sorted_blocks_.emplace_back(block);
     num_rows_ += rows;
-    return Status::OK();
 }
 
 Status MergeSorterState::build_merge_tree(const SortDescription& 
sort_description) {
     for (auto& block : sorted_blocks_) {
-        cursors_.emplace_back(block, sort_description);
-    }
-
-    if (sorted_blocks_.size() > 1) {
-        for (auto& cursor : cursors_) {
-            priority_queue_.emplace(&cursor);
-        }
+        priority_queue_.emplace(
+                MergeSortCursorImpl::create_shared(std::move(block), 
sort_description));
     }
 
+    sorted_blocks_.clear();
     return Status::OK();
 }
 
 Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int 
batch_size,
                                          bool* eos) {
-    if (sorted_blocks_.empty()) {
+    DCHECK(sorted_blocks_.empty());
+    DCHECK(unsorted_block_->empty());
+    if (priority_queue_.empty()) {
         *eos = true;
-    } else if (sorted_blocks_.size() == 1) {
-        if (offset_ != 0) {
-            sorted_blocks_[0].skip_num_rows(offset_);
+    } else if (priority_queue_.size() == 1) {
+        if (offset_ != 0 || priority_queue_.top()->pos != 0) {
+            // Skip rows already returned or need to be ignored
+            int64_t offset = offset_ + (int64_t)priority_queue_.top()->pos;
+            priority_queue_.top().impl->block->skip_num_rows(offset);
         }
-        block->swap(sorted_blocks_[0]);
+        block->swap(*priority_queue_.top().impl->block);
         *eos = true;
     } else {
         RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos));
@@ -110,9 +108,14 @@ Status 
MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int ba
 
 Status MergeSorterState::_merge_sort_read_impl(int batch_size, 
doris::vectorized::Block* block,
                                                bool* eos) {
-    size_t num_columns = sorted_blocks_[0].columns();
+    if (priority_queue_.empty()) {
+        *eos = true;
+        return Status::OK();
+    }
+    size_t num_columns = priority_queue_.top().impl->block->columns();
 
-    MutableBlock m_block = 
VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]);
+    MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
+            block, *priority_queue_.top().impl->block);
     MutableColumns& merged_columns = m_block.mutable_columns();
 
     /// Take rows from queue in right order and push to 'merged'.
@@ -123,7 +126,7 @@ Status MergeSorterState::_merge_sort_read_impl(int 
batch_size, doris::vectorized
 
         if (offset_ == 0) {
             for (size_t i = 0; i < num_columns; ++i)
-                merged_columns[i]->insert_from(*current->all_columns[i], 
current->pos);
+                
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
             ++merged_rows;
         } else {
             offset_--;
@@ -134,7 +137,9 @@ Status MergeSorterState::_merge_sort_read_impl(int 
batch_size, doris::vectorized
             priority_queue_.push(current);
         }
 
-        if (merged_rows == batch_size) break;
+        if (merged_rows == batch_size) {
+            break;
+        }
     }
     block->set_columns(std::move(merged_columns));
 
@@ -261,22 +266,22 @@ Status FullSorter::_do_sort() {
         // if one block totally greater the heap top of _block_priority_queue
         // we can throw the block data directly.
         if (_state->num_rows() < _offset + _limit) {
-            static_cast<void>(_state->add_sorted_block(desc_block));
-            _block_priority_queue.emplace(_pool->add(
-                    new MergeSortCursorImpl(_state->last_sorted_block(), 
_sort_description)));
+            
_state->add_sorted_block(Block::create_shared(std::move(desc_block)));
+            _block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
+                    _state->last_sorted_block(), _sort_description));
         } else {
-            auto tmp_cursor_impl =
-                    std::make_unique<MergeSortCursorImpl>(desc_block, 
_sort_description);
-            MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
+            auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
+                    Block::create_shared(std::move(desc_block)), 
_sort_description);
+            MergeSortBlockCursor block_cursor(tmp_cursor_impl);
             if (!block_cursor.totally_greater(_block_priority_queue.top())) {
-                static_cast<void>(_state->add_sorted_block(desc_block));
-                _block_priority_queue.emplace(_pool->add(
-                        new MergeSortCursorImpl(_state->last_sorted_block(), 
_sort_description)));
+                _state->add_sorted_block(tmp_cursor_impl->block);
+                
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
+                        _state->last_sorted_block(), _sort_description));
             }
         }
     } else {
         // dispose normal sort logic
-        static_cast<void>(_state->add_sorted_block(desc_block));
+        _state->add_sorted_block(Block::create_shared(std::move(desc_block)));
     }
     return Status::OK();
 }
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index 2525ca8c0c1..daa871f5d48 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -59,7 +59,7 @@ public:
 
     ~MergeSorterState() = default;
 
-    Status add_sorted_block(Block& block);
+    void add_sorted_block(std::shared_ptr<Block> block);
 
     Status build_merge_tree(const SortDescription& sort_description);
 
@@ -72,23 +72,19 @@ public:
 
     uint64_t num_rows() const { return num_rows_; }
 
-    Block& last_sorted_block() { return sorted_blocks_.back(); }
+    std::shared_ptr<Block> last_sorted_block() { return sorted_blocks_.back(); 
}
 
-    std::vector<Block>& get_sorted_block() { return sorted_blocks_; }
+    std::vector<std::shared_ptr<Block>>& get_sorted_block() { return 
sorted_blocks_; }
     std::priority_queue<MergeSortCursor>& get_priority_queue() { return 
priority_queue_; }
-    std::vector<MergeSortCursorImpl>& get_cursors() { return cursors_; }
     void reset();
 
     std::unique_ptr<Block> unsorted_block_;
 
 private:
-    int _calc_spill_blocks_to_merge() const;
-
     Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block* 
block, bool* eos);
 
     std::priority_queue<MergeSortCursor> priority_queue_;
-    std::vector<MergeSortCursorImpl> cursors_;
-    std::vector<Block> sorted_blocks_;
+    std::vector<std::shared_ptr<Block>> sorted_blocks_;
     size_t in_mem_sorted_bocks_size_ = 0;
     uint64_t num_rows_ = 0;
 
diff --git a/be/src/vec/common/sort/topn_sorter.cpp 
b/be/src/vec/common/sort/topn_sorter.cpp
index 58c3cd2dd0c..1f24fb14c95 100644
--- a/be/src/vec/common/sort/topn_sorter.cpp
+++ b/be/src/vec/common/sort/topn_sorter.cpp
@@ -72,17 +72,16 @@ Status TopNSorter::_do_sort(Block* block) {
         // if one block totally greater the heap top of _block_priority_queue
         // we can throw the block data directly.
         if (_state->num_rows() < _offset + _limit) {
-            RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
-            _block_priority_queue.emplace(_pool->add(
-                    new MergeSortCursorImpl(_state->last_sorted_block(), 
_sort_description)));
+            
_state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
+            _block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
+                    _state->last_sorted_block(), _sort_description));
         } else {
-            auto tmp_cursor_impl =
-                    std::make_unique<MergeSortCursorImpl>(sorted_block, 
_sort_description);
-            MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
+            auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
+                    Block::create_shared(std::move(sorted_block)), 
_sort_description);
+            MergeSortBlockCursor block_cursor(tmp_cursor_impl);
             if (!block_cursor.totally_greater(_block_priority_queue.top())) {
-                RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
-                _block_priority_queue.emplace(_pool->add(
-                        new MergeSortCursorImpl(_state->last_sorted_block(), 
_sort_description)));
+                _state->add_sorted_block(block_cursor.impl->block);
+                _block_priority_queue.emplace(tmp_cursor_impl);
             }
         }
     } else {
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index e565819c9d6..8b627f50af7 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -120,7 +120,8 @@ private:
   * It is used in priority queue.
   */
 struct MergeSortCursorImpl {
-    ColumnRawPtrs all_columns;
+    ENABLE_FACTORY_CREATOR(MergeSortCursorImpl);
+    std::shared_ptr<Block> block;
     ColumnRawPtrs sort_columns;
     SortDescription desc;
     size_t sort_columns_size = 0;
@@ -130,37 +131,30 @@ struct MergeSortCursorImpl {
     MergeSortCursorImpl() = default;
     virtual ~MergeSortCursorImpl() = default;
 
-    MergeSortCursorImpl(Block& block, const SortDescription& desc_)
-            : desc(desc_), sort_columns_size(desc.size()) {
-        reset(block);
+    MergeSortCursorImpl(std::shared_ptr<Block> block_, const SortDescription& 
desc_)
+            : block(block_), desc(desc_), sort_columns_size(desc.size()) {
+        reset();
     }
 
     MergeSortCursorImpl(const SortDescription& desc_)
-            : desc(desc_), sort_columns_size(desc.size()) {}
+            : block(Block::create_shared()), desc(desc_), 
sort_columns_size(desc.size()) {}
     bool empty() const { return rows == 0; }
 
     /// Set the cursor to the beginning of the new block.
-    void reset(Block& block) {
-        all_columns.clear();
+    void reset() {
         sort_columns.clear();
 
-        auto columns = block.get_columns_and_convert();
-        size_t num_columns = columns.size();
-
-        for (size_t j = 0; j < num_columns; ++j) {
-            all_columns.push_back(columns[j].get());
-        }
-
+        auto columns = block->get_columns_and_convert();
         for (size_t j = 0, size = desc.size(); j < size; ++j) {
             auto& column_desc = desc[j];
             size_t column_number = !column_desc.column_name.empty()
-                                           ? 
block.get_position_by_name(column_desc.column_name)
+                                           ? 
block->get_position_by_name(column_desc.column_name)
                                            : column_desc.column_number;
             sort_columns.push_back(columns[column_number].get());
         }
 
         pos = 0;
-        rows = all_columns[0]->size();
+        rows = block->rows();
     }
 
     bool is_first() const { return pos == 0; }
@@ -174,11 +168,13 @@ struct MergeSortCursorImpl {
 using BlockSupplier = std::function<Status(Block*, bool* eos)>;
 
 struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
+    ENABLE_FACTORY_CREATOR(BlockSupplierSortCursorImpl);
     BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier,
                                 const VExprContextSPtrs& ordering_expr,
                                 const std::vector<bool>& is_asc_order,
                                 const std::vector<bool>& nulls_first)
             : _ordering_expr(ordering_expr), _block_supplier(block_supplier) {
+        block = Block::create_shared();
         sort_columns_size = ordering_expr.size();
 
         desc.resize(ordering_expr.size());
@@ -195,21 +191,21 @@ struct BlockSupplierSortCursorImpl : public 
MergeSortCursorImpl {
     }
 
     bool has_next_block() override {
-        _block.clear();
+        block->clear();
         Status status;
         do {
-            status = _block_supplier(&_block, &_is_eof);
-        } while (_block.empty() && !_is_eof && status.ok());
+            status = _block_supplier(block.get(), &_is_eof);
+        } while (block->empty() && !_is_eof && status.ok());
         // If status not ok, upper callers could not detect whether it is eof 
or error.
         // So that fatal here, and should throw exception in the future.
-        if (status.ok() && !_block.empty()) {
+        if (status.ok() && !block->empty()) {
             if (_ordering_expr.size() > 0) {
                 for (int i = 0; status.ok() && i < desc.size(); ++i) {
                     // TODO yiguolei: throw exception if status not ok in the 
future
-                    status = _ordering_expr[i]->execute(&_block, 
&desc[i].column_number);
+                    status = _ordering_expr[i]->execute(block.get(), 
&desc[i].column_number);
                 }
             }
-            MergeSortCursorImpl::reset(_block);
+            MergeSortCursorImpl::reset();
             return status.ok();
         } else if (!status.ok()) {
             throw std::runtime_error(status.msg());
@@ -221,32 +217,21 @@ struct BlockSupplierSortCursorImpl : public 
MergeSortCursorImpl {
         if (_is_eof) {
             return nullptr;
         }
-        return &_block;
-    }
-
-    size_t columns_num() const { return all_columns.size(); }
-
-    Block create_empty_blocks() const {
-        size_t num_columns = columns_num();
-        MutableColumns columns(num_columns);
-        for (size_t i = 0; i < num_columns; ++i) {
-            columns[i] = all_columns[i]->clone_empty();
-        }
-        return _block.clone_with_columns(std::move(columns));
+        return block.get();
     }
 
     VExprContextSPtrs _ordering_expr;
-    Block _block;
     BlockSupplier _block_supplier {};
     bool _is_eof = false;
 };
 
 /// For easy copying.
 struct MergeSortCursor {
-    MergeSortCursorImpl* impl;
+    ENABLE_FACTORY_CREATOR(MergeSortCursor);
+    std::shared_ptr<MergeSortCursorImpl> impl;
 
-    MergeSortCursor(MergeSortCursorImpl* impl_) : impl(impl_) {}
-    MergeSortCursorImpl* operator->() const { return impl; }
+    MergeSortCursor(std::shared_ptr<MergeSortCursorImpl> impl_) : impl(impl_) 
{}
+    MergeSortCursorImpl* operator->() const { return impl.get(); }
 
     /// The specified row of this cursor is greater than the specified row of 
another cursor.
     int8_t greater_at(const MergeSortCursor& rhs, size_t lhs_pos, size_t 
rhs_pos) const {
@@ -286,10 +271,11 @@ struct MergeSortCursor {
 
 /// For easy copying.
 struct MergeSortBlockCursor {
-    MergeSortCursorImpl* impl = nullptr;
+    ENABLE_FACTORY_CREATOR(MergeSortBlockCursor);
+    std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
 
-    MergeSortBlockCursor(MergeSortCursorImpl* impl_) : impl(impl_) {}
-    MergeSortCursorImpl* operator->() const { return impl; }
+    MergeSortBlockCursor(std::shared_ptr<MergeSortCursorImpl> impl_) : 
impl(impl_) {}
+    MergeSortCursorImpl* operator->() const { return impl.get(); }
 
     /// The specified row of this cursor is greater than the specified row of 
another cursor.
     int8_t less_at(const MergeSortBlockCursor& rhs, int rows) const {
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp 
b/be/src/vec/runtime/vsorted_run_merger.cpp
index 3b17f957deb..f321622012f 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -28,14 +28,6 @@
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/utils/util.hpp"
 
-namespace doris {
-namespace vectorized {
-class VExprContext;
-} // namespace vectorized
-} // namespace doris
-
-using std::vector;
-
 namespace doris::vectorized {
 
 VSortedRunMerger::VSortedRunMerger(const VExprContextSPtrs& ordering_expr,
@@ -68,13 +60,14 @@ void VSortedRunMerger::init_timers(RuntimeProfile* profile) 
{
     _get_next_block_timer = ADD_TIMER(profile, "MergeGetNextBlock");
 }
 
-Status VSortedRunMerger::prepare(const vector<BlockSupplier>& input_runs) {
+Status VSortedRunMerger::prepare(const std::vector<BlockSupplier>& input_runs) 
{
     try {
         for (const auto& supplier : input_runs) {
             if (_use_sort_desc) {
-                _cursors.emplace_back(supplier, _desc);
+                
_cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared(supplier, 
_desc));
             } else {
-                _cursors.emplace_back(supplier, _ordering_expr, _is_asc_order, 
_nulls_first);
+                
_cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared(
+                        supplier, _ordering_expr, _is_asc_order, 
_nulls_first));
             }
         }
     } catch (const std::exception& e) {
@@ -82,15 +75,8 @@ Status VSortedRunMerger::prepare(const 
vector<BlockSupplier>& input_runs) {
     }
 
     for (auto& _cursor : _cursors) {
-        if (!_cursor._is_eof) {
-            _priority_queue.push(MergeSortCursor(&_cursor));
-        }
-    }
-
-    for (const auto& cursor : _cursors) {
-        if (!cursor._is_eof) {
-            _empty_block = cursor.create_empty_blocks();
-            break;
+        if (!_cursor->_is_eof) {
+            _priority_queue.push(MergeSortCursor(_cursor));
         }
     }
 
@@ -145,7 +131,7 @@ Status VSortedRunMerger::get_next(Block* output_block, 
bool* eos) {
             }
         } else {
             if (current->block_ptr() != nullptr) {
-                for (int i = 0; i < current->all_columns.size(); i++) {
+                for (int i = 0; i < current->block->columns(); i++) {
                     auto& column_with_type = 
current->block_ptr()->get_by_position(i);
                     column_with_type.column = column_with_type.column->cut(
                             current->pos, current->rows - current->pos);
@@ -162,9 +148,9 @@ Status VSortedRunMerger::get_next(Block* output_block, 
bool* eos) {
             }
         }
     } else {
-        size_t num_columns = _empty_block.columns();
-        MutableBlock m_block =
-                VectorizedUtils::build_mutable_mem_reuse_block(output_block, 
_empty_block);
+        size_t num_columns = _priority_queue.top().impl->block->columns();
+        MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
+                output_block, *_priority_queue.top().impl->block);
         MutableColumns& merged_columns = m_block.mutable_columns();
 
         if (num_columns != merged_columns.size()) {
diff --git a/be/src/vec/runtime/vsorted_run_merger.h 
b/be/src/vec/runtime/vsorted_run_merger.h
index 943956d8c38..844704fd130 100644
--- a/be/src/vec/runtime/vsorted_run_merger.h
+++ b/be/src/vec/runtime/vsorted_run_merger.h
@@ -30,9 +30,7 @@
 #include "vec/core/sort_description.h"
 #include "vec/exprs/vexpr_fwd.h"
 
-namespace doris {
-
-namespace vectorized {
+namespace doris::vectorized {
 
 // VSortedRunMerger is used to merge multiple sorted runs of blocks. A run is 
a sorted
 // sequence of blocks, which are fetched from a BlockSupplier function object.
@@ -78,14 +76,12 @@ protected:
 
     bool _pipeline_engine_enabled = false;
 
-    std::vector<BlockSupplierSortCursorImpl> _cursors;
+    std::vector<std::shared_ptr<BlockSupplierSortCursorImpl>> _cursors;
     std::priority_queue<MergeSortCursor> _priority_queue;
 
     /// In pipeline engine, if a cursor needs to read one more block from 
supplier,
     /// we make it as a pending cursor until the supplier is readable.
-    MergeSortCursorImpl* _pending_cursor = nullptr;
-
-    Block _empty_block;
+    std::shared_ptr<MergeSortCursorImpl> _pending_cursor = nullptr;
 
     // Times calls to get_next().
     RuntimeProfile::Counter* _get_next_timer = nullptr;
@@ -105,5 +101,4 @@ private:
     bool has_next_block(MergeSortCursor& current);
 };
 
-} // namespace vectorized
-} // namespace doris
+} // namespace doris::vectorized
diff --git a/regression-test/data/query_p0/operator/test_sort_operator.out 
b/regression-test/data/query_p0/operator/test_sort_operator.out
new file mode 100644
index 00000000000..b3bd14633d2
--- /dev/null
+++ b/regression-test/data/query_p0/operator/test_sort_operator.out
@@ -0,0 +1,12 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select --
+100174 \N
+100271 \N
+100271 \N
+100271 \N
+100471 \N
+100471 \N
+100471 \N
+100567 \N
+100567 \N
+
diff --git a/regression-test/suites/query_p0/operator/test_sort_operator.groovy 
b/regression-test/suites/query_p0/operator/test_sort_operator.groovy
new file mode 100644
index 00000000000..ae58b45df69
--- /dev/null
+++ b/regression-test/suites/query_p0/operator/test_sort_operator.groovy
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_sort_operator", "query,p0,arrow_flight_sql") {
+
+    sql """
+        DROP TABLE IF EXISTS dim_org_ful;
+    """
+
+    sql """
+        CREATE TABLE `dim_org_ful` (
+          `org_id` int(11) NOT NULL COMMENT '',
+          `start_dt` date NOT NULL COMMENT '',
+          `end_dt` date REPLACE_IF_NOT_NULL NULL COMMENT ''
+        ) ENGINE=OLAP
+        AGGREGATE KEY(`org_id`, `start_dt`)
+        COMMENT '网点'
+        DISTRIBUTED BY HASH(`org_id`) BUCKETS 3
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "is_being_synced" = "false",
+        "storage_format" = "V2",
+        "light_schema_change" = "true",
+        "disable_auto_compaction" = "false",
+        "enable_single_replica_compaction" = "false"
+        );
+    """
+
+    sql """
+        DROP TABLE IF EXISTS dim_day;
+    """
+
+    sql """
+        CREATE TABLE `dim_day` (
+          `day_key` varchar(80) NULL,
+          `day_date` date NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`day_key`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`day_key`, `day_date`) BUCKETS 10
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "is_being_synced" = "false",
+        "storage_format" = "V2",
+        "light_schema_change" = "true",
+        "disable_auto_compaction" = "false"
+        );
+    """
+
+    sql """
+        INSERT INTO `dim_day` VALUES
+            ('20231006','2023-10-06'),
+            ('20231010','2023-10-10'),
+            ('20230822','2023-08-22'),
+            ('20230829','2023-08-29'),
+            ('20230925','2023-09-25'),
+            ('20230731','2023-07-31'),
+            ('20230928','2023-09-28'),
+            ('20230727','2023-07-27'),
+            ('20230801','2023-08-01'),
+            ('20231017','2023-10-17');
+    """
+
+    sql """INSERT INTO `dim_org_ful` VALUES
+           (20,'2023-08-02','3000-12-31'),
+           (100174,'2023-07-31','2023-08-01'),
+           (100174,'2023-08-01','3000-12-31'),
+           (100271,'2023-07-26','3000-12-31'),
+           (100424,'2023-08-02','3000-12-31'),
+           (100471,'2023-07-26','3000-12-31'),
+           (100567,'2023-07-29','2023-07-30'),
+           (100567,'2023-07-30','2023-07-31'),
+           (100567,'2023-07-31','3000-12-31'),
+           (100723,'2023-07-30','2023-07-31');"""
+
+    sql """
+        set batch_size = 9;
+    """
+    sql """
+        set parallel_pipeline_task_num=1;
+    """
+
+    order_qt_select """
+        with `dim_org` AS(
+        SELECT
+            `t0`.`day_date` AS `ds`,
+            `org_id` AS `org_id`
+        FROM
+            `dim_day` t0
+            INNER JOIN `dim_org_ful` t1 ON `t0`.`day_date` BETWEEN 
`t1`.`start_dt`
+            AND `t1`.`end_dt` - 1.0
+        WHERE
+            `t0`.`day_date` BETWEEN '2021-01-01 00:00:00'
+            AND '2023-08-07'
+        )
+        select org_id,null from dim_org order by 1,2 limit 1,10
+    """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to