This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d5fa66d9a3 [Enhancement] [Memory] Limit memory usage use process actual physical memory (#10924)
d5fa66d9a3 is described below

commit d5fa66d9a3df6bd867743144ac542c20a87ec06c
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Tue Jul 19 11:08:39 2022 +0800

    [Enhancement] [Memory] Limit memory usage use process actual physical memory (#10924)
---
 be/src/http/action/compaction_action.h    |  9 +--
 be/src/olap/rowset/segment_v2/segment.cpp |  5 +-
 be/src/runtime/exec_env_init.cpp          |  1 -
 be/src/runtime/mem_tracker.cpp            |  1 +
 be/src/runtime/mem_tracker.h              |  7 ++-
 be/src/runtime/tablets_channel.h          |  2 +-
 be/src/service/doris_main.cpp             |  5 +-
 be/src/util/CMakeLists.txt                |  2 +-
 be/src/util/mem_info.h                    |  2 +
 be/src/util/perf_counters.cpp             | 97 ++++++++++++++++++++++++++-----
 be/src/util/perf_counters.h               | 22 +++++++
 11 files changed, 124 insertions(+), 29 deletions(-)

diff --git a/be/src/http/action/compaction_action.h b/be/src/http/action/compaction_action.h
index 80b11bb436..943db1a85a 100644
--- a/be/src/http/action/compaction_action.h
+++ b/be/src/http/action/compaction_action.h
@@ -39,12 +39,7 @@ const std::string PARAM_COMPACTION_CUMULATIVE = "cumulative";
 /// See compaction-action.md for details.
 class CompactionAction : public HttpHandler {
 public:
-    CompactionAction(CompactionActionType type) : _type(type) {
-        _compaction_mem_tracker =
-                type == RUN_COMPACTION ? MemTracker::create_tracker(-1, "ManualCompaction", nullptr,
-                                                                    MemTrackerLevel::VERBOSE)
-                                       : nullptr;
-    }
+    CompactionAction(CompactionActionType type) : _type(type) {}
 
     virtual ~CompactionAction() {}
 
@@ -75,8 +70,6 @@ private:
     static std::mutex _compaction_running_mutex;
     /// whether there is manual compaction running
     static bool _is_compaction_running;
-    /// memory tracker
-    std::shared_ptr<MemTracker> _compaction_mem_tracker;
 };
 
 } // end namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index f3922752d8..f4e96833d3 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -48,9 +48,10 @@ Status Segment::open(io::FileSystem* fs, const std::string& path, uint32_t segme
 Segment::Segment(uint32_t segment_id, const TabletSchema* tablet_schema)
         : _segment_id(segment_id), _tablet_schema(*tablet_schema) {
 #ifndef BE_TEST
-    _mem_tracker = StorageEngine::instance()->tablet_mem_tracker();
+    _mem_tracker = MemTracker::create_virtual_tracker(
+            -1, "Segment", StorageEngine::instance()->tablet_mem_tracker());
 #else
-    _mem_tracker = MemTracker::get_process_tracker();
+    _mem_tracker = MemTracker::create_virtual_tracker(-1, "Segment");
 #endif
 }
 
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 6d62e0a258..e82473b01f 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -183,7 +183,6 @@ Status ExecEnv::_init_mem_tracker() {
                      << ". Using physical memory instead";
         global_memory_limit_bytes = MemInfo::physical_mem();
     }
-    MemTracker::get_process_tracker()->set_limit(global_memory_limit_bytes);
     _query_pool_mem_tracker = MemTracker::create_tracker(
             -1, "QueryPool", MemTracker::get_process_tracker(), MemTrackerLevel::OVERVIEW);
     REGISTER_HOOK_METRIC(query_mem_consumption,
diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp
index 71e89f8b1e..ffa7ae188b 100644
--- a/be/src/runtime/mem_tracker.cpp
+++ b/be/src/runtime/mem_tracker.cpp
@@ -39,6 +39,7 @@ const std::string MemTracker::COUNTER_NAME = "PeakMemoryUsage";
 
 // The ancestor for all trackers. Every tracker is visible from the process down.
 // All manually created trackers should specify the process tracker as the parent.
+// Not limit total memory by process tracker, and it's just used to track virtual memory of process.
 static std::shared_ptr<MemTracker> process_tracker;
 static MemTracker* raw_process_tracker;
 static GoogleOnceType process_tracker_once = GOOGLE_ONCE_INIT;
diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h
index e69f6c80ad..5b51ae30e2 100644
--- a/be/src/runtime/mem_tracker.h
+++ b/be/src/runtime/mem_tracker.h
@@ -29,6 +29,7 @@
 #include "common/config.h"
 #include "common/status.h"
 #include "util/mem_info.h"
+#include "util/perf_counters.h"
 #include "util/runtime_profile.h"
 #include "util/spinlock.h"
 
@@ -112,7 +113,11 @@ public:
     static std::shared_ptr<MemTracker> get_temporary_mem_tracker(const std::string& label);
 
     Status check_sys_mem_info(int64_t bytes) {
-        if (MemInfo::initialized() && MemInfo::current_mem() + bytes >= MemInfo::mem_limit()) {
+        // Limit process memory usage using the actual physical memory of the process in `/proc/self/status`.
+        // This is independent of the consumption value of the mem tracker, which counts the virtual memory
+        // of the process malloc.
+        // for fast, expect MemInfo::initialized() to be true.
+        if (PerfCounters::get_vm_rss() + bytes >= MemInfo::mem_limit()) {
             return Status::MemoryLimitExceeded(
                     "{}: TryConsume failed, bytes={} process whole consumption={}  mem limit={}",
                     _label, bytes, MemInfo::current_mem(), MemInfo::mem_limit());
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 5ad1b47027..69897544ca 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -171,7 +171,7 @@ Status TabletsChannel::_get_current_seq(int64_t& cur_seq, const Request& request
 template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
 Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
                                  TabletWriterAddResult* response) {
-    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     int64_t cur_seq = 0;
 
     auto status = _get_current_seq(cur_seq, request);
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index c746f07b76..af65cab0e8 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -58,6 +58,7 @@
 #include "util/debug_util.h"
 #include "util/doris_metrics.h"
 #include "util/logging.h"
+#include "util/perf_counters.h"
 #include "util/telemetry/telemetry.h"
 #include "util/thrift_rpc_helper.h"
 #include "util/thrift_server.h"
@@ -478,11 +479,13 @@ int main(int argc, char** argv) {
         !defined(USE_JEMALLOC)
         doris::MemInfo::refresh_current_mem();
 #endif
+        doris::PerfCounters::refresh_proc_status();
+
         // TODO(zxy) 10s is too long to clear the expired task mem tracker.
         // A query mem tracker is about 57 bytes, assuming 10000 qps, which wastes about 55M of memory.
         // It should be actively triggered at the end of query/load.
         doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker();
-        sleep(10);
+        sleep(1);
     }
 
     http_service.stop();
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index ae31a7550e..b2523eeeab 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -47,7 +47,7 @@ set(UTIL_FILES
   parse_util.cpp
   path_builder.cpp
 # TODO: not supported on RHEL 5
-# perf-counters.cpp
+  perf_counters.cpp
   progress_updater.cpp
   runtime_profile.cpp
   static_asserts.cpp
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index 7827271604..7ce77301de 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -46,6 +46,8 @@ public:
 
     static inline size_t current_mem() { return _s_current_mem; }
 
+    // Tcmalloc property `generic.total_physical_bytes` records the total length of the virtual memory
+    // obtained by the process malloc, not the physical memory actually used by the process in the OS.
     static inline void refresh_current_mem() {
         MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes",
                                                         &_s_current_mem);
diff --git a/be/src/util/perf_counters.cpp b/be/src/util/perf_counters.cpp
index 30dd7ea763..2e9d51d56c 100644
--- a/be/src/util/perf_counters.cpp
+++ b/be/src/util/perf_counters.cpp
@@ -26,12 +26,17 @@
 #include <string.h>
 #include <sys/syscall.h>
 
+#include <boost/algorithm/string/trim.hpp>
 #include <fstream>
 #include <iomanip>
 #include <iostream>
 #include <sstream>
 
+#include "gutil/strings/substitute.h"
 #include "util/debug_util.h"
+#include "util/pretty_printer.h"
+#include "util/string_parser.hpp"
+#include "util/string_util.h"
 
 namespace doris {
 
@@ -39,6 +44,8 @@ namespace doris {
 #define BUFFER_SIZE 256
 #define PRETTY_PRINT_WIDTH 13
 
+static std::unordered_map<std::string, std::string> _process_state;
+
 // This is the order of the counters in /proc/self/io
 enum PERF_IO_IDX {
     PROC_IO_READ = 0,
@@ -126,7 +133,7 @@ static bool init_event_attr(perf_event_attr* attr, PerfCounters::Counter counter
     return true;
 }
 
-static string get_counter_name(PerfCounters::Counter counter) {
+static std::string get_counter_name(PerfCounters::Counter counter) {
     switch (counter) {
     case PerfCounters::PERF_COUNTER_SW_CPU_CLOCK:
         return "CPUTime";
@@ -278,7 +285,7 @@ bool PerfCounters::init_proc_self_status_counter(Counter counter) {
     return true;
 }
 
-bool PerfCounters::get_sys_counters(vector<int64_t>& buffer) {
+bool PerfCounters::get_sys_counters(std::vector<int64_t>& buffer) {
     for (int i = 0; i < _counters.size(); i++) {
         if (_counters[i].source == SYS_PERF_COUNTER) {
             int num_bytes = read(_counters[i].fd, &buffer[i], COUNTER_SIZE);
@@ -306,7 +313,7 @@ bool PerfCounters::get_sys_counters(vector<int64_t>& buffer) {
 //    read_bytes: 0
 //    write_bytes: 0
 //    cancelled_write_bytes: 0
-bool PerfCounters::get_proc_self_io_counters(vector<int64_t>& buffer) {
+bool PerfCounters::get_proc_self_io_counters(std::vector<int64_t>& buffer) {
     std::ifstream file("/proc/self/io", std::ios::in);
     std::string buf;
     int64_t values[PROC_IO_LAST_COUNTER];
@@ -346,9 +353,9 @@ bool PerfCounters::get_proc_self_io_counters(vector<int64_t>& buffer) {
     return true;
 }
 
-bool PerfCounters::get_proc_self_status_counters(vector<int64_t>& buffer) {
+bool PerfCounters::get_proc_self_status_counters(std::vector<int64_t>& buffer) {
     std::ifstream file("/proc/self/status", std::ios::in);
-    string buf;
+    std::string buf;
 
     while (file) {
         getline(file, buf);
@@ -357,13 +364,13 @@ bool PerfCounters::get_proc_self_status_counters(vector<int64_t>& buffer) {
             if (_counters[i].source == PROC_SELF_STATUS) {
                 size_t field = buf.find(_counters[i].proc_status_field);
 
-                if (field == string::npos) {
+                if (field == std::string::npos) {
                     continue;
                 }
 
                 size_t colon = field + _counters[i].proc_status_field.size() + 1;
                 buf = buf.substr(colon + 1);
-                istringstream stream(buf);
+                std::istringstream stream(buf);
                 int64_t value;
                 stream >> value;
                 buffer[i] = value * 1024; // values in file are in kb
@@ -458,12 +465,12 @@ bool PerfCounters::add_counter(Counter counter) {
 }
 
 // Query all the counters right now and store the values in results
-void PerfCounters::snapshot(const string& name) {
+void PerfCounters::snapshot(const std::string& name) {
     if (_counters.size() == 0) {
         return;
     }
 
-    string fixed_name = name;
+    std::string fixed_name = name;
 
     if (fixed_name.size() == 0) {
         std::stringstream ss;
@@ -489,22 +496,22 @@ const std::vector<int64_t>* PerfCounters::counters(int snapshot) const {
     return &_snapshots[snapshot];
 }
 
-void PerfCounters::pretty_print(ostream* s) const {
+void PerfCounters::pretty_print(std::ostream* s) const {
     std::ostream& stream = *s;
-    std::stream << setw(8) << "snapshot";
+    stream << std::setw(8) << "snapshot";
 
     for (int i = 0; i < _counter_names.size(); ++i) {
-        stream << setw(PRETTY_PRINT_WIDTH) << _counter_names[i];
+        stream << std::setw(PRETTY_PRINT_WIDTH) << _counter_names[i];
     }
 
     stream << std::endl;
 
     for (int s = 0; s < _snapshots.size(); s++) {
-        stream << setw(8) << _snapshot_names[s];
+        stream << std::setw(8) << _snapshot_names[s];
         const std::vector<int64_t>& snapshot = _snapshots[s];
 
         for (int i = 0; i < snapshot.size(); ++i) {
-            stream << setw(PRETTY_PRINT_WIDTH)
+            stream << std::setw(PRETTY_PRINT_WIDTH)
                    << PrettyPrinter::print(snapshot[i], _counters[i].type);
         }
 
@@ -514,4 +521,66 @@ void PerfCounters::pretty_print(ostream* s) const {
     stream << std::endl;
 }
 
+// Refactor below
+
+int PerfCounters::parse_int(const string& state_key) {
+    auto it = _process_state.find(state_key);
+    if (it != _process_state.end()) return atoi(it->second.c_str());
+    return -1;
+}
+
+int64_t PerfCounters::parse_int64(const string& state_key) {
+    auto it = _process_state.find(state_key);
+    if (it != _process_state.end()) {
+        StringParser::ParseResult result;
+        int64_t state_value =
+                StringParser::string_to_int<int64_t>(it->second.data(), it->second.size(), &result);
+        if (result == StringParser::PARSE_SUCCESS) return state_value;
+    }
+    return -1;
+}
+
+string PerfCounters::parse_string(const string& state_key) {
+    auto it = _process_state.find(state_key);
+    if (it != _process_state.end()) return it->second;
+    return string();
+}
+
+int64_t PerfCounters::parse_bytes(const string& state_key) {
+    auto it = _process_state.find(state_key);
+    if (it != _process_state.end()) {
+        vector<string> fields = split(it->second, " ");
+        // We expect state_value such as, e.g., '16129508', '16129508 kB', '16129508 mB'
+        StringParser::ParseResult result;
+        int64_t state_value =
+                StringParser::string_to_int<int64_t>(fields[0].data(), fields[0].size(), &result);
+        if (result == StringParser::PARSE_SUCCESS) {
+            if (fields.size() < 2) return state_value;
+            if (fields[1].compare("kB") == 0) return state_value * 1024L;
+        }
+    }
+    return -1;
+}
+
+void PerfCounters::refresh_proc_status() {
+    std::ifstream statusinfo("/proc/self/status", std::ios::in);
+    std::string line;
+    while (statusinfo.good() && !statusinfo.eof()) {
+        getline(statusinfo, line);
+        std::vector<std::string> fields = split(line, "\t");
+        if (fields.size() < 2) continue;
+        boost::algorithm::trim(fields[1]);
+        std::string key = fields[0].substr(0, fields[0].size() - 1);
+        _process_state[strings::Substitute("status/$0", key)] = fields[1];
+    }
+
+    if (statusinfo.is_open()) statusinfo.close();
+}
+
+void PerfCounters::get_proc_status(ProcStatus* out) {
+    out->vm_size = parse_bytes("status/VmSize");
+    out->vm_peak = parse_bytes("status/VmPeak");
+    out->vm_rss = parse_bytes("status/VmRSS");
+}
+
 } // namespace doris
diff --git a/be/src/util/perf_counters.h b/be/src/util/perf_counters.h
index adad06c354..c2a4b1c4d9 100644
--- a/be/src/util/perf_counters.h
+++ b/be/src/util/perf_counters.h
@@ -41,6 +41,8 @@
 //  <do your work>
 //  counters.snapshot("After Work");
 //  counters.PrettyPrint(cout);
+//
+// TODO: Expect PerfCounters to be refactored to ProcessState.
 
 namespace doris {
 
@@ -95,6 +97,26 @@ public:
     PerfCounters();
     ~PerfCounters();
 
+    // Refactor
+
+    struct ProcStatus {
+        int64_t vm_size = 0;
+        int64_t vm_peak = 0;
+        int64_t vm_rss = 0;
+    };
+
+    static int parse_int(const std::string& state_key);
+    static int64_t parse_int64(const std::string& state_key);
+    static std::string parse_string(const std::string& state_key);
+    // Original data's unit is B or KB.
+    static int64_t parse_bytes(const std::string& state_key);
+
+    // Flush cached process status info from `/proc/self/status`.
+    static void refresh_proc_status();
+    static void get_proc_status(ProcStatus* out);
+    // Return the process actual physical memory in bytes.
+    static inline int64_t get_vm_rss() { return parse_bytes("status/VmRSS"); }
+
 private:
     // Copy constructor and assignment not allowed
     PerfCounters(const PerfCounters&);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to