This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bc38b2fdfb [improvement](new-scan) graceful quit scanner scheduler 
(#12715)
bc38b2fdfb is described below

commit bc38b2fdfb2435f92f732a94f457c1881da3ba77
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Mon Sep 19 08:39:08 2022 +0800

    [improvement](new-scan) graceful quit scanner scheduler (#12715)
---
 be/src/vec/exec/scan/scanner_scheduler.cpp | 29 ++++++++++++++++++++++++-----
 be/src/vec/exec/scan/scanner_scheduler.h   | 14 +++-----------
 2 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 7108143967..9787ecae5d 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -32,11 +32,28 @@ namespace doris::vectorized {
 ScannerScheduler::ScannerScheduler() {}
 
 ScannerScheduler::~ScannerScheduler() {
+    if (!_is_init) {
+        return;
+    }
+
+    for (int i = 0; i < QUEUE_NUM; i++) {
+        _pending_queues[i]->shutdown();
+    }
+
     _is_closed = true;
+
     _scheduler_pool->shutdown();
     _local_scan_thread_pool->shutdown();
     _remote_scan_thread_pool->shutdown();
-    // TODO: safely delete all objects and graceful exit
+
+    _scheduler_pool->wait();
+    _local_scan_thread_pool->join();
+    _remote_scan_thread_pool->join();
+
+    for (int i = 0; i < QUEUE_NUM; i++) {
+        delete _pending_queues[i];
+    }
+    delete[] _pending_queues;
 }
 
 Status ScannerScheduler::init(ExecEnv* env) {
@@ -53,14 +70,16 @@ Status ScannerScheduler::init(ExecEnv* env) {
     }
 
     // 2. local scan thread pool
-    _local_scan_thread_pool = new PriorityWorkStealingThreadPool(
+    _local_scan_thread_pool.reset(new PriorityWorkStealingThreadPool(
             config::doris_scanner_thread_pool_thread_num, 
env->store_paths().size(),
-            config::doris_scanner_thread_pool_queue_size);
+            config::doris_scanner_thread_pool_queue_size));
 
     // 3. remote scan thread pool
-    _remote_scan_thread_pool = new 
PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
-                                                      
config::doris_scanner_thread_pool_queue_size);
+    _remote_scan_thread_pool.reset(
+            new 
PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
+                                   
config::doris_scanner_thread_pool_queue_size));
 
+    _is_init = true;
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index a72fd5021e..f8c1a8f3df 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -40,13 +40,6 @@ namespace doris::vectorized {
 //     Each Scanner will act as a producer, read a group of blocks and put 
them into
 //     the corresponding block queue.
 //     The corresponding ScanNode will act as a consumer to consume blocks 
from the block queue.
-
-using ContextMap = phmap::parallel_flat_hash_map<
-        std::string, std::shared_ptr<ScannerContext>, 
phmap::priv::hash_default_hash<std::string>,
-        phmap::priv::hash_default_eq<std::string>,
-        std::allocator<std::pair<const std::string, 
std::shared_ptr<ScannerContext>>>, 12,
-        std::mutex>;
-
 class Env;
 class ScannerScheduler {
 public:
@@ -82,13 +75,12 @@ private:
     // execution thread pool
     // _local_scan_thread_pool is for local scan task(typically, olap scanner)
     // _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, 
etc.)
-    PriorityThreadPool* _local_scan_thread_pool;
-    PriorityThreadPool* _remote_scan_thread_pool;
+    std::unique_ptr<PriorityThreadPool> _local_scan_thread_pool;
+    std::unique_ptr<PriorityThreadPool> _remote_scan_thread_pool;
 
     // true is the scheduler is closed.
     std::atomic_bool _is_closed = {false};
-
-    ContextMap _context_map;
+    bool _is_init = false;
 };
 
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to