This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch revert-27052-bug
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 047ba7aef6f9bf8256dbf7d2f1b642d5906c89c8
Author: yiguolei <676222...@qq.com>
AuthorDate: Sat Nov 18 02:10:28 2023 +0800

    Revert "[Bug](pipeline) try fix the exchange sink buffer result error (#27052)"
    
    This reverts commit 7ef1f7e511c74d0d72ddeb0bd854ae4dd2d754e0.
---
 be/src/pipeline/task_scheduler.cpp | 27 +++++++++++----------------
 1 file changed, 11 insertions(+), 16 deletions(-)

diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index aa3891a5a2c..d1978782554 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -333,28 +333,23 @@ void TaskScheduler::_do_work(size_t index) {
 void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
                                     Status exec_status) {
     auto status = task->try_close(exec_status);
-    auto cancel = [&]() {
+    if (!status.ok() && state != PipelineTaskState::CANCELED) {
+        // Call `close` if `try_close` failed to make sure allocated resources are released
+        static_cast<void>(task->close(exec_status));
         task->query_context()->cancel(true, status.to_string(),
                                       Status::Cancelled(status.to_string()));
         state = PipelineTaskState::CANCELED;
-    };
-
-    auto try_close_failed = !status.ok() && state != PipelineTaskState::CANCELED;
-    if (try_close_failed) {
-        cancel();
-        // Call `close` if `try_close` failed to make sure allocated resources are released
-        static_cast<void>(task->close(exec_status));
-    } else if (!task->is_pending_finish()) {
-        status = task->close(exec_status);
-        if (!status.ok() && state != PipelineTaskState::CANCELED) {
-            cancel();
-        }
-    }
-
-    if (task->is_pending_finish()) {
+    } else if (task->is_pending_finish()) {
         task->set_state(PipelineTaskState::PENDING_FINISH);
         static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
         return;
+    } else {
+        status = task->close(exec_status);
+        if (!status.ok() && state != PipelineTaskState::CANCELED) {
+            task->query_context()->cancel(true, status.to_string(),
+                                          Status::Cancelled(status.to_string()));
+            state = PipelineTaskState::CANCELED;
+        }
     }
     task->set_state(state);
     task->set_close_pipeline_time();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to