This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cfaae0031 [Improvement](sort) Use heap sort to optimize sort node (#12700)
3cfaae0031 is described below

commit 3cfaae0031f2a4260528cd4d1b1855af82bf0c3b
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Wed Sep 21 10:01:52 2022 +0800

    [Improvement](sort) Use heap sort to optimize sort node (#12700)
---
 be/src/util/simd/bits.h                   |   2 +-
 be/src/vec/CMakeLists.txt                 |   2 +
 be/src/vec/columns/column.cpp             |  21 ++++
 be/src/vec/columns/column.h               |  10 ++
 be/src/vec/columns/column_decimal.cpp     |  26 +++++
 be/src/vec/columns/column_decimal.h       |   4 +
 be/src/vec/columns/column_string.cpp      |  24 +++++
 be/src/vec/columns/column_string.h        |   4 +
 be/src/vec/columns/column_vector.cpp      |  25 +++++
 be/src/vec/columns/column_vector.h        |   4 +
 be/src/vec/common/sort/heap_sorter.cpp    | 165 ++++++++++++++++++++++++++++
 be/src/vec/common/sort/heap_sorter.h      |  90 ++++++++++++++++
 be/src/vec/common/sort/sorter.cpp         |  85 ++-------------
 be/src/vec/common/sort/sorter.h           |  47 +++-----
 be/src/vec/common/sort/topn_sorter.cpp    |  92 ++++++++++++++++
 be/src/vec/common/sort/topn_sorter.h      |  47 ++++++++
 be/src/vec/core/sort_block.h              |   2 +-
 be/src/vec/core/sort_cursor.h             | 172 +++++++++++++++++++++++++-----
 be/src/vec/exec/vsort_node.cpp            |  36 ++++---
 be/src/vec/exec/vsort_node.h              |   2 -
 be/src/vec/runtime/vsorted_run_merger.cpp |   6 +-
 be/src/vec/runtime/vsorted_run_merger.h   |   6 +-
 22 files changed, 704 insertions(+), 168 deletions(-)

diff --git a/be/src/util/simd/bits.h b/be/src/util/simd/bits.h
index 8edbe72f4a..df91e63c61 100644
--- a/be/src/util/simd/bits.h
+++ b/be/src/util/simd/bits.h
@@ -112,7 +112,7 @@ inline static size_t find_byte(const std::vector<T>& vec, 
size_t start, T byte)
     return (T*)p - vec.data();
 }
 
-inline size_t find_nonzero(const std::vector<uint8_t>& vec, size_t start) {
+inline size_t find_one(const std::vector<uint8_t>& vec, size_t start) {
     return find_byte<uint8_t>(vec, start, 1);
 }
 
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 01f20742e6..0d0497add1 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -54,7 +54,9 @@ set(VEC_FILES
   common/exception.cpp
   common/mremap.cpp
   common/pod_array.cpp
+  common/sort/heap_sorter.cpp
   common/sort/sorter.cpp
+  common/sort/topn_sorter.cpp
   common/sort/vsort_exec_exprs.cpp
   common/string_utils/string_utils.cpp
   core/block.cpp
diff --git a/be/src/vec/columns/column.cpp b/be/src/vec/columns/column.cpp
index 845600bc1b..44521cc98c 100644
--- a/be/src/vec/columns/column.cpp
+++ b/be/src/vec/columns/column.cpp
@@ -52,6 +52,27 @@ void IColumn::sort_column(const ColumnSorter* sorter, 
EqualFlags& flags,
     sorter->sort_column(static_cast<const IColumn&>(*this), flags, perms, 
range, last_column);
 }
 
+void IColumn::compare_internal(size_t rhs_row_id, const IColumn& rhs, int 
nan_direction_hint,
+                               int direction, std::vector<uint8>& cmp_res,
+                               uint8* __restrict filter) const {
+    auto sz = this->size();
+    DCHECK(cmp_res.size() == sz);
+    size_t begin = simd::find_zero(cmp_res, 0);
+    while (begin < sz) {
+        size_t end = simd::find_one(cmp_res, begin + 1);
+        for (size_t row_id = begin; row_id < end; row_id++) {
+            int res = this->compare_at(row_id, rhs_row_id, rhs, 
nan_direction_hint);
+            if (res * direction < 0) {
+                filter[row_id] = 1;
+                cmp_res[row_id] = 1;
+            } else if (res * direction > 0) {
+                cmp_res[row_id] = 1;
+            }
+        }
+        begin = simd::find_zero(cmp_res, end + 1);
+    }
+}
+
 bool is_column_nullable(const IColumn& column) {
     return check_column<ColumnNullable>(column);
 }
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index f58cfa8c0e..94b39ba1e0 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -402,6 +402,16 @@ public:
     virtual int compare_at(size_t n, size_t m, const IColumn& rhs,
                            int nan_direction_hint) const = 0;
 
+    /**
+     * To compare all rows in this column with another row (with row_id = rhs_row_id in column rhs)
+     * @param nan_direction_hint and direction indicate the ordering.
+     * @param cmp_res if we already have a comparison result for row i, e.g. cmp_res[i] = 1, we can skip row i
+     * @param filter this stores comparison results for all rows. filter[i] = 1 means row i is less than row rhs_row_id in rhs
+     */
+    virtual void compare_internal(size_t rhs_row_id, const IColumn& rhs, int 
nan_direction_hint,
+                                  int direction, std::vector<uint8>& cmp_res,
+                                  uint8* __restrict filter) const;
+
     /** Returns a permutation that sorts elements of this column,
       *  i.e. perm[i]-th element of source column should be i-th element of 
sorted column.
       * reverse - reverse ordering (acsending).
diff --git a/be/src/vec/columns/column_decimal.cpp 
b/be/src/vec/columns/column_decimal.cpp
index 690714b2ba..ca7260b16c 100644
--- a/be/src/vec/columns/column_decimal.cpp
+++ b/be/src/vec/columns/column_decimal.cpp
@@ -381,6 +381,32 @@ void ColumnDecimal<T>::sort_column(const ColumnSorter* 
sorter, EqualFlags& flags
     sorter->template sort_column(static_cast<const Self&>(*this), flags, 
perms, range, last_column);
 }
 
+template <typename T>
+void ColumnDecimal<T>::compare_internal(size_t rhs_row_id, const IColumn& rhs,
+                                        int nan_direction_hint, int direction,
+                                        std::vector<uint8>& cmp_res,
+                                        uint8* __restrict filter) const {
+    auto sz = this->size();
+    DCHECK(cmp_res.size() == sz);
+    const auto& cmp_base = assert_cast<const 
ColumnDecimal<T>&>(rhs).get_data()[rhs_row_id];
+
+    size_t begin = simd::find_zero(cmp_res, 0);
+    while (begin < sz) {
+        size_t end = simd::find_one(cmp_res, begin + 1);
+        for (size_t row_id = begin; row_id < end; row_id++) {
+            auto value_a = get_data()[row_id];
+            int res = value_a > cmp_base ? 1 : (value_a < cmp_base ? -1 : 0);
+            if (res * direction < 0) {
+                filter[row_id] = 1;
+                cmp_res[row_id] = 1;
+            } else if (res * direction > 0) {
+                cmp_res[row_id] = 1;
+            }
+        }
+        begin = simd::find_zero(cmp_res, end + 1);
+    }
+}
+
 template <>
 Decimal32 ColumnDecimal<Decimal32>::get_scale_multiplier() const {
     return common::exp10_i32(scale);
diff --git a/be/src/vec/columns/column_decimal.h 
b/be/src/vec/columns/column_decimal.h
index 9957742c0d..bd1d90e63c 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -233,6 +233,10 @@ public:
     void sort_column(const ColumnSorter* sorter, EqualFlags& flags, 
IColumn::Permutation& perms,
                      EqualRange& range, bool last_column) const override;
 
+    void compare_internal(size_t rhs_row_id, const IColumn& rhs, int 
nan_direction_hint,
+                          int direction, std::vector<uint8>& cmp_res,
+                          uint8* __restrict filter) const override;
+
     UInt32 get_scale() const { return scale; }
 
     T get_scale_multiplier() const;
diff --git a/be/src/vec/columns/column_string.cpp 
b/be/src/vec/columns/column_string.cpp
index 87fa8da649..d988791ca0 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -487,4 +487,28 @@ void ColumnString::protect() {
     get_offsets().protect();
 }
 
+void ColumnString::compare_internal(size_t rhs_row_id, const IColumn& rhs, int 
nan_direction_hint,
+                                    int direction, std::vector<uint8>& cmp_res,
+                                    uint8* __restrict filter) const {
+    auto sz = this->size();
+    DCHECK(cmp_res.size() == sz);
+    const auto& cmp_base = assert_cast<const 
ColumnString&>(rhs).get_data_at(rhs_row_id);
+    size_t begin = simd::find_zero(cmp_res, 0);
+    while (begin < sz) {
+        size_t end = simd::find_one(cmp_res, begin + 1);
+        for (size_t row_id = begin; row_id < end; row_id++) {
+            auto value_a = get_data_at(row_id);
+            int res = memcmp_small_allow_overflow15(value_a.data, 
value_a.size, cmp_base.data,
+                                                    cmp_base.size);
+            if (res * direction < 0) {
+                filter[row_id] = 1;
+                cmp_res[row_id] = 1;
+            } else if (res * direction > 0) {
+                cmp_res[row_id] = 1;
+            }
+        }
+        begin = simd::find_zero(cmp_res, end + 1);
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/columns/column_string.h 
b/be/src/vec/columns/column_string.h
index 56344a85e9..783d79cf07 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -371,6 +371,10 @@ public:
             offsets[self_row] = offsets[self_row - 1];
         }
     }
+
+    void compare_internal(size_t rhs_row_id, const IColumn& rhs, int 
nan_direction_hint,
+                          int direction, std::vector<uint8>& cmp_res,
+                          uint8* __restrict filter) const override;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/columns/column_vector.cpp 
b/be/src/vec/columns/column_vector.cpp
index e80b4009c6..2ca35f6948 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -128,6 +128,31 @@ void ColumnVector<T>::sort_column(const ColumnSorter* 
sorter, EqualFlags& flags,
     sorter->template sort_column(static_cast<const Self&>(*this), flags, 
perms, range, last_column);
 }
 
+template <typename T>
+void ColumnVector<T>::compare_internal(size_t rhs_row_id, const IColumn& rhs,
+                                       int nan_direction_hint, int direction,
+                                       std::vector<uint8>& cmp_res,
+                                       uint8* __restrict filter) const {
+    auto sz = this->size();
+    DCHECK(cmp_res.size() == sz);
+    const auto& cmp_base = assert_cast<const 
ColumnVector<T>&>(rhs).get_data()[rhs_row_id];
+    size_t begin = simd::find_zero(cmp_res, 0);
+    while (begin < sz) {
+        size_t end = simd::find_one(cmp_res, begin + 1);
+        for (size_t row_id = begin; row_id < end; row_id++) {
+            auto value_a = get_data()[row_id];
+            int res = value_a > cmp_base ? 1 : (value_a < cmp_base ? -1 : 0);
+            if (res * direction < 0) {
+                filter[row_id] = 1;
+                cmp_res[row_id] = 1;
+            } else if (res * direction > 0) {
+                cmp_res[row_id] = 1;
+            }
+        }
+        begin = simd::find_zero(cmp_res, end + 1);
+    }
+}
+
 template <typename T>
 void ColumnVector<T>::update_crcs_with_value(std::vector<uint64_t>& hashes, 
PrimitiveType type,
                                              const uint8_t* __restrict 
null_data) const {
diff --git a/be/src/vec/columns/column_vector.h 
b/be/src/vec/columns/column_vector.h
index 2867d64c65..d22e8f4b69 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -393,6 +393,10 @@ public:
     void sort_column(const ColumnSorter* sorter, EqualFlags& flags, 
IColumn::Permutation& perms,
                      EqualRange& range, bool last_column) const override;
 
+    void compare_internal(size_t rhs_row_id, const IColumn& rhs, int 
nan_direction_hint,
+                          int direction, std::vector<uint8>& cmp_res,
+                          uint8* __restrict filter) const override;
+
 protected:
     Container data;
 };
diff --git a/be/src/vec/common/sort/heap_sorter.cpp 
b/be/src/vec/common/sort/heap_sorter.cpp
new file mode 100644
index 0000000000..795bd66941
--- /dev/null
+++ b/be/src/vec/common/sort/heap_sorter.cpp
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/common/sort/heap_sorter.h"
+
+#include "util/defer_op.h"
+
+namespace doris::vectorized {
+HeapSorter::HeapSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t 
offset,
+                       ObjectPool* pool, std::vector<bool>& is_asc_order,
+                       std::vector<bool>& nulls_first, const RowDescriptor& 
row_desc)
+        : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, 
nulls_first),
+          _heap_size(limit + offset),
+          _heap(std::make_unique<SortingHeap>()),
+          _topn_filter_rows(0),
+          _init_sort_descs(false) {}
+
+Status HeapSorter::append_block(Block* block, bool* mem_reuse) {
+    DCHECK(block->rows() > 0);
+    {
+        SCOPED_TIMER(_materialize_timer);
+        if (_vsort_exec_exprs.need_materialize_tuple()) {
+            auto output_tuple_expr_ctxs = 
_vsort_exec_exprs.sort_tuple_slot_expr_ctxs();
+            std::vector<int> valid_column_ids(output_tuple_expr_ctxs.size());
+            for (int i = 0; i < output_tuple_expr_ctxs.size(); ++i) {
+                RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(block, 
&valid_column_ids[i]));
+            }
+
+            Block new_block;
+            for (auto column_id : valid_column_ids) {
+                new_block.insert(block->get_by_position(column_id));
+            }
+            block->swap(new_block);
+        }
+    }
+    if (!_init_sort_descs) {
+        RETURN_IF_ERROR(_prepare_sort_descs(block));
+    }
+    Block tmp_block = block->clone_empty();
+    tmp_block.swap(*block);
+    HeapSortCursorBlockView block_view_val(std::move(tmp_block), 
_sort_description);
+    SharedHeapSortCursorBlockView* block_view =
+            new SharedHeapSortCursorBlockView(std::move(block_view_val));
+    block_view->ref();
+    Defer defer([&] { block_view->unref(); });
+    size_t num_rows = tmp_block.rows();
+    if (_heap_size == _heap->size()) {
+        {
+            SCOPED_TIMER(_topn_filter_timer);
+            _do_filter(block_view->value(), num_rows);
+        }
+        size_t remain_rows = block_view->value().block.rows();
+        _topn_filter_rows += (num_rows - remain_rows);
+        COUNTER_SET(_topn_filter_rows_counter, _topn_filter_rows);
+        for (size_t i = 0; i < remain_rows; ++i) {
+            HeapSortCursorImpl cursor(i, block_view);
+            _heap->replace_top_if_less(std::move(cursor));
+        }
+    } else {
+        size_t free_slots = std::min<size_t>(_heap_size - _heap->size(), 
num_rows);
+
+        size_t i = 0;
+        for (; i < free_slots; ++i) {
+            HeapSortCursorImpl cursor(i, block_view);
+            _heap->push(std::move(cursor));
+        }
+
+        for (; i < num_rows; ++i) {
+            HeapSortCursorImpl cursor(i, block_view);
+            _heap->replace_top_if_less(std::move(cursor));
+        }
+    }
+    return Status::OK();
+}
+
+Status HeapSorter::prepare_for_read() {
+    if (!_heap->empty() && _heap->size() > _offset) {
+        const auto& top = _heap->top();
+        size_t num_columns = top.block()->columns();
+        MutableColumns result_columns = top.block()->clone_empty_columns();
+
+        size_t init_size = std::min((size_t)_limit, _heap->size());
+        result_columns.reserve(init_size);
+
+        DCHECK(_heap->size() <= _heap_size);
+        // Use a vector to reverse elements in heap
+        std::vector<HeapSortCursorImpl> vector_to_reverse;
+        vector_to_reverse.reserve(init_size);
+        size_t capacity = 0;
+        while (!_heap->empty()) {
+            auto current = _heap->top();
+            _heap->pop();
+            vector_to_reverse.emplace_back(std::move(current));
+            capacity++;
+            if (_offset != 0 && _heap->size() == _offset) {
+                break;
+            }
+        }
+        for (int i = capacity - 1; i >= 0; i--) {
+            auto rid = vector_to_reverse[i].row_id();
+            const auto cur_block = vector_to_reverse[i].block();
+            for (size_t j = 0; j < num_columns; ++j) {
+                result_columns[j]->insert_from(*(cur_block->get_columns()[j]), 
rid);
+            }
+        }
+        _return_block = 
vector_to_reverse[0].block()->clone_with_columns(std::move(result_columns));
+    }
+    return Status::OK();
+}
+
+Status HeapSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
+    _return_block.swap(*block);
+    *eos = true;
+    return Status::OK();
+}
+
+void HeapSorter::_do_filter(HeapSortCursorBlockView& block_view, size_t 
num_rows) {
+    const auto& top_cursor = _heap->top();
+    const int cursor_rid = top_cursor.row_id();
+
+    IColumn::Filter filter(num_rows);
+    for (size_t i = 0; i < num_rows; ++i) {
+        filter[i] = 0;
+    }
+
+    std::vector<uint8_t> cmp_res(num_rows, 0);
+
+    for (size_t col_id = 0; col_id < _sort_description.size(); ++col_id) {
+        block_view.sort_columns[col_id]->compare_internal(
+                cursor_rid, *top_cursor.sort_columns()[col_id],
+                _sort_description[col_id].nulls_direction, 
_sort_description[col_id].direction,
+                cmp_res, filter.data());
+    }
+    block_view.filter_block(filter);
+}
+
+Status HeapSorter::_prepare_sort_descs(Block* block) {
+    
_sort_description.resize(_vsort_exec_exprs.lhs_ordering_expr_ctxs().size());
+    for (int i = 0; i < _sort_description.size(); i++) {
+        const auto& ordering_expr = 
_vsort_exec_exprs.lhs_ordering_expr_ctxs()[i];
+        RETURN_IF_ERROR(ordering_expr->execute(block, 
&_sort_description[i].column_number));
+
+        _sort_description[i].direction = _is_asc_order[i] ? 1 : -1;
+        _sort_description[i].nulls_direction =
+                _nulls_first[i] ? -_sort_description[i].direction : 
_sort_description[i].direction;
+    }
+    _init_sort_descs = true;
+    return Status::OK();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/common/sort/heap_sorter.h 
b/be/src/vec/common/sort/heap_sorter.h
new file mode 100644
index 0000000000..f725d585c2
--- /dev/null
+++ b/be/src/vec/common/sort/heap_sorter.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "vec/common/sort/sorter.h"
+
+namespace doris::vectorized {
+
+class SortingHeap {
+public:
+    const HeapSortCursorImpl& top() { return _queue.top(); }
+
+    size_t size() { return _queue.size(); }
+
+    bool empty() { return _queue.empty(); }
+
+    void pop() { _queue.pop(); }
+
+    void replace_top(HeapSortCursorImpl&& top) {
+        _queue.pop();
+        _queue.push(std::move(top));
+    }
+
+    void push(HeapSortCursorImpl&& cursor) { _queue.push(std::move(cursor)); }
+
+    void replace_top_if_less(HeapSortCursorImpl&& val) {
+        if (val < top()) {
+            replace_top(std::move(val));
+        }
+    }
+
+private:
+    std::priority_queue<HeapSortCursorImpl> _queue;
+};
+
+class HeapSorter final : public Sorter {
+public:
+    HeapSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, 
ObjectPool* pool,
+               std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first,
+               const RowDescriptor& row_desc);
+
+    ~HeapSorter() override = default;
+
+    void init_profile(RuntimeProfile* runtime_profile) override {
+        _topn_filter_timer = ADD_TIMER(runtime_profile, "TopNFilterTime");
+        _topn_filter_rows_counter = ADD_COUNTER(runtime_profile, 
"TopNFilterRows", TUnit::UNIT);
+        _materialize_timer = ADD_TIMER(runtime_profile, "MaterializeTime");
+    }
+
+    Status append_block(Block* block, bool* mem_reuse) override;
+
+    Status prepare_for_read() override;
+
+    Status get_next(RuntimeState* state, Block* block, bool* eos) override;
+
+    static constexpr size_t HEAP_SORT_THRESHOLD = 1024;
+
+private:
+    void _do_filter(HeapSortCursorBlockView& block_view, size_t num_rows);
+
+    Status _prepare_sort_descs(Block* block);
+
+    size_t _heap_size;
+    std::unique_ptr<SortingHeap> _heap;
+    Block _return_block;
+    int64_t _topn_filter_rows;
+    bool _init_sort_descs;
+
+    RuntimeProfile::Counter* _topn_filter_timer = nullptr;
+    RuntimeProfile::Counter* _topn_filter_rows_counter = nullptr;
+    RuntimeProfile::Counter* _materialize_timer = nullptr;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/common/sort/sorter.cpp 
b/be/src/vec/common/sort/sorter.cpp
index 86de482333..9b5641075d 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -25,7 +25,7 @@ void MergeSorterState::build_merge_tree(SortDescription& 
sort_description) {
     }
 
     if (sorted_blocks.size() > 1) {
-        for (auto& cursor : cursors) priority_queue.push(SortCursor(&cursor));
+        for (auto& cursor : cursors) 
priority_queue.push(MergeSortCursor(&cursor));
     }
 }
 
@@ -105,11 +105,10 @@ Status Sorter::partial_sort(Block& block) {
     return Status::OK();
 }
 
-FullSorter::FullSorter(SortDescription& sort_description, VSortExecExprs& 
vsort_exec_exprs,
-                       int limit, int64_t offset, ObjectPool* pool, 
std::vector<bool>& is_asc_order,
+FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t 
offset,
+                       ObjectPool* pool, std::vector<bool>& is_asc_order,
                        std::vector<bool>& nulls_first, const RowDescriptor& 
row_desc)
-        : Sorter(sort_description, vsort_exec_exprs, limit, offset, pool, 
is_asc_order,
-                 nulls_first),
+        : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, 
nulls_first),
           _state(std::unique_ptr<MergeSorterState>(new 
MergeSorterState(row_desc, offset))) {}
 
 Status FullSorter::append_block(Block* block, bool* mem_reuse) {
@@ -160,9 +159,10 @@ Status FullSorter::_do_sort() {
             _state->sorted_blocks.emplace_back(std::move(block));
             _state->num_rows += block.rows();
             _block_priority_queue.emplace(_pool->add(
-                    new SortCursorImpl(_state->sorted_blocks.back(), 
_sort_description)));
+                    new MergeSortCursorImpl(_state->sorted_blocks.back(), 
_sort_description)));
         } else {
-            SortBlockCursor block_cursor(_pool->add(new SortCursorImpl(block, 
_sort_description)));
+            MergeSortBlockCursor block_cursor(
+                    _pool->add(new MergeSortCursorImpl(block, 
_sort_description)));
             if (!block_cursor.totally_greater(_block_priority_queue.top())) {
                 _state->sorted_blocks.emplace_back(std::move(block));
                 _block_priority_queue.push(block_cursor);
@@ -176,75 +176,4 @@ Status FullSorter::_do_sort() {
     return Status::OK();
 }
 
-TopNSorter::TopNSorter(SortDescription& sort_description, VSortExecExprs& 
vsort_exec_exprs,
-                       int limit, int64_t offset, ObjectPool* pool, 
std::vector<bool>& is_asc_order,
-                       std::vector<bool>& nulls_first, const RowDescriptor& 
row_desc)
-        : Sorter(sort_description, vsort_exec_exprs, limit, offset, pool, 
is_asc_order,
-                 nulls_first),
-          _state(std::unique_ptr<MergeSorterState>(new 
MergeSorterState(row_desc, offset))) {}
-
-Status TopNSorter::append_block(Block* block, bool* mem_reuse) {
-    DCHECK(block->rows() > 0);
-    RETURN_IF_ERROR(_do_sort(block, mem_reuse));
-    return Status::OK();
-}
-
-Status TopNSorter::prepare_for_read() {
-    _state->build_merge_tree(_sort_description);
-    return Status::OK();
-}
-
-Status TopNSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
-    if (_state->sorted_blocks.empty()) {
-        *eos = true;
-    } else if (_state->sorted_blocks.size() == 1) {
-        if (_offset != 0) {
-            _state->sorted_blocks[0].skip_num_rows(_offset);
-        }
-        block->swap(_state->sorted_blocks[0]);
-        *eos = true;
-    } else {
-        RETURN_IF_ERROR(_state->merge_sort_read(state, block, eos));
-    }
-    return Status::OK();
-}
-
-Status TopNSorter::_do_sort(Block* block, bool* mem_reuse) {
-    *mem_reuse = false;
-    RETURN_IF_ERROR(partial_sort(*block));
-    // dispose TOP-N logic
-    if (_limit != -1) {
-        // Here is a little opt to reduce the mem uasge, we build a max heap
-        // to order the block in _block_priority_queue.
-        // if one block totally greater the heap top of _block_priority_queue
-        // we can throw the block data directly.
-        if (_state->num_rows < _limit) {
-            Block sorted_block;
-            sorted_block.swap(*block);
-            _state->sorted_blocks.emplace_back(std::move(sorted_block));
-            _state->num_rows += sorted_block.rows();
-            _block_priority_queue.emplace(_pool->add(
-                    new SortCursorImpl(_state->sorted_blocks.back(), 
_sort_description)));
-        } else {
-            Block sorted_block;
-            sorted_block.swap(*block);
-            SortBlockCursor block_cursor(
-                    _pool->add(new SortCursorImpl(sorted_block, 
_sort_description)));
-            if (!block_cursor.totally_greater(_block_priority_queue.top())) {
-                _state->sorted_blocks.emplace_back(std::move(sorted_block));
-                _block_priority_queue.push(block_cursor);
-            } else {
-                *mem_reuse = true;
-                block->clear_column_data();
-            }
-        }
-    } else {
-        Block sorted_block;
-        sorted_block.swap(*block);
-        // dispose normal sort logic
-        _state->sorted_blocks.emplace_back(std::move(sorted_block));
-    }
-    return Status::OK();
-}
-
 } // namespace doris::vectorized
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index 9d881538a0..2e56f8012a 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -47,8 +47,8 @@ public:
 
     Status merge_sort_read(doris::RuntimeState* state, 
doris::vectorized::Block* block, bool* eos);
 
-    std::priority_queue<SortCursor> priority_queue;
-    std::vector<SortCursorImpl> cursors;
+    std::priority_queue<MergeSortCursor> priority_queue;
+    std::vector<MergeSortCursorImpl> cursors;
     std::unique_ptr<MutableBlock> unsorted_block;
     std::vector<Block> sorted_blocks;
     uint64_t num_rows = 0;
@@ -60,11 +60,9 @@ private:
 
 class Sorter {
 public:
-    Sorter(SortDescription& sort_description, VSortExecExprs& 
vsort_exec_exprs, int limit,
-           int64_t offset, ObjectPool* pool, std::vector<bool>& is_asc_order,
-           std::vector<bool>& nulls_first)
-            : _sort_description(sort_description),
-              _vsort_exec_exprs(vsort_exec_exprs),
+    Sorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, 
ObjectPool* pool,
+           std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first)
+            : _vsort_exec_exprs(vsort_exec_exprs),
               _limit(limit),
               _offset(offset),
               _pool(pool),
@@ -73,10 +71,10 @@ public:
 
     virtual ~Sorter() = default;
 
-    void init_profile(RuntimeProfile* runtime_profile) {
+    virtual void init_profile(RuntimeProfile* runtime_profile) {
         _partial_sort_timer = ADD_TIMER(runtime_profile, "PartialSortTime");
         _merge_block_timer = ADD_TIMER(runtime_profile, "MergeBlockTime");
-    }
+    };
 
     virtual Status append_block(Block* block, bool* mem_reuse) = 0;
 
@@ -87,7 +85,7 @@ public:
 protected:
     Status partial_sort(Block& block);
 
-    SortDescription& _sort_description;
+    SortDescription _sort_description;
     VSortExecExprs& _vsort_exec_exprs;
     int _limit;
     int64_t _offset;
@@ -95,16 +93,17 @@ protected:
     std::vector<bool>& _is_asc_order;
     std::vector<bool>& _nulls_first;
 
-    std::priority_queue<SortBlockCursor> _block_priority_queue;
     RuntimeProfile::Counter* _partial_sort_timer = nullptr;
     RuntimeProfile::Counter* _merge_block_timer = nullptr;
+
+    std::priority_queue<MergeSortBlockCursor> _block_priority_queue;
 };
 
 class FullSorter final : public Sorter {
 public:
-    FullSorter(SortDescription& sort_description, VSortExecExprs& 
vsort_exec_exprs, int limit,
-               int64_t offset, ObjectPool* pool, std::vector<bool>& 
is_asc_order,
-               std::vector<bool>& nulls_first, const RowDescriptor& row_desc);
+    FullSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, 
ObjectPool* pool,
+               std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first,
+               const RowDescriptor& row_desc);
 
     ~FullSorter() override = default;
 
@@ -128,24 +127,4 @@ private:
     static constexpr size_t BUFFERED_BLOCK_BYTES = 16 << 20;
 };
 
-class TopNSorter final : public Sorter {
-public:
-    TopNSorter(SortDescription& sort_description, VSortExecExprs& 
vsort_exec_exprs, int limit,
-               int64_t offset, ObjectPool* pool, std::vector<bool>& 
is_asc_order,
-               std::vector<bool>& nulls_first, const RowDescriptor& row_desc);
-
-    ~TopNSorter() override = default;
-
-    Status append_block(Block* block, bool* mem_reuse) override;
-
-    Status prepare_for_read() override;
-
-    Status get_next(RuntimeState* state, Block* block, bool* eos) override;
-
-private:
-    Status _do_sort(Block* block, bool* mem_reuse);
-
-    std::unique_ptr<MergeSorterState> _state;
-};
-
 } // namespace doris::vectorized
diff --git a/be/src/vec/common/sort/topn_sorter.cpp 
b/be/src/vec/common/sort/topn_sorter.cpp
new file mode 100644
index 0000000000..4ed7af6d04
--- /dev/null
+++ b/be/src/vec/common/sort/topn_sorter.cpp
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/common/sort/topn_sorter.h"
+
+namespace doris::vectorized {
+
+// Forwards the expressions, limit/offset, pool and ordering flags to the Sorter base and
+// creates the merge state from the row descriptor and the offset.
+TopNSorter::TopNSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset,
+                       ObjectPool* pool, std::vector<bool>& is_asc_order,
+                       std::vector<bool>& nulls_first, const RowDescriptor& row_desc)
+        : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first),
+          _state(std::make_unique<MergeSorterState>(row_desc, offset)) {}
+
+// Hands the incoming block to `_do_sort` and reports its status.
+// The block is expected to be non-empty (asserted with DCHECK).
+Status TopNSorter::append_block(Block* block, bool* mem_reuse) {
+    DCHECK(block->rows() > 0);
+    return _do_sort(block, mem_reuse);
+}
+
+// Asks the merge state to build its merge tree from the sort description.
+// Always returns Status::OK().
+Status TopNSorter::prepare_for_read() {
+    _state->build_merge_tree(_sort_description);
+    return Status::OK();
+}
+
+// Produces the next chunk of output.
+//  - several sorted blocks: reading is delegated to `MergeSorterState::merge_sort_read`;
+//  - exactly one sorted block: `_offset` rows are skipped (when `_offset` is non-zero), the
+//    block is swapped into `block` and `*eos` is set;
+//  - no sorted block: only `*eos` is set.
+Status TopNSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
+    auto& sorted_blocks = _state->sorted_blocks;
+    if (sorted_blocks.size() > 1) {
+        return _state->merge_sort_read(state, block, eos);
+    }
+    if (!sorted_blocks.empty()) {
+        auto& only_block = sorted_blocks[0];
+        if (_offset != 0) {
+            only_block.skip_num_rows(_offset);
+        }
+        block->swap(only_block);
+    }
+    *eos = true;
+    return Status::OK();
+}
+
+// Sorts `*block` via `partial_sort`, then decides whether the sorted rows must be retained for
+// the final merge.
+//
+// `*mem_reuse` is set to true only when the block was discarded by the TOP-N filter; in that
+// case `clear_column_data()` has been called on `*block`. Returns the error of `partial_sort`
+// if it fails, Status::OK() otherwise.
+Status TopNSorter::_do_sort(Block* block, bool* mem_reuse) {
+    *mem_reuse = false;
+    RETURN_IF_ERROR(partial_sort(*block));
+    // Every branch below works on a local block that has swapped contents with `*block`.
+    Block sorted_block;
+    sorted_block.swap(*block);
+    if (_limit == -1) {
+        // dispose normal sort logic: every block is kept.
+        _state->sorted_blocks.emplace_back(std::move(sorted_block));
+        return Status::OK();
+    }
+    // dispose TOP-N logic
+    // Here is a little opt to reduce the mem usage, we build a max heap
+    // to order the block in _block_priority_queue.
+    // if one block totally greater the heap top of _block_priority_queue
+    // we can throw the block data directly.
+    if (_state->num_rows < _limit) {
+        // Read the row count BEFORE moving the block: a moved-from block cannot be relied on
+        // to still report it, and `num_rows` is what allows the discard branch to be reached.
+        const auto appended_rows = sorted_block.rows();
+        _state->sorted_blocks.emplace_back(std::move(sorted_block));
+        _state->num_rows += appended_rows;
+        _block_priority_queue.emplace(_pool->add(
+                new MergeSortCursorImpl(_state->sorted_blocks.back(), _sort_description)));
+        return Status::OK();
+    }
+    MergeSortBlockCursor block_cursor(
+            _pool->add(new MergeSortCursorImpl(sorted_block, _sort_description)));
+    if (!block_cursor.totally_greater(_block_priority_queue.top())) {
+        _state->sorted_blocks.emplace_back(std::move(sorted_block));
+        _block_priority_queue.push(block_cursor);
+    } else {
+        *mem_reuse = true;
+        block->clear_column_data();
+    }
+    return Status::OK();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/common/sort/topn_sorter.h 
b/be/src/vec/common/sort/topn_sorter.h
new file mode 100644
index 0000000000..675442f5a1
--- /dev/null
+++ b/be/src/vec/common/sort/topn_sorter.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "vec/common/sort/sorter.h"
+
+namespace doris::vectorized {
+
+// Sorter for limit queries: each appended block is sorted on its own, blocks that cannot
+// contribute to the result may be dropped, and the retained blocks are merged on read.
+class TopNSorter final : public Sorter {
+public:
+    TopNSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, 
ObjectPool* pool,
+               std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first,
+               const RowDescriptor& row_desc);
+
+    ~TopNSorter() override = default;
+
+    // Sorts `block`; `*mem_reuse` is set to true when the block was dropped.
+    Status append_block(Block* block, bool* mem_reuse) override;
+
+    // Builds the merge tree over the retained blocks.
+    Status prepare_for_read() override;
+
+    // Fills `block` with the next output rows and sets `*eos` when the output is exhausted.
+    Status get_next(RuntimeState* state, Block* block, bool* eos) override;
+
+    // VSortNode::init selects this sorter only when `limit + offset` is below this value
+    // (and the input row has var-length slots).
+    static constexpr size_t TOPN_SORT_THRESHOLD = 256;
+
+private:
+    Status _do_sort(Block* block, bool* mem_reuse);
+
+    std::unique_ptr<MergeSorterState> _state;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/core/sort_block.h b/be/src/vec/core/sort_block.h
index 2a2babf46f..cc791881c1 100644
--- a/be/src/vec/core/sort_block.h
+++ b/be/src/vec/core/sort_block.h
@@ -77,7 +77,7 @@ struct EqualRangeIterator {
         // should continue to sort this row according to current column. Using 
the first non-zero
         // value and first zero value after first non-zero value as two 
bounds, we can get an equal range here
         if (!(_cur_range_begin == 0) || !(_flags[_cur_range_begin] == 1)) {
-            _cur_range_begin = simd::find_nonzero(_flags, _cur_range_begin + 
1);
+            _cur_range_begin = simd::find_one(_flags, _cur_range_begin + 1);
             if (_cur_range_begin >= _end) {
                 return false;
             }
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index d6c8613bb1..b13316fe40 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -21,22 +21,133 @@
 #pragma once
 
 #include "vec/columns/column.h"
-#include "vec/columns/column_string.h"
-#include "vec/common/assert_cast.h"
-#include "vec/common/typeid_cast.h"
 #include "vec/core/block.h"
-#include "vec/core/column_numbers.h"
 #include "vec/core/sort_description.h"
 #include "vec/exprs/vexpr_context.h"
-#include "vec/runtime/vdata_stream_recvr.h"
 
 namespace doris::vectorized {
 
+// Holds one block together with raw pointers to its sort-key columns.
+struct HeapSortCursorBlockView {
+public:
+    Block block;
+    // Raw pointers to the columns of `block` named by `desc`, one per entry and in the same order.
+    ColumnRawPtrs sort_columns;
+    SortDescription& desc;
+
+    // Takes over `cur_block`. A named rvalue reference is an lvalue, so without the explicit
+    // std::move the member would be copy-constructed instead of move-constructed.
+    HeapSortCursorBlockView(Block&& cur_block, SortDescription& sort_desc)
+            : block(std::move(cur_block)), desc(sort_desc) {
+        _reset();
+    }
+
+    // Applies `filter` to `block` and refreshes `sort_columns` afterwards.
+    void filter_block(IColumn::Filter& filter) {
+        Block::filter_block_internal(&block, filter, block.columns());
+        _reset();
+    }
+
+private:
+    // Rebuilds `sort_columns` from the current columns of `block`. A sort column is looked up
+    // by name when the description carries one, otherwise by its column number.
+    void _reset() {
+        sort_columns.clear();
+        sort_columns.reserve(desc.size());
+        auto columns = block.get_columns();
+        for (size_t j = 0, size = desc.size(); j < size; ++j) {
+            auto& column_desc = desc[j];
+            size_t column_number = !column_desc.column_name.empty()
+                                           ? block.get_position_by_name(column_desc.column_name)
+                                           : column_desc.column_number;
+            sort_columns.push_back(columns[column_number].get());
+        }
+    }
+};
+
+// Hand-rolled, non-atomic reference count around a `HeapSortCursorBlockView`. It is used instead
+// of std::shared_ptr because there is no concurrent operation on `HeapSortCursorBlockView`, so
+// the atomic reference counting done by shared_ptr is not needed. The destructor is private:
+// the object is only destroyed by `unref()` when the count drops to zero.
+class SharedHeapSortCursorBlockView {
+public:
+    SharedHeapSortCursorBlockView(HeapSortCursorBlockView&& reference)
+            : _ref_count(0), _reference(std::move(reference)) {}
+    SharedHeapSortCursorBlockView(const SharedHeapSortCursorBlockView&) = delete;
+
+    // Adds one reference.
+    void ref() noexcept { ++_ref_count; }
+
+    // Drops one reference and deletes this object when none is left.
+    void unref() noexcept {
+        DCHECK_GT(_ref_count, 0);
+        if (--_ref_count == 0) {
+            delete this;
+        }
+    }
+
+    HeapSortCursorBlockView& value() { return _reference; }
+
+private:
+    ~SharedHeapSortCursorBlockView() noexcept = default;
+
+    int _ref_count;
+    HeapSortCursorBlockView _reference;
+};
+
+// One row (`row_id`) inside a reference-counted block view. Copies share the view and bump its
+// reference count; a moved-from cursor holds no view.
+struct HeapSortCursorImpl {
+public:
+    HeapSortCursorImpl(int row_id, SharedHeapSortCursorBlockView* block_view)
+            : _row_id(row_id), _block_view(block_view) {
+        block_view->ref();
+    }
+
+    HeapSortCursorImpl(const HeapSortCursorImpl& other)
+            : _row_id(other._row_id), _block_view(other._block_view) {
+        _block_view->ref();
+    }
+
+    // noexcept so that standard containers move (rather than copy) cursors when they
+    // reallocate. The source is left without a view, so its destructor does nothing.
+    HeapSortCursorImpl(HeapSortCursorImpl&& other) noexcept
+            : _row_id(other._row_id), _block_view(other._block_view) {
+        other._block_view = nullptr;
+    }
+
+    // Implemented as a swap: the view previously held by `*this` ends up in `other` and is
+    // released when `other` is destroyed.
+    HeapSortCursorImpl& operator=(HeapSortCursorImpl&& other) noexcept {
+        std::swap(_row_id, other._row_id);
+        std::swap(_block_view, other._block_view);
+        return *this;
+    }
+
+    ~HeapSortCursorImpl() {
+        if (_block_view) {
+            _block_view->unref();
+        }
+    }
+
+    size_t row_id() const { return _row_id; }
+
+    const ColumnRawPtrs& sort_columns() const { return _block_view->value().sort_columns; }
+
+    const Block* block() const { return &_block_view->value().block; }
+
+    const SortDescription& sort_desc() const { return _block_view->value().desc; }
+
+    // Compares this row with `rhs` key by key, in sort-description order. The first key whose
+    // direction-adjusted comparison is non-zero decides; rows equal on every key are not less.
+    bool operator<(const HeapSortCursorImpl& rhs) const {
+        const SortDescription& keys = sort_desc();
+        for (size_t i = 0; i < keys.size(); ++i) {
+            int direction = keys[i].direction;
+            int nulls_direction = keys[i].nulls_direction;
+            int res = direction * sort_columns()[i]->compare_at(row_id(), rhs.row_id(),
+                                                                *(rhs.sort_columns()[i]),
+                                                                nulls_direction);
+            // A negative adjusted result means `*this` is less than `rhs`; a positive one
+            // means it is not. Zero moves on to the next key.
+            if (res < 0) {
+                return true;
+            }
+            if (res > 0) {
+                return false;
+            }
+        }
+        return false;
+    }
+
+private:
+    size_t _row_id;
+    SharedHeapSortCursorBlockView* _block_view;
+};
+
 /** Cursor allows to compare rows in different blocks (and parts).
   * Cursor moves inside single block.
   * It is used in priority queue.
   */
-struct SortCursorImpl {
+struct MergeSortCursorImpl {
     ColumnRawPtrs all_columns;
     ColumnRawPtrs sort_columns;
     SortDescription desc;
@@ -44,20 +155,21 @@ struct SortCursorImpl {
     size_t pos = 0;
     size_t rows = 0;
 
-    SortCursorImpl() = default;
-    virtual ~SortCursorImpl() = default;
+    MergeSortCursorImpl() = default;
+    virtual ~MergeSortCursorImpl() = default;
 
-    SortCursorImpl(const Block& block, const SortDescription& desc_)
+    MergeSortCursorImpl(const Block& block, const SortDescription& desc_)
             : desc(desc_), sort_columns_size(desc.size()) {
         reset(block);
     }
 
-    SortCursorImpl(const Columns& columns, const SortDescription& desc_)
+    MergeSortCursorImpl(const Columns& columns, const SortDescription& desc_)
             : desc(desc_), sort_columns_size(desc.size()) {
         for (auto& column_desc : desc) {
             if (!column_desc.column_name.empty()) {
-                LOG(FATAL) << "SortDesctiption should contain column position 
if SortCursor was "
-                              "used without header.";
+                LOG(FATAL)
+                        << "SortDesctiption should contain column position if 
MergeSortCursor was "
+                           "used without header.";
             }
         }
         reset(columns, {});
@@ -101,7 +213,7 @@ struct SortCursorImpl {
 
 using BlockSupplier = std::function<Status(Block**)>;
 
-struct ReceiveQueueSortCursorImpl : public SortCursorImpl {
+struct ReceiveQueueSortCursorImpl : public MergeSortCursorImpl {
     ReceiveQueueSortCursorImpl(const BlockSupplier& block_supplier,
                                const std::vector<VExprContext*>& ordering_expr,
                                const std::vector<bool>& is_asc_order,
@@ -123,7 +235,7 @@ struct ReceiveQueueSortCursorImpl : public SortCursorImpl {
             for (int i = 0; i < desc.size(); ++i) {
                 _ordering_expr[i]->execute(_block_ptr, &desc[i].column_number);
             }
-            SortCursorImpl::reset(*_block_ptr);
+            MergeSortCursorImpl::reset(*_block_ptr);
             return true;
         }
         _block_ptr = nullptr;
@@ -150,14 +262,14 @@ struct ReceiveQueueSortCursorImpl : public SortCursorImpl 
{
 };
 
 /// For easy copying.
-struct SortCursor {
-    SortCursorImpl* impl;
+struct MergeSortCursor {
+    MergeSortCursorImpl* impl;
 
-    SortCursor(SortCursorImpl* impl_) : impl(impl_) {}
-    SortCursorImpl* operator->() const { return impl; }
+    MergeSortCursor(MergeSortCursorImpl* impl_) : impl(impl_) {}
+    MergeSortCursorImpl* operator->() const { return impl; }
 
     /// The specified row of this cursor is greater than the specified row of 
another cursor.
-    int8_t greater_at(const SortCursor& rhs, size_t lhs_pos, size_t rhs_pos) 
const {
+    int8_t greater_at(const MergeSortCursor& rhs, size_t lhs_pos, size_t 
rhs_pos) const {
         for (size_t i = 0; i < impl->sort_columns_size; ++i) {
             int direction = impl->desc[i].direction;
             int nulls_direction = impl->desc[i].nulls_direction;
@@ -175,7 +287,7 @@ struct SortCursor {
     }
 
     /// Checks that all rows in the current block of this cursor are less than 
or equal to all the rows of the current block of another cursor.
-    bool totally_less(const SortCursor& rhs) const {
+    bool totally_less(const MergeSortCursor& rhs) const {
         if (impl->rows == 0 || rhs.impl->rows == 0) {
             return false;
         }
@@ -184,23 +296,23 @@ struct SortCursor {
         return greater_at(rhs, impl->rows - 1, 0) == -1;
     }
 
-    bool greater(const SortCursor& rhs) const {
+    bool greater(const MergeSortCursor& rhs) const {
         return !impl->empty() && greater_at(rhs, impl->pos, rhs.impl->pos) > 0;
     }
 
     /// Inverted so that the priority queue elements are removed in ascending 
order.
-    bool operator<(const SortCursor& rhs) const { return greater(rhs); }
+    bool operator<(const MergeSortCursor& rhs) const { return greater(rhs); }
 };
 
 /// For easy copying.
-struct SortBlockCursor {
-    SortCursorImpl* impl;
+struct MergeSortBlockCursor {
+    MergeSortCursorImpl* impl;
 
-    SortBlockCursor(SortCursorImpl* impl_) : impl(impl_) {}
-    SortCursorImpl* operator->() const { return impl; }
+    MergeSortBlockCursor(MergeSortCursorImpl* impl_) : impl(impl_) {}
+    MergeSortCursorImpl* operator->() const { return impl; }
 
     /// The specified row of this cursor is greater than the specified row of 
another cursor.
-    int8_t less_at(const SortBlockCursor& rhs, int rows) const {
+    int8_t less_at(const MergeSortBlockCursor& rhs, int rows) const {
         for (size_t i = 0; i < impl->sort_columns_size; ++i) {
             int direction = impl->desc[i].direction;
             int nulls_direction = impl->desc[i].nulls_direction;
@@ -218,7 +330,7 @@ struct SortBlockCursor {
     }
 
     /// Checks that all rows in the current block of this cursor are less than 
or equal to all the rows of the current block of another cursor.
-    bool totally_greater(const SortBlockCursor& rhs) const {
+    bool totally_greater(const MergeSortBlockCursor& rhs) const {
         if (impl->rows == 0 || rhs.impl->rows == 0) {
             return false;
         }
@@ -228,7 +340,9 @@ struct SortBlockCursor {
     }
 
     /// Inverted so that the priority queue elements are removed in ascending 
order.
-    bool operator<(const SortBlockCursor& rhs) const { return less_at(rhs, 
impl->rows - 1) == 1; }
+    bool operator<(const MergeSortBlockCursor& rhs) const {
+        return less_at(rhs, impl->rows - 1) == 1;
+    }
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index c090efa961..54323c561c 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -17,10 +17,13 @@
 
 #include "vec/exec/vsort_node.h"
 
+#include "common/config.h"
 #include "exec/sort_exec_exprs.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "util/debug_util.h"
+#include "vec/common/sort/heap_sorter.h"
+#include "vec/common/sort/topn_sorter.h"
 #include "vec/core/sort_block.h"
 #include "vec/utils/util.hpp"
 
@@ -35,24 +38,23 @@ Status VSortNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
     RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.sort_node.sort_info, _pool));
     _is_asc_order = tnode.sort_node.sort_info.is_asc_order;
     _nulls_first = tnode.sort_node.sort_info.nulls_first;
-    bool has_string_slot = false;
-    for (const auto& tuple_desc : child(0)->row_desc().tuple_descriptors()) {
-        for (const auto& slot : tuple_desc->slots()) {
-            if (slot->type().is_string_type()) {
-                has_string_slot = true;
-                break;
-            }
-        }
-        if (has_string_slot) {
-            break;
-        }
-    }
-    if (has_string_slot && _limit > 0 && _limit < 
ACCUMULATED_PARTIAL_SORT_THRESHOLD) {
-        _sorter.reset(new TopNSorter(_sort_description, _vsort_exec_exprs, 
_limit, _offset, _pool,
-                                     _is_asc_order, _nulls_first, 
child(0)->row_desc()));
+    const auto& row_desc = child(0)->row_desc();
+    // If `limit + offset` is smaller than HEAP_SORT_THRESHOLD, we prefer heap sort.
+    // To do heap sorting, each incoming block is filtered by the heap-top row, which involves
+    // some `memcpy` operations. To ensure heap sort does not cause a performance regression, we
+    // exclude the case where incoming blocks have string columns, which are sensitive to
+    // operations like `filter` and `memcpy`.
+    if (_limit > 0 && _limit + _offset < HeapSorter::HEAP_SORT_THRESHOLD &&
+        !row_desc.has_varlen_slots()) {
+        _sorter.reset(new HeapSorter(_vsort_exec_exprs, _limit, _offset, 
_pool, _is_asc_order,
+                                     _nulls_first, row_desc));
+    } else if (_limit > 0 && row_desc.has_varlen_slots() && _limit > 0 &&
+               _limit + _offset < TopNSorter::TOPN_SORT_THRESHOLD) {
+        _sorter.reset(new TopNSorter(_vsort_exec_exprs, _limit, _offset, 
_pool, _is_asc_order,
+                                     _nulls_first, row_desc));
     } else {
-        _sorter.reset(new FullSorter(_sort_description, _vsort_exec_exprs, 
_limit, _offset, _pool,
-                                     _is_asc_order, _nulls_first, 
child(0)->row_desc()));
+        _sorter.reset(new FullSorter(_vsort_exec_exprs, _limit, _offset, 
_pool, _is_asc_order,
+                                     _nulls_first, row_desc));
     }
 
     _sorter->init_profile(_runtime_profile.get());
diff --git a/be/src/vec/exec/vsort_node.h b/be/src/vec/exec/vsort_node.h
index e7cb12b437..ff640b6851 100644
--- a/be/src/vec/exec/vsort_node.h
+++ b/be/src/vec/exec/vsort_node.h
@@ -63,8 +63,6 @@ private:
     std::vector<bool> _is_asc_order;
     std::vector<bool> _nulls_first;
 
-    SortDescription _sort_description;
-
     std::unique_ptr<Sorter> _sorter;
 
     static constexpr size_t ACCUMULATED_PARTIAL_SORT_THRESHOLD = 256;
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp 
b/be/src/vec/runtime/vsorted_run_merger.cpp
index f7bd965299..937d743780 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -50,7 +50,7 @@ Status VSortedRunMerger::prepare(const vector<BlockSupplier>& 
input_runs, bool p
     }
 
     for (auto& _cursor : _cursors) {
-        if (!_cursor._is_eof) _priority_queue.push(SortCursor(&_cursor));
+        if (!_cursor._is_eof) _priority_queue.push(MergeSortCursor(&_cursor));
     }
 
     for (const auto& cursor : _cursors) {
@@ -142,7 +142,7 @@ Status VSortedRunMerger::get_next(Block* output_block, 
bool* eos) {
     return Status::OK();
 }
 
-void VSortedRunMerger::next_heap(SortCursor& current) {
+void VSortedRunMerger::next_heap(MergeSortCursor& current) {
     if (!current->isLast()) {
         current->next();
         _priority_queue.push(current);
@@ -151,7 +151,7 @@ void VSortedRunMerger::next_heap(SortCursor& current) {
     }
 }
 
-inline bool VSortedRunMerger::has_next_block(doris::vectorized::SortCursor& 
current) {
+inline bool 
VSortedRunMerger::has_next_block(doris::vectorized::MergeSortCursor& current) {
     ScopedTimer<MonotonicStopWatch> timer(_get_next_block_timer);
     return current->has_next_block();
 }
diff --git a/be/src/vec/runtime/vsorted_run_merger.h 
b/be/src/vec/runtime/vsorted_run_merger.h
index 05c463fec3..e374f2cdc0 100644
--- a/be/src/vec/runtime/vsorted_run_merger.h
+++ b/be/src/vec/runtime/vsorted_run_merger.h
@@ -72,7 +72,7 @@ protected:
     size_t _offset = 0;
 
     std::vector<ReceiveQueueSortCursorImpl> _cursors;
-    std::priority_queue<SortCursor> _priority_queue;
+    std::priority_queue<MergeSortCursor> _priority_queue;
 
     Block _empty_block;
 
@@ -83,8 +83,8 @@ protected:
     RuntimeProfile::Counter* _get_next_block_timer;
 
 private:
-    void next_heap(SortCursor& current);
-    bool has_next_block(SortCursor& current);
+    void next_heap(MergeSortCursor& current);
+    bool has_next_block(MergeSortCursor& current);
 };
 
 } // namespace vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to