github-actions[bot] commented on code in PR #30746: URL: https://github.com/apache/doris/pull/30746#discussion_r1477883433
########## be/src/vec/exec/scan/scanner_scheduler.cpp: ########## @@ -139,155 +117,107 @@ Status ScannerScheduler::init(ExecEnv* env) { return Status::OK(); } -Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx) { +void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx, + std::shared_ptr<ScanTask> scan_task) { + scan_task->last_submit_time = GetCurrentTimeNanos(); if (ctx->done()) { - return Status::EndOfFile("ScannerContext is done"); - } - ctx->queue_idx = (_queue_idx++ % QUEUE_NUM); - if (!_pending_queues[ctx->queue_idx]->blocking_put(ctx)) { - return Status::InternalError("failed to submit scanner context to scheduler"); - } - return Status::OK(); -} - -std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token( - ThreadPool::ExecutionMode mode, int max_concurrency) { - return _limited_scan_thread_pool->new_token(mode, max_concurrency); -} - -void ScannerScheduler::_schedule_thread(int queue_id) { - BlockingQueue<std::shared_ptr<ScannerContext>>* queue = _pending_queues[queue_id]; - while (!_is_closed) { - std::shared_ptr<ScannerContext> ctx; - bool ok = queue->blocking_get(&ctx); - if (!ok) { - // maybe closed - continue; - } - - _schedule_scanners(ctx); - // If ctx is done, no need to schedule it again. - // But should notice that there may still scanners running in scanner pool. 
+ scan_task->set_status(Status::EndOfFile("ScannerContext is done")); + ctx->append_block_to_queue(scan_task); + return; } -} - -void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) { auto task_lock = ctx->task_exec_ctx(); if (task_lock == nullptr) { LOG(INFO) << "could not lock task execution context, query " << ctx->debug_string() << " maybe finished"; - return; - } - MonotonicStopWatch watch; - watch.reset(); - watch.start(); - ctx->incr_num_ctx_scheduling(1); - - if (ctx->done()) { + ctx->append_block_to_queue(scan_task); return; } - std::list<std::weak_ptr<ScannerDelegate>> this_run; - ctx->get_next_batch_of_scanners(&this_run); - if (this_run.empty()) { - // There will be 2 cases when this_run is empty: - // 1. The blocks queue reaches limit. - // The consumer will continue scheduling the ctx. - // 2. All scanners are running. - // There running scanner will schedule the ctx after they are finished. - // So here we just return to stop scheduling ctx. - return; - } - - ctx->inc_num_running_scanners(this_run.size()); - // Submit scanners to thread pool // TODO(cmy): How to handle this "nice"? int nice = 1; - auto iter = this_run.begin(); if (ctx->thread_token != nullptr) { - // TODO llj tg how to treat this? 
- while (iter != this_run.end()) { - std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock(); - if (scanner_delegate == nullptr) { - // Has to ++, or there is a dead loop - iter++; - continue; - } - scanner_delegate->_scanner->start_wait_worker_timer(); - auto s = ctx->thread_token->submit_func([this, scanner_ref = *iter, ctx]() { - this->_scanner_scan(this, ctx, scanner_ref); - }); - if (s.ok()) { - iter++; - } else { - ctx->set_status_on_error(s); - break; - } + std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock(); + if (scanner_delegate == nullptr) { + scan_task->set_eos(true); + ctx->append_block_to_queue(scan_task); + return; + } + + scanner_delegate->_scanner->start_wait_worker_timer(); + auto s = ctx->thread_token->submit_func( + [this, scanner_ref = scan_task, ctx]() { this->_scanner_scan(ctx, scanner_ref); }); + if (!s.ok()) { + scan_task->set_status(s); + ctx->append_block_to_queue(scan_task); + return; } } else { - while (iter != this_run.end()) { - std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock(); - if (scanner_delegate == nullptr) { - // Has to ++, or there is a dead loop - iter++; - continue; - } - scanner_delegate->_scanner->start_wait_worker_timer(); - TabletStorageType type = scanner_delegate->_scanner->get_storage_type(); - bool ret = false; - if (type == TabletStorageType::STORAGE_TYPE_LOCAL) { - if (auto* scan_sche = ctx->get_simple_scan_scheduler()) { - auto work_func = [this, scanner_ref = *iter, ctx]() { - this->_scanner_scan(this, ctx, scanner_ref); - }; - SimplifiedScanTask simple_scan_task = {work_func, ctx}; - ret = scan_sche->get_scan_queue()->try_put(simple_scan_task); - } else { - PriorityThreadPool::Task task; - task.work_function = [this, scanner_ref = *iter, ctx]() { - this->_scanner_scan(this, ctx, scanner_ref); - }; - task.priority = nice; - ret = _local_scan_thread_pool->offer(task); - } + std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock(); + if 
(scanner_delegate == nullptr) { + scan_task->set_eos(true); + ctx->append_block_to_queue(scan_task); + return; + } + + scanner_delegate->_scanner->start_wait_worker_timer(); + TabletStorageType type = scanner_delegate->_scanner->get_storage_type(); + bool ret = false; + if (type == TabletStorageType::STORAGE_TYPE_LOCAL) { + if (auto* scan_sche = ctx->get_simple_scan_scheduler()) { + auto work_func = [this, scanner_ref = scan_task, ctx]() { + this->_scanner_scan(ctx, scanner_ref); + }; + SimplifiedScanTask simple_scan_task = {work_func, ctx}; + ret = scan_sche->get_scan_queue()->try_put(simple_scan_task); } else { PriorityThreadPool::Task task; - task.work_function = [this, scanner_ref = *iter, ctx]() { - this->_scanner_scan(this, ctx, scanner_ref); + task.work_function = [this, scanner_ref = scan_task, ctx]() { + this->_scanner_scan(ctx, scanner_ref); }; task.priority = nice; - ret = _remote_scan_thread_pool->offer(task); - } - if (ret) { - iter++; - } else { - ctx->set_status_on_error( - Status::InternalError("failed to submit scanner to scanner pool")); - break; + ret = _local_scan_thread_pool->offer(task); } + } else { + PriorityThreadPool::Task task; + task.work_function = [this, scanner_ref = scan_task, ctx]() { + this->_scanner_scan(ctx, scanner_ref); + }; + task.priority = nice; + ret = _remote_scan_thread_pool->offer(task); + } + if (!ret) { + scan_task->set_status( + Status::InternalError("failed to submit scanner to scanner pool")); + ctx->append_block_to_queue(scan_task); + return; } } - ctx->incr_ctx_scheduling_time(watch.elapsed_time()); } -void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, - std::shared_ptr<ScannerContext> ctx, - std::weak_ptr<ScannerDelegate> scanner_ref) { +std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token( + ThreadPool::ExecutionMode mode, int max_concurrency) { + return _limited_scan_thread_pool->new_token(mode, max_concurrency); +} + +void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,

Review Comment:
warning: function '_scanner_scan' exceeds recommended size/complexity thresholds [readability-function-size]
```cpp
void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                       ^
```
<details>
<summary>Additional context</summary>

**be/src/vec/exec/scan/scanner_scheduler.cpp:202:** 107 lines including whitespace and comments (threshold 80)
```cpp
void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                       ^
```

</details>

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org