This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 686fff6441 [schedule](pipeline) Remove wait schedule time in pipeline query engine (#23994) (#24247)
686fff6441 is described below
commit 686fff6441507b3b99eba842733c183d3325bae5
Author: HappenLee <[email protected]>
AuthorDate: Tue Sep 12 17:40:40 2023 +0800
[schedule](pipeline) Remove wait schedule time in pipeline query engine (#23994) (#24247)
Co-authored-by: yiguolei <[email protected]>
---
be/src/pipeline/pipeline_task.cpp | 2 --
be/src/pipeline/pipeline_task.h | 4 ----
be/src/pipeline/task_scheduler.cpp | 31 ++++++++++++-------------------
be/src/pipeline/task_scheduler.h | 1 -
4 files changed, 12 insertions(+), 26 deletions(-)
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index afcd876f8b..2b428ac5f1 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -68,7 +68,6 @@ void PipelineTask::_fresh_profile_counter() {
COUNTER_SET(_schedule_counts, (int64_t)_schedule_time);
COUNTER_SET(_wait_sink_timer, (int64_t)_wait_sink_watcher.elapsed_time());
COUNTER_SET(_wait_worker_timer,
(int64_t)_wait_worker_watcher.elapsed_time());
- COUNTER_SET(_wait_schedule_timer,
(int64_t)_wait_schedule_watcher.elapsed_time());
COUNTER_SET(_begin_execute_timer, _begin_execute_time);
COUNTER_SET(_eos_timer, _eos_time);
COUNTER_SET(_src_pending_finish_over_timer, _src_pending_finish_over_time);
@@ -99,7 +98,6 @@ void PipelineTask::_init_profile() {
_wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime");
_wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime");
_wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
- _wait_schedule_timer = ADD_TIMER(_task_profile, "WaitScheduleTime");
_block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT);
_block_by_source_counts = ADD_COUNTER(_task_profile,
"NumBlockedBySrcTimes", TUnit::UNIT);
_block_by_sink_counts = ADD_COUNTER(_task_profile,
"NumBlockedBySinkTimes", TUnit::UNIT);
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 27ca3cdd42..696b335f0e 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -130,8 +130,6 @@ public:
_wait_worker_watcher.start();
}
void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
- void start_schedule_watcher() { _wait_schedule_watcher.start(); }
- void stop_schedule_watcher() { _wait_schedule_watcher.stop(); }
PipelineTaskState get_state() { return _cur_state; }
void set_state(PipelineTaskState state);
@@ -310,8 +308,6 @@ private:
MonotonicStopWatch _wait_worker_watcher;
RuntimeProfile::Counter* _wait_worker_timer;
// TODO we should calculate the time between when really runnable and runnable
- MonotonicStopWatch _wait_schedule_watcher;
- RuntimeProfile::Counter* _wait_schedule_timer;
RuntimeProfile::Counter* _yield_counts;
RuntimeProfile::Counter* _core_change_times;
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 1f5d5e58b6..9af60404c9 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -84,7 +84,6 @@ void BlockedTaskScheduler::_schedule() {
_started.store(true);
std::list<PipelineTask*> local_blocked_tasks;
int empty_times = 0;
- std::vector<PipelineTask*> ready_tasks;
while (!_shutdown) {
{
@@ -104,6 +103,7 @@ void BlockedTaskScheduler::_schedule() {
}
}
+ auto origin_local_block_tasks_size = local_blocked_tasks.size();
auto iter = local_blocked_tasks.begin();
vectorized::VecDateTimeValue now =
vectorized::VecDateTimeValue::local_time();
while (iter != local_blocked_tasks.end()) {
@@ -114,15 +114,14 @@ void BlockedTaskScheduler::_schedule() {
if (task->is_pending_finish()) {
iter++;
} else {
- _make_task_run(local_blocked_tasks, iter, ready_tasks,
- PipelineTaskState::PENDING_FINISH);
+ _make_task_run(local_blocked_tasks, iter, PipelineTaskState::PENDING_FINISH);
}
} else if (task->fragment_context()->is_canceled()) {
if (task->is_pending_finish()) {
task->set_state(PipelineTaskState::PENDING_FINISH);
iter++;
} else {
- _make_task_run(local_blocked_tasks, iter, ready_tasks);
+ _make_task_run(local_blocked_tasks, iter);
}
} else if (task->query_context()->is_timeout(now)) {
LOG(WARNING) << "Timeout, query_id=" <<
print_id(task->query_context()->query_id)
@@ -135,47 +134,43 @@ void BlockedTaskScheduler::_schedule() {
task->set_state(PipelineTaskState::PENDING_FINISH);
iter++;
} else {
- _make_task_run(local_blocked_tasks, iter, ready_tasks);
+ _make_task_run(local_blocked_tasks, iter);
}
} else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
if (task->has_dependency()) {
iter++;
} else {
- _make_task_run(local_blocked_tasks, iter, ready_tasks);
+ _make_task_run(local_blocked_tasks, iter);
}
} else if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
if (task->source_can_read()) {
- _make_task_run(local_blocked_tasks, iter, ready_tasks);
+ _make_task_run(local_blocked_tasks, iter);
} else {
iter++;
}
} else if (state == PipelineTaskState::BLOCKED_FOR_RF) {
if (task->runtime_filters_are_ready_or_timeout()) {
- _make_task_run(local_blocked_tasks, iter, ready_tasks);
+ _make_task_run(local_blocked_tasks, iter);
} else {
iter++;
}
} else if (state == PipelineTaskState::BLOCKED_FOR_SINK) {
if (task->sink_can_write()) {
- _make_task_run(local_blocked_tasks, iter, ready_tasks);
+ _make_task_run(local_blocked_tasks, iter);
} else {
iter++;
}
} else {
// TODO: DCHECK the state
- _make_task_run(local_blocked_tasks, iter, ready_tasks);
+ _make_task_run(local_blocked_tasks, iter);
}
}
- if (ready_tasks.empty()) {
+ if (origin_local_block_tasks_size == 0 ||
+ local_blocked_tasks.size() == origin_local_block_tasks_size) {
empty_times += 1;
} else {
empty_times = 0;
- for (auto& task : ready_tasks) {
- task->stop_schedule_watcher();
- _task_queue->push_back(task);
- }
- ready_tasks.clear();
}
if (empty_times != 0 && (empty_times & (EMPTY_TIMES_TO_YIELD - 1)) ==
0) {
@@ -195,13 +190,11 @@ void BlockedTaskScheduler::_schedule() {
void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>&
local_tasks,
std::list<PipelineTask*>::iterator&
task_itr,
- std::vector<PipelineTask*>&
ready_tasks,
PipelineTaskState t_state) {
auto task = *task_itr;
- task->start_schedule_watcher();
task->set_state(t_state);
local_tasks.erase(task_itr++);
- ready_tasks.emplace_back(task);
+ _task_queue->push_back(task);
}
TaskScheduler::~TaskScheduler() {
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index bcdbcf1a48..ad69e10d8b 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -71,7 +71,6 @@ private:
void _schedule();
void _make_task_run(std::list<PipelineTask*>& local_tasks,
std::list<PipelineTask*>::iterator& task_itr,
- std::vector<PipelineTask*>& ready_tasks,
PipelineTaskState state = PipelineTaskState::RUNNABLE);
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]