github-actions[bot] commented on code in PR #30746:
URL: https://github.com/apache/doris/pull/30746#discussion_r1475590585


##########
be/src/vec/exec/scan/scanner_scheduler.cpp:
##########
@@ -139,155 +117,111 @@ Status ScannerScheduler::init(ExecEnv* env) {
     return Status::OK();
 }
 
-Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx) {
+void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
+                              std::shared_ptr<RunningScanner> running_scanner) 
{
+    running_scanner->last_submit_time = GetCurrentTimeNanos();
     if (ctx->done()) {
-        return Status::EndOfFile("ScannerContext is done");
-    }
-    ctx->queue_idx = (_queue_idx++ % QUEUE_NUM);
-    if (!_pending_queues[ctx->queue_idx]->blocking_put(ctx)) {
-        return Status::InternalError("failed to submit scanner context to 
scheduler");
-    }
-    return Status::OK();
-}
-
-std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token(
-        ThreadPool::ExecutionMode mode, int max_concurrency) {
-    return _limited_scan_thread_pool->new_token(mode, max_concurrency);
-}
-
-void ScannerScheduler::_schedule_thread(int queue_id) {
-    BlockingQueue<std::shared_ptr<ScannerContext>>* queue = 
_pending_queues[queue_id];
-    while (!_is_closed) {
-        std::shared_ptr<ScannerContext> ctx;
-        bool ok = queue->blocking_get(&ctx);
-        if (!ok) {
-            // maybe closed
-            continue;
-        }
-
-        _schedule_scanners(ctx);
-        // If ctx is done, no need to schedule it again.
-        // But should notice that there may still scanners running in scanner 
pool.
+        running_scanner->eos = true;
+        running_scanner->status = Status::EndOfFile("ScannerContext is done");
+        ctx->append_block_to_queue(running_scanner);
+        return;
     }
-}
-
-void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) 
{
     auto task_lock = ctx->task_exec_ctx();
     if (task_lock == nullptr) {
-        LOG(INFO) << "could not lock task execution context, query " << 
ctx->debug_string()
-                  << " maybe finished";
-        return;
-    }
-    MonotonicStopWatch watch;
-    watch.reset();
-    watch.start();
-    ctx->incr_num_ctx_scheduling(1);
-
-    if (ctx->done()) {
+        running_scanner->eos = true;
+        running_scanner->status =
+                Status::EndOfFile("could not lock task execution context, 
query maybe finished");
+        ctx->append_block_to_queue(running_scanner);
         return;
     }
 
-    std::list<std::weak_ptr<ScannerDelegate>> this_run;
-    ctx->get_next_batch_of_scanners(&this_run);
-    if (this_run.empty()) {
-        // There will be 2 cases when this_run is empty:
-        // 1. The blocks queue reaches limit.
-        //      The consumer will continue scheduling the ctx.
-        // 2. All scanners are running.
-        //      There running scanner will schedule the ctx after they are 
finished.
-        // So here we just return to stop scheduling ctx.
-        return;
-    }
-
-    ctx->inc_num_running_scanners(this_run.size());
-
     // Submit scanners to thread pool
     // TODO(cmy): How to handle this "nice"?
     int nice = 1;
-    auto iter = this_run.begin();
     if (ctx->thread_token != nullptr) {
-        // TODO llj tg how to treat this?
-        while (iter != this_run.end()) {
-            std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
-            if (scanner_delegate == nullptr) {
-                // Has to ++, or there is a dead loop
-                iter++;
-                continue;
-            }
-            scanner_delegate->_scanner->start_wait_worker_timer();
-            auto s = ctx->thread_token->submit_func([this, scanner_ref = 
*iter, ctx]() {
-                this->_scanner_scan(this, ctx, scanner_ref);
-            });
-            if (s.ok()) {
-                iter++;
-            } else {
-                ctx->set_status_on_error(s);
-                break;
-            }
+        std::shared_ptr<ScannerDelegate> scanner_delegate = 
running_scanner->scanner.lock();
+        if (scanner_delegate == nullptr) {
+            running_scanner->eos = true;
+            ctx->append_block_to_queue(running_scanner);
+            return;
+        }
+
+        scanner_delegate->_scanner->start_wait_worker_timer();
+        auto s = ctx->thread_token->submit_func([this, scanner_ref = 
running_scanner, ctx]() {
+            this->_scanner_scan(ctx, scanner_ref);
+        });
+        if (!s.ok()) {
+            running_scanner->status = s;
+            ctx->append_block_to_queue(running_scanner);
+            return;
         }
     } else {
-        while (iter != this_run.end()) {
-            std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
-            if (scanner_delegate == nullptr) {
-                // Has to ++, or there is a dead loop
-                iter++;
-                continue;
-            }
-            scanner_delegate->_scanner->start_wait_worker_timer();
-            TabletStorageType type = 
scanner_delegate->_scanner->get_storage_type();
-            bool ret = false;
-            if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
-                if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
-                    auto work_func = [this, scanner_ref = *iter, ctx]() {
-                        this->_scanner_scan(this, ctx, scanner_ref);
-                    };
-                    SimplifiedScanTask simple_scan_task = {work_func, ctx};
-                    ret = 
scan_sche->get_scan_queue()->try_put(simple_scan_task);
-                } else {
-                    PriorityThreadPool::Task task;
-                    task.work_function = [this, scanner_ref = *iter, ctx]() {
-                        this->_scanner_scan(this, ctx, scanner_ref);
-                    };
-                    task.priority = nice;
-                    ret = _local_scan_thread_pool->offer(task);
-                }
+        std::shared_ptr<ScannerDelegate> scanner_delegate = 
running_scanner->scanner.lock();
+        if (scanner_delegate == nullptr) {
+            running_scanner->eos = true;
+            ctx->append_block_to_queue(running_scanner);
+            return;
+        }
+
+        scanner_delegate->_scanner->start_wait_worker_timer();
+        TabletStorageType type = 
scanner_delegate->_scanner->get_storage_type();
+        bool ret = false;
+        if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
+            if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
+                auto work_func = [this, scanner_ref = running_scanner, ctx]() {
+                    this->_scanner_scan(ctx, scanner_ref);
+                };
+                SimplifiedScanTask simple_scan_task = {work_func, ctx};
+                ret = scan_sche->get_scan_queue()->try_put(simple_scan_task);
             } else {
                 PriorityThreadPool::Task task;
-                task.work_function = [this, scanner_ref = *iter, ctx]() {
-                    this->_scanner_scan(this, ctx, scanner_ref);
+                task.work_function = [this, scanner_ref = running_scanner, 
ctx]() {
+                    this->_scanner_scan(ctx, scanner_ref);
                 };
                 task.priority = nice;
-                ret = _remote_scan_thread_pool->offer(task);
-            }
-            if (ret) {
-                iter++;
-            } else {
-                ctx->set_status_on_error(
-                        Status::InternalError("failed to submit scanner to 
scanner pool"));
-                break;
+                ret = _local_scan_thread_pool->offer(task);
             }
+        } else {
+            PriorityThreadPool::Task task;
+            task.work_function = [this, scanner_ref = running_scanner, ctx]() {
+                this->_scanner_scan(ctx, scanner_ref);
+            };
+            task.priority = nice;
+            ret = _remote_scan_thread_pool->offer(task);
+        }
+        if (!ret) {
+            running_scanner->status =
+                    Status::InternalError("failed to submit scanner to scanner 
pool");
+            ctx->append_block_to_queue(running_scanner);
+            return;
         }
     }
-    ctx->incr_ctx_scheduling_time(watch.elapsed_time());
 }
 
-void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
-                                     std::shared_ptr<ScannerContext> ctx,
-                                     std::weak_ptr<ScannerDelegate> 
scanner_ref) {
+std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token(
+        ThreadPool::ExecutionMode mode, int max_concurrency) {
+    return _limited_scan_thread_pool->new_token(mode, max_concurrency);
+}
+
+void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,

Review Comment:
   warning: method '_scanner_scan' can be made static [readability-convert-member-functions-to-static]
   
   be/src/vec/exec/scan/scanner_scheduler.h:69:
   ```diff
   -     void _scanner_scan(std::shared_ptr<ScannerContext> ctx,
   +     static void _scanner_scan(std::shared_ptr<ScannerContext> ctx,
   ```
   



##########
be/src/vec/exec/scan/scanner_scheduler.cpp:
##########
@@ -139,155 +117,111 @@
     return Status::OK();
 }
 
-Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx) {
+void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
+                              std::shared_ptr<RunningScanner> running_scanner) 
{
+    running_scanner->last_submit_time = GetCurrentTimeNanos();
     if (ctx->done()) {
-        return Status::EndOfFile("ScannerContext is done");
-    }
-    ctx->queue_idx = (_queue_idx++ % QUEUE_NUM);
-    if (!_pending_queues[ctx->queue_idx]->blocking_put(ctx)) {
-        return Status::InternalError("failed to submit scanner context to 
scheduler");
-    }
-    return Status::OK();
-}
-
-std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token(
-        ThreadPool::ExecutionMode mode, int max_concurrency) {
-    return _limited_scan_thread_pool->new_token(mode, max_concurrency);
-}
-
-void ScannerScheduler::_schedule_thread(int queue_id) {
-    BlockingQueue<std::shared_ptr<ScannerContext>>* queue = 
_pending_queues[queue_id];
-    while (!_is_closed) {
-        std::shared_ptr<ScannerContext> ctx;
-        bool ok = queue->blocking_get(&ctx);
-        if (!ok) {
-            // maybe closed
-            continue;
-        }
-
-        _schedule_scanners(ctx);
-        // If ctx is done, no need to schedule it again.
-        // But should notice that there may still scanners running in scanner 
pool.
+        running_scanner->eos = true;
+        running_scanner->status = Status::EndOfFile("ScannerContext is done");
+        ctx->append_block_to_queue(running_scanner);
+        return;
     }
-}
-
-void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) 
{
     auto task_lock = ctx->task_exec_ctx();
     if (task_lock == nullptr) {
-        LOG(INFO) << "could not lock task execution context, query " << 
ctx->debug_string()
-                  << " maybe finished";
-        return;
-    }
-    MonotonicStopWatch watch;
-    watch.reset();
-    watch.start();
-    ctx->incr_num_ctx_scheduling(1);
-
-    if (ctx->done()) {
+        running_scanner->eos = true;
+        running_scanner->status =
+                Status::EndOfFile("could not lock task execution context, 
query maybe finished");
+        ctx->append_block_to_queue(running_scanner);
         return;
     }
 
-    std::list<std::weak_ptr<ScannerDelegate>> this_run;
-    ctx->get_next_batch_of_scanners(&this_run);
-    if (this_run.empty()) {
-        // There will be 2 cases when this_run is empty:
-        // 1. The blocks queue reaches limit.
-        //      The consumer will continue scheduling the ctx.
-        // 2. All scanners are running.
-        //      There running scanner will schedule the ctx after they are 
finished.
-        // So here we just return to stop scheduling ctx.
-        return;
-    }
-
-    ctx->inc_num_running_scanners(this_run.size());
-
     // Submit scanners to thread pool
     // TODO(cmy): How to handle this "nice"?
     int nice = 1;
-    auto iter = this_run.begin();
     if (ctx->thread_token != nullptr) {
-        // TODO llj tg how to treat this?
-        while (iter != this_run.end()) {
-            std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
-            if (scanner_delegate == nullptr) {
-                // Has to ++, or there is a dead loop
-                iter++;
-                continue;
-            }
-            scanner_delegate->_scanner->start_wait_worker_timer();
-            auto s = ctx->thread_token->submit_func([this, scanner_ref = 
*iter, ctx]() {
-                this->_scanner_scan(this, ctx, scanner_ref);
-            });
-            if (s.ok()) {
-                iter++;
-            } else {
-                ctx->set_status_on_error(s);
-                break;
-            }
+        std::shared_ptr<ScannerDelegate> scanner_delegate = 
running_scanner->scanner.lock();
+        if (scanner_delegate == nullptr) {
+            running_scanner->eos = true;
+            ctx->append_block_to_queue(running_scanner);
+            return;
+        }
+
+        scanner_delegate->_scanner->start_wait_worker_timer();
+        auto s = ctx->thread_token->submit_func([this, scanner_ref = 
running_scanner, ctx]() {
+            this->_scanner_scan(ctx, scanner_ref);
+        });
+        if (!s.ok()) {
+            running_scanner->status = s;
+            ctx->append_block_to_queue(running_scanner);
+            return;
         }
     } else {
-        while (iter != this_run.end()) {
-            std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
-            if (scanner_delegate == nullptr) {
-                // Has to ++, or there is a dead loop
-                iter++;
-                continue;
-            }
-            scanner_delegate->_scanner->start_wait_worker_timer();
-            TabletStorageType type = 
scanner_delegate->_scanner->get_storage_type();
-            bool ret = false;
-            if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
-                if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
-                    auto work_func = [this, scanner_ref = *iter, ctx]() {
-                        this->_scanner_scan(this, ctx, scanner_ref);
-                    };
-                    SimplifiedScanTask simple_scan_task = {work_func, ctx};
-                    ret = 
scan_sche->get_scan_queue()->try_put(simple_scan_task);
-                } else {
-                    PriorityThreadPool::Task task;
-                    task.work_function = [this, scanner_ref = *iter, ctx]() {
-                        this->_scanner_scan(this, ctx, scanner_ref);
-                    };
-                    task.priority = nice;
-                    ret = _local_scan_thread_pool->offer(task);
-                }
+        std::shared_ptr<ScannerDelegate> scanner_delegate = 
running_scanner->scanner.lock();
+        if (scanner_delegate == nullptr) {
+            running_scanner->eos = true;
+            ctx->append_block_to_queue(running_scanner);
+            return;
+        }
+
+        scanner_delegate->_scanner->start_wait_worker_timer();
+        TabletStorageType type = 
scanner_delegate->_scanner->get_storage_type();
+        bool ret = false;
+        if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
+            if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
+                auto work_func = [this, scanner_ref = running_scanner, ctx]() {
+                    this->_scanner_scan(ctx, scanner_ref);
+                };
+                SimplifiedScanTask simple_scan_task = {work_func, ctx};
+                ret = scan_sche->get_scan_queue()->try_put(simple_scan_task);
             } else {
                 PriorityThreadPool::Task task;
-                task.work_function = [this, scanner_ref = *iter, ctx]() {
-                    this->_scanner_scan(this, ctx, scanner_ref);
+                task.work_function = [this, scanner_ref = running_scanner, 
ctx]() {
+                    this->_scanner_scan(ctx, scanner_ref);
                 };
                 task.priority = nice;
-                ret = _remote_scan_thread_pool->offer(task);
-            }
-            if (ret) {
-                iter++;
-            } else {
-                ctx->set_status_on_error(
-                        Status::InternalError("failed to submit scanner to 
scanner pool"));
-                break;
+                ret = _local_scan_thread_pool->offer(task);
             }
+        } else {
+            PriorityThreadPool::Task task;
+            task.work_function = [this, scanner_ref = running_scanner, ctx]() {
+                this->_scanner_scan(ctx, scanner_ref);
+            };
+            task.priority = nice;
+            ret = _remote_scan_thread_pool->offer(task);
+        }
+        if (!ret) {
+            running_scanner->status =
+                    Status::InternalError("failed to submit scanner to scanner 
pool");
+            ctx->append_block_to_queue(running_scanner);
+            return;
         }
     }
-    ctx->incr_ctx_scheduling_time(watch.elapsed_time());
 }
 
-void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
-                                     std::shared_ptr<ScannerContext> ctx,
-                                     std::weak_ptr<ScannerDelegate> 
scanner_ref) {
+std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token(
+        ThreadPool::ExecutionMode mode, int max_concurrency) {
+    return _limited_scan_thread_pool->new_token(mode, max_concurrency);
+}
+
+void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,

Review Comment:
   warning: function '_scanner_scan' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                          ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/vec/exec/scan/scanner_scheduler.cpp:205:** 116 lines including whitespace and comments (threshold 80)
   ```cpp
   void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                          ^
   ```
   
   </details>
   



##########
be/src/vec/exec/scan/scanner_context.h:
##########
@@ -80,89 +81,48 @@ class ScannerContext : public 
std::enable_shared_from_this<ScannerContext>,
     virtual ~ScannerContext() = default;
     virtual Status init();
 
-    vectorized::BlockUPtr get_free_block();
-    void return_free_block(std::unique_ptr<vectorized::Block> block);
+    vectorized::BlockUPtr get_free_block(int batch_size);
+    void return_free_block(vectorized::BlockUPtr block);
 
-    // Append blocks from scanners to the blocks queue.
-    virtual void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& 
blocks);
-    // Get next block from blocks queue. Called by ScanNode
+    // Get next block from blocks queue. Called by ScanNode/ScanOperator
     // Set eos to true if there is no more data to read.
-    // And if eos is true, the block returned must be nullptr.
-    virtual Status get_block_from_queue(RuntimeState* state, 
vectorized::BlockUPtr* block,
-                                        bool* eos, int id);
+    virtual Status get_block_from_queue(RuntimeState* state, 
vectorized::Block* block, bool* eos,
+                                        int id, bool wait = true);
 
     [[nodiscard]] Status validate_block_schema(Block* block);
 
-    // When a scanner complete a scan, this method will be called
-    // to return the scanner to the list for next scheduling.
-    void push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate> 
scanner);
+    // submit the running scanner to thread pool in `ScannerScheduler`
+    // set the next scanned block to `RunningScanner::current_block`
+    // set the error state to `RunningScanner::status`
+    // set the `eos` to `RunningScanner::eos` if there is no more data in 
current scanner
+    void submit_running_scanner(std::shared_ptr<RunningScanner> 
running_scanner);
 
-    void set_status_on_error(const Status& status, bool need_lock = true);
-
-    Status status() {
-        if (_process_status.is<ErrorCode::END_OF_FILE>()) {
-            return Status::OK();
-        }
-        return _process_status;
-    }
+    // append the running scanner and its cached block to `_blocks_queue`
+    virtual void append_block_to_queue(std::shared_ptr<RunningScanner> 
running_scanner);
 
     // Return true if this ScannerContext need no more process
     bool done() const { return _is_finished || _should_stop; }
     bool is_finished() { return _is_finished.load(); }
     bool should_stop() { return _should_stop.load(); }
 
-    void inc_num_running_scanners(int32_t scanner_inc);
-
-    int get_num_running_scanners() const { return _num_running_scanners; }
-
-    int get_num_unfinished_scanners() const { return _num_unfinished_scanners; 
}
-
-    void get_next_batch_of_scanners(std::list<std::weak_ptr<ScannerDelegate>>* 
current_run);
-
     virtual std::string debug_string();
 
     RuntimeState* state() { return _state; }
-
-    void incr_num_ctx_scheduling(int64_t num) { 
_scanner_ctx_sched_counter->update(num); }
-
-    int64_t num_ctx_scheduled() { return _scanner_ctx_sched_counter->value(); }
     void incr_ctx_scheduling_time(int64_t num) { 
_scanner_ctx_sched_time->update(num); }
 
     std::string parent_name();
 
     virtual bool empty_in_queue(int id);
 
-    // todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when 
executing shared scan
-    inline bool should_be_scheduled() const {
-        return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
-               (_serving_blocks_num < allowed_blocks_num());
-    }
-
-    int get_available_thread_slot_num() {
-        int thread_slot_num = 0;
-        thread_slot_num = (allowed_blocks_num() + _block_per_scanner - 1) / 
_block_per_scanner;
-        thread_slot_num = std::min(thread_slot_num, _max_thread_num - 
_num_running_scanners);
-        if (thread_slot_num <= 0) {
-            thread_slot_num = 1;
-        }
-        return thread_slot_num;
-    }
-
-    int32_t allowed_blocks_num() const {
-        int32_t blocks_num = std::min(_free_blocks_capacity,
-                                      int32_t((_max_bytes_in_queue + 
_estimated_block_bytes - 1) /
-                                              _estimated_block_bytes));
-        return blocks_num;
-    }
-
     SimplifiedScanScheduler* get_simple_scan_scheduler() { return 
_simple_scan_scheduler; }
 
-    virtual void reschedule_scanner_ctx();
     void stop_scanners(RuntimeState* state);
 
     int32_t get_max_thread_num() const { return _max_thread_num; }
     void set_max_thread_num(int32_t num) { _max_thread_num = num; }
 
+    int batch_size() { return _batch_size; }

Review Comment:
   warning: method 'batch_size' can be made const [readability-make-member-function-const]
   
   ```suggestion
       int batch_size() const { return _batch_size; }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to