This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch revert-27052-bug in repository https://gitbox.apache.org/repos/asf/doris.git
commit 047ba7aef6f9bf8256dbf7d2f1b642d5906c89c8 Author: yiguolei <676222...@qq.com> AuthorDate: Sat Nov 18 02:10:28 2023 +0800 Revert "[Bug](pipeline) try fix the exchange sink buffer result error (#27052)" This reverts commit 7ef1f7e511c74d0d72ddeb0bd854ae4dd2d754e0. --- be/src/pipeline/task_scheduler.cpp | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index aa3891a5a2c..d1978782554 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -333,28 +333,23 @@ void TaskScheduler::_do_work(size_t index) { void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state, Status exec_status) { auto status = task->try_close(exec_status); - auto cancel = [&]() { + if (!status.ok() && state != PipelineTaskState::CANCELED) { + // Call `close` if `try_close` failed to make sure allocated resources are released + static_cast<void>(task->close(exec_status)); task->query_context()->cancel(true, status.to_string(), Status::Cancelled(status.to_string())); state = PipelineTaskState::CANCELED; - }; - - auto try_close_failed = !status.ok() && state != PipelineTaskState::CANCELED; - if (try_close_failed) { - cancel(); - // Call `close` if `try_close` failed to make sure allocated resources are released - static_cast<void>(task->close(exec_status)); - } else if (!task->is_pending_finish()) { - status = task->close(exec_status); - if (!status.ok() && state != PipelineTaskState::CANCELED) { - cancel(); - } - } - - if (task->is_pending_finish()) { + } else if (task->is_pending_finish()) { task->set_state(PipelineTaskState::PENDING_FINISH); static_cast<void>(_blocked_task_scheduler->add_blocked_task(task)); return; + } else { + status = task->close(exec_status); + if (!status.ok() && state != PipelineTaskState::CANCELED) { + task->query_context()->cancel(true, status.to_string(), + Status::Cancelled(status.to_string())); + state = PipelineTaskState::CANCELED; + } } task->set_state(state); task->set_close_pipeline_time(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org