This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new f2a3deefa2 [branch-1.2][improvement](scan) separate scanner into local and remote scanner pool (#16892)
f2a3deefa2 is described below

commit f2a3deefa21d5f8166932db45e534740aabdbad1
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Tue Feb 21 14:13:46 2023 +0800

    [branch-1.2][improvement](scan) separate scanner into local and remote scanner pool (#16892)

    cherry-pick #16891
---
 be/src/common/config.h                     |  2 ++
 be/src/vec/exec/scan/new_olap_scanner.h    |  4 ++++
 be/src/vec/exec/scan/scanner_scheduler.cpp | 27 +++++++++++++++------------
 be/src/vec/exec/scan/scanner_scheduler.h   |  2 +-
 be/src/vec/exec/scan/vfile_scanner.h       |  4 ----
 be/src/vec/exec/scan/vscanner.h            |  2 +-
 6 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index ec64ca5dfb..f141b1b8eb 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -178,6 +178,8 @@ CONF_Bool(doris_enable_scanner_thread_pool_per_disk, "true");
 CONF_mInt64(doris_blocking_priority_queue_wait_timeout_ms, "500");
 // number of olap scanner thread pool size
 CONF_Int32(doris_scanner_thread_pool_thread_num, "48");
+// max number of remote scanner thread pool size
+CONF_Int32(doris_max_remote_scanner_thread_pool_thread_num, "512");
 // number of olap scanner thread pool queue size
 CONF_Int32(doris_scanner_thread_pool_queue_size, "102400");
 // default thrift client connect timeout(in seconds)
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h
index 97ba78aa2b..04d805ee38 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -49,6 +49,10 @@ public:
 
     const std::string& scan_disk() const { return _tablet->data_dir()->path(); }
 
+    doris::TabletStorageType get_storage_type() override {
+        return doris::TabletStorageType::STORAGE_TYPE_LOCAL;
+    }
+
 protected:
     Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
     void _update_counters_before_close() override;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index d62909b5bd..f3a27dff3b 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -46,10 +46,10 @@ ScannerScheduler::~ScannerScheduler() {
     _scheduler_pool->shutdown();
     _local_scan_thread_pool->shutdown();
     _remote_scan_thread_pool->shutdown();
+    _limited_scan_thread_pool->shutdown();
 
     _scheduler_pool->wait();
     _local_scan_thread_pool->join();
-    _remote_scan_thread_pool->join();
 
     for (int i = 0; i < QUEUE_NUM; i++) {
         delete _pending_queues[i];
@@ -76,10 +76,13 @@ Status ScannerScheduler::init(ExecEnv* env) {
                                    config::doris_scanner_thread_pool_queue_size, "local_scan"));
 
     // 3. remote scan thread pool
-    _remote_scan_thread_pool.reset(
-            new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
-                                   config::doris_scanner_thread_pool_queue_size, "remote_scan"));
+    ThreadPoolBuilder("RemoteScanThreadPool")
+            .set_min_threads(config::doris_scanner_thread_pool_thread_num)            // 48 default
+            .set_max_threads(config::doris_max_remote_scanner_thread_pool_thread_num) // 512 default
+            .set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
+            .build(&_remote_scan_thread_pool);
 
+    // 4. limited scan thread pool
     ThreadPoolBuilder("LimitedScanThreadPool")
             .set_min_threads(config::doris_scanner_thread_pool_thread_num)
             .set_max_threads(config::doris_scanner_thread_pool_thread_num)
@@ -162,20 +165,20 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
         }
     } else {
         while (iter != this_run.end()) {
-            PriorityThreadPool::Task task;
-            task.work_function = [this, scanner = *iter, ctx] {
-                this->_scanner_scan(this, ctx, scanner);
-            };
-            task.priority = nice;
-            task.queue_id = (*iter)->queue_id();
             (*iter)->start_wait_worker_timer();
-
             TabletStorageType type = (*iter)->get_storage_type();
             bool ret = false;
             if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
+                PriorityThreadPool::Task task;
+                task.work_function = [this, scanner = *iter, ctx] {
+                    this->_scanner_scan(this, ctx, scanner);
+                };
+                task.priority = nice;
+                task.queue_id = (*iter)->queue_id();
                 ret = _local_scan_thread_pool->offer(task);
             } else {
-                ret = _remote_scan_thread_pool->offer(task);
+                ret = _remote_scan_thread_pool->submit_func(
+                        [this, scanner = *iter, ctx] { this->_scanner_scan(this, ctx, scanner); });
             }
             if (ret) {
                 this_run.erase(iter++);
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index a0062500d9..33627fe538 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -81,7 +81,7 @@ private:
     // _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, etc.)
     // _limited_scan_thread_pool is a special pool for queries with resource limit
     std::unique_ptr<PriorityThreadPool> _local_scan_thread_pool;
-    std::unique_ptr<PriorityThreadPool> _remote_scan_thread_pool;
+    std::unique_ptr<ThreadPool> _remote_scan_thread_pool;
     std::unique_ptr<ThreadPool> _limited_scan_thread_pool;
 
     // true is the scheduler is closed.
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index aa579232e0..aec6e776ab 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -45,10 +45,6 @@ public:
     Status prepare(VExprContext** vconjunct_ctx_ptr,
                    std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
 
-    doris::TabletStorageType get_storage_type() override {
-        return doris::TabletStorageType::STORAGE_TYPE_REMOTE;
-    }
-
 protected:
     Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
 
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index e8817670e7..8920f29921 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -90,7 +90,7 @@ public:
     int queue_id() { return _state->exec_env()->store_path_to_index("xxx"); }
 
     virtual doris::TabletStorageType get_storage_type() {
-        return doris::TabletStorageType::STORAGE_TYPE_LOCAL;
+        return doris::TabletStorageType::STORAGE_TYPE_REMOTE;
     }
 
     bool need_to_close() { return _need_to_close; }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org