This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bc38b2fdfb [improvement](new-scan) graceful quit scanner scheduler (#12715) bc38b2fdfb is described below commit bc38b2fdfb2435f92f732a94f457c1881da3ba77 Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Mon Sep 19 08:39:08 2022 +0800 [improvement](new-scan) graceful quit scanner scheduler (#12715) --- be/src/vec/exec/scan/scanner_scheduler.cpp | 29 ++++++++++++++++++++++++----- be/src/vec/exec/scan/scanner_scheduler.h | 14 +++----------- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 7108143967..9787ecae5d 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -32,11 +32,28 @@ namespace doris::vectorized { ScannerScheduler::ScannerScheduler() {} ScannerScheduler::~ScannerScheduler() { + if (!_is_init) { + return; + } + + for (int i = 0; i < QUEUE_NUM; i++) { + _pending_queues[i]->shutdown(); + } + _is_closed = true; + _scheduler_pool->shutdown(); _local_scan_thread_pool->shutdown(); _remote_scan_thread_pool->shutdown(); - // TODO: safely delete all objects and graceful exit + + _scheduler_pool->wait(); + _local_scan_thread_pool->join(); + _remote_scan_thread_pool->join(); + + for (int i = 0; i < QUEUE_NUM; i++) { + delete _pending_queues[i]; + } + delete[] _pending_queues; } Status ScannerScheduler::init(ExecEnv* env) { @@ -53,14 +70,16 @@ Status ScannerScheduler::init(ExecEnv* env) { } // 2. local scan thread pool - _local_scan_thread_pool = new PriorityWorkStealingThreadPool( + _local_scan_thread_pool.reset(new PriorityWorkStealingThreadPool( config::doris_scanner_thread_pool_thread_num, env->store_paths().size(), - config::doris_scanner_thread_pool_queue_size); + config::doris_scanner_thread_pool_queue_size)); // 3. 
remote scan thread pool - _remote_scan_thread_pool = new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num, - config::doris_scanner_thread_pool_queue_size); + _remote_scan_thread_pool.reset( + new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num, + config::doris_scanner_thread_pool_queue_size)); + _is_init = true; return Status::OK(); } diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index a72fd5021e..f8c1a8f3df 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -40,13 +40,6 @@ namespace doris::vectorized { // Each Scanner will act as a producer, read a group of blocks and put them into // the corresponding block queue. // The corresponding ScanNode will act as a consumer to consume blocks from the block queue. - -using ContextMap = phmap::parallel_flat_hash_map< - std::string, std::shared_ptr<ScannerContext>, phmap::priv::hash_default_hash<std::string>, - phmap::priv::hash_default_eq<std::string>, - std::allocator<std::pair<const std::string, std::shared_ptr<ScannerContext>>>, 12, - std::mutex>; - class Env; class ScannerScheduler { public: @@ -82,13 +75,12 @@ private: // execution thread pool // _local_scan_thread_pool is for local scan task(typically, olap scanner) // _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, etc.) - PriorityThreadPool* _local_scan_thread_pool; - PriorityThreadPool* _remote_scan_thread_pool; + std::unique_ptr<PriorityThreadPool> _local_scan_thread_pool; + std::unique_ptr<PriorityThreadPool> _remote_scan_thread_pool; // true if the scheduler is closed. std::atomic_bool _is_closed = {false}; - - ContextMap _context_map; + bool _is_init = false; }; } // namespace doris::vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org