This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d0d9a6669c5 [Enhancement](sort) optimize for heap sort (#51368)
d0d9a6669c5 is described below

commit d0d9a6669c54133ff2b7d8c423f986a74a6bb635
Author: Pxl <x...@selectdb.com>
AuthorDate: Mon Jun 16 20:13:59 2025 +0800

    [Enhancement](sort) optimize for heap sort (#51368)
    
    Use MergeSorterQueue instead of std::priority_queue in the heap sorter.
    ```sql
    SELECT COUNT(UserAgentMajor), COUNT(ResolutionHeight), 
COUNT(URLCategoryID), COUNT(OS), COUNT(EventTime), COUNT(CounterID), 
COUNT(EventDate), COUNT(UserID), COUNT(UTMSource), COUNT(UTMMedium), 
COUNT(UTMCampaign), COUNT(UTMContent) FROM (     SELECT UserAgentMajor, 
ResolutionHeight, URLCategoryID, OS, EventTime, CounterID, EventDate, UserID, 
UTMSource, UTMMedium, UTMCampaign, UTMContent     FROM hits_100m     ORDER BY 
UserAgentMajor, ResolutionHeight, URLCategoryID, OS, EventTime     L [...]
    
    before:
    -  AppendBlockTime:  10sec77ms
    -  ExecTime:  13sec646ms
    after:
    -  AppendBlockTime:  1sec811ms
    -  ExecTime:  2sec31ms
    
    ```
---
 be/src/vec/common/sort/heap_sorter.cpp             | 190 ++++-----------------
 be/src/vec/common/sort/heap_sorter.h               |  78 +--------
 be/src/vec/common/sort/partition_sorter.cpp        |   4 +-
 be/src/vec/common/sort/sorter.cpp                  |  16 +-
 be/src/vec/common/sort/sorter.h                    |   5 +-
 be/src/vec/common/sort/topn_sorter.cpp             |   2 +-
 be/src/vec/core/sort_cursor.h                      | 120 ++++---------
 be/src/vec/runtime/vsorted_run_merger.cpp          |   2 +-
 be/test/pipeline/operator/sort_operator_test.cpp   |  33 +++-
 be/test/vec/exec/sort/heap_sorter_test.cpp         |  36 ++--
 be/test/vec/exec/sort/merge_sorter_state.cpp       |   2 +-
 .../data/variant_p0/test_sub_path_pruning.out      | Bin 5855 -> 5855 bytes
 .../data/variant_p0/topn_opt_read_by_rowids.out    | Bin 9886 -> 9886 bytes
 .../suites/variant_p0/test_sub_path_pruning.groovy |  32 ++--
 .../variant_p0/topn_opt_read_by_rowids.groovy      |   8 +-
 15 files changed, 152 insertions(+), 376 deletions(-)

diff --git a/be/src/vec/common/sort/heap_sorter.cpp 
b/be/src/vec/common/sort/heap_sorter.cpp
index dfabd9e436a..aed775cb6d8 100644
--- a/be/src/vec/common/sort/heap_sorter.cpp
+++ b/be/src/vec/common/sort/heap_sorter.cpp
@@ -17,199 +17,67 @@
 
 #include "vec/common/sort/heap_sorter.h"
 
-#include <glog/logging.h>
-
-#include <algorithm>
-
-#include "runtime/primitive_type.h"
-#include "runtime/thread_context.h"
-#include "util/defer_op.h"
-#include "vec/columns/column.h"
-#include "vec/columns/column_nullable.h"
-#include "vec/common/sort/vsort_exec_exprs.h"
-#include "vec/core/column_with_type_and_name.h"
-#include "vec/core/sort_description.h"
-#include "vec/data_types/data_type_nullable.h"
-#include "vec/exprs/vexpr_context.h"
-
-namespace doris {
+namespace doris::vectorized {
 #include "common/compile_check_begin.h"
-class ObjectPool;
-class RowDescriptor;
-class RuntimeState;
-} // namespace doris
 
-namespace doris::vectorized {
 HeapSorter::HeapSorter(VSortExecExprs& vsort_exec_exprs, int64_t limit, 
int64_t offset,
                        ObjectPool* pool, std::vector<bool>& is_asc_order,
                        std::vector<bool>& nulls_first, const RowDescriptor& 
row_desc)
         : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, 
nulls_first),
-          _data_size(0),
           _heap_size(limit + offset),
-          _heap(SortingHeap::create_unique()),
-          _topn_filter_rows(0),
-          _init_sort_descs(false) {}
+          _state(MergeSorterState::create_unique(row_desc, offset)) {}
 
 Status HeapSorter::append_block(Block* block) {
-    DCHECK(block->rows() > 0);
-    {
-        SCOPED_TIMER(_materialize_timer);
-        if (_vsort_exec_exprs.need_materialize_tuple()) {
-            auto output_tuple_expr_ctxs = 
_vsort_exec_exprs.sort_tuple_slot_expr_ctxs();
-            std::vector<int> valid_column_ids(output_tuple_expr_ctxs.size());
-            for (int i = 0; i < output_tuple_expr_ctxs.size(); ++i) {
-                RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(block, 
&valid_column_ids[i]));
-            }
-
-            Block new_block;
-            int i = 0;
-            const auto& convert_nullable_flags = 
_vsort_exec_exprs.get_convert_nullable_flags();
-            for (auto column_id : valid_column_ids) {
-                if (column_id < 0) {
-                    continue;
-                }
-                if (i < convert_nullable_flags.size() && 
convert_nullable_flags[i]) {
-                    auto column_ptr = 
make_nullable(block->get_by_position(column_id).column);
-                    new_block.insert({column_ptr,
-                                      
make_nullable(block->get_by_position(column_id).type), ""});
-                } else {
-                    new_block.insert(block->get_by_position(column_id));
-                }
-                i++;
-            }
-            block->swap(new_block);
+    auto tmp_block = std::make_shared<Block>(block->clone_empty());
+    RETURN_IF_ERROR(partial_sort(*block, *tmp_block, true));
+    _queue.push(
+            MergeSortCursor(std::make_shared<MergeSortCursorImpl>(tmp_block, 
_sort_description)));
+    _queue_row_num += tmp_block->rows();
+    _data_size += tmp_block->allocated_bytes();
+
+    while (_queue.is_valid() && _queue_row_num > _heap_size) {
+        auto [current, current_rows] = _queue.current();
+        current_rows = std::min(current_rows, _queue_row_num - _heap_size);
+
+        if (!current->impl->is_last(current_rows)) {
+            _queue.next(current_rows);
+        } else {
+            _queue.remove_top();
+            _data_size -= current->impl->block->allocated_bytes();
         }
+        _queue_row_num -= current_rows;
     }
-    if (!_init_sort_descs) {
-        RETURN_IF_ERROR(_prepare_sort_descs(block));
-    }
-    Block tmp_block = block->clone_empty();
-    tmp_block.swap(*block);
-    size_t num_rows = tmp_block.rows();
-    auto block_view =
-            std::make_shared<HeapSortCursorBlockView>(std::move(tmp_block), 
_sort_description);
-    bool filtered = false;
-    if (_heap_size == _heap->size()) {
-        {
-            SCOPED_TIMER(_topn_filter_timer);
-            _do_filter(*block_view, num_rows);
-        }
-        size_t remain_rows = block_view->block.rows();
-        _topn_filter_rows += (num_rows - remain_rows);
-        COUNTER_SET(_topn_filter_rows_counter, _topn_filter_rows);
-        filtered = remain_rows == 0;
-        for (size_t i = 0; i < remain_rows; ++i) {
-            HeapSortCursorImpl cursor(i, block_view);
-            _heap->replace_top_if_less(std::move(cursor));
-        }
-    } else {
-        size_t free_slots = std::min<size_t>(_heap_size - _heap->size(), 
num_rows);
-
-        size_t i = 0;
-        for (; i < free_slots; ++i) {
-            HeapSortCursorImpl cursor(i, block_view);
-            _heap->push(std::move(cursor));
-        }
 
-        for (; i < num_rows; ++i) {
-            HeapSortCursorImpl cursor(i, block_view);
-            _heap->replace_top_if_less(std::move(cursor));
-        }
-    }
-    if (!filtered) {
-        _data_size += block_view->block.allocated_bytes();
-    }
     return Status::OK();
 }
 
 Status HeapSorter::prepare_for_read() {
-    if (!_heap->empty() && _heap->size() > _offset) {
-        const auto& top = _heap->top();
-        size_t num_columns = top.block()->columns();
-        MutableColumns result_columns = top.block()->clone_empty_columns();
-
-        size_t init_size = std::min((size_t)_limit, _heap->size());
-        result_columns.reserve(init_size);
-
-        DCHECK(_heap->size() <= _heap_size);
-        // Use a vector to reverse elements in heap
-        std::vector<HeapSortCursorImpl> vector_to_reverse;
-        vector_to_reverse.reserve(init_size);
-        size_t capacity = 0;
-        while (!_heap->empty()) {
-            auto current = _heap->top();
-            _heap->pop();
-            vector_to_reverse.emplace_back(std::move(current));
-            capacity++;
-            if (_offset != 0 && _heap->size() == _offset) {
-                break;
-            }
-        }
-        for (int64_t i = capacity - 1; i >= 0; i--) {
-            auto rid = vector_to_reverse[i].row_id();
-            const auto cur_block = vector_to_reverse[i].block();
-            Columns columns = cur_block->get_columns();
-            for (size_t j = 0; j < num_columns; ++j) {
-                result_columns[j]->insert_from(*(columns[j]), rid);
-            }
+    while (_queue.is_valid()) {
+        auto [current, current_rows] = _queue.current();
+        if (current_rows) {
+            current->impl->reverse();
+            _state->get_queue().push(MergeSortCursor(current->impl));
         }
-        _return_block = 
vector_to_reverse[0].block()->clone_with_columns(std::move(result_columns));
+        _queue.remove_top();
     }
     return Status::OK();
 }
 
 Status HeapSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
-    _return_block.swap(*block);
-    *eos = true;
-    return Status::OK();
+    return _state->merge_sort_read(block, state->batch_size(), eos);
 }
 
 Field HeapSorter::get_top_value() {
     Field field {PrimitiveType::TYPE_NULL};
     // get field from first sort column of top row
-    if (_heap->size() >= _heap_size) {
-        auto& top = _heap->top();
-        top.sort_columns()[0]->get(top.row_id(), field);
+    if (_queue_row_num >= _heap_size) {
+        auto [current, current_rows] = _queue.current();
+        field = current->get_top_value();
     }
 
     return field;
 }
 
-// need exception safety
-void HeapSorter::_do_filter(HeapSortCursorBlockView& block_view, size_t 
num_rows) {
-    const auto& top_cursor = _heap->top();
-    const auto cursor_rid = top_cursor.row_id();
-
-    IColumn::Filter filter(num_rows);
-    for (size_t i = 0; i < num_rows; ++i) {
-        filter[i] = 0;
-    }
-
-    std::vector<uint8_t> cmp_res(num_rows, 0);
-
-    for (size_t col_id = 0; col_id < _sort_description.size(); ++col_id) {
-        block_view.sort_columns[col_id]->compare_internal(
-                cursor_rid, *top_cursor.sort_columns()[col_id],
-                _sort_description[col_id].nulls_direction, 
_sort_description[col_id].direction,
-                cmp_res, filter.data());
-    }
-    block_view.filter_block(filter);
-}
-
-Status HeapSorter::_prepare_sort_descs(Block* block) {
-    _sort_description.resize(_vsort_exec_exprs.ordering_expr_ctxs().size());
-    for (int i = 0; i < _sort_description.size(); i++) {
-        const auto& ordering_expr = _vsort_exec_exprs.ordering_expr_ctxs()[i];
-        RETURN_IF_ERROR(ordering_expr->execute(block, 
&_sort_description[i].column_number));
-
-        _sort_description[i].direction = _is_asc_order[i] ? 1 : -1;
-        _sort_description[i].nulls_direction =
-                _nulls_first[i] ? -_sort_description[i].direction : 
_sort_description[i].direction;
-    }
-    _init_sort_descs = true;
-    return Status::OK();
-}
-
 size_t HeapSorter::data_size() const {
     return _data_size;
 }
diff --git a/be/src/vec/common/sort/heap_sorter.h 
b/be/src/vec/common/sort/heap_sorter.h
index b36ef28af70..16a1e7d5d08 100644
--- a/be/src/vec/common/sort/heap_sorter.h
+++ b/be/src/vec/common/sort/heap_sorter.h
@@ -16,62 +16,11 @@
 // under the License.
 
 #pragma once
-#include <gen_cpp/Metrics_types.h>
-#include <stddef.h>
-#include <stdint.h>
 
-#include <memory>
-#include <queue>
-#include <utility>
-#include <vector>
-
-#include "common/status.h"
-#include "util/runtime_profile.h"
 #include "vec/common/sort/sorter.h"
-#include "vec/core/block.h"
-#include "vec/core/field.h"
-#include "vec/core/sort_cursor.h"
-
-namespace doris {
-#include "common/compile_check_begin.h"
-class ObjectPool;
-class RowDescriptor;
-class RuntimeState;
-namespace vectorized {
-class VSortExecExprs;
-} // namespace vectorized
-} // namespace doris
 
 namespace doris::vectorized {
-
-class SortingHeap {
-    ENABLE_FACTORY_CREATOR(SortingHeap);
-
-public:
-    const HeapSortCursorImpl& top() { return _queue.top(); }
-
-    size_t size() { return _queue.size(); }
-
-    bool empty() { return _queue.empty(); }
-
-    void pop() { _queue.pop(); }
-
-    void replace_top(HeapSortCursorImpl&& top) {
-        _queue.pop();
-        _queue.push(std::move(top));
-    }
-
-    void push(HeapSortCursorImpl&& cursor) { _queue.push(std::move(cursor)); }
-
-    void replace_top_if_less(HeapSortCursorImpl&& val) {
-        if (val < top()) {
-            replace_top(std::move(val));
-        }
-    }
-
-private:
-    std::priority_queue<HeapSortCursorImpl> _queue;
-};
+#include "common/compile_check_begin.h"
 
 class HeapSorter final : public Sorter {
     ENABLE_FACTORY_CREATOR(HeapSorter);
@@ -83,12 +32,6 @@ public:
 
     ~HeapSorter() override = default;
 
-    void init_profile(RuntimeProfile* runtime_profile) override {
-        _topn_filter_timer = ADD_TIMER(runtime_profile, "TopNFilterTime");
-        _topn_filter_rows_counter = ADD_COUNTER(runtime_profile, 
"TopNFilterRows", TUnit::UNIT);
-        _materialize_timer = ADD_TIMER(runtime_profile, "MaterializeTime");
-    }
-
     Status append_block(Block* block) override;
 
     Status prepare_for_read() override;
@@ -99,23 +42,14 @@ public:
 
     Field get_top_value() override;
 
-    static constexpr size_t HEAP_SORT_THRESHOLD = 1024;
-
 private:
-    void _do_filter(HeapSortCursorBlockView& block_view, size_t num_rows);
-
     Status _prepare_sort_descs(Block* block);
 
-    size_t _data_size;
-    size_t _heap_size;
-    std::unique_ptr<SortingHeap> _heap;
-    Block _return_block;
-    int64_t _topn_filter_rows;
-    bool _init_sort_descs;
-
-    RuntimeProfile::Counter* _topn_filter_timer = nullptr;
-    RuntimeProfile::Counter* _topn_filter_rows_counter = nullptr;
-    RuntimeProfile::Counter* _materialize_timer = nullptr;
+    size_t _data_size = 0;
+    size_t _heap_size = 0;
+    size_t _queue_row_num = 0;
+    MergeSorterQueue _queue;
+    std::unique_ptr<MergeSorterState> _state;
 };
 
 #include "common/compile_check_end.h"
diff --git a/be/src/vec/common/sort/partition_sorter.cpp 
b/be/src/vec/common/sort/partition_sorter.cpp
index 09c45e83c6a..e1b27e790a4 100644
--- a/be/src/vec/common/sort/partition_sorter.cpp
+++ b/be/src/vec/common/sort/partition_sorter.cpp
@@ -47,7 +47,7 @@ PartitionSorter::PartitionSorter(VSortExecExprs& 
vsort_exec_exprs, int64_t limit
                                  bool has_global_limit, int64_t 
partition_inner_limit,
                                  TopNAlgorithm::type top_n_algorithm, 
SortCursorCmp* previous_row)
         : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, 
nulls_first),
-          _state(MergeSorterState::create_unique(row_desc, offset, limit, 
state, profile)),
+          _state(MergeSorterState::create_unique(row_desc, offset)),
           _row_desc(row_desc),
           _partition_inner_limit(partition_inner_limit),
           _top_n_algorithm(
@@ -81,7 +81,7 @@ Status PartitionSorter::prepare_for_read() {
 void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) {
     std::priority_queue<MergeSortBlockCursor> empty_queue;
     std::swap(_block_priority_queue, empty_queue);
-    _state = MergeSorterState::create_unique(_row_desc, _offset, _limit, 
runtime_state, nullptr);
+    _state = MergeSorterState::create_unique(_row_desc, _offset);
     // _previous_row->impl inited at partition_sort_read function,
     // but maybe call get_next after do_partition_topn_sort() function, and 
running into else if branch at line 92L
     // so _previous_row->impl == nullptr and no need reset.
diff --git a/be/src/vec/common/sort/sorter.cpp 
b/be/src/vec/common/sort/sorter.cpp
index 1df2d2bdc62..a2a69552192 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -152,7 +152,7 @@ Status Sorter::merge_sort_read_for_spill(RuntimeState* 
state, doris::vectorized:
     return get_next(state, block, eos);
 }
 
-Status Sorter::partial_sort(Block& src_block, Block& dest_block) {
+Status Sorter::partial_sort(Block& src_block, Block& dest_block, bool 
reversed) {
     size_t num_cols = src_block.columns();
     if (_materialize_sort_exprs) {
         auto output_tuple_expr_ctxs = 
_vsort_exec_exprs.sort_tuple_slot_expr_ctxs();
@@ -189,18 +189,18 @@ Status Sorter::partial_sort(Block& src_block, Block& 
dest_block) {
         _sort_description[i].direction = _is_asc_order[i] ? 1 : -1;
         _sort_description[i].nulls_direction =
                 _nulls_first[i] ? -_sort_description[i].direction : 
_sort_description[i].direction;
+        if (reversed) {
+            _sort_description[i].direction *= -1;
+        }
     }
 
     {
         SCOPED_TIMER(_partial_sort_timer);
-        if (_materialize_sort_exprs) {
-            sort_block(dest_block, dest_block, _sort_description, _offset + 
_limit);
-        } else {
-            sort_block(src_block, dest_block, _sort_description, _offset + 
_limit);
-        }
-        src_block.clear_column_data(num_cols);
+        uint64_t limit = reversed ? 0 : (_offset + _limit);
+        sort_block(*result_block, dest_block, _sort_description, limit);
     }
 
+    src_block.clear_column_data(num_cols);
     return Status::OK();
 }
 
@@ -209,7 +209,7 @@ FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, 
int64_t limit, int64_t
                        std::vector<bool>& nulls_first, const RowDescriptor& 
row_desc,
                        RuntimeState* state, RuntimeProfile* profile)
         : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, 
nulls_first),
-          _state(MergeSorterState::create_unique(row_desc, offset, limit, 
state, profile)) {}
+          _state(MergeSorterState::create_unique(row_desc, offset)) {}
 
 // check whether the unsorted block can hold more data from input block and no 
need to alloc new memory
 bool FullSorter::has_enough_capacity(Block* input_block, Block* 
unsorted_block) const {
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index 37e74947ccc..cd56af60775 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -52,8 +52,7 @@ class MergeSorterState {
     ENABLE_FACTORY_CREATOR(MergeSorterState);
 
 public:
-    MergeSorterState(const RowDescriptor& row_desc, int64_t offset, int64_t 
limit,
-                     RuntimeState* state, RuntimeProfile* profile)
+    MergeSorterState(const RowDescriptor& row_desc, int64_t offset)
             // create_empty_block should ignore invalid slots, unsorted_block
             // should be same structure with arrival block from child node
             // since block from child node may ignored these slots
@@ -152,7 +151,7 @@ public:
     void set_enable_spill() { _enable_spill = true; }
 
 protected:
-    Status partial_sort(Block& src_block, Block& dest_block);
+    Status partial_sort(Block& src_block, Block& dest_block, bool reversed = 
false);
 
     bool _enable_spill = false;
     SortDescription _sort_description;
diff --git a/be/src/vec/common/sort/topn_sorter.cpp 
b/be/src/vec/common/sort/topn_sorter.cpp
index 3e0bc5ee688..fe3cecca5cd 100644
--- a/be/src/vec/common/sort/topn_sorter.cpp
+++ b/be/src/vec/common/sort/topn_sorter.cpp
@@ -44,7 +44,7 @@ TopNSorter::TopNSorter(VSortExecExprs& vsort_exec_exprs, 
int64_t limit, int64_t
                        std::vector<bool>& nulls_first, const RowDescriptor& 
row_desc,
                        RuntimeState* state, RuntimeProfile* profile)
         : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, 
nulls_first),
-          _state(MergeSorterState::create_unique(row_desc, offset, limit, 
state, profile)),
+          _state(MergeSorterState::create_unique(row_desc, offset)),
           _row_desc(row_desc) {}
 
 Status TopNSorter::append_block(Block* block) {
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index 9331508c376..3e3baf6a9e0 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -24,98 +24,12 @@
 
 #include "vec/columns/column.h"
 #include "vec/core/block.h"
+#include "vec/core/field.h"
 #include "vec/core/sort_description.h"
 #include "vec/exprs/vexpr_context.h"
 
 namespace doris::vectorized {
 
-struct HeapSortCursorBlockView {
-public:
-    Block block;
-    ColumnRawPtrs sort_columns;
-    SortDescription& desc;
-
-    HeapSortCursorBlockView(Block&& cur_block, SortDescription& sort_desc)
-            : block(cur_block), desc(sort_desc) {
-        _reset();
-    }
-
-    // need exception safety
-    void filter_block(IColumn::Filter& filter) {
-        Block::filter_block_internal(&block, filter, block.columns());
-        _reset();
-    }
-
-private:
-    void _reset() {
-        sort_columns.clear();
-        auto columns = block.get_columns_and_convert();
-        for (auto& column_desc : desc) {
-            size_t column_number = !column_desc.column_name.empty()
-                                           ? 
block.get_position_by_name(column_desc.column_name)
-                                           : column_desc.column_number;
-            sort_columns.push_back(columns[column_number].get());
-        }
-    }
-};
-
-using HeapSortCursorBlockSPtr = std::shared_ptr<HeapSortCursorBlockView>;
-
-struct HeapSortCursorImpl {
-public:
-    HeapSortCursorImpl(size_t row_id, HeapSortCursorBlockSPtr block_view)
-            : _row_id(row_id), _block_view(std::move(block_view)) {}
-
-    HeapSortCursorImpl(const HeapSortCursorImpl& other) {
-        _row_id = other._row_id;
-        _block_view = other._block_view;
-    }
-
-    HeapSortCursorImpl(HeapSortCursorImpl&& other) {
-        _row_id = other._row_id;
-        _block_view = other._block_view;
-        other._block_view = nullptr;
-    }
-
-    HeapSortCursorImpl& operator=(HeapSortCursorImpl&& other) {
-        std::swap(_row_id, other._row_id);
-        std::swap(_block_view, other._block_view);
-        return *this;
-    }
-
-    ~HeapSortCursorImpl() = default;
-
-    size_t row_id() const { return _row_id; }
-
-    const ColumnRawPtrs& sort_columns() const { return 
_block_view->sort_columns; }
-
-    const Block* block() const { return &_block_view->block; }
-
-    const SortDescription& sort_desc() const { return _block_view->desc; }
-
-    bool operator<(const HeapSortCursorImpl& rhs) const {
-        for (size_t i = 0; i < sort_desc().size(); ++i) {
-            int direction = sort_desc()[i].direction;
-            int nulls_direction = sort_desc()[i].nulls_direction;
-            int res = direction * sort_columns()[i]->compare_at(row_id(), 
rhs.row_id(),
-                                                                
*(rhs.sort_columns()[i]),
-                                                                
nulls_direction);
-            // ASC: direction == 1. If bigger, res > 0. So we return true.
-            if (res < 0) {
-                return true;
-            }
-            if (res > 0) {
-                return false;
-            }
-        }
-        return false;
-    }
-
-private:
-    size_t _row_id;
-    HeapSortCursorBlockSPtr _block_view;
-};
-
 /** Cursor allows to compare rows in different blocks (and parts).
   * Cursor moves inside single block.
   * It is used in priority queue.
@@ -127,8 +41,8 @@ struct MergeSortCursorImpl {
     ColumnRawPtrs columns;
     SortDescription desc;
     size_t sort_columns_size = 0;
-    size_t pos = 0;
-    size_t rows = 0;
+    int pos = 0;
+    int rows = 0;
 
     MergeSortCursorImpl() = default;
     virtual ~MergeSortCursorImpl() = default;
@@ -145,6 +59,22 @@ struct MergeSortCursorImpl {
 
     bool empty() const { return rows == 0; }
 
+    void reverse() {
+        MutableColumns columns_reversed;
+        for (auto& column : columns) {
+            auto col_reversed = column->clone_empty();
+            for (int j = rows - 1; j >= pos; j--) {
+                col_reversed->insert_from(*column, j);
+            }
+            columns_reversed.push_back(std::move(col_reversed));
+        }
+        block->set_columns(std::move(columns_reversed));
+        for (auto& column_desc : desc) {
+            column_desc.direction *= -1;
+        }
+        reset();
+    }
+
     /// Set the cursor to the beginning of the new block.
     void reset() {
         sort_columns.clear();
@@ -174,6 +104,12 @@ struct MergeSortCursorImpl {
     virtual void process_next() {}
     virtual Block* block_ptr() { return nullptr; }
     virtual bool eof() const { return false; }
+
+    Field get_top_value() const {
+        Field field {PrimitiveType::TYPE_NULL};
+        sort_columns[0]->get(pos, field);
+        return field;
+    }
 };
 
 using BlockSupplier = std::function<Status(Block*, bool* eos)>;
@@ -287,6 +223,8 @@ struct MergeSortCursor {
 
     /// Inverted so that the priority queue elements are removed in ascending 
order.
     bool operator<(const MergeSortCursor& rhs) const { return greater(rhs); }
+
+    Field get_top_value() const { return impl->get_top_value(); }
 };
 
 /// For easy copying.
@@ -423,8 +361,8 @@ public:
         }
     }
 
-    void push(MergeSortCursorImpl& cursor) {
-        _queue.emplace_back(&cursor);
+    void push(MergeSortCursor cursor) {
+        _queue.emplace_back(std::move(cursor));
         std::push_heap(_queue.begin(), _queue.end());
         next_child_idx = 0;
 
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp 
b/be/src/vec/runtime/vsorted_run_merger.cpp
index 25376f9216b..8483e78f25b 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -119,7 +119,7 @@ Status VSortedRunMerger::get_next(Block* output_block, 
bool* eos) {
         DCHECK(!current->eof());
         DCHECK(current->block_ptr() != nullptr);
         while (_offset != 0) {
-            auto process_rows = std::min(current->rows - current->pos, 
_offset);
+            auto process_rows = std::min(current->rows - current->pos, 
(int)_offset);
             current->next(process_rows);
             _offset -= process_rows;
             if (current->is_last(0)) {
diff --git a/be/test/pipeline/operator/sort_operator_test.cpp 
b/be/test/pipeline/operator/sort_operator_test.cpp
index cd1a4c35d85..0cab5a95bd9 100644
--- a/be/test/pipeline/operator/sort_operator_test.cpp
+++ b/be/test/pipeline/operator/sort_operator_test.cpp
@@ -110,9 +110,9 @@ struct SortOperatorTest : public ::testing::Test {
             state->emplace_local_state(source->operator_id(), 
std::move(source_local_state_uptr));
         }
 
-        { EXPECT_TRUE(sink_local_state->open(state.get()).ok()); }
+        EXPECT_TRUE(sink_local_state->open(state.get()).ok());
 
-        { EXPECT_TRUE(source_local_state->open(state.get()).ok()); }
+        EXPECT_TRUE(source_local_state->open(state.get()).ok());
     }
 
     bool is_block(std::vector<Dependency*> deps) {
@@ -167,11 +167,17 @@ TEST_F(SortOperatorTest, test) {
         bool eos = false;
         auto st = source->get_block(state.get(), &block, &eos);
         EXPECT_TRUE(st.ok()) << st.msg();
-        EXPECT_TRUE(eos);
+        EXPECT_FALSE(eos);
         EXPECT_EQ(block.rows(), 3);
         std::cout << block.dump_data() << std::endl;
         EXPECT_TRUE(ColumnHelper::block_equal(
                 block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3})));
+
+        block.clear();
+        st = source->get_block(state.get(), &block, &eos);
+        EXPECT_TRUE(st.ok()) << st.msg();
+        EXPECT_TRUE(eos);
+        EXPECT_EQ(block.rows(), 0);
     }
 }
 
@@ -200,11 +206,26 @@ TEST_F(SortOperatorTest, test_dep) {
         bool eos = false;
         auto st = source->get_block(state.get(), &block, &eos);
         EXPECT_TRUE(st.ok()) << st.msg();
-        EXPECT_TRUE(eos);
-        EXPECT_EQ(block.rows(), 6);
+        EXPECT_FALSE(eos);
+        EXPECT_EQ(block.rows(), 3);
         std::cout << block.dump_data() << std::endl;
         EXPECT_TRUE(ColumnHelper::block_equal(
-                block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 
5, 6})));
+                block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3})));
+
+        block.clear();
+        st = source->get_block(state.get(), &block, &eos);
+        EXPECT_TRUE(st.ok()) << st.msg();
+        EXPECT_FALSE(eos);
+        EXPECT_EQ(block.rows(), 3);
+        std::cout << block.dump_data() << std::endl;
+        EXPECT_TRUE(ColumnHelper::block_equal(
+                block, ColumnHelper::create_block<DataTypeInt64>({4, 5, 6})));
+
+        block.clear();
+        st = source->get_block(state.get(), &block, &eos);
+        EXPECT_TRUE(st.ok()) << st.msg();
+        EXPECT_TRUE(eos);
+        EXPECT_EQ(block.rows(), 0);
     }
 }
 
diff --git a/be/test/vec/exec/sort/heap_sorter_test.cpp 
b/be/test/vec/exec/sort/heap_sorter_test.cpp
index b1e13b4e150..24735190fc8 100644
--- a/be/test/vec/exec/sort/heap_sorter_test.cpp
+++ b/be/test/vec/exec/sort/heap_sorter_test.cpp
@@ -98,32 +98,48 @@ TEST_F(HeapSorterTest, test_topn_sorter1) {
         EXPECT_TRUE(st.ok());
     }
 
-    EXPECT_EQ(sorter->_heap->size(), 6);
+    EXPECT_EQ(sorter->_queue_row_num, 6);
 
     {
         Block block = ColumnHelper::create_block<DataTypeInt64>({6}, {6});
         auto st = sorter->append_block(&block);
         EXPECT_TRUE(st.ok());
-    }
 
-    EXPECT_EQ(sorter->_heap->size(), 6);
+        EXPECT_EQ(sorter->_queue_row_num, 6);
 
-    static_cast<void>(sorter->get_top_value());
+        auto value = sorter->get_top_value();
+        Field real;
+        block.get_by_position(0).column->get(0, real);
+        EXPECT_EQ(value, real);
+    }
 
     EXPECT_TRUE(sorter->prepare_for_read());
 
     {
         Block block;
-        bool eos;
+        bool eos = false;
         EXPECT_TRUE(sorter->get_next(&_state, &block, &eos));
-        std::cout << block.dump_data() << std::endl;
-        EXPECT_EQ(block.rows(), 6);
-
+        EXPECT_EQ(block.rows(), 5);
+        EXPECT_EQ(eos, false);
         EXPECT_TRUE(ColumnHelper::block_equal(
                 block,
                 Block 
{ColumnHelper::create_nullable_column_with_name<DataTypeInt64>(
-                               {1, 2, 3, 4, 5, 6}, {false, false, false, 
false, false, false}),
-                       
ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2, 3, 4, 5, 6})}));
+                               {1, 2, 3, 4, 5}, {false, false, false, false, 
false}),
+                       
ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2, 3, 4, 5})}));
+
+        block.clear();
+        EXPECT_TRUE(sorter->get_next(&_state, &block, &eos));
+        EXPECT_EQ(block.rows(), 1);
+        EXPECT_EQ(eos, false);
+        EXPECT_TRUE(ColumnHelper::block_equal(
+                block,
+                Block 
{ColumnHelper::create_nullable_column_with_name<DataTypeInt64>({6}, {false}),
+                       
ColumnHelper::create_column_with_name<DataTypeInt64>({6})}));
+
+        block.clear();
+        EXPECT_TRUE(sorter->get_next(&_state, &block, &eos));
+        EXPECT_EQ(block.rows(), 0);
+        EXPECT_EQ(eos, true);
     }
 }
 
diff --git a/be/test/vec/exec/sort/merge_sorter_state.cpp 
b/be/test/vec/exec/sort/merge_sorter_state.cpp
index 34223fd1f3a..0bc4da12d9c 100644
--- a/be/test/vec/exec/sort/merge_sorter_state.cpp
+++ b/be/test/vec/exec/sort/merge_sorter_state.cpp
@@ -62,7 +62,7 @@ std::shared_ptr<Block> create_block(std::vector<int64_t> 
data) {
 }
 
 TEST_F(MergeSorterStateTest, test1) {
-    state.reset(new MergeSorterState(*row_desc, 0, -1, &_state, &_profile));
+    state.reset(new MergeSorterState(*row_desc, 0));
     state->add_sorted_block(create_block({1, 2, 3}));
     state->add_sorted_block(create_block({4, 5, 6}));
     state->add_sorted_block(create_block({}));
diff --git a/regression-test/data/variant_p0/test_sub_path_pruning.out 
b/regression-test/data/variant_p0/test_sub_path_pruning.out
index 16328739167..f8f9e3f3894 100644
Binary files a/regression-test/data/variant_p0/test_sub_path_pruning.out and 
b/regression-test/data/variant_p0/test_sub_path_pruning.out differ
diff --git a/regression-test/data/variant_p0/topn_opt_read_by_rowids.out 
b/regression-test/data/variant_p0/topn_opt_read_by_rowids.out
index 6ee3844f7ef..93ae6f7c5a3 100644
Binary files a/regression-test/data/variant_p0/topn_opt_read_by_rowids.out and 
b/regression-test/data/variant_p0/topn_opt_read_by_rowids.out differ
diff --git a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy 
b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
index 2ca4ab06683..1de622047a4 100644
--- a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
+++ b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
@@ -161,20 +161,20 @@ suite("variant_sub_path_pruning", "variant_type"){
     order_qt_sql """select c1['c']['d'] from (select dt['a']['b'] as c1 from 
pruning_test union all select dt['a'] as c1 from pruning_test union all select 
dt as c1 from pruning_test) v1;"""
 
     // one table + one const list
-    order_qt_sql """select id, cast(c1['a'] as text) from (select 
cast('{"a":1}' as variant) as c1, 1 as id union all select dt as c1, id from 
pruning_test) tmp order by id limit 100;"""
-    order_qt_sql """select c1['a'] from (select id, c1 from (select 
cast('{"a":1}' as variant) as c1, 1 as id union all select dt as c1, id from 
pruning_test) tmp order by id limit 100) tmp;"""
-    order_qt_sql """select c2['b'] from (select id, cast(c1['a'] as text) as 
c2 from (select cast('{"a":{"b":1}}' as variant) as c1, 1 as id union all 
select dt as c1, id from pruning_test) tmp order by id limit 100) tmp;"""
-    // order_qt_sql """select c2['a']['b'] from (select id, c1 as c2 from 
(select cast('1' as variant) as c1, 1 as id union all select dt as c1, id from 
pruning_test) tmp order by id limit 100) tmp;"""
-    order_qt_sql """select id, cast(c1['c'] as text) from (select 
cast('{"c":1}' as variant) as c1, 1 as id union all select dt['a']['b'] as c1, 
id from pruning_test) tmp order by 1, 2 limit 100;"""
-    order_qt_sql """select c1['c'] from (select id, c1 from (select 
cast('{"c":1}' as variant) as c1, 1 as id union all select dt['a']['b'] as c1, 
id from pruning_test) tmp order by id limit 100) tmp;"""
-    order_qt_sql """select  cast(c2['d'] as text)  from (select id, c1['a'] as 
c2 from (select cast('{"c":{"d":1}}' as variant) as c1, 1 as id union all 
select dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) 
tmp;"""
-    // order_qt_sql """select c2['c']['d'] from (select id, c1 as c2 from 
(select cast('{"c":{"d":1}}' as variant) as c1, 1 as id union all select 
dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp;"""
+    order_qt_sql """select id, cast(c1['a'] as text) from (select 
cast('{"a":1}' as variant) as c1, 0 as id union all select dt as c1, id from 
pruning_test) tmp order by id limit 100;"""
+    order_qt_sql """select c1['a'] from (select id, c1 from (select 
cast('{"a":1}' as variant) as c1, 0 as id union all select dt as c1, id from 
pruning_test) tmp order by id limit 100) tmp order by id;"""
+    order_qt_sql """select c2['b'] from (select id, cast(c1['a'] as text) as 
c2 from (select cast('{"a":{"b":1}}' as variant) as c1, 0 as id union all 
select dt as c1, id from pruning_test) tmp order by id limit 100) tmp order by 
id;"""
+    // order_qt_sql """select c2['a']['b'] from (select id, c1 as c2 from 
(select cast('1' as variant) as c1, 0 as id union all select dt as c1, id from 
pruning_test) tmp order by id limit 100) tmp;"""
+    order_qt_sql """select id, cast(c1['c'] as text) from (select 
cast('{"c":1}' as variant) as c1, 0 as id union all select dt['a']['b'] as c1, 
id from pruning_test) tmp order by 1, 2 limit 100;"""
+    order_qt_sql """select c1['c'] from (select id, c1 from (select 
cast('{"c":1}' as variant) as c1, 0 as id union all select dt['a']['b'] as c1, 
id from pruning_test) tmp order by id limit 100) tmp order by id;"""
+    order_qt_sql """select  cast(c2['d'] as text)  from (select id, c1['a'] as 
c2 from (select cast('{"c":{"d":1}}' as variant) as c1, 0 as id union all 
select dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp 
order by id;"""
+    // order_qt_sql """select c2['c']['d'] from (select id, c1 as c2 from 
(select cast('{"c":{"d":1}}' as variant) as c1, 0 as id union all select 
dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp;"""
 
     // two const list
-    order_qt_sql """select id, cast(c1['a'] as text) from (select 
cast('{"a":1}' as variant) as c1, 1 as id union all select cast('{"a":1}' as 
variant) as c1, 2 as id) tmp order by id limit 100;"""
-    order_qt_sql """select c1['a'] from (select id, c1 from (select 
cast('{"a":1}' as variant) as c1, 1 as id union all select cast('{"a":1}' as 
variant) as c1, 2 as id) tmp order by id limit 100) tmp;"""
-    order_qt_sql """select cast(c2['b'] as text) from (select id, c1['a'] as 
c2 from (select cast('{"a":{"b":1}}' as variant) as c1, 1 as id union all 
select cast('{"a":{"b":1}}' as variant) as c1, 2 as id) tmp order by id limit 
100) tmp;"""
-    order_qt_sql """select c2['a']['b'] from (select id, c1 as c2 from (select 
cast('{"a":{"b":1}}' as variant) as c1, 1 as id union all select 
cast('{"a":{"b":1}}' as variant) as c1, 2 as id) tmp order by id limit 100) 
tmp;"""
+    order_qt_sql """select id, cast(c1['a'] as text) from (select 
cast('{"a":1}' as variant) as c1, 0 as id union all select cast('{"a":1}' as 
variant) as c1, 2 as id) tmp order by id limit 100;"""
+    order_qt_sql """select c1['a'] from (select id, c1 from (select 
cast('{"a":1}' as variant) as c1, 0 as id union all select cast('{"a":1}' as 
variant) as c1, 2 as id) tmp order by id limit 100) tmp order by id;"""
+    order_qt_sql """select cast(c2['b'] as text) from (select id, c1['a'] as 
c2 from (select cast('{"a":{"b":1}}' as variant) as c1, 0 as id union all 
select cast('{"a":{"b":1}}' as variant) as c1, 2 as id) tmp order by id limit 
100) tmp order by id;"""
+    order_qt_sql """select c2['a']['b'] from (select id, c1 as c2 from (select 
cast('{"a":{"b":1}}' as variant) as c1, 0 as id union all select 
cast('{"a":{"b":1}}' as variant) as c1, 2 as id) tmp order by id limit 100) tmp 
order by id;"""
 
 
     // join
@@ -212,8 +212,8 @@ suite("variant_sub_path_pruning", "variant_type"){
     order_qt_sql """select c1['a'] as c2, c1['b'] as c3 from (select id, 
cast('{"b":{"a":1, "b":2}}' as variant)["b"] as c1 from pruning_test order by 
id limit 100) tmp;"""
 
     // variant in one row relation
-    order_qt_sql """select c1['a'] from (select 1 as id, cast('{"a":1}' as 
variant) as c1 order by id limit 100) tmp;"""
-    order_qt_sql """select c1['a'] as c2, c1['b'] as c3 from (select 1 as id, 
cast('{"a":1, "b":2}' as variant) as c1 order by id limit 100) tmp;"""
-    order_qt_sql """select c1['a'] from (select  1 as id, cast('{"b":{"a":1}}' 
as variant)["b"] as c1 order by id limit 100) tmp;"""
-    order_qt_sql """select c1['a'] as c2, c1['b'] as c3 from (select 1 as id, 
cast('{"b":{"a":1, "b":2}}' as variant)["b"] as c1 order by id limit 100) 
tmp;"""
+    order_qt_sql """select c1['a'] from (select 0 as id, cast('{"a":1}' as 
variant) as c1 order by id limit 100) tmp;"""
+    order_qt_sql """select c1['a'] as c2, c1['b'] as c3 from (select 0 as id, 
cast('{"a":1, "b":2}' as variant) as c1 order by id limit 100) tmp;"""
+    order_qt_sql """select c1['a'] from (select  0 as id, cast('{"b":{"a":1}}' 
as variant)["b"] as c1 order by id limit 100) tmp;"""
+    order_qt_sql """select c1['a'] as c2, c1['b'] as c3 from (select 0 as id, 
cast('{"b":{"a":1, "b":2}}' as variant)["b"] as c1 order by id limit 100) 
tmp;"""
 }
\ No newline at end of file
diff --git a/regression-test/suites/variant_p0/topn_opt_read_by_rowids.groovy 
b/regression-test/suites/variant_p0/topn_opt_read_by_rowids.groovy
index dd597755a6d..15594775c40 100644
--- a/regression-test/suites/variant_p0/topn_opt_read_by_rowids.groovy
+++ b/regression-test/suites/variant_p0/topn_opt_read_by_rowids.groovy
@@ -77,14 +77,14 @@ PROPERTIES (
         INSERT INTO `test_web_log` VALUES ('2024-04-09 09:01:39', '', '', '', 
'', '1712624474952', '0004.00', '740lp', 'sit-iniwork-lcd-designer.qm.cn', '', 
NULL, NULL, '', '630beed604d0513c', '0000.0', NULL, '', '', '', '', 
'https://sit-iniwork-lcd-designer.qm.cn/designer', '1544512389907021826', 
'https://sit-iniwork-lcd-designer.qm.cn/designer?caseName=44&caseCode=d4&appId=3fec598d32b10d26958d1d9119519c64&token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiI3NDBscCIsImNyZWF0ZWQiOjE3MTI1NDEzMTU0MzUsIm
 [...]
         """
     sql "set topn_opt_limit_threshold = 1024"
-    qt_sql """SELECT
+    order_qt_sql """SELECT
             * FROM
              test_web_log 
             WHERE
             ts  >= '1712480940849' 
              AND ts  <= '1712805483291'
             ORDER BY
-             ts DESC 
+             ts DESC
              LIMIT 10"""
     sql """
         INSERT INTO `test_web_log` VALUES ('2024-04-09 09:02:31', '', '', '', 
'', '1712624495211', '004.00', '740lp', 'sit-iniwork-lcd-designer.qm.cn', '', 
NULL, NULL, '', '630beed604d0513c', '0000.0', NULL, '', '', '', '', 
'https://sit-iniwork-lcd-designer.qm.cn/designer', '1544512389907021826', 
'https://sit-iniwork-lcd-designer.qm.cn/designer?caseName=44&caseCode=d4&appId=3fec598d32b10d26958d1d9119519c64&token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiI3NDBscCIsImNyZWF0ZWQiOjE3MTI1NDEzMTU0MzUsIml
 [...]
@@ -95,10 +95,10 @@ PROPERTIES (
     sql """
         INSERT INTO `test_web_log` VALUES ('2024-04-09 09:04:33', '', '', '', 
'', '1712624474959', '0024.00', '740lp', 'sit-iniwork-lcd-designer.qm.cn', '', 
NULL, NULL, '', '630beed604d0513c', '0000.0', NULL, '', '', '', '', 
'https://sit-iniwork-lcd-designer.qm.cn/designer', '1544512389907021826', 
'https://sit-iniwork-lcd-designer.qm.cn/designer?caseName=44&caseCode=d4&appId=3fec598d32b10d26958d1d9119519c64&token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiI3NDBscCIsImNyZWF0ZWQiOjE3MTI1NDEzMTU0MzUsIm
 [...]
         """
-    qt_sql """SELECT
+    order_qt_sql """SELECT
             * FROM
              test_web_log 
             ORDER BY
-             ts DESC 
+             ts DESC
              LIMIT 10"""
 } 
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to