This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 19b34c09b1 [fix] (mem tracker) Fix runtime instance tracker null pointer (#11272)
19b34c09b1 is described below

commit 19b34c09b1d9301daf5ed76eea89bd031eb12bbe
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Thu Jul 28 14:58:13 2022 +0800

    [fix] (mem tracker) Fix runtime instance tracker null pointer (#11272)
---
 be/src/runtime/data_stream_mgr.cpp               |  6 +++---
 be/src/runtime/data_stream_recvr.cc              |  8 +++++---
 be/src/runtime/data_stream_recvr.h               |  5 +++--
 be/src/runtime/memory/mem_tracker_limiter.h      |  1 +
 be/src/runtime/memory/mem_tracker_task_pool.cpp  |  2 +-
 be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 10 ++--------
 be/src/runtime/memory/thread_mem_tracker_mgr.h   |  4 ++--
 be/src/runtime/runtime_state.cpp                 |  1 +
 be/src/vec/runtime/vdata_stream_mgr.cpp          |  4 ++--
 be/src/vec/runtime/vdata_stream_recvr.cpp        |  9 ++++++---
 be/src/vec/runtime/vdata_stream_recvr.h          |  6 +++---
 11 files changed, 29 insertions(+), 27 deletions(-)

diff --git a/be/src/runtime/data_stream_mgr.cpp b/be/src/runtime/data_stream_mgr.cpp
index b0d1dbd8f2..3e519f8987 100644
--- a/be/src/runtime/data_stream_mgr.cpp
+++ b/be/src/runtime/data_stream_mgr.cpp
@@ -72,9 +72,9 @@ shared_ptr<DataStreamRecvr> DataStreamMgr::create_recvr(
     DCHECK(profile != nullptr);
     VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id
               << ", node=" << dest_node_id;
-    shared_ptr<DataStreamRecvr> recvr(
-            new DataStreamRecvr(this, row_desc, fragment_instance_id, dest_node_id, num_senders,
-                                is_merging, buffer_size, profile, sub_plan_query_statistics_recvr));
+    shared_ptr<DataStreamRecvr> recvr(new DataStreamRecvr(
+            this, row_desc, state->query_mem_tracker(), fragment_instance_id, dest_node_id,
+            num_senders, is_merging, buffer_size, profile, sub_plan_query_statistics_recvr));
     uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
     lock_guard<mutex> l(_lock);
     _fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id));
diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc
index a89c39cea9..ffd20c58d7 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -447,8 +447,9 @@ void DataStreamRecvr::transfer_all_resources(RowBatch* transfer_batch) {
 
 DataStreamRecvr::DataStreamRecvr(
         DataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
-        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
-        bool is_merging, int total_buffer_limit, RuntimeProfile* profile,
+        MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id,
+        PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit,
+        RuntimeProfile* profile,
         std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
         : _mgr(stream_mgr),
           _fragment_instance_id(fragment_instance_id),
@@ -459,7 +460,8 @@ DataStreamRecvr::DataStreamRecvr(
           _num_buffered_bytes(0),
           _profile(profile),
           _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) {
-    _mem_tracker = std::make_unique<MemTracker>("DataStreamRecvr", nullptr, _profile);
+    _mem_tracker = std::make_unique<MemTracker>(
+            "DataStreamRecvr:" + print_id(_fragment_instance_id), query_mem_tracker, _profile);
 
     // Create one queue per sender if is_merging is true.
     int num_queues = is_merging ? num_senders : 1;
diff --git a/be/src/runtime/data_stream_recvr.h b/be/src/runtime/data_stream_recvr.h
index efb036b5dd..31af9f2e37 100644
--- a/be/src/runtime/data_stream_recvr.h
+++ b/be/src/runtime/data_stream_recvr.h
@@ -116,8 +116,9 @@ private:
     class SenderQueue;
 
     DataStreamRecvr(DataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
-                    const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
-                    bool is_merging, int total_buffer_limit, RuntimeProfile* profile,
+                    MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id,
+                    PlanNodeId dest_node_id, int num_senders, bool is_merging,
+                    int total_buffer_limit, RuntimeProfile* profile,
                     std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr);
 
     // If receive queue is full, done is enqueue pending, and return with *done is nullptr
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 786ad945bf..3852cbe52d 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -111,6 +111,7 @@ public:
     // Note that 'f' must be valid for the lifetime of this tracker limiter.
     void add_gc_function(GcFunction f) { _gc_functions.push_back(f); }
 
+    // TODO Should be managed in a separate process_mem_mgr, not in MemTracker
     // If consumption is higher than max_consumption, attempts to free memory by calling
     // any added GC functions.  Returns true if max_consumption is still exceeded. Takes gc_lock.
     // Note: If the cache of segment/chunk is released due to insufficient query memory at a certain moment,
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index a4831114cf..02b38acdb5 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -34,7 +34,7 @@ MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std:
     bool new_emplace = _task_mem_trackers.lazy_emplace_l(
             task_id, [&](std::shared_ptr<MemTrackerLimiter>) {},
             [&](const auto& ctor) {
-                ctor(task_id, std::make_unique<MemTrackerLimiter>(mem_limit, label, parent));
+                ctor(task_id, std::make_shared<MemTrackerLimiter>(mem_limit, label, parent));
             });
     if (new_emplace) {
         LOG(INFO) << "Register query/load memory tracker, query/load id: " << task_id
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index 53a47202b1..7841a7cb6a 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -37,14 +37,8 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(const std::string& cancel_msg,
 }
 
 void ThreadMemTrackerMgr::detach_limiter_tracker() {
-#ifndef BE_TEST
-    // Unexpectedly, the runtime state is destructed before the end of the query sub-thread,
-    // (_hash_table_build_thread has appeared) which is not a graceful exit.
-    // consider replacing CHECK with a conditional statement and checking for runtime state survival.
-    CHECK(_task_id == "" ||
-          ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker(_task_id));
-#endif
-    flush_untracked_mem<false>();
+    // Do not flush untracked mem, instance executor thread may exit after instance fragment executor thread,
+    // `instance_mem_tracker` will be null pointer, which is not a graceful exit.
     _task_id = "";
     _fragment_instance_id = TUniqueId();
     _exceed_cb.cancel_msg = "";
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 8ccb6f70b1..1862c2830a 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -210,8 +210,8 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
         // If you do not want this check, set_check_attach=true
         // TODO(zxy) The current p0 test cannot guarantee that all threads are checked,
         // so disable it and try to open it when memory tracking is not on time.
-        DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY ||
-               _limiter_tracker->label() != "Process");
+        // DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY ||
+        //        _limiter_tracker->label() != "Process");
 #endif
         Status st = _limiter_tracker->try_consume(_untracked_mem);
         if (!st) {
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 02660730c3..f3284d05f6 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -265,6 +265,7 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
 }
 
 Status RuntimeState::init_instance_mem_tracker() {
+    _query_mem_tracker = nullptr;
     _instance_mem_tracker = std::make_unique<MemTrackerLimiter>(-1, "RuntimeState:instance");
     return Status::OK();
 }
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 511fbbe19d..bee1fcf10c 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -53,8 +53,8 @@ std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
     VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id
               << ", node=" << dest_node_id;
     std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
-            this, row_desc, fragment_instance_id, dest_node_id, num_senders, is_merging,
-            buffer_size, profile, sub_plan_query_statistics_recvr));
+            this, row_desc, state->query_mem_tracker(), fragment_instance_id, dest_node_id,
+            num_senders, is_merging, buffer_size, profile, sub_plan_query_statistics_recvr));
     uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
     std::lock_guard<std::mutex> l(_lock);
     _fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id));
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 19393b4499..bce11043ac 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -249,8 +249,9 @@ void VDataStreamRecvr::SenderQueue::close() {
 
 VDataStreamRecvr::VDataStreamRecvr(
         VDataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
-        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
-        bool is_merging, int total_buffer_limit, RuntimeProfile* profile,
+        MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id,
+        PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit,
+        RuntimeProfile* profile,
         std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
         : _mgr(stream_mgr),
           _fragment_instance_id(fragment_instance_id),
@@ -262,8 +263,10 @@ VDataStreamRecvr::VDataStreamRecvr(
           _num_buffered_bytes(0),
           _profile(profile),
           _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) {
+    // DataStreamRecvr may be destructed after the instance execution thread ends, `instance_mem_tracker`
+    // will be a null pointer, and remove_child fails when _mem_tracker is destructed.
     _mem_tracker = std::make_unique<MemTracker>(
-            "VDataStreamRecvr:" + print_id(_fragment_instance_id), nullptr, _profile);
+            "VDataStreamRecvr:" + print_id(_fragment_instance_id), query_mem_tracker, _profile);
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
     // Create one queue per sender if is_merging is true.
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index bedd18bbce..87024a917a 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -51,9 +51,9 @@ class VExprContext;
 class VDataStreamRecvr {
 public:
     VDataStreamRecvr(VDataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
-                     const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
-                     int num_senders, bool is_merging, int total_buffer_limit,
-                     RuntimeProfile* profile,
+                     MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id,
+                     PlanNodeId dest_node_id, int num_senders, bool is_merging,
+                     int total_buffer_limit, RuntimeProfile* profile,
                      std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr);
 
     ~VDataStreamRecvr();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to