This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new f2a3deefa2 [branch-1.2][improvement](scan) separate scanner into local and remote scanner pool (#16892)
f2a3deefa2 is described below

commit f2a3deefa21d5f8166932db45e534740aabdbad1
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Tue Feb 21 14:13:46 2023 +0800

    [branch-1.2][improvement](scan) separate scanner into local and remote scanner pool (#16892)
    
    cherry-pick #16891
---
 be/src/common/config.h                     |  2 ++
 be/src/vec/exec/scan/new_olap_scanner.h    |  4 ++++
 be/src/vec/exec/scan/scanner_scheduler.cpp | 27 +++++++++++++++------------
 be/src/vec/exec/scan/scanner_scheduler.h   |  2 +-
 be/src/vec/exec/scan/vfile_scanner.h       |  4 ----
 be/src/vec/exec/scan/vscanner.h            |  2 +-
 6 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index ec64ca5dfb..f141b1b8eb 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -178,6 +178,8 @@ CONF_Bool(doris_enable_scanner_thread_pool_per_disk, "true");
 CONF_mInt64(doris_blocking_priority_queue_wait_timeout_ms, "500");
 // number of olap scanner thread pool size
 CONF_Int32(doris_scanner_thread_pool_thread_num, "48");
+// max number of remote scanner thread pool size
+CONF_Int32(doris_max_remote_scanner_thread_pool_thread_num, "512");
 // number of olap scanner thread pool queue size
 CONF_Int32(doris_scanner_thread_pool_queue_size, "102400");
 // default thrift client connect timeout(in seconds)
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h
index 97ba78aa2b..04d805ee38 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -49,6 +49,10 @@ public:
 
     const std::string& scan_disk() const { return _tablet->data_dir()->path(); 
}
 
+    doris::TabletStorageType get_storage_type() override {
+        return doris::TabletStorageType::STORAGE_TYPE_LOCAL;
+    }
+
 protected:
     Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
     void _update_counters_before_close() override;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index d62909b5bd..f3a27dff3b 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -46,10 +46,10 @@ ScannerScheduler::~ScannerScheduler() {
     _scheduler_pool->shutdown();
     _local_scan_thread_pool->shutdown();
     _remote_scan_thread_pool->shutdown();
+    _limited_scan_thread_pool->shutdown();
 
     _scheduler_pool->wait();
     _local_scan_thread_pool->join();
-    _remote_scan_thread_pool->join();
 
     for (int i = 0; i < QUEUE_NUM; i++) {
         delete _pending_queues[i];
@@ -76,10 +76,13 @@ Status ScannerScheduler::init(ExecEnv* env) {
             config::doris_scanner_thread_pool_queue_size, "local_scan"));
 
     // 3. remote scan thread pool
-    _remote_scan_thread_pool.reset(
-            new 
PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
-                                   
config::doris_scanner_thread_pool_queue_size, "remote_scan"));
+    ThreadPoolBuilder("RemoteScanThreadPool")
+            .set_min_threads(config::doris_scanner_thread_pool_thread_num)     
       // 48 default
+            
.set_max_threads(config::doris_max_remote_scanner_thread_pool_thread_num) // 
512 default
+            .set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
+            .build(&_remote_scan_thread_pool);
 
+    // 4. limited scan thread pool
     ThreadPoolBuilder("LimitedScanThreadPool")
             .set_min_threads(config::doris_scanner_thread_pool_thread_num)
             .set_max_threads(config::doris_scanner_thread_pool_thread_num)
@@ -162,20 +165,20 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
         }
     } else {
         while (iter != this_run.end()) {
-            PriorityThreadPool::Task task;
-            task.work_function = [this, scanner = *iter, ctx] {
-                this->_scanner_scan(this, ctx, scanner);
-            };
-            task.priority = nice;
-            task.queue_id = (*iter)->queue_id();
             (*iter)->start_wait_worker_timer();
-
             TabletStorageType type = (*iter)->get_storage_type();
             bool ret = false;
             if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
+                PriorityThreadPool::Task task;
+                task.work_function = [this, scanner = *iter, ctx] {
+                    this->_scanner_scan(this, ctx, scanner);
+                };
+                task.priority = nice;
+                task.queue_id = (*iter)->queue_id();
                 ret = _local_scan_thread_pool->offer(task);
             } else {
-                ret = _remote_scan_thread_pool->offer(task);
+                ret = _remote_scan_thread_pool->submit_func(
+                        [this, scanner = *iter, ctx] { 
this->_scanner_scan(this, ctx, scanner); });
             }
             if (ret) {
                 this_run.erase(iter++);
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index a0062500d9..33627fe538 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -81,7 +81,7 @@ private:
     // _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, 
etc.)
     // _limited_scan_thread_pool is a special pool for queries with resource 
limit
     std::unique_ptr<PriorityThreadPool> _local_scan_thread_pool;
-    std::unique_ptr<PriorityThreadPool> _remote_scan_thread_pool;
+    std::unique_ptr<ThreadPool> _remote_scan_thread_pool;
     std::unique_ptr<ThreadPool> _limited_scan_thread_pool;
 
     // true is the scheduler is closed.
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index aa579232e0..aec6e776ab 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -45,10 +45,6 @@ public:
     Status prepare(VExprContext** vconjunct_ctx_ptr,
                    std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range);
 
-    doris::TabletStorageType get_storage_type() override {
-        return doris::TabletStorageType::STORAGE_TYPE_REMOTE;
-    }
-
 protected:
     Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) 
override;
 
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index e8817670e7..8920f29921 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -90,7 +90,7 @@ public:
     int queue_id() { return _state->exec_env()->store_path_to_index("xxx"); }
 
     virtual doris::TabletStorageType get_storage_type() {
-        return doris::TabletStorageType::STORAGE_TYPE_LOCAL;
+        return doris::TabletStorageType::STORAGE_TYPE_REMOTE;
     }
 
     bool need_to_close() { return _need_to_close; }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to