This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ba7f5d892db [Improvement]Log be thread num (#37289)
ba7f5d892db is described below

commit ba7f5d892db4dcec76c396e836e23cce3eb02544
Author: wangbo <wan...@apache.org>
AuthorDate: Mon Jul 8 19:39:27 2024 +0800

    [Improvement]Log be thread num (#37289)
---
 be/src/common/config.cpp                           |  3 +
 be/src/common/config.h                             |  2 +
 be/src/common/daemon.cpp                           | 14 ++++
 be/src/common/daemon.h                             |  1 +
 .../action/be_proc_thread_action.cpp}              | 40 +++-------
 .../action/be_proc_thread_action.h}                | 39 +++-------
 be/src/runtime/be_proc_monitor.cpp                 | 88 ++++++++++++++++++++++
 .../{common/daemon.h => runtime/be_proc_monitor.h} | 40 ++--------
 be/src/service/http_service.cpp                    |  6 ++
 regression-test/pipeline/p0/conf/be.conf           |  3 +
 10 files changed, 144 insertions(+), 92 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index a5677721326..96ce3cd6fa1 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1165,6 +1165,9 @@ DEFINE_Bool(enable_flush_file_cache_async, "true");
 // cgroup
 DEFINE_mString(doris_cgroup_cpu_path, "");
 
+DEFINE_mBool(enable_be_proc_monitor, "false");
+DEFINE_mInt32(be_proc_monitor_interval_ms, "10000");
+
 DEFINE_mBool(enable_workload_group_memory_gc, "true");
 
 DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0d424940cfc..16fc4a08d67 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1248,6 +1248,8 @@ DECLARE_mBool(exit_on_exception);
 
 // cgroup
 DECLARE_mString(doris_cgroup_cpu_path);
+DECLARE_mBool(enable_be_proc_monitor);
+DECLARE_mInt32(be_proc_monitor_interval_ms);
 
 DECLARE_mBool(enable_workload_group_memory_gc);
 
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 757b1605688..c97904f5677 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -48,6 +48,7 @@
 #include "olap/options.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
+#include "runtime/be_proc_monitor.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
@@ -399,6 +400,13 @@ void Daemon::wg_mem_used_refresh_thread() {
     }
 }
 
+void Daemon::be_proc_monitor_thread() {
+    while (!_stop_background_threads_latch.wait_for(
+            std::chrono::milliseconds(config::be_proc_monitor_interval_ms))) {
+        LOG(INFO) << "log be thread num, " << BeProcMonitor::get_be_thread_info();
+    }
+}
+
 void Daemon::start() {
     Status st;
     st = Thread::create(
@@ -435,6 +443,12 @@ void Daemon::start() {
     st = Thread::create(
             "Daemon", "wg_mem_refresh_thread", [this]() { this->wg_mem_used_refresh_thread(); },
             &_threads.emplace_back());
+
+    if (config::enable_be_proc_monitor) {
+        st = Thread::create(
+                "Daemon", "be_proc_monitor_thread", [this]() { this->be_proc_monitor_thread(); },
+                &_threads.emplace_back());
+    }
     CHECK(st.ok()) << st;
 }
 
diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h
index 0c282a8516a..9dfb079b904 100644
--- a/be/src/common/daemon.h
+++ b/be/src/common/daemon.h
@@ -45,6 +45,7 @@ private:
     void je_purge_dirty_pages_thread() const;
     void report_runtime_query_statistics_thread();
     void wg_mem_used_refresh_thread();
+    void be_proc_monitor_thread();
 
     CountDownLatch _stop_background_threads_latch;
     std::vector<scoped_refptr<Thread>> _threads;
diff --git a/be/src/common/daemon.h b/be/src/http/action/be_proc_thread_action.cpp
similarity index 51%
copy from be/src/common/daemon.h
copy to be/src/http/action/be_proc_thread_action.cpp
index 0c282a8516a..a0762b3f316 100644
--- a/be/src/common/daemon.h
+++ b/be/src/http/action/be_proc_thread_action.cpp
@@ -15,38 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#pragma once
+#include "http/action/be_proc_thread_action.h"
 
-#include <vector>
-
-#include "gutil/ref_counted.h"
-#include "util/countdown_latch.h"
-#include "util/thread.h"
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "http/http_status.h"
+#include "runtime/be_proc_monitor.h"
 
 namespace doris {
 
-class Daemon {
-public:
-    Daemon() : _stop_background_threads_latch(1) {}
-    ~Daemon() = default;
-
-    // Start background threads
-    void start();
-
-    // Stop background threads
-    void stop();
+const static std::string HEADER_JSON = "application/json";
 
-private:
-    void tcmalloc_gc_thread();
-    void memory_maintenance_thread();
-    void memory_gc_thread();
-    void memtable_memory_refresh_thread();
-    void calculate_metrics_thread();
-    void je_purge_dirty_pages_thread() const;
-    void report_runtime_query_statistics_thread();
-    void wg_mem_used_refresh_thread();
+void BeProcThreadAction::handle(HttpRequest* req) {
+    req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4");
+    HttpChannel::send_reply(req, HttpStatus::OK, BeProcMonitor::get_be_thread_info());
+}
 
-    CountDownLatch _stop_background_threads_latch;
-    std::vector<scoped_refptr<Thread>> _threads;
-};
-} // namespace doris
+}; // namespace doris
\ No newline at end of file
diff --git a/be/src/common/daemon.h b/be/src/http/action/be_proc_thread_action.h
similarity index 53%
copy from be/src/common/daemon.h
copy to be/src/http/action/be_proc_thread_action.h
index 0c282a8516a..923bc3f56d9 100644
--- a/be/src/common/daemon.h
+++ b/be/src/http/action/be_proc_thread_action.h
@@ -14,39 +14,20 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
 #pragma once
 
-#include <vector>
-
-#include "gutil/ref_counted.h"
-#include "util/countdown_latch.h"
-#include "util/thread.h"
+#include "http/http_handler.h"
+#include "http/http_request.h"
 
 namespace doris {
 
-class Daemon {
-public:
-    Daemon() : _stop_background_threads_latch(1) {}
-    ~Daemon() = default;
-
-    // Start background threads
-    void start();
+class HttpRequest;
 
-    // Stop background threads
-    void stop();
-
-private:
-    void tcmalloc_gc_thread();
-    void memory_maintenance_thread();
-    void memory_gc_thread();
-    void memtable_memory_refresh_thread();
-    void calculate_metrics_thread();
-    void je_purge_dirty_pages_thread() const;
-    void report_runtime_query_statistics_thread();
-    void wg_mem_used_refresh_thread();
-
-    CountDownLatch _stop_background_threads_latch;
-    std::vector<scoped_refptr<Thread>> _threads;
+class BeProcThreadAction : public HttpHandler {
+public:
+    BeProcThreadAction() = default;
+    ~BeProcThreadAction() override = default;
+    void handle(HttpRequest* req) override;
 };
-} // namespace doris
+
+}; // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/be_proc_monitor.cpp b/be/src/runtime/be_proc_monitor.cpp
new file mode 100644
index 00000000000..2a850fc8b04
--- /dev/null
+++ b/be/src/runtime/be_proc_monitor.cpp
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/be_proc_monitor.h"
+
+#include <fmt/format.h>
+#include <glog/logging.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include <deque>
+#include <filesystem>
+#include <fstream>
+#include <map>
+#include <nlohmann/json.hpp>
+
+std::string BeProcMonitor::get_be_thread_info() {
+    int32_t pid = getpid();
+    std::string proc_path = fmt::format("/proc/{}/task", pid);
+    if (access(proc_path.c_str(), F_OK) != 0) {
+        LOG(WARNING) << "be proc path " << proc_path << " not exists.";
+        return "";
+    }
+
+    std::map<std::string, int> thread_num_map;
+
+    int total_thread_count = 0;
+    int distinct_thread_name_count = 0;
+    for (const auto& entry : std::filesystem::directory_iterator(proc_path)) {
+        const std::string tid_path = entry.path().string();
+        std::string thread_name_path = tid_path + "/comm";
+        struct stat st;
+        // == 0 means exists
+        if (stat(thread_name_path.c_str(), &st) == 0) {
+            // NOTE: there is no need to close std::ifstream, it's called during deconstruction.
+            // refer:https://stackoverflow.com/questions/748014/do-i-need-to-manually-close-an-ifstream
+            std::ifstream file(thread_name_path.c_str());
+            if (!file.is_open()) {
+                continue;
+            }
+            std::stringstream str_buf;
+            str_buf << file.rdbuf();
+            std::string thread_name = str_buf.str();
+            thread_name.erase(std::remove(thread_name.begin(), thread_name.end(), '\n'),
+                              thread_name.end());
+
+            if (thread_num_map.find(thread_name) != thread_num_map.end()) {
+                thread_num_map[thread_name]++;
+            } else {
+                distinct_thread_name_count++;
+                thread_num_map.emplace(thread_name, 1);
+            }
+            total_thread_count++;
+        }
+    }
+
+    std::deque<std::pair<std::string, int>> ordered_list(thread_num_map.begin(),
+                                                         thread_num_map.end());
+    std::sort(ordered_list.begin(), ordered_list.end(),
+              [](const auto& lhs, const auto& rhs) { return lhs.second > rhs.second; });
+
+    ordered_list.push_front(
+            std::make_pair("distinct_thread_name_count", distinct_thread_name_count));
+    ordered_list.push_front(std::make_pair("total_thread_count", total_thread_count));
+    ordered_list.push_front(std::make_pair("be_process_id", pid));
+
+    nlohmann::json js = nlohmann::json::array();
+    for (const auto& p : ordered_list) {
+        js.push_back({p.first, p.second});
+    }
+
+    std::string output_json_str = js.dump();
+    return output_json_str;
+}
\ No newline at end of file
diff --git a/be/src/common/daemon.h b/be/src/runtime/be_proc_monitor.h
similarity index 50%
copy from be/src/common/daemon.h
copy to be/src/runtime/be_proc_monitor.h
index 0c282a8516a..dfd962be012 100644
--- a/be/src/common/daemon.h
+++ b/be/src/runtime/be_proc_monitor.h
@@ -15,38 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#pragma once
-
-#include <vector>
-
-#include "gutil/ref_counted.h"
-#include "util/countdown_latch.h"
-#include "util/thread.h"
-
-namespace doris {
-
-class Daemon {
+// Currently BeProcMonitor used to read proc/<pid>/task/ and log be's thread num, we can find
+// which logic cost too much thread when BE core because of thread exhaustion.
+#include <string>
+class BeProcMonitor {
 public:
-    Daemon() : _stop_background_threads_latch(1) {}
-    ~Daemon() = default;
-
-    // Start background threads
-    void start();
-
-    // Stop background threads
-    void stop();
-
-private:
-    void tcmalloc_gc_thread();
-    void memory_maintenance_thread();
-    void memory_gc_thread();
-    void memtable_memory_refresh_thread();
-    void calculate_metrics_thread();
-    void je_purge_dirty_pages_thread() const;
-    void report_runtime_query_statistics_thread();
-    void wg_mem_used_refresh_thread();
-
-    CountDownLatch _stop_background_threads_latch;
-    std::vector<scoped_refptr<Thread>> _threads;
-};
-} // namespace doris
+    static std::string get_be_thread_info();
+};
\ No newline at end of file
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 651323f7880..cb4d36f050a 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -30,6 +30,7 @@
 #include "common/status.h"
 #include "http/action/adjust_log_level.h"
 #include "http/action/adjust_tracing_dump.h"
+#include "http/action/be_proc_thread_action.h"
 #include "http/action/calc_file_crc_action.h"
 #include "http/action/check_rpc_channel_action.h"
 #include "http/action/check_tablet_segment_action.h"
@@ -175,6 +176,11 @@ Status HttpService::start() {
     _ev_http_server->register_handler(HttpMethod::GET, "/api/query_pipeline_tasks/{query_id}",
                                       query_pipeline_task_action);
 
+    // Dump all be process thread num
+    BeProcThreadAction* be_proc_thread_action = _pool.add(new BeProcThreadAction());
+    _ev_http_server->register_handler(HttpMethod::GET, "/api/be_process_thread_num",
+                                      be_proc_thread_action);
+
     // Register BE LoadStream action
     LoadStreamAction* load_stream_action = _pool.add(new LoadStreamAction());
     _ev_http_server->register_handler(HttpMethod::GET, "/api/load_streams", load_stream_action);
diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf
index d5e447e8601..3720da6f741 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -64,3 +64,6 @@ trino_connector_plugin_dir=/tmp/trino_connector/connectors
 
 enable_jvm_monitor = true
 
+enable_be_proc_monitor = true
+be_proc_monitor_interval_ms = 30000
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to