This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 2745a88814 [enhancement](memtracker) Fix brpc causing query mem tracker to be inaccurate #13401 2745a88814 is described below commit 2745a888143cba313625520438b0616eb35e7aed Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Wed Oct 19 12:28:20 2022 +0800 [enhancement](memtracker) Fix brpc causing query mem tracker to be inaccurate #13401 --- be/src/runtime/memory/mem_tracker.cpp | 10 ++++------ be/src/runtime/memory/mem_tracker.h | 6 ++++++ be/src/runtime/memory/mem_tracker_limiter.h | 4 ---- be/src/runtime/memory/mem_tracker_task_pool.cpp | 10 +++++----- be/src/service/internal_service.cpp | 12 ++++-------- be/src/vec/runtime/vdata_stream_recvr.cpp | 5 ++++- 6 files changed, 23 insertions(+), 24 deletions(-) diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index 9c0b006281..0604d538dc 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -23,7 +23,6 @@ #include <fmt/format.h> #include "runtime/thread_context.h" -#include "util/pretty_printer.h" #include "util/string_util.h" #include "util/time.h" @@ -103,11 +102,10 @@ void MemTracker::make_group_snapshot(std::vector<MemTracker::Snapshot>* snapshot } std::string MemTracker::log_usage(MemTracker::Snapshot snapshot) { - return fmt::format( - "MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)", snapshot.label, - snapshot.parent, PrettyPrinter::print(snapshot.cur_consumption, TUnit::BYTES), - snapshot.cur_consumption, PrettyPrinter::print(snapshot.peak_consumption, TUnit::BYTES), - snapshot.peak_consumption); + return fmt::format("MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)", + snapshot.label, snapshot.parent, print_bytes(snapshot.cur_consumption), + snapshot.cur_consumption, print_bytes(snapshot.peak_consumption), + snapshot.peak_consumption); } static std::unordered_map<std::string, std::shared_ptr<MemTracker>> global_mem_trackers; diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index 0b8ead634b..c939d9a62d 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -19,6 +19,7 @@ // and modified by Doris #pragma once +#include "util/pretty_printer.h" #include "util/runtime_profile.h" namespace doris { @@ -56,6 +57,11 @@ public: static std::shared_ptr<MemTracker> get_global_mem_tracker(const std::string& label); static void make_global_mem_tracker_snapshot(std::vector<MemTracker::Snapshot>* snapshots); + static std::string print_bytes(int64_t bytes) { + return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES) + : "-" + PrettyPrinter::print(std::abs(bytes), TUnit::BYTES); + } + public: const std::string& label() const { return _label; } // Returns the memory consumed in bytes. diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 9a40e79b66..f6440c0ace 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -165,10 +165,6 @@ public: return msg.str(); } - static std::string print_bytes(int64_t bytes) { - return PrettyPrinter::print(bytes, TUnit::BYTES); - } - private: // The following func, for automatic memory tracking and limiting based on system memory allocation. friend class ThreadMemTrackerMgr; diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp index 04f2388515..58d98278c6 100644 --- a/be/src/runtime/memory/mem_tracker_task_pool.cpp +++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp @@ -55,7 +55,7 @@ std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_query_scanner_me const std::string& query_id) { return register_task_mem_tracker_impl("Scanner#" + query_id, -1, fmt::format("Scanner#Query#Id={}", query_id), - ExecEnv::GetInstance()->query_pool_mem_tracker()); + get_task_mem_tracker(query_id)); } std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_load_mem_tracker( @@ -69,7 +69,7 @@ std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_load_scanner_mem const std::string& load_id) { return register_task_mem_tracker_impl("Scanner#" + load_id, -1, fmt::format("Scanner#Load#Id={}", load_id), - ExecEnv::GetInstance()->load_pool_mem_tracker()); + get_task_mem_tracker(load_id)); } std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::get_task_mem_tracker( @@ -104,9 +104,9 @@ void MemTrackerTaskPool::logout_task_mem_tracker() { LOG(INFO) << fmt::format( "Deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, " "PeakUsed={}", - it->first, PrettyPrinter::print(it->second->limit(), TUnit::BYTES), - PrettyPrinter::print(it->second->consumption(), TUnit::BYTES), - PrettyPrinter::print(it->second->peak_consumption(), TUnit::BYTES)); + it->first, MemTracker::print_bytes(it->second->limit()), + MemTracker::print_bytes(it->second->consumption()), + MemTracker::print_bytes(it->second->peak_consumption())); expired_task_ids.emplace_back(it->first); } else if (config::memory_verbose_track) { it->second->print_log_usage("query routine"); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index d7f1834843..a81c97bc20 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -132,10 +132,8 @@ void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_ query_id = print_id(request->query_id()); finst_id.__set_hi(request->finst_id().hi()); finst_id.__set_lo(request->finst_id().lo()); - // In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer. - transmit_tracker = std::make_shared<MemTrackerLimiter>( - -1, fmt::format("QueryTransmit#queryId={}", query_id), - _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id)); + transmit_tracker = + _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id); } else { query_id = "unkown_transmit_data"; transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, "unkown_transmit_data"); @@ -642,10 +640,8 @@ void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl query_id = print_id(request->query_id()); finst_id.__set_hi(request->finst_id().hi()); finst_id.__set_lo(request->finst_id().lo()); - // In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer. - transmit_tracker = std::make_shared<MemTrackerLimiter>( - -1, fmt::format("QueryTransmit#queryId={}", query_id), - _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id)); + transmit_tracker = + _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id); } else { query_id = "unkown_transmit_block"; transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, "unkown_transmit_block"); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 0e40404733..e80354af25 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -53,7 +53,10 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block** next_block) { } // _cur_batch must be replaced with the returned batch. - _current_block.reset(); + { + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker()); + _current_block.reset(); + } *next_block = nullptr; if (_is_cancelled) { return Status::Cancelled("Cancelled"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org