This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d38693f39a5 [refactor](spill) Refactor logics of spilling (#37120)
d38693f39a5 is described below

commit d38693f39a5d13da63be3e883104a3e92d01bd6e
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Jul 4 16:52:54 2024 +0800

    [refactor](spill) Refactor logics of spilling (#37120)
    
    Refactor spilling sort operator. Remove redundant code.
---
 be/src/agent/workload_group_listener.cpp           | 12 +++---
 be/src/pipeline/exec/sort_sink_operator.cpp        | 34 ++++-----------
 be/src/pipeline/exec/sort_sink_operator.h          | 11 ++---
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  | 44 +++++--------------
 be/src/pipeline/exec/spill_sort_sink_operator.h    |  1 -
 .../pipeline/exec/spill_sort_source_operator.cpp   | 10 ++---
 be/src/pipeline/pipeline_fragment_context.cpp      |  8 +++-
 be/src/pipeline/pipeline_task.cpp                  |  6 +--
 be/src/pipeline/pipeline_task.h                    | 12 ++++--
 be/src/runtime/workload_group/workload_group.cpp   | 50 +++++++++++-----------
 be/src/runtime/workload_group/workload_group.h     | 34 ++++++++-------
 be/src/vec/common/sort/sorter.h                    |  2 +-
 .../java/org/apache/doris/planner/SortNode.java    | 19 ++++++++
 .../main/java/org/apache/doris/qe/Coordinator.java |  9 ++--
 gensrc/thrift/PlanNodes.thrift                     |  7 +++
 15 files changed, 124 insertions(+), 135 deletions(-)

diff --git a/be/src/agent/workload_group_listener.cpp 
b/be/src/agent/workload_group_listener.cpp
index 15c61be5156..61af4543196 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -35,16 +35,18 @@ void WorkloadGroupListener::handle_topic_info(const 
std::vector<TopicInfo>& topi
         is_set_workload_group_info = true;
 
         // 1 parse topic info to group info
-        WorkloadGroupInfo workload_group_info;
-        Status ret = 
WorkloadGroupInfo::parse_topic_info(topic_info.workload_group_info,
-                                                         &workload_group_info);
+        WorkloadGroupInfo workload_group_info =
+                
WorkloadGroupInfo::parse_topic_info(topic_info.workload_group_info);
         // it means FE has this wg, but may parse failed, so we should not delete it.
         if (workload_group_info.id != 0) {
             current_wg_ids.insert(workload_group_info.id);
         }
-        if (!ret.ok()) {
+        if (!workload_group_info.valid) {
             LOG(INFO) << "[topic_publish_wg]parse topic info failed, wg_id="
-                      << workload_group_info.id << ", reason:" << 
ret.to_string();
+                      << workload_group_info.id << ", reason: 
[tworkload_group_info.__isset.id: "
+                      << topic_info.workload_group_info.__isset.id
+                      << ", tworkload_group_info.__isset.version: "
+                      << topic_info.workload_group_info.__isset.version << "]";
             continue;
         }
 
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp 
b/be/src/pipeline/exec/sort_sink_operator.cpp
index f2224383f86..7230116a1a0 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -43,19 +43,19 @@ Status SortSinkLocalState::open(RuntimeState* state) {
 
     RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs));
     switch (p._algorithm) {
-    case SortAlgorithm::HEAP_SORT: {
+    case TSortAlgorithm::HEAP_SORT: {
         _shared_state->sorter = vectorized::HeapSorter::create_unique(
                 _vsort_exec_exprs, p._limit, p._offset, p._pool, 
p._is_asc_order, p._nulls_first,
                 p._child_x->row_desc());
         break;
     }
-    case SortAlgorithm::TOPN_SORT: {
+    case TSortAlgorithm::TOPN_SORT: {
         _shared_state->sorter = vectorized::TopNSorter::create_unique(
                 _vsort_exec_exprs, p._limit, p._offset, p._pool, 
p._is_asc_order, p._nulls_first,
                 p._child_x->row_desc(), state, _profile);
         break;
     }
-    case SortAlgorithm::FULL_SORT: {
+    case TSortAlgorithm::FULL_SORT: {
         _shared_state->sorter = vectorized::FullSorter::create_unique(
                 _vsort_exec_exprs, p._limit, p._offset, p._pool, 
p._is_asc_order, p._nulls_first,
                 p._child_x->row_desc(), state, _profile);
@@ -73,14 +73,13 @@ Status SortSinkLocalState::open(RuntimeState* state) {
 }
 
 SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const 
TPlanNode& tnode,
-                                     const DescriptorTbl& descs, bool 
require_bucket_distribution)
+                                     const DescriptorTbl& descs,
+                                     const bool require_bucket_distribution)
         : DataSinkOperatorX(operator_id, tnode.node_id),
           _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
           _pool(pool),
-          _reuse_mem(true),
           _limit(tnode.limit),
           _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
-          _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read),
           _merge_by_exchange(tnode.sort_node.merge_by_exchange),
           _is_colocate(tnode.sort_node.__isset.is_colocate && 
tnode.sort_node.is_colocate),
           _require_bucket_distribution(require_bucket_distribution),
@@ -88,7 +87,10 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int 
operator_id, const TP
                                     ? tnode.sort_node.is_analytic_sort
                                     : false),
           _partition_exprs(tnode.__isset.distribute_expr_lists ? 
tnode.distribute_expr_lists[0]
-                                                               : 
std::vector<TExpr> {}) {}
+                                                               : 
std::vector<TExpr> {}),
+          _algorithm(tnode.sort_node.__isset.algorithm ? 
tnode.sort_node.algorithm
+                                                       : 
TSortAlgorithm::FULL_SORT),
+          _reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) {}
 
 Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
@@ -105,24 +107,6 @@ Status SortSinkOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state) {
 }
 
 Status SortSinkOperatorX::prepare(RuntimeState* state) {
-    const auto& row_desc = _child_x->row_desc();
-
-    // If `limit` is smaller than HEAP_SORT_THRESHOLD, we consider using heap sort in priority.
-    // To do heap sorting, each income block will be filtered by heap-top row. There will be some
-    // `memcpy` operations. To ensure heap sort will not incur performance fallback, we should
-    // exclude cases which incoming blocks has string column which is sensitive to operations like
-    // `filter` and `memcpy`
-    if (_limit > 0 && _limit + _offset < 
vectorized::HeapSorter::HEAP_SORT_THRESHOLD &&
-        (_use_two_phase_read || 
state->get_query_ctx()->has_runtime_predicate(_node_id) ||
-         !row_desc.has_varlen_slots())) {
-        _algorithm = SortAlgorithm::HEAP_SORT;
-        _reuse_mem = false;
-    } else if (_limit > 0 && row_desc.has_varlen_slots() &&
-               _limit + _offset < vectorized::TopNSorter::TOPN_SORT_THRESHOLD) 
{
-        _algorithm = SortAlgorithm::TOPN_SORT;
-    } else {
-        _algorithm = SortAlgorithm::FULL_SORT;
-    }
     return _vsort_exec_exprs.prepare(state, _child_x->row_desc(), 
_row_descriptor);
 }
 
diff --git a/be/src/pipeline/exec/sort_sink_operator.h 
b/be/src/pipeline/exec/sort_sink_operator.h
index fa59b1715dc..b842a56f2ad 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -24,8 +24,6 @@
 
 namespace doris::pipeline {
 
-enum class SortAlgorithm { HEAP_SORT, TOPN_SORT, FULL_SORT };
-
 class SortSinkOperatorX;
 
 class SortSinkLocalState : public PipelineXSinkLocalState<SortSharedState> {
@@ -53,7 +51,7 @@ private:
 class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
 public:
     SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& 
tnode,
-                      const DescriptorTbl& descs, bool 
require_bucket_distribution);
+                      const DescriptorTbl& descs, const bool 
require_bucket_distribution);
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
                                      
DataSinkOperatorX<SortSinkLocalState>::_name);
@@ -77,8 +75,6 @@ public:
     }
     bool require_data_distribution() const override { return _is_colocate; }
 
-    bool is_full_sort() const { return _algorithm == SortAlgorithm::FULL_SORT; 
}
-
     size_t get_revocable_mem_size(RuntimeState* state) const;
 
     Status prepare_for_spill(RuntimeState* state);
@@ -99,17 +95,16 @@ private:
     std::vector<bool> _is_asc_order;
     std::vector<bool> _nulls_first;
 
-    bool _reuse_mem;
     const int64_t _limit;
-    SortAlgorithm _algorithm;
 
     const RowDescriptor _row_descriptor;
-    const bool _use_two_phase_read;
     const bool _merge_by_exchange;
     const bool _is_colocate = false;
     const bool _require_bucket_distribution = false;
     const bool _is_analytic_sort = false;
     const std::vector<TExpr> _partition_exprs;
+    const TSortAlgorithm::type _algorithm;
+    const bool _reuse_mem;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index b7fae82ca54..4c6eb290ef1 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -25,9 +25,8 @@
 namespace doris::pipeline {
 SpillSortSinkLocalState::SpillSortSinkLocalState(DataSinkOperatorXBase* 
parent, RuntimeState* state)
         : Base(parent, state) {
-    _finish_dependency =
-            std::make_shared<Dependency>(parent->operator_id(), 
parent->node_id(),
-                                         parent->get_name() + 
"_SPILL_DEPENDENCY", true);
+    _finish_dependency = std::make_shared<Dependency>(parent->operator_id(), 
parent->node_id(),
+                                                      parent->get_name() + 
"_SPILL_DEPENDENCY");
 }
 
 Status SpillSortSinkLocalState::init(doris::RuntimeState* state,
@@ -40,12 +39,8 @@ Status SpillSortSinkLocalState::init(doris::RuntimeState* 
state,
 
     RETURN_IF_ERROR(setup_in_memory_sort_op(state));
 
-    auto& parent = Base::_parent->template cast<Parent>();
-    Base::_shared_state->enable_spill = parent._enable_spill;
-    
Base::_shared_state->in_mem_shared_state->sorter->set_enable_spill(parent._enable_spill);
-    if (parent._enable_spill) {
-        _finish_dependency->block();
-    }
+    Base::_shared_state->in_mem_shared_state->sorter->set_enable_spill();
+    _finish_dependency->block();
     return Status::OK();
 }
 
@@ -78,10 +73,7 @@ void SpillSortSinkLocalState::update_profile(RuntimeProfile* 
child_profile) {
 }
 
 Status SpillSortSinkLocalState::close(RuntimeState* state, Status 
execsink_status) {
-    auto& parent = Base::_parent->template cast<Parent>();
-    if (parent._enable_spill) {
-        dec_running_big_mem_op_num(state);
-    }
+    dec_running_big_mem_op_num(state);
     return Status::OK();
 }
 Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) {
@@ -133,8 +125,6 @@ Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state)
 Status SpillSortSinkOperatorX::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(DataSinkOperatorX<LocalStateType>::prepare(state));
     RETURN_IF_ERROR(_sort_sink_operator->prepare(state));
-    _enable_spill = _sort_sink_operator->is_full_sort();
-    LOG(INFO) << "spill sort sink, enable spill: " << _enable_spill;
     return Status::OK();
 }
 Status SpillSortSinkOperatorX::open(RuntimeState* state) {
@@ -142,16 +132,10 @@ Status SpillSortSinkOperatorX::open(RuntimeState* state) {
     return _sort_sink_operator->open(state);
 }
 Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state) {
-    if (!_enable_spill) {
-        return Status::OK();
-    }
     auto& local_state = get_local_state(state);
     return local_state.revoke_memory(state);
 }
 size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
-    if (!_enable_spill) {
-        return 0;
-    }
     auto& local_state = get_local_state(state);
     if (!local_state.Base::_shared_state->sink_status.ok()) {
         return UINT64_MAX;
@@ -161,9 +145,7 @@ size_t 
SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
 Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, 
vectorized::Block* in_block,
                                     bool eos) {
     auto& local_state = get_local_state(state);
-    if (_enable_spill) {
-        local_state.inc_running_big_mem_op_num(state);
-    }
+    local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status);
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
@@ -177,17 +159,10 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Bloc
     local_state._mem_tracker->set_consumption(
             
local_state._shared_state->in_mem_shared_state->sorter->data_size());
     if (eos) {
-        if (_enable_spill) {
-            if (local_state._shared_state->is_spilled) {
-                if (revocable_mem_size(state) > 0) {
-                    RETURN_IF_ERROR(revoke_memory(state));
-                } else {
-                    local_state._dependency->set_ready_to_read();
-                    local_state._finish_dependency->set_ready();
-                }
+        if (local_state._shared_state->is_spilled) {
+            if (revocable_mem_size(state) > 0) {
+                RETURN_IF_ERROR(revoke_memory(state));
             } else {
-                RETURN_IF_ERROR(
-                        
local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read());
                 local_state._dependency->set_ready_to_read();
                 local_state._finish_dependency->set_ready();
             }
@@ -195,6 +170,7 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Bloc
             RETURN_IF_ERROR(
                     
local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read());
             local_state._dependency->set_ready_to_read();
+            local_state._finish_dependency->set_ready();
         }
     }
     return Status::OK();
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h 
b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 5347f22d11f..c5b70d6fcea 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -96,6 +96,5 @@ public:
 private:
     friend class SpillSortSinkLocalState;
     std::unique_ptr<SortSinkOperatorX> _sort_sink_operator;
-    bool _enable_spill = false;
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp 
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index b322f33caa2..72304291f6d 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -66,9 +66,7 @@ Status SpillSortLocalState::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
-    if (Base::_shared_state->enable_spill) {
-        dec_running_big_mem_op_num(state);
-    }
+    dec_running_big_mem_op_num(state);
     return Base::close(state);
 }
 int SpillSortLocalState::_calc_spill_blocks_to_merge() const {
@@ -274,13 +272,11 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* 
state, vectorized::Bloc
             local_state._current_merging_streams.clear();
         }
     }};
-    if (local_state.Base::_shared_state->enable_spill) {
-        local_state.inc_running_big_mem_op_num(state);
-    }
+    local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     RETURN_IF_ERROR(local_state._status);
 
-    if (local_state.Base::_shared_state->enable_spill && 
local_state._shared_state->is_spilled) {
+    if (local_state._shared_state->is_spilled) {
         if (!local_state._merger) {
             local_state._status = 
local_state.initiate_merge_sort_spill_streams(state);
             return local_state._status;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 0968de7951e..9ef551df6db 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -105,6 +105,8 @@
 #include "util/container_util.hpp"
 #include "util/debug_util.h"
 #include "util/uid_util.h"
+#include "vec/common/sort/heap_sorter.h"
+#include "vec/common/sort/topn_sorter.h"
 #include "vec/runtime/vdata_stream_mgr.h"
 
 namespace doris::pipeline {
@@ -1332,7 +1334,9 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         break;
     }
     case TPlanNodeType::SORT_NODE: {
-        if (_runtime_state->enable_sort_spill()) {
+        const auto should_spill = _runtime_state->enable_sort_spill() &&
+                                  tnode.sort_node.algorithm == 
TSortAlgorithm::FULL_SORT;
+        if (should_spill) {
             op.reset(new SpillSortSourceOperatorX(pool, tnode, 
next_operator_id(), descs));
         } else {
             op.reset(new SortSourceOperatorX(pool, tnode, next_operator_id(), 
descs));
@@ -1347,7 +1351,7 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
         DataSinkOperatorXPtr sink;
-        if (_runtime_state->enable_sort_spill()) {
+        if (should_spill) {
             sink.reset(new SpillSortSinkOperatorX(pool, 
next_sink_operator_id(), tnode, descs,
                                                   
_require_bucket_distribution));
         } else {
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 52951e1c9c0..20c225dcba6 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -59,8 +59,8 @@ PipelineTask::PipelineTask(
           _fragment_context(fragment_context),
           _parent_profile(parent_profile),
           _operators(pipeline->operator_xs()),
-          _source(_operators.front()),
-          _root(_operators.back()),
+          _source(_operators.front().get()),
+          _root(_operators.back().get()),
           _sink(pipeline->sink_shared_pointer()),
           _le_state_map(std::move(le_state_map)),
           _task_idx(task_idx),
@@ -414,7 +414,7 @@ bool PipelineTask::should_revoke_memory(RuntimeState* 
state, int64_t revocable_m
         DCHECK(big_memory_operator_num >= 0);
         int64_t mem_limit_of_op;
         if (0 == big_memory_operator_num) {
-            mem_limit_of_op = int64_t(query_weighted_limit * 0.8);
+            return false;
         } else {
             mem_limit_of_op = query_weighted_limit / big_memory_operator_num;
         }
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 9983b315e82..63f464c03ad 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -132,8 +132,6 @@ public:
 
     DataSinkOperatorXPtr sink() const { return _sink; }
 
-    OperatorXPtr source() const { return _source; }
-
     int task_id() const { return _index; };
     bool is_finalized() const { return _finalized; }
 
@@ -178,6 +176,12 @@ public:
     void set_core_id(int core_id) { this->_core_id = core_id; }
     int get_core_id() const { return this->_core_id; }
 
+    /**
+     * Return true if:
+     * 1. `enable_force_spill` is true which forces this task to spill data.
+     * 2. Or memory consumption reaches the high water mark of current workload group (80% of memory limitation by default) and revocable_mem_bytes is bigger than min_revocable_mem_bytes.
+     * 3. Or memory consumption is higher than the low water mark of current workload group (50% of memory limitation by default) and `query_weighted_consumption >= query_weighted_limit` and revocable memory is big enough.
+     */
     static bool should_revoke_memory(RuntimeState* state, int64_t 
revocable_mem_bytes);
 
     void put_in_runnable_queue() {
@@ -278,8 +282,8 @@ private:
     MonotonicStopWatch _pipeline_task_watcher;
 
     OperatorXs _operators; // left is _source, right is _root
-    OperatorXPtr _source;
-    OperatorXPtr _root;
+    OperatorXBase* _source;
+    OperatorXBase* _root;
     DataSinkOperatorXPtr _sink;
 
     // `_read_dependencies` is stored as same order as `_operators`
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index 1c496cde8d0..64a5c7aeffb 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -245,46 +245,41 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, 
RuntimeProfile* profile,
     return freed_mem;
 }
 
-Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& 
tworkload_group_info,
-                                           WorkloadGroupInfo* 
workload_group_info) {
+WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
+        const TWorkloadGroupInfo& tworkload_group_info) {
     // 1 id
-    int tg_id = 0;
+    uint64_t tg_id = 0;
     if (tworkload_group_info.__isset.id) {
         tg_id = tworkload_group_info.id;
     } else {
-        return Status::InternalError<false>("workload group id is required");
+        return {.valid = false};
     }
-    workload_group_info->id = tg_id;
 
     // 2 name
     std::string name = "INVALID_NAME";
     if (tworkload_group_info.__isset.name) {
         name = tworkload_group_info.name;
     }
-    workload_group_info->name = name;
 
     // 3 version
     int version = 0;
     if (tworkload_group_info.__isset.version) {
         version = tworkload_group_info.version;
     } else {
-        return Status::InternalError<false>("workload group version is 
required");
+        return {.valid = false};
     }
-    workload_group_info->version = version;
 
     // 4 cpu_share
     uint64_t cpu_share = CPU_SHARE_DEFAULT_VALUE;
     if (tworkload_group_info.__isset.cpu_share) {
         cpu_share = tworkload_group_info.cpu_share;
     }
-    workload_group_info->cpu_share = cpu_share;
 
     // 5 cpu hard limit
     int cpu_hard_limit = CPU_HARD_LIMIT_DEFAULT_VALUE;
     if (tworkload_group_info.__isset.cpu_hard_limit) {
         cpu_hard_limit = tworkload_group_info.cpu_hard_limit;
     }
-    workload_group_info->cpu_hard_limit = cpu_hard_limit;
 
     // 6 mem_limit
     std::string mem_limit_str = MEMORY_LIMIT_DEFAULT_VALUE;
@@ -294,44 +289,37 @@ Status WorkloadGroupInfo::parse_topic_info(const 
TWorkloadGroupInfo& tworkload_g
     bool is_percent = true;
     int64_t mem_limit =
             ParseUtil::parse_mem_spec(mem_limit_str, -1, MemInfo::mem_limit(), 
&is_percent);
-    workload_group_info->memory_limit = mem_limit;
 
     // 7 mem overcommit
     bool enable_memory_overcommit = ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE;
     if (tworkload_group_info.__isset.enable_memory_overcommit) {
         enable_memory_overcommit = 
tworkload_group_info.enable_memory_overcommit;
     }
-    workload_group_info->enable_memory_overcommit = enable_memory_overcommit;
 
     // 8 cpu soft limit or hard limit
     bool enable_cpu_hard_limit = false;
     if (tworkload_group_info.__isset.enable_cpu_hard_limit) {
         enable_cpu_hard_limit = tworkload_group_info.enable_cpu_hard_limit;
     }
-    workload_group_info->enable_cpu_hard_limit = enable_cpu_hard_limit;
 
     // 9 scan thread num
-    workload_group_info->scan_thread_num = 
config::doris_scanner_thread_pool_thread_num;
+    int scan_thread_num = config::doris_scanner_thread_pool_thread_num;
     if (tworkload_group_info.__isset.scan_thread_num && 
tworkload_group_info.scan_thread_num > 0) {
-        workload_group_info->scan_thread_num = 
tworkload_group_info.scan_thread_num;
+        scan_thread_num = tworkload_group_info.scan_thread_num;
     }
 
     // 10 max remote scan thread num
-    workload_group_info->max_remote_scan_thread_num =
-            vectorized::ScannerScheduler::get_remote_scan_thread_num();
+    int max_remote_scan_thread_num = 
vectorized::ScannerScheduler::get_remote_scan_thread_num();
     if (tworkload_group_info.__isset.max_remote_scan_thread_num &&
         tworkload_group_info.max_remote_scan_thread_num > 0) {
-        workload_group_info->max_remote_scan_thread_num =
-                tworkload_group_info.max_remote_scan_thread_num;
+        max_remote_scan_thread_num = 
tworkload_group_info.max_remote_scan_thread_num;
     }
 
     // 11 min remote scan thread num
-    workload_group_info->min_remote_scan_thread_num =
-            vectorized::ScannerScheduler::get_remote_scan_thread_num();
+    int min_remote_scan_thread_num = 
vectorized::ScannerScheduler::get_remote_scan_thread_num();
     if (tworkload_group_info.__isset.min_remote_scan_thread_num &&
         tworkload_group_info.min_remote_scan_thread_num > 0) {
-        workload_group_info->min_remote_scan_thread_num =
-                tworkload_group_info.min_remote_scan_thread_num;
+        min_remote_scan_thread_num = 
tworkload_group_info.min_remote_scan_thread_num;
     }
 
     // 12 spill low watermark
@@ -339,16 +327,26 @@ Status WorkloadGroupInfo::parse_topic_info(const 
TWorkloadGroupInfo& tworkload_g
     if (tworkload_group_info.__isset.spill_threshold_low_watermark) {
         spill_low_watermark = 
tworkload_group_info.spill_threshold_low_watermark;
     }
-    workload_group_info->spill_low_watermark = spill_low_watermark;
 
     // 13 spil high watermark
     int spill_high_watermark = SPILL_HIGH_WATERMARK_DEFAULT_VALUE;
     if (tworkload_group_info.__isset.spill_threshold_high_watermark) {
         spill_high_watermark = 
tworkload_group_info.spill_threshold_high_watermark;
     }
-    workload_group_info->spill_high_watermark = spill_high_watermark;
 
-    return Status::OK();
+    return {tg_id,
+            name,
+            cpu_share,
+            mem_limit,
+            enable_memory_overcommit,
+            version,
+            cpu_hard_limit,
+            enable_cpu_hard_limit,
+            scan_thread_num,
+            max_remote_scan_thread_num,
+            min_remote_scan_thread_num,
+            spill_low_watermark,
+            spill_high_watermark};
 }
 
 void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* 
exec_env) {
diff --git a/be/src/runtime/workload_group/workload_group.h 
b/be/src/runtime/workload_group/workload_group.h
index 971cc1cb023..a82efab0904 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -168,7 +168,9 @@ private:
     const uint64_t _id;
     std::string _name;
     int64_t _version;
-    int64_t _memory_limit;                      // bytes
+    int64_t _memory_limit; // bytes
+    // `_weighted_mem_used` is a rough memory usage in this group,
+    // because we can only get a precise memory usage by MemTracker which is not include page cache.
     std::atomic_int64_t _weighted_mem_used = 0; // bytes
     bool _enable_memory_overcommit;
     std::atomic<uint64_t> _cpu_share;
@@ -197,25 +199,25 @@ private:
 using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
 
 struct WorkloadGroupInfo {
-    uint64_t id;
-    std::string name;
-    uint64_t cpu_share;
-    int64_t memory_limit;
-    bool enable_memory_overcommit;
-    int64_t version;
-    int cpu_hard_limit;
-    bool enable_cpu_hard_limit;
-    int scan_thread_num;
-    int max_remote_scan_thread_num;
-    int min_remote_scan_thread_num;
-    int spill_low_watermark;
-    int spill_high_watermark;
+    const uint64_t id = 0;
+    const std::string name;
+    const uint64_t cpu_share = 0;
+    const int64_t memory_limit = 0;
+    const bool enable_memory_overcommit = false;
+    const int64_t version = 0;
+    const int cpu_hard_limit = 0;
+    const bool enable_cpu_hard_limit = false;
+    const int scan_thread_num = 0;
+    const int max_remote_scan_thread_num = 0;
+    const int min_remote_scan_thread_num = 0;
+    const int spill_low_watermark = 0;
+    const int spill_high_watermark = 0;
     // log cgroup cpu info
     uint64_t cgroup_cpu_shares = 0;
     int cgroup_cpu_hard_limit = 0;
+    const bool valid = true;
 
-    static Status parse_topic_info(const TWorkloadGroupInfo& 
tworkload_group_info,
-                                   WorkloadGroupInfo* workload_group_info);
+    static WorkloadGroupInfo parse_topic_info(const TWorkloadGroupInfo& 
tworkload_group_info);
 };
 
 } // namespace doris
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index 2525ca8c0c1..478e91c0783 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -136,7 +136,7 @@ public:
     int64_t limit() const { return _limit; }
     int64_t offset() const { return _offset; }
 
-    void set_enable_spill(bool b) { _enable_spill = b; }
+    void set_enable_spill() { _enable_spill = true; }
 
 protected:
     Status partial_sort(Block& src_block, Block& dest_block);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
index 24b384d4453..4cdc04d1f1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
@@ -34,6 +34,7 @@ import org.apache.doris.statistics.StatsRecursiveDerive;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
+import org.apache.doris.thrift.TSortAlgorithm;
 import org.apache.doris.thrift.TSortInfo;
 import org.apache.doris.thrift.TSortNode;
 
@@ -63,6 +64,7 @@ public class SortNode extends PlanNode {
     private final boolean  useTopN;
     private boolean useTopnOpt = false;
     private boolean useTwoPhaseReadOpt;
+    private boolean hasRuntimePredicate = false;
 
     // If mergeByexchange is set to true, the sort information is pushed to the
     // exchange node, and the sort node is used for the ORDER BY .
@@ -323,6 +325,19 @@ public class SortNode extends PlanNode {
         msg.sort_node.setMergeByExchange(this.mergeByexchange);
         msg.sort_node.setIsAnalyticSort(isAnalyticSort);
         msg.sort_node.setIsColocate(isColocate);
+
+        boolean isFixedLength = info.getOrderingExprs().stream().allMatch(e -> 
!e.getType().isStringType()
+                && !e.getType().isCollectionType());
+        TSortAlgorithm algorithm;
+        if (limit > 0 && limit + offset < 1024 && (useTwoPhaseReadOpt || 
hasRuntimePredicate
+                || isFixedLength)) {
+            algorithm = TSortAlgorithm.HEAP_SORT;
+        } else if (limit > 0 && !isFixedLength && limit + offset < 256) {
+            algorithm = TSortAlgorithm.TOPN_SORT;
+        } else {
+            algorithm = TSortAlgorithm.FULL_SORT;
+        }
+        msg.sort_node.setAlgorithm(algorithm);
     }
 
     @Override
@@ -348,4 +363,8 @@ public class SortNode extends PlanNode {
     public void setColocate(boolean colocate) {
         isColocate = colocate;
     }
+
+    public void setHasRuntimePredicate() {
+        this.hasRuntimePredicate = true;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 2c2d4437441..9e7431e07c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -65,6 +65,7 @@ import org.apache.doris.planner.RuntimeFilter;
 import org.apache.doris.planner.RuntimeFilterId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.SetOperationNode;
+import org.apache.doris.planner.SortNode;
 import org.apache.doris.planner.UnionNode;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
@@ -3012,10 +3013,12 @@ public class Coordinator implements CoordInterface {
 
         List<TExecPlanFragmentParams> toThrift(int backendNum) {
             List<TExecPlanFragmentParams> paramsList = Lists.newArrayList();
-            Set<Integer> topnFilterSources = scanNodes.stream()
+            Set<SortNode> topnSortNodes = scanNodes.stream()
                     .filter(scanNode -> scanNode instanceof OlapScanNode)
-                    .flatMap(scanNode -> ((OlapScanNode) 
scanNode).getTopnFilterSortNodes().stream())
-                    .map(sort -> 
sort.getId().asInt()).collect(Collectors.toSet());
+                    .flatMap(scanNode -> 
scanNode.getTopnFilterSortNodes().stream()).collect(Collectors.toSet());
+            topnSortNodes.forEach(SortNode::setHasRuntimePredicate);
+            Set<Integer> topnFilterSources = topnSortNodes.stream().map(
+                    sort -> sort.getId().asInt()).collect(Collectors.toSet());
             for (int i = 0; i < instanceExecParams.size(); ++i) {
                 final FInstanceExecParam instanceExecParam = 
instanceExecParams.get(i);
                 TExecPlanFragmentParams params = new TExecPlanFragmentParams();
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 1281a7fba49..cdc5e49decc 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -944,6 +944,12 @@ struct TPreAggregationNode {
   2: required list<Exprs.TExpr> aggregate_exprs
 }
 
+enum TSortAlgorithm {
+   HEAP_SORT,
+   TOPN_SORT,
+   FULL_SORT
+ }
+
 struct TSortNode {
   1: required TSortInfo sort_info
   // Indicates whether the backend service should use topn vs. sorting
@@ -957,6 +963,7 @@ struct TSortNode {
   8: optional bool merge_by_exchange
   9: optional bool is_analytic_sort
   10: optional bool is_colocate
+  11: optional TSortAlgorithm algorithm
 }
 
 enum TopNAlgorithm {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to