This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ba7f5d892db [Improvement]Log be thread num (#37289) ba7f5d892db is described below commit ba7f5d892db4dcec76c396e836e23cce3eb02544 Author: wangbo <wan...@apache.org> AuthorDate: Mon Jul 8 19:39:27 2024 +0800 [Improvement]Log be thread num (#37289) --- be/src/common/config.cpp | 3 + be/src/common/config.h | 2 + be/src/common/daemon.cpp | 14 ++++ be/src/common/daemon.h | 1 + .../action/be_proc_thread_action.cpp} | 40 +++------- .../action/be_proc_thread_action.h} | 39 +++------- be/src/runtime/be_proc_monitor.cpp | 88 ++++++++++++++++++++++ .../{common/daemon.h => runtime/be_proc_monitor.h} | 40 ++-------- be/src/service/http_service.cpp | 6 ++ regression-test/pipeline/p0/conf/be.conf | 3 + 10 files changed, 144 insertions(+), 92 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index a5677721326..96ce3cd6fa1 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1165,6 +1165,9 @@ DEFINE_Bool(enable_flush_file_cache_async, "true"); // cgroup DEFINE_mString(doris_cgroup_cpu_path, ""); +DEFINE_mBool(enable_be_proc_monitor, "false"); +DEFINE_mInt32(be_proc_monitor_interval_ms, "10000"); + DEFINE_mBool(enable_workload_group_memory_gc, "true"); DEFINE_Bool(ignore_always_true_predicate_for_segment, "true"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 0d424940cfc..16fc4a08d67 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1248,6 +1248,8 @@ DECLARE_mBool(exit_on_exception); // cgroup DECLARE_mString(doris_cgroup_cpu_path); +DECLARE_mBool(enable_be_proc_monitor); +DECLARE_mInt32(be_proc_monitor_interval_ms); DECLARE_mBool(enable_workload_group_memory_gc); diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 757b1605688..c97904f5677 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -48,6 +48,7 @@ #include "olap/options.h" #include "olap/storage_engine.h" #include 
"olap/tablet_manager.h" +#include "runtime/be_proc_monitor.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" @@ -399,6 +400,13 @@ void Daemon::wg_mem_used_refresh_thread() { } } +void Daemon::be_proc_monitor_thread() { + while (!_stop_background_threads_latch.wait_for( + std::chrono::milliseconds(config::be_proc_monitor_interval_ms))) { + LOG(INFO) << "log be thread num, " << BeProcMonitor::get_be_thread_info(); + } +} + void Daemon::start() { Status st; st = Thread::create( @@ -435,6 +443,12 @@ void Daemon::start() { st = Thread::create( "Daemon", "wg_mem_refresh_thread", [this]() { this->wg_mem_used_refresh_thread(); }, &_threads.emplace_back()); + + if (config::enable_be_proc_monitor) { + st = Thread::create( + "Daemon", "be_proc_monitor_thread", [this]() { this->be_proc_monitor_thread(); }, + &_threads.emplace_back()); + } CHECK(st.ok()) << st; } diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h index 0c282a8516a..9dfb079b904 100644 --- a/be/src/common/daemon.h +++ b/be/src/common/daemon.h @@ -45,6 +45,7 @@ private: void je_purge_dirty_pages_thread() const; void report_runtime_query_statistics_thread(); void wg_mem_used_refresh_thread(); + void be_proc_monitor_thread(); CountDownLatch _stop_background_threads_latch; std::vector<scoped_refptr<Thread>> _threads; diff --git a/be/src/common/daemon.h b/be/src/http/action/be_proc_thread_action.cpp similarity index 51% copy from be/src/common/daemon.h copy to be/src/http/action/be_proc_thread_action.cpp index 0c282a8516a..a0762b3f316 100644 --- a/be/src/common/daemon.h +++ b/be/src/http/action/be_proc_thread_action.cpp @@ -15,38 +15,20 @@ // specific language governing permissions and limitations // under the License. 
-#pragma once +#include "http/action/be_proc_thread_action.h" -#include <vector> - -#include "gutil/ref_counted.h" -#include "util/countdown_latch.h" -#include "util/thread.h" +#include "http/http_channel.h" +#include "http/http_headers.h" +#include "http/http_status.h" +#include "runtime/be_proc_monitor.h" namespace doris { -class Daemon { -public: - Daemon() : _stop_background_threads_latch(1) {} - ~Daemon() = default; - - // Start background threads - void start(); - - // Stop background threads - void stop(); +const static std::string HEADER_JSON = "application/json"; -private: - void tcmalloc_gc_thread(); - void memory_maintenance_thread(); - void memory_gc_thread(); - void memtable_memory_refresh_thread(); - void calculate_metrics_thread(); - void je_purge_dirty_pages_thread() const; - void report_runtime_query_statistics_thread(); - void wg_mem_used_refresh_thread(); +// Replies to the request with HttpStatus::OK and the string returned by +// BeProcMonitor::get_be_thread_info() as the body. +// NOTE(review): the Content-Type header is set to "text/plain; version=0.0.4" while HEADER_JSON +// ("application/json") is defined above and not used in this file - confirm which is intended. +void BeProcThreadAction::handle(HttpRequest* req) { + req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4"); + HttpChannel::send_reply(req, HttpStatus::OK, BeProcMonitor::get_be_thread_info()); +} - CountDownLatch _stop_background_threads_latch; - std::vector<scoped_refptr<Thread>> _threads; -}; -} // namespace doris +}; // namespace doris \ No newline at end of file diff --git a/be/src/common/daemon.h b/be/src/http/action/be_proc_thread_action.h similarity index 53% copy from be/src/common/daemon.h copy to be/src/http/action/be_proc_thread_action.h index 0c282a8516a..923bc3f56d9 100644 --- a/be/src/common/daemon.h +++ b/be/src/http/action/be_proc_thread_action.h @@ -14,39 +14,20 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License.
- #pragma once -#include <vector> - -#include "gutil/ref_counted.h" -#include "util/countdown_latch.h" -#include "util/thread.h" +#include "http/http_handler.h" +#include "http/http_request.h" namespace doris { -class Daemon { -public: - Daemon() : _stop_background_threads_latch(1) {} - ~Daemon() = default; - - // Start background threads - void start(); +class HttpRequest; - // Stop background threads - void stop(); - -private: - void tcmalloc_gc_thread(); - void memory_maintenance_thread(); - void memory_gc_thread(); - void memtable_memory_refresh_thread(); - void calculate_metrics_thread(); - void je_purge_dirty_pages_thread() const; - void report_runtime_query_statistics_thread(); - void wg_mem_used_refresh_thread(); - - CountDownLatch _stop_background_threads_latch; - std::vector<scoped_refptr<Thread>> _threads; +// HttpHandler subclass that overrides handle(HttpRequest*); the override is declared here only. +class BeProcThreadAction : public HttpHandler { +public: + BeProcThreadAction() = default; + ~BeProcThreadAction() override = default; + void handle(HttpRequest* req) override; }; -} // namespace doris + +}; // namespace doris \ No newline at end of file diff --git a/be/src/runtime/be_proc_monitor.cpp b/be/src/runtime/be_proc_monitor.cpp new file mode 100644 index 00000000000..2a850fc8b04 --- /dev/null +++ b/be/src/runtime/be_proc_monitor.cpp @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/be_proc_monitor.h" + +#include <fmt/format.h> +#include <glog/logging.h> +#include <sys/stat.h> +#include <unistd.h> + +#include <deque> +#include <filesystem> +#include <fstream> +#include <map> +#include <nlohmann/json.hpp> + +// Walks /proc/<pid>/task for the current process, reads each entry's "comm" file as the thread +// name (newline characters removed) and counts how many threads share each name. Entries whose +// "comm" file cannot be stat'ed or opened are skipped. +// Returns the dump of a JSON array with one entry per (name, count) pair: be_process_id, +// total_thread_count and distinct_thread_name_count come first, followed by the per-name counts +// sorted by count, largest first. +// Returns "" after logging a warning when the task directory is not accessible. +std::string BeProcMonitor::get_be_thread_info() { + int32_t pid = getpid(); + std::string proc_path = fmt::format("/proc/{}/task", pid); + if (access(proc_path.c_str(), F_OK) != 0) { + LOG(WARNING) << "be proc path " << proc_path << " not exists."; + return ""; + } + + std::map<std::string, int> thread_num_map; + + int total_thread_count = 0; + int distinct_thread_name_count = 0; + for (const auto& entry : std::filesystem::directory_iterator(proc_path)) { + const std::string tid_path = entry.path().string(); + std::string thread_name_path = tid_path + "/comm"; + struct stat st; + // stat() returning 0 means the comm file exists + if (stat(thread_name_path.c_str(), &st) == 0) { + // NOTE: there is no need to close std::ifstream explicitly; its destructor closes the file.
+ // refer: https://stackoverflow.com/questions/748014/do-i-need-to-manually-close-an-ifstream + std::ifstream file(thread_name_path.c_str()); + if (!file.is_open()) { + continue; + } + std::stringstream str_buf; + str_buf << file.rdbuf(); + std::string thread_name = str_buf.str(); + thread_name.erase(std::remove(thread_name.begin(), thread_name.end(), '\n'), + thread_name.end()); + + if (thread_num_map.find(thread_name) != thread_num_map.end()) { + thread_num_map[thread_name]++; + } else { + distinct_thread_name_count++; + thread_num_map.emplace(thread_name, 1); + } + total_thread_count++; + } + } + + std::deque<std::pair<std::string, int>> ordered_list(thread_num_map.begin(), + thread_num_map.end()); + std::sort(ordered_list.begin(), ordered_list.end(), + [](const auto& lhs, const auto& rhs) { return lhs.second > rhs.second; }); + + ordered_list.push_front( + std::make_pair("distinct_thread_name_count", distinct_thread_name_count)); + ordered_list.push_front(std::make_pair("total_thread_count", total_thread_count)); + ordered_list.push_front(std::make_pair("be_process_id", pid)); + + nlohmann::json js = nlohmann::json::array(); + for (const auto& p : ordered_list) { + js.push_back({p.first, p.second}); + } + + std::string output_json_str = js.dump(); + return output_json_str; +} \ No newline at end of file diff --git a/be/src/common/daemon.h b/be/src/runtime/be_proc_monitor.h similarity index 50% copy from be/src/common/daemon.h copy to be/src/runtime/be_proc_monitor.h index 0c282a8516a..dfd962be012 100644 --- a/be/src/common/daemon.h +++ b/be/src/runtime/be_proc_monitor.h @@ -15,38 +15,10 @@ // specific language governing permissions and limitations // under the License.
-#pragma once - -#include <vector> - -#include "gutil/ref_counted.h" -#include "util/countdown_latch.h" -#include "util/thread.h" - -namespace doris { - -class Daemon { +// BeProcMonitor currently reads /proc/<pid>/task/ and reports the BE process's thread count per +// thread name, so that we can find which logic created too many threads when the BE cores +// because of thread exhaustion. +// NOTE(review): no "#pragma once" or include guard appears in the added lines of this header - confirm. +#include <string> +class BeProcMonitor { public: - Daemon() : _stop_background_threads_latch(1) {} - ~Daemon() = default; - - // Start background threads - void start(); - - // Stop background threads - void stop(); - -private: - void tcmalloc_gc_thread(); - void memory_maintenance_thread(); - void memory_gc_thread(); - void memtable_memory_refresh_thread(); - void calculate_metrics_thread(); - void je_purge_dirty_pages_thread() const; - void report_runtime_query_statistics_thread(); - void wg_mem_used_refresh_thread(); - - CountDownLatch _stop_background_threads_latch; - std::vector<scoped_refptr<Thread>> _threads; -}; -} // namespace doris + static std::string get_be_thread_info(); +}; \ No newline at end of file diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 651323f7880..cb4d36f050a 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -30,6 +30,7 @@ #include "common/status.h" #include "http/action/adjust_log_level.h" #include "http/action/adjust_tracing_dump.h" +#include "http/action/be_proc_thread_action.h" #include "http/action/calc_file_crc_action.h" #include "http/action/check_rpc_channel_action.h" #include "http/action/check_tablet_segment_action.h" @@ -175,6 +176,11 @@ Status HttpService::start() { _ev_http_server->register_handler(HttpMethod::GET, "/api/query_pipeline_tasks/{query_id}", query_pipeline_task_action); + // Dump all be process thread num + BeProcThreadAction* be_proc_thread_action = _pool.add(new BeProcThreadAction()); + _ev_http_server->register_handler(HttpMethod::GET, "/api/be_process_thread_num", + be_proc_thread_action); + //
Register BE LoadStream action LoadStreamAction* load_stream_action = _pool.add(new LoadStreamAction()); _ev_http_server->register_handler(HttpMethod::GET, "/api/load_streams", load_stream_action); diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf index d5e447e8601..3720da6f741 100644 --- a/regression-test/pipeline/p0/conf/be.conf +++ b/regression-test/pipeline/p0/conf/be.conf @@ -64,3 +64,6 @@ trino_connector_plugin_dir=/tmp/trino_connector/connectors enable_jvm_monitor = true +enable_be_proc_monitor = true +be_proc_monitor_interval_ms = 30000 + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org