morningman commented on code in PR #9145: URL: https://github.com/apache/incubator-doris/pull/9145#discussion_r857056461
########## be/src/runtime/thread_mem_tracker_mgr.h: ########## @@ -227,21 +253,36 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) { _untracked_mem += _untracked_mems[_tracker_id]; _untracked_mems[_tracker_id] = 0; } - noncache_consume(); + noncache_consume(_untracked_mem); + _untracked_mem = 0; start_thread_mem_tracker = true; } } -inline void ThreadMemTrackerMgr::noncache_consume() { - DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id]; - Status st = mem_tracker()->try_consume(_untracked_mem); +inline void ThreadMemTrackerMgr::noncache_consume(int64_t size) { + Status st = mem_tracker()->try_consume(size); if (!st) { // The memory has been allocated, so when TryConsume fails, need to continue to complete // the consume to ensure the accuracy of the statistics. - mem_tracker()->consume(_untracked_mem); - exceeded(_untracked_mem, st); + mem_tracker()->consume(size); + exceeded(size, st); } - _untracked_mem = 0; +} + +inline void ThreadMemTrackerMgr::add_tracker(const std::shared_ptr<MemTracker>& mem_tracker) { + DCHECK(_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end()) << print_debug_string(); + if (_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end()) { + _mem_trackers[mem_tracker->id()] = mem_tracker; + DCHECK(_mem_trackers[mem_tracker->id()]) << print_debug_string(); Review Comment: Why check `_mem_trackers` again right after it has been inserted? 
########## be/src/runtime/thread_mem_tracker_mgr.h: ########## @@ -227,21 +253,36 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) { _untracked_mem += _untracked_mems[_tracker_id]; _untracked_mems[_tracker_id] = 0; } - noncache_consume(); + noncache_consume(_untracked_mem); + _untracked_mem = 0; start_thread_mem_tracker = true; } } -inline void ThreadMemTrackerMgr::noncache_consume() { - DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id]; - Status st = mem_tracker()->try_consume(_untracked_mem); +inline void ThreadMemTrackerMgr::noncache_consume(int64_t size) { + Status st = mem_tracker()->try_consume(size); if (!st) { // The memory has been allocated, so when TryConsume fails, need to continue to complete // the consume to ensure the accuracy of the statistics. - mem_tracker()->consume(_untracked_mem); - exceeded(_untracked_mem, st); + mem_tracker()->consume(size); + exceeded(size, st); } - _untracked_mem = 0; +} + +inline void ThreadMemTrackerMgr::add_tracker(const std::shared_ptr<MemTracker>& mem_tracker) { + DCHECK(_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end()) << print_debug_string(); + if (_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end()) { Review Comment: If you add a `DCHECK` before, then I don't think we need to call `find` again. Just insert the mem tracker directly into the `_mem_trackers`. 
########## be/src/runtime/thread_mem_tracker_mgr.h: ########## @@ -227,21 +253,36 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) { _untracked_mem += _untracked_mems[_tracker_id]; _untracked_mems[_tracker_id] = 0; } - noncache_consume(); + noncache_consume(_untracked_mem); + _untracked_mem = 0; start_thread_mem_tracker = true; } } -inline void ThreadMemTrackerMgr::noncache_consume() { - DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id]; - Status st = mem_tracker()->try_consume(_untracked_mem); +inline void ThreadMemTrackerMgr::noncache_consume(int64_t size) { + Status st = mem_tracker()->try_consume(size); if (!st) { // The memory has been allocated, so when TryConsume fails, need to continue to complete // the consume to ensure the accuracy of the statistics. - mem_tracker()->consume(_untracked_mem); - exceeded(_untracked_mem, st); + mem_tracker()->consume(size); + exceeded(size, st); } - _untracked_mem = 0; +} + +inline void ThreadMemTrackerMgr::add_tracker(const std::shared_ptr<MemTracker>& mem_tracker) { + DCHECK(_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end()) << print_debug_string(); + if (_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end()) { + _mem_trackers[mem_tracker->id()] = mem_tracker; + DCHECK(_mem_trackers[mem_tracker->id()]) << print_debug_string(); + _untracked_mems[mem_tracker->id()] = 0; + _mem_tracker_labels[_temp_tracker_id] = mem_tracker->label(); Review Comment: What does `_temp_tracker_id` mean? 
########## be/src/runtime/thread_mem_tracker_mgr.h: ########## @@ -227,21 +253,36 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) { _untracked_mem += _untracked_mems[_tracker_id]; _untracked_mems[_tracker_id] = 0; } - noncache_consume(); + noncache_consume(_untracked_mem); + _untracked_mem = 0; start_thread_mem_tracker = true; } } -inline void ThreadMemTrackerMgr::noncache_consume() { - DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id]; - Status st = mem_tracker()->try_consume(_untracked_mem); +inline void ThreadMemTrackerMgr::noncache_consume(int64_t size) { + Status st = mem_tracker()->try_consume(size); if (!st) { // The memory has been allocated, so when TryConsume fails, need to continue to complete // the consume to ensure the accuracy of the statistics. - mem_tracker()->consume(_untracked_mem); - exceeded(_untracked_mem, st); + mem_tracker()->consume(size); + exceeded(size, st); } - _untracked_mem = 0; +} + +inline void ThreadMemTrackerMgr::add_tracker(const std::shared_ptr<MemTracker>& mem_tracker) { + DCHECK(_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end()) << print_debug_string(); + if (_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end()) { + _mem_trackers[mem_tracker->id()] = mem_tracker; + DCHECK(_mem_trackers[mem_tracker->id()]) << print_debug_string(); + _untracked_mems[mem_tracker->id()] = 0; + _mem_tracker_labels[_temp_tracker_id] = mem_tracker->label(); + } +} + +inline std::shared_ptr<MemTracker> ThreadMemTrackerMgr::mem_tracker() { + DCHECK(_mem_trackers.find(_tracker_id) != _mem_trackers.end()) << print_debug_string(); + DCHECK(_mem_trackers[_tracker_id]) << print_debug_string(); Review Comment: These two `DCHECK`s look the same? ########## be/src/runtime/mem_tracker.cpp: ########## @@ -60,6 +60,21 @@ MemTracker* MemTracker::get_raw_process_tracker() { return raw_process_tracker; } +// Track memory for all brpc server responses. 
+static std::shared_ptr<MemTracker> brpc_server_tracker; +static GoogleOnceType brpc_server_tracker_once = GOOGLE_ONCE_INIT; + +void MemTracker::create_brpc_server_tracker() { + brpc_server_tracker.reset(new MemTracker(-1, "Brpc", get_process_tracker(), MemTrackerLevel::OVERVIEW, nullptr)); Review Comment: I think we should use `MemTracker::create_tracker`? And no need to call `add_child_tracker` and `init` manually. ########## be/src/olap/byte_buffer.cpp: ########## @@ -128,6 +132,7 @@ StorageByteBuffer* StorageByteBuffer::mmap(FileHandler* handler, uint64_t offset size_t length = handler->length(); int fd = handler->fd(); + CONSUME_THREAD_LOCAL_MEM_TRACKER(length); char* memory = (char*)::mmap(nullptr, length, prot, flags, fd, offset); if (MAP_FAILED == memory) { Review Comment: Call `RELEASE_THREAD_LOCAL_MEM_TRACKER`? ########## be/src/runtime/thread_mem_tracker_mgr.h: ########## @@ -175,39 +170,70 @@ class ThreadMemTrackerMgr { ConsumeErrCallBackInfo _consume_err_cb; }; +inline void ThreadMemTrackerMgr::init() { + _tracker_id = 0; + _mem_trackers.clear(); + _mem_trackers[0] = MemTracker::get_process_tracker(); + _untracked_mems.clear(); + _untracked_mems[0] = 0; + _mem_tracker_labels.clear(); + _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label(); +} + +inline void ThreadMemTrackerMgr::init_bthread() { + init(); + _mem_trackers[1] = MemTracker::get_brpc_server_tracker(); + _untracked_mems[1] = 0; Review Comment: Is it safe to use 0 and 1 as the tracker id for process tracker and brpc tracker? ########## be/src/exec/exchange_node.cpp: ########## @@ -80,6 +80,7 @@ Status ExchangeNode::prepare(RuntimeState* state) { Status ExchangeNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); + ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker()); Review Comment: Why was this moved here? 
########## be/src/olap/byte_buffer.cpp: ########## @@ -93,6 +96,7 @@ StorageByteBuffer* StorageByteBuffer::reference_buffer(StorageByteBuffer* refere StorageByteBuffer* StorageByteBuffer::mmap(void* start, uint64_t length, int prot, int flags, int fd, uint64_t offset) { + CONSUME_THREAD_LOCAL_MEM_TRACKER(length); char* memory = (char*)::mmap(start, length, prot, flags, fd, offset); if (MAP_FAILED == memory) { Review Comment: If mmap failed, should we call `RELEASE_THREAD_LOCAL_MEM_TRACKER`? ########## be/src/runtime/bufferpool/system_allocator.cc: ########## @@ -75,6 +76,7 @@ Status SystemAllocator::AllocateViaMMap(int64_t len, uint8_t** buffer_mem) { // Map an extra huge page so we can fix up the alignment if needed. map_len += HUGE_PAGE_SIZE; } + CONSUME_THREAD_LOCAL_MEM_TRACKER(map_len); uint8_t* mem = reinterpret_cast<uint8_t*>( mmap(nullptr, map_len, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0)); if (mem == MAP_FAILED) { Review Comment: call `RELEASE_THREAD_LOCAL_MEM_TRACKER`? ########## be/src/util/doris_metrics.h: ########## @@ -128,6 +128,8 @@ class DorisMetrics { IntCounter* attach_task_thread_count; IntCounter* switch_thread_mem_tracker_count; IntCounter* switch_thread_mem_tracker_err_cb_count; + // brpc server response count + IntCounter* switch_pthread_count; Review Comment: Should it be `switch_bthread_count`? 
########## be/src/runtime/thread_mem_tracker_mgr.h: ########## @@ -127,17 +107,32 @@ class ThreadMemTrackerMgr { // must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck, void cache_consume(int64_t size); - void noncache_consume(); + void noncache_consume(int64_t size); bool is_attach_task() { return _task_id != ""; } - std::shared_ptr<MemTracker> mem_tracker() { - DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id]; - if (_mem_trackers[_tracker_id]) { - return _mem_trackers[_tracker_id]; - } else { - return MemTracker::get_process_tracker(); + std::shared_ptr<MemTracker> mem_tracker(); + + int64_t switch_count = 0; + + std::string print_debug_string() { + std::stringstream mem_trackers_str; Review Comment: use `fmt` ########## be/src/runtime/thread_mem_tracker_mgr.h: ########## @@ -175,39 +170,70 @@ class ThreadMemTrackerMgr { ConsumeErrCallBackInfo _consume_err_cb; }; +inline void ThreadMemTrackerMgr::init() { + _tracker_id = 0; + _mem_trackers.clear(); + _mem_trackers[0] = MemTracker::get_process_tracker(); + _untracked_mems.clear(); + _untracked_mems[0] = 0; + _mem_tracker_labels.clear(); + _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label(); +} + +inline void ThreadMemTrackerMgr::init_bthread() { + init(); + _mem_trackers[1] = MemTracker::get_brpc_server_tracker(); + _untracked_mems[1] = 0; + _mem_tracker_labels[1] = MemTracker::get_brpc_server_tracker()->label(); + _tracker_id = 1; +} + +inline void ThreadMemTrackerMgr::clear_untracked_mems() { + for (const auto& untracked_mem : _untracked_mems) { + if (untracked_mem.second != 0) { + DCHECK(_mem_trackers[untracked_mem.first]) << print_debug_string(); + _mem_trackers[untracked_mem.first]->consume(untracked_mem.second); + } + } + mem_tracker()->consume(_untracked_mem); + _untracked_mem = 0; +} + template <bool Existed> inline int64_t ThreadMemTrackerMgr::update_tracker(const std::shared_ptr<MemTracker>& 
mem_tracker) { - DCHECK(mem_tracker); + DCHECK(mem_tracker) << print_debug_string(); _temp_tracker_id = mem_tracker->id(); if (_temp_tracker_id == _tracker_id) { return _tracker_id; } if (Existed) { - DCHECK(_mem_trackers.find(_temp_tracker_id) != _mem_trackers.end()); + DCHECK(_mem_trackers.find(_temp_tracker_id) != _mem_trackers.end()) << print_debug_string(); } else { if (_mem_trackers.find(_temp_tracker_id) == _mem_trackers.end()) { Review Comment: No need to call `find` here, just insert directly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org