This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 839d1adf6c3 [Bug](pipeline) fix pipeline task execute without wait second start rpc (#30659)
839d1adf6c3 is described below

commit 839d1adf6c391bf81acde9d9d6158df0b023204b
Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com>
AuthorDate: Fri Feb 2 18:13:07 2024 +0800

    [Bug](pipeline) fix pipeline task execute without wait second start rpc (#30659)
---
 be/src/pipeline/pipeline_task.cpp | 25 ++++++++++++++++++++-----
 be/src/pipeline/pipeline_task.h   |  1 +
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 9b72762ebf9..6ea1b482c85 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -232,15 +232,30 @@ Status PipelineTask::execute(bool* eos) {
     if (!_opened) {
         {
             SCOPED_RAW_TIMER(&time_spent);
-            auto st = _open();
-            if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
+            // if _open_status is not ok, could know have execute open 
function,
+            // now execute open again, so need excluding PIP_WAIT_FOR_RF and 
PIP_WAIT_FOR_SC error out.
+            if (!_open_status.ok() && 
!_open_status.is<ErrorCode::PIP_WAIT_FOR_RF>() &&
+                !_open_status.is<ErrorCode::PIP_WAIT_FOR_SC>()) {
+                return _open_status;
+            }
+            // here execute open and not check dependency(eg: the second start 
rpc arrival)
+            // so if open have some error, and return error status directly, 
the query will be cancel.
+            // and then the rpc arrival will not found the query as have been 
canceled and remove.
+            _open_status = _open();
+            if (_open_status.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
                 set_state(PipelineTaskState::BLOCKED_FOR_RF);
                 return Status::OK();
-            } else if (st.is<ErrorCode::PIP_WAIT_FOR_SC>()) {
+            } else if (_open_status.is<ErrorCode::PIP_WAIT_FOR_SC>()) {
                 set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
                 return Status::OK();
             }
-            RETURN_IF_ERROR(st);
+            //if status is not ok, and have dependency to push back to queue 
again.
+            if (!_open_status.ok() && has_dependency()) {
+                set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
+                return Status::OK();
+            }
+            // if not ok and no dependency, return error to cancel.
+            RETURN_IF_ERROR(_open_status);
         }
         if (has_dependency()) {
             set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
@@ -297,7 +312,7 @@ Status PipelineTask::execute(bool* eos) {
             }
         }
     }
-    if (*eos) { // now only join node have add_dependency, and join probe 
could start when the join sink is eos
+    if (*eos) { // now only join node/set operation node have add_dependency, 
and join probe could start when the join sink is eos
         _finish_p_dependency();
     }
 
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index dd512293e05..04bc55908e3 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -324,6 +324,7 @@ protected:
     // 3 update task statistics(update _queue_level/_core_id)
     int _queue_level = 0;
     int _core_id = 0;
+    Status _open_status = Status::OK();
 
     RuntimeProfile* _parent_profile = nullptr;
     std::unique_ptr<RuntimeProfile> _task_profile;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to