This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 19b34c09b1 [fix] (mem tracker) Fix runtime instance tracker null pointer (#11272) 19b34c09b1 is described below commit 19b34c09b1d9301daf5ed76eea89bd031eb12bbe Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Thu Jul 28 14:58:13 2022 +0800 [fix] (mem tracker) Fix runtime instance tracker null pointer (#11272) --- be/src/runtime/data_stream_mgr.cpp | 6 +++--- be/src/runtime/data_stream_recvr.cc | 8 +++++--- be/src/runtime/data_stream_recvr.h | 5 +++-- be/src/runtime/memory/mem_tracker_limiter.h | 1 + be/src/runtime/memory/mem_tracker_task_pool.cpp | 2 +- be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 10 ++-------- be/src/runtime/memory/thread_mem_tracker_mgr.h | 4 ++-- be/src/runtime/runtime_state.cpp | 1 + be/src/vec/runtime/vdata_stream_mgr.cpp | 4 ++-- be/src/vec/runtime/vdata_stream_recvr.cpp | 9 ++++++--- be/src/vec/runtime/vdata_stream_recvr.h | 6 +++--- 11 files changed, 29 insertions(+), 27 deletions(-) diff --git a/be/src/runtime/data_stream_mgr.cpp b/be/src/runtime/data_stream_mgr.cpp index b0d1dbd8f2..3e519f8987 100644 --- a/be/src/runtime/data_stream_mgr.cpp +++ b/be/src/runtime/data_stream_mgr.cpp @@ -72,9 +72,9 @@ shared_ptr<DataStreamRecvr> DataStreamMgr::create_recvr( DCHECK(profile != nullptr); VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id << ", node=" << dest_node_id; - shared_ptr<DataStreamRecvr> recvr( - new DataStreamRecvr(this, row_desc, fragment_instance_id, dest_node_id, num_senders, - is_merging, buffer_size, profile, sub_plan_query_statistics_recvr)); + shared_ptr<DataStreamRecvr> recvr(new DataStreamRecvr( + this, row_desc, state->query_mem_tracker(), fragment_instance_id, dest_node_id, + num_senders, is_merging, buffer_size, profile, sub_plan_query_statistics_recvr)); uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id); lock_guard<mutex> l(_lock); _fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id)); diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc index a89c39cea9..ffd20c58d7 100644 --- a/be/src/runtime/data_stream_recvr.cc +++ b/be/src/runtime/data_stream_recvr.cc @@ -447,8 +447,9 @@ void DataStreamRecvr::transfer_all_resources(RowBatch* transfer_batch) { DataStreamRecvr::DataStreamRecvr( DataStreamMgr* stream_mgr, const RowDescriptor& row_desc, - const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, - bool is_merging, int total_buffer_limit, RuntimeProfile* profile, + MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id, + PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit, + RuntimeProfile* profile, std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr) : _mgr(stream_mgr), _fragment_instance_id(fragment_instance_id), @@ -459,7 +460,8 @@ DataStreamRecvr::DataStreamRecvr( _num_buffered_bytes(0), _profile(profile), _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) { - _mem_tracker = std::make_unique<MemTracker>("DataStreamRecvr", nullptr, _profile); + _mem_tracker = std::make_unique<MemTracker>( + "DataStreamRecvr:" + print_id(_fragment_instance_id), query_mem_tracker, _profile); // Create one queue per sender if is_merging is true. int num_queues = is_merging ? num_senders : 1; diff --git a/be/src/runtime/data_stream_recvr.h b/be/src/runtime/data_stream_recvr.h index efb036b5dd..31af9f2e37 100644 --- a/be/src/runtime/data_stream_recvr.h +++ b/be/src/runtime/data_stream_recvr.h @@ -116,8 +116,9 @@ private: class SenderQueue; DataStreamRecvr(DataStreamMgr* stream_mgr, const RowDescriptor& row_desc, - const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, - bool is_merging, int total_buffer_limit, RuntimeProfile* profile, + MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id, + PlanNodeId dest_node_id, int num_senders, bool is_merging, + int total_buffer_limit, RuntimeProfile* profile, std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr); // If receive queue is full, done is enqueue pending, and return with *done is nullptr diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 786ad945bf..3852cbe52d 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -111,6 +111,7 @@ public: // Note that 'f' must be valid for the lifetime of this tracker limiter. void add_gc_function(GcFunction f) { _gc_functions.push_back(f); } + // TODO Should be managed in a separate process_mem_mgr, not in MemTracker // If consumption is higher than max_consumption, attempts to free memory by calling // any added GC functions. Returns true if max_consumption is still exceeded. Takes gc_lock. // Note: If the cache of segment/chunk is released due to insufficient query memory at a certain moment, diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp index a4831114cf..02b38acdb5 100644 --- a/be/src/runtime/memory/mem_tracker_task_pool.cpp +++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp @@ -34,7 +34,7 @@ MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std: bool new_emplace = _task_mem_trackers.lazy_emplace_l( task_id, [&](std::shared_ptr<MemTrackerLimiter>) {}, [&](const auto& ctor) { - ctor(task_id, std::make_unique<MemTrackerLimiter>(mem_limit, label, parent)); + ctor(task_id, std::make_shared<MemTrackerLimiter>(mem_limit, label, parent)); }); if (new_emplace) { LOG(INFO) << "Register query/load memory tracker, query/load id: " << task_id diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 53a47202b1..7841a7cb6a 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -37,14 +37,8 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(const std::string& cancel_msg, } void ThreadMemTrackerMgr::detach_limiter_tracker() { -#ifndef BE_TEST - // Unexpectedly, the runtime state is destructed before the end of the query sub-thread, - // (_hash_table_build_thread has appeared) which is not a graceful exit. - // consider replacing CHECK with a conditional statement and checking for runtime state survival. - CHECK(_task_id == "" || - ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker(_task_id)); -#endif - flush_untracked_mem<false>(); + // Do not flush untracked mem, instance executor thread may exit after instance fragment executor thread, + // `instance_mem_tracker` will be null pointer, which is not a graceful exit. _task_id = ""; _fragment_instance_id = TUniqueId(); _exceed_cb.cancel_msg = ""; diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 8ccb6f70b1..1862c2830a 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -210,8 +210,8 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { // If you do not want this check, set_check_attach=true // TODO(zxy) The current p0 test cannot guarantee that all threads are checked, // so disable it and try to open it when memory tracking is not on time. - DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY || - _limiter_tracker->label() != "Process"); + // DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY || + // _limiter_tracker->label() != "Process"); #endif Status st = _limiter_tracker->try_consume(_untracked_mem); if (!st) { diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 02660730c3..f3284d05f6 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -265,6 +265,7 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) { } Status RuntimeState::init_instance_mem_tracker() { + _query_mem_tracker = nullptr; _instance_mem_tracker = std::make_unique<MemTrackerLimiter>(-1, "RuntimeState:instance"); return Status::OK(); } diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index 511fbbe19d..bee1fcf10c 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -53,8 +53,8 @@ std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr( VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id << ", node=" << dest_node_id; std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr( - this, row_desc, fragment_instance_id, dest_node_id, num_senders, is_merging, - buffer_size, profile, sub_plan_query_statistics_recvr)); + this, row_desc, state->query_mem_tracker(), fragment_instance_id, dest_node_id, + num_senders, is_merging, buffer_size, profile, sub_plan_query_statistics_recvr)); uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id); std::lock_guard<std::mutex> l(_lock); _fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id)); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 19393b4499..bce11043ac 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -249,8 +249,9 @@ void VDataStreamRecvr::SenderQueue::close() { VDataStreamRecvr::VDataStreamRecvr( VDataStreamMgr* stream_mgr, const RowDescriptor& row_desc, - const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, - bool is_merging, int total_buffer_limit, RuntimeProfile* profile, + MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id, + PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit, + RuntimeProfile* profile, std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr) : _mgr(stream_mgr), _fragment_instance_id(fragment_instance_id), @@ -262,8 +263,10 @@ VDataStreamRecvr::VDataStreamRecvr( _num_buffered_bytes(0), _profile(profile), _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) { + // DataStreamRecvr may be destructed after the instance execution thread ends, `instance_mem_tracker` + // will be a null pointer, and remove_child fails when _mem_tracker is destructed. _mem_tracker = std::make_unique<MemTracker>( - "VDataStreamRecvr:" + print_id(_fragment_instance_id), nullptr, _profile); + "VDataStreamRecvr:" + print_id(_fragment_instance_id), query_mem_tracker, _profile); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); // Create one queue per sender if is_merging is true. diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index bedd18bbce..87024a917a 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -51,9 +51,9 @@ class VExprContext; class VDataStreamRecvr { public: VDataStreamRecvr(VDataStreamMgr* stream_mgr, const RowDescriptor& row_desc, - const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, - int num_senders, bool is_merging, int total_buffer_limit, - RuntimeProfile* profile, + MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id, + PlanNodeId dest_node_id, int num_senders, bool is_merging, + int total_buffer_limit, RuntimeProfile* profile, std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr); ~VDataStreamRecvr(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org