This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 23240ef0df2 [fix](pipelineX) fix error open in scan #33069 23240ef0df2 is described below commit 23240ef0df2ba31a4edcd75276d38e983b6225f2 Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Sun Mar 31 20:29:34 2024 +0800 [fix](pipelineX) fix error open in scan #33069 --- be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 5831446c7c3..cf63074318b 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -189,16 +189,25 @@ Status PipelineXTask::_open() { _dry_run = _sink->should_dry_run(_state); for (auto& o : _operators) { auto* local_state = _state->get_local_state(o->operator_id()); - auto st = local_state->open(_state); - if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) { - DCHECK(_filter_dependency); - _blocked_dep = _filter_dependency->is_blocked_by(this); - if (_blocked_dep) { - set_state(PipelineTaskState::BLOCKED_FOR_RF); + // Here, it needs to loop twice because it's possible that when "open" happens, + // the filter is not ready yet. + // However, during the execution of "is_blocked_by," the filter may become ready, + // so it needs to be "open" again. + for (size_t i = 0; i < 2; i++) { + auto st = local_state->open(_state); + if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) { + DCHECK(_filter_dependency); + _blocked_dep = _filter_dependency->is_blocked_by(this); + if (_blocked_dep) { + set_state(PipelineTaskState::BLOCKED_FOR_RF); + RETURN_IF_ERROR(st); + } else if (i == 1) { + return Status::InternalError("Unknown RF error, task was blocked by RF twice"); + } + } else { RETURN_IF_ERROR(st); + break; } - } else { - RETURN_IF_ERROR(st); } } RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org