This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new eb25df5a2c [fix] (mem tracker) Fix inaccurate mem tracker leads to load OOM (#10409) eb25df5a2c is described below commit eb25df5a2cb7de6e541fd4b0ed8c43ca78cd97a4 Author: Kidd <107781942+k-i-...@users.noreply.github.com> AuthorDate: Sat Jun 25 14:13:02 2022 +0800 [fix] (mem tracker) Fix inaccurate mem tracker leads to load OOM (#10409) * fix load tracker * fix comment --- be/src/runtime/load_channel.cpp | 10 ++----- be/src/runtime/load_channel.h | 6 ++--- be/src/runtime/load_channel_mgr.cpp | 46 +++++++++++++++++---------------- be/src/runtime/load_channel_mgr.h | 8 ++---- be/src/runtime/tablets_channel.cpp | 2 -- be/src/runtime/tcmalloc_hook.h | 5 ++++ be/src/runtime/thread_context.h | 4 +++ be/src/runtime/thread_mem_tracker_mgr.h | 17 +++++++----- be/src/service/internal_service.cpp | 8 +++--- 9 files changed, 53 insertions(+), 53 deletions(-) diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 42fbe9dac5..d2223554eb 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -25,19 +25,15 @@ namespace doris { -LoadChannel::LoadChannel(const UniqueId& load_id, int64_t load_mem_limit, int64_t channel_mem_limit, +LoadChannel::LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTracker>& mem_tracker, int64_t timeout_s, bool is_high_priority, const std::string& sender_ip, bool is_vec) : _load_id(load_id), + _mem_tracker(mem_tracker), _timeout_s(timeout_s), _is_high_priority(is_high_priority), _sender_ip(sender_ip), _is_vec(is_vec) { - _mem_tracker = MemTracker::create_tracker( - channel_mem_limit, "LoadChannel#senderIp=" + sender_ip, - ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->register_load_mem_tracker( - _load_id.to_string(), load_mem_limit), - MemTrackerLevel::TASK); // _last_updated_time should be set before being inserted to // _load_channels in load_channel_mgr, or it may be erased // immediately by gc thread. @@ -52,7 +48,6 @@ LoadChannel::~LoadChannel() { } Status LoadChannel::open(const PTabletWriterOpenRequest& params) { - SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); int64_t index_id = params.index_id(); std::shared_ptr<TabletsChannel> channel; { @@ -138,7 +133,6 @@ bool LoadChannel::is_finished() { } Status LoadChannel::cancel() { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::lock_guard<std::mutex> l(_lock); for (auto& it : _tablets_channels) { it.second->cancel(); diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 38cc2ac89f..20ef476dd9 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -39,9 +39,8 @@ class Cache; // corresponding to a certain load job class LoadChannel { public: - LoadChannel(const UniqueId& load_id, int64_t load_mem_limit, int64_t channel_mem_limit, - int64_t timeout_s, bool is_high_priority, const std::string& sender_ip, - bool is_vec); + LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTracker>& mem_tracker, + int64_t timeout_s, bool is_high_priority, const std::string& sender_ip, bool is_ve); ~LoadChannel(); // open a new load channel if not exist @@ -129,7 +128,6 @@ private: template <typename TabletWriterAddRequest, typename TabletWriterAddResult> Status LoadChannel::add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response) { - SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); int64_t index_id = request.index_id(); // 1. get tablets channel std::shared_ptr<TabletsChannel> channel; diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index aea5479aa6..c105bd96b3 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -84,10 +84,9 @@ LoadChannelMgr::~LoadChannelMgr() { Status LoadChannelMgr::init(int64_t process_mem_limit) { int64_t load_mgr_mem_limit = calc_process_max_load_memory(process_mem_limit); - _mem_tracker = MemTracker::create_tracker(load_mgr_mem_limit, "LoadChannelMgr", - MemTracker::get_process_tracker(), - MemTrackerLevel::OVERVIEW); - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + _mem_tracker = MemTracker::create_virtual_tracker(load_mgr_mem_limit, "LoadChannelMgr", + MemTracker::get_process_tracker(), + MemTrackerLevel::OVERVIEW); REGISTER_HOOK_METRIC(load_channel_mem_consumption, [this]() { return _mem_tracker->consumption(); }); _last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024); @@ -95,16 +94,7 @@ Status LoadChannelMgr::init(int64_t process_mem_limit) { return Status::OK(); } -LoadChannel* LoadChannelMgr::_create_load_channel(const UniqueId& load_id, int64_t load_mem_limit, - int64_t channel_mem_limit, int64_t timeout_s, - bool is_high_priority, - const std::string& sender_ip, bool is_vec) { - return new LoadChannel(load_id, load_mem_limit, channel_mem_limit, timeout_s, is_high_priority, - sender_ip, is_vec); -} - Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { - SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); UniqueId load_id(params.id()); std::shared_ptr<LoadChannel> channel; { @@ -114,18 +104,31 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { channel = it->second; } else { // create a new load channel - int64_t load_mem_limit = params.has_load_mem_limit() ? params.load_mem_limit() : -1; - int64_t channel_mem_limit = - calc_channel_max_load_memory(load_mem_limit, _mem_tracker->limit()); - int64_t timeout_in_req_s = params.has_load_channel_timeout_s() ? params.load_channel_timeout_s() : -1; int64_t channel_timeout_s = calc_channel_timeout_s(timeout_in_req_s); - bool is_high_priority = (params.has_is_high_priority() && params.is_high_priority()); - channel.reset(_create_load_channel(load_id, load_mem_limit, channel_mem_limit, - channel_timeout_s, is_high_priority, - params.sender_ip(), params.is_vectorized())); + + int64_t load_mem_limit = params.has_load_mem_limit() ? params.load_mem_limit() : -1; + int64_t channel_mem_limit = + calc_channel_max_load_memory(load_mem_limit, _mem_tracker->limit()); + auto channel_mem_tracker = + MemTracker::create_tracker(channel_mem_limit, + fmt::format("LoadChannel#senderIp={}#loadID={}", + params.sender_ip(), load_id.to_string()), + _mem_tracker); + // TODO + // auto channel_mem_tracker_job = std::make_shared<MemTracker>( + // -1, + // fmt::format("LoadChannel#senderIp={}#loadID={}", params.sender_ip(), + // load_id.to_string()), + // ExecEnv::GetInstance() + // ->task_pool_mem_tracker_registry() + // ->register_load_mem_tracker(load_id.to_string(), load_mem_limit), + // MemTrackerLevel::TASK); + channel.reset(new LoadChannel(load_id, channel_mem_tracker, channel_timeout_s, + is_high_priority, params.sender_ip(), + params.is_vectorized())); _load_channels.insert({load_id, channel}); } } @@ -181,7 +184,6 @@ void LoadChannelMgr::_handle_mem_exceed_limit() { } Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) { - SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); UniqueId load_id(params.id()); std::shared_ptr<LoadChannel> cancelled_channel; { diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 65d72534f4..39d7ed5b2b 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -61,11 +61,6 @@ public: std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; } private: - static LoadChannel* _create_load_channel(const UniqueId& load_id, int64_t load_mem_limit, - int64_t channel_mem_limit, int64_t timeout_s, - bool is_high_priority, const std::string& sender_ip, - bool is_vec); - template <typename Request> Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& is_eof, const UniqueId& load_id, const Request& request); @@ -84,7 +79,8 @@ protected: std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels; Cache* _last_success_channel = nullptr; - // check the total load mem consumption of this Backend + // check the total load channel mem consumption of this Backend + // TODO no used, refactor soon std::shared_ptr<MemTracker> _mem_tracker; CountDownLatch _stop_background_threads_latch; diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 8010956cc4..566d90fb7f 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -53,7 +53,6 @@ TabletsChannel::~TabletsChannel() { } Status TabletsChannel::open(const PTabletWriterOpenRequest& request) { - SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::lock_guard<std::mutex> l(_lock); if (_state == kOpened) { // Normal case, already open by other sender @@ -253,7 +252,6 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request } Status TabletsChannel::cancel() { - SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::lock_guard<std::mutex> l(_lock); if (_state == kFinished) { return _close_status; diff --git a/be/src/runtime/tcmalloc_hook.h b/be/src/runtime/tcmalloc_hook.h index 59208b334a..af60cdc178 100644 --- a/be/src/runtime/tcmalloc_hook.h +++ b/be/src/runtime/tcmalloc_hook.h @@ -21,6 +21,7 @@ #include <gperftools/nallocx.h> #include <gperftools/tcmalloc.h> +#include "runtime/mem_tracker.h" #include "runtime/thread_context.h" // Notice: modify the command in New/Delete Hook should be careful enough!, @@ -38,12 +39,16 @@ void new_hook(const void* ptr, size_t size) { if (doris::tls_ctx()) { doris::tls_ctx()->consume_mem(tc_nallocx(size, 0)); + } else if (doris::ExecEnv::GetInstance()->initialized()) { + doris::MemTracker::get_process_tracker()->consume(tc_nallocx(size, 0)); } } void delete_hook(const void* ptr) { if (doris::tls_ctx()) { doris::tls_ctx()->release_mem(tc_malloc_size(const_cast<void*>(ptr))); + } else if (doris::ExecEnv::GetInstance()->initialized()) { + doris::MemTracker::get_process_tracker()->release(tc_malloc_size(const_cast<void*>(ptr))); } } diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 7b58315b09..33478b19c5 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -157,12 +157,16 @@ public: void consume_mem(int64_t size) { if (start_thread_mem_tracker) { _thread_mem_tracker_mgr->cache_consume(size); + } else { + MemTracker::get_process_tracker()->consume(size); } } void release_mem(int64_t size) { if (start_thread_mem_tracker) { _thread_mem_tracker_mgr->cache_consume(-size); + } else { + MemTracker::get_process_tracker()->release(size); } } diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h index d582c3a46a..d8042ac0fa 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.h +++ b/be/src/runtime/thread_mem_tracker_mgr.h @@ -177,6 +177,7 @@ private: phmap::flat_hash_map<int64_t, std::string> _mem_tracker_labels; // If true, call memtracker try_consume, otherwise call consume. bool _check_limit; + bool _stop_consume = false; int64_t _tracker_id; // Avoid memory allocation in functions. @@ -256,25 +257,27 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) { // When some threads `0 < _untracked_mem < config::mem_tracker_consume_min_size_bytes` // and some threads `_untracked_mem <= -config::mem_tracker_consume_min_size_bytes` trigger consumption(), // it will cause tracker->consumption to be temporarily less than 0. - if (_untracked_mem >= config::mem_tracker_consume_min_size_bytes || - _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) { + // + // Temporary memory may be allocated during the consumption of the mem tracker (in the processing logic of + // the exceeded limit), which will lead to entering the TCMalloc Hook again, so suspend consumption to avoid + // falling into an infinite loop. + if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes || + _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) && + !_stop_consume) { + _stop_consume = true; DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()) << print_debug_string(); // When switching to the current tracker last time, the remaining untracked memory. if (_untracked_mems[_tracker_id] != 0) { _untracked_mem += _untracked_mems[_tracker_id]; _untracked_mems[_tracker_id] = 0; } - // Allocating memory in the Hook command causes the TCMalloc Hook to be entered again, - // will enter infinite recursion. So the temporary memory allocated in mem_tracker.try_consume - // and mem_limit_exceeded will directly call consume. if (_check_limit) { - _check_limit = false; noncache_try_consume(_untracked_mem); - _check_limit = true; } else { mem_tracker()->consume(_untracked_mem); } _untracked_mem = 0; + _stop_consume = false; } } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index e85bdfebfc..47e156a0bf 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -206,6 +206,7 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll const PTabletWriterAddBlockRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); // TODO(zxy) delete in 1.2 version brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl); @@ -216,6 +217,7 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll void PInternalServiceImpl::tablet_writer_add_block_by_http( google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); PTabletWriterAddBlockRequest* request_raw = new PTabletWriterAddBlockRequest(); google::protobuf::Closure* done_raw = new NewHttpClosure<PTabletWriterAddBlockRequest>(request_raw, done); @@ -243,8 +245,6 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); - SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD, - _exec_env->load_channel_mgr()->mem_tracker()); auto st = _exec_env->load_channel_mgr()->add_batch(*request, response); if (!st.ok()) { @@ -264,12 +264,14 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcControll const PTabletWriterAddBatchRequest* request, PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); _tablet_writer_add_batch(cntl_base, request, response, done); } void PInternalServiceImpl::tablet_writer_add_batch_by_http( google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request, PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); PTabletWriterAddBatchRequest* request_raw = new PTabletWriterAddBatchRequest(); google::protobuf::Closure* done_raw = new NewHttpClosure<PTabletWriterAddBatchRequest>(request_raw, done); @@ -300,8 +302,6 @@ void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); - SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD, - _exec_env->load_channel_mgr()->mem_tracker()); // TODO(zxy) delete in 1.2 version brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request, cntl); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org