This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 591aeaa98d Revert "[schedule](pipeline) Remove wait schedule time in pipeline query engine (#23994)" (#24472) 591aeaa98d is described below commit 591aeaa98d1178e2e277278c7afeafef9bdb88d6 Author: shuke <37901441+shuke...@users.noreply.github.com> AuthorDate: Mon Sep 18 09:57:38 2023 +0800 Revert "[schedule](pipeline) Remove wait schedule time in pipeline query engine (#23994)" (#24472) This reverts commit 32a7eef96a09799c8336c1964bfe7d676b7e4c98. --- be/src/pipeline/pipeline_task.cpp | 2 ++ be/src/pipeline/pipeline_task.h | 4 ++++ be/src/pipeline/task_scheduler.cpp | 31 +++++++++++++++++++------------ be/src/pipeline/task_scheduler.h | 1 + 4 files changed, 26 insertions(+), 12 deletions(-) diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index add92cabf3..bb27012b3d 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -86,6 +86,7 @@ void PipelineTask::_fresh_profile_counter() { COUNTER_SET(_schedule_counts, (int64_t)_schedule_time); COUNTER_SET(_wait_sink_timer, (int64_t)_wait_sink_watcher.elapsed_time()); COUNTER_SET(_wait_worker_timer, (int64_t)_wait_worker_watcher.elapsed_time()); + COUNTER_SET(_wait_schedule_timer, (int64_t)_wait_schedule_watcher.elapsed_time()); COUNTER_SET(_begin_execute_timer, _begin_execute_time); COUNTER_SET(_eos_timer, _eos_time); COUNTER_SET(_src_pending_finish_over_timer, _src_pending_finish_over_time); @@ -116,6 +117,7 @@ void PipelineTask::_init_profile() { _wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime"); _wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime"); _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime"); + _wait_schedule_timer = ADD_TIMER(_task_profile, "WaitScheduleTime"); _block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT); _block_by_source_counts = ADD_COUNTER(_task_profile, "NumBlockedBySrcTimes", TUnit::UNIT); _block_by_sink_counts = ADD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index b8b8e89215..4311c48f32 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -133,6 +133,8 @@ public: _wait_worker_watcher.start(); } void pop_out_runnable_queue() { _wait_worker_watcher.stop(); } + void start_schedule_watcher() { _wait_schedule_watcher.start(); } + void stop_schedule_watcher() { _wait_schedule_watcher.stop(); } PipelineTaskState get_state() { return _cur_state; } void set_state(PipelineTaskState state); @@ -309,6 +311,8 @@ protected: MonotonicStopWatch _wait_worker_watcher; RuntimeProfile::Counter* _wait_worker_timer; // TODO we should calculate the time between when really runnable and runnable + MonotonicStopWatch _wait_schedule_watcher; + RuntimeProfile::Counter* _wait_schedule_timer; RuntimeProfile::Counter* _yield_counts; RuntimeProfile::Counter* _core_change_times; diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index c4278c3807..e4a4ec38af 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -85,6 +85,7 @@ void BlockedTaskScheduler::_schedule() { _started.store(true); std::list<PipelineTask*> local_blocked_tasks; int empty_times = 0; + std::vector<PipelineTask*> ready_tasks; while (!_shutdown) { { @@ -104,7 +105,6 @@ void BlockedTaskScheduler::_schedule() { } } - auto origin_local_block_tasks_size = local_blocked_tasks.size(); auto iter = local_blocked_tasks.begin(); vectorized::VecDateTimeValue now = vectorized::VecDateTimeValue::local_time(); while (iter != local_blocked_tasks.end()) { @@ -116,52 +116,57 @@ void BlockedTaskScheduler::_schedule() { VLOG_DEBUG << "Task pending" << task->debug_string(); iter++; } else { - _make_task_run(local_blocked_tasks, iter, PipelineTaskState::PENDING_FINISH); + _make_task_run(local_blocked_tasks, iter, ready_tasks, + PipelineTaskState::PENDING_FINISH); } } else if (task->query_context()->is_cancelled()) { - _make_task_run(local_blocked_tasks, iter); + _make_task_run(local_blocked_tasks, iter, ready_tasks); } else if (task->query_context()->is_timeout(now)) { LOG(WARNING) << "Timeout, query_id=" << print_id(task->query_context()->query_id()) << ", instance_id=" << print_id(task->instance_id()) << ", task info: " << task->debug_string(); task->query_context()->cancel(true, "", Status::Cancelled("")); - _make_task_run(local_blocked_tasks, iter); + _make_task_run(local_blocked_tasks, iter, ready_tasks); } else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) { if (task->has_dependency()) { iter++; } else { - _make_task_run(local_blocked_tasks, iter); + _make_task_run(local_blocked_tasks, iter, ready_tasks); } } else if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) { if (task->source_can_read()) { - _make_task_run(local_blocked_tasks, iter); + _make_task_run(local_blocked_tasks, iter, ready_tasks); } else { iter++; } } else if (state == PipelineTaskState::BLOCKED_FOR_RF) { if (task->runtime_filters_are_ready_or_timeout()) { - _make_task_run(local_blocked_tasks, iter); + _make_task_run(local_blocked_tasks, iter, ready_tasks); } else { iter++; } } else if (state == PipelineTaskState::BLOCKED_FOR_SINK) { if (task->sink_can_write()) { - _make_task_run(local_blocked_tasks, iter); + _make_task_run(local_blocked_tasks, iter, ready_tasks); } else { iter++; } } else { // TODO: DCHECK the state - _make_task_run(local_blocked_tasks, iter); + _make_task_run(local_blocked_tasks, iter, ready_tasks); } } - if (origin_local_block_tasks_size == 0 || - local_blocked_tasks.size() == origin_local_block_tasks_size) { + if (ready_tasks.empty()) { empty_times += 1; } else { empty_times = 0; + for (auto& task : ready_tasks) { + task->stop_schedule_watcher(); + _task_queue->push_back(task); + } + ready_tasks.clear(); } if (empty_times != 0 && (empty_times & (EMPTY_TIMES_TO_YIELD - 1)) == 0) { @@ -181,11 +186,13 @@ void BlockedTaskScheduler::_schedule() { void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks, std::list<PipelineTask*>::iterator& task_itr, + std::vector<PipelineTask*>& ready_tasks, PipelineTaskState t_state) { auto task = *task_itr; + task->start_schedule_watcher(); task->set_state(t_state); local_tasks.erase(task_itr++); - _task_queue->push_back(task); + ready_tasks.emplace_back(task); } TaskScheduler::~TaskScheduler() { diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index 13b9e734d6..b9d3dfbac3 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -71,6 +71,7 @@ private: void _schedule(); void _make_task_run(std::list<PipelineTask*>& local_tasks, std::list<PipelineTask*>::iterator& task_itr, + std::vector<PipelineTask*>& ready_tasks, PipelineTaskState state = PipelineTaskState::RUNNABLE); }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org