This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new bfc92605070 [bugfix](deadlock) avoid deadlock in memtracker cancel query (#33400) bfc92605070 is described below commit bfc9260507015d66949525ae8ed4c7dd307713f1 Author: yiguolei <676222...@qq.com> AuthorDate: Tue Apr 9 12:19:28 2024 +0800 [bugfix](deadlock) avoid deadlock in memtracker cancel query (#33400) get_query_ctx(hold query ctx map lock) ---> QueryCtx ---> runtime statistics mgr ---> runtime statistics mgr ---> allocate block memory ---> cancel query. The memtracker will try to cancel the query when memory is not available during allocation. But the allocator is a fundamental API; if it calls an upper-level API, it may deadlock. The allocator should not call any upper-level API during allocation. --- be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 32 ++++++++++++++++++++++-- be/src/runtime/memory/thread_mem_tracker_mgr.h | 1 + be/src/runtime/query_context.cpp | 10 ++++++-- 3 files changed, 39 insertions(+), 4 deletions(-) diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 1feca6976b8..8596951acfa 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -24,6 +24,23 @@ namespace doris { +class AsyncCancelQueryTask : public Runnable { + ENABLE_FACTORY_CREATOR(AsyncCancelQueryTask); + +public: + AsyncCancelQueryTask(TUniqueId query_id, const std::string& exceed_msg) + : _query_id(query_id), _exceed_msg(exceed_msg) {} + ~AsyncCancelQueryTask() override = default; + void run() override { + ExecEnv::GetInstance()->fragment_mgr()->cancel_query( + _query_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, _exceed_msg); + } + +private: + TUniqueId _query_id; + const std::string _exceed_msg; +}; + void ThreadMemTrackerMgr::attach_limiter_tracker( const std::shared_ptr<MemTrackerLimiter>& mem_tracker, const TUniqueId& query_id) { DCHECK(mem_tracker); @@ -46,8 +63,19 @@ void
ThreadMemTrackerMgr::detach_limiter_tracker( } void ThreadMemTrackerMgr::cancel_query(const std::string& exceed_msg) { - ExecEnv::GetInstance()->fragment_mgr()->cancel_query( - _query_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, exceed_msg); + if (is_attach_query() && !_is_query_cancelled) { + Status submit_st = ExecEnv::GetInstance()->lazy_release_obj_pool()->submit( + AsyncCancelQueryTask::create_shared(_query_id, exceed_msg)); + if (submit_st.ok()) { + // Use this flag to avoid the cancel request submit to pool many times, because even we cancel the query + // successfully, but the application may not use if (state.iscancelled) to exist quickly. And it may try to + // allocate memory and may failed again and the pool will be full. + _is_query_cancelled = true; + } else { + LOG(WARNING) << "Failed to submit cancel query task to pool, query_id " + << print_id(_query_id) << ", error st " << submit_st; + } + } } } // namespace doris diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index d54f77ce3fa..b2fa3df9f8c 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -131,6 +131,7 @@ private: // If there is a memory new/delete operation in the consume method, it may enter infinite recursion. 
bool _stop_consume = false; TUniqueId _query_id = TUniqueId(); + bool _is_query_cancelled = false; }; inline bool ThreadMemTrackerMgr::init() { diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 2fedfc7814c..2e3fcd613c3 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -29,6 +29,8 @@ namespace doris { class DelayReleaseToken : public Runnable { + ENABLE_FACTORY_CREATOR(DelayReleaseToken); + public: DelayReleaseToken(std::unique_ptr<ThreadPoolToken>&& token) { token_ = std::move(token); } ~DelayReleaseToken() override = default; @@ -115,8 +117,12 @@ QueryContext::~QueryContext() { // And also thread token need shutdown, it may take some time, may cause the thread that // release the token hang, the thread maybe a pipeline task scheduler thread. if (_thread_token) { - static_cast<void>(ExecEnv::GetInstance()->lazy_release_obj_pool()->submit( - std::make_shared<DelayReleaseToken>(std::move(_thread_token)))); + Status submit_st = ExecEnv::GetInstance()->lazy_release_obj_pool()->submit( + DelayReleaseToken::create_shared(std::move(_thread_token))); + if (!submit_st.ok()) { + LOG(WARNING) << "Failed to release query context thread token, query_id " + << print_id(_query_id) << ", error status " << submit_st; + } } //TODO: check if pipeline and tracing both enabled --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org