This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 0349841069d [refactor](scheduler) Simplify TaskScheduler (#48118)
0349841069d is described below

commit 0349841069dae08cdd2d9bc6c109f49dcaabe8b7
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Thu Feb 20 19:32:29 2025 +0800

    [refactor](scheduler) Simplify TaskScheduler (#48118)
---
 be/src/pipeline/pipeline_fragment_context.cpp |  4 +-
 be/src/pipeline/pipeline_fragment_context.h   |  2 +-
 be/src/pipeline/task_scheduler.cpp            | 75 ++++++++++-----------
 3 files changed, 32 insertions(+), 49 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 8797ecd49b2..00af07f7bdd 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1753,7 +1753,7 @@ void PipelineFragmentContext::_close_fragment_instance() {
             std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
 }
 
-void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) {
+bool PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
     // If all tasks of this pipeline has been closed, upstream tasks is never needed, and we just make those runnable here
     DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
     if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
@@ -1767,7 +1767,9 @@ void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) {
     ++_closed_tasks;
     if (_closed_tasks == _total_tasks) {
         _close_fragment_instance();
+        return true;
     }
+    return false;
 }
 
 Status PipelineFragmentContext::send_report(bool done) {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index bd3a350d0a2..6fa4925e302 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -100,7 +100,7 @@ public:
 
     [[nodiscard]] int get_fragment_id() const { return _fragment_id; }
 
-    void close_a_pipeline(PipelineId pipeline_id);
+    bool decrement_running_task(PipelineId pipeline_id);
 
     Status send_report(bool);
 
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 60d9efa66ad..7948a853799 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -69,31 +69,34 @@ Status TaskScheduler::schedule_task(PipelineTask* task) {
     return _task_queue.push_back(task);
 }
 
-// after _close_task, task maybe destructed.
-void _close_task(PipelineTask* task, Status exec_status) {
+// after close_task, task maybe destructed.
+bool close_task(PipelineTask* task, Status exec_status) {
+    if (exec_status.ok() && task->is_pending_finish()) {
+        // Close phase is blocked by dependency.
+        return false;
+    }
     // Has to attach memory tracker here, because the close task will also release some memory.
     // Should count the memory to the query or the query's memory will not decrease when part of
     // task finished.
     SCOPED_ATTACH_TASK(task->runtime_state());
     if (task->is_finalized()) {
-        task->set_running(false);
-        return;
+        return false;
+    }
+    if (!exec_status.ok()) {
+        task->fragment_context()->cancel(exec_status);
+        LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
+                                    print_id(task->query_context()->query_id()),
+                                    exec_status.to_string());
     }
-    // close_a_pipeline may delete fragment context and will core in some defer
-    // code, because the defer code will access fragment context it self.
+    // decrement_running_task may delete fragment context and will core in some defer
+    // code, because the defer code will access fragment context itself.
     auto lock_for_context = task->fragment_context()->shared_from_this();
-    // is_pending_finish does not check status, so has to check status in close API.
-    // For example, in async writer, the writer may failed during dealing with eos_block
-    // but it does not return error status. Has to check the error status in close API.
-    // We have already refactor all source and sink api, the close API does not need waiting
-    // for pending finish now. So that could call close directly.
     Status status = task->close(exec_status);
     if (!status.ok()) {
         task->fragment_context()->cancel(status);
     }
     task->finalize();
-    task->set_running(false);
-    task->fragment_context()->close_a_pipeline(task->pipeline_id());
+    return task->fragment_context()->decrement_running_task(task->pipeline_id());
 }
 
 void TaskScheduler::_do_work(int index) {
@@ -111,28 +114,28 @@ void TaskScheduler::_do_work(int index) {
         }
         task->log_detail_if_need();
         task->set_running(true);
+        bool fragment_is_finished = false;
+        Defer task_running_defer {[&]() {
+            // If fragment is finished, fragment context will be de-constructed with all tasks in it.
+            if (!fragment_is_finished) {
+                task->set_running(false);
+            }
+        }};
         task->set_task_queue(&_task_queue);
         auto* fragment_ctx = task->fragment_context();
         bool canceled = fragment_ctx->is_canceled();
 
-        // If the state is PENDING_FINISH, then the task is come from blocked queue, its is_pending_finish
-        // has to return false. The task is finished and need to close now.
+        // Close task if canceled
         if (canceled) {
-            // may change from pending FINISH,should called cancel
-            // also may change form BLOCK, other task called cancel
-
-            // If pipeline is canceled, it will report after pipeline closed, and will propagate
-            // errors to downstream through exchange. So, here we needn't send_report.
-            // fragment_ctx->send_report(true);
-            _close_task(task, fragment_ctx->get_query_ctx()->exec_status());
+            fragment_is_finished = close_task(task, fragment_ctx->get_query_ctx()->exec_status());
             continue;
         }
 
-        // task exec
         bool eos = false;
         auto status = Status::OK();
 
         task->set_core_id(index);
+        // Main logics of execution
         ASSIGN_STATUS_IF_CATCH_EXCEPTION(
                 //TODO: use a better enclose to abstracting these
                 if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
@@ -151,33 +154,11 @@ void TaskScheduler::_do_work(int index) {
                                                            start_time, end_time});
                 } else { status = task->execute(&eos); },
                 status);
-
-        if (!status.ok()) {
-            // Print detail informations below when you debugging here.
-            //
-            // LOG(WARNING)<< "task:\n"<<task->debug_string();
-
-            // exec failed,cancel all fragment instance
-            fragment_ctx->cancel(status);
-            LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
-                                        print_id(task->query_context()->query_id()),
-                                        status.to_string());
-            _close_task(task, status);
-            continue;
-        }
 
         fragment_ctx->trigger_report_if_necessary();
-        if (eos) {
-            // is pending finish will add the task to dependency's blocking queue, and then the task will be
-            // added to running queue when dependency is ready.
-            if (!task->is_pending_finish()) {
-                Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
-                _close_task(task, exec_status);
-                continue;
-            }
+        if (eos || !status.ok()) {
+            fragment_is_finished = close_task(task, status);
         }
-
-        task->set_running(false);
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
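For context on the refactor above, here is a minimal, self-contained sketch of the contract it introduces: close_task() reports whether the whole fragment has finished, and the worker loop's Defer resets the task's running flag only when it has not, since a finished fragment may already have torn down its tasks. This is not code from the commit; all types below (FragmentContext, PipelineTask, Defer) are simplified stand-ins, not the real Doris classes.

// sketch.cpp - illustrative only; simplified stand-ins for the Doris classes
#include <functional>
#include <iostream>
#include <memory>

struct FragmentContext {
    int closed_tasks = 0;
    int total_tasks = 2;
    // Returns true once the last task of the fragment has been closed.
    bool decrement_running_task() { return ++closed_tasks == total_tasks; }
};

struct PipelineTask {
    std::shared_ptr<FragmentContext> fragment;
    bool running = false;
    void set_running(bool r) { running = r; }
};

// RAII helper standing in for Doris' Defer utility: runs fn on scope exit.
struct Defer {
    std::function<void()> fn;
    ~Defer() { fn(); }
};

// Mirrors the shape of the refactored close_task(): keep the fragment context
// alive while closing, then propagate whether the fragment is now fully finished.
bool close_task(PipelineTask& task) {
    auto keep_alive = task.fragment;  // analogous to shared_from_this() in the commit
    return keep_alive->decrement_running_task();
}

void run_one_iteration(PipelineTask& task) {
    task.set_running(true);
    bool fragment_is_finished = false;
    Defer task_running_defer{[&]() {
        // If the fragment finished, its tasks may already be gone; skip the reset.
        if (!fragment_is_finished) {
            task.set_running(false);
        }
    }};
    // ... task execution would happen here ...
    fragment_is_finished = close_task(task);
}

int main() {
    auto ctx = std::make_shared<FragmentContext>();
    PipelineTask t1{ctx};
    PipelineTask t2{ctx};
    run_one_iteration(t1);  // fragment not finished yet -> Defer resets the running flag
    run_one_iteration(t2);  // last task closes -> fragment reported finished
    std::cout << "closed tasks: " << ctx->closed_tasks << "\n";
}

The shared_ptr copy inside close_task() plays the role of the shared_from_this() guard in the commit: it keeps the fragment context alive for the duration of the close even when the last task of the fragment is being torn down.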