This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7ef1f7e511c [Bug](pipeline) try fix the exchange sink buffer result error (#27052)
7ef1f7e511c is described below
commit 7ef1f7e511c74d0d72ddeb0bd854ae4dd2d754e0
Author: HappenLee <[email protected]>
AuthorDate: Thu Nov 16 09:20:56 2023 +0800
[Bug](pipeline) try fix the exchange sink buffer result error (#27052)
---
be/src/pipeline/task_scheduler.cpp | 27 ++++++++++++++++-----------
1 file changed, 16 insertions(+), 11 deletions(-)
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index d1978782554..aa3891a5a2c 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -333,24 +333,29 @@ void TaskScheduler::_do_work(size_t index) {
void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
Status exec_status) {
auto status = task->try_close(exec_status);
- if (!status.ok() && state != PipelineTaskState::CANCELED) {
- // Call `close` if `try_close` failed to make sure allocated resources are released
- static_cast<void>(task->close(exec_status));
+ auto cancel = [&]() {
task->query_context()->cancel(true, status.to_string(),
Status::Cancelled(status.to_string()));
state = PipelineTaskState::CANCELED;
- } else if (task->is_pending_finish()) {
- task->set_state(PipelineTaskState::PENDING_FINISH);
- static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
- return;
- } else {
+ };
+
+ auto try_close_failed = !status.ok() && state != PipelineTaskState::CANCELED;
+ if (try_close_failed) {
+ cancel();
+ // Call `close` if `try_close` failed to make sure allocated resources are released
+ static_cast<void>(task->close(exec_status));
+ } else if (!task->is_pending_finish()) {
status = task->close(exec_status);
if (!status.ok() && state != PipelineTaskState::CANCELED) {
- task->query_context()->cancel(true, status.to_string(),
- Status::Cancelled(status.to_string()));
- state = PipelineTaskState::CANCELED;
+ cancel();
}
}
+
+ if (task->is_pending_finish()) {
+ task->set_state(PipelineTaskState::PENDING_FINISH);
+ static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
+ return;
+ }
task->set_state(state);
task->set_close_pipeline_time();
task->release_dependency();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]