This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git

commit aa88a718c1c7fbb20ee266bb7d9d8ae1667907a9
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Apr 25 10:11:56 2024 +0800

    Fix remote scan pool (#33976)
---
 be/src/runtime/workload_group/workload_group.cpp   | 31 +++++------
 be/src/vec/exec/scan/scanner_scheduler.cpp         | 24 +++++---
 be/src/vec/exec/scan/scanner_scheduler.h           | 26 +++++----
 .../resource/workloadgroup/WorkloadGroup.java      | 11 ++++
 .../data/workload_manager_p0/test_curd_wlg.out     | 24 ++++++++
 .../workload_manager_p0/test_curd_wlg.groovy       | 64 ++++++++++++++++++++++
 6 files changed, 145 insertions(+), 35 deletions(-)

diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 673263f1a17..c82346f040e 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -295,7 +295,8 @@ Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_g
     }
 
     // 10 max remote scan thread num
-    workload_group_info->max_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num;
+    workload_group_info->max_remote_scan_thread_num =
+            vectorized::ScannerScheduler::get_remote_scan_thread_num();
     if (tworkload_group_info.__isset.max_remote_scan_thread_num &&
         tworkload_group_info.max_remote_scan_thread_num > 0) {
         workload_group_info->max_remote_scan_thread_num =
@@ -303,7 +304,8 @@ Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_g
     }
 
     // 11 min remote scan thread num
-    workload_group_info->min_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num;
+    workload_group_info->min_remote_scan_thread_num =
+            vectorized::ScannerScheduler::get_remote_scan_thread_num();
     if (tworkload_group_info.__isset.min_remote_scan_thread_num &&
         tworkload_group_info.min_remote_scan_thread_num > 0) {
         workload_group_info->min_remote_scan_thread_num =
@@ -384,23 +386,18 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
         }
     }
     if (scan_thread_num > 0 && _scan_task_sched) {
-        _scan_task_sched->reset_thread_num(scan_thread_num);
+        _scan_task_sched->reset_thread_num(scan_thread_num, scan_thread_num);
     }
 
     if (_remote_scan_task_sched == nullptr) {
-        int remote_max_thread_num =
-                config::doris_max_remote_scanner_thread_pool_thread_num != -1
-                        ? config::doris_max_remote_scanner_thread_pool_thread_num
-                        : std::max(512, CpuInfo::num_cores() * 10);
-        remote_max_thread_num =
-                std::max(remote_max_thread_num, config::doris_scanner_thread_pool_thread_num);
-
+        int remote_max_thread_num = vectorized::ScannerScheduler::get_remote_scan_thread_num();
+        int remote_scan_thread_queue_size =
+                vectorized::ScannerScheduler::get_remote_scan_thread_queue_size();
         std::unique_ptr<vectorized::SimplifiedScanScheduler> remote_scan_scheduler =
                 std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + tg_name,
                                                                       cg_cpu_ctl_ptr);
-        Status ret =
-                remote_scan_scheduler->start(remote_max_thread_num, remote_max_thread_num,
-                                             config::doris_remote_scanner_thread_pool_queue_size);
+        Status ret = remote_scan_scheduler->start(remote_max_thread_num, remote_max_thread_num,
+                                                  remote_scan_thread_queue_size);
         if (ret.ok()) {
             _remote_scan_task_sched = std::move(remote_scan_scheduler);
         } else {
@@ -408,11 +405,9 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
                       << tg_id;
         }
     }
-    if (max_remote_scan_thread_num > 0 && _remote_scan_task_sched) {
-        _remote_scan_task_sched->reset_max_thread_num(max_remote_scan_thread_num);
-    }
-    if (min_remote_scan_thread_num > 0 && _remote_scan_task_sched) {
-        _remote_scan_task_sched->reset_min_thread_num(min_remote_scan_thread_num);
+    if (max_remote_scan_thread_num >= min_remote_scan_thread_num && _remote_scan_task_sched) {
+        _remote_scan_task_sched->reset_thread_num(max_remote_scan_thread_num,
+                                                  min_remote_scan_thread_num);
     }
 
     if (_non_pipe_thread_pool == nullptr) {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 571df35e55e..eba62dcf19a 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -97,14 +97,10 @@ Status ScannerScheduler::init(ExecEnv* env) {
             config::doris_scanner_thread_pool_queue_size, "local_scan");
 
     // 2. remote scan thread pool
-    _remote_thread_pool_max_size = config::doris_max_remote_scanner_thread_pool_thread_num != -1
-                                           ? config::doris_max_remote_scanner_thread_pool_thread_num
-                                           : std::max(512, CpuInfo::num_cores() * 10);
-    _remote_thread_pool_max_size =
-            std::max(_remote_thread_pool_max_size, config::doris_scanner_thread_pool_thread_num);
+    _remote_thread_pool_max_size = ScannerScheduler::get_remote_scan_thread_num();
+    int remote_scan_pool_queue_size = ScannerScheduler::get_remote_scan_thread_queue_size();
     _remote_scan_thread_pool = std::make_unique<PriorityThreadPool>(
-            _remote_thread_pool_max_size, config::doris_remote_scanner_thread_pool_queue_size,
-            "RemoteScanThreadPool");
+            _remote_thread_pool_max_size, remote_scan_pool_queue_size, "RemoteScanThreadPool");
 
     // 3. limited scan thread pool
     static_cast<void>(ThreadPoolBuilder("LimitedScanThreadPool")
@@ -329,4 +325,18 @@ void ScannerScheduler::_deregister_metrics() {
     DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size);
     DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num);
 }
+
+int ScannerScheduler::get_remote_scan_thread_num() {
+    int remote_max_thread_num = config::doris_max_remote_scanner_thread_pool_thread_num != -1
+                                        ? config::doris_max_remote_scanner_thread_pool_thread_num
+                                        : std::max(512, CpuInfo::num_cores() * 10);
+    remote_max_thread_num =
+            std::max(remote_max_thread_num, config::doris_scanner_thread_pool_thread_num);
+    return remote_max_thread_num;
+}
+
+int ScannerScheduler::get_remote_scan_thread_queue_size() {
+    return config::doris_remote_scanner_thread_pool_queue_size;
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 01e08596434..b3d02860f9a 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -65,6 +65,10 @@ public:
 
     int remote_thread_pool_max_size() const { return _remote_thread_pool_max_size; }
 
+    static int get_remote_scan_thread_num();
+
+    static int get_remote_scan_thread_queue_size();
+
 private:
     static void _scanner_scan(std::shared_ptr<ScannerContext> ctx,
                               std::shared_ptr<ScanTask> scan_task);
@@ -136,16 +140,18 @@ public:
         }
     }
 
-    void reset_thread_num(int thread_num) {
-        int max_thread_num = _scan_thread_pool->max_threads();
-        if (max_thread_num != thread_num) {
-            if (thread_num > max_thread_num) {
-                static_cast<void>(_scan_thread_pool->set_max_threads(thread_num));
-                static_cast<void>(_scan_thread_pool->set_min_threads(thread_num));
-            } else {
-                static_cast<void>(_scan_thread_pool->set_min_threads(thread_num));
-                static_cast<void>(_scan_thread_pool->set_max_threads(thread_num));
-            }
+    void reset_thread_num(int new_max_thread_num, int new_min_thread_num) {
+        int cur_max_thread_num = _scan_thread_pool->max_threads();
+        int cur_min_thread_num = _scan_thread_pool->min_threads();
+        if (cur_max_thread_num == new_max_thread_num && cur_min_thread_num == new_min_thread_num) {
+            return;
+        }
+        if (new_max_thread_num >= cur_max_thread_num) {
+            static_cast<void>(_scan_thread_pool->set_max_threads(new_max_thread_num));
+            static_cast<void>(_scan_thread_pool->set_min_threads(new_min_thread_num));
+        } else {
+            static_cast<void>(_scan_thread_pool->set_min_threads(new_min_thread_num));
+            static_cast<void>(_scan_thread_pool->set_max_threads(new_max_thread_num));
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 482d2f6f11a..e5ec2c619b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -272,6 +272,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
             }
         }
 
+        int maxRemoteScanNum = -1;
         if (properties.containsKey(MAX_REMOTE_SCAN_THREAD_NUM)) {
             String value = properties.get(MAX_REMOTE_SCAN_THREAD_NUM);
             try {
@@ -279,12 +280,14 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
                 if (intValue <= 0 && intValue != -1) {
                     throw new NumberFormatException();
                 }
+                maxRemoteScanNum = intValue;
             } catch (NumberFormatException e) {
                 throw new DdlException(
                         MAX_REMOTE_SCAN_THREAD_NUM + " must be a positive integer or -1. but input value is " + value);
             }
         }
 
+        int minRemoteScanNum = -1;
         if (properties.containsKey(MIN_REMOTE_SCAN_THREAD_NUM)) {
             String value = properties.get(MIN_REMOTE_SCAN_THREAD_NUM);
             try {
@@ -292,12 +295,20 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
                 if (intValue <= 0 && intValue != -1) {
                     throw new NumberFormatException();
                 }
+                minRemoteScanNum = intValue;
             } catch (NumberFormatException e) {
                 throw new DdlException(
                         MIN_REMOTE_SCAN_THREAD_NUM + " must be a positive integer or -1. but input value is " + value);
             }
         }
 
+        if ((maxRemoteScanNum == -1 && minRemoteScanNum != -1) || (maxRemoteScanNum != -1 && minRemoteScanNum == -1)) {
+            throw new DdlException(MAX_REMOTE_SCAN_THREAD_NUM + " and " + MIN_REMOTE_SCAN_THREAD_NUM
+                    + " must be specified simultaneously");
+        } else if (maxRemoteScanNum < minRemoteScanNum) {
+            throw new DdlException(MAX_REMOTE_SCAN_THREAD_NUM + " must bigger or equal " + MIN_REMOTE_SCAN_THREAD_NUM);
+        }
+
         // check queue property
         if (properties.containsKey(MAX_CONCURRENCY)) {
             try {
diff --git a/regression-test/data/workload_manager_p0/test_curd_wlg.out b/regression-test/data/workload_manager_p0/test_curd_wlg.out
index fca16d077e4..876be32601a 100644
--- a/regression-test/data/workload_manager_p0/test_curd_wlg.out
+++ b/regression-test/data/workload_manager_p0/test_curd_wlg.out
@@ -64,3 +64,27 @@ tag1_wg1     0%      10%     tag1
 tag1_wg2       0%      10%     tag1
 tag1_wg3       0%      80%     tag1
 
+-- !select_remote_scan_num --
+20     10
+
+-- !select_remote_scan_num_2 --
+21     10
+
+-- !select_remote_scan_num_3 --
+21     2
+
+-- !select_remote_scan_num_4 --
+40     20
+
+-- !select_remote_scan_num_5 --
+10     5
+
+-- !select_remote_scan_num_6 --
+3      3
+
+-- !select_remote_scan_num_7 --
+10     5
+
+-- !select_remote_scan_num_8 --
+-1     -1
+
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 8acfc8cb4ac..00514f9c54c 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -528,6 +528,70 @@ suite("test_crud_wlg") {
     sql "set bypass_workload_group = true;"
     sql "select count(1) from information_schema.active_queries;"
 
+    // test set remote scan pool
+    sql "drop workload group if exists test_remote_scan_wg;"
+    test {
+        sql "create workload group test_remote_scan_wg properties('min_remote_scan_thread_num'='123');"
+        exception "must be specified simultaneously"
+    }
+
+    test {
+        sql "create workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='123');"
+        exception "must be specified simultaneously"
+    }
+
+    test {
+        sql "create workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='10', 'min_remote_scan_thread_num'='123');"
+        exception "must bigger or equal "
+    }
+
+    sql "create workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='20', 'min_remote_scan_thread_num'='10');"
+    qt_select_remote_scan_num "select MAX_REMOTE_SCAN_THREAD_NUM,MIN_REMOTE_SCAN_THREAD_NUM from information_schema.workload_groups where name='test_remote_scan_wg';"
+
+    sql "alter workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='21')"
+    qt_select_remote_scan_num_2 "select MAX_REMOTE_SCAN_THREAD_NUM,MIN_REMOTE_SCAN_THREAD_NUM from information_schema.workload_groups where name='test_remote_scan_wg';"
+
+    test {
+        sql "alter workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='5')"
+        exception "must bigger or equal"
+    }
+
+    sql "alter workload group test_remote_scan_wg properties('min_remote_scan_thread_num'='2')"
+    qt_select_remote_scan_num_3 "select MAX_REMOTE_SCAN_THREAD_NUM,MIN_REMOTE_SCAN_THREAD_NUM from information_schema.workload_groups where name='test_remote_scan_wg';"
+
+    test {
+        sql "alter workload group test_remote_scan_wg properties('min_remote_scan_thread_num'='30')"
+        exception "must bigger or equal"
+    }
+
+    sql "alter workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='40', 'min_remote_scan_thread_num'='20')"
+    qt_select_remote_scan_num_4 "select MAX_REMOTE_SCAN_THREAD_NUM,MIN_REMOTE_SCAN_THREAD_NUM from information_schema.workload_groups where name='test_remote_scan_wg';"
+
+    sql "alter workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='10', 'min_remote_scan_thread_num'='5')"
+    qt_select_remote_scan_num_5 "select MAX_REMOTE_SCAN_THREAD_NUM,MIN_REMOTE_SCAN_THREAD_NUM from information_schema.workload_groups where name='test_remote_scan_wg';"
+
+    sql "alter workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='3', 'min_remote_scan_thread_num'='3')"
+    qt_select_remote_scan_num_6 "select MAX_REMOTE_SCAN_THREAD_NUM,MIN_REMOTE_SCAN_THREAD_NUM from information_schema.workload_groups where name='test_remote_scan_wg';"
+
+    sql "drop workload group test_remote_scan_wg;"
+    sql "create workload group test_remote_scan_wg properties('cpu_share'='1024');"
+    test {
+        sql "alter workload group test_remote_scan_wg properties('min_remote_scan_thread_num'='30')"
+        exception "must be specified simultaneously"
+    }
+
+    test {
+        sql "alter workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='30')"
+        exception "must be specified simultaneously"
+    }
+
+    sql "alter workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='10', 'min_remote_scan_thread_num'='5')"
+    qt_select_remote_scan_num_7 "select MAX_REMOTE_SCAN_THREAD_NUM,MIN_REMOTE_SCAN_THREAD_NUM from information_schema.workload_groups where name='test_remote_scan_wg';"
+
+    sql "alter workload group test_remote_scan_wg properties('max_remote_scan_thread_num'='-1', 'min_remote_scan_thread_num'='-1')"
+    qt_select_remote_scan_num_8 "select MAX_REMOTE_SCAN_THREAD_NUM,MIN_REMOTE_SCAN_THREAD_NUM from information_schema.workload_groups where name='test_remote_scan_wg';"
+    sql "drop workload group test_remote_scan_wg"
+
     sql "drop workload group tag1_wg1;"
     sql "drop workload group tag1_wg2;"
     sql "drop workload group if exists tag2_wg1;"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to