yiguolei commented on code in PR #45207: URL: https://github.com/apache/doris/pull/45207#discussion_r1886082723
########## be/src/pipeline/pipeline_task.cpp: ########## @@ -361,47 +354,44 @@ Status PipelineTask::execute(bool* eos) { RETURN_IF_ERROR(_sink->revoke_memory(_state)); continue; } - *eos = _eos; DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", { Status status = Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task executing failed"); return status; }); - // `_dry_run` means sink operator need no more data // `_sink->is_finished(_state)` means sink operator should be finished - if (_dry_run || _sink->is_finished(_state)) { - *eos = true; - _eos = true; - } else { + if (_sink->is_finished(_state)) { + set_wake_up_and_dep_ready(); + } + + // `_dry_run` means sink operator need no more data + *eos = wake_up_early() || _dry_run; + if (!*eos) { SCOPED_TIMER(_get_block_timer); _get_block_counter->update(1); RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos)); } if (_block->rows() != 0 || *eos) { SCOPED_TIMER(_sink_timer); - Status status = Status::OK(); - // Define a lambda function to catch sink exception, because sink will check - // return error status with EOF, it is special, could not return directly. - auto sink_function = [&]() -> Status { - Status internal_st; - internal_st = _sink->sink(_state, block, *eos); - return internal_st; - }; - status = sink_function(); - if (!status.is<ErrorCode::END_OF_FILE>()) { - RETURN_IF_ERROR(status); + Status status = _sink->sink(_state, block, *eos); + + if (status.is<ErrorCode::END_OF_FILE>()) { + set_wake_up_and_dep_ready(); + } else if (!status) { + return status; } - *eos = status.is<ErrorCode::END_OF_FILE>() ? true : *eos; + if (*eos) { // just return, the scheduler will do finish work - _eos = true; + RETURN_IF_ERROR(close(status, false)); Review Comment: 这个代码,应该在377 行之前,不能在sink 之后 [English: This code should be placed before line 377; it must not come after the sink call.] -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org