yiguolei commented on code in PR #49753: URL: https://github.com/apache/doris/pull/49753#discussion_r2024023695
##########
be/src/pipeline/pipeline_task.cpp:
##########
@@ -745,11 +753,65 @@ Status PipelineTask::wake_up(Dependency* dep) {
     // call by dependency
     DCHECK_EQ(_blocked_dep, dep) << "dep : " << dep->debug_string(0) << "task: " << debug_string();
     _blocked_dep = nullptr;
+    auto holder = std::dynamic_pointer_cast<PipelineTask>(shared_from_this());
+    DCHECK(_scheduler);
+    _scheduler->erase_blocked_task(&_blocked_iterator);
     RETURN_IF_ERROR(_state_transition(PipelineTask::State::RUNNABLE));
-    return get_task_queue()->push_back(this);
+    RETURN_IF_ERROR(get_task_queue()->push_back(holder));
+    return Status::OK();
+}
+
+Status PipelineTask::_state_transition(State new_state) {
+    if (_exec_state != new_state) {
+        _state_change_watcher.reset();
+        _state_change_watcher.start();
+    }
+    _task_profile->add_info_string("TaskState", _to_string(new_state));
+    _task_profile->add_info_string("BlockedByDependency", _blocked_dep ? _blocked_dep->name() : "");
+    switch (new_state) {
+    case State::RUNNABLE:
+        if (_exec_state != State::RUNNABLE && _exec_state != State::BLOCKED &&
+            _exec_state != State::INITED) {
+            return Status::InternalError(
+                    "Task state transition from {} to {} is not allowed! Task info: {}",
+                    _to_string(_exec_state), _to_string(new_state), debug_string());
+        }
+        break;
+    case State::BLOCKED:
+        // If this task is blocked, the blocking dependency will hold the raw pointer of this task.
+        // To ensure this task will not be freed, we use scheduler to hold the shared pointer.
+        DCHECK(_scheduler);
+        _scheduler->hold_blocked_task(shared_from_this(), &_blocked_iterator);
+        if (_exec_state != State::RUNNABLE && _exec_state != State::FINISHED) {
+            return Status::InternalError(
+                    "Task state transition from {} to {} is not allowed! Task info: {}",
+                    _to_string(_exec_state), _to_string(new_state), debug_string());
+        }
+        break;
+    case State::FINISHED:
+        if (_exec_state != State::RUNNABLE) {
+            return Status::InternalError(
+                    "Task state transition from {} to {} is not allowed! Task info: {}",
+                    _to_string(_exec_state), _to_string(new_state), debug_string());
+        }
+        break;
+    case State::FINALIZED:
+        if (_exec_state != State::FINISHED && _exec_state != State::INITED) {
+            return Status::InternalError(
+                    "Task state transition from {} to {} is not allowed! Task info: {}",
+                    _to_string(_exec_state), _to_string(new_state), debug_string());
+        }
+        break;
+    default:
+        return Status::InternalError(
+                "Task state transition from {} to {} is not allowed! Task info: {}",
+                _to_string(_exec_state), _to_string(new_state), debug_string());
+    }
+    _exec_state = new_state;
+    return Status::OK();
 }
 QueryContext* PipelineTask::query_context() {
-    return _fragment_context->get_query_ctx();
+    return _fragment_context.lock()->get_query_ctx();

Review Comment:
如果lock 结果是一个空怎么办?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org