yiguolei commented on code in PR #31106:
URL: https://github.com/apache/doris/pull/31106#discussion_r1494332195


##########
be/src/vec/exec/scan/scanner_scheduler.h:
##########
@@ -128,27 +124,32 @@ class SimplifiedScanScheduler {
                                 
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
                                 .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
                                 .build(&_scan_thread_pool));
-
-        for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++) 
{
-            RETURN_IF_ERROR(_scan_thread_pool->submit_func([this] { 
this->_work(); }));
-        }
         return Status::OK();
     }
 
-    BlockingQueue<SimplifiedScanTask>* get_scan_queue() { return 
_scan_task_queue.get(); }
+    Status submit_scan_task(SimplifiedScanTask scan_task) {
+        if (!_is_stop) {
+            return _scan_thread_pool->submit_func([scan_task] { 
scan_task.scan_func(); });
+        } else {
+            return Status::InternalError<false>("scanner pool {} is 
shutdown.", _wg_name);
+        }
+    }
 
-private:
-    void _work() {
-        while (!_is_stop.load()) {
-            SimplifiedScanTask scan_task;
-            if (_scan_task_queue->blocking_get(&scan_task)) {
-                scan_task.scan_func();
-            };
+    void reset_thread_num(int thread_num) {

Review Comment:
   Also, submitting the task will fail; see `ThreadPool::do_submit`:
   
   Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* 
token) {
       DCHECK(token);
       std::chrono::time_point<std::chrono::system_clock> submit_time =
               std::chrono::system_clock::now();
   
       std::unique_lock<std::mutex> l(_lock);
       if (PREDICT_FALSE(!_pool_status.ok())) {
           return _pool_status;
       }
   
       if (PREDICT_FALSE(!token->may_submit_new_tasks())) {
           return Status::Error<SERVICE_UNAVAILABLE>("Thread pool({}) token was 
shut down", _name);
       }
   
       // Size limit check.
       int64_t capacity_remaining = static_cast<int64_t>(_max_threads) - 
_active_threads +
                                    static_cast<int64_t>(_max_queue_size) - 
_total_queued_tasks;
       if (capacity_remaining < 1) {
           return Status::Error<SERVICE_UNAVAILABLE>(
                   "Thread pool {} is at capacity ({}/{} tasks running, {}/{} 
tasks queued)", _name,
                   _num_threads + _num_threads_pending_start, _max_threads, 
_total_queued_tasks,
                   _max_queue_size);
       }



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to