This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 85362a907e [fix](mem tracker) Fix some memory leaks, inaccurate 
statistics, core dump, deadlock bugs (#10072)
85362a907e is described below

commit 85362a907e2c55f0a856daaaa34dd9e0d461d736
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Tue Jun 14 21:38:37 2022 +0800

    [fix](mem tracker) Fix some memory leaks, inaccurate statistics, core dump, 
deadlock bugs (#10072)
    
    1. Fix a memory leak: when a load task is canceled, the `IndexChannel` 
and `NodeChannel` mem trackers are not destructed in time.
    2. Fix load tasks being frequently canceled due to OOM and the inaccurate 
`LoadChannel` mem tracker limit, and rename the `mem limit` variables 
in `LoadChannel`.
    3. Fix a core dump: when logging out a task mem tracker, the phmap erase 
may fail, resulting in repeated logout of the same tracker.
    4. Fix a deadlock: when the mem limit is exceeded in `add_child_tracker`, 
calling `log_usage` deadlocks on `_child_trackers_lock`.
    5. Fix frequent log printing when the thread mem tracker limit is exceeded, 
which hurts log readability and performance.
    6. Optimize some details of mem tracker display.
---
 be/src/exec/tablet_sink.cpp                        |  1 -
 be/src/exec/tablet_sink.h                          |  1 -
 be/src/gutil/strings/numbers.cc                    |  2 +-
 be/src/olap/task/engine_alter_tablet_task.cpp      |  2 +-
 be/src/olap/task/engine_batch_load_task.cpp        |  2 +-
 be/src/olap/task/engine_checksum_task.cpp          |  2 +-
 be/src/olap/task/engine_clone_task.cpp             |  2 +-
 .../olap/task/engine_storage_migration_task_v2.cpp |  2 +-
 be/src/runtime/load_channel.cpp                    |  9 +++--
 be/src/runtime/load_channel.h                      |  5 ++-
 be/src/runtime/load_channel_mgr.cpp                | 44 +++++++++++-----------
 be/src/runtime/load_channel_mgr.h                  |  7 ++--
 be/src/runtime/mem_tracker.cpp                     |  6 ++-
 be/src/runtime/mem_tracker.h                       |  3 +-
 be/src/runtime/mem_tracker_task_pool.cpp           | 30 ++++++++++++---
 be/src/runtime/thread_context.cpp                  |  8 ++--
 be/src/runtime/thread_context.h                    |  3 +-
 be/src/runtime/thread_mem_tracker_mgr.cpp          | 17 ++++-----
 be/src/runtime/thread_mem_tracker_mgr.h            | 17 ++++++---
 19 files changed, 98 insertions(+), 65 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index e1d60f694c..c15f303200 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -558,7 +558,6 @@ Status NodeChannel::none_of(std::initializer_list<bool> 
vars) {
 }
 
 void NodeChannel::clear_all_batches() {
-    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
     std::lock_guard<std::mutex> lg(_pending_batches_lock);
     std::queue<AddBatchReq> empty;
     std::swap(_pending_batches, empty);
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 8b3186e0c1..d797a1b80c 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -330,7 +330,6 @@ public:
 
     void for_each_node_channel(
             const std::function<void(const std::shared_ptr<NodeChannel>&)>& 
func) {
-        SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker);
         for (auto& it : _node_channels) {
             func(it.second);
         }
diff --git a/be/src/gutil/strings/numbers.cc b/be/src/gutil/strings/numbers.cc
index 46be289fb8..24c993b86a 100644
--- a/be/src/gutil/strings/numbers.cc
+++ b/be/src/gutil/strings/numbers.cc
@@ -1488,7 +1488,7 @@ string AccurateItoaKMGT(int64 i) {
         i = -i;
     }
 
-    string ret = std::to_string(i) + " = " + StringPrintf("%s", sign);
+    string ret = StringPrintf("%s", sign) + std::to_string(i) + " = " + 
StringPrintf("%s", sign);
     int64 val;
     if ((val = (i >> 40)) > 1) {
         ret += StringPrintf("%" PRId64
diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp 
b/be/src/olap/task/engine_alter_tablet_task.cpp
index 7686a632ae..24496822d3 100644
--- a/be/src/olap/task/engine_alter_tablet_task.cpp
+++ b/be/src/olap/task/engine_alter_tablet_task.cpp
@@ -29,7 +29,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const 
TAlterTabletReqV2& request)
         : _alter_tablet_req(request) {
     _mem_tracker = MemTracker::create_tracker(
             config::memory_limitation_per_thread_for_schema_change_bytes,
-            fmt::format("EngineAlterTabletTask:baseTabletId={}:newTabletId={}",
+            fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
                         std::to_string(_alter_tablet_req.base_tablet_id),
                         std::to_string(_alter_tablet_req.new_tablet_id)),
             StorageEngine::instance()->schema_change_mem_tracker(), 
MemTrackerLevel::TASK);
diff --git a/be/src/olap/task/engine_batch_load_task.cpp 
b/be/src/olap/task/engine_batch_load_task.cpp
index 9c507d362d..7add215823 100644
--- a/be/src/olap/task/engine_batch_load_task.cpp
+++ b/be/src/olap/task/engine_batch_load_task.cpp
@@ -55,7 +55,7 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, 
std::vector<TTablet
     _download_status = Status::OK();
     _mem_tracker = MemTracker::create_tracker(
             -1,
-            fmt::format("EngineBatchLoadTask:pushType={}:tabletId={}", 
_push_req.push_type,
+            fmt::format("EngineBatchLoadTask#pushType={}:tabletId={}", 
_push_req.push_type,
                         std::to_string(_push_req.tablet_id)),
             StorageEngine::instance()->batch_load_mem_tracker(), 
MemTrackerLevel::TASK);
 }
diff --git a/be/src/olap/task/engine_checksum_task.cpp 
b/be/src/olap/task/engine_checksum_task.cpp
index 30ef01bc7e..37efd52e1d 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -27,7 +27,7 @@ EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, 
TSchemaHash schema_h
                                        TVersion version, uint32_t* checksum)
         : _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), 
_checksum(checksum) {
     _mem_tracker = MemTracker::create_tracker(
-            -1, "EngineChecksumTask:tabletId=" + std::to_string(tablet_id),
+            -1, "EngineChecksumTask#tabletId=" + std::to_string(tablet_id),
             StorageEngine::instance()->consistency_mem_tracker(), 
MemTrackerLevel::TASK);
 }
 
diff --git a/be/src/olap/task/engine_clone_task.cpp 
b/be/src/olap/task/engine_clone_task.cpp
index 5fcdd86e1f..04a7df7199 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -58,7 +58,7 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, 
const TMasterInfo&
           _signature(signature),
           _master_info(master_info) {
     _mem_tracker = MemTracker::create_tracker(
-            -1, "EngineCloneTask:tabletId=" + 
std::to_string(_clone_req.tablet_id),
+            -1, "EngineCloneTask#tabletId=" + 
std::to_string(_clone_req.tablet_id),
             StorageEngine::instance()->clone_mem_tracker(), 
MemTrackerLevel::TASK);
 }
 
diff --git a/be/src/olap/task/engine_storage_migration_task_v2.cpp 
b/be/src/olap/task/engine_storage_migration_task_v2.cpp
index 5e865c9849..fe00536662 100644
--- a/be/src/olap/task/engine_storage_migration_task_v2.cpp
+++ b/be/src/olap/task/engine_storage_migration_task_v2.cpp
@@ -28,7 +28,7 @@ 
EngineStorageMigrationTaskV2::EngineStorageMigrationTaskV2(const TStorageMigrati
         : _storage_migration_req(request) {
     _mem_tracker = MemTracker::create_tracker(
             config::memory_limitation_per_thread_for_storage_migration_bytes,
-            fmt::format("EngineStorageMigrationTaskV2: {}-{}",
+            
fmt::format("EngineStorageMigrationTaskV2#baseTabletId{}:newTabletId{}",
                         std::to_string(_storage_migration_req.base_tablet_id),
                         std::to_string(_storage_migration_req.new_tablet_id)),
             StorageEngine::instance()->storage_migration_mem_tracker(), 
MemTrackerLevel::TASK);
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 1c270a3473..42fbe9dac5 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -25,17 +25,18 @@
 
 namespace doris {
 
-LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t 
timeout_s,
-                         bool is_high_priority, const std::string& sender_ip, 
bool is_vec)
+LoadChannel::LoadChannel(const UniqueId& load_id, int64_t load_mem_limit, 
int64_t channel_mem_limit,
+                         int64_t timeout_s, bool is_high_priority, const 
std::string& sender_ip,
+                         bool is_vec)
         : _load_id(load_id),
           _timeout_s(timeout_s),
           _is_high_priority(is_high_priority),
           _sender_ip(sender_ip),
           _is_vec(is_vec) {
     _mem_tracker = MemTracker::create_tracker(
-            mem_limit, "LoadChannel:tabletId=" + _load_id.to_string(),
+            channel_mem_limit, "LoadChannel#senderIp=" + sender_ip,
             
ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->register_load_mem_tracker(
-                    _load_id.to_string(), mem_limit),
+                    _load_id.to_string(), load_mem_limit),
             MemTrackerLevel::TASK);
     // _last_updated_time should be set before being inserted to
     // _load_channels in load_channel_mgr, or it may be erased
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 644e546524..38cc2ac89f 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -39,8 +39,9 @@ class Cache;
 // corresponding to a certain load job
 class LoadChannel {
 public:
-    LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s,
-                bool is_high_priority, const std::string& sender_ip, bool 
is_vec);
+    LoadChannel(const UniqueId& load_id, int64_t load_mem_limit, int64_t 
channel_mem_limit,
+                int64_t timeout_s, bool is_high_priority, const std::string& 
sender_ip,
+                bool is_vec);
     ~LoadChannel();
 
     // open a new load channel if not exist
diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp
index 2259eb2403..aea5479aa6 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -42,22 +42,22 @@ static int64_t calc_process_max_load_memory(int64_t 
process_mem_limit) {
     return std::min<int64_t>(max_load_memory_bytes, 
config::load_process_max_memory_limit_bytes);
 }
 
-// Calculate the memory limit for a single load job.
-static int64_t calc_job_max_load_memory(int64_t mem_limit_in_req, int64_t 
total_mem_limit) {
+// Calculate the memory limit for a single load channel.
+static int64_t calc_channel_max_load_memory(int64_t load_mem_limit, int64_t 
total_mem_limit) {
     // default mem limit is used to be compatible with old request.
     // new request should be set load_mem_limit.
-    constexpr int64_t default_load_mem_limit = 2 * 1024 * 1024 * 1024L; // 2GB
-    int64_t load_mem_limit = default_load_mem_limit;
-    if (mem_limit_in_req != -1) {
+    constexpr int64_t default_channel_mem_limit = 2 * 1024 * 1024 * 1024L; // 
2GB
+    int64_t channel_mem_limit = default_channel_mem_limit;
+    if (load_mem_limit != -1) {
         // mem-limit of a certain load should between config::write_buffer_size
         // and total-memory-limit
-        load_mem_limit = std::max<int64_t>(mem_limit_in_req, 
config::write_buffer_size);
-        load_mem_limit = std::min<int64_t>(load_mem_limit, total_mem_limit);
+        channel_mem_limit = std::max<int64_t>(load_mem_limit, 
config::write_buffer_size);
+        channel_mem_limit = std::min<int64_t>(channel_mem_limit, 
total_mem_limit);
     }
-    return load_mem_limit;
+    return channel_mem_limit;
 }
 
-static int64_t calc_job_timeout_s(int64_t timeout_in_req_s) {
+static int64_t calc_channel_timeout_s(int64_t timeout_in_req_s) {
     int64_t load_channel_timeout_s = 
config::streaming_load_rpc_max_alive_time_sec;
     if (timeout_in_req_s > 0) {
         load_channel_timeout_s = std::max<int64_t>(load_channel_timeout_s, 
timeout_in_req_s);
@@ -83,8 +83,8 @@ LoadChannelMgr::~LoadChannelMgr() {
 }
 
 Status LoadChannelMgr::init(int64_t process_mem_limit) {
-    int64_t load_mem_limit = calc_process_max_load_memory(process_mem_limit);
-    _mem_tracker = MemTracker::create_tracker(load_mem_limit, "LoadChannelMgr",
+    int64_t load_mgr_mem_limit = 
calc_process_max_load_memory(process_mem_limit);
+    _mem_tracker = MemTracker::create_tracker(load_mgr_mem_limit, 
"LoadChannelMgr",
                                               
MemTracker::get_process_tracker(),
                                               MemTrackerLevel::OVERVIEW);
     SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
@@ -95,10 +95,12 @@ Status LoadChannelMgr::init(int64_t process_mem_limit) {
     return Status::OK();
 }
 
-LoadChannel* LoadChannelMgr::_create_load_channel(const UniqueId& load_id, 
int64_t mem_limit,
-                                                  int64_t timeout_s, bool 
is_high_priority,
+LoadChannel* LoadChannelMgr::_create_load_channel(const UniqueId& load_id, 
int64_t load_mem_limit,
+                                                  int64_t channel_mem_limit, 
int64_t timeout_s,
+                                                  bool is_high_priority,
                                                   const std::string& 
sender_ip, bool is_vec) {
-    return new LoadChannel(load_id, mem_limit, timeout_s, is_high_priority, 
sender_ip, is_vec);
+    return new LoadChannel(load_id, load_mem_limit, channel_mem_limit, 
timeout_s, is_high_priority,
+                           sender_ip, is_vec);
 }
 
 Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
@@ -112,18 +114,18 @@ Status LoadChannelMgr::open(const 
PTabletWriterOpenRequest& params) {
             channel = it->second;
         } else {
             // create a new load channel
-            int64_t mem_limit_in_req = params.has_load_mem_limit() ? 
params.load_mem_limit() : -1;
-            int64_t job_max_memory =
-                    calc_job_max_load_memory(mem_limit_in_req, 
_mem_tracker->limit());
+            int64_t load_mem_limit = params.has_load_mem_limit() ? 
params.load_mem_limit() : -1;
+            int64_t channel_mem_limit =
+                    calc_channel_max_load_memory(load_mem_limit, 
_mem_tracker->limit());
 
             int64_t timeout_in_req_s =
                     params.has_load_channel_timeout_s() ? 
params.load_channel_timeout_s() : -1;
-            int64_t job_timeout_s = calc_job_timeout_s(timeout_in_req_s);
+            int64_t channel_timeout_s = 
calc_channel_timeout_s(timeout_in_req_s);
 
             bool is_high_priority = (params.has_is_high_priority() && 
params.is_high_priority());
-            channel.reset(_create_load_channel(load_id, job_max_memory, 
job_timeout_s,
-                                               is_high_priority, 
params.sender_ip(),
-                                               params.is_vectorized()));
+            channel.reset(_create_load_channel(load_id, load_mem_limit, 
channel_mem_limit,
+                                               channel_timeout_s, 
is_high_priority,
+                                               params.sender_ip(), 
params.is_vectorized()));
             _load_channels.insert({load_id, channel});
         }
     }
diff --git a/be/src/runtime/load_channel_mgr.h 
b/be/src/runtime/load_channel_mgr.h
index 7e1c4450f0..65d72534f4 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -61,9 +61,10 @@ public:
     std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }
 
 private:
-    static LoadChannel* _create_load_channel(const UniqueId& load_id, int64_t 
mem_limit,
-                                             int64_t timeout_s, bool 
is_high_priority,
-                                             const std::string& sender_ip, 
bool is_vec);
+    static LoadChannel* _create_load_channel(const UniqueId& load_id, int64_t 
load_mem_limit,
+                                             int64_t channel_mem_limit, 
int64_t timeout_s,
+                                             bool is_high_priority, const 
std::string& sender_ip,
+                                             bool is_vec);
 
     template <typename Request>
     Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& 
is_eof,
diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp
index 359b0f5368..824c8b8530 100644
--- a/be/src/runtime/mem_tracker.cpp
+++ b/be/src/runtime/mem_tracker.cpp
@@ -124,14 +124,17 @@ std::shared_ptr<MemTracker> 
MemTracker::create_tracker_impl(
     std::string reset_label;
     MemTracker* task_parent_tracker = reset_parent->parent_task_mem_tracker();
     if (task_parent_tracker) {
-        reset_label = fmt::format("{}:{}", label, 
split(task_parent_tracker->label(), ":")[1]);
+        reset_label = fmt::format("{}#{}", label, 
split(task_parent_tracker->label(), "#")[1]);
     } else {
         reset_label = label;
     }
+    if (byte_limit == -1) byte_limit = reset_parent->limit();
 
     std::shared_ptr<MemTracker> tracker(
             new MemTracker(byte_limit, reset_label, reset_parent,
                            level > reset_parent->_level ? level : 
reset_parent->_level, profile));
+    // Do not check limit exceed when add_child_tracker, otherwise it will 
cause deadlock when log_usage is called.
+    STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER();
     reset_parent->add_child_tracker(tracker);
     return tracker;
 }
@@ -285,6 +288,7 @@ std::string MemTracker::log_usage(int max_recursive_depth,
 
 Status MemTracker::mem_limit_exceeded(RuntimeState* state, const std::string& 
details,
                                       int64_t failed_allocation_size, Status 
failed_alloc) {
+    STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER();
     MemTracker* process_tracker = MemTracker::get_raw_process_tracker();
     std::string detail =
             "Memory exceed limit. fragment={}, details={}, on backend={}. 
Memory left in process "
diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h
index c21d6d3db5..85a6550f7a 100644
--- a/be/src/runtime/mem_tracker.h
+++ b/be/src/runtime/mem_tracker.h
@@ -401,8 +401,7 @@ public:
     /// 'failed_allocation_size' is zero, nothing about the allocation size is 
logged.
     /// If 'state' is non-nullptr, logs the error to 'state'.
     Status mem_limit_exceeded(RuntimeState* state, const std::string& details 
= std::string(),
-                              int64_t failed_allocation = -1,
-                              Status failed_alloc = Status::OK()) 
WARN_UNUSED_RESULT;
+                              int64_t failed_allocation = -1, Status 
failed_alloc = Status::OK());
 
     // Usually, a negative values means that the statistics are not accurate,
     // 1. The released memory is not consumed.
diff --git a/be/src/runtime/mem_tracker_task_pool.cpp 
b/be/src/runtime/mem_tracker_task_pool.cpp
index 551d904111..84f1a951e0 100644
--- a/be/src/runtime/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/mem_tracker_task_pool.cpp
@@ -41,16 +41,17 @@ std::shared_ptr<MemTracker> 
MemTrackerTaskPool::register_query_mem_tracker(
     VLOG_FILE << "Register Query memory tracker, query id: " << query_id
               << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES);
     return register_task_mem_tracker_impl(query_id, mem_limit,
-                                          fmt::format("Query:queryId={}", 
query_id),
+                                          fmt::format("Query#queryId={}", 
query_id),
                                           
ExecEnv::GetInstance()->query_pool_mem_tracker());
 }
 
 std::shared_ptr<MemTracker> MemTrackerTaskPool::register_load_mem_tracker(
         const std::string& load_id, int64_t mem_limit) {
+    // In load, the query id of the fragment is executed, which is the same as 
the load id of the load channel.
     VLOG_FILE << "Register Load memory tracker, load id: " << load_id
               << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES);
     return register_task_mem_tracker_impl(load_id, mem_limit,
-                                          fmt::format("Load:loadId={}", 
load_id),
+                                          fmt::format("Load#loadId={}", 
load_id),
                                           
ExecEnv::GetInstance()->load_pool_mem_tracker());
 }
 
@@ -66,8 +67,13 @@ std::shared_ptr<MemTracker> 
MemTrackerTaskPool::get_task_mem_tracker(const std::
 void MemTrackerTaskPool::logout_task_mem_tracker() {
     std::vector<std::string> expired_tasks;
     for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end(); 
it++) {
-        // No RuntimeState uses this task MemTracker, it is only referenced by 
this map, delete it
-        if (it->second.use_count() == 1) {
+        if (!it->second) {
+            // when parallel querying, after phmap _task_mem_trackers.erase,
+            // there have been cases where the key still exists in 
_task_mem_trackers.
+            // https://github.com/apache/incubator-doris/issues/10006
+            expired_tasks.emplace_back(it->first);
+        } else if (it->second.use_count() == 1) {
+            // No RuntimeState uses this task MemTracker, it is only 
referenced by this map, delete it
             if (config::memory_leak_detection && it->second->consumption() != 
0) {
                 // If consumption is not equal to 0 before query mem tracker 
is destructed,
                 // there are two possibilities in theory.
@@ -86,6 +92,14 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
             it->second->parent()->consume_local(-it->second->consumption(),
                                                 
MemTracker::get_process_tracker().get());
             expired_tasks.emplace_back(it->first);
+        } else {
+            // Log limit exceeded query tracker.
+            if (it->second->limit_exceeded()) {
+                it->second->mem_limit_exceeded(
+                        nullptr,
+                        fmt::format("Task mem limit exceeded but no cancel, 
queryId:{}", it->first),
+                        0, Status::OK());
+            }
         }
     }
     for (auto tid : expired_tasks) {
@@ -93,9 +107,13 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
         // there are still task mem trackers that are get or register.
         // The only known case: after an load task ends all fragments on a 
BE,`tablet_writer_open` is still
         // called to create a channel, and the load task tracker will be 
re-registered in the channel open.
-        if (_task_mem_trackers[tid].use_count() == 1) {
+        // https://github.com/apache/incubator-doris/issues/9905
+        if (!_task_mem_trackers[tid]) {
+            _task_mem_trackers.erase(tid);
+            VLOG_FILE << "Deregister null task mem tracker, task id: " << tid;
+        } else if (_task_mem_trackers[tid].use_count() == 1) {
             _task_mem_trackers.erase(tid);
-            VLOG_FILE << "Deregister task memory tracker, task id: " << tid;
+            VLOG_FILE << "Deregister not used task mem tracker, task id: " << 
tid;
         }
     }
 }
diff --git a/be/src/runtime/thread_context.cpp 
b/be/src/runtime/thread_context.cpp
index b08642bf12..2139469393 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -136,12 +136,14 @@ 
SwitchThreadMemTracker<Existed>::~SwitchThreadMemTracker() {
 #endif // USE_MEM_TRACKER
 }
 
-SwitchThreadMemTrackerErrCallBack::SwitchThreadMemTrackerErrCallBack(
-        const std::string& action_type, bool cancel_work, ERRCALLBACK 
err_call_back_func) {
+SwitchThreadMemTrackerErrCallBack::SwitchThreadMemTrackerErrCallBack(const 
std::string& action_type,
+                                                                     bool 
cancel_work,
+                                                                     
ERRCALLBACK err_call_back_func,
+                                                                     bool 
log_limit_exceeded) {
 #ifdef USE_MEM_TRACKER
     DCHECK(action_type != std::string());
     _old_tracker_cb = 
tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb(
-            action_type, cancel_work, err_call_back_func);
+            action_type, cancel_work, err_call_back_func, log_limit_exceeded);
 #endif
 }
 
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 0572c2b08d..30952692d6 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -300,7 +300,8 @@ class SwitchThreadMemTrackerErrCallBack {
 public:
     explicit SwitchThreadMemTrackerErrCallBack(const std::string& action_type,
                                                bool cancel_work = true,
-                                               ERRCALLBACK err_call_back_func 
= nullptr);
+                                               ERRCALLBACK err_call_back_func 
= nullptr,
+                                               bool log_limit_exceeded = true);
 
     ~SwitchThreadMemTrackerErrCallBack();
 
diff --git a/be/src/runtime/thread_mem_tracker_mgr.cpp 
b/be/src/runtime/thread_mem_tracker_mgr.cpp
index e9768bf86b..d3715b58df 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/thread_mem_tracker_mgr.cpp
@@ -60,25 +60,24 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const 
std::string& cancel_details
         ExecEnv::GetInstance()->fragment_mgr()->cancel(
                 _fragment_instance_id, 
PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
                 cancel_details);
-        _fragment_instance_id = TUniqueId(); // Make sure it will only be 
canceled once
     }
 }
 
 void ThreadMemTrackerMgr::exceeded(int64_t mem_usage, Status st) {
-    auto rst = _mem_trackers[_tracker_id]->mem_limit_exceeded(
-            nullptr, fmt::format("In TCMalloc Hook, {}", 
_consume_err_cb.cancel_msg), mem_usage,
-            st);
     if (_consume_err_cb.cb_func != nullptr) {
         _consume_err_cb.cb_func();
     }
     if (is_attach_task()) {
-        if (_consume_err_cb.cancel_task == true) {
+        if (_consume_err_cb.cancel_task) {
+            auto rst = _mem_trackers[_tracker_id]->mem_limit_exceeded(
+                    nullptr,
+                    fmt::format("Task mem limit exceeded and cancel it, 
msg:{}",
+                                _consume_err_cb.cancel_msg),
+                    mem_usage, st);
             exceeded_cancel_task(rst.to_string());
-        } else {
-            // TODO(zxy) Need other processing, or log (not too often).
+            _consume_err_cb.cancel_task = false; // Make sure it will only be 
canceled once
+            _consume_err_cb.log_limit_exceeded = false;
         }
-    } else {
-        // TODO(zxy) Need other processing, or log (not too often).
     }
 }
 } // namespace doris
diff --git a/be/src/runtime/thread_mem_tracker_mgr.h 
b/be/src/runtime/thread_mem_tracker_mgr.h
index 754e231747..b476715612 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/thread_mem_tracker_mgr.h
@@ -28,18 +28,24 @@ typedef void (*ERRCALLBACK)();
 
 struct ConsumeErrCallBackInfo {
     std::string cancel_msg;
-    bool cancel_task; // Whether to cancel the task when the current tracker 
exceeds the limit
+    bool cancel_task; // Whether to cancel the task when the current tracker 
exceeds the limit.
     ERRCALLBACK cb_func;
+    bool log_limit_exceeded; // Whether to print log_usage of mem tracker when 
mem limit exceeded.
 
     ConsumeErrCallBackInfo() { init(); }
 
-    ConsumeErrCallBackInfo(const std::string& cancel_msg, bool cancel_task, 
ERRCALLBACK cb_func)
-            : cancel_msg(cancel_msg), cancel_task(cancel_task), 
cb_func(cb_func) {}
+    ConsumeErrCallBackInfo(const std::string& cancel_msg, bool cancel_task, 
ERRCALLBACK cb_func,
+                           bool log_limit_exceeded)
+            : cancel_msg(cancel_msg),
+              cancel_task(cancel_task),
+              cb_func(cb_func),
+              log_limit_exceeded(log_limit_exceeded) {}
 
     void init() {
         cancel_msg = "";
-        cancel_task = false;
+        cancel_task = true;
         cb_func = nullptr;
+        log_limit_exceeded = true;
     }
 };
 
@@ -94,11 +100,12 @@ public:
     void add_tracker(const std::shared_ptr<MemTracker>& mem_tracker);
 
     ConsumeErrCallBackInfo update_consume_err_cb(const std::string& 
cancel_msg, bool cancel_task,
-                                                 ERRCALLBACK cb_func) {
+                                                 ERRCALLBACK cb_func, bool 
log_limit_exceeded) {
         _temp_consume_err_cb = _consume_err_cb;
         _consume_err_cb.cancel_msg = cancel_msg;
         _consume_err_cb.cancel_task = cancel_task;
         _consume_err_cb.cb_func = cb_func;
+        _consume_err_cb.log_limit_exceeded = log_limit_exceeded;
         return _temp_consume_err_cb;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to