This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d38693f39a5 [refactor](spill) Refactor logics of spilling (#37120) d38693f39a5 is described below commit d38693f39a5d13da63be3e883104a3e92d01bd6e Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Thu Jul 4 16:52:54 2024 +0800 [refactor](spill) Refactor logics of spilling (#37120) Refactor spilling sort operator. Remove redundant code. --- be/src/agent/workload_group_listener.cpp | 12 +++--- be/src/pipeline/exec/sort_sink_operator.cpp | 34 ++++----------- be/src/pipeline/exec/sort_sink_operator.h | 11 ++--- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 44 +++++-------------- be/src/pipeline/exec/spill_sort_sink_operator.h | 1 - .../pipeline/exec/spill_sort_source_operator.cpp | 10 ++--- be/src/pipeline/pipeline_fragment_context.cpp | 8 +++- be/src/pipeline/pipeline_task.cpp | 6 +-- be/src/pipeline/pipeline_task.h | 12 ++++-- be/src/runtime/workload_group/workload_group.cpp | 50 +++++++++++----------- be/src/runtime/workload_group/workload_group.h | 34 ++++++++------- be/src/vec/common/sort/sorter.h | 2 +- .../java/org/apache/doris/planner/SortNode.java | 19 ++++++++ .../main/java/org/apache/doris/qe/Coordinator.java | 9 ++-- gensrc/thrift/PlanNodes.thrift | 7 +++ 15 files changed, 124 insertions(+), 135 deletions(-) diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp index 15c61be5156..61af4543196 100644 --- a/be/src/agent/workload_group_listener.cpp +++ b/be/src/agent/workload_group_listener.cpp @@ -35,16 +35,18 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi is_set_workload_group_info = true; // 1 parse topic info to group info - WorkloadGroupInfo workload_group_info; - Status ret = WorkloadGroupInfo::parse_topic_info(topic_info.workload_group_info, - &workload_group_info); + WorkloadGroupInfo workload_group_info = + WorkloadGroupInfo::parse_topic_info(topic_info.workload_group_info); // it means FE has this wg, but may parse failed, so we should not delete it. if (workload_group_info.id != 0) { current_wg_ids.insert(workload_group_info.id); } - if (!ret.ok()) { + if (!workload_group_info.valid) { LOG(INFO) << "[topic_publish_wg]parse topic info failed, wg_id=" - << workload_group_info.id << ", reason:" << ret.to_string(); + << workload_group_info.id << ", reason: [tworkload_group_info.__isset.id: " + << topic_info.workload_group_info.__isset.id + << ", tworkload_group_info.__isset.version: " + << topic_info.workload_group_info.__isset.version << "]"; continue; } diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index f2224383f86..7230116a1a0 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -43,19 +43,19 @@ Status SortSinkLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs)); switch (p._algorithm) { - case SortAlgorithm::HEAP_SORT: { + case TSortAlgorithm::HEAP_SORT: { _shared_state->sorter = vectorized::HeapSorter::create_unique( _vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first, p._child_x->row_desc()); break; } - case SortAlgorithm::TOPN_SORT: { + case TSortAlgorithm::TOPN_SORT: { _shared_state->sorter = vectorized::TopNSorter::create_unique( _vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first, p._child_x->row_desc(), state, _profile); break; } - case SortAlgorithm::FULL_SORT: { + case TSortAlgorithm::FULL_SORT: { _shared_state->sorter = vectorized::FullSorter::create_unique( _vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first, p._child_x->row_desc(), state, _profile); @@ -73,14 +73,13 @@ Status SortSinkLocalState::open(RuntimeState* state) { } SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs, bool require_bucket_distribution) + const DescriptorTbl& descs, + const bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id), _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0), _pool(pool), - _reuse_mem(true), _limit(tnode.limit), _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), - _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read), _merge_by_exchange(tnode.sort_node.merge_by_exchange), _is_colocate(tnode.sort_node.__isset.is_colocate && tnode.sort_node.is_colocate), _require_bucket_distribution(require_bucket_distribution), @@ -88,7 +87,10 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP ? tnode.sort_node.is_analytic_sort : false), _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] - : std::vector<TExpr> {}) {} + : std::vector<TExpr> {}), + _algorithm(tnode.sort_node.__isset.algorithm ? tnode.sort_node.algorithm + : TSortAlgorithm::FULL_SORT), + _reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) {} Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); @@ -105,24 +107,6 @@ Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { } Status SortSinkOperatorX::prepare(RuntimeState* state) { - const auto& row_desc = _child_x->row_desc(); - - // If `limit` is smaller than HEAP_SORT_THRESHOLD, we consider using heap sort in priority. - // To do heap sorting, each income block will be filtered by heap-top row. There will be some - // `memcpy` operations. To ensure heap sort will not incur performance fallback, we should - // exclude cases which incoming blocks has string column which is sensitive to operations like - // `filter` and `memcpy` - if (_limit > 0 && _limit + _offset < vectorized::HeapSorter::HEAP_SORT_THRESHOLD && - (_use_two_phase_read || state->get_query_ctx()->has_runtime_predicate(_node_id) || - !row_desc.has_varlen_slots())) { - _algorithm = SortAlgorithm::HEAP_SORT; - _reuse_mem = false; - } else if (_limit > 0 && row_desc.has_varlen_slots() && - _limit + _offset < vectorized::TopNSorter::TOPN_SORT_THRESHOLD) { - _algorithm = SortAlgorithm::TOPN_SORT; - } else { - _algorithm = SortAlgorithm::FULL_SORT; - } return _vsort_exec_exprs.prepare(state, _child_x->row_desc(), _row_descriptor); } diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index fa59b1715dc..b842a56f2ad 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -24,8 +24,6 @@ namespace doris::pipeline { -enum class SortAlgorithm { HEAP_SORT, TOPN_SORT, FULL_SORT }; - class SortSinkOperatorX; class SortSinkLocalState : public PipelineXSinkLocalState<SortSharedState> { @@ -53,7 +51,7 @@ private: class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> { public: SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs, bool require_bucket_distribution); + const DescriptorTbl& descs, const bool require_bucket_distribution); Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", DataSinkOperatorX<SortSinkLocalState>::_name); @@ -77,8 +75,6 @@ public: } bool require_data_distribution() const override { return _is_colocate; } - bool is_full_sort() const { return _algorithm == SortAlgorithm::FULL_SORT; } - size_t get_revocable_mem_size(RuntimeState* state) const; Status prepare_for_spill(RuntimeState* state); @@ -99,17 +95,16 @@ private: std::vector<bool> _is_asc_order; std::vector<bool> _nulls_first; - bool _reuse_mem; const int64_t _limit; - SortAlgorithm _algorithm; const RowDescriptor _row_descriptor; - const bool _use_two_phase_read; const bool _merge_by_exchange; const bool _is_colocate = false; const bool _require_bucket_distribution = false; const bool _is_analytic_sort = false; const std::vector<TExpr> _partition_exprs; + const TSortAlgorithm::type _algorithm; + const bool _reuse_mem; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index b7fae82ca54..4c6eb290ef1 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -25,9 +25,8 @@ namespace doris::pipeline { SpillSortSinkLocalState::SpillSortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) { - _finish_dependency = - std::make_shared<Dependency>(parent->operator_id(), parent->node_id(), - parent->get_name() + "_SPILL_DEPENDENCY", true); + _finish_dependency = std::make_shared<Dependency>(parent->operator_id(), parent->node_id(), + parent->get_name() + "_SPILL_DEPENDENCY"); } Status SpillSortSinkLocalState::init(doris::RuntimeState* state, @@ -40,12 +39,8 @@ Status SpillSortSinkLocalState::init(doris::RuntimeState* state, RETURN_IF_ERROR(setup_in_memory_sort_op(state)); - auto& parent = Base::_parent->template cast<Parent>(); - Base::_shared_state->enable_spill = parent._enable_spill; - Base::_shared_state->in_mem_shared_state->sorter->set_enable_spill(parent._enable_spill); - if (parent._enable_spill) { - _finish_dependency->block(); - } + Base::_shared_state->in_mem_shared_state->sorter->set_enable_spill(); + _finish_dependency->block(); return Status::OK(); } @@ -78,10 +73,7 @@ void SpillSortSinkLocalState::update_profile(RuntimeProfile* child_profile) { } Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_status) { - auto& parent = Base::_parent->template cast<Parent>(); - if (parent._enable_spill) { - dec_running_big_mem_op_num(state); - } + dec_running_big_mem_op_num(state); return Status::OK(); } Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) { @@ -133,8 +125,6 @@ Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) Status SpillSortSinkOperatorX::prepare(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX<LocalStateType>::prepare(state)); RETURN_IF_ERROR(_sort_sink_operator->prepare(state)); - _enable_spill = _sort_sink_operator->is_full_sort(); - LOG(INFO) << "spill sort sink, enable spill: " << _enable_spill; return Status::OK(); } Status SpillSortSinkOperatorX::open(RuntimeState* state) { @@ -142,16 +132,10 @@ Status SpillSortSinkOperatorX::open(RuntimeState* state) { return _sort_sink_operator->open(state); } Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state) { - if (!_enable_spill) { - return Status::OK(); - } auto& local_state = get_local_state(state); return local_state.revoke_memory(state); } size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const { - if (!_enable_spill) { - return 0; - } auto& local_state = get_local_state(state); if (!local_state.Base::_shared_state->sink_status.ok()) { return UINT64_MAX; @@ -161,9 +145,7 @@ size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const { Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) { auto& local_state = get_local_state(state); - if (_enable_spill) { - local_state.inc_running_big_mem_op_num(state); - } + local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); @@ -177,17 +159,10 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc local_state._mem_tracker->set_consumption( local_state._shared_state->in_mem_shared_state->sorter->data_size()); if (eos) { - if (_enable_spill) { - if (local_state._shared_state->is_spilled) { - if (revocable_mem_size(state) > 0) { - RETURN_IF_ERROR(revoke_memory(state)); - } else { - local_state._dependency->set_ready_to_read(); - local_state._finish_dependency->set_ready(); - } + if (local_state._shared_state->is_spilled) { + if (revocable_mem_size(state) > 0) { + RETURN_IF_ERROR(revoke_memory(state)); } else { - RETURN_IF_ERROR( - local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read()); local_state._dependency->set_ready_to_read(); local_state._finish_dependency->set_ready(); } @@ -195,6 +170,7 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc RETURN_IF_ERROR( local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read()); local_state._dependency->set_ready_to_read(); + local_state._finish_dependency->set_ready(); } } return Status::OK(); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index 5347f22d11f..c5b70d6fcea 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -96,6 +96,5 @@ public: private: friend class SpillSortSinkLocalState; std::unique_ptr<SortSinkOperatorX> _sort_sink_operator; - bool _enable_spill = false; }; } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index b322f33caa2..72304291f6d 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -66,9 +66,7 @@ Status SpillSortLocalState::close(RuntimeState* state) { if (_closed) { return Status::OK(); } - if (Base::_shared_state->enable_spill) { - dec_running_big_mem_op_num(state); - } + dec_running_big_mem_op_num(state); return Base::close(state); } int SpillSortLocalState::_calc_spill_blocks_to_merge() const { @@ -274,13 +272,11 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Bloc local_state._current_merging_streams.clear(); } }}; - if (local_state.Base::_shared_state->enable_spill) { - local_state.inc_running_big_mem_op_num(state); - } + local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_ERROR(local_state._status); - if (local_state.Base::_shared_state->enable_spill && local_state._shared_state->is_spilled) { + if (local_state._shared_state->is_spilled) { if (!local_state._merger) { local_state._status = local_state.initiate_merge_sort_spill_streams(state); return local_state._status; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 0968de7951e..9ef551df6db 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -105,6 +105,8 @@ #include "util/container_util.hpp" #include "util/debug_util.h" #include "util/uid_util.h" +#include "vec/common/sort/heap_sorter.h" +#include "vec/common/sort/topn_sorter.h" #include "vec/runtime/vdata_stream_mgr.h" namespace doris::pipeline { @@ -1332,7 +1334,9 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo break; } case TPlanNodeType::SORT_NODE: { - if (_runtime_state->enable_sort_spill()) { + const auto should_spill = _runtime_state->enable_sort_spill() && + tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT; + if (should_spill) { op.reset(new SpillSortSourceOperatorX(pool, tnode, next_operator_id(), descs)); } else { op.reset(new SortSourceOperatorX(pool, tnode, next_operator_id(), descs)); @@ -1347,7 +1351,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - if (_runtime_state->enable_sort_spill()) { + if (should_spill) { sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } else { diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 52951e1c9c0..20c225dcba6 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -59,8 +59,8 @@ PipelineTask::PipelineTask( _fragment_context(fragment_context), _parent_profile(parent_profile), _operators(pipeline->operator_xs()), - _source(_operators.front()), - _root(_operators.back()), + _source(_operators.front().get()), + _root(_operators.back().get()), _sink(pipeline->sink_shared_pointer()), _le_state_map(std::move(le_state_map)), _task_idx(task_idx), @@ -414,7 +414,7 @@ bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_m DCHECK(big_memory_operator_num >= 0); int64_t mem_limit_of_op; if (0 == big_memory_operator_num) { - mem_limit_of_op = int64_t(query_weighted_limit * 0.8); + return false; } else { mem_limit_of_op = query_weighted_limit / big_memory_operator_num; } diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 9983b315e82..63f464c03ad 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -132,8 +132,6 @@ public: DataSinkOperatorXPtr sink() const { return _sink; } - OperatorXPtr source() const { return _source; } - int task_id() const { return _index; }; bool is_finalized() const { return _finalized; } @@ -178,6 +176,12 @@ public: void set_core_id(int core_id) { this->_core_id = core_id; } int get_core_id() const { return this->_core_id; } + /** + * Return true if: + * 1. `enable_force_spill` is true which forces this task to spill data. + * 2. Or memory consumption reaches the high water mark of current workload group (80% of memory limitation by default) and revocable_mem_bytes is bigger than min_revocable_mem_bytes. + * 3. Or memory consumption is higher than the low water mark of current workload group (50% of memory limitation by default) and `query_weighted_consumption >= query_weighted_limit` and revocable memory is big enough. + */ static bool should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes); void put_in_runnable_queue() { @@ -278,8 +282,8 @@ private: MonotonicStopWatch _pipeline_task_watcher; OperatorXs _operators; // left is _source, right is _root - OperatorXPtr _source; - OperatorXPtr _root; + OperatorXBase* _source; + OperatorXBase* _root; DataSinkOperatorXPtr _sink; // `_read_dependencies` is stored as same order as `_operators` diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 1c496cde8d0..64a5c7aeffb 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -245,46 +245,41 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, return freed_mem; } -Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_group_info, - WorkloadGroupInfo* workload_group_info) { +WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info( + const TWorkloadGroupInfo& tworkload_group_info) { // 1 id - int tg_id = 0; + uint64_t tg_id = 0; if (tworkload_group_info.__isset.id) { tg_id = tworkload_group_info.id; } else { - return Status::InternalError<false>("workload group id is required"); + return {.valid = false}; } - workload_group_info->id = tg_id; // 2 name std::string name = "INVALID_NAME"; if (tworkload_group_info.__isset.name) { name = tworkload_group_info.name; } - workload_group_info->name = name; // 3 version int version = 0; if (tworkload_group_info.__isset.version) { version = tworkload_group_info.version; } else { - return Status::InternalError<false>("workload group version is required"); + return {.valid = false}; } - workload_group_info->version = version; // 4 cpu_share uint64_t cpu_share = CPU_SHARE_DEFAULT_VALUE; if (tworkload_group_info.__isset.cpu_share) { cpu_share = tworkload_group_info.cpu_share; } - workload_group_info->cpu_share = cpu_share; // 5 cpu hard limit int cpu_hard_limit = CPU_HARD_LIMIT_DEFAULT_VALUE; if (tworkload_group_info.__isset.cpu_hard_limit) { cpu_hard_limit = tworkload_group_info.cpu_hard_limit; } - workload_group_info->cpu_hard_limit = cpu_hard_limit; // 6 mem_limit std::string mem_limit_str = MEMORY_LIMIT_DEFAULT_VALUE; @@ -294,44 +289,37 @@ Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_g bool is_percent = true; int64_t mem_limit = ParseUtil::parse_mem_spec(mem_limit_str, -1, MemInfo::mem_limit(), &is_percent); - workload_group_info->memory_limit = mem_limit; // 7 mem overcommit bool enable_memory_overcommit = ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE; if (tworkload_group_info.__isset.enable_memory_overcommit) { enable_memory_overcommit = tworkload_group_info.enable_memory_overcommit; } - workload_group_info->enable_memory_overcommit = enable_memory_overcommit; // 8 cpu soft limit or hard limit bool enable_cpu_hard_limit = false; if (tworkload_group_info.__isset.enable_cpu_hard_limit) { enable_cpu_hard_limit = tworkload_group_info.enable_cpu_hard_limit; } - workload_group_info->enable_cpu_hard_limit = enable_cpu_hard_limit; // 9 scan thread num - workload_group_info->scan_thread_num = config::doris_scanner_thread_pool_thread_num; + int scan_thread_num = config::doris_scanner_thread_pool_thread_num; if (tworkload_group_info.__isset.scan_thread_num && tworkload_group_info.scan_thread_num > 0) { - workload_group_info->scan_thread_num = tworkload_group_info.scan_thread_num; + scan_thread_num = tworkload_group_info.scan_thread_num; } // 10 max remote scan thread num - workload_group_info->max_remote_scan_thread_num = - vectorized::ScannerScheduler::get_remote_scan_thread_num(); + int max_remote_scan_thread_num = vectorized::ScannerScheduler::get_remote_scan_thread_num(); if (tworkload_group_info.__isset.max_remote_scan_thread_num && tworkload_group_info.max_remote_scan_thread_num > 0) { - workload_group_info->max_remote_scan_thread_num = - tworkload_group_info.max_remote_scan_thread_num; + max_remote_scan_thread_num = tworkload_group_info.max_remote_scan_thread_num; } // 11 min remote scan thread num - workload_group_info->min_remote_scan_thread_num = - vectorized::ScannerScheduler::get_remote_scan_thread_num(); + int min_remote_scan_thread_num = vectorized::ScannerScheduler::get_remote_scan_thread_num(); if (tworkload_group_info.__isset.min_remote_scan_thread_num && tworkload_group_info.min_remote_scan_thread_num > 0) { - workload_group_info->min_remote_scan_thread_num = - tworkload_group_info.min_remote_scan_thread_num; + min_remote_scan_thread_num = tworkload_group_info.min_remote_scan_thread_num; } // 12 spill low watermark @@ -339,16 +327,26 @@ Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_g if (tworkload_group_info.__isset.spill_threshold_low_watermark) { spill_low_watermark = tworkload_group_info.spill_threshold_low_watermark; } - workload_group_info->spill_low_watermark = spill_low_watermark; // 13 spil high watermark int spill_high_watermark = SPILL_HIGH_WATERMARK_DEFAULT_VALUE; if (tworkload_group_info.__isset.spill_threshold_high_watermark) { spill_high_watermark = tworkload_group_info.spill_threshold_high_watermark; } - workload_group_info->spill_high_watermark = spill_high_watermark; - return Status::OK(); + return {tg_id, + name, + cpu_share, + mem_limit, + enable_memory_overcommit, + version, + cpu_hard_limit, + enable_cpu_hard_limit, + scan_thread_num, + max_remote_scan_thread_num, + min_remote_scan_thread_num, + spill_low_watermark, + spill_high_watermark}; } void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env) { diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index 971cc1cb023..a82efab0904 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -168,7 +168,9 @@ private: const uint64_t _id; std::string _name; int64_t _version; - int64_t _memory_limit; // bytes + int64_t _memory_limit; // bytes + // `_weighted_mem_used` is a rough memory usage in this group, + // because we can only get a precise memory usage by MemTracker which is not include page cache. std::atomic_int64_t _weighted_mem_used = 0; // bytes bool _enable_memory_overcommit; std::atomic<uint64_t> _cpu_share; @@ -197,25 +199,25 @@ private: using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>; struct WorkloadGroupInfo { - uint64_t id; - std::string name; - uint64_t cpu_share; - int64_t memory_limit; - bool enable_memory_overcommit; - int64_t version; - int cpu_hard_limit; - bool enable_cpu_hard_limit; - int scan_thread_num; - int max_remote_scan_thread_num; - int min_remote_scan_thread_num; - int spill_low_watermark; - int spill_high_watermark; + const uint64_t id = 0; + const std::string name; + const uint64_t cpu_share = 0; + const int64_t memory_limit = 0; + const bool enable_memory_overcommit = false; + const int64_t version = 0; + const int cpu_hard_limit = 0; + const bool enable_cpu_hard_limit = false; + const int scan_thread_num = 0; + const int max_remote_scan_thread_num = 0; + const int min_remote_scan_thread_num = 0; + const int spill_low_watermark = 0; + const int spill_high_watermark = 0; // log cgroup cpu info uint64_t cgroup_cpu_shares = 0; int cgroup_cpu_hard_limit = 0; + const bool valid = true; - static Status parse_topic_info(const TWorkloadGroupInfo& tworkload_group_info, - WorkloadGroupInfo* workload_group_info); + static WorkloadGroupInfo parse_topic_info(const TWorkloadGroupInfo& tworkload_group_info); }; } // namespace doris diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index 2525ca8c0c1..478e91c0783 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -136,7 +136,7 @@ public: int64_t limit() const { return _limit; } int64_t offset() const { return _offset; } - void set_enable_spill(bool b) { _enable_spill = b; } + void set_enable_spill() { _enable_spill = true; } protected: Status partial_sort(Block& src_block, Block& dest_block); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java index 24b384d4453..4cdc04d1f1b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java @@ -34,6 +34,7 @@ import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; +import org.apache.doris.thrift.TSortAlgorithm; import org.apache.doris.thrift.TSortInfo; import org.apache.doris.thrift.TSortNode; @@ -63,6 +64,7 @@ public class SortNode extends PlanNode { private final boolean useTopN; private boolean useTopnOpt = false; private boolean useTwoPhaseReadOpt; + private boolean hasRuntimePredicate = false; // If mergeByexchange is set to true, the sort information is pushed to the // exchange node, and the sort node is used for the ORDER BY . @@ -323,6 +325,19 @@ public class SortNode extends PlanNode { msg.sort_node.setMergeByExchange(this.mergeByexchange); msg.sort_node.setIsAnalyticSort(isAnalyticSort); msg.sort_node.setIsColocate(isColocate); + + boolean isFixedLength = info.getOrderingExprs().stream().allMatch(e -> !e.getType().isStringType() + && !e.getType().isCollectionType()); + TSortAlgorithm algorithm; + if (limit > 0 && limit + offset < 1024 && (useTwoPhaseReadOpt || hasRuntimePredicate + || isFixedLength)) { + algorithm = TSortAlgorithm.HEAP_SORT; + } else if (limit > 0 && !isFixedLength && limit + offset < 256) { + algorithm = TSortAlgorithm.TOPN_SORT; + } else { + algorithm = TSortAlgorithm.FULL_SORT; + } + msg.sort_node.setAlgorithm(algorithm); } @Override @@ -348,4 +363,8 @@ public class SortNode extends PlanNode { public void setColocate(boolean colocate) { isColocate = colocate; } + + public void setHasRuntimePredicate() { + this.hasRuntimePredicate = true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 2c2d4437441..9e7431e07c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -65,6 +65,7 @@ import org.apache.doris.planner.RuntimeFilter; import org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.planner.ScanNode; import org.apache.doris.planner.SetOperationNode; +import org.apache.doris.planner.SortNode; import org.apache.doris.planner.UnionNode; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PExecPlanFragmentResult; @@ -3012,10 +3013,12 @@ public class Coordinator implements CoordInterface { List<TExecPlanFragmentParams> toThrift(int backendNum) { List<TExecPlanFragmentParams> paramsList = Lists.newArrayList(); - Set<Integer> topnFilterSources = scanNodes.stream() + Set<SortNode> topnSortNodes = scanNodes.stream() .filter(scanNode -> scanNode instanceof OlapScanNode) - .flatMap(scanNode -> ((OlapScanNode) scanNode).getTopnFilterSortNodes().stream()) - .map(sort -> sort.getId().asInt()).collect(Collectors.toSet()); + .flatMap(scanNode -> scanNode.getTopnFilterSortNodes().stream()).collect(Collectors.toSet()); + topnSortNodes.forEach(SortNode::setHasRuntimePredicate); + Set<Integer> topnFilterSources = topnSortNodes.stream().map( + sort -> sort.getId().asInt()).collect(Collectors.toSet()); for (int i = 0; i < instanceExecParams.size(); ++i) { final FInstanceExecParam instanceExecParam = instanceExecParams.get(i); TExecPlanFragmentParams params = new TExecPlanFragmentParams(); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 1281a7fba49..cdc5e49decc 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -944,6 +944,12 @@ struct TPreAggregationNode { 2: required list<Exprs.TExpr> aggregate_exprs } +enum TSortAlgorithm { + HEAP_SORT, + TOPN_SORT, + FULL_SORT + } + struct TSortNode { 1: required TSortInfo sort_info // Indicates whether the backend service should use topn vs. sorting @@ -957,6 +963,7 @@ struct TSortNode { 8: optional bool merge_by_exchange 9: optional bool is_analytic_sort 10: optional bool is_colocate + 11: optional TSortAlgorithm algorithm } enum TopNAlgorithm { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org