This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 839d1adf6c3 [Bug](pipeline) fix pipeline task execute without wait second start rpc (#30659) 839d1adf6c3 is described below commit 839d1adf6c391bf81acde9d9d6158df0b023204b Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com> AuthorDate: Fri Feb 2 18:13:07 2024 +0800 [Bug](pipeline) fix pipeline task execute without wait second start rpc (#30659) --- be/src/pipeline/pipeline_task.cpp | 25 ++++++++++++++++++++----- be/src/pipeline/pipeline_task.h | 1 + 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 9b72762ebf9..6ea1b482c85 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -232,15 +232,30 @@ Status PipelineTask::execute(bool* eos) { if (!_opened) { { SCOPED_RAW_TIMER(&time_spent); - auto st = _open(); - if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) { + // if _open_status is not ok, could know have execute open function, + // now execute open again, so need excluding PIP_WAIT_FOR_RF and PIP_WAIT_FOR_SC error out. + if (!_open_status.ok() && !_open_status.is<ErrorCode::PIP_WAIT_FOR_RF>() && + !_open_status.is<ErrorCode::PIP_WAIT_FOR_SC>()) { + return _open_status; + } + // here execute open and not check dependency(eg: the second start rpc arrival) + // so if open have some error, and return error status directly, the query will be cancel. + // and then the rpc arrival will not found the query as have been canceled and remove. + _open_status = _open(); + if (_open_status.is<ErrorCode::PIP_WAIT_FOR_RF>()) { set_state(PipelineTaskState::BLOCKED_FOR_RF); return Status::OK(); - } else if (st.is<ErrorCode::PIP_WAIT_FOR_SC>()) { + } else if (_open_status.is<ErrorCode::PIP_WAIT_FOR_SC>()) { set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); return Status::OK(); } - RETURN_IF_ERROR(st); + //if status is not ok, and have dependency to push back to queue again. + if (!_open_status.ok() && has_dependency()) { + set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY); + return Status::OK(); + } + // if not ok and no dependency, return error to cancel. + RETURN_IF_ERROR(_open_status); } if (has_dependency()) { set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY); @@ -297,7 +312,7 @@ Status PipelineTask::execute(bool* eos) { } } } - if (*eos) { // now only join node have add_dependency, and join probe could start when the join sink is eos + if (*eos) { // now only join node/set operation node have add_dependency, and join probe could start when the join sink is eos _finish_p_dependency(); } diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index dd512293e05..04bc55908e3 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -324,6 +324,7 @@ protected: // 3 update task statistics(update _queue_level/_core_id) int _queue_level = 0; int _core_id = 0; + Status _open_status = Status::OK(); RuntimeProfile* _parent_profile = nullptr; std::unique_ptr<RuntimeProfile> _task_profile; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org