This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 53ae24912f [vectorized](feature) support partition sort node (#19708)
53ae24912f is described below

commit 53ae24912f18a73cea4d6d7fd2bef3586cb57d60
Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com>
AuthorDate: Thu May 25 11:22:02 2023 +0800

    [vectorized](feature) support partition sort node (#19708)
---
 be/src/exec/exec_node.cpp                          |   5 +
 .../pipeline/exec/partition_sort_sink_operator.h   |  54 +++
 .../pipeline/exec/partition_sort_source_operator.h |  56 +++
 be/src/pipeline/pipeline_fragment_context.cpp      |  16 +
 be/src/vec/CMakeLists.txt                          |   2 +
 be/src/vec/common/columns_hashing.h                |  32 ++
 be/src/vec/common/sort/partition_sorter.cpp        | 203 +++++++++
 be/src/vec/common/sort/partition_sorter.h          | 108 +++++
 be/src/vec/common/sort/sorter.h                    |  17 +-
 be/src/vec/exec/vpartition_sort_node.cpp           | 454 +++++++++++++++++++++
 be/src/vec/exec/vpartition_sort_node.h             | 386 ++++++++++++++++++
 gensrc/thrift/PlanNodes.thrift                     |  15 +
 12 files changed, 1342 insertions(+), 6 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 62a63afd98..b74fbd23b1 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -62,6 +62,7 @@
 #include "vec/exec/vempty_set_node.h"
 #include "vec/exec/vexchange_node.h"
 #include "vec/exec/vmysql_scan_node.h" // IWYU pragma: keep
+#include "vec/exec/vpartition_sort_node.h"
 #include "vec/exec/vrepeat_node.h"
 #include "vec/exec/vschema_scan_node.h"
 #include "vec/exec/vselect_node.h"
@@ -318,6 +319,7 @@ Status ExecNode::create_node(RuntimeState* state, 
ObjectPool* pool, const TPlanN
     case TPlanNodeType::FILE_SCAN_NODE:
     case TPlanNodeType::JDBC_SCAN_NODE:
     case TPlanNodeType::META_SCAN_NODE:
+    case TPlanNodeType::PARTITION_SORT_NODE:
         break;
     default: {
         const auto& i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
@@ -438,6 +440,9 @@ Status ExecNode::create_node(RuntimeState* state, 
ObjectPool* pool, const TPlanN
         *node = pool->add(new vectorized::VDataGenFunctionScanNode(pool, 
tnode, descs));
         return Status::OK();
 
+    case TPlanNodeType::PARTITION_SORT_NODE:
+        *node = pool->add(new vectorized::VPartitionSortNode(pool, tnode, 
descs));
+        return Status::OK();
     default:
         std::map<int, const char*>::const_iterator i =
                 _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h 
b/be/src/pipeline/exec/partition_sort_sink_operator.h
new file mode 100644
index 0000000000..ddcbebbb9d
--- /dev/null
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "operator.h"
+#include "vec/exec/vpartition_sort_node.h"
+
+namespace doris {
+class ExecNode;
+
+namespace pipeline {
+
+// Builder for the sink-side pipeline operator of a
+// vectorized::VPartitionSortNode.
+class PartitionSortSinkOperatorBuilder final
+        : public OperatorBuilder<vectorized::VPartitionSortNode> {
+public:
+    // Forwards the builder id, the fixed name "PartitionSortSinkOperator" and
+    // the plan node to the OperatorBuilder base.
+    PartitionSortSinkOperatorBuilder(int32_t id, ExecNode* sort_node)
+            : OperatorBuilder(id, "PartitionSortSinkOperator", sort_node) {}
+
+    // Always true: this builder describes a sink.
+    bool is_sink() const override { return true; }
+
+    // Only declared here; the definition follows PartitionSortSinkOperator
+    // further down in this header because it needs the complete operator type.
+    OperatorPtr build_operator() override;
+};
+
+// Sink-side pipeline operator for the partition sort node. All behaviour other
+// than can_write() comes from the StreamingOperator base.
+class PartitionSortSinkOperator final : public StreamingOperator<PartitionSortSinkOperatorBuilder> {
+public:
+    // Passes the owning builder and the plan node straight to the base class.
+    // (No trailing ';' after the body: it would be an empty declaration and
+    // triggers -Wextra-semi.)
+    PartitionSortSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* sort_node)
+            : StreamingOperator(operator_builder, sort_node) {}
+
+    // The sink never reports back-pressure: it is always ready to accept data.
+    bool can_write() override { return true; }
+};
+
+// Creates a PartitionSortSinkOperator bound to this builder and its plan node.
+// Marked inline because the definition lives in a header: a non-inline
+// definition would be emitted by every translation unit that includes this
+// file, violating the one-definition rule (duplicate-symbol link errors).
+inline OperatorPtr PartitionSortSinkOperatorBuilder::build_operator() {
+    return std::make_shared<PartitionSortSinkOperator>(this, _node);
+}
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h 
b/be/src/pipeline/exec/partition_sort_source_operator.h
new file mode 100644
index 0000000000..bd55c42e4b
--- /dev/null
+++ b/be/src/pipeline/exec/partition_sort_source_operator.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "common/status.h"
+#include "operator.h"
+#include "vec/exec/vpartition_sort_node.h"
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+
+namespace pipeline {
+
+// Builder for the source-side pipeline operator of a
+// vectorized::VPartitionSortNode.
+class PartitionSortSourceOperatorBuilder final
+        : public OperatorBuilder<vectorized::VPartitionSortNode> {
+public:
+    // Forwards the builder id, the fixed name "PartitionSortSourceOperator"
+    // and the plan node to the OperatorBuilder base.
+    PartitionSortSourceOperatorBuilder(int32_t id, ExecNode* sort_node)
+            : OperatorBuilder(id, "PartitionSortSourceOperator", sort_node) {}
+
+    // Always true: this builder describes a source.
+    bool is_source() const override { return true; }
+
+    // Only declared here; the definition follows PartitionSortSourceOperator
+    // further down in this header because it needs the complete operator type.
+    OperatorPtr build_operator() override;
+};
+
+// Source-side pipeline operator for the partition sort node. Everything except
+// open() is inherited from the SourceOperator base.
+class PartitionSortSourceOperator final
+        : public SourceOperator<PartitionSortSourceOperatorBuilder> {
+public:
+    // Passes the owning builder and the plan node straight to the base class.
+    PartitionSortSourceOperator(OperatorBuilderBase* operator_builder, 
ExecNode* sort_node)
+            : SourceOperator(operator_builder, sort_node) {}
+    // Overrides open() with a no-op that reports success; the RuntimeState
+    // argument is intentionally unused.
+    Status open(RuntimeState*) override { return Status::OK(); }
+};
+
+// Creates a PartitionSortSourceOperator bound to this builder and its plan
+// node. Marked inline because the definition lives in a header: a non-inline
+// definition would be emitted by every translation unit that includes this
+// file, violating the one-definition rule (duplicate-symbol link errors).
+inline OperatorPtr PartitionSortSourceOperatorBuilder::build_operator() {
+    return std::make_shared<PartitionSortSourceOperator>(this, _node);
+}
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 923558b333..eea8d34b08 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -61,6 +61,8 @@
 #include "pipeline/exec/nested_loop_join_probe_operator.h"
 #include "pipeline/exec/olap_table_sink_operator.h"
 #include "pipeline/exec/operator.h"
+#include "pipeline/exec/partition_sort_sink_operator.h"
+#include "pipeline/exec/partition_sort_source_operator.h"
 #include "pipeline/exec/repeat_operator.h"
 #include "pipeline/exec/result_file_sink_operator.h"
 #include "pipeline/exec/result_sink_operator.h"
@@ -532,6 +534,20 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
         RETURN_IF_ERROR(cur_pipe->add_operator(sort_source));
         break;
     }
+    case TPlanNodeType::PARTITION_SORT_NODE: {
+        auto new_pipeline = add_pipeline();
+        RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline));
+
+        OperatorBuilderPtr partition_sort_sink = 
std::make_shared<PartitionSortSinkOperatorBuilder>(
+                next_operator_builder_id(), node);
+        RETURN_IF_ERROR(new_pipeline->set_sink(partition_sort_sink));
+
+        OperatorBuilderPtr partition_sort_source =
+                
std::make_shared<PartitionSortSourceOperatorBuilder>(next_operator_builder_id(),
+                                                                     node);
+        RETURN_IF_ERROR(cur_pipe->add_operator(partition_sort_source));
+        break;
+    }
     case TPlanNodeType::ANALYTIC_EVAL_NODE: {
         auto new_pipeline = add_pipeline();
         RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline));
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 2d5bc7f177..97e77cd4d0 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -71,6 +71,7 @@ set(VEC_FILES
   common/sort/sorter.cpp
   common/sort/topn_sorter.cpp
   common/sort/vsort_exec_exprs.cpp
+  common/sort/partition_sorter.cpp
   common/string_utils/string_utils.cpp
   common/hex.cpp
   common/allocator.cpp
@@ -136,6 +137,7 @@ set(VEC_FILES
   exec/vrepeat_node.cpp
   exec/vtable_function_node.cpp
   exec/vjdbc_connector.cpp
+  exec/vpartition_sort_node.cpp
   exec/join/vhash_join_node.cpp
   exec/join/vjoin_node_base.cpp
   exec/join/vnested_loop_join_node.cpp
diff --git a/be/src/vec/common/columns_hashing.h 
b/be/src/vec/common/columns_hashing.h
index 06f0773fc8..64fe7d87b5 100644
--- a/be/src/vec/common/columns_hashing.h
+++ b/be/src/vec/common/columns_hashing.h
@@ -258,6 +258,38 @@ struct HashMethodSingleLowNullableColumn : public 
SingleColumnMethod {
             return EmplaceResult(inserted);
     }
 
+    // Inserts the key found at `row` into `data`, using the caller-supplied
+    // `hash_value` instead of hashing the key again.
+    //
+    // data:       hash table (plus its separate null-key slot) to insert into.
+    // hash_value: hash passed through to data.emplace() for non-null keys.
+    // row:        row index inside key_column.
+    // pool:       arena handed to Base::get_key_holder().
+    //
+    // Returns an EmplaceResult whose "inserted" flag is true when the key was
+    // not present before this call.
+    template <typename Data>
+    ALWAYS_INLINE EmplaceResult emplace_key(Data& data, size_t hash_value, 
size_t row,
+                                            Arena& pool) {
+        // NULL keys are not stored in the hash table itself: they are tracked
+        // through the has_null_key_data() flag and its dedicated slot.
+        if (key_column->is_null_at(row)) {
+            // Remember the previous state so "inserted" is only reported the
+            // first time a NULL key shows up.
+            bool has_null_key = data.has_null_key_data();
+            data.has_null_key_data() = true;
+
+            if constexpr (has_mapped) {
+                return EmplaceResult(data.get_null_key_data(), 
data.get_null_key_data(),
+                                     !has_null_key);
+            } else {
+                return EmplaceResult(!has_null_key);
+            }
+        }
+
+        auto key_holder = Base::get_key_holder(row, pool);
+
+        bool inserted = false;
+        typename Data::LookupResult it;
+        data.emplace(key_holder, it, hash_value, inserted);
+
+        if constexpr (has_mapped) {
+            auto& mapped = *lookup_result_get_mapped(it);
+            // A freshly inserted cell gets its mapped value constructed in
+            // place before the reference is handed out.
+            if (inserted) {
+                new (&mapped) Mapped();
+            }
+            return EmplaceResult(mapped, mapped, inserted);
+        } else {
+            return EmplaceResult(inserted);
+        }
+    }
+
     template <typename Data, typename Func, typename CreatorForNull>
     ALWAYS_INLINE typename std::enable_if_t<has_mapped, Mapped>& 
lazy_emplace_key(
             Data& data, size_t row, Arena& pool, Func&& f, CreatorForNull&& 
null_creator) {
diff --git a/be/src/vec/common/sort/partition_sorter.cpp 
b/be/src/vec/common/sort/partition_sorter.cpp
new file mode 100644
index 0000000000..ca29d62eb4
--- /dev/null
+++ b/be/src/vec/common/sort/partition_sorter.cpp
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/common/sort/partition_sorter.h"
+
+#include <glog/logging.h>
+
+#include <algorithm>
+#include <queue>
+
+#include "common/object_pool.h"
+#include "vec/core/block.h"
+#include "vec/core/sort_cursor.h"
+#include "vec/functions/function_binary_arithmetic.h"
+#include "vec/utils/util.hpp"
+
+namespace doris {
+class RowDescriptor;
+class RuntimeProfile;
+class RuntimeState;
+
+namespace vectorized {
+class VSortExecExprs;
+} // namespace vectorized
+} // namespace doris
+
+namespace doris::vectorized {
+
+// Constructs a sorter for the rows of one partition.
+//
+// vsort_exec_exprs, limit, offset, pool, is_asc_order and nulls_first are
+// forwarded unchanged to the Sorter base; row_desc, offset, limit, state and
+// profile are used to create the MergeSorterState that holds the sorted blocks.
+// has_global_limit, partition_inner_limit and top_n_algorithm are stored and
+// consulted when reading results.
+// previous_row is a non-owning pointer: only the address is stored, so the
+// pointee must stay valid for as long as this sorter is read from.
+// row_desc is likewise kept by reference.
+PartitionSorter::PartitionSorter(VSortExecExprs& vsort_exec_exprs, int limit, 
int64_t offset,
+                                 ObjectPool* pool, std::vector<bool>& 
is_asc_order,
+                                 std::vector<bool>& nulls_first, const 
RowDescriptor& row_desc,
+                                 RuntimeState* state, RuntimeProfile* profile,
+                                 bool has_global_limit, int 
partition_inner_limit,
+                                 TopNAlgorithm::type top_n_algorithm, 
SortCursorCmp* previous_row)
+        : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, 
nulls_first),
+          _state(MergeSorterState::create_unique(row_desc, offset, limit, 
state, profile)),
+          _row_desc(row_desc),
+          _has_global_limit(has_global_limit),
+          _partition_inner_limit(partition_inner_limit),
+          _top_n_algorithm(top_n_algorithm),
+          _previous_row(previous_row) {}
+
+// Sorts one incoming block on its own and hands the sorted result to the
+// merge state. Any error from either step is returned to the caller.
+Status PartitionSorter::append_block(Block* input_block) {
+    // Destination block with the layout described by the row descriptor.
+    Block partially_sorted = VectorizedUtils::create_empty_columnswithtypename(_row_desc);
+    DCHECK(input_block->columns() == partially_sorted.columns());
+
+    RETURN_IF_ERROR(partial_sort(*input_block, partially_sorted));
+    RETURN_IF_ERROR(_state->add_sorted_block(partially_sorted));
+
+    return Status::OK();
+}
+
+// Builds one merge cursor per sorted block and seeds the merge heap with them.
+// Always returns Status::OK().
+Status PartitionSorter::prepare_for_read() {
+    auto& sorted_blocks = _state->get_sorted_block();
+    auto& cursor_impls = _state->get_cursors();
+
+    // Create every cursor before touching the heap: the heap entries hold the
+    // addresses of these vector elements, so the vector must not grow once
+    // pushing has started.
+    for (const auto& sorted : sorted_blocks) {
+        cursor_impls.emplace_back(sorted, _sort_description);
+    }
+
+    auto& merge_heap = _state->get_priority_queue();
+    for (auto& cursor_impl : cursor_impls) {
+        merge_heap.push(MergeSortCursor(&cursor_impl));
+    }
+
+    return Status::OK();
+}
+
+// Produces the next output block for this partition.
+//
+// state: supplies the batch size used by the merge path.
+// block: receives the output rows.
+// eos:   set to true when nothing more will be produced.
+Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
+    auto& sorted_blocks = _state->get_sorted_block();
+
+    // Nothing was ever appended: report end of stream right away.
+    if (sorted_blocks.empty()) {
+        *eos = true;
+        return Status::OK();
+    }
+
+    const bool single_block_with_limit = sorted_blocks.size() == 1 && _has_global_limit;
+    if (!single_block_with_limit) {
+        // General case: merge the sorted blocks through the priority queue.
+        RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
+        return Status::OK();
+    }
+
+    // Fast path: the only sorted block is handed over as-is and then capped via
+    // set_num_rows (presumably truncating to the per-partition limit -- see
+    // Block::set_num_rows).
+    block->swap(sorted_blocks[0]);
+    block->set_num_rows(_partition_inner_limit);
+    *eos = true;
+    return Status::OK();
+}
+
+// Merges the sorted blocks through the priority queue and copies rows into
+// output_block until either the batch is full or the per-partition limit
+// dictated by _top_n_algorithm has been reached.
+//
+// output_block: receives the merged rows.
+// eos:          set to true when no row was counted in this call or when the
+//               limit has been reached.
+// batch_size:   the loop stops once the row counter equals this value.
+//
+// The caller must make sure at least one sorted block exists (the first one is
+// used as the column template below).
+Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, 
int batch_size) {
+    const auto& sorted_block = _state->get_sorted_block()[0];
+    size_t num_columns = sorted_block.columns();
+    // Either append into the columns already owned by output_block or start
+    // from empty columns shaped like the first sorted block.
+    // NOTE(review): when mem_reuse is true the mutated columns are not visibly
+    // handed back to output_block in this function -- confirm that
+    // mutate_columns() leaves the block pointing at them.
+    bool mem_reuse = output_block->mem_reuse();
+    MutableColumns merged_columns =
+            mem_reuse ? output_block->mutate_columns() : 
sorted_block.clone_empty_columns();
+
+    // Rows (or, for DENSE_RANK without a global limit, distinct rows) counted
+    // during this call only; _output_total_rows carries the count of earlier
+    // calls.
+    size_t current_output_rows = 0;
+    auto& priority_queue = _state->get_priority_queue();
+
+    bool get_enough_data = false;
+    bool first_compare_row = false;
+    while (!priority_queue.empty()) {
+        auto current = priority_queue.top();
+        priority_queue.pop();
+        // No reference row recorded yet in the caller-supplied comparator:
+        // adopt the current row as the reference.
+        if (UNLIKELY(_previous_row->impl == nullptr)) {
+            first_compare_row = true;
+            *_previous_row = current;
+        }
+
+        switch (_top_n_algorithm) {
+        case TopNAlgorithm::ROW_NUMBER: {
+            //1 row_number no need to check distinct, just output 
partition_inner_limit row
+            if ((current_output_rows + _output_total_rows) < 
_partition_inner_limit) {
+                for (size_t i = 0; i < num_columns; ++i) {
+                    merged_columns[i]->insert_from(*current->all_columns[i], 
current->pos);
+                }
+            } else {
+                //rows has get enough
+                get_enough_data = true;
+            }
+            // Counted even when the row was not copied; the loop exits right
+            // after because get_enough_data is set.
+            current_output_rows++;
+            break;
+        }
+        case TopNAlgorithm::DENSE_RANK: {
+            //3 dense_rank() maybe need distinct rows of partition_inner_limit
+            if ((current_output_rows + _output_total_rows) < 
_partition_inner_limit) {
+                for (size_t i = 0; i < num_columns; ++i) {
+                    merged_columns[i]->insert_from(*current->all_columns[i], 
current->pos);
+                }
+            } else {
+                get_enough_data = true;
+            }
+            // NOTE(review): without a global limit the counter below advances
+            // only on distinct rows, while _output_total_rows (updated at the
+            // end) adds the physical row count of output_block -- confirm the
+            // two are meant to be mixed across successive calls.
+            if (_has_global_limit) {
+                current_output_rows++;
+            } else {
+                //when it's first comes, the rows are same no need compare
+                if (first_compare_row) {
+                    current_output_rows++;
+                    first_compare_row = false;
+                } else {
+                    // not the first comes, so need compare those, when is 
distinct row
+                    // so could current_output_rows++
+                    bool cmp_res = _previous_row->compare_two_rows(current);
+                    if (cmp_res == false) { // distinct row
+                        current_output_rows++;
+                        *_previous_row = current;
+                    }
+                }
+            }
+            break;
+        }
+        case TopNAlgorithm::RANK: {
+            // With a global limit the cut-off is a plain row count.
+            if (_has_global_limit &&
+                (current_output_rows + _output_total_rows) >= 
_partition_inner_limit) {
+                get_enough_data = true;
+                break;
+            }
+            bool cmp_res = _previous_row->compare_two_rows(current);
+            //get a distinct row
+            if (cmp_res == false) {
+                //here must be check distinct of two rows, and then check nums 
of row
+                if ((current_output_rows + _output_total_rows) >= 
_partition_inner_limit) {
+                    get_enough_data = true;
+                    break;
+                }
+                *_previous_row = current;
+            }
+            // Rows equal to the reference row are emitted even past the limit;
+            // only a new distinct row can stop the output.
+            for (size_t i = 0; i < num_columns; ++i) {
+                merged_columns[i]->insert_from(*current->all_columns[i], 
current->pos);
+            }
+            current_output_rows++;
+            break;
+        }
+        default:
+            break;
+        }
+
+        // Advance the cursor and put it back so its next row takes part in the
+        // merge; an exhausted cursor is simply dropped.
+        if (!current->isLast()) {
+            current->next();
+            priority_queue.push(current);
+        }
+
+        if (current_output_rows == batch_size || get_enough_data == true) {
+            break;
+        }
+    }
+
+    // Freshly cloned columns have to be wrapped into a block and swapped into
+    // the caller's output.
+    if (!mem_reuse) {
+        Block merge_block = 
sorted_block.clone_with_columns(std::move(merged_columns));
+        merge_block.swap(*output_block);
+    }
+    _output_total_rows += output_block->rows();
+    if (current_output_rows == 0 || get_enough_data == true) {
+        *eos = true;
+    }
+    return Status::OK();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/common/sort/partition_sorter.h 
b/be/src/vec/common/sort/partition_sorter.h
new file mode 100644
index 0000000000..ff17ac2115
--- /dev/null
+++ b/be/src/vec/common/sort/partition_sorter.h
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <gen_cpp/PlanNodes_types.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "common/status.h"
+#include "vec/common/sort/sorter.h"
+
+namespace doris {
+class ObjectPool;
+class RowDescriptor;
+class RuntimeProfile;
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+class VSortExecExprs;
+} // namespace vectorized
+} // namespace doris
+
+namespace doris::vectorized {
+
+// Snapshot of one row of a merge cursor: the cursor implementation it belongs
+// to plus the row position at capture time. Used to test whether a later row
+// has the same sort-key values.
+struct SortCursorCmp {
+public:
+    // Empty snapshot: no cursor, row 0.
+    SortCursorCmp() : row(0), impl(nullptr) {}
+
+    // Intentionally implicit so a MergeSortCursor can be assigned directly.
+    SortCursorCmp(const MergeSortCursor& cursor) : row(cursor->pos), impl(cursor.impl) {}
+
+    // Returns the snapshot to the empty state.
+    void reset() {
+        row = 0;
+        impl = nullptr;
+    }
+
+    // Returns true when the captured row and the row `rhs` currently points at
+    // compare equal on every sort column, false as soon as one differs.
+    // Must not be called on an empty snapshot (impl is dereferenced).
+    bool compare_two_rows(const MergeSortCursor& rhs) const {
+        MergeSortCursorImpl* other = rhs.impl;
+        for (size_t col = 0; col < impl->sort_columns_size; ++col) {
+            const int direction = impl->desc[col].direction;
+            const int nulls_direction = impl->desc[col].nulls_direction;
+            const int ordering =
+                    direction * impl->sort_columns[col]->compare_at(row, other->pos,
+                                                                    *(other->sort_columns[col]),
+                                                                    nulls_direction);
+            if (ordering != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    int row = 0;
+    MergeSortCursorImpl* impl;
+};
+
+// Sorter used per partition by the partition sort node: blocks are sorted
+// individually on append and merged on read, stopping according to the
+// configured top-n algorithm and per-partition limit.
+class PartitionSorter final : public Sorter {
+    ENABLE_FACTORY_CREATOR(PartitionSorter);
+
+public:
+    // previous_row is stored as a raw pointer and is not owned by the sorter.
+    PartitionSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t 
offset, ObjectPool* pool,
+                    std::vector<bool>& is_asc_order, std::vector<bool>& 
nulls_first,
+                    const RowDescriptor& row_desc, RuntimeState* state, 
RuntimeProfile* profile,
+                    bool has_global_limit, int partition_inner_limit,
+                    TopNAlgorithm::type top_n_algorithm, SortCursorCmp* 
previous_row);
+
+    ~PartitionSorter() override = default;
+
+    // Sorts the block and stores the sorted result for later merging.
+    Status append_block(Block* block) override;
+
+    // Creates the merge cursors; call once all blocks have been appended.
+    Status prepare_for_read() override;
+
+    // Fills `block` with the next output rows and sets *eos when finished.
+    Status get_next(RuntimeState* state, Block* block, bool* eos) override;
+
+    // Delegates to the merge state.
+    size_t data_size() const override { return _state->data_size(); }
+
+    // This sorter always reports that it has not spilled.
+    bool is_spilled() const override { return false; }
+
+    // Merge-based read path used by get_next().
+    Status partition_sort_read(Block* block, bool* eos, int batch_size);
+    // Rows accumulated by partition_sort_read() so far.
+    int64 get_output_rows() const { return _output_total_rows; }
+
+private:
+    // Holds the sorted blocks, the merge cursors and the priority queue.
+    std::unique_ptr<MergeSorterState> _state;
+    // Kept by reference; the descriptor must outlive the sorter.
+    const RowDescriptor& _row_desc;
+    int64 _output_total_rows = 0;
+    bool _has_global_limit = false;
+    int _partition_inner_limit = 0;
+    TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::type::ROW_NUMBER;
+    // Non-owning; supplied by the creator of the sorter.
+    SortCursorCmp* _previous_row;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index 48ea77852a..f5a12db3c0 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -60,13 +60,14 @@ public:
               limit_(limit),
               profile_(profile) {
         external_sort_bytes_threshold_ = 
state->external_sort_bytes_threshold();
+        if (profile != nullptr) {
+            block_spill_profile_ = profile->create_child("BlockSpill", true, 
true);
+            profile->add_child(block_spill_profile_, false, nullptr);
 
-        block_spill_profile_ = profile->create_child("BlockSpill", true, true);
-        profile->add_child(block_spill_profile_, false, nullptr);
-
-        spilled_block_count_ = ADD_COUNTER(block_spill_profile_, "BlockCount", 
TUnit::UNIT);
-        spilled_original_block_size_ =
-                ADD_COUNTER(block_spill_profile_, "BlockBytes", TUnit::BYTES);
+            spilled_block_count_ = ADD_COUNTER(block_spill_profile_, 
"BlockCount", TUnit::UNIT);
+            spilled_original_block_size_ =
+                    ADD_COUNTER(block_spill_profile_, "BlockBytes", 
TUnit::BYTES);
+        }
     }
 
     ~MergeSorterState() = default;
@@ -91,6 +92,10 @@ public:
 
     const Block& last_sorted_block() const { return sorted_blocks_.back(); }
 
+    std::vector<Block>& get_sorted_block() { return sorted_blocks_; }
+    std::priority_queue<MergeSortCursor>& get_priority_queue() { return 
priority_queue_; }
+    std::vector<MergeSortCursorImpl>& get_cursors() { return cursors_; }
+
     std::unique_ptr<Block> unsorted_block_;
 
 private:
diff --git a/be/src/vec/exec/vpartition_sort_node.cpp 
b/be/src/vec/exec/vpartition_sort_node.cpp
new file mode 100644
index 0000000000..8f3f50b9d4
--- /dev/null
+++ b/be/src/vec/exec/vpartition_sort_node.cpp
@@ -0,0 +1,454 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/vpartition_sort_node.h"
+
+#include <glog/logging.h>
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <sstream>
+#include <string>
+
+#include "common/logging.h"
+#include "common/object_pool.h"
+#include "runtime/runtime_state.h"
+#include "vec/common/hash_table/hash_set.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris::vectorized {
+// Here is an empirical value.
+static constexpr size_t HASH_MAP_PREFETCH_DIST = 16;
+/// Builds the node on top of ExecNode and allocates the helper objects used
+/// while sinking: the partition hash-table holder, the arena handed to the hash
+/// table calls, and the SortCursorCmp object that is later passed to every
+/// PartitionSorter.
+VPartitionSortNode::VPartitionSortNode(ObjectPool* pool, const TPlanNode& 
tnode,
+                                       const DescriptorTbl& descs)
+        : ExecNode(pool, tnode, descs), _hash_table_size_counter(nullptr) {
+    _partitioned_data = std::make_unique<PartitionedHashMapVariants>();
+    _agg_arena_pool = std::make_unique<Arena>();
+    _previous_row = std::make_unique<SortCursorCmp>();
+}
+
+/// Reads the partition-sort settings out of the thrift plan node: the ORDER BY
+/// description, the PARTITION BY expressions and the top-n options.
+Status VPartitionSortNode::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::init(tnode, state));
+    const auto& sort_node = tnode.partition_sort_node;
+
+    // ORDER BY keys together with their direction and null placement.
+    if (sort_node.__isset.sort_info) {
+        const auto& sort_info = sort_node.sort_info;
+        RETURN_IF_ERROR(_vsort_exec_exprs.init(sort_info, _pool));
+        _is_asc_order = sort_info.is_asc_order;
+        _nulls_first = sort_info.nulls_first;
+    }
+
+    // PARTITION BY keys; one raw column pointer slot is reserved per expression.
+    if (sort_node.__isset.partition_exprs) {
+        RETURN_IF_ERROR(VExpr::create_expr_trees(_pool, sort_node.partition_exprs,
+                                                 &_partition_expr_ctxs));
+        _partition_exprs_num = _partition_expr_ctxs.size();
+        _partition_columns.resize(_partition_exprs_num);
+    }
+
+    // Without partition keys all rows are collected in one single partition.
+    if (_partition_exprs_num == 0) {
+        _value_places.push_back(_pool->add(new PartitionBlocks()));
+    }
+
+    _has_global_limit = sort_node.has_global_limit;
+    _top_n_algorithm = sort_node.top_n_algorithm;
+    _partition_inner_limit = sort_node.partition_inner_limit;
+    return Status::OK();
+}
+
+/// Registers the profile counters, prepares the ORDER BY and PARTITION BY
+/// expressions against the child's row descriptor and selects the hash method.
+Status VPartitionSortNode::prepare(RuntimeState* state) {
+    VLOG_CRITICAL << "VPartitionSortNode::prepare";
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+
+    auto* profile = runtime_profile();
+    _hash_table_size_counter = ADD_COUNTER(profile, "HashTableSize", TUnit::UNIT);
+    _build_timer = ADD_TIMER(profile, "HashTableBuildTime");
+    _partition_sort_timer = ADD_TIMER(profile, "PartitionSortTime");
+    _get_sorted_timer = ADD_TIMER(profile, "GetSortedTime");
+    _selector_block_timer = ADD_TIMER(profile, "SelectorBlockTime");
+    _emplace_key_timer = ADD_TIMER(profile, "EmplaceKeyTime");
+
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+
+    const auto& child_row_desc = child(0)->row_desc();
+    RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, child_row_desc, _row_descriptor));
+    RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, child_row_desc));
+
+    _init_hash_method();
+    return Status::OK();
+}
+
+/// Evaluates every PARTITION BY expression on input_block, remembers the
+/// resulting key columns and distributes the rows of the block over the
+/// partitions through the hash table.
+Status VPartitionSortNode::_split_block_by_partition(vectorized::Block* input_block,
+                                                     int batch_size) {
+    for (int expr_idx = 0; expr_idx < _partition_exprs_num; ++expr_idx) {
+        // The expression reports the position of its result column in the block.
+        int column_pos = -1;
+        RETURN_IF_ERROR(_partition_expr_ctxs[expr_idx]->execute(input_block, &column_pos));
+        DCHECK(column_pos != -1);
+        const auto& key_column = input_block->get_by_position(column_pos).column;
+        _partition_columns[expr_idx] = key_column.get();
+    }
+
+    _emplace_into_hash_table(_partition_columns, input_block, batch_size);
+    return Status::OK();
+}
+
+/// Assigns every row of input_block to its partition.
+///
+/// For each row the partition key is looked up (or inserted) in the hash table
+/// selected by _partitioned_data; a newly inserted key gets a fresh
+/// PartitionBlocks object from the object pool. The row index is recorded in the
+/// partition's selector, and once all rows are classified every partition copies
+/// its selected rows out of input_block.
+///
+/// key_columns: the evaluated PARTITION BY columns of input_block.
+/// batch_size:  forwarded to PartitionBlocks::append_block_by_selector.
+void VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& 
key_columns,
+                                                  const vectorized::Block* 
input_block,
+                                                  int batch_size) {
+    std::visit(
+            [&](auto&& agg_method) -> void {
+                SCOPED_TIMER(_build_timer);
+                using HashMethodType = std::decay_t<decltype(agg_method)>;
+                using HashTableType = std::decay_t<decltype(agg_method.data)>;
+                using AggState = typename HashMethodType::State;
+
+                AggState state(key_columns, _partition_key_sz, nullptr);
+                size_t num_rows = input_block->rows();
+                _pre_serialize_key_if_need(state, agg_method, key_columns, 
num_rows);
+
+                // For PHHashMap based tables the hash of every row is computed up
+                // front so that the emplace loop below can prefetch ahead.
+                if constexpr (HashTableTraits<HashTableType>::is_phmap) {
+                    if (_hash_values.size() < num_rows) {
+                        _hash_values.resize(num_rows);
+                    }
+                    if constexpr 
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
+                                          AggState>::value) {
+                        for (size_t i = 0; i < num_rows; ++i) {
+                            _hash_values[i] = 
agg_method.data.hash(agg_method.keys[i]);
+                        }
+                    } else {
+                        for (size_t i = 0; i < num_rows; ++i) {
+                            _hash_values[i] =
+                                    
agg_method.data.hash(state.get_key_holder(i, *_agg_arena_pool));
+                        }
+                    }
+                }
+
+                for (size_t row = 0; row < num_rows; ++row) {
+                    SCOPED_TIMER(_emplace_key_timer);
+                    PartitionDataPtr aggregate_data = nullptr;
+                    auto emplace_result = [&]() {
+                        if constexpr 
(HashTableTraits<HashTableType>::is_phmap) {
+                            if (LIKELY(row + HASH_MAP_PREFETCH_DIST < 
num_rows)) {
+                                agg_method.data.prefetch_by_hash(
+                                        _hash_values[row + 
HASH_MAP_PREFETCH_DIST]);
+                            }
+                            return state.emplace_key(agg_method.data, 
_hash_values[row], row,
+                                                     *_agg_arena_pool);
+                        } else {
+                            return state.emplace_key(agg_method.data, row, 
*_agg_arena_pool);
+                        }
+                    }();
+
+                    // A newly inserted key starts a new partition: create its
+                    // PartitionBlocks and remember it in _value_places.
+                    if (emplace_result.is_inserted()) {
+                        // The mapped value is set to nullptr before allocating,
+                        // presumably so the entry never holds an indeterminate
+                        // pointer if the allocation below fails.
+                        emplace_result.set_mapped(nullptr);
+                        aggregate_data = _pool->add(new PartitionBlocks());
+                        emplace_result.set_mapped(aggregate_data);
+                        _value_places.push_back(aggregate_data);
+                        _num_partition++;
+                    } else {
+                        aggregate_data = emplace_result.get_mapped();
+                    }
+                    assert(aggregate_data != nullptr);
+                    aggregate_data->add_row_idx(row);
+                }
+                // Every known partition is visited, including those that received
+                // no row from this input block.
+                for (auto place : _value_places) {
+                    SCOPED_TIMER(_selector_block_timer);
+                    place->append_block_by_selector(input_block, 
child(0)->row_desc(),
+                                                    _has_global_limit, 
_partition_inner_limit,
+                                                    batch_size);
+                }
+            },
+            _partitioned_data->_partition_method_variant);
+}
+
+/// Consumes one block produced by the child.
+///
+/// A non-empty input block is handled in one of three ways:
+///  - no partition key: the whole block is moved into the single partition;
+///  - more than 512 partitions exist while fewer than 10000 input rows per
+///    partition have been seen: the block is stored untouched in
+///    _blocks_buffer and later handed out by pull() as is;
+///  - otherwise the rows are split by partition key through the hash table.
+///
+/// When eos is true the hash table and its arena are released, one
+/// PartitionSorter per partition is fed with that partition's blocks and
+/// prepared for reading, and _can_read is set.
+///
+/// Returns the first error reported by the split, the query-state checks or a
+/// sorter; Status::OK() otherwise.
+Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_block, bool eos) {
+    auto current_rows = input_block->rows();
+    if (current_rows > 0) {
+        child_input_rows = child_input_rows + current_rows;
+        if (UNLIKELY(_partition_exprs_num == 0)) {
+            //no partition key
+            _value_places[0]->append_whole_block(input_block, child(0)->row_desc());
+        } else {
+            //just simply use partition num to check
+            //TODO: here could set can read to true directly. need mutex
+            if (_num_partition > 512 && child_input_rows < 10000 * _num_partition) {
+                _blocks_buffer.push(std::move(*input_block));
+            } else {
+                RETURN_IF_ERROR(_split_block_by_partition(input_block, state->batch_size()));
+                RETURN_IF_CANCELLED(state);
+                RETURN_IF_ERROR(
+                        state->check_query_state("VPartitionSortNode, while split input block."));
+                input_block->clear_column_data();
+            }
+        }
+    }
+
+    if (eos) {
+        //seems could free for hashtable
+        _agg_arena_pool.reset(nullptr);
+        _partitioned_data.reset(nullptr);
+        SCOPED_TIMER(_partition_sort_timer);
+        for (size_t i = 0; i < _value_places.size(); ++i) {
+            // Only the first sorter gets the runtime profile at construction.
+            auto sorter = PartitionSorter::create_unique(
+                    _vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first,
+                    child(0)->row_desc(), state, i == 0 ? _runtime_profile.get() : nullptr,
+                    _has_global_limit, _partition_inner_limit, _top_n_algorithm,
+                    _previous_row.get());
+
+            // A partition may hold no block at all (no partition key and an
+            // empty input), so the last block is only inspected when it exists;
+            // calling back() on an empty vector is undefined behaviour.
+            DCHECK(_value_places[i]->blocks.empty() ||
+                   child(0)->row_desc().num_materialized_slots() ==
+                           _value_places[i]->blocks.back()->columns());
+            //get blocks from every partition, and sorter get those data.
+            for (const auto& block : _value_places[i]->blocks) {
+                RETURN_IF_ERROR(sorter->append_block(block.get()));
+            }
+            sorter->init_profile(_runtime_profile.get());
+            RETURN_IF_ERROR(sorter->prepare_for_read());
+            _partition_sorts.push_back(std::move(sorter));
+        }
+        if (state->enable_profile()) {
+            debug_profile();
+        }
+        COUNTER_SET(_hash_table_size_counter, int64_t(_num_partition));
+        _can_read = true;
+    }
+    return Status::OK();
+}
+
+/// Opens this node and its child, then drains the child completely, feeding
+/// every block it produces into sink(). The child is closed once it reported
+/// end of stream.
+Status VPartitionSortNode::open(RuntimeState* state) {
+    VLOG_CRITICAL << "VPartitionSortNode::open";
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    // Pick the Block flavoured overload of ExecNode::get_next and bind it to the child.
+    using BlockGetNext = Status (ExecNode::*)(RuntimeState*, vectorized::Block*, bool*);
+    auto fetch_from_child =
+            std::bind(static_cast<BlockGetNext>(&ExecNode::get_next), _children[0],
+                      std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
+
+    std::unique_ptr<Block> input_block = Block::create_unique();
+    bool eos = false;
+    while (!eos) {
+        RETURN_IF_ERROR(child(0)->get_next_after_projects(state, input_block.get(), &eos,
+                                                          fetch_from_child));
+        RETURN_IF_ERROR(sink(state, input_block.get(), eos));
+    }
+
+    child(0)->close(state);
+
+    return Status::OK();
+}
+
+/// Allocates the resources of the base node, opens the PARTITION BY and
+/// ORDER BY expressions, and finally checks that the query has not been
+/// cancelled and is still in a healthy state.
+Status VPartitionSortNode::alloc_resource(RuntimeState* state) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_ERROR(ExecNode::alloc_resource(state));
+    RETURN_IF_ERROR(VExpr::open(_partition_expr_ctxs, state));
+    RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(state->check_query_state("VPartitionSortNode, while 
open."));
+    return Status::OK();
+}
+
+/// Produces the next output block.
+///
+/// Sorted data is served first, partition by partition. Once every sorter is
+/// exhausted and the current call produced no rows, the blocks parked in
+/// _blocks_buffer are handed out one per call; *eos is set to true only when
+/// that buffer is empty as well.
+Status VPartitionSortNode::pull(doris::RuntimeState* state, vectorized::Block* output_block,
+                                bool* eos) {
+    RETURN_IF_CANCELLED(state);
+    output_block->clear_column_data();
+
+    bool sorter_finished = false;
+    RETURN_IF_ERROR(get_sorted_block(state, output_block, &sorter_finished));
+
+    // Still inside the sorted phase, or this call already produced rows.
+    if (_sort_idx < _partition_sorts.size() || output_block->rows() != 0) {
+        return Status::OK();
+    }
+
+    if (_blocks_buffer.empty()) {
+        *eos = true;
+    } else {
+        _blocks_buffer.front().swap(*output_block);
+        _blocks_buffer.pop();
+    }
+    return Status::OK();
+}
+
+/// Entry point of the pull-based execution path: validates the arguments and
+/// delegates to pull(). Returns an InternalError when any argument is null.
+Status VPartitionSortNode::get_next(RuntimeState* state, Block* output_block, bool* eos) {
+    const bool has_null_argument = state == nullptr || output_block == nullptr || eos == nullptr;
+    if (has_null_argument) {
+        return Status::InternalError("input is nullptr");
+    }
+
+    VLOG_CRITICAL << "VPartitionSortNode::get_next";
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    return pull(state, output_block, eos);
+}
+
+/// Fetches the next block from the sorter of the current partition.
+///
+/// Sorters are read one after another. When the current sorter reports end of
+/// stream through *current_eos, its output row count is recorded, the sorter is
+/// released and _sort_idx moves on to the next partition.
+///
+/// When no sorter is left the function returns immediately without touching
+/// output_block or *current_eos. The end-of-stream bookkeeping therefore never
+/// indexes _partition_sorts out of range, even if the caller passes in a
+/// *current_eos that is already true.
+Status VPartitionSortNode::get_sorted_block(RuntimeState* state, Block* output_block,
+                                            bool* current_eos) {
+    SCOPED_TIMER(_get_sorted_timer);
+    if (_sort_idx >= _partition_sorts.size()) {
+        // Every partition has been emitted already.
+        return Status::OK();
+    }
+
+    //sorter output data one by one
+    RETURN_IF_ERROR(_partition_sorts[_sort_idx]->get_next(state, output_block, current_eos));
+    if (*current_eos) {
+        //current sort have eos, so get next idx
+        _previous_row->reset();
+        auto rows = _partition_sorts[_sort_idx]->get_output_rows();
+        partition_profile_output_rows.push_back(rows);
+        _num_rows_returned += rows;
+        _partition_sorts[_sort_idx].reset(nullptr);
+        _sort_idx++;
+    }
+
+    return Status::OK();
+}
+
+/// Closes the node; a node that is already closed is left untouched.
+Status VPartitionSortNode::close(RuntimeState* state) {
+    VLOG_CRITICAL << "VPartitionSortNode::close";
+    if (!is_closed()) {
+        return ExecNode::close(state);
+    }
+    return Status::OK();
+}
+
+/// Closes the PARTITION BY and ORDER BY expressions, then lets the base node
+/// release its own resources.
+void VPartitionSortNode::release_resource(RuntimeState* state) {
+    VExpr::close(_partition_expr_ctxs, state);
+    _vsort_exec_exprs.close(state);
+    ExecNode::release_resource(state);
+}
+
+/// Chooses the hash-table flavour used to group rows by partition key.
+///
+///  - no partition expression: nothing to initialise;
+///  - exactly one expression: the flavour follows the primitive type of the
+///    expression result (fixed-width integer keys, string key, or the serialized
+///    fallback for every other type);
+///  - several expressions: a packed fixed-width key is requested when all key
+///    types have a maximum in-memory size and, together with the null map, fit
+///    into a UInt256; otherwise the serialized method is used.
+///
+/// NOTE(review): several Type values requested here (int8/16/64/128_key and the
+/// int*_keys variants) have no matching case in the
+/// PartitionedHashMapVariants::init shown in this file - confirm how they are
+/// meant to be handled.
+void VPartitionSortNode::_init_hash_method() {
+    if (_partition_exprs_num == 0) {
+        return;
+    } else if (_partition_exprs_num == 1) {
+        auto is_nullable = _partition_expr_ctxs[0]->root()->is_nullable();
+        switch (_partition_expr_ctxs[0]->root()->result_type()) {
+        case TYPE_TINYINT:
+        case TYPE_BOOLEAN:
+            
_partitioned_data->init(PartitionedHashMapVariants::Type::int8_key, 
is_nullable);
+            return;
+        case TYPE_SMALLINT:
+            
_partitioned_data->init(PartitionedHashMapVariants::Type::int16_key, 
is_nullable);
+            return;
+        case TYPE_INT:
+        case TYPE_FLOAT:
+        case TYPE_DATEV2:
+            
_partitioned_data->init(PartitionedHashMapVariants::Type::int32_key, 
is_nullable);
+            return;
+        case TYPE_BIGINT:
+        case TYPE_DOUBLE:
+        case TYPE_DATE:
+        case TYPE_DATETIME:
+        case TYPE_DATETIMEV2:
+            
_partitioned_data->init(PartitionedHashMapVariants::Type::int64_key, 
is_nullable);
+            return;
+        case TYPE_LARGEINT: {
+            
_partitioned_data->init(PartitionedHashMapVariants::Type::int128_key, 
is_nullable);
+            return;
+        }
+        case TYPE_DECIMALV2:
+        case TYPE_DECIMAL32:
+        case TYPE_DECIMAL64:
+        case TYPE_DECIMAL128I: {
+            // The key width of a decimal follows its concrete data type, looked
+            // up on the nested type when the expression is nullable.
+            DataTypePtr& type_ptr = 
_partition_expr_ctxs[0]->root()->data_type();
+            TypeIndex idx = is_nullable ? assert_cast<const 
DataTypeNullable&>(*type_ptr)
+                                                  .get_nested_type()
+                                                  ->get_type_id()
+                                        : type_ptr->get_type_id();
+            WhichDataType which(idx);
+            if (which.is_decimal32()) {
+                
_partitioned_data->init(PartitionedHashMapVariants::Type::int32_key, 
is_nullable);
+            } else if (which.is_decimal64()) {
+                
_partitioned_data->init(PartitionedHashMapVariants::Type::int64_key, 
is_nullable);
+            } else {
+                
_partitioned_data->init(PartitionedHashMapVariants::Type::int128_key, 
is_nullable);
+            }
+            return;
+        }
+        case TYPE_CHAR:
+        case TYPE_VARCHAR:
+        case TYPE_STRING: {
+            
_partitioned_data->init(PartitionedHashMapVariants::Type::string_key, 
is_nullable);
+            break;
+        }
+        default:
+            
_partitioned_data->init(PartitionedHashMapVariants::Type::serialized);
+        }
+    } else {
+        bool use_fixed_key = true;
+        bool has_null = false;
+        int key_byte_size = 0;
+
+        _partition_key_sz.resize(_partition_exprs_num);
+        for (int i = 0; i < _partition_exprs_num; ++i) {
+            const auto& data_type = 
_partition_expr_ctxs[i]->root()->data_type();
+
+            // A key type without a maximum value size rules out a packed key.
+            if (!data_type->have_maximum_size_of_value()) {
+                use_fixed_key = false;
+                break;
+            }
+
+            auto is_null = data_type->is_nullable();
+            has_null |= is_null;
+            // One byte is subtracted from the in-memory size of nullable types.
+            _partition_key_sz[i] =
+                    data_type->get_maximum_size_of_value_in_memory() - 
(is_null ? 1 : 0);
+            key_byte_size += _partition_key_sz[i];
+        }
+
+        // Keys plus null map must fit into the widest packed key (UInt256).
+        if (std::tuple_size<KeysNullMap<UInt256>>::value + key_byte_size > 
sizeof(UInt256)) {
+            use_fixed_key = false;
+        }
+
+        if (use_fixed_key) {
+            if (has_null) {
+                if (std::tuple_size<KeysNullMap<UInt64>>::value + 
key_byte_size <= sizeof(UInt64)) {
+                    
_partitioned_data->init(PartitionedHashMapVariants::Type::int64_keys, has_null);
+                } else if (std::tuple_size<KeysNullMap<UInt128>>::value + 
key_byte_size <=
+                           sizeof(UInt128)) {
+                    
_partitioned_data->init(PartitionedHashMapVariants::Type::int128_keys,
+                                            has_null);
+                } else {
+                    
_partitioned_data->init(PartitionedHashMapVariants::Type::int256_keys,
+                                            has_null);
+                }
+            } else {
+                if (key_byte_size <= sizeof(UInt64)) {
+                    
_partitioned_data->init(PartitionedHashMapVariants::Type::int64_keys, has_null);
+                } else if (key_byte_size <= sizeof(UInt128)) {
+                    
_partitioned_data->init(PartitionedHashMapVariants::Type::int128_keys,
+                                            has_null);
+                } else {
+                    
_partitioned_data->init(PartitionedHashMapVariants::Type::int256_keys,
+                                            has_null);
+                }
+            }
+        } else {
+            
_partitioned_data->init(PartitionedHashMapVariants::Type::serialized);
+        }
+    }
+}
+
+/// Publishes per-partition statistics as info strings of the runtime profile:
+/// rows read, blocks read and rows output, each rendered as "[a, b, ...]".
+///
+/// The block counts go into their own buffer (they used to be appended to the
+/// row buffer, leaving "PerPartitionBlocksRead" empty). The buffers are turned
+/// into std::string with fmt::to_string because fmt::memory_buffer::data() is
+/// not NUL-terminated.
+void VPartitionSortNode::debug_profile() {
+    fmt::memory_buffer partition_rows_read, partition_blocks_read;
+    fmt::format_to(partition_rows_read, "[");
+    fmt::format_to(partition_blocks_read, "[");
+    for (auto place : _value_places) {
+        fmt::format_to(partition_rows_read, "{}, ", place->get_total_rows());
+        fmt::format_to(partition_blocks_read, "{}, ", place->blocks.size());
+    }
+    fmt::format_to(partition_rows_read, "]");
+    fmt::format_to(partition_blocks_read, "]");
+
+    runtime_profile()->add_info_string("PerPartitionBlocksRead",
+                                       fmt::to_string(partition_blocks_read));
+    runtime_profile()->add_info_string("PerPartitionRowsRead",
+                                       fmt::to_string(partition_rows_read));
+
+    fmt::memory_buffer partition_output_rows;
+    fmt::format_to(partition_output_rows, "[");
+    for (auto row : partition_profile_output_rows) {
+        fmt::format_to(partition_output_rows, "{}, ", row);
+    }
+    fmt::format_to(partition_output_rows, "]");
+    runtime_profile()->add_info_string("PerPartitionOutputRows",
+                                       fmt::to_string(partition_output_rows));
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vpartition_sort_node.h 
b/be/src/vec/exec/vpartition_sort_node.h
new file mode 100644
index 0000000000..4143b19dc9
--- /dev/null
+++ b/be/src/vec/exec/vpartition_sort_node.h
@@ -0,0 +1,386 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <glog/logging.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "exec/exec_node.h"
+#include "vec/columns/column.h"
+#include "vec/common/columns_hashing.h"
+#include "vec/common/hash_table/hash.h"
+#include "vec/common/hash_table/ph_hash_map.h"
+#include "vec/common/hash_table/string_hash_map.h"
+#include "vec/common/sort/partition_sorter.h"
+#include "vec/common/sort/vsort_exec_exprs.h"
+#include "vec/core/block.h"
+
+namespace doris {
+namespace vectorized {
+static constexpr size_t INITIAL_BUFFERED_BLOCK_BYTES = 64 << 20;
+
+/// Rows collected for one partition, stored as a list of blocks.
+///
+/// Row indices of the current input block are first recorded in `selector`
+/// (add_row_idx) and then copied out in one go (append_block_by_selector).
+/// A new block is started once the row budget of the current one (`init_rows`)
+/// is used up or the block exceeds INITIAL_BUFFERED_BLOCK_BYTES.
+struct PartitionBlocks {
+public:
+    PartitionBlocks() = default;
+    ~PartitionBlocks() = default;
+
+    /// Remembers that row `row` of the current input block belongs here.
+    void add_row_idx(size_t row) { selector.push_back(row); }
+
+    /// Copies the rows recorded in `selector` from input_block to the last
+    /// block of this partition and clears the selector.
+    ///
+    /// Nothing is done when the selector is empty: the caller invokes this for
+    /// every partition on every input block, and without the guard a partition
+    /// that received no row could still get a new empty block and pay for
+    /// touching all of its columns.
+    ///
+    /// row_desc:   layout used to create a new empty block.
+    /// batch_size: row budget given to a newly started block.
+    /// is_limit / partition_inner_limit: accepted but not used here.
+    void append_block_by_selector(const vectorized::Block* input_block,
+                                  const RowDescriptor& row_desc, bool is_limit,
+                                  int64_t partition_inner_limit, int batch_size) {
+        if (selector.empty()) {
+            return;
+        }
+        if (blocks.empty() || reach_limit()) {
+            init_rows = batch_size;
+            blocks.push_back(Block::create_unique(VectorizedUtils::create_empty_block(row_desc)));
+        }
+        auto columns = input_block->get_columns();
+        auto mutable_columns = blocks.back()->mutate_columns();
+        DCHECK(columns.size() == mutable_columns.size());
+        for (size_t i = 0; i < mutable_columns.size(); ++i) {
+            columns[i]->append_data_by_selector(mutable_columns[i], selector);
+        }
+        // The budget may become negative; reach_limit() treats <= 0 as exhausted.
+        init_rows = init_rows - static_cast<int>(selector.size());
+        total_rows = total_rows + selector.size();
+        selector.clear();
+    }
+
+    /// Takes over the content of input_block as a new block of this partition;
+    /// input_block receives an empty block created from row_desc in exchange.
+    void append_whole_block(vectorized::Block* input_block, const RowDescriptor& row_desc) {
+        auto empty_block = Block::create_unique(VectorizedUtils::create_empty_block(row_desc));
+        empty_block->swap(*input_block);
+        blocks.emplace_back(std::move(empty_block));
+    }
+
+    /// True when the last block should not receive further rows. Must only be
+    /// called while `blocks` is non-empty.
+    bool reach_limit() {
+        return init_rows <= 0 || blocks.back()->bytes() > INITIAL_BUFFERED_BLOCK_BYTES;
+    }
+
+    /// Number of rows copied through append_block_by_selector so far.
+    size_t get_total_rows() const { return total_rows; }
+
+    IColumn::Selector selector;
+    std::vector<std::unique_ptr<Block>> blocks;
+    size_t total_rows = 0;
+    int init_rows = 4096;
+};
+
+using PartitionDataPtr = PartitionBlocks*;
+using PartitionDataWithStringKey = PHHashMap<StringRef, PartitionDataPtr, 
DefaultHash<StringRef>>;
+using PartitionDataWithShortStringKey = StringHashMap<PartitionDataPtr>;
+using PartitionDataWithUInt32Key = PHHashMap<UInt32, PartitionDataPtr, 
HashCRC32<UInt32>>;
+
+/// Hash method for partition keys that are serialized into one contiguous
+/// byte string per row.
+///
+/// The buffer bookkeeping members carry in-class initializers so that every
+/// constructor, including the converting one, leaves them in a defined state.
+/// The converting constructor used to leave them uninitialized, and
+/// serialize_keys() reads _serialized_key_buffer_size.
+template <typename TData>
+struct PartitionMethodSerialized {
+    using Data = TData;
+    using Key = typename Data::key_type;
+    using Mapped = typename Data::mapped_type;
+    using Iterator = typename Data::iterator;
+
+    Data data;
+    Iterator iterator;
+    bool inited = false;
+    std::vector<StringRef> keys;
+    size_t keys_memory_usage = 0;
+    PartitionMethodSerialized() = default;
+
+    using State = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped, true>;
+
+    template <typename Other>
+    explicit PartitionMethodSerialized(const Other& other) : data(other.data) {}
+
+    /// Serializes the key columns of the first num_rows rows into `keys`.
+    ///
+    /// When the worst-case size of the whole batch exceeds
+    /// SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES the rows are serialized one by one
+    /// into a fresh arena; otherwise a single buffer, grown on demand and
+    /// reused between calls, receives all rows at a fixed stride.
+    ///
+    /// Returns the maximum serialized size of one row.
+    size_t serialize_keys(const ColumnRawPtrs& key_columns, size_t num_rows) {
+        if (keys.size() < num_rows) {
+            keys.resize(num_rows);
+        }
+
+        size_t max_one_row_byte_size = 0;
+        for (const auto& column : key_columns) {
+            max_one_row_byte_size += column->get_max_row_byte_size();
+        }
+        size_t total_bytes = max_one_row_byte_size * num_rows;
+
+        if (total_bytes > SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES) {
+            // reach mem limit, don't serialize in batch
+            // for simplicity, we just create a new arena here.
+            _arena.reset(new Arena());
+            size_t keys_size = key_columns.size();
+            for (size_t i = 0; i < num_rows; ++i) {
+                keys[i] = serialize_keys_to_pool_contiguous(i, keys_size, key_columns, *_arena);
+            }
+            keys_memory_usage = _arena->size();
+        } else {
+            _arena.reset();
+            if (total_bytes > _serialized_key_buffer_size) {
+                _serialized_key_buffer_size = total_bytes;
+                _serialize_key_arena.reset(new Arena());
+                _serialized_key_buffer = reinterpret_cast<uint8_t*>(
+                        _serialize_key_arena->alloc(_serialized_key_buffer_size));
+            }
+
+            // Every row gets its own fixed-size slot inside the shared buffer.
+            for (size_t i = 0; i < num_rows; ++i) {
+                keys[i].data =
+                        reinterpret_cast<char*>(_serialized_key_buffer + i * max_one_row_byte_size);
+                keys[i].size = 0;
+            }
+
+            for (const auto& column : key_columns) {
+                column->serialize_vec(keys, num_rows, max_one_row_byte_size);
+            }
+            keys_memory_usage = _serialized_key_buffer_size;
+        }
+        return max_one_row_byte_size;
+    }
+
+private:
+    size_t _serialized_key_buffer_size = 0;
+    uint8_t* _serialized_key_buffer = nullptr;
+    std::unique_ptr<Arena> _serialize_key_arena;
+    std::unique_ptr<Arena> _arena;
+    static constexpr size_t SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES = 16 * 1024 * 1024; // 16M
+};
+
+/// Hash method for a single string partition key.
+/// Bundles the hash table (`data`) with the ColumnsHashing::HashMethodString
+/// state type that is used to look keys up in it.
+template <typename TData>
+struct PartitionMethodStringNoCache {
+    using Data = TData;
+    using Key = typename Data::key_type;
+    using Mapped = typename Data::mapped_type;
+    using Iterator = typename Data::iterator;
+
+    Data data;
+    Iterator iterator;
+    bool inited = false;
+
+    PartitionMethodStringNoCache() = default;
+
+    // Forwards size_hint to the constructor of the hash table.
+    explicit PartitionMethodStringNoCache(size_t size_hint) : data(size_hint) 
{}
+
+    // Copies the hash table of another method object.
+    template <typename Other>
+    explicit PartitionMethodStringNoCache(const Other& other) : 
data(other.data) {}
+
+    using State = ColumnsHashing::HashMethodString<typename Data::value_type, 
Mapped, true, false>;
+
+    static const bool low_cardinality_optimization = false;
+};
+
+/// Hash method for the case where there is exactly one numeric partition key.
+/// FieldType is UInt8/16/32/64 for any type with the corresponding bit width.
+/// consecutive_keys_optimization is passed through to
+/// ColumnsHashing::HashMethodOneNumber.
+template <typename FieldType, typename TData, bool 
consecutive_keys_optimization = false>
+struct PartitionMethodOneNumber {
+    using Data = TData;
+    using Key = typename Data::key_type;
+    using Mapped = typename Data::mapped_type;
+    using Iterator = typename Data::iterator;
+
+    Data data;
+    Iterator iterator;
+    bool inited = false;
+
+    PartitionMethodOneNumber() = default;
+
+    // Copies the hash table of another method object.
+    template <typename Other>
+    PartitionMethodOneNumber(const Other& other) : data(other.data) {}
+
+    /// To use one `Method` in different threads, use different `State`.
+    using State = ColumnsHashing::HashMethodOneNumber<typename 
Data::value_type, Mapped, FieldType,
+                                                      
consecutive_keys_optimization>;
+};
+
+/// Extends a hash table type with one extra slot for the NULL key, which is
+/// kept outside of the table itself. size() and empty() take that extra slot
+/// into account.
+template <typename Base>
+struct PartitionDataWithNullKey : public Base {
+    using Base::Base;
+
+    // Mutable and read-only access to the NULL-key flag and its partition data.
+    bool& has_null_key_data() { return has_null_key; }
+    PartitionDataPtr& get_null_key_data() { return null_key_data; }
+    bool has_null_key_data() const { return has_null_key; }
+    PartitionDataPtr get_null_key_data() const { return null_key_data; }
+    size_t size() const { return Base::size() + (has_null_key ? 1 : 0); }
+    bool empty() const { return Base::empty() && !has_null_key; }
+
+    // Clears the table and the NULL-key flag; null_key_data itself is not reset.
+    void clear() {
+        Base::clear();
+        has_null_key = false;
+    }
+
+    // Same as clear(), but based on Base::clear_and_shrink().
+    void clear_and_shrink() {
+        Base::clear_and_shrink();
+        has_null_key = false;
+    }
+
+private:
+    bool has_null_key = false;
+    PartitionDataPtr null_key_data = nullptr;
+};
+
+/// Adapts a single-column hash method to a nullable key column: everything is
+/// inherited from SingleColumnMethod, only the State type is replaced by
+/// ColumnsHashing::HashMethodSingleLowNullableColumn wrapping the base state.
+template <typename SingleColumnMethod>
+struct PartitionMethodSingleNullableColumn : public SingleColumnMethod {
+    using Base = SingleColumnMethod;
+    using BaseState = typename Base::State;
+
+    using Data = typename Base::Data;
+    using Key = typename Base::Key;
+    using Mapped = typename Base::Mapped;
+
+    using Base::data;
+
+    PartitionMethodSingleNullableColumn() = default;
+
+    // Forwards the other method object to the base class constructor.
+    template <typename Other>
+    explicit PartitionMethodSingleNullableColumn(const Other& other) : 
Base(other) {}
+
+    using State = ColumnsHashing::HashMethodSingleLowNullableColumn<BaseState, 
Mapped, true>;
+};
+
+/// All hash methods a partition sort node can use. The first alternative
+/// (the serialized method) is the one a default-constructed variant holds;
+/// the others cover a 32 bit numeric key and a string key, each in a plain
+/// and a nullable flavour.
+using PartitionedMethodVariants =
+        std::variant<PartitionMethodSerialized<PartitionDataWithStringKey>,
+                     PartitionMethodOneNumber<UInt32, 
PartitionDataWithUInt32Key>,
+                     
PartitionMethodSingleNullableColumn<PartitionMethodOneNumber<
+                             UInt32, 
PartitionDataWithNullKey<PartitionDataWithUInt32Key>>>,
+                     
PartitionMethodStringNoCache<PartitionDataWithShortStringKey>,
+                     
PartitionMethodSingleNullableColumn<PartitionMethodStringNoCache<
+                             
PartitionDataWithNullKey<PartitionDataWithShortStringKey>>>>;
+
+/// Owns the hash table used to group rows by partition key, together with a
+/// tag describing which kind of key it was set up for.
+///
+/// Only the serialized, 32 bit numeric and string key methods have dedicated
+/// hash tables. Every other requested key kind is served by the serialized
+/// method, which can hash any combination of key columns. This fallback is
+/// explicit: the previous default branch hit DCHECK(false) in debug builds and,
+/// in release builds, silently kept whatever alternative the variant held
+/// while _type claimed something else.
+struct PartitionedHashMapVariants {
+    PartitionedHashMapVariants() = default;
+    PartitionedHashMapVariants(const PartitionedHashMapVariants&) = delete;
+    PartitionedHashMapVariants& operator=(const PartitionedHashMapVariants&) = delete;
+    PartitionedMethodVariants _partition_method_variant;
+
+    enum class Type {
+        EMPTY = 0,
+        serialized,
+        int8_key,
+        int16_key,
+        int32_key,
+        int64_key,
+        int128_key,
+        int64_keys,
+        int128_keys,
+        int256_keys,
+        string_key,
+    };
+
+    Type _type = Type::EMPTY;
+
+    /// Selects and constructs the hash method for the requested key kind.
+    ///
+    /// type:        key kind requested by the caller.
+    /// is_nullable: whether the single key column is nullable; only consulted
+    ///              for int32_key and string_key.
+    ///
+    /// After the call _type names the method that is really in use, i.e. it is
+    /// Type::serialized whenever the fallback was taken.
+    void init(Type type, bool is_nullable = false) {
+        _type = type;
+        switch (_type) {
+        case Type::serialized:
+            _partition_method_variant
+                    .emplace<PartitionMethodSerialized<PartitionDataWithStringKey>>();
+            break;
+        case Type::int32_key:
+            if (is_nullable) {
+                _partition_method_variant
+                        .emplace<PartitionMethodSingleNullableColumn<PartitionMethodOneNumber<
+                                UInt32, PartitionDataWithNullKey<PartitionDataWithUInt32Key>>>>();
+            } else {
+                _partition_method_variant
+                        .emplace<PartitionMethodOneNumber<UInt32, PartitionDataWithUInt32Key>>();
+            }
+            break;
+        case Type::string_key:
+            if (is_nullable) {
+                _partition_method_variant
+                        .emplace<PartitionMethodSingleNullableColumn<PartitionMethodStringNoCache<
+                                PartitionDataWithNullKey<PartitionDataWithShortStringKey>>>>();
+            } else {
+                _partition_method_variant
+                        .emplace<PartitionMethodStringNoCache<PartitionDataWithShortStringKey>>();
+            }
+            break;
+        default:
+            // No dedicated hash table for this key kind: use the serialized
+            // method and keep the tag consistent with the variant content.
+            _type = Type::serialized;
+            _partition_method_variant
+                    .emplace<PartitionMethodSerialized<PartitionDataWithStringKey>>();
+            break;
+        }
+    }
+};
+
+class VExprContext;
+
+class VPartitionSortNode : public ExecNode { // ExecNode subclass; overrides get_next as well as the pull/sink pair
+public:
+    VPartitionSortNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
+    ~VPartitionSortNode() override = default;
+
+    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) 
override;
+    Status prepare(RuntimeState* state) override;
+    Status alloc_resource(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
+    void release_resource(RuntimeState* state) override;
+    Status get_next(RuntimeState* state, Block* block, bool* eos) override;
+    Status close(RuntimeState* state) override;
+
+    Status pull(RuntimeState* state, vectorized::Block* output_block, bool* 
eos) override;
+    Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) 
override;
+
+    void debug_profile(); // not an override; declared only, body is not visible in this header
+
+private:
+    template <typename AggState, typename AggMethod> // helper below is a no-op unless AggState is a pre-serialized-keys hash method
+    void _pre_serialize_key_if_need(AggState& state, AggMethod& agg_method,
+                                    const ColumnRawPtrs& key_columns, const 
size_t num_rows) {
+        if constexpr 
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<AggState>::value) {
+            (agg_method.serialize_keys(key_columns, num_rows)); // presumably fills agg_method.keys, which is read on the next line
+            state.set_serialized_keys(agg_method.keys.data()); // hand the key buffer to the hash state
+        }
+    }
+
+    void _init_hash_method();
+    Status _split_block_by_partition(vectorized::Block* input_block, int 
batch_size);
+    void _emplace_into_hash_table(const ColumnRawPtrs& key_columns,
+                                  const vectorized::Block* input_block, int 
batch_size);
+    Status get_sorted_block(RuntimeState* state, Block* output_block, bool* 
eos);
+
+    // hash table (PartitionedHashMapVariants) and an Arena; NOTE(review): the arena presumably backs hash-table allocations - confirm in the .cpp
+    std::unique_ptr<PartitionedHashMapVariants> _partitioned_data;
+    std::unique_ptr<Arena> _agg_arena_pool;
+    // partition by k1,k2: state for the partition-by expressions (contexts, columns, key sizes, hash values)
+    int _partition_exprs_num = 0;
+    std::vector<VExprContext*> _partition_expr_ctxs;
+    std::vector<const IColumn*> _partition_columns;
+    std::vector<size_t> _partition_key_sz;
+    std::vector<size_t> _hash_values;
+
+    std::vector<std::unique_ptr<PartitionSorter>> _partition_sorts; // owned PartitionSorter instances
+    std::vector<PartitionDataPtr> _value_places;
+    // Expressions and parameters used to build _sort_description
+    VSortExecExprs _vsort_exec_exprs;
+    std::vector<bool> _is_asc_order;
+    std::vector<bool> _nulls_first;
+    TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER;
+    bool _has_global_limit = false;
+    int _num_partition = 0;
+    int64_t _partition_inner_limit = 0;
+    int _sort_idx = 0;
+    std::unique_ptr<SortCursorCmp> _previous_row = nullptr;
+    std::queue<Block> _blocks_buffer;
+    int64_t child_input_rows = 0; // NOTE(review): lacks the leading underscore used by the other members
+
+    RuntimeProfile::Counter* _build_timer; // NOTE(review): these counter pointers have no initializer here - presumably assigned during init/prepare, confirm
+    RuntimeProfile::Counter* _emplace_key_timer;
+    RuntimeProfile::Counter* _partition_sort_timer;
+    RuntimeProfile::Counter* _get_sorted_timer;
+    RuntimeProfile::Counter* _selector_block_timer;
+
+    RuntimeProfile::Counter* _hash_table_size_counter;
+    // only used for profile recording
+    std::vector<int> partition_profile_output_rows; // NOTE(review): also lacks the leading underscore
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 8afc3fde4c..ed54a8c3d2 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -57,6 +57,7 @@ enum TPlanNodeType {
   FILE_SCAN_NODE,
   JDBC_SCAN_NODE,
   TEST_EXTERNAL_SCAN_NODE,
+  PARTITION_SORT_NODE,
 }
 
 // phases of an execution node
@@ -765,6 +766,19 @@ struct TSortNode {
   7: optional bool use_topn_opt
 }
 
+enum TopNAlgorithm { // value type of TPartitionSortNode.top_n_algorithm
+   RANK,
+   DENSE_RANK,
+   ROW_NUMBER // the C++ VPartitionSortNode initializes _top_n_algorithm to this value
+ }
+
+ struct TPartitionSortNode { // carried on TPlanNode as optional field 103 (partition_sort_node)
+   1: optional list<Exprs.TExpr> partition_exprs // partition-by expressions
+   2: optional TSortInfo sort_info
+   3: optional bool has_global_limit
+   4: optional TopNAlgorithm top_n_algorithm // one of RANK, DENSE_RANK, ROW_NUMBER
+   5: optional i64 partition_inner_limit // NOTE(review): presumably a per-partition row limit - confirm against the planner
+ }
 enum TAnalyticWindowType {
   // Specifies the window as a logical offset
   RANGE,
@@ -1072,6 +1086,7 @@ struct TPlanNode {
 
   101: optional list<Exprs.TExpr> projections
   102: optional Types.TTupleId output_tuple_id
+  103: optional TPartitionSortNode partition_sort_node
 }
 
 // A flattened representation of a tree of PlanNodes, obtained by depth-first


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to