This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ef1f7e511c [Bug](pipeline) try fix the exchange sink buffer result error (#27052)
7ef1f7e511c is described below

commit 7ef1f7e511c74d0d72ddeb0bd854ae4dd2d754e0
Author: HappenLee <[email protected]>
AuthorDate: Thu Nov 16 09:20:56 2023 +0800

    [Bug](pipeline) try fix the exchange sink buffer result error (#27052)
---
 be/src/pipeline/task_scheduler.cpp | 27 ++++++++++++++++-----------
 1 file changed, 16 insertions(+), 11 deletions(-)

diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index d1978782554..aa3891a5a2c 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -333,24 +333,29 @@ void TaskScheduler::_do_work(size_t index) {
 void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
                                     Status exec_status) {
     auto status = task->try_close(exec_status);
-    if (!status.ok() && state != PipelineTaskState::CANCELED) {
-        // Call `close` if `try_close` failed to make sure allocated resources are released
-        static_cast<void>(task->close(exec_status));
+    auto cancel = [&]() {
         task->query_context()->cancel(true, status.to_string(),
                                       Status::Cancelled(status.to_string()));
         state = PipelineTaskState::CANCELED;
-    } else if (task->is_pending_finish()) {
-        task->set_state(PipelineTaskState::PENDING_FINISH);
-        static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
-        return;
-    } else {
+    };
+
+    auto try_close_failed = !status.ok() && state != PipelineTaskState::CANCELED;
+    if (try_close_failed) {
+        cancel();
+        // Call `close` if `try_close` failed to make sure allocated resources are released
+        static_cast<void>(task->close(exec_status));
+    } else if (!task->is_pending_finish()) {
         status = task->close(exec_status);
         if (!status.ok() && state != PipelineTaskState::CANCELED) {
-            task->query_context()->cancel(true, status.to_string(),
-                                          Status::Cancelled(status.to_string()));
-            state = PipelineTaskState::CANCELED;
+            cancel();
         }
     }
+
+    if (task->is_pending_finish()) {
+        task->set_state(PipelineTaskState::PENDING_FINISH);
+        static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
+        return;
+    }
     task->set_state(state);
     task->set_close_pipeline_time();
     task->release_dependency();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to