This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 28e63a74693e40ff0dcd24a87c7a91054c331bc8
Author: yiguolei <676222...@qq.com>
AuthorDate: Thu Sep 5 10:14:11 2024 +0800

    [refactor](codestyle) use pointer as return value (#40396)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/pipeline/pipeline_fragment_context.cpp | 16 +++++++++-------
 be/src/pipeline/pipeline_fragment_context.h   |  2 +-
 be/src/pipeline/task_scheduler.cpp            |  6 +++++-
 be/src/runtime/query_context.cpp              | 14 +++++++-------
 be/src/runtime/query_context.h                |  4 ++--
 5 files changed, 24 insertions(+), 18 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 70a3c0d01af..d4a300e58a2 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1819,8 +1819,10 @@ Status PipelineFragmentContext::send_report(bool done) {
             req, 
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
 }
 
-size_t PipelineFragmentContext::get_revocable_size(bool& has_running_task) 
const {
-    size_t revocable_size = 0;
+size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) 
const {
+    size_t res = 0;
+    // _tasks will be cleared during ~PipelineFragmentContext, so that it's 
safe
+    // here to traverse the vector.
     for (const auto& task_instances : _tasks) {
         for (const auto& task : task_instances) {
             if (task->is_running() || task->is_revoking()) {
@@ -1828,17 +1830,17 @@ size_t 
PipelineFragmentContext::get_revocable_size(bool& has_running_task) const
                                       << " is running, task: " << 
(void*)task.get()
                                       << ", task->is_revoking(): " << 
task->is_revoking() << ", "
                                       << task->is_running();
-                has_running_task = true;
+                *has_running_task = true;
                 return 0;
             }
 
-            size_t revocable_size_ = task->get_revocable_size();
-            if (revocable_size_ > _runtime_state->min_revocable_mem()) {
-                revocable_size += revocable_size_;
+            size_t revocable_size = task->get_revocable_size();
+            if (revocable_size > _runtime_state->min_revocable_mem()) {
+                res += revocable_size;
             }
         }
     }
-    return revocable_size;
+    return res;
 }
 
 std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() 
const {
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 7ea39c7377b..a2c55214ba3 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -117,7 +117,7 @@ public:
 
     [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id--; }
 
-    [[nodiscard]] size_t get_revocable_size(bool& has_running_task) const;
+    [[nodiscard]] size_t get_revocable_size(bool* has_running_task) const;
 
     [[nodiscard]] std::vector<PipelineTask*> get_revocable_tasks() const;
 
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index 5752256ce57..b61e0041e60 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -58,6 +58,9 @@ TaskScheduler::~TaskScheduler() {
 
 Status TaskScheduler::start() {
     int cores = _task_queue->cores();
+    // Init the thread pool with cores+1 thread
+    // some for pipeline task running
+    // 1 for spill disk query handler
     RETURN_IF_ERROR(ThreadPoolBuilder(_name)
                             .set_min_threads(cores + 1)
                             .set_max_threads(cores + 1)
@@ -282,7 +285,8 @@ void TaskScheduler::_paused_queries_handler() {
                         continue;
                     }
 
-                    query_ctx->get_revocable_info(revocable_size, 
memory_usage, has_running_task);
+                    query_ctx->get_revocable_info(&revocable_size, 
&memory_usage,
+                                                  &has_running_task);
                     if (has_running_task) {
                         has_running_query = true;
                         running_query = query_ctx;
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 9a04658876d..072443729b2 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -398,24 +398,24 @@ void QueryContext::_report_query_profile() {
     
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile();
 }
 
-void QueryContext::get_revocable_info(size_t& revocable_size, size_t& 
memory_usage,
-                                      bool& has_running_task) const {
-    revocable_size = 0;
+void QueryContext::get_revocable_info(size_t* revocable_size, size_t* 
memory_usage,
+                                      bool* has_running_task) const {
+    *revocable_size = 0;
     for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) {
         auto fragment_ctx = fragment_wptr.lock();
         if (!fragment_ctx) {
             continue;
         }
 
-        revocable_size += fragment_ctx->get_revocable_size(has_running_task);
+        *revocable_size += fragment_ctx->get_revocable_size(has_running_task);
 
         // Should wait for all tasks are not running before revoking memory.
-        if (has_running_task) {
+        if (*has_running_task) {
             break;
         }
     }
 
-    memory_usage = query_mem_tracker->consumption();
+    *memory_usage = query_mem_tracker->consumption();
 }
 
 size_t QueryContext::get_revocable_size() const {
@@ -427,7 +427,7 @@ size_t QueryContext::get_revocable_size() const {
         }
 
         bool has_running_task = false;
-        revocable_size += fragment_ctx->get_revocable_size(has_running_task);
+        revocable_size += fragment_ctx->get_revocable_size(&has_running_task);
 
         // Should wait for all tasks are not running before revoking memory.
         if (has_running_task) {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 1edb204f049..eb2beb2ba05 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -234,8 +234,8 @@ public:
 
     int get_revoking_tasks_count() const { return 
_revoking_tasks_count.load(); }
 
-    void get_revocable_info(size_t& revocable_size, size_t& memory_usage,
-                            bool& has_running_task) const;
+    void get_revocable_info(size_t* revocable_size, size_t* memory_usage,
+                            bool* has_running_task) const;
     size_t get_revocable_size() const;
 
     void set_spill_threshold(int64_t spill_threshold) { _spill_threshold = 
spill_threshold; }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to