This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:

     new e167394dc1 [Fix](pipeline) close sink when fragment context destructs (#21668)

e167394dc1 is described below

commit e167394dc16e52e4044a32d52226559ff923464c
Author: airborne12 <airborn...@gmail.com>
AuthorDate: Thu Jul 13 11:52:24 2023 +0800

    [Fix](pipeline) close sink when fragment context destructs (#21668)

    Co-authored-by: airborne12 <airborn...@gmail.com>
---
 be/src/pipeline/pipeline_fragment_context.cpp | 10 ++++++++++
 be/src/pipeline/pipeline_fragment_context.h | 1 +
 be/src/runtime/fragment_mgr.cpp | 4 ++++
 3 files changed, 15 insertions(+)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 8eef37d931..7e85b19205 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -705,6 +705,16 @@ Status PipelineFragmentContext::submit() {
     }
 }
 
+void PipelineFragmentContext::close_sink() {
+    if (_sink) {
+        if (_prepared) {
+            _sink->close(_runtime_state.get(), Status::RuntimeError("prepare failed"));
+        } else {
+            _sink->close(_runtime_state.get(), Status::OK());
+        }
+    }
+}
+
 void PipelineFragmentContext::close_if_prepare_failed() {
     if (_tasks.empty()) {
         if (_root_plan) {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 262794154b..cda6206d9b 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -83,6 +83,7 @@ public:
     Status submit();
 
     void close_if_prepare_failed();
+    void close_sink();
 
     void set_is_report_success(bool is_report_success) { _is_report_success = is_report_success; }
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 6d0cf288ae..3f19d489d9 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -348,6 +348,10 @@ FragmentMgr::~FragmentMgr() {
         std::lock_guard<std::mutex> lock(_lock);
         _fragment_map.clear();
         _query_ctx_map.clear();
+        for (auto& pipeline : _pipeline_map) {
+            pipeline.second->close_sink();
+        }
+        _pipeline_map.clear();
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org