This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new fdc063101f9 [pipeline](refactor) remove pipeline task state (#34527) fdc063101f9 is described below commit fdc063101f92de82586b4b1e8cbda6a4c4f0b5cb Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Fri May 10 09:30:39 2024 +0800 [pipeline](refactor) remove pipeline task state (#34527) --- be/src/pipeline/dependency.cpp | 9 ------ be/src/pipeline/dependency.h | 18 ----------- be/src/pipeline/pipeline_task.cpp | 43 +++++++++---------------- be/src/pipeline/pipeline_task.h | 64 +++----------------------------------- be/src/pipeline/pipeline_tracing.h | 5 ++- be/src/pipeline/task_scheduler.cpp | 53 +++++++------------------------ 6 files changed, 33 insertions(+), 159 deletions(-) diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index de8d3e76c6f..2508040ea3f 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -80,15 +80,6 @@ Dependency* Dependency::is_blocked_by(PipelineTask* task) { return ready ? nullptr : this; } -Dependency* FinishDependency::is_blocked_by(PipelineTask* task) { - std::unique_lock<std::mutex> lc(_task_lock); - auto ready = _ready.load(); - if (!ready && task) { - _add_block_task(task); - } - return ready ? nullptr : this; -} - std::string Dependency::debug_string(int indentation_level) { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 5a67881c23d..891cc52a712 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -189,24 +189,6 @@ struct FakeSharedState final : public BasicSharedState { ENABLE_FACTORY_CREATOR(FakeSharedState) }; -struct FakeDependency final : public Dependency { -public: - using SharedState = FakeSharedState; - FakeDependency(int id, int node_id, QueryContext* query_ctx) - : Dependency(id, node_id, "FakeDependency", query_ctx) {} - - [[nodiscard]] Dependency* is_blocked_by(PipelineTask* task) override { return nullptr; } -}; - -struct FinishDependency : public Dependency { -public: - using SharedState = FakeSharedState; - FinishDependency(int id, int node_id, std::string name, QueryContext* query_ctx) - : Dependency(id, node_id, name, true, query_ctx) {} - - [[nodiscard]] Dependency* is_blocked_by(PipelineTask* task) override; -}; - struct CountedFinishDependency final : public Dependency { public: using SharedState = FakeSharedState; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index b0925ad875c..0ea82e305fd 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -57,7 +57,6 @@ PipelineTask::PipelineTask( _prepared(false), _opened(false), _state(state), - _cur_state(PipelineTaskState::NOT_READY), _fragment_context(fragment_context), _parent_profile(parent_profile), _operators(pipeline->operator_xs()), @@ -79,7 +78,6 @@ PipelineTask::PipelineTask( Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink, QueryContext* query_ctx) { DCHECK(_sink); - DCHECK(_cur_state == PipelineTaskState::NOT_READY) << get_state_name(_cur_state); _init_profile(); SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_CPU_TIMER(_task_cpu_timer); @@ -116,8 +114,6 @@ Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const std::copy(deps.begin(), deps.end(), std::inserter(_filter_dependencies, _filter_dependencies.end())); } - // We should make sure initial state for task are runnable so that we can do some preparation jobs (e.g. initialize runtime filters). - set_state(PipelineTaskState::RUNNABLE); _prepared = true; return Status::OK(); } @@ -205,6 +201,11 @@ Status PipelineTask::execute(bool* eos) { SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_TIMER(_exec_timer); SCOPED_ATTACH_TASK(_state); + *eos = _eos; + if (_eos) { + // If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here. + return Status::OK(); + } int64_t time_spent = 0; ThreadCpuStopWatch cpu_time_stop_watch; @@ -220,9 +221,7 @@ Status PipelineTask::execute(bool* eos) { cpu_qs->add_cpu_nanos(delta_cpu_time); } }}; - *eos = false; if (has_dependency() || _runtime_filter_blocked_dependency() != nullptr) { - set_state(PipelineTaskState::BLOCKED); return Status::OK(); } // The status must be runnable @@ -232,16 +231,13 @@ Status PipelineTask::execute(bool* eos) { RETURN_IF_ERROR(_open()); } if (!source_can_read() || !sink_can_write()) { - set_state(PipelineTaskState::BLOCKED); return Status::OK(); } } - Status status = Status::OK(); while (!_fragment_context->is_canceled()) { if ((_root->need_data_from_children(_state) && !source_can_read()) || !sink_can_write()) { - set_state(PipelineTaskState::BLOCKED); - break; + return Status::OK(); } /// When a task is cancelled, @@ -265,6 +261,7 @@ Status PipelineTask::execute(bool* eos) { continue; } + *eos = _eos; // Pull block from operator chain if (!_dry_run) { SCOPED_TIMER(_get_block_timer); @@ -277,22 +274,26 @@ Status PipelineTask::execute(bool* eos) { } } else { *eos = true; + _eos = true; } if (_block->rows() != 0 || *eos) { SCOPED_TIMER(_sink_timer); + Status status = Status::OK(); status = _sink->sink(_state, block, *eos); if (!status.is<ErrorCode::END_OF_FILE>()) { RETURN_IF_ERROR(status); } *eos = status.is<ErrorCode::END_OF_FILE>() ? true : *eos; if (*eos) { // just return, the scheduler will do finish work - break; + _eos = true; + return Status::OK(); } } } - return status; + static_cast<void>(get_task_queue()->push_back(this)); + return Status::OK(); } bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes) { @@ -349,20 +350,6 @@ void PipelineTask::finalize() { _le_state_map.clear(); } -// The FSM see PipelineTaskState's comment -void PipelineTask::set_state(PipelineTaskState state) { - DCHECK(_cur_state != PipelineTaskState::FINISHED); - - if (_cur_state == state) { - return; - } - if (_cur_state == PipelineTaskState::RUNNABLE && state != PipelineTaskState::RUNNABLE) { - COUNTER_UPDATE(_block_counts, 1); - } - - _cur_state = state; -} - Status PipelineTask::close(Status exec_status) { int64_t close_ns = 0; Defer defer {[&]() { @@ -404,9 +391,9 @@ std::string PipelineTask::debug_string() { auto elapsed = (MonotonicNanos() - _fragment_context->create_time()) / 1000000000.0; auto* cur_blocked_dep = _blocked_dep; fmt::format_to(debug_string_buffer, - "PipelineTask[this = {}, state = {}, dry run = {}, elapse time " + "PipelineTask[this = {}, dry run = {}, elapse time " "= {}s], block dependency = {}, is running = {}\noperators: ", - (void*)this, get_state_name(_cur_state), _dry_run, elapsed, + (void*)this, _dry_run, elapsed, cur_blocked_dep && !_finished ? cur_blocked_dep->debug_string() : "NULL", is_running()); for (size_t i = 0; i < _operators.size(); i++) { diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index ddf6b190eb7..4ca3abbc4c5 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -42,60 +42,6 @@ class PipelineFragmentContext; namespace doris::pipeline { -/** - * PipelineTaskState indicates all possible states of a pipeline task. - * A FSM is described as below: - * - * |-----------------------------------------------------| - * |---| transfer 2 transfer 3 | transfer 4 - * |-------> BLOCKED ------------| |---------------------------------------> CANCELED - * |------| | | transfer 5 transfer 6| - * NOT_READY ---| transfer 0 |-----> RUNNABLE ---|---------> PENDING_FINISH ------| - * | | ^ | transfer 7| - * |------------------------------------| |--------|---------------------------------------> FINISHED - * transfer 1 transfer 9 transfer 8 - * BLOCKED include BLOCKED_FOR_DEPENDENCY, BLOCKED_FOR_SOURCE and BLOCKED_FOR_SINK. - * - * transfer 0 (NOT_READY -> BLOCKED): this pipeline task has some incomplete dependencies - * transfer 1 (NOT_READY -> RUNNABLE): this pipeline task has no incomplete dependencies - * transfer 2 (BLOCKED -> RUNNABLE): runnable condition for this pipeline task is met (e.g. get a new block from rpc) - * transfer 3 (RUNNABLE -> BLOCKED): runnable condition for this pipeline task is not met (e.g. sink operator send a block by RPC and wait for a response) - * transfer 4 (RUNNABLE -> CANCELED): current fragment is cancelled - * transfer 5 (RUNNABLE -> PENDING_FINISH): this pipeline task completed but wait for releasing resources hold by itself - * transfer 6 (PENDING_FINISH -> CANCELED): current fragment is cancelled - * transfer 7 (PENDING_FINISH -> FINISHED): this pipeline task completed and resources hold by itself have been released already - * transfer 8 (RUNNABLE -> FINISHED): this pipeline task completed and no resource need to be released - * transfer 9 (RUNNABLE -> RUNNABLE): this pipeline task yields CPU and re-enters the runnable queue if it is runnable and has occupied CPU for a max time slice - */ -enum class PipelineTaskState : uint8_t { - NOT_READY = 0, // do not prepare - BLOCKED = 1, // blocked by dependency - RUNNABLE = 2, // can execute - PENDING_FINISH = - 3, // compute task is over, but still hold resource. like some scan and sink task - FINISHED = 4, // finish with a regular state - CANCELED = 5, // being cancelled - -}; - -inline const char* get_state_name(PipelineTaskState idx) { - switch (idx) { - case PipelineTaskState::NOT_READY: - return "NOT_READY"; - case PipelineTaskState::BLOCKED: - return "BLOCKED"; - case PipelineTaskState::RUNNABLE: - return "RUNNABLE"; - case PipelineTaskState::PENDING_FINISH: - return "PENDING_FINISH"; - case PipelineTaskState::FINISHED: - return "FINISHED"; - case PipelineTaskState::CANCELED: - return "CANCELED"; - } - __builtin_unreachable(); -} - class TaskQueue; class PriorityTaskQueue; class Dependency; @@ -195,7 +141,7 @@ public: int task_id() const { return _index; }; void clear_blocking_state() { - if (!_finished && get_state() != PipelineTaskState::PENDING_FINISH && _blocked_dep) { + if (!_finished && _blocked_dep) { _blocked_dep->set_ready(); _blocked_dep = nullptr; } @@ -237,8 +183,6 @@ public: } void pop_out_runnable_queue() { _wait_worker_watcher.stop(); } - PipelineTaskState get_state() const { return _cur_state; } - void set_state(PipelineTaskState state); bool is_running() { return _running.load(); } void set_running(bool running) { _running = running; } @@ -264,8 +208,8 @@ public: LOG(INFO) << "query id|instanceid " << print_id(_state->query_id()) << "|" << print_id(_state->fragment_instance_id()) << " current pipeline exceed run time " - << config::enable_debug_log_timeout_secs << " seconds. Task state " - << get_state_name(get_state()) << "/n task detail:" << debug_string(); + << config::enable_debug_log_timeout_secs << " seconds. " + << "/n task detail:" << debug_string(); } } @@ -332,7 +276,6 @@ private: RuntimeState* _state = nullptr; int _previous_schedule_id = -1; uint32_t _schedule_time = 0; - PipelineTaskState _cur_state; std::unique_ptr<doris::vectorized::Block> _block; PipelineFragmentContext* _fragment_context = nullptr; TaskQueue* _task_queue = nullptr; @@ -396,6 +339,7 @@ private: std::mutex _release_lock; std::atomic<bool> _running {false}; + std::atomic<bool> _eos {false}; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_tracing.h b/be/src/pipeline/pipeline_tracing.h index aad0a7f9ee8..5d446e6f3d3 100644 --- a/be/src/pipeline/pipeline_tracing.h +++ b/be/src/pipeline/pipeline_tracing.h @@ -39,12 +39,11 @@ struct ScheduleRecord { uint64_t thread_id; uint64_t start_time; uint64_t end_time; - std::string_view state_name; bool operator<(const ScheduleRecord& rhs) const { return start_time < rhs.start_time; } std::string to_string(uint64_t append_value) const { - return fmt::format("{}|{}|{}|{}|{}|{}|{}|{}\n", doris::to_string(query_id), task_id, - core_id, thread_id, start_time, end_time, state_name, append_value); + return fmt::format("{}|{}|{}|{}|{}|{}|{}\n", doris::to_string(query_id), task_id, core_id, + thread_id, start_time, end_time, append_value); } }; diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index fbb67afdf46..5dc8982e426 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -72,7 +72,7 @@ Status TaskScheduler::schedule_task(PipelineTask* task) { } // after _close_task, task maybe destructed. -void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status) { +void _close_task(PipelineTask* task, Status exec_status) { // Has to attach memory tracker here, because the close task will also release some memory. // Should count the memory to the query or the query's memory will not decrease when part of // task finished. @@ -86,12 +86,10 @@ void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status // We have already refactor all source and sink api, the close API does not need waiting // for pending finish now. So that could call close directly. Status status = task->close(exec_status); - if (!status.ok() && state != PipelineTaskState::CANCELED) { + if (!status.ok()) { task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, std::string(status.msg())); - state = PipelineTaskState::CANCELED; } - task->set_state(state); task->finalize(); task->set_running(false); task->fragment_context()->close_a_pipeline(); @@ -113,25 +111,18 @@ void TaskScheduler::_do_work(size_t index) { auto* fragment_ctx = task->fragment_context(); bool canceled = fragment_ctx->is_canceled(); - auto state = task->get_state(); // If the state is PENDING_FINISH, then the task is come from blocked queue, its is_pending_finish // has to return false. The task is finished and need to close now. - if (state == PipelineTaskState::PENDING_FINISH || canceled) { + if (canceled) { // may change from pending FINISH,should called cancel // also may change form BLOCK, other task called cancel // If pipeline is canceled, it will report after pipeline closed, and will propagate // errors to downstream through exchange. So, here we needn't send_report. // fragment_ctx->send_report(true); - Status exec_status = fragment_ctx->get_query_ctx()->exec_status(); - _close_task(task, canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED, - exec_status); + _close_task(task, fragment_ctx->get_query_ctx()->exec_status()); continue; } - DCHECK(state != PipelineTaskState::FINISHED && state != PipelineTaskState::CANCELED) - << "task already finish: " << task->debug_string(); - - task->set_state(PipelineTaskState::RUNNABLE); // task exec bool eos = false; @@ -158,10 +149,8 @@ void TaskScheduler::_do_work(size_t index) { status = task->execute(&eos); uint64_t end_time = MonotonicMicros(); - std::string_view state_name = get_state_name(task->get_state()); ExecEnv::GetInstance()->pipeline_tracer_context()->record( - {query_id, task_name, core_id, thread_id, start_time, end_time, - state_name}); + {query_id, task_name, core_id, thread_id, start_time, end_time}); } else { status = task->execute(&eos); } @@ -171,14 +160,7 @@ void TaskScheduler::_do_work(size_t index) { task->set_previous_core_id(index); - if (status.is<ErrorCode::END_OF_FILE>()) { - // Sink operator finished, just close task now. - _close_task(task, PipelineTaskState::FINISHED, Status::OK()); - continue; - } else if (!status.ok()) { - LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}", - print_id(task->query_context()->query_id()), - status.to_string()); + if (!status.ok()) { // Print detail informations below when you debugging here. // // LOG(WARNING)<< "task:\n"<<task->debug_string(); @@ -186,7 +168,10 @@ void TaskScheduler::_do_work(size_t index) { // exec failed,cancel all fragment instance fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, std::string(status.to_string_no_stack())); - _close_task(task, PipelineTaskState::CANCELED, status); + LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}", + print_id(task->query_context()->query_id()), + status.to_string()); + _close_task(task, status); continue; } fragment_ctx->trigger_report_if_necessary(); @@ -201,29 +186,15 @@ void TaskScheduler::_do_work(size_t index) { // added to running queue when dependency is ready. if (task->is_pending_finish()) { // Only meet eos, should set task to PENDING_FINISH state - task->set_state(PipelineTaskState::PENDING_FINISH); task->set_running(false); } else { Status exec_status = fragment_ctx->get_query_ctx()->exec_status(); - _close_task(task, PipelineTaskState::FINISHED, exec_status); + _close_task(task, exec_status); } continue; } - auto pipeline_state = task->get_state(); - switch (pipeline_state) { - case PipelineTaskState::BLOCKED: - task->set_running(false); - break; - case PipelineTaskState::RUNNABLE: - task->set_running(false); - static_cast<void>(_task_queue->push_back(task, index)); - break; - default: - DCHECK(false) << "error state after run task, " << get_state_name(pipeline_state) - << " task: " << task->debug_string(); - break; - } + task->set_running(false); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org