This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4268634115f3acef3bb332fa6ee664b52cc3072a Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Mon Mar 11 22:09:08 2024 +0800 [fix](memory) Fix Allocator cancel pipelinex query #32048 --- be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 13 ++++++------- be/src/runtime/memory/thread_mem_tracker_mgr.h | 10 +++++----- be/src/runtime/thread_context.cpp | 7 +++---- be/src/runtime/thread_context.h | 13 ++++++------- be/src/vec/common/allocator.cpp | 4 ++-- be/src/vec/runtime/vdata_stream_recvr.cpp | 2 +- 6 files changed, 23 insertions(+), 26 deletions(-) diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 0a1686704c0..1feca6976b8 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -25,12 +25,11 @@ namespace doris { void ThreadMemTrackerMgr::attach_limiter_tracker( - const std::shared_ptr<MemTrackerLimiter>& mem_tracker, - const TUniqueId& fragment_instance_id) { + const std::shared_ptr<MemTrackerLimiter>& mem_tracker, const TUniqueId& query_id) { DCHECK(mem_tracker); CHECK(init()); flush_untracked_mem(); - _fragment_instance_id = fragment_instance_id; + _query_id = query_id; _limiter_tracker = mem_tracker; _limiter_tracker_raw = mem_tracker.get(); _wait_gc = true; @@ -40,15 +39,15 @@ void ThreadMemTrackerMgr::detach_limiter_tracker( const std::shared_ptr<MemTrackerLimiter>& old_mem_tracker) { CHECK(init()); flush_untracked_mem(); - _fragment_instance_id = TUniqueId(); + _query_id = TUniqueId(); _limiter_tracker = old_mem_tracker; _limiter_tracker_raw = old_mem_tracker.get(); _wait_gc = false; } -void ThreadMemTrackerMgr::cancel_instance(const std::string& exceed_msg) { - ExecEnv::GetInstance()->fragment_mgr()->cancel_instance( - _fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, exceed_msg); +void ThreadMemTrackerMgr::cancel_query(const std::string& exceed_msg) { + 
ExecEnv::GetInstance()->fragment_mgr()->cancel_query( + _query_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, exceed_msg); } } // namespace doris diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 4cb22b9e1ae..d54f77ce3fa 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -53,7 +53,7 @@ public: // After attach, the current thread Memory Hook starts to consume/release task mem_tracker void attach_limiter_tracker(const std::shared_ptr<MemTrackerLimiter>& mem_tracker, - const TUniqueId& fragment_instance_id); + const TUniqueId& query_id); void detach_limiter_tracker(const std::shared_ptr<MemTrackerLimiter>& old_mem_tracker = ExecEnv::GetInstance()->orphan_mem_tracker()); @@ -82,7 +82,7 @@ public: void consume(int64_t size, int skip_large_memory_check = 0); void flush_untracked_mem(); - bool is_attach_query() { return _fragment_instance_id != TUniqueId(); } + bool is_attach_query() { return _query_id != TUniqueId(); } std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { CHECK(init()); @@ -95,7 +95,7 @@ public: void disable_wait_gc() { _wait_gc = false; } [[nodiscard]] bool wait_gc() const { return _wait_gc; } - void cancel_instance(const std::string& exceed_msg); + void cancel_query(const std::string& exceed_msg); std::string print_debug_string() { fmt::memory_buffer consumer_tracker_buf; @@ -130,7 +130,7 @@ private: // If there is a memory new/delete operation in the consume method, it may enter infinite recursion. bool _stop_consume = false; - TUniqueId _fragment_instance_id = TUniqueId(); + TUniqueId _query_id = TUniqueId(); }; inline bool ThreadMemTrackerMgr::init() { @@ -185,7 +185,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che "malloc or new large memory: {}, {}, this is just a warning, not prevent memory " "alloc, stacktrace:\n{}", size, - is_attach_query() ? 
"in query or load: " + print_id(_fragment_instance_id) + is_attach_query() ? "in query or load: " + print_id(_query_id) : "not in query or load", get_stack_trace()); _stop_consume = false; diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index fca09fcabc5..03c10986bd8 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -24,18 +24,17 @@ namespace doris { class MemTracker; AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker, - const TUniqueId& task_id, const TUniqueId& fragment_instance_id) { + const TUniqueId& task_id) { ThreadLocalHandle::create_thread_local_if_not_exits(); signal::set_signal_task_id(task_id); - thread_context()->attach_task(task_id, fragment_instance_id, mem_tracker); + thread_context()->attach_task(task_id, mem_tracker); } AttachTask::AttachTask(RuntimeState* runtime_state) { ThreadLocalHandle::create_thread_local_if_not_exits(); signal::set_signal_task_id(runtime_state->query_id()); signal::set_signal_is_nereids(runtime_state->is_nereids()); - thread_context()->attach_task(runtime_state->query_id(), runtime_state->fragment_instance_id(), - runtime_state->query_mem_tracker()); + thread_context()->attach_task(runtime_state->query_id(), runtime_state->query_mem_tracker()); } AttachTask::~AttachTask() { diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index f6ca46acaba..463dbf44349 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -45,8 +45,8 @@ // This will save some info about a working thread in the thread context. // Looking forward to tracking memory during thread execution into MemTrackerLimiter. 
#define SCOPED_ATTACH_TASK(arg1) auto VARNAME_LINENUM(attach_task) = AttachTask(arg1) -#define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2, arg3) \ - auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, arg2, arg3) +#define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2) \ + auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, arg2) // Switch MemTrackerLimiter for count memory during thread execution. // Used after SCOPED_ATTACH_TASK, in order to count the memory into another @@ -60,7 +60,7 @@ auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumer(mem_tracker) #else #define SCOPED_ATTACH_TASK(arg1, ...) (void)0 -#define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2, arg3) (void)0 +#define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2) (void)0 #define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_limiter) (void)0 #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) (void)0 #endif @@ -134,7 +134,7 @@ public: ~ThreadContext() = default; - void attach_task(const TUniqueId& task_id, const TUniqueId& fragment_instance_id, + void attach_task(const TUniqueId& task_id, const std::shared_ptr<MemTrackerLimiter>& mem_tracker) { #ifndef BE_TEST // will only attach_task at the beginning of the thread function, there should be no duplicate attach_task. 
@@ -144,7 +144,7 @@ public: << ", attach mem tracker label: " << mem_tracker->label(); #endif _task_id = task_id; - thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, fragment_instance_id); + thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, _task_id); } void detach_task() { @@ -292,8 +292,7 @@ private: class AttachTask { public: explicit AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker, - const TUniqueId& task_id = TUniqueId(), - const TUniqueId& fragment_instance_id = TUniqueId()); + const TUniqueId& task_id = TUniqueId()); explicit AttachTask(RuntimeState* runtime_state); diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 3d6d2a7a11e..f1dd2af83f7 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -110,7 +110,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t "Query:{} canceled asyn, after waiting for memory {}ms, {}.", print_id(doris::thread_context()->task_id()), wait_milliseconds, err_msg); - doris::thread_context()->thread_mem_tracker_mgr->cancel_instance(err_msg); + doris::thread_context()->thread_mem_tracker_mgr->cancel_query(err_msg); } else { LOG(INFO) << fmt::format( "Query:{} throw exception, after waiting for memory {}ms, {}.", @@ -148,7 +148,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_tracker_check(siz if (!doris::enable_thread_catch_bad_alloc) { LOG(INFO) << fmt::format("query/load:{} canceled asyn, {}.", print_id(doris::thread_context()->task_id()), err_msg); - doris::thread_context()->thread_mem_tracker_mgr->cancel_instance(err_msg); + doris::thread_context()->thread_mem_tracker_mgr->cancel_query(err_msg); } else { LOG(INFO) << fmt::format("query/load:{} throw exception, {}.", print_id(doris::thread_context()->task_id()), err_msg); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 0bef6b6eaab..1cfc359de30 100644 --- 
a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -425,7 +425,7 @@ Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr, Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_number, int64_t packet_seq, ::google::protobuf::Closure** done) { - SCOPED_ATTACH_TASK_WITH_ID(_query_mem_tracker, _query_id, _fragment_instance_id); + SCOPED_ATTACH_TASK_WITH_ID(_query_mem_tracker, _query_id); int use_sender_id = _is_merging ? sender_id : 0; return _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org