This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d5fa66d9a3 [Enhancement] [Memory] Limit memory usage use process actual physical memory (#10924) d5fa66d9a3 is described below commit d5fa66d9a3df6bd867743144ac542c20a87ec06c Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Tue Jul 19 11:08:39 2022 +0800 [Enhancement] [Memory] Limit memory usage use process actual physical memory (#10924) --- be/src/http/action/compaction_action.h | 9 +-- be/src/olap/rowset/segment_v2/segment.cpp | 5 +- be/src/runtime/exec_env_init.cpp | 1 - be/src/runtime/mem_tracker.cpp | 1 + be/src/runtime/mem_tracker.h | 7 ++- be/src/runtime/tablets_channel.h | 2 +- be/src/service/doris_main.cpp | 5 +- be/src/util/CMakeLists.txt | 2 +- be/src/util/mem_info.h | 2 + be/src/util/perf_counters.cpp | 97 ++++++++++++++++++++++++++----- be/src/util/perf_counters.h | 22 +++++++ 11 files changed, 124 insertions(+), 29 deletions(-) diff --git a/be/src/http/action/compaction_action.h b/be/src/http/action/compaction_action.h index 80b11bb436..943db1a85a 100644 --- a/be/src/http/action/compaction_action.h +++ b/be/src/http/action/compaction_action.h @@ -39,12 +39,7 @@ const std::string PARAM_COMPACTION_CUMULATIVE = "cumulative"; /// See compaction-action.md for details. class CompactionAction : public HttpHandler { public: - CompactionAction(CompactionActionType type) : _type(type) { - _compaction_mem_tracker = - type == RUN_COMPACTION ? 
MemTracker::create_tracker(-1, "ManualCompaction", nullptr, - MemTrackerLevel::VERBOSE) - : nullptr; - } + CompactionAction(CompactionActionType type) : _type(type) {} virtual ~CompactionAction() {} @@ -75,8 +70,6 @@ private: static std::mutex _compaction_running_mutex; /// whether there is manual compaction running static bool _is_compaction_running; - /// memory tracker - std::shared_ptr<MemTracker> _compaction_mem_tracker; }; } // end namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index f3922752d8..f4e96833d3 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -48,9 +48,10 @@ Status Segment::open(io::FileSystem* fs, const std::string& path, uint32_t segme Segment::Segment(uint32_t segment_id, const TabletSchema* tablet_schema) : _segment_id(segment_id), _tablet_schema(*tablet_schema) { #ifndef BE_TEST - _mem_tracker = StorageEngine::instance()->tablet_mem_tracker(); + _mem_tracker = MemTracker::create_virtual_tracker( + -1, "Segment", StorageEngine::instance()->tablet_mem_tracker()); #else - _mem_tracker = MemTracker::get_process_tracker(); + _mem_tracker = MemTracker::create_virtual_tracker(-1, "Segment"); #endif } diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 6d62e0a258..e82473b01f 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -183,7 +183,6 @@ Status ExecEnv::_init_mem_tracker() { << ". 
Using physical memory instead"; global_memory_limit_bytes = MemInfo::physical_mem(); } - MemTracker::get_process_tracker()->set_limit(global_memory_limit_bytes); _query_pool_mem_tracker = MemTracker::create_tracker( -1, "QueryPool", MemTracker::get_process_tracker(), MemTrackerLevel::OVERVIEW); REGISTER_HOOK_METRIC(query_mem_consumption, diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp index 71e89f8b1e..ffa7ae188b 100644 --- a/be/src/runtime/mem_tracker.cpp +++ b/be/src/runtime/mem_tracker.cpp @@ -39,6 +39,7 @@ const std::string MemTracker::COUNTER_NAME = "PeakMemoryUsage"; // The ancestor for all trackers. Every tracker is visible from the process down. // All manually created trackers should specify the process tracker as the parent. +// Not limit total memory by process tracker, and it's just used to track virtual memory of process. static std::shared_ptr<MemTracker> process_tracker; static MemTracker* raw_process_tracker; static GoogleOnceType process_tracker_once = GOOGLE_ONCE_INIT; diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h index e69f6c80ad..5b51ae30e2 100644 --- a/be/src/runtime/mem_tracker.h +++ b/be/src/runtime/mem_tracker.h @@ -29,6 +29,7 @@ #include "common/config.h" #include "common/status.h" #include "util/mem_info.h" +#include "util/perf_counters.h" #include "util/runtime_profile.h" #include "util/spinlock.h" @@ -112,7 +113,11 @@ public: static std::shared_ptr<MemTracker> get_temporary_mem_tracker(const std::string& label); Status check_sys_mem_info(int64_t bytes) { - if (MemInfo::initialized() && MemInfo::current_mem() + bytes >= MemInfo::mem_limit()) { + // Limit process memory usage using the actual physical memory of the process in `/proc/self/status`. + // This is independent of the consumption value of the mem tracker, which counts the virtual memory + // of the process malloc. + // for fast, expect MemInfo::initialized() to be true. 
+ if (PerfCounters::get_vm_rss() + bytes >= MemInfo::mem_limit()) { return Status::MemoryLimitExceeded( "{}: TryConsume failed, bytes={} process whole consumption={} mem limit={}", _label, bytes, MemInfo::current_mem(), MemInfo::mem_limit()); diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 5ad1b47027..69897544ca 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -171,7 +171,7 @@ Status TabletsChannel::_get_current_seq(int64_t& cur_seq, const Request& request template <typename TabletWriterAddRequest, typename TabletWriterAddResult> Status TabletsChannel::add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response) { - SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); int64_t cur_seq = 0; auto status = _get_current_seq(cur_seq, request); diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index c746f07b76..af65cab0e8 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -58,6 +58,7 @@ #include "util/debug_util.h" #include "util/doris_metrics.h" #include "util/logging.h" +#include "util/perf_counters.h" #include "util/telemetry/telemetry.h" #include "util/thrift_rpc_helper.h" #include "util/thrift_server.h" @@ -478,11 +479,13 @@ int main(int argc, char** argv) { !defined(USE_JEMALLOC) doris::MemInfo::refresh_current_mem(); #endif + doris::PerfCounters::refresh_proc_status(); + // TODO(zxy) 10s is too long to clear the expired task mem tracker. // A query mem tracker is about 57 bytes, assuming 10000 qps, which wastes about 55M of memory. // It should be actively triggered at the end of query/load. 
doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker(); - sleep(10); + sleep(1); } http_service.stop(); diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index ae31a7550e..b2523eeeab 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -47,7 +47,7 @@ set(UTIL_FILES parse_util.cpp path_builder.cpp # TODO: not supported on RHEL 5 -# perf-counters.cpp + perf_counters.cpp progress_updater.cpp runtime_profile.cpp static_asserts.cpp diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index 7827271604..7ce77301de 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -46,6 +46,8 @@ public: static inline size_t current_mem() { return _s_current_mem; } + // Tcmalloc property `generic.total_physical_bytes` records the total length of the virtual memory + // obtained by the process malloc, not the physical memory actually used by the process in the OS. static inline void refresh_current_mem() { MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes", &_s_current_mem); diff --git a/be/src/util/perf_counters.cpp b/be/src/util/perf_counters.cpp index 30dd7ea763..2e9d51d56c 100644 --- a/be/src/util/perf_counters.cpp +++ b/be/src/util/perf_counters.cpp @@ -26,12 +26,17 @@ #include <string.h> #include <sys/syscall.h> +#include <boost/algorithm/string/trim.hpp> #include <fstream> #include <iomanip> #include <iostream> #include <sstream> +#include "gutil/strings/substitute.h" #include "util/debug_util.h" +#include "util/pretty_printer.h" +#include "util/string_parser.hpp" +#include "util/string_util.h" namespace doris { @@ -39,6 +44,8 @@ namespace doris { #define BUFFER_SIZE 256 #define PRETTY_PRINT_WIDTH 13 +static std::unordered_map<std::string, std::string> _process_state; + // This is the order of the counters in /proc/self/io enum PERF_IO_IDX { PROC_IO_READ = 0, @@ -126,7 +133,7 @@ static bool init_event_attr(perf_event_attr* attr, PerfCounters::Counter 
counter return true; } -static string get_counter_name(PerfCounters::Counter counter) { +static std::string get_counter_name(PerfCounters::Counter counter) { switch (counter) { case PerfCounters::PERF_COUNTER_SW_CPU_CLOCK: return "CPUTime"; @@ -278,7 +285,7 @@ bool PerfCounters::init_proc_self_status_counter(Counter counter) { return true; } -bool PerfCounters::get_sys_counters(vector<int64_t>& buffer) { +bool PerfCounters::get_sys_counters(std::vector<int64_t>& buffer) { for (int i = 0; i < _counters.size(); i++) { if (_counters[i].source == SYS_PERF_COUNTER) { int num_bytes = read(_counters[i].fd, &buffer[i], COUNTER_SIZE); @@ -306,7 +313,7 @@ bool PerfCounters::get_sys_counters(vector<int64_t>& buffer) { // read_bytes: 0 // write_bytes: 0 // cancelled_write_bytes: 0 -bool PerfCounters::get_proc_self_io_counters(vector<int64_t>& buffer) { +bool PerfCounters::get_proc_self_io_counters(std::vector<int64_t>& buffer) { std::ifstream file("/proc/self/io", std::ios::in); std::string buf; int64_t values[PROC_IO_LAST_COUNTER]; @@ -346,9 +353,9 @@ bool PerfCounters::get_proc_self_io_counters(vector<int64_t>& buffer) { return true; } -bool PerfCounters::get_proc_self_status_counters(vector<int64_t>& buffer) { +bool PerfCounters::get_proc_self_status_counters(std::vector<int64_t>& buffer) { std::ifstream file("/proc/self/status", std::ios::in); - string buf; + std::string buf; while (file) { getline(file, buf); @@ -357,13 +364,13 @@ bool PerfCounters::get_proc_self_status_counters(vector<int64_t>& buffer) { if (_counters[i].source == PROC_SELF_STATUS) { size_t field = buf.find(_counters[i].proc_status_field); - if (field == string::npos) { + if (field == std::string::npos) { continue; } size_t colon = field + _counters[i].proc_status_field.size() + 1; buf = buf.substr(colon + 1); - istringstream stream(buf); + std::istringstream stream(buf); int64_t value; stream >> value; buffer[i] = value * 1024; // values in file are in kb @@ -458,12 +465,12 @@ bool 
PerfCounters::add_counter(Counter counter) { } // Query all the counters right now and store the values in results -void PerfCounters::snapshot(const string& name) { +void PerfCounters::snapshot(const std::string& name) { if (_counters.size() == 0) { return; } - string fixed_name = name; + std::string fixed_name = name; if (fixed_name.size() == 0) { std::stringstream ss; @@ -489,22 +496,22 @@ const std::vector<int64_t>* PerfCounters::counters(int snapshot) const { return &_snapshots[snapshot]; } -void PerfCounters::pretty_print(ostream* s) const { +void PerfCounters::pretty_print(std::ostream* s) const { std::ostream& stream = *s; - std::stream << setw(8) << "snapshot"; + stream << std::setw(8) << "snapshot"; for (int i = 0; i < _counter_names.size(); ++i) { - stream << setw(PRETTY_PRINT_WIDTH) << _counter_names[i]; + stream << std::setw(PRETTY_PRINT_WIDTH) << _counter_names[i]; } stream << std::endl; for (int s = 0; s < _snapshots.size(); s++) { - stream << setw(8) << _snapshot_names[s]; + stream << std::setw(8) << _snapshot_names[s]; const std::vector<int64_t>& snapshot = _snapshots[s]; for (int i = 0; i < snapshot.size(); ++i) { - stream << setw(PRETTY_PRINT_WIDTH) + stream << std::setw(PRETTY_PRINT_WIDTH) << PrettyPrinter::print(snapshot[i], _counters[i].type); } @@ -514,4 +521,66 @@ void PerfCounters::pretty_print(ostream* s) const { stream << std::endl; } +// Refactor below + +int PerfCounters::parse_int(const string& state_key) { + auto it = _process_state.find(state_key); + if (it != _process_state.end()) return atoi(it->second.c_str()); + return -1; +} + +int64_t PerfCounters::parse_int64(const string& state_key) { + auto it = _process_state.find(state_key); + if (it != _process_state.end()) { + StringParser::ParseResult result; + int64_t state_value = + StringParser::string_to_int<int64_t>(it->second.data(), it->second.size(), &result); + if (result == StringParser::PARSE_SUCCESS) return state_value; + } + return -1; +} + +string 
PerfCounters::parse_string(const string& state_key) { + auto it = _process_state.find(state_key); + if (it != _process_state.end()) return it->second; + return string(); +} + +int64_t PerfCounters::parse_bytes(const string& state_key) { + auto it = _process_state.find(state_key); + if (it != _process_state.end()) { + vector<string> fields = split(it->second, " "); + // We expect state_value such as, e.g., '16129508', '16129508 kB', '16129508 mB' + StringParser::ParseResult result; + int64_t state_value = + StringParser::string_to_int<int64_t>(fields[0].data(), fields[0].size(), &result); + if (result == StringParser::PARSE_SUCCESS) { + if (fields.size() < 2) return state_value; + if (fields[1].compare("kB") == 0) return state_value * 1024L; + } + } + return -1; +} + +void PerfCounters::refresh_proc_status() { + std::ifstream statusinfo("/proc/self/status", std::ios::in); + std::string line; + while (statusinfo.good() && !statusinfo.eof()) { + getline(statusinfo, line); + std::vector<std::string> fields = split(line, "\t"); + if (fields.size() < 2) continue; + boost::algorithm::trim(fields[1]); + std::string key = fields[0].substr(0, fields[0].size() - 1); + _process_state[strings::Substitute("status/$0", key)] = fields[1]; + } + + if (statusinfo.is_open()) statusinfo.close(); +} + +void PerfCounters::get_proc_status(ProcStatus* out) { + out->vm_size = parse_bytes("status/VmSize"); + out->vm_peak = parse_bytes("status/VmPeak"); + out->vm_rss = parse_bytes("status/VmRSS"); +} + } // namespace doris diff --git a/be/src/util/perf_counters.h b/be/src/util/perf_counters.h index adad06c354..c2a4b1c4d9 100644 --- a/be/src/util/perf_counters.h +++ b/be/src/util/perf_counters.h @@ -41,6 +41,8 @@ // <do your work> // counters.snapshot("After Work"); // counters.PrettyPrint(cout); +// +// TODO: Expect PerfCounters to be refactored to ProcessState. 
 namespace doris {

@@ -95,6 +97,26 @@ public:
     PerfCounters();
     ~PerfCounters();

+    // Refactor
+
+    struct ProcStatus {
+        int64_t vm_size = 0;
+        int64_t vm_peak = 0;
+        int64_t vm_rss = 0;
+    };
+
+    static int parse_int(const std::string& state_key);
+    static int64_t parse_int64(const std::string& state_key);
+    static std::string parse_string(const std::string& state_key);
+    // Original data's unit is B or KB.
+    static int64_t parse_bytes(const std::string& state_key);
+
+    // Flush cached process status info from `/proc/self/status`.
+    static void refresh_proc_status();
+    static void get_proc_status(ProcStatus* out);
+    // Return the process actual physical memory in bytes.
+    static inline int64_t get_vm_rss() { return parse_bytes("status/VmRSS"); }
+
 private:
     // Copy constructor and assignment not allowed
     PerfCounters(const PerfCounters&);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org