This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e167394dc1 [Fix](pipeline) close sink when fragment context destructs 
(#21668)
e167394dc1 is described below

commit e167394dc16e52e4044a32d52226559ff923464c
Author: airborne12 <airborn...@gmail.com>
AuthorDate: Thu Jul 13 11:52:24 2023 +0800

    [Fix](pipeline) close sink when fragment context destructs (#21668)
    
    Co-authored-by: airborne12 <airborn...@gmail.com>
---
 be/src/pipeline/pipeline_fragment_context.cpp | 10 ++++++++++
 be/src/pipeline/pipeline_fragment_context.h   |  1 +
 be/src/runtime/fragment_mgr.cpp               |  4 ++++
 3 files changed, 15 insertions(+)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 8eef37d931..7e85b19205 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -705,6 +705,16 @@ Status PipelineFragmentContext::submit() {
     }
 }
 
+void PipelineFragmentContext::close_sink() {
+    if (_sink) {
+        if (_prepared) {
+            _sink->close(_runtime_state.get(), Status::RuntimeError("prepare 
failed"));
+        } else {
+            _sink->close(_runtime_state.get(), Status::OK());
+        }
+    }
+}
+
 void PipelineFragmentContext::close_if_prepare_failed() {
     if (_tasks.empty()) {
         if (_root_plan) {
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 262794154b..cda6206d9b 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -83,6 +83,7 @@ public:
     Status submit();
 
     void close_if_prepare_failed();
+    void close_sink();
 
     void set_is_report_success(bool is_report_success) { _is_report_success = 
is_report_success; }
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 6d0cf288ae..3f19d489d9 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -348,6 +348,10 @@ FragmentMgr::~FragmentMgr() {
         std::lock_guard<std::mutex> lock(_lock);
         _fragment_map.clear();
         _query_ctx_map.clear();
+        for (auto& pipeline : _pipeline_map) {
+            pipeline.second->close_sink();
+        }
+        _pipeline_map.clear();
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to