This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0349841069d [refactor](scheduler) Simplify TaskScheduler (#48118)
0349841069d is described below

commit 0349841069dae08cdd2d9bc6c109f49dcaabe8b7
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Thu Feb 20 19:32:29 2025 +0800

    [refactor](scheduler) Simplify TaskScheduler (#48118)
---
 be/src/pipeline/pipeline_fragment_context.cpp |  4 +-
 be/src/pipeline/pipeline_fragment_context.h   |  2 +-
 be/src/pipeline/task_scheduler.cpp            | 75 ++++++++++-----------------
 3 files changed, 32 insertions(+), 49 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 8797ecd49b2..00af07f7bdd 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1753,7 +1753,7 @@ void PipelineFragmentContext::_close_fragment_instance() {
             std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
 }
 
-void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) {
+bool PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
     // If all tasks of this pipeline has been closed, upstream tasks is never needed, and we just make those runnable here
     DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
     if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
@@ -1767,7 +1767,9 @@ void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) {
     ++_closed_tasks;
     if (_closed_tasks == _total_tasks) {
         _close_fragment_instance();
+        return true;
     }
+    return false;
 }
 
 Status PipelineFragmentContext::send_report(bool done) {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index bd3a350d0a2..6fa4925e302 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -100,7 +100,7 @@ public:
 
     [[nodiscard]] int get_fragment_id() const { return _fragment_id; }
 
-    void close_a_pipeline(PipelineId pipeline_id);
+    bool decrement_running_task(PipelineId pipeline_id);
 
     Status send_report(bool);
 
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 60d9efa66ad..7948a853799 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -69,31 +69,34 @@ Status TaskScheduler::schedule_task(PipelineTask* task) {
     return _task_queue.push_back(task);
 }
 
-// after _close_task, task maybe destructed.
-void _close_task(PipelineTask* task, Status exec_status) {
+// after close_task, task maybe destructed.
+bool close_task(PipelineTask* task, Status exec_status) {
+    if (exec_status.ok() && task->is_pending_finish()) {
+        // Close phase is blocked by dependency.
+        return false;
+    }
     // Has to attach memory tracker here, because the close task will also release some memory.
     // Should count the memory to the query or the query's memory will not decrease when part of
     // task finished.
     SCOPED_ATTACH_TASK(task->runtime_state());
     if (task->is_finalized()) {
-        task->set_running(false);
-        return;
+        return false;
+    }
+    if (!exec_status.ok()) {
+        task->fragment_context()->cancel(exec_status);
+        LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
+                                    print_id(task->query_context()->query_id()),
+                                    exec_status.to_string());
     }
-    // close_a_pipeline may delete fragment context and will core in some defer
-    // code, because the defer code will access fragment context it self.
+    // decrement_running_task may delete fragment context and will core in some defer
+    // code, because the defer code will access fragment context itself.
     auto lock_for_context = task->fragment_context()->shared_from_this();
-    // is_pending_finish does not check status, so has to check status in close API.
-    // For example, in async writer, the writer may failed during dealing with eos_block
-    // but it does not return error status. Has to check the error status in close API.
-    // We have already refactor all source and sink api, the close API does not need waiting
-    // for pending finish now. So that could call close directly.
     Status status = task->close(exec_status);
     if (!status.ok()) {
         task->fragment_context()->cancel(status);
     }
     task->finalize();
-    task->set_running(false);
-    task->fragment_context()->close_a_pipeline(task->pipeline_id());
+    return task->fragment_context()->decrement_running_task(task->pipeline_id());
 }
 
 void TaskScheduler::_do_work(int index) {
@@ -111,28 +114,28 @@ void TaskScheduler::_do_work(int index) {
         }
         task->log_detail_if_need();
         task->set_running(true);
+        bool fragment_is_finished = false;
+        Defer task_running_defer {[&]() {
+            // If fragment is finished, fragment context will be de-constructed with all tasks in it.
+            if (!fragment_is_finished) {
+                task->set_running(false);
+            }
+        }};
         task->set_task_queue(&_task_queue);
         auto* fragment_ctx = task->fragment_context();
         bool canceled = fragment_ctx->is_canceled();
 
-        // If the state is PENDING_FINISH, then the task is come from blocked queue, its is_pending_finish
-        // has to return false. The task is finished and need to close now.
+        // Close task if canceled
         if (canceled) {
-            // may change from pending FINISH,should called cancel
-            // also may change form BLOCK, other task called cancel
-
-            // If pipeline is canceled, it will report after pipeline closed, and will propagate
-            // errors to downstream through exchange. So, here we needn't send_report.
-            // fragment_ctx->send_report(true);
-            _close_task(task, fragment_ctx->get_query_ctx()->exec_status());
+            fragment_is_finished = close_task(task, fragment_ctx->get_query_ctx()->exec_status());
             continue;
         }
 
-        // task exec
         bool eos = false;
         auto status = Status::OK();
         task->set_core_id(index);
 
+        // Main logics of execution
         ASSIGN_STATUS_IF_CATCH_EXCEPTION(
                 //TODO: use a better enclose to abstracting these
                 if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
@@ -151,33 +154,11 @@ void TaskScheduler::_do_work(int index) {
                              start_time, end_time});
                 } else { status = task->execute(&eos); },
                 status);
-
-        if (!status.ok()) {
-            // Print detail informations below when you debugging here.
-            //
-            // LOG(WARNING)<< "task:\n"<<task->debug_string();
-
-            // exec failed,cancel all fragment instance
-            fragment_ctx->cancel(status);
-            LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
-                                        print_id(task->query_context()->query_id()),
-                                        status.to_string());
-            _close_task(task, status);
-            continue;
-        }
         fragment_ctx->trigger_report_if_necessary();
 
-        if (eos) {
-            // is pending finish will add the task to dependency's blocking queue, and then the task will be
-            // added to running queue when dependency is ready.
-            if (!task->is_pending_finish()) {
-                Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
-                _close_task(task, exec_status);
-                continue;
-            }
+        if (eos || !status.ok()) {
+            fragment_is_finished = close_task(task, status);
         }
-
-        task->set_running(false);
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to