This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new dbf963c1875 [fix](BE) Fix inefficient problem in PriorTaskWorkerPool (#37169) dbf963c1875 is described below commit dbf963c187564e527f3d53aecd192bfcb1701926 Author: plat1ko <platonekos...@gmail.com> AuthorDate: Fri Jul 5 22:41:48 2024 +0800 [fix](BE) Fix inefficient problem in PriorTaskWorkerPool (#37169) ## Proposed changes In the original implementation of `PriorTaskWorkerPool`, although multiple threads were launched in the normal pool and high prior pool, only one thread was actually working in each pool (running `normal_loop` and `high_prior_loop`, respectively). This PR fixes this issue. --- be/src/agent/task_worker_pool.cpp | 38 +++++++++++++++----------------------- be/src/agent/task_worker_pool.h | 6 +++--- 2 files changed, 18 insertions(+), 26 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 9b0a9592950..0e851fba17a 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -540,26 +540,20 @@ Status TaskWorkerPool::submit_task(const TAgentTaskRequest& task) { } PriorTaskWorkerPool::PriorTaskWorkerPool( - std::string_view name, int normal_worker_count, int high_prior_worker_count, + const std::string& name, int normal_worker_count, int high_prior_worker_count, std::function<void(const TAgentTaskRequest& task)> callback) : _callback(std::move(callback)) { - auto st = ThreadPoolBuilder(fmt::format("TaskWP_.{}", name)) - .set_min_threads(normal_worker_count) - .set_max_threads(normal_worker_count) - .build(&_normal_pool); - CHECK(st.ok()) << name << ": " << st; - - st = _normal_pool->submit_func([this] { normal_loop(); }); - CHECK(st.ok()) << name << ": " << st; - - st = ThreadPoolBuilder(fmt::format("HighPriorPool.{}", name)) - .set_min_threads(high_prior_worker_count) - .set_max_threads(high_prior_worker_count) - .build(&_high_prior_pool); - CHECK(st.ok()) << name << ": " << st; + for (int i = 0; i < normal_worker_count; ++i) { + auto st = Thread::create( + "Normal", name, [this] { normal_loop(); }, &_workers.emplace_back()); + CHECK(st.ok()) << name << ": " << st; + } - st = _high_prior_pool->submit_func([this] { high_prior_loop(); }); - CHECK(st.ok()) << name << ": " << st; + for (int i = 0; i < high_prior_worker_count; ++i) { + auto st = Thread::create( + "HighPrior", name, [this] { high_prior_loop(); }, &_workers.emplace_back()); + CHECK(st.ok()) << name << ": " << st; + } } PriorTaskWorkerPool::~PriorTaskWorkerPool() { @@ -578,12 +572,10 @@ void PriorTaskWorkerPool::stop() { _normal_condv.notify_all(); _high_prior_condv.notify_all(); - if (_normal_pool) { - _normal_pool->shutdown(); - } - - if (_high_prior_pool) { - _high_prior_pool->shutdown(); + for (auto&& w : _workers) { + if (w) { + w->join(); + } } } diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index 514692968b4..f51d6c2a4c0 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -79,7 +79,8 @@ private: class PriorTaskWorkerPool final : public TaskWorkerPoolIf { public: - PriorTaskWorkerPool(std::string_view name, int normal_worker_count, int high_prior_worker_count, + PriorTaskWorkerPool(const std::string& name, int normal_worker_count, + int high_prior_worker_count, std::function<void(const TAgentTaskRequest& task)> callback); ~PriorTaskWorkerPool() override; @@ -101,8 +102,7 @@ private: std::condition_variable _high_prior_condv; std::deque<std::unique_ptr<TAgentTaskRequest>> _high_prior_queue; - std::unique_ptr<ThreadPool> _normal_pool; - std::unique_ptr<ThreadPool> _high_prior_pool; + std::vector<scoped_refptr<Thread>> _workers; std::function<void(const TAgentTaskRequest&)> _callback; }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org