This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-4.0-preview in repository https://gitbox.apache.org/repos/asf/doris.git
commit aa88a718c1c7fbb20ee266bb7d9d8ae1667907a9 Author: wangbo <wan...@apache.org> AuthorDate: Thu Apr 25 10:11:56 2024 +0800 Fix remote scan pool (#33976) --- be/src/runtime/workload_group/workload_group.cpp | 31 +++++------ be/src/vec/exec/scan/scanner_scheduler.cpp | 24 +++++--- be/src/vec/exec/scan/scanner_scheduler.h | 26 +++++---- .../resource/workloadgroup/WorkloadGroup.java | 11 ++++ .../data/workload_manager_p0/test_curd_wlg.out | 24 ++++++++ .../workload_manager_p0/test_curd_wlg.groovy | 64 ++++++++++++++++++++++ 6 files changed, 145 insertions(+), 35 deletions(-) diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 673263f1a17..c82346f040e 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -295,7 +295,8 @@ Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_g } // 10 max remote scan thread num - workload_group_info->max_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num; + workload_group_info->max_remote_scan_thread_num = + vectorized::ScannerScheduler::get_remote_scan_thread_num(); if (tworkload_group_info.__isset.max_remote_scan_thread_num && tworkload_group_info.max_remote_scan_thread_num > 0) { workload_group_info->max_remote_scan_thread_num = @@ -303,7 +304,8 @@ Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_g } // 11 min remote scan thread num - workload_group_info->min_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num; + workload_group_info->min_remote_scan_thread_num = + vectorized::ScannerScheduler::get_remote_scan_thread_num(); if (tworkload_group_info.__isset.min_remote_scan_thread_num && tworkload_group_info.min_remote_scan_thread_num > 0) { workload_group_info->min_remote_scan_thread_num = @@ -384,23 +386,18 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e } } if (scan_thread_num > 0 && _scan_task_sched) { - _scan_task_sched->reset_thread_num(scan_thread_num); + _scan_task_sched->reset_thread_num(scan_thread_num, scan_thread_num); } if (_remote_scan_task_sched == nullptr) { - int remote_max_thread_num = - config::doris_max_remote_scanner_thread_pool_thread_num != -1 - ? config::doris_max_remote_scanner_thread_pool_thread_num - : std::max(512, CpuInfo::num_cores() * 10); - remote_max_thread_num = - std::max(remote_max_thread_num, config::doris_scanner_thread_pool_thread_num); - + int remote_max_thread_num = vectorized::ScannerScheduler::get_remote_scan_thread_num(); + int remote_scan_thread_queue_size = + vectorized::ScannerScheduler::get_remote_scan_thread_queue_size(); std::unique_ptr<vectorized::SimplifiedScanScheduler> remote_scan_scheduler = std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + tg_name, cg_cpu_ctl_ptr); - Status ret = - remote_scan_scheduler->start(remote_max_thread_num, remote_max_thread_num, - config::doris_remote_scanner_thread_pool_queue_size); + Status ret = remote_scan_scheduler->start(remote_max_thread_num, remote_max_thread_num, + remote_scan_thread_queue_size); if (ret.ok()) { _remote_scan_task_sched = std::move(remote_scan_scheduler); } else { @@ -408,11 +405,9 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e << tg_id; } } - if (max_remote_scan_thread_num > 0 && _remote_scan_task_sched) { - _remote_scan_task_sched->reset_max_thread_num(max_remote_scan_thread_num); - } - if (min_remote_scan_thread_num > 0 && _remote_scan_task_sched) { - _remote_scan_task_sched->reset_min_thread_num(min_remote_scan_thread_num); + if (max_remote_scan_thread_num >= min_remote_scan_thread_num && _remote_scan_task_sched) { + _remote_scan_task_sched->reset_thread_num(max_remote_scan_thread_num, + min_remote_scan_thread_num); } if (_non_pipe_thread_pool == nullptr) { diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 571df35e55e..eba62dcf19a 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -97,14 +97,10 @@ Status ScannerScheduler::init(ExecEnv* env) { config::doris_scanner_thread_pool_queue_size, "local_scan"); // 2. remote scan thread pool - _remote_thread_pool_max_size = config::doris_max_remote_scanner_thread_pool_thread_num != -1 - ? config::doris_max_remote_scanner_thread_pool_thread_num - : std::max(512, CpuInfo::num_cores() * 10); - _remote_thread_pool_max_size = - std::max(_remote_thread_pool_max_size, config::doris_scanner_thread_pool_thread_num); + _remote_thread_pool_max_size = ScannerScheduler::get_remote_scan_thread_num(); + int remote_scan_pool_queue_size = ScannerScheduler::get_remote_scan_thread_queue_size(); _remote_scan_thread_pool = std::make_unique<PriorityThreadPool>( - _remote_thread_pool_max_size, config::doris_remote_scanner_thread_pool_queue_size, - "RemoteScanThreadPool"); + _remote_thread_pool_max_size, remote_scan_pool_queue_size, "RemoteScanThreadPool"); // 3. limited scan thread pool static_cast<void>(ThreadPoolBuilder("LimitedScanThreadPool") @@ -329,4 +325,18 @@ void ScannerScheduler::_deregister_metrics() { DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size); DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num); } + +int ScannerScheduler::get_remote_scan_thread_num() { + int remote_max_thread_num = config::doris_max_remote_scanner_thread_pool_thread_num != -1 + ? config::doris_max_remote_scanner_thread_pool_thread_num + : std::max(512, CpuInfo::num_cores() * 10); + remote_max_thread_num = + std::max(remote_max_thread_num, config::doris_scanner_thread_pool_thread_num); + return remote_max_thread_num; +} + +int ScannerScheduler::get_remote_scan_thread_queue_size() { + return config::doris_remote_scanner_thread_pool_queue_size; +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 01e08596434..b3d02860f9a 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -65,6 +65,10 @@ public: int remote_thread_pool_max_size() const { return _remote_thread_pool_max_size; } + static int get_remote_scan_thread_num(); + + static int get_remote_scan_thread_queue_size(); + private: static void _scanner_scan(std::shared_ptr<ScannerContext> ctx, std::shared_ptr<ScanTask> scan_task); @@ -136,16 +140,18 @@ public: } } - void reset_thread_num(int thread_num) { - int max_thread_num = _scan_thread_pool->max_threads(); - if (max_thread_num != thread_num) { - if (thread_num > max_thread_num) { - static_cast<void>(_scan_thread_pool->set_max_threads(thread_num)); - static_cast<void>(_scan_thread_pool->set_min_threads(thread_num)); - } else { - static_cast<void>(_scan_thread_pool->set_min_threads(thread_num)); - static_cast<void>(_scan_thread_pool->set_max_threads(thread_num)); - } + void reset_thread_num(int new_max_thread_num, int new_min_thread_num) { + int cur_max_thread_num = _scan_thread_pool->max_threads(); + int cur_min_thread_num = _scan_thread_pool->min_threads(); + if (cur_max_thread_num == new_max_thread_num && cur_min_thread_num == new_min_thread_num) { + return; + } + if (new_max_thread_num >= cur_max_thread_num) { + static_cast<void>(_scan_thread_pool->set_max_threads(new_max_thread_num)); + static_cast<void>(_scan_thread_pool->set_min_threads(new_min_thread_num)); + } else { + static_cast<void>(_scan_thread_pool->set_min_threads(new_min_thread_num)); + static_cast<void>(_scan_thread_pool->set_max_threads(new_max_thread_num)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index 482d2f6f11a..e5ec2c619b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -272,6 +272,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { } } + int maxRemoteScanNum = -1; if (properties.containsKey(MAX_REMOTE_SCAN_THREAD_NUM)) { String value = properties.get(MAX_REMOTE_SCAN_THREAD_NUM); try { @@ -279,12 +280,14 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { if (intValue <= 0 && intValue != -1) { throw new NumberFormatException(); } + maxRemoteScanNum = intValue; } catch (NumberFormatException e) { throw new DdlException( MAX_REMOTE_SCAN_THREAD_NUM + " must be a positive integer or -1. but input value is " + value); } } + int minRemoteScanNum = -1; if (properties.containsKey(MIN_REMOTE_SCAN_THREAD_NUM)) { String value = properties.get(MIN_REMOTE_SCAN_THREAD_NUM); try { @@ -292,12 +295,20 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { if (intValue <= 0 && intValue != -1) { throw new NumberFormatException(); } + minRemoteScanNum = intValue; } catch (NumberFormatException e) { throw new DdlException( MIN_REMOTE_SCAN_THREAD_NUM + " must be a positive integer or -1. but input value is " + value); } } + if ((maxRemoteScanNum == -1 && minRemoteScanNum != -1) || (maxRemoteScanNum != -1 && minRemoteScanNum == -1)) { + throw new DdlException(MAX_REMOTE_SCAN_THREAD_NUM + " and " + MIN_REMOTE_SCAN_THREAD_NUM + + " must be specified simultaneously"); + } else if (maxRemoteScanNum < minRemoteScanNum) { + throw new DdlException(MAX_REMOTE_SCAN_THREAD_NUM + " must bigger or equal " + MIN_REMOTE_SCAN_THREAD_NUM); + } + // check queue property if (properties.containsKey(MAX_CONCURRENCY)) { try { diff --git a/regression-test/data/workload_manager_p0/test_curd_wlg.out b/regression-test/data/workload_manager_p0/test_curd_wlg.out index fca16d077e4..876be32601a 100644 --- a/regression-test/data/workload_manager_p0/test_curd_wlg.out +++ b/regression-test/data/workload_manager_p0/test_curd_wlg.out @@ -64,3 +64,27 @@ tag1_wg1 0% 10% tag1 tag1_wg2 0% 10% tag1 tag1_wg3 0% 80% tag1 +-- !select_remote_scan_num -- +20 10 + +-- !select_remote_scan_num_2 -- +21 10 + +-- !select_remote_scan_num_3 -- +21 2 + +-- !select_remote_scan_num_4 -- +40 20 + +-- !select_remote_scan_num_5 -- +10 5 + +-- !select_remote_scan_num_6 -- +3 3 + +-- !select_remote_scan_num_7 -- +10 5 + +-- !select_remote_scan_num_8 -- +-1 -1 + diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy index 8acfc8cb4ac..00514f9c54c 100644 --- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy +++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy @@ -528,6 +528,70 @@ suite("test_crud_wlg") { sql "set bypass_workload_group = true;" sql "select count(1) from information_schema.active_queries;" + // test set remote scan pool + sql "drop workload group if exists test_remote_scan_wg;" + test { + sql "create workload group test_remote_scan_wg properties('min_remote_scan_thread_num'='123');" + exception "must be specified simultaneously" + } + + test { + sql "create workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='123');" + exception "must be specified simultaneously" + } + + test { + sql "create workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='10', 'min_remote_scan_thread_num'='123');" + exception "must bigger or equal " + } + + sql "create workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='20', 'min_remote_scan_thread_num'='10');" + qt_select_remote_scan_num "select MAX_REMOTE_SCAN_THREAD_NUM,MIN_REMOTE_SCAN_THREAD_NUM from information_schema.workload_groups where name='test_remote_scan_wg';" + + sql "alter workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='21')" + qt_select_remote_scan_num_2 "select MAX_REMOTE_SCAN_THREAD_NUM,MIN_REMOTE_SCAN_THREAD_NUM from information_schema.workload_groups where name='test_remote_scan_wg';" + + test { + sql "alter workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='5')" + exception "must bigger or equal" + } + + sql "alter workload group test_remote_scan_wg properties('min_remote_scan_thread_num'='2')" + qt_select_remote_scan_num_3 "select MAX_REMOTE_SCAN_THREAD_NUM,MIN_REMOTE_SCAN_THREAD_NUM from information_schema.workload_groups where name='test_remote_scan_wg';" + + test { + sql "alter workload group test_remote_scan_wg properties('min_remote_scan_thread_num'='30')" + exception "must bigger or equal" + } + + sql "alter workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='40', 'min_remote_scan_thread_num'='20')" + qt_select_remote_scan_num_4 "select MAX_REMOTE_SCAN_THREAD_NUM,MIN_REMOTE_SCAN_THREAD_NUM from information_schema.workload_groups where name='test_remote_scan_wg';" + + sql "alter workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='10', 'min_remote_scan_thread_num'='5')" + qt_select_remote_scan_num_5 "select MAX_REMOTE_SCAN_THREAD_NUM,MIN_REMOTE_SCAN_THREAD_NUM from information_schema.workload_groups where name='test_remote_scan_wg';" + + sql "alter workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='3', 'min_remote_scan_thread_num'='3')" + qt_select_remote_scan_num_6 "select MAX_REMOTE_SCAN_THREAD_NUM,MIN_REMOTE_SCAN_THREAD_NUM from information_schema.workload_groups where name='test_remote_scan_wg';" + + sql "drop workload group test_remote_scan_wg;" + sql "create workload group test_remote_scan_wg properties('cpu_share'='1024');" + test { + sql "alter workload group test_remote_scan_wg properties('min_remote_scan_thread_num'='30')" + exception "must be specified simultaneously" + } + + test { + sql "alter workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='30')" + exception "must be specified simultaneously" + } + + sql "alter workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='10', 'min_remote_scan_thread_num'='5')" + qt_select_remote_scan_num_7 "select MAX_REMOTE_SCAN_THREAD_NUM,MIN_REMOTE_SCAN_THREAD_NUM from information_schema.workload_groups where name='test_remote_scan_wg';" + + sql "alter workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='-1', 'min_remote_scan_thread_num'='-1')" + qt_select_remote_scan_num_8 "select MAX_REMOTE_SCAN_THREAD_NUM,MIN_REMOTE_SCAN_THREAD_NUM from information_schema.workload_groups where name='test_remote_scan_wg';" + sql "drop workload group test_remote_scan_wg" + sql "drop workload group tag1_wg1;" sql "drop workload group tag1_wg2;" sql "drop workload group if exists tag2_wg1;" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org