This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c0bb2e33a8 [improvement](scan) separate scanner into local and remote scanner pool (#16891) c0bb2e33a8 is described below commit c0bb2e33a805617ea3c30441527c14dd77368ab2 Author: Mingyu Chen <morning...@163.com> AuthorDate: Tue Feb 21 14:13:09 2023 +0800 [improvement](scan) separate scanner into local and remote scanner pool (#16891) There are 2 kinds of scanner thread pool, local and remote. Local is for local file reads, especially for the olap scanner. Remote is for other external data sources, such as the file scanner and the jdbc scanner. This PR mainly changes: For the olap scanner, use cold or hot rowset to decide whether to use the local or remote pool. For other scanners, use the remote pool by default. Add a new BE config doris_max_remote_scanner_thread_pool_thread_num, default is 512, which indicates the max thread number of the remote scanner thread pool. This will alleviate the problem of interaction between olap queries with load jobs and external queries. 
--- be/src/common/config.h | 5 ++++- be/src/vec/exec/scan/new_olap_scanner.cpp | 15 +++++++++++++ be/src/vec/exec/scan/new_olap_scanner.h | 2 ++ be/src/vec/exec/scan/scanner_scheduler.cpp | 28 ++++++++++++++---------- be/src/vec/exec/scan/scanner_scheduler.h | 2 +- be/src/vec/exec/scan/vfile_scanner.h | 4 ---- be/src/vec/exec/scan/vscanner.h | 2 +- docs/en/docs/admin-manual/config/be-config.md | 6 +++++ docs/zh-CN/docs/admin-manual/config/be-config.md | 6 +++++ 9 files changed, 51 insertions(+), 19 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 8168600890..8516ea4fa9 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -178,8 +178,11 @@ CONF_mInt32(status_report_interval, "5"); CONF_Bool(doris_enable_scanner_thread_pool_per_disk, "true"); // the timeout of a work thread to wait the blocking priority queue to get a task CONF_mInt64(doris_blocking_priority_queue_wait_timeout_ms, "500"); -// number of olap scanner thread pool size +// number of scanner thread pool size for olap table +// and the min thread num of remote scanner thread pool CONF_Int32(doris_scanner_thread_pool_thread_num, "48"); +// max number of remote scanner thread pool size +CONF_Int32(doris_max_remote_scanner_thread_pool_thread_num, "512"); // number of olap scanner thread pool queue size CONF_Int32(doris_scanner_thread_pool_queue_size, "102400"); // default thrift client connect timeout(in seconds) diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 2d73a5b339..272fd6b6cc 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -385,6 +385,21 @@ Status NewOlapScanner::_init_return_columns() { return Status::OK(); } +doris::TabletStorageType NewOlapScanner::get_storage_type() { + int local_reader = 0; + for (const auto& reader : _tablet_reader_params.rs_readers) { + local_reader += reader->rowset()->is_local(); + } + int total_reader = 
_tablet_reader_params.rs_readers.size(); + + if (local_reader == total_reader) { + return doris::TabletStorageType::STORAGE_TYPE_LOCAL; + } else if (local_reader == 0) { + return doris::TabletStorageType::STORAGE_TYPE_REMOTE; + } + return doris::TabletStorageType::STORAGE_TYPE_REMOTE_AND_LOCAL; +} + Status NewOlapScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { // Read one block from block reader // ATTN: Here we need to let the _get_block_impl method guarantee the semantics of the interface, diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h index b22537a09f..1b3bfcb364 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.h +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -50,6 +50,8 @@ public: void set_compound_filters(const std::vector<TCondition>& compound_filters); + doris::TabletStorageType get_storage_type() override; + protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; void _update_counters_before_close() override; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index e2d9881236..0fc72a90b6 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -48,10 +48,10 @@ ScannerScheduler::~ScannerScheduler() { _scheduler_pool->shutdown(); _local_scan_thread_pool->shutdown(); _remote_scan_thread_pool->shutdown(); + _limited_scan_thread_pool->shutdown(); _scheduler_pool->wait(); _local_scan_thread_pool->join(); - _remote_scan_thread_pool->join(); for (int i = 0; i < QUEUE_NUM; i++) { delete _pending_queues[i]; @@ -78,10 +78,13 @@ Status ScannerScheduler::init(ExecEnv* env) { config::doris_scanner_thread_pool_queue_size, "local_scan")); // 3. 
remote scan thread pool - _remote_scan_thread_pool.reset( - new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num, - config::doris_scanner_thread_pool_queue_size, "remote_scan")); + ThreadPoolBuilder("RemoteScanThreadPool") + .set_min_threads(config::doris_scanner_thread_pool_thread_num) // 48 default + .set_max_threads(config::doris_max_remote_scanner_thread_pool_thread_num) // 512 default + .set_max_queue_size(config::doris_scanner_thread_pool_queue_size) + .build(&_remote_scan_thread_pool); + // 4. limited scan thread pool ThreadPoolBuilder("LimitedScanThreadPool") .set_min_threads(config::doris_scanner_thread_pool_thread_num) .set_max_threads(config::doris_scanner_thread_pool_thread_num) @@ -172,20 +175,21 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { } } else { while (iter != this_run.end()) { - PriorityThreadPool::Task task; - task.work_function = [this, scanner = *iter, ctx] { - this->_scanner_scan(this, ctx, scanner); - }; - task.priority = nice; - task.queue_id = (*iter)->queue_id(); (*iter)->start_wait_worker_timer(); - TabletStorageType type = (*iter)->get_storage_type(); bool ret = false; if (type == TabletStorageType::STORAGE_TYPE_LOCAL) { + PriorityThreadPool::Task task; + task.work_function = [this, scanner = *iter, ctx] { + this->_scanner_scan(this, ctx, scanner); + }; + task.priority = nice; + task.queue_id = (*iter)->queue_id(); ret = _local_scan_thread_pool->offer(task); } else { - ret = _remote_scan_thread_pool->offer(task); + ret = _remote_scan_thread_pool->submit_func([this, scanner = *iter, ctx] { + this->_scanner_scan(this, ctx, scanner); + }); } if (ret) { this_run.erase(iter++); diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index a0062500d9..33627fe538 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -81,7 +81,7 @@ private: // _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, 
etc.) // _limited_scan_thread_pool is a special pool for queries with resource limit std::unique_ptr<PriorityThreadPool> _local_scan_thread_pool; - std::unique_ptr<PriorityThreadPool> _remote_scan_thread_pool; + std::unique_ptr<ThreadPool> _remote_scan_thread_pool; std::unique_ptr<ThreadPool> _limited_scan_thread_pool; // true is the scheduler is closed. diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index 2ca6f2a6d5..096892e8e6 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -44,10 +44,6 @@ public: Status prepare(VExprContext** vconjunct_ctx_ptr, std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); - doris::TabletStorageType get_storage_type() override { - return doris::TabletStorageType::STORAGE_TYPE_REMOTE; - } - protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override; diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index a909e91ee3..86c9cd60e5 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -90,7 +90,7 @@ public: int queue_id() { return _state->exec_env()->store_path_to_index("xxx"); } virtual doris::TabletStorageType get_storage_type() { - return doris::TabletStorageType::STORAGE_TYPE_LOCAL; + return doris::TabletStorageType::STORAGE_TYPE_REMOTE; } bool need_to_close() { return _need_to_close; } diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index e224649bd9..685fcec6b3 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -385,6 +385,12 @@ There are two ways to configure BE configuration items: * Description: The number of threads in the Scanner thread pool. In Doris' scanning tasks, each Scanner will be submitted as a thread task to the thread pool to be scheduled. 
This parameter determines the size of the Scanner thread pool. * Default value: 48 +#### `doris_max_remote_scanner_thread_pool_thread_num` + +* Type: int32 +* Description: Max thread number of Remote scanner thread pool. Remote scanner thread pool is used for scan task of all external data sources. +* Default: 512 + #### `enable_prefetch` * Type: bool diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index 65531b8824..3e50d421e7 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -396,6 +396,12 @@ BE 重启后该配置将失效。如果想持久化修改结果,使用如下 * 描述:Scanner线程池线程数目。在Doris的扫描任务之中,每一个Scanner会作为一个线程task提交到线程池之中等待被调度,该参数决定了Scanner线程池的大小。 * 默认值:48 +#### `doris_max_remote_scanner_thread_pool_thread_num` + +* 类型:int32 +* 描述:Remote scanner thread pool 的最大线程数。Remote scanner thread pool 用于除内表外的所有 scan 任务的执行。 +* 默认值:512 + #### `enable_prefetch` * 类型:bool --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org