This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 85362a907e [fix](mem tracker) Fix some memory leaks, inaccurate statistics, core dump, deadlock bugs (#10072) 85362a907e is described below commit 85362a907e2c55f0a856daaaa34dd9e0d461d736 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Tue Jun 14 21:38:37 2022 +0800 [fix](mem tracker) Fix some memory leaks, inaccurate statistics, core dump, deadlock bugs (#10072) 1. Fix a memory leak: when a load task is canceled, the `IndexChannel` and `NodeChannel` mem trackers are not destructed in time. 2. Fix load tasks being frequently canceled due to OOM and an inaccurate `LoadChannel` mem tracker limit, and rename the `mem limit` variables in `LoadChannel`. 3. Fix a core dump: when a task mem tracker is logged out, the phmap erase can fail, so the same tracker is logged out repeatedly. 4. Fix a deadlock: when the mem limit is exceeded during `add_child_tracker`, the resulting call to `log_usage` deadlocks on `_child_trackers_lock`. 5. Fix excessive log printing when the thread mem tracker limit is exceeded, which hurts log readability and performance. 6. Improve some details of the mem tracker display. 
--- be/src/exec/tablet_sink.cpp | 1 - be/src/exec/tablet_sink.h | 1 - be/src/gutil/strings/numbers.cc | 2 +- be/src/olap/task/engine_alter_tablet_task.cpp | 2 +- be/src/olap/task/engine_batch_load_task.cpp | 2 +- be/src/olap/task/engine_checksum_task.cpp | 2 +- be/src/olap/task/engine_clone_task.cpp | 2 +- .../olap/task/engine_storage_migration_task_v2.cpp | 2 +- be/src/runtime/load_channel.cpp | 9 +++-- be/src/runtime/load_channel.h | 5 ++- be/src/runtime/load_channel_mgr.cpp | 44 +++++++++++----------- be/src/runtime/load_channel_mgr.h | 7 ++-- be/src/runtime/mem_tracker.cpp | 6 ++- be/src/runtime/mem_tracker.h | 3 +- be/src/runtime/mem_tracker_task_pool.cpp | 30 ++++++++++++--- be/src/runtime/thread_context.cpp | 8 ++-- be/src/runtime/thread_context.h | 3 +- be/src/runtime/thread_mem_tracker_mgr.cpp | 17 ++++----- be/src/runtime/thread_mem_tracker_mgr.h | 17 ++++++--- 19 files changed, 98 insertions(+), 65 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index e1d60f694c..c15f303200 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -558,7 +558,6 @@ Status NodeChannel::none_of(std::initializer_list<bool> vars) { } void NodeChannel::clear_all_batches() { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker); std::lock_guard<std::mutex> lg(_pending_batches_lock); std::queue<AddBatchReq> empty; std::swap(_pending_batches, empty); diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 8b3186e0c1..d797a1b80c 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -330,7 +330,6 @@ public: void for_each_node_channel( const std::function<void(const std::shared_ptr<NodeChannel>&)>& func) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker); for (auto& it : _node_channels) { func(it.second); } diff --git a/be/src/gutil/strings/numbers.cc b/be/src/gutil/strings/numbers.cc index 46be289fb8..24c993b86a 100644 --- a/be/src/gutil/strings/numbers.cc +++ 
b/be/src/gutil/strings/numbers.cc @@ -1488,7 +1488,7 @@ string AccurateItoaKMGT(int64 i) { i = -i; } - string ret = std::to_string(i) + " = " + StringPrintf("%s", sign); + string ret = StringPrintf("%s", sign) + std::to_string(i) + " = " + StringPrintf("%s", sign); int64 val; if ((val = (i >> 40)) > 1) { ret += StringPrintf("%" PRId64 diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp b/be/src/olap/task/engine_alter_tablet_task.cpp index 7686a632ae..24496822d3 100644 --- a/be/src/olap/task/engine_alter_tablet_task.cpp +++ b/be/src/olap/task/engine_alter_tablet_task.cpp @@ -29,7 +29,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request) : _alter_tablet_req(request) { _mem_tracker = MemTracker::create_tracker( config::memory_limitation_per_thread_for_schema_change_bytes, - fmt::format("EngineAlterTabletTask:baseTabletId={}:newTabletId={}", + fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}", std::to_string(_alter_tablet_req.base_tablet_id), std::to_string(_alter_tablet_req.new_tablet_id)), StorageEngine::instance()->schema_change_mem_tracker(), MemTrackerLevel::TASK); diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp index 9c507d362d..7add215823 100644 --- a/be/src/olap/task/engine_batch_load_task.cpp +++ b/be/src/olap/task/engine_batch_load_task.cpp @@ -55,7 +55,7 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vector<TTablet _download_status = Status::OK(); _mem_tracker = MemTracker::create_tracker( -1, - fmt::format("EngineBatchLoadTask:pushType={}:tabletId={}", _push_req.push_type, + fmt::format("EngineBatchLoadTask#pushType={}:tabletId={}", _push_req.push_type, std::to_string(_push_req.tablet_id)), StorageEngine::instance()->batch_load_mem_tracker(), MemTrackerLevel::TASK); } diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp index 30ef01bc7e..37efd52e1d 100644 --- 
a/be/src/olap/task/engine_checksum_task.cpp +++ b/be/src/olap/task/engine_checksum_task.cpp @@ -27,7 +27,7 @@ EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_h TVersion version, uint32_t* checksum) : _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), _checksum(checksum) { _mem_tracker = MemTracker::create_tracker( - -1, "EngineChecksumTask:tabletId=" + std::to_string(tablet_id), + -1, "EngineChecksumTask#tabletId=" + std::to_string(tablet_id), StorageEngine::instance()->consistency_mem_tracker(), MemTrackerLevel::TASK); } diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 5fcdd86e1f..04a7df7199 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -58,7 +58,7 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo& _signature(signature), _master_info(master_info) { _mem_tracker = MemTracker::create_tracker( - -1, "EngineCloneTask:tabletId=" + std::to_string(_clone_req.tablet_id), + -1, "EngineCloneTask#tabletId=" + std::to_string(_clone_req.tablet_id), StorageEngine::instance()->clone_mem_tracker(), MemTrackerLevel::TASK); } diff --git a/be/src/olap/task/engine_storage_migration_task_v2.cpp b/be/src/olap/task/engine_storage_migration_task_v2.cpp index 5e865c9849..fe00536662 100644 --- a/be/src/olap/task/engine_storage_migration_task_v2.cpp +++ b/be/src/olap/task/engine_storage_migration_task_v2.cpp @@ -28,7 +28,7 @@ EngineStorageMigrationTaskV2::EngineStorageMigrationTaskV2(const TStorageMigrati : _storage_migration_req(request) { _mem_tracker = MemTracker::create_tracker( config::memory_limitation_per_thread_for_storage_migration_bytes, - fmt::format("EngineStorageMigrationTaskV2: {}-{}", + fmt::format("EngineStorageMigrationTaskV2#baseTabletId{}:newTabletId{}", std::to_string(_storage_migration_req.base_tablet_id), std::to_string(_storage_migration_req.new_tablet_id)), 
StorageEngine::instance()->storage_migration_mem_tracker(), MemTrackerLevel::TASK); diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 1c270a3473..42fbe9dac5 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -25,17 +25,18 @@ namespace doris { -LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s, - bool is_high_priority, const std::string& sender_ip, bool is_vec) +LoadChannel::LoadChannel(const UniqueId& load_id, int64_t load_mem_limit, int64_t channel_mem_limit, + int64_t timeout_s, bool is_high_priority, const std::string& sender_ip, + bool is_vec) : _load_id(load_id), _timeout_s(timeout_s), _is_high_priority(is_high_priority), _sender_ip(sender_ip), _is_vec(is_vec) { _mem_tracker = MemTracker::create_tracker( - mem_limit, "LoadChannel:tabletId=" + _load_id.to_string(), + channel_mem_limit, "LoadChannel#senderIp=" + sender_ip, ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->register_load_mem_tracker( - _load_id.to_string(), mem_limit), + _load_id.to_string(), load_mem_limit), MemTrackerLevel::TASK); // _last_updated_time should be set before being inserted to // _load_channels in load_channel_mgr, or it may be erased diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 644e546524..38cc2ac89f 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -39,8 +39,9 @@ class Cache; // corresponding to a certain load job class LoadChannel { public: - LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s, - bool is_high_priority, const std::string& sender_ip, bool is_vec); + LoadChannel(const UniqueId& load_id, int64_t load_mem_limit, int64_t channel_mem_limit, + int64_t timeout_s, bool is_high_priority, const std::string& sender_ip, + bool is_vec); ~LoadChannel(); // open a new load channel if not exist diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp index 2259eb2403..aea5479aa6 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -42,22 +42,22 @@ static int64_t calc_process_max_load_memory(int64_t process_mem_limit) { return std::min<int64_t>(max_load_memory_bytes, config::load_process_max_memory_limit_bytes); } -// Calculate the memory limit for a single load job. -static int64_t calc_job_max_load_memory(int64_t mem_limit_in_req, int64_t total_mem_limit) { +// Calculate the memory limit for a single load channel. +static int64_t calc_channel_max_load_memory(int64_t load_mem_limit, int64_t total_mem_limit) { // default mem limit is used to be compatible with old request. // new request should be set load_mem_limit. - constexpr int64_t default_load_mem_limit = 2 * 1024 * 1024 * 1024L; // 2GB - int64_t load_mem_limit = default_load_mem_limit; - if (mem_limit_in_req != -1) { + constexpr int64_t default_channel_mem_limit = 2 * 1024 * 1024 * 1024L; // 2GB + int64_t channel_mem_limit = default_channel_mem_limit; + if (load_mem_limit != -1) { // mem-limit of a certain load should between config::write_buffer_size // and total-memory-limit - load_mem_limit = std::max<int64_t>(mem_limit_in_req, config::write_buffer_size); - load_mem_limit = std::min<int64_t>(load_mem_limit, total_mem_limit); + channel_mem_limit = std::max<int64_t>(load_mem_limit, config::write_buffer_size); + channel_mem_limit = std::min<int64_t>(channel_mem_limit, total_mem_limit); } - return load_mem_limit; + return channel_mem_limit; } -static int64_t calc_job_timeout_s(int64_t timeout_in_req_s) { +static int64_t calc_channel_timeout_s(int64_t timeout_in_req_s) { int64_t load_channel_timeout_s = config::streaming_load_rpc_max_alive_time_sec; if (timeout_in_req_s > 0) { load_channel_timeout_s = std::max<int64_t>(load_channel_timeout_s, timeout_in_req_s); @@ -83,8 +83,8 @@ LoadChannelMgr::~LoadChannelMgr() { } Status LoadChannelMgr::init(int64_t process_mem_limit) { - 
int64_t load_mem_limit = calc_process_max_load_memory(process_mem_limit); - _mem_tracker = MemTracker::create_tracker(load_mem_limit, "LoadChannelMgr", + int64_t load_mgr_mem_limit = calc_process_max_load_memory(process_mem_limit); + _mem_tracker = MemTracker::create_tracker(load_mgr_mem_limit, "LoadChannelMgr", MemTracker::get_process_tracker(), MemTrackerLevel::OVERVIEW); SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); @@ -95,10 +95,12 @@ Status LoadChannelMgr::init(int64_t process_mem_limit) { return Status::OK(); } -LoadChannel* LoadChannelMgr::_create_load_channel(const UniqueId& load_id, int64_t mem_limit, - int64_t timeout_s, bool is_high_priority, +LoadChannel* LoadChannelMgr::_create_load_channel(const UniqueId& load_id, int64_t load_mem_limit, + int64_t channel_mem_limit, int64_t timeout_s, + bool is_high_priority, const std::string& sender_ip, bool is_vec) { - return new LoadChannel(load_id, mem_limit, timeout_s, is_high_priority, sender_ip, is_vec); + return new LoadChannel(load_id, load_mem_limit, channel_mem_limit, timeout_s, is_high_priority, + sender_ip, is_vec); } Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { @@ -112,18 +114,18 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { channel = it->second; } else { // create a new load channel - int64_t mem_limit_in_req = params.has_load_mem_limit() ? params.load_mem_limit() : -1; - int64_t job_max_memory = - calc_job_max_load_memory(mem_limit_in_req, _mem_tracker->limit()); + int64_t load_mem_limit = params.has_load_mem_limit() ? params.load_mem_limit() : -1; + int64_t channel_mem_limit = + calc_channel_max_load_memory(load_mem_limit, _mem_tracker->limit()); int64_t timeout_in_req_s = params.has_load_channel_timeout_s() ? 
params.load_channel_timeout_s() : -1; - int64_t job_timeout_s = calc_job_timeout_s(timeout_in_req_s); + int64_t channel_timeout_s = calc_channel_timeout_s(timeout_in_req_s); bool is_high_priority = (params.has_is_high_priority() && params.is_high_priority()); - channel.reset(_create_load_channel(load_id, job_max_memory, job_timeout_s, - is_high_priority, params.sender_ip(), - params.is_vectorized())); + channel.reset(_create_load_channel(load_id, load_mem_limit, channel_mem_limit, + channel_timeout_s, is_high_priority, + params.sender_ip(), params.is_vectorized())); _load_channels.insert({load_id, channel}); } } diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 7e1c4450f0..65d72534f4 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -61,9 +61,10 @@ public: std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; } private: - static LoadChannel* _create_load_channel(const UniqueId& load_id, int64_t mem_limit, - int64_t timeout_s, bool is_high_priority, - const std::string& sender_ip, bool is_vec); + static LoadChannel* _create_load_channel(const UniqueId& load_id, int64_t load_mem_limit, + int64_t channel_mem_limit, int64_t timeout_s, + bool is_high_priority, const std::string& sender_ip, + bool is_vec); template <typename Request> Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& is_eof, diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp index 359b0f5368..824c8b8530 100644 --- a/be/src/runtime/mem_tracker.cpp +++ b/be/src/runtime/mem_tracker.cpp @@ -124,14 +124,17 @@ std::shared_ptr<MemTracker> MemTracker::create_tracker_impl( std::string reset_label; MemTracker* task_parent_tracker = reset_parent->parent_task_mem_tracker(); if (task_parent_tracker) { - reset_label = fmt::format("{}:{}", label, split(task_parent_tracker->label(), ":")[1]); + reset_label = fmt::format("{}#{}", label, split(task_parent_tracker->label(), "#")[1]); } else 
{ reset_label = label; } + if (byte_limit == -1) byte_limit = reset_parent->limit(); std::shared_ptr<MemTracker> tracker( new MemTracker(byte_limit, reset_label, reset_parent, level > reset_parent->_level ? level : reset_parent->_level, profile)); + // Do not check limit exceed when add_child_tracker, otherwise it will cause deadlock when log_usage is called. + STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER(); reset_parent->add_child_tracker(tracker); return tracker; } @@ -285,6 +288,7 @@ std::string MemTracker::log_usage(int max_recursive_depth, Status MemTracker::mem_limit_exceeded(RuntimeState* state, const std::string& details, int64_t failed_allocation_size, Status failed_alloc) { + STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER(); MemTracker* process_tracker = MemTracker::get_raw_process_tracker(); std::string detail = "Memory exceed limit. fragment={}, details={}, on backend={}. Memory left in process " diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h index c21d6d3db5..85a6550f7a 100644 --- a/be/src/runtime/mem_tracker.h +++ b/be/src/runtime/mem_tracker.h @@ -401,8 +401,7 @@ public: /// 'failed_allocation_size' is zero, nothing about the allocation size is logged. /// If 'state' is non-nullptr, logs the error to 'state'. Status mem_limit_exceeded(RuntimeState* state, const std::string& details = std::string(), - int64_t failed_allocation = -1, - Status failed_alloc = Status::OK()) WARN_UNUSED_RESULT; + int64_t failed_allocation = -1, Status failed_alloc = Status::OK()); // Usually, a negative values means that the statistics are not accurate, // 1. The released memory is not consumed. 
diff --git a/be/src/runtime/mem_tracker_task_pool.cpp b/be/src/runtime/mem_tracker_task_pool.cpp index 551d904111..84f1a951e0 100644 --- a/be/src/runtime/mem_tracker_task_pool.cpp +++ b/be/src/runtime/mem_tracker_task_pool.cpp @@ -41,16 +41,17 @@ std::shared_ptr<MemTracker> MemTrackerTaskPool::register_query_mem_tracker( VLOG_FILE << "Register Query memory tracker, query id: " << query_id << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES); return register_task_mem_tracker_impl(query_id, mem_limit, - fmt::format("Query:queryId={}", query_id), + fmt::format("Query#queryId={}", query_id), ExecEnv::GetInstance()->query_pool_mem_tracker()); } std::shared_ptr<MemTracker> MemTrackerTaskPool::register_load_mem_tracker( const std::string& load_id, int64_t mem_limit) { + // In load, the query id of the fragment is executed, which is the same as the load id of the load channel. VLOG_FILE << "Register Load memory tracker, load id: " << load_id << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES); return register_task_mem_tracker_impl(load_id, mem_limit, - fmt::format("Load:loadId={}", load_id), + fmt::format("Load#loadId={}", load_id), ExecEnv::GetInstance()->load_pool_mem_tracker()); } @@ -66,8 +67,13 @@ std::shared_ptr<MemTracker> MemTrackerTaskPool::get_task_mem_tracker(const std:: void MemTrackerTaskPool::logout_task_mem_tracker() { std::vector<std::string> expired_tasks; for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end(); it++) { - // No RuntimeState uses this task MemTracker, it is only referenced by this map, delete it - if (it->second.use_count() == 1) { + if (!it->second) { + // when parallel querying, after phmap _task_mem_trackers.erase, + // there have been cases where the key still exists in _task_mem_trackers. 
+ // https://github.com/apache/incubator-doris/issues/10006 + expired_tasks.emplace_back(it->first); + } else if (it->second.use_count() == 1) { + // No RuntimeState uses this task MemTracker, it is only referenced by this map, delete it if (config::memory_leak_detection && it->second->consumption() != 0) { // If consumption is not equal to 0 before query mem tracker is destructed, // there are two possibilities in theory. @@ -86,6 +92,14 @@ void MemTrackerTaskPool::logout_task_mem_tracker() { it->second->parent()->consume_local(-it->second->consumption(), MemTracker::get_process_tracker().get()); expired_tasks.emplace_back(it->first); + } else { + // Log limit exceeded query tracker. + if (it->second->limit_exceeded()) { + it->second->mem_limit_exceeded( + nullptr, + fmt::format("Task mem limit exceeded but no cancel, queryId:{}", it->first), + 0, Status::OK()); + } } } for (auto tid : expired_tasks) { @@ -93,9 +107,13 @@ void MemTrackerTaskPool::logout_task_mem_tracker() { // there are still task mem trackers that are get or register. // The only known case: after an load task ends all fragments on a BE,`tablet_writer_open` is still // called to create a channel, and the load task tracker will be re-registered in the channel open. 
- if (_task_mem_trackers[tid].use_count() == 1) { + // https://github.com/apache/incubator-doris/issues/9905 + if (!_task_mem_trackers[tid]) { + _task_mem_trackers.erase(tid); + VLOG_FILE << "Deregister null task mem tracker, task id: " << tid; + } else if (_task_mem_trackers[tid].use_count() == 1) { _task_mem_trackers.erase(tid); - VLOG_FILE << "Deregister task memory tracker, task id: " << tid; + VLOG_FILE << "Deregister not used task mem tracker, task id: " << tid; } } } diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index b08642bf12..2139469393 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -136,12 +136,14 @@ SwitchThreadMemTracker<Existed>::~SwitchThreadMemTracker() { #endif // USE_MEM_TRACKER } -SwitchThreadMemTrackerErrCallBack::SwitchThreadMemTrackerErrCallBack( - const std::string& action_type, bool cancel_work, ERRCALLBACK err_call_back_func) { +SwitchThreadMemTrackerErrCallBack::SwitchThreadMemTrackerErrCallBack(const std::string& action_type, + bool cancel_work, + ERRCALLBACK err_call_back_func, + bool log_limit_exceeded) { #ifdef USE_MEM_TRACKER DCHECK(action_type != std::string()); _old_tracker_cb = tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb( - action_type, cancel_work, err_call_back_func); + action_type, cancel_work, err_call_back_func, log_limit_exceeded); #endif } diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 0572c2b08d..30952692d6 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -300,7 +300,8 @@ class SwitchThreadMemTrackerErrCallBack { public: explicit SwitchThreadMemTrackerErrCallBack(const std::string& action_type, bool cancel_work = true, - ERRCALLBACK err_call_back_func = nullptr); + ERRCALLBACK err_call_back_func = nullptr, + bool log_limit_exceeded = true); ~SwitchThreadMemTrackerErrCallBack(); diff --git a/be/src/runtime/thread_mem_tracker_mgr.cpp 
b/be/src/runtime/thread_mem_tracker_mgr.cpp index e9768bf86b..d3715b58df 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/thread_mem_tracker_mgr.cpp @@ -60,25 +60,24 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details ExecEnv::GetInstance()->fragment_mgr()->cancel( _fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, cancel_details); - _fragment_instance_id = TUniqueId(); // Make sure it will only be canceled once } } void ThreadMemTrackerMgr::exceeded(int64_t mem_usage, Status st) { - auto rst = _mem_trackers[_tracker_id]->mem_limit_exceeded( - nullptr, fmt::format("In TCMalloc Hook, {}", _consume_err_cb.cancel_msg), mem_usage, - st); if (_consume_err_cb.cb_func != nullptr) { _consume_err_cb.cb_func(); } if (is_attach_task()) { - if (_consume_err_cb.cancel_task == true) { + if (_consume_err_cb.cancel_task) { + auto rst = _mem_trackers[_tracker_id]->mem_limit_exceeded( + nullptr, + fmt::format("Task mem limit exceeded and cancel it, msg:{}", + _consume_err_cb.cancel_msg), + mem_usage, st); exceeded_cancel_task(rst.to_string()); - } else { - // TODO(zxy) Need other processing, or log (not too often). + _consume_err_cb.cancel_task = false; // Make sure it will only be canceled once + _consume_err_cb.log_limit_exceeded = false; } - } else { - // TODO(zxy) Need other processing, or log (not too often). } } } // namespace doris diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h index 754e231747..b476715612 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.h +++ b/be/src/runtime/thread_mem_tracker_mgr.h @@ -28,18 +28,24 @@ typedef void (*ERRCALLBACK)(); struct ConsumeErrCallBackInfo { std::string cancel_msg; - bool cancel_task; // Whether to cancel the task when the current tracker exceeds the limit + bool cancel_task; // Whether to cancel the task when the current tracker exceeds the limit. 
ERRCALLBACK cb_func; + bool log_limit_exceeded; // Whether to print log_usage of mem tracker when mem limit exceeded. ConsumeErrCallBackInfo() { init(); } - ConsumeErrCallBackInfo(const std::string& cancel_msg, bool cancel_task, ERRCALLBACK cb_func) - : cancel_msg(cancel_msg), cancel_task(cancel_task), cb_func(cb_func) {} + ConsumeErrCallBackInfo(const std::string& cancel_msg, bool cancel_task, ERRCALLBACK cb_func, + bool log_limit_exceeded) + : cancel_msg(cancel_msg), + cancel_task(cancel_task), + cb_func(cb_func), + log_limit_exceeded(log_limit_exceeded) {} void init() { cancel_msg = ""; - cancel_task = false; + cancel_task = true; cb_func = nullptr; + log_limit_exceeded = true; } }; @@ -94,11 +100,12 @@ public: void add_tracker(const std::shared_ptr<MemTracker>& mem_tracker); ConsumeErrCallBackInfo update_consume_err_cb(const std::string& cancel_msg, bool cancel_task, - ERRCALLBACK cb_func) { + ERRCALLBACK cb_func, bool log_limit_exceeded) { _temp_consume_err_cb = _consume_err_cb; _consume_err_cb.cancel_msg = cancel_msg; _consume_err_cb.cancel_task = cancel_task; _consume_err_cb.cb_func = cb_func; + _consume_err_cb.log_limit_exceeded = log_limit_exceeded; return _temp_consume_err_cb; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org