This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 23240ef0df2 [fix](pipelineX) fix error open in scan #33069
23240ef0df2 is described below

commit 23240ef0df2ba31a4edcd75276d38e983b6225f2
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Sun Mar 31 20:29:34 2024 +0800

    [fix](pipelineX) fix error open in scan #33069
---
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 25 +++++++++++++++++--------
 1 file changed, 17 insertions(+), 8 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 5831446c7c3..cf63074318b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -189,16 +189,25 @@ Status PipelineXTask::_open() {
     _dry_run = _sink->should_dry_run(_state);
     for (auto& o : _operators) {
         auto* local_state = _state->get_local_state(o->operator_id());
-        auto st = local_state->open(_state);
-        if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
-            DCHECK(_filter_dependency);
-            _blocked_dep = _filter_dependency->is_blocked_by(this);
-            if (_blocked_dep) {
-                set_state(PipelineTaskState::BLOCKED_FOR_RF);
+        // Here, it needs to loop twice because it's possible that when "open" happens,
+        // the filter is not ready yet.
+        // However, during the execution of "is_blocked_by," the filter may become ready,
+        // so it needs to be "open" again.
+        for (size_t i = 0; i < 2; i++) {
+            auto st = local_state->open(_state);
+            if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
+                DCHECK(_filter_dependency);
+                _blocked_dep = _filter_dependency->is_blocked_by(this);
+                if (_blocked_dep) {
+                    set_state(PipelineTaskState::BLOCKED_FOR_RF);
+                    RETURN_IF_ERROR(st);
+                } else if (i == 1) {
+                    return Status::InternalError("Unknown RF error, task was blocked by RF twice");
+                }
+            } else {
                 RETURN_IF_ERROR(st);
+                break;
             }
-        } else {
-            RETURN_IF_ERROR(st);
         }
     }
     RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to