This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 38493a9932805b369261c331d132ca551fd10e2b
Author: Kang <kxiao.ti...@gmail.com>
AuthorDate: Mon Sep 18 18:05:06 2023 +0800

    Revert "[schedule](pipeline) Remove wait schedule time in pipeline query engine (#23994) (#24247)"
    
    This reverts commit 91180530ffd144a48851b69c7a47c1e143229d50.
---
 be/src/pipeline/pipeline_task.cpp  |  2 ++
 be/src/pipeline/pipeline_task.h    |  4 ++++
 be/src/pipeline/task_scheduler.cpp | 31 +++++++++++++++++++------------
 be/src/pipeline/task_scheduler.h   |  1 +
 4 files changed, 26 insertions(+), 12 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 2b428ac5f14..afcd876f8bc 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -68,6 +68,7 @@ void PipelineTask::_fresh_profile_counter() {
     COUNTER_SET(_schedule_counts, (int64_t)_schedule_time);
     COUNTER_SET(_wait_sink_timer, (int64_t)_wait_sink_watcher.elapsed_time());
     COUNTER_SET(_wait_worker_timer, (int64_t)_wait_worker_watcher.elapsed_time());
+    COUNTER_SET(_wait_schedule_timer, (int64_t)_wait_schedule_watcher.elapsed_time());
     COUNTER_SET(_begin_execute_timer, _begin_execute_time);
     COUNTER_SET(_eos_timer, _eos_time);
     COUNTER_SET(_src_pending_finish_over_timer, _src_pending_finish_over_time);
@@ -98,6 +99,7 @@ void PipelineTask::_init_profile() {
     _wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime");
     _wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime");
     _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
+    _wait_schedule_timer = ADD_TIMER(_task_profile, "WaitScheduleTime");
     _block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT);
     _block_by_source_counts = ADD_COUNTER(_task_profile, "NumBlockedBySrcTimes", TUnit::UNIT);
     _block_by_sink_counts = ADD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT);
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 696b335f0e1..27ca3cdd424 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -130,6 +130,8 @@ public:
         _wait_worker_watcher.start();
     }
     void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
+    void start_schedule_watcher() { _wait_schedule_watcher.start(); }
+    void stop_schedule_watcher() { _wait_schedule_watcher.stop(); }
     PipelineTaskState get_state() { return _cur_state; }
     void set_state(PipelineTaskState state);
 
@@ -308,6 +310,8 @@ private:
     MonotonicStopWatch _wait_worker_watcher;
     RuntimeProfile::Counter* _wait_worker_timer;
     // TODO we should calculate the time between when really runnable and runnable
+    MonotonicStopWatch _wait_schedule_watcher;
+    RuntimeProfile::Counter* _wait_schedule_timer;
     RuntimeProfile::Counter* _yield_counts;
     RuntimeProfile::Counter* _core_change_times;
 
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 9af60404c9f..1f5d5e58b62 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -84,6 +84,7 @@ void BlockedTaskScheduler::_schedule() {
     _started.store(true);
     std::list<PipelineTask*> local_blocked_tasks;
     int empty_times = 0;
+    std::vector<PipelineTask*> ready_tasks;
 
     while (!_shutdown) {
         {
@@ -103,7 +104,6 @@ void BlockedTaskScheduler::_schedule() {
             }
         }
 
-        auto origin_local_block_tasks_size = local_blocked_tasks.size();
         auto iter = local_blocked_tasks.begin();
         vectorized::VecDateTimeValue now = vectorized::VecDateTimeValue::local_time();
         while (iter != local_blocked_tasks.end()) {
@@ -114,14 +114,15 @@ void BlockedTaskScheduler::_schedule() {
                 if (task->is_pending_finish()) {
                     iter++;
                 } else {
-                    _make_task_run(local_blocked_tasks, iter, PipelineTaskState::PENDING_FINISH);
+                    _make_task_run(local_blocked_tasks, iter, ready_tasks,
+                                   PipelineTaskState::PENDING_FINISH);
                 }
             } else if (task->fragment_context()->is_canceled()) {
                 if (task->is_pending_finish()) {
                     task->set_state(PipelineTaskState::PENDING_FINISH);
                     iter++;
                 } else {
-                    _make_task_run(local_blocked_tasks, iter);
+                    _make_task_run(local_blocked_tasks, iter, ready_tasks);
                 }
             } else if (task->query_context()->is_timeout(now)) {
                 LOG(WARNING) << "Timeout, query_id=" << print_id(task->query_context()->query_id)
@@ -134,43 +135,47 @@ void BlockedTaskScheduler::_schedule() {
                     task->set_state(PipelineTaskState::PENDING_FINISH);
                     iter++;
                 } else {
-                    _make_task_run(local_blocked_tasks, iter);
+                    _make_task_run(local_blocked_tasks, iter, ready_tasks);
                 }
             } else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
                 if (task->has_dependency()) {
                     iter++;
                 } else {
-                    _make_task_run(local_blocked_tasks, iter);
+                    _make_task_run(local_blocked_tasks, iter, ready_tasks);
                 }
             } else if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
                 if (task->source_can_read()) {
-                    _make_task_run(local_blocked_tasks, iter);
+                    _make_task_run(local_blocked_tasks, iter, ready_tasks);
                 } else {
                     iter++;
                 }
             } else if (state == PipelineTaskState::BLOCKED_FOR_RF) {
                 if (task->runtime_filters_are_ready_or_timeout()) {
-                    _make_task_run(local_blocked_tasks, iter);
+                    _make_task_run(local_blocked_tasks, iter, ready_tasks);
                 } else {
                     iter++;
                 }
             } else if (state == PipelineTaskState::BLOCKED_FOR_SINK) {
                 if (task->sink_can_write()) {
-                    _make_task_run(local_blocked_tasks, iter);
+                    _make_task_run(local_blocked_tasks, iter, ready_tasks);
                 } else {
                     iter++;
                 }
             } else {
                 // TODO: DCHECK the state
-                _make_task_run(local_blocked_tasks, iter);
+                _make_task_run(local_blocked_tasks, iter, ready_tasks);
             }
         }
 
-        if (origin_local_block_tasks_size == 0 ||
-            local_blocked_tasks.size() == origin_local_block_tasks_size) {
+        if (ready_tasks.empty()) {
             empty_times += 1;
         } else {
             empty_times = 0;
+            for (auto& task : ready_tasks) {
+                task->stop_schedule_watcher();
+                _task_queue->push_back(task);
+            }
+            ready_tasks.clear();
         }
 
         if (empty_times != 0 && (empty_times & (EMPTY_TIMES_TO_YIELD - 1)) == 0) {
@@ -190,11 +195,13 @@ void BlockedTaskScheduler::_schedule() {
 
 void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
                                           std::list<PipelineTask*>::iterator& task_itr,
+                                          std::vector<PipelineTask*>& ready_tasks,
                                           PipelineTaskState t_state) {
     auto task = *task_itr;
+    task->start_schedule_watcher();
     task->set_state(t_state);
     local_tasks.erase(task_itr++);
-    _task_queue->push_back(task);
+    ready_tasks.emplace_back(task);
 }
 
 TaskScheduler::~TaskScheduler() {
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index ad69e10d8b0..bcdbcf1a48b 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -71,6 +71,7 @@ private:
     void _schedule();
     void _make_task_run(std::list<PipelineTask*>& local_tasks,
                         std::list<PipelineTask*>::iterator& task_itr,
+                        std::vector<PipelineTask*>& ready_tasks,
                         PipelineTaskState state = PipelineTaskState::RUNNABLE);
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to