This is an automated email from the ASF dual-hosted git repository. mrhhsg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit 28e63a74693e40ff0dcd24a87c7a91054c331bc8 Author: yiguolei <676222...@qq.com> AuthorDate: Thu Sep 5 10:14:11 2024 +0800 [refactor](codestyle) use pointer as return value (#40396) ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/pipeline/pipeline_fragment_context.cpp | 16 +++++++++------- be/src/pipeline/pipeline_fragment_context.h | 2 +- be/src/pipeline/task_scheduler.cpp | 6 +++++- be/src/runtime/query_context.cpp | 14 +++++++------- be/src/runtime/query_context.h | 4 ++-- 5 files changed, 24 insertions(+), 18 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 70a3c0d01af..d4a300e58a2 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1819,8 +1819,10 @@ Status PipelineFragmentContext::send_report(bool done) { req, std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this())); } -size_t PipelineFragmentContext::get_revocable_size(bool& has_running_task) const { - size_t revocable_size = 0; +size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const { + size_t res = 0; + // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe + // here to traverse the vector. for (const auto& task_instances : _tasks) { for (const auto& task : task_instances) { if (task->is_running() || task->is_revoking()) { @@ -1828,17 +1830,17 @@ size_t PipelineFragmentContext::get_revocable_size(bool& has_running_task) const << " is running, task: " << (void*)task.get() << ", task->is_revoking(): " << task->is_revoking() << ", " << task->is_running(); - has_running_task = true; + *has_running_task = true; return 0; } - size_t revocable_size_ = task->get_revocable_size(); - if (revocable_size_ > _runtime_state->min_revocable_mem()) { - revocable_size += revocable_size_; + size_t revocable_size = task->get_revocable_size(); + if (revocable_size > _runtime_state->min_revocable_mem()) { + res += revocable_size; } } } - return revocable_size; + return res; } std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const { diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 7ea39c7377b..a2c55214ba3 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -117,7 +117,7 @@ public: [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id--; } - [[nodiscard]] size_t get_revocable_size(bool& has_running_task) const; + [[nodiscard]] size_t get_revocable_size(bool* has_running_task) const; [[nodiscard]] std::vector<PipelineTask*> get_revocable_tasks() const; diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 5752256ce57..b61e0041e60 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -58,6 +58,9 @@ TaskScheduler::~TaskScheduler() { Status TaskScheduler::start() { int cores = _task_queue->cores(); + // Init the thread pool with cores+1 thread + // some for pipeline task running + // 1 for spill disk query handler RETURN_IF_ERROR(ThreadPoolBuilder(_name) .set_min_threads(cores + 1) .set_max_threads(cores + 1) @@ -282,7 +285,8 @@ void TaskScheduler::_paused_queries_handler() { continue; } - query_ctx->get_revocable_info(revocable_size, memory_usage, has_running_task); + query_ctx->get_revocable_info(&revocable_size, &memory_usage, + &has_running_task); if (has_running_task) { has_running_query = true; running_query = query_ctx; diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 9a04658876d..072443729b2 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -398,24 +398,24 @@ void QueryContext::_report_query_profile() { ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile(); } -void QueryContext::get_revocable_info(size_t& revocable_size, size_t& memory_usage, - bool& has_running_task) const { - revocable_size = 0; +void QueryContext::get_revocable_info(size_t* revocable_size, size_t* memory_usage, + bool* has_running_task) const { + *revocable_size = 0; for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) { auto fragment_ctx = fragment_wptr.lock(); if (!fragment_ctx) { continue; } - revocable_size += fragment_ctx->get_revocable_size(has_running_task); + *revocable_size += fragment_ctx->get_revocable_size(has_running_task); // Should wait for all tasks are not running before revoking memory. - if (has_running_task) { + if (*has_running_task) { break; } } - memory_usage = query_mem_tracker->consumption(); + *memory_usage = query_mem_tracker->consumption(); } size_t QueryContext::get_revocable_size() const { @@ -427,7 +427,7 @@ size_t QueryContext::get_revocable_size() const { } bool has_running_task = false; - revocable_size += fragment_ctx->get_revocable_size(has_running_task); + revocable_size += fragment_ctx->get_revocable_size(&has_running_task); // Should wait for all tasks are not running before revoking memory. if (has_running_task) { diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 1edb204f049..eb2beb2ba05 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -234,8 +234,8 @@ public: int get_revoking_tasks_count() const { return _revoking_tasks_count.load(); } - void get_revocable_info(size_t& revocable_size, size_t& memory_usage, - bool& has_running_task) const; + void get_revocable_info(size_t* revocable_size, size_t* memory_usage, + bool* has_running_task) const; size_t get_revocable_size() const; void set_spill_threshold(int64_t spill_threshold) { _spill_threshold = spill_threshold; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org