This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dbf963c1875 [fix](BE) Fix inefficient problem in PriorTaskWorkerPool (#37169)
dbf963c1875 is described below

commit dbf963c187564e527f3d53aecd192bfcb1701926
Author: plat1ko <platonekos...@gmail.com>
AuthorDate: Fri Jul 5 22:41:48 2024 +0800

    [fix](BE) Fix inefficient problem in PriorTaskWorkerPool (#37169)
    
    ## Proposed changes
    
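    In the original implementation of `PriorTaskWorkerPool`, although
    multiple threads were launched in the normal pool and the high-priority
    pool, only one thread in each pool was actually working, because the
    constructor submitted just a single long-running task (`normal_loop` or
    `high_prior_loop`, respectively) to each pool. This PR fixes the issue
    by creating one dedicated thread per configured worker, each running
    the corresponding loop.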
---
 be/src/agent/task_worker_pool.cpp | 38 +++++++++++++++-----------------------
 be/src/agent/task_worker_pool.h   |  6 +++---
 2 files changed, 18 insertions(+), 26 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 9b0a9592950..0e851fba17a 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -540,26 +540,20 @@ Status TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
 }
 
 PriorTaskWorkerPool::PriorTaskWorkerPool(
-        std::string_view name, int normal_worker_count, int 
high_prior_worker_count,
+        const std::string& name, int normal_worker_count, int 
high_prior_worker_count,
         std::function<void(const TAgentTaskRequest& task)> callback)
         : _callback(std::move(callback)) {
-    auto st = ThreadPoolBuilder(fmt::format("TaskWP_.{}", name))
-                      .set_min_threads(normal_worker_count)
-                      .set_max_threads(normal_worker_count)
-                      .build(&_normal_pool);
-    CHECK(st.ok()) << name << ": " << st;
-
-    st = _normal_pool->submit_func([this] { normal_loop(); });
-    CHECK(st.ok()) << name << ": " << st;
-
-    st = ThreadPoolBuilder(fmt::format("HighPriorPool.{}", name))
-                 .set_min_threads(high_prior_worker_count)
-                 .set_max_threads(high_prior_worker_count)
-                 .build(&_high_prior_pool);
-    CHECK(st.ok()) << name << ": " << st;
+    for (int i = 0; i < normal_worker_count; ++i) {
+        auto st = Thread::create(
+                "Normal", name, [this] { normal_loop(); }, 
&_workers.emplace_back());
+        CHECK(st.ok()) << name << ": " << st;
+    }
 
-    st = _high_prior_pool->submit_func([this] { high_prior_loop(); });
-    CHECK(st.ok()) << name << ": " << st;
+    for (int i = 0; i < high_prior_worker_count; ++i) {
+        auto st = Thread::create(
+                "HighPrior", name, [this] { high_prior_loop(); }, 
&_workers.emplace_back());
+        CHECK(st.ok()) << name << ": " << st;
+    }
 }
 
 PriorTaskWorkerPool::~PriorTaskWorkerPool() {
@@ -578,12 +572,10 @@ void PriorTaskWorkerPool::stop() {
     _normal_condv.notify_all();
     _high_prior_condv.notify_all();
 
-    if (_normal_pool) {
-        _normal_pool->shutdown();
-    }
-
-    if (_high_prior_pool) {
-        _high_prior_pool->shutdown();
+    for (auto&& w : _workers) {
+        if (w) {
+            w->join();
+        }
     }
 }
 
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 514692968b4..f51d6c2a4c0 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -79,7 +79,8 @@ private:
 
 class PriorTaskWorkerPool final : public TaskWorkerPoolIf {
 public:
-    PriorTaskWorkerPool(std::string_view name, int normal_worker_count, int 
high_prior_worker_count,
+    PriorTaskWorkerPool(const std::string& name, int normal_worker_count,
+                        int high_prior_worker_count,
                         std::function<void(const TAgentTaskRequest& task)> 
callback);
 
     ~PriorTaskWorkerPool() override;
@@ -101,8 +102,7 @@ private:
     std::condition_variable _high_prior_condv;
     std::deque<std::unique_ptr<TAgentTaskRequest>> _high_prior_queue;
 
-    std::unique_ptr<ThreadPool> _normal_pool;
-    std::unique_ptr<ThreadPool> _high_prior_pool;
+    std::vector<scoped_refptr<Thread>> _workers;
 
     std::function<void(const TAgentTaskRequest&)> _callback;
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to