This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 443b8f1 [Feature][ThreadPool]Add Web Page to display thread's stats (#4110) 443b8f1 is described below commit 443b8f100bf17b8fd41d3b3a6a7adb2a63016a0f Author: WangCong <1018957...@qq.com> AuthorDate: Thu Jul 23 21:08:36 2020 +0800 [Feature][ThreadPool]Add Web Page to display thread's stats (#4110) This CL mainly includes: - add some methods in env to get a thread's stats from Linux system files. - support getting a thread's stats over HTTP. - register a page handler in BE that shows threads' stats, to help developers locate thread-related problems. --- be/src/env/env.h | 63 ++++++------- be/src/env/env_util.cpp | 61 ++++++++++++- be/src/env/env_util.h | 20 +++-- be/src/http/default_path_handlers.cpp | 6 +- be/src/http/web_page_handler.cpp | 5 +- be/src/util/CMakeLists.txt | 3 +- be/src/util/os_util.cpp | 165 ++++++++++++++++++++++++++++++++++ be/src/util/os_util.h | 68 ++++++++++++++ be/src/util/thread.cpp | 135 +++++++++++++++++++++++----- be/src/util/thread.h | 36 ++++---- be/src/util/url_coding.cpp | 75 +++++++--------- be/src/util/url_coding.h | 8 +- webroot/be/threadz.mustache | 68 ++++++++++++++ 13 files changed, 570 insertions(+), 143 deletions(-) diff --git a/be/src/env/env.h b/be/src/env/env.h index 5ad360b..8cbaaf7 100644 --- a/be/src/env/env.h +++ b/be/src/env/env.h @@ -9,8 +9,8 @@ #pragma once -#include <string> #include <memory> +#include <string> #include "common/status.h" #include "util/slice.h" @@ -35,15 +35,15 @@ public: // CREATE_OR_OPEN | opens | creates // MUST_CREATE | fails | creates // MUST_EXIST | opens | fails - enum OpenMode { - CREATE_OR_OPEN_WITH_TRUNCATE, - CREATE_OR_OPEN, - MUST_CREATE, - MUST_EXIST + enum OpenMode { + CREATE_OR_OPEN_WITH_TRUNCATE, + CREATE_OR_OPEN, + MUST_CREATE, + MUST_EXIST }; - Env() { } - virtual ~Env() { } + Env() {} + virtual ~Env() {} // Return a default environment suitable for the current operating // system. 
Sophisticated users may wish to provide their own Env @@ -85,8 +85,7 @@ public: // Like the previous new_writable_file, but allows options to be // specified. - virtual Status new_writable_file(const WritableFileOptions& opts, - const std::string& fname, + virtual Status new_writable_file(const WritableFileOptions& opts, const std::string& fname, std::unique_ptr<WritableFile>* result) = 0; // Creates a new readable and writable file. If a file with the same name @@ -98,8 +97,7 @@ public: std::unique_ptr<RandomRWFile>* result) = 0; // Like the previous new_random_rw_file, but allows options to be specified. - virtual Status new_random_rw_file(const RandomRWFileOptions& opts, - const std::string& fname, + virtual Status new_random_rw_file(const RandomRWFileOptions& opts, const std::string& fname, std::unique_ptr<RandomRWFile>* result) = 0; // Returns OK if the path exists. @@ -116,8 +114,7 @@ public: // NotFound if "dir" does not exist, the calling process does not have // permission to access "dir", or if "dir" is invalid. // IOError if an IO Error was encountered - virtual Status get_children(const std::string& dir, - std::vector<std::string>* result) = 0; + virtual Status get_children(const std::string& dir, std::vector<std::string>* result) = 0; // Iterate the specified directory and call given callback function with child's // name. This function continues execution until all children have been iterated @@ -168,19 +165,16 @@ public: virtual Status get_file_size(const std::string& fname, uint64_t* size) = 0; // Store the last modification time of fname in *file_mtime. - virtual Status get_file_modified_time(const std::string& fname, - uint64_t* file_mtime) = 0; + virtual Status get_file_modified_time(const std::string& fname, uint64_t* file_mtime) = 0; // Rename file src to target. 
- virtual Status rename_file(const std::string& src, - const std::string& target) = 0; + virtual Status rename_file(const std::string& src, const std::string& target) = 0; // create a hard-link - virtual Status link_file(const std::string& /*old_path*/, - const std::string& /*new_path*/) = 0; + virtual Status link_file(const std::string& /*old_path*/, const std::string& /*new_path*/) = 0; }; struct RandomAccessFileOptions { - RandomAccessFileOptions() { } + RandomAccessFileOptions() {} }; // Creation-time options for WritableFile @@ -202,8 +196,8 @@ struct RandomRWFileOptions { // A file abstraction for reading sequentially through a file class SequentialFile { public: - SequentialFile() { } - virtual ~SequentialFile() { } + SequentialFile() {} + virtual ~SequentialFile() {} // Read up to "result.size" bytes from the file. // Sets "result.data" to the data that was read. @@ -229,8 +223,8 @@ public: class RandomAccessFile { public: - RandomAccessFile() { } - virtual ~RandomAccessFile() { } + RandomAccessFile() {} + virtual ~RandomAccessFile() {} // Read "result.size" bytes from the file starting at "offset". // Copies the resulting data into "result.data". @@ -271,13 +265,13 @@ public: // one of Append or PositionedAppend. We support only Append here. class WritableFile { public: - enum FlushMode { - FLUSH_SYNC, - FLUSH_ASYNC + enum FlushMode { + FLUSH_SYNC, + FLUSH_ASYNC }; - WritableFile() { } - virtual ~WritableFile() { } + WritableFile() {} + virtual ~WritableFile() {} // Append data to the end of the file virtual Status append(const Slice& data) = 0; @@ -325,12 +319,9 @@ private: // A file abstraction for random reading and writing. 
class RandomRWFile { public: - enum FlushMode { - FLUSH_SYNC, - FLUSH_ASYNC - }; + enum FlushMode { FLUSH_SYNC, FLUSH_ASYNC }; RandomRWFile() {} - virtual ~RandomRWFile() { } + virtual ~RandomRWFile() {} virtual Status read_at(uint64_t offset, const Slice& result) const = 0; @@ -350,4 +341,4 @@ public: virtual const std::string& filename() const = 0; }; -} +} // namespace doris diff --git a/be/src/env/env_util.cpp b/be/src/env/env_util.cpp index 07bf874..b383439 100644 --- a/be/src/env/env_util.cpp +++ b/be/src/env/env_util.cpp @@ -18,6 +18,7 @@ #include "env/env_util.h" #include "env/env.h" +#include "util/faststring.h" using std::shared_ptr; using std::string; @@ -30,21 +31,73 @@ Status open_file_for_write(Env* env, const string& path, shared_ptr<WritableFile return open_file_for_write(WritableFileOptions(), env, path, file); } -Status open_file_for_write(const WritableFileOptions& opts, - Env *env, const string &path, - shared_ptr<WritableFile> *file) { +Status open_file_for_write(const WritableFileOptions& opts, Env* env, const string& path, + shared_ptr<WritableFile>* file) { unique_ptr<WritableFile> w; RETURN_IF_ERROR(env->new_writable_file(opts, path, &w)); file->reset(w.release()); return Status::OK(); } -Status open_file_for_random(Env *env, const string &path, shared_ptr<RandomAccessFile> *file) { +Status open_file_for_random(Env* env, const string& path, shared_ptr<RandomAccessFile>* file) { unique_ptr<RandomAccessFile> r; RETURN_IF_ERROR(env->new_random_access_file(path, &r)); file->reset(r.release()); return Status::OK(); } +static Status do_write_string_to_file(Env* env, const Slice& data, const std::string& fname, + bool should_sync) { + unique_ptr<WritableFile> file; + Status s = env->new_writable_file(fname, &file); + if (!s.ok()) { + return s; + } + s = file->append(data); + if (s.ok() && should_sync) { + s = file->sync(); + } + if (s.ok()) { + s = file->close(); + } + file.reset(); // Will auto-close if we did not close above + if (!s.ok()) { + 
RETURN_NOT_OK_STATUS_WITH_WARN(env->delete_file(fname), + "Failed to delete partially-written file " + fname); + } + return s; +} + +Status write_string_to_file(Env* env, const Slice& data, const std::string& fname) { + return do_write_string_to_file(env, data, fname, false); +} + +Status write_string_to_file_sync(Env* env, const Slice& data, const std::string& fname) { + return do_write_string_to_file(env, data, fname, true); +} + +Status read_file_to_string(Env* env, const std::string& fname, faststring* data) { + data->clear(); + unique_ptr<SequentialFile> file; + Status s = env->new_sequential_file(fname, &file); + if (!s.ok()) { + return s; + } + static const int kBufferSize = 8192; + unique_ptr<uint8_t[]> scratch(new uint8_t[kBufferSize]); + while (true) { + Slice fragment(scratch.get(), kBufferSize); + s = file->read(&fragment); + if (!s.ok()) { + break; + } + data->append(fragment.get_data(), fragment.get_size()); + if (fragment.empty()) { + break; + } + } + return s; +} + } // namespace env_util } // namespace doris diff --git a/be/src/env/env_util.h b/be/src/env/env_util.h index aa2af49..70ea526 100644 --- a/be/src/env/env_util.h +++ b/be/src/env/env_util.h @@ -20,6 +20,7 @@ #include <string> #include "common/status.h" +#include "env.h" namespace doris { @@ -30,14 +31,21 @@ struct WritableFileOptions; namespace env_util { -Status open_file_for_write(Env *env, const std::string& path, std::shared_ptr<WritableFile> *file); +Status open_file_for_write(Env* env, const std::string& path, std::shared_ptr<WritableFile>* file); -Status open_file_for_write(const WritableFileOptions& opts, Env *env, - const std::string& path, std::shared_ptr<WritableFile> *file); +Status open_file_for_write(const WritableFileOptions& opts, Env* env, const std::string& path, + std::shared_ptr<WritableFile>* file); -Status open_file_for_random(Env *env, const std::string& path, - std::shared_ptr<RandomAccessFile> *file); +Status open_file_for_random(Env* env, const std::string& path, 
+ std::shared_ptr<RandomAccessFile>* file); + +// A utility routine: write "data" to the named file. +Status write_string_to_file(Env* env, const Slice& data, const std::string& fname); +// Like above but also fsyncs the new file. +Status write_string_to_file_sync(Env* env, const Slice& data, const std::string& fname); + +// A utility routine: read contents of named file into *data +Status read_file_to_string(Env* env, const std::string& fname, faststring* data); } // namespace env_util } // namespace doris - diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp index 0ebbbc0..dd85400 100644 --- a/be/src/http/default_path_handlers.cpp +++ b/be/src/http/default_path_handlers.cpp @@ -18,20 +18,17 @@ #include "http/default_path_handlers.h" #include <gperftools/malloc_extension.h> -#include <sys/stat.h> #include <boost/algorithm/string.hpp> #include <boost/bind.hpp> -#include <fstream> #include <sstream> #include "common/configbase.h" -#include "common/logging.h" #include "http/web_page_handler.h" #include "runtime/mem_tracker.h" #include "util/debug_util.h" -#include "util/logging.h" #include "util/pretty_printer.h" +#include "util/thread.h" namespace doris { @@ -109,6 +106,7 @@ void add_default_path_handlers(WebPageHandler* web_page_handler, MemTracker* pro web_page_handler->register_page("/varz", "Configs", config_handler, true /* is_on_nav_bar */); web_page_handler->register_page("/memz", "Memory", boost::bind<void>(&mem_usage_handler, process_mem_tracker, _1, _2), true /* is_on_nav_bar */); + register_thread_display_page(web_page_handler); } } // namespace doris diff --git a/be/src/http/web_page_handler.cpp b/be/src/http/web_page_handler.cpp index 788b028..f9a4427 100644 --- a/be/src/http/web_page_handler.cpp +++ b/be/src/http/web_page_handler.cpp @@ -17,8 +17,7 @@ #include "http/web_page_handler.h" -#include <boost/bind.hpp> -#include <boost/mem_fn.hpp> +#include <functional> #include "common/config.h" #include "env/env.h" @@ 
-51,7 +50,7 @@ WebPageHandler::WebPageHandler(EvHttpServer* server) : _http_server(server) { _http_server->register_static_file_handler(this); TemplatePageHandlerCallback root_callback = - boost::bind<void>(boost::mem_fn(&WebPageHandler::root_handler), this, _1, _2); + std::bind<void>(std::mem_fn(&WebPageHandler::root_handler), this, std::placeholders::_1, std::placeholders::_2); register_template_page("/", "Home", root_callback, false /* is_on_nav_bar */); } diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index e9ee89b..5107d7a 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -70,7 +70,8 @@ set(UTIL_FILES null_load_error_hub.cpp time.cpp os_info.cpp -# coding_util.cpp + os_util.cpp + # coding_util.cpp cidr.cpp core_local.cpp uid_util.cpp diff --git a/be/src/util/os_util.cpp b/be/src/util/os_util.cpp new file mode 100644 index 0000000..15aed51 --- /dev/null +++ b/be/src/util/os_util.cpp @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "util/os_util.h" + +#include <fcntl.h> +#include <sys/resource.h> +#include <unistd.h> + +#include <cstddef> +#include <ostream> +#include <string> +#include <utility> +#include <vector> + +#include <glog/logging.h> + +#include "env/env_util.h" +#include "gutil/macros.h" +#include "gutil/strings/numbers.h" +#include "gutil/strings/split.h" +#include "gutil/strings/stringpiece.h" +#include "gutil/strings/substitute.h" +#include "gutil/strings/util.h" +#include "util/faststring.h" + +using std::string; +using std::vector; +using strings::Split; +using strings::Substitute; + +namespace doris { + +// Ensure that Impala compiles on earlier kernels. If the target kernel does not support +// _SC_CLK_TCK, sysconf(_SC_CLK_TCK) will return -1. +#ifndef _SC_CLK_TCK +#define _SC_CLK_TCK 2 +#endif + +static const int64_t kTicksPerSec = sysconf(_SC_CLK_TCK); + +// Offsets into the ../stat file array of per-thread statistics. +// +// They are themselves offset by two because the pid and comm fields of the +// file are parsed separately. +static const int64_t kUserTicks = 13 - 2; +static const int64_t kKernelTicks = 14 - 2; +static const int64_t kIoWait = 41 - 2; + +// Largest offset we are interested in, to check we get a well formed stat file. +static const int64_t kMaxOffset = kIoWait; + +Status parse_stat(const std::string& buffer, std::string* name, ThreadStats* stats) { + DCHECK(stats != nullptr); + + // The thread name should be the only field with parentheses. But the name + // itself may contain parentheses. 
+ size_t open_paren = buffer.find('('); + size_t close_paren = buffer.rfind(')'); + if (open_paren == string::npos || // '(' must exist + close_paren == string::npos || // ')' must exist + open_paren >= close_paren || // '(' must come before ')' + close_paren + 2 == buffer.size()) { // there must be at least two chars after ')' + return Status::IOError("Unrecognised /proc format"); + } + string extracted_name = buffer.substr(open_paren + 1, close_paren - (open_paren + 1)); + string rest = buffer.substr(close_paren + 2); + vector<string> splits = Split(rest, " ", strings::SkipEmpty()); + if (splits.size() < kMaxOffset) { + return Status::IOError("Unrecognised /proc format"); + } + + int64_t tmp; + if (safe_strto64(splits[kUserTicks], &tmp)) { + stats->user_ns = tmp * (1e9 / kTicksPerSec); + } + if (safe_strto64(splits[kKernelTicks], &tmp)) { + stats->kernel_ns = tmp * (1e9 / kTicksPerSec); + } + if (safe_strto64(splits[kIoWait], &tmp)) { + stats->iowait_ns = tmp * (1e9 / kTicksPerSec); + } + if (name != nullptr) { + *name = extracted_name; + } + return Status::OK(); +} + +Status get_thread_stats(int64_t tid, ThreadStats* stats) { + DCHECK(stats != nullptr); + if (kTicksPerSec <= 0) { + return Status::NotSupported("ThreadStats not supported"); + } + faststring buf; + RETURN_IF_ERROR(env_util::read_file_to_string( + Env::Default(), Substitute("/proc/self/task/$0/stat", tid), &buf)); + + return parse_stat(buf.ToString(), nullptr, stats); +} +void disable_core_dumps() { + struct rlimit lim; + PCHECK(getrlimit(RLIMIT_CORE, &lim) == 0); + lim.rlim_cur = 0; + PCHECK(setrlimit(RLIMIT_CORE, &lim) == 0); + + // Set coredump_filter to not dump any parts of the address space. + // Although the above disables core dumps to files, if core_pattern + // is set to a pipe rather than a file, it's not sufficient. Setting + // this pattern results in piping a very minimal dump into the core + // processor (eg abrtd), thus speeding up the crash. 
+ int f; + RETRY_ON_EINTR(f, open("/proc/self/coredump_filter", O_WRONLY)); + if (f >= 0) { + ssize_t ret; + RETRY_ON_EINTR(ret, write(f, "00000000", 8)); + int close_ret; + RETRY_ON_EINTR(close_ret, close(f)); + } +} + +bool is_being_debugged() { +#ifndef __linux__ + return false; +#else + // Look for the TracerPid line in /proc/self/status. + // If this is non-zero, we are being ptraced, which is indicative of gdb or strace + // being attached. + faststring buf; + Status s = env_util::read_file_to_string(Env::Default(), "/proc/self/status", &buf); + if (!s.ok()) { + LOG(WARNING) << "could not read /proc/self/status: " << s.to_string(); + return false; + } + StringPiece buf_sp(reinterpret_cast<const char*>(buf.data()), buf.size()); + vector<StringPiece> lines = Split(buf_sp, "\n"); + for (const auto& l : lines) { + if (!HasPrefixString(l, "TracerPid:")) continue; + std::pair<StringPiece, StringPiece> key_val = Split(l, "\t"); + int64_t tracer_pid = -1; + if (!safe_strto64(key_val.second.data(), key_val.second.size(), &tracer_pid)) { + LOG(WARNING) << "Invalid line in /proc/self/status: " << l; + return false; + } + return tracer_pid != 0; + } + LOG(WARNING) << "Could not find TracerPid line in /proc/self/status"; + return false; +#endif // __linux__ +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/util/os_util.h b/be/src/util/os_util.h new file mode 100644 index 0000000..7e0f514 --- /dev/null +++ b/be/src/util/os_util.h @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef DORIS_BE_UTIL_OS_UTIL_H +#define DORIS_BE_UTIL_OS_UTIL_H + +#include <cstdint> +#include <string> +#include <type_traits> + +#include "common/status.h" +#include "env/env.h" + +namespace doris { + +// Utility methods to read interesting values from /proc. +// TODO: Get stats for parent process. + +// Container struct for statistics read from the /proc filesystem for a thread. +struct ThreadStats { + int64_t user_ns; + int64_t kernel_ns; + int64_t iowait_ns; + + // Default constructor zeroes all members in case structure can't be filled by + // GetThreadStats. + ThreadStats() : user_ns(0), kernel_ns(0), iowait_ns(0) {} +}; + +// Populates ThreadStats object using a given buffer. The buffer is expected to +// conform to /proc/<pid>/task/<tid>/stat layout; an error will be returned otherwise. +// +// If 'name' is supplied, the extracted thread name will be written to it. +Status parse_stat(const std::string& buffer, std::string* name, ThreadStats* stats); + +// Populates ThreadStats object for a given thread by reading from +// /proc/<pid>/task/<tid>/stat. Returns OK unless the file cannot be read or is in an +// unrecognised format, or if the kernel version is not modern enough. +Status get_thread_stats(int64_t tid, ThreadStats* stats); + +// Disable core dumps for this process. +// +// This is useful particularly in tests where we have injected failures and don't +// want to generate a core dump from an "expected" crash. +void disable_core_dumps(); + +// Return true if this process appears to be running under a debugger or strace. 
+// +// This may return false on unsupported (non-Linux) platforms. +bool is_being_debugged(); + +} // namespace doris + +#endif diff --git a/be/src/util/thread.cpp b/be/src/util/thread.cpp index c4b5493..128896d 100644 --- a/be/src/util/thread.cpp +++ b/be/src/util/thread.cpp @@ -17,23 +17,29 @@ #include "thread.h" +#include <sys/prctl.h> +#include <sys/types.h> #include <unistd.h> + #include <cstring> #include <limits> #include <map> #include <memory> #include <string> -#include <sys/prctl.h> -#include <sys/types.h> +#include <functional> #include "common/logging.h" #include "gutil/atomicops.h" -#include "gutil/once.h" #include "gutil/dynamic_annotations.h" +#include "gutil/map-util.h" +#include "gutil/once.h" #include "gutil/strings/substitute.h" #include "olap/olap_define.h" +#include "util/easy_json.h" #include "util/mutex.h" +#include "util/os_util.h" #include "util/scoped_cleanup.h" +#include "util/url_coding.h" namespace doris { @@ -55,9 +61,7 @@ static GoogleOnceType once = GOOGLE_ONCE_INIT; // auditing. Used only by Thread. class ThreadMgr { public: - ThreadMgr() - : _threads_started_metric(0), - _threads_running_metric(0) {} + ThreadMgr() : _threads_started_metric(0), _threads_running_metric(0) {} ~ThreadMgr() { MutexLock lock(&_lock); @@ -74,18 +78,17 @@ public: // already been removed, this is a no-op. void remove_thread(const pthread_t& pthread_id, const std::string& category); -private: + void display_thread_callback(const WebPageHandler::ArgumentMap& args, EasyJson* ej) const; +private: // Container class for any details we want to capture about a thread // TODO: Add start-time. // TODO: Track fragment ID. 
class ThreadDescriptor { public: - ThreadDescriptor() { } + ThreadDescriptor() {} ThreadDescriptor(std::string category, std::string name, int64_t thread_id) - : _name(std::move(name)), - _category(std::move(category)), - _thread_id(thread_id) {} + : _name(std::move(name)), _category(std::move(category)), _thread_id(thread_id) {} const std::string& name() const { return _name; } const std::string& category() const { return _category; } @@ -97,6 +100,8 @@ private: int64_t _thread_id; }; + void summarize_thread_descriptor(const ThreadDescriptor& desc, EasyJson* ej) const; + // A ThreadCategory is a set of threads that are logically related. // TODO: unordered_map is incompatible with pthread_t, but would be more // efficient here. @@ -106,7 +111,7 @@ private: typedef std::map<std::string, ThreadCategory> ThreadCategoryMap; // Protects _thread_categories and thread metrics. - Mutex _lock; + mutable Mutex _lock; // All thread categorys that ever contained a thread, even if empty ThreadCategoryMap _thread_categories; @@ -121,7 +126,7 @@ private: void ThreadMgr::set_thread_name(const std::string& name, int64_t tid) { if (tid == getpid()) { - return ; + return; } int err = prctl(PR_SET_NAME, name.c_str()); if (err < 0 && errno != EPERM) { @@ -169,6 +174,81 @@ void ThreadMgr::remove_thread(const pthread_t& pthread_id, const std::string& ca ANNOTATE_IGNORE_READS_AND_WRITES_END(); } +void ThreadMgr::display_thread_callback(const WebPageHandler::ArgumentMap& args, EasyJson* ej) const { + const auto* category_name = FindOrNull(args, "group"); + if (category_name) { + bool requested_all = (*category_name == "all"); + ej->Set("requested_thread_group", EasyJson::kObject); + (*ej)["group_name"] = escape_for_html_to_string(*category_name); + (*ej)["requested_all"] = requested_all; + + // The critical section is as short as possible so as to minimize the delay + // imposed on new threads that acquire the lock in write mode. 
+ vector<ThreadDescriptor> descriptors_to_print; + if (!requested_all) { + MutexLock l(&_lock); + const auto* category = FindOrNull(_thread_categories, *category_name); + if (!category) { + return; + } + for (const auto& elem : *category) { + descriptors_to_print.emplace_back(elem.second); + } + } else { + MutexLock l(&_lock); + for (const auto& category : _thread_categories) { + for (const auto& elem : category.second) { + descriptors_to_print.emplace_back(elem.second); + } + } + } + + EasyJson found = (*ej).Set("found", EasyJson::kObject); + EasyJson threads = found.Set("threads", EasyJson::kArray); + for (const auto& desc : descriptors_to_print) { + summarize_thread_descriptor(desc, &threads); + } + } else { + // List all thread groups and the number of threads running in each. + vector<pair<string, uint64_t>> thread_categories_info; + uint64_t running; + { + MutexLock l(&_lock); + running = _threads_running_metric; + thread_categories_info.reserve(_thread_categories.size()); + for (const auto& category : _thread_categories) { + thread_categories_info.emplace_back(category.first, category.second.size()); + } + + (*ej)["total_threads_running"] = running; + EasyJson groups = ej->Set("groups", EasyJson::kArray); + for (const auto& elem : thread_categories_info) { + string category_arg; + url_encode(elem.first, &category_arg); + EasyJson group = groups.PushBack(EasyJson::kObject); + group["encoded_group_name"] = category_arg; + group["group_name"] = elem.first; + group["threads_running"] = elem.second; + } + } + } +} + +void ThreadMgr::summarize_thread_descriptor(const ThreadMgr::ThreadDescriptor& desc, + EasyJson* ej) const { + ThreadStats stats; + Status status = get_thread_stats(desc.thread_id(), &stats); + if (!status.ok()) { + LOG(WARNING) << "Could not get per-thread statistics: " << status.to_string(); + } + + EasyJson thread = ej->PushBack(EasyJson::kObject); + thread["thread_name"] = desc.name(); + thread["user_sec"] = static_cast<double>(stats.user_ns) / 
1e9; + thread["kernel_sec"] = static_cast<double>(stats.kernel_ns) / 1e9; + thread["iowait_sec"] = static_cast<double>(stats.iowait_ns) / 1e9; +} + Thread::~Thread() { if (_joinable) { int ret = pthread_detach(_thread); @@ -201,7 +281,8 @@ const std::string& Thread::category() const { } std::string Thread::to_string() const { - return strings::Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), _name, _category); + return strings::Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), _name, + _category); } Thread* Thread::current_thread() { @@ -210,7 +291,7 @@ Thread* Thread::current_thread() { int64_t Thread::unique_thread_id() { return static_cast<int64_t>(pthread_self()); -} +} int64_t Thread::current_thread_id() { return syscall(SYS_gettid); @@ -268,7 +349,7 @@ Status Thread::start_thread(const std::string& category, const std::string& name t->_joinable = true; cleanup.cancel(); - VLOG(3) << "Started thread " << t->tid()<< " - " << category << ":" << name; + VLOG(3) << "Started thread " << t->tid() << " - " << category << ":" << name; return Status::OK(); } @@ -331,10 +412,10 @@ void Thread::init_threadmgr() { } ThreadJoiner::ThreadJoiner(Thread* thr) - : _thread(CHECK_NOTNULL(thr)), - _warn_after_ms(kDefaultWarnAfterMs), - _warn_every_ms(kDefaultWarnEveryMs), - _give_up_after_ms(kDefaultGiveUpAfterMs) {} + : _thread(CHECK_NOTNULL(thr)), + _warn_after_ms(kDefaultWarnAfterMs), + _warn_every_ms(kDefaultWarnEveryMs), + _give_up_after_ms(kDefaultGiveUpAfterMs) {} ThreadJoiner& ThreadJoiner::warn_after_ms(int ms) { _warn_after_ms = ms; @@ -352,8 +433,7 @@ ThreadJoiner& ThreadJoiner::give_up_after_ms(int ms) { } Status ThreadJoiner::join() { - if (Thread::current_thread() && - Thread::current_thread()->tid() == _thread->tid()) { + if (Thread::current_thread() && Thread::current_thread()->tid() == _thread->tid()) { return Status::InvalidArgument("Can't join on own thread", -1, _thread->_name); } @@ -397,8 +477,15 @@ Status ThreadJoiner::join() { 
} waited_ms += wait_for; } - return Status::Aborted(strings::Substitute("Timed out after $0ms joining on $1", - waited_ms, _thread->_name)); + return Status::Aborted( + strings::Substitute("Timed out after $0ms joining on $1", waited_ms, _thread->_name)); } +void register_thread_display_page(WebPageHandler* web_page_handler) { + web_page_handler->register_template_page( + "/threadz", "Threads", + std::bind(&ThreadMgr::display_thread_callback, thread_manager.get(), + std::placeholders::_1, std::placeholders::_2), + true); +} } // namespace doris diff --git a/be/src/util/thread.h b/be/src/util/thread.h index 1e1b1e9..2f587e6 100644 --- a/be/src/util/thread.h +++ b/be/src/util/thread.h @@ -18,27 +18,25 @@ #ifndef DORIS_BE_SRC_UTIL_THREAD_H #define DORIS_BE_SRC_UTIL_THREAD_H -#include <atomic> #include <pthread.h> #include <syscall.h> +#include <atomic> + #include "common/status.h" #include "gutil/ref_counted.h" +#include "http/web_page_handler.h" #include "util/countdown_latch.h" namespace doris { class Thread : public RefCountedThreadSafe<Thread> { public: - enum CreateFlags { - NO_FLAGS = 0, - NO_STACK_WATCHDOG = 1 - }; + enum CreateFlags { NO_FLAGS = 0, NO_STACK_WATCHDOG = 1 }; template <class F> static Status create_with_flags(const std::string& category, const std::string& name, - const F& f, uint64_t flags, - scoped_refptr<Thread>* holder) { + const F& f, uint64_t flags, scoped_refptr<Thread>* holder) { return start_thread(category, name, f, flags, holder); } @@ -145,17 +143,15 @@ private: }; // User function to be executed by this thread. 
- typedef std::function<void ()> ThreadFunctor; + typedef std::function<void()> ThreadFunctor; Thread(const std::string& category, const std::string& name, ThreadFunctor functor) - : _thread(0), - _tid(INVALID_TID), - _functor(std::move(functor)), - _category(std::move(category)), - _name(std::move(name)), - _done(1), - _joinable(false) - {} - + : _thread(0), + _tid(INVALID_TID), + _functor(std::move(functor)), + _category(std::move(category)), + _name(std::move(name)), + _done(1), + _joinable(false) {} // Library-specific thread ID. pthread_t _thread; @@ -172,7 +168,7 @@ private: int64_t _tid; const ThreadFunctor _functor; - + const std::string _category; const std::string _name; @@ -188,7 +184,7 @@ private: // Thread local pointer to the current thread of execution. Will be NULL if the current // thread is not a Thread. static __thread Thread* _tls; - + // Wait for the running thread to publish its tid. int64_t wait_for_tid() const; @@ -280,6 +276,8 @@ private: DISALLOW_COPY_AND_ASSIGN(ThreadJoiner); }; +// Registers /threadz with the debug webserver. 
+void register_thread_display_page(WebPageHandler* web_page_handler); } //namespace doris diff --git a/be/src/util/url_coding.cpp b/be/src/util/url_coding.cpp index 08a671f..7e2624f 100644 --- a/be/src/util/url_coding.cpp +++ b/be/src/util/url_coding.cpp @@ -78,7 +78,7 @@ bool url_decode(const std::string& in, std::string* out) { } else { return false; } - } else if (in[i] == '+') { + } else if (in[i] == '+') { (*out) += ' '; } else { (*out) += in[i]; @@ -122,7 +122,7 @@ static void encode_base64_internal(const std::string& in, std::string* out, out->assign((char*)buf.get(), d - buf.get()); } -void base64url_encode(const std::string& in, std::string *out) { +void base64url_encode(const std::string& in, std::string* out) { static unsigned char basis64[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"; encode_base64_internal(in, out, basis64, false); @@ -130,51 +130,39 @@ void base64url_encode(const std::string& in, std::string *out) { void base64_encode(const std::string& in, std::string* out) { static unsigned char basis64[] = - "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; encode_base64_internal(in, out, basis64, true); } -static char encoding_table[] = { - 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', - 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', - 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', - 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', - 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', - 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', - 'w', 'x', 'y', 'z', '0', '1', '2', '3', - '4', '5', '6', '7', '8', '9', '+', '/' -}; +static char encoding_table[] = {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', + 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', + 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', + 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '+', '/'}; static 
const char base64_pad = '='; static short decoding_table[256] = { - -2, -2, -2, -2, -2, -2, -2, -2, -2, -1, -1, -2, -2, -1, -2, -2, - -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, - -1, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, 62, -2, -2, -2, 63, - 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, -2, -2, -2, -2, -2, -2, - -2, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, - 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, -2, -2, -2, -2, -2, - -2, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, - 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, -2, -2, -2, -2, -2, - -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, - -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, - -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, - -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, - -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, - -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, - -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, - -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2 -}; + -2, -2, -2, -2, -2, -2, -2, -2, -2, -1, -1, -2, -2, -1, -2, -2, -2, -2, -2, -2, -2, -2, + -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -1, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, 62, + -2, -2, -2, 63, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, -2, -2, -2, -2, -2, -2, -2, 0, + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, + 23, 24, 25, -2, -2, -2, -2, -2, -2, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, + 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, -2, -2, -2, -2, -2, -2, -2, -2, -2, + -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, + -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, + -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, + -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, 
-2, -2, + -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, + -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2}; static int mod_table[] = {0, 2, 1}; -size_t base64_encode(const unsigned char *data, - size_t length, - unsigned char *encoded_data) { - size_t output_length = (size_t) (4.0 * ceil((double) length / 3.0)); +size_t base64_encode(const unsigned char* data, size_t length, unsigned char* encoded_data) { + size_t output_length = (size_t)(4.0 * ceil((double)length / 3.0)); if (encoded_data == NULL) { - return 0; + return 0; } for (uint32_t i = 0, j = 0; i < length;) { @@ -196,11 +184,8 @@ size_t base64_encode(const unsigned char *data, return output_length; } -static inline int64_t base64_decode( - const char *data, - size_t length, - char *decoded_data) { - const char *current = data; +static inline int64_t base64_decode(const char* data, size_t length, char* decoded_data) { + const char* current = data; int ch = 0; int i = 0; int j = 0; @@ -232,7 +217,7 @@ static inline int64_t base64_decode( decoded_data[j] = (ch & 0x0f) << 4; break; case 2: - decoded_data[j++] |= ch >>2; + decoded_data[j++] |= ch >> 2; decoded_data[j] = (ch & 0x03) << 6; break; case 3: @@ -279,7 +264,7 @@ bool base64_decode(const std::string& in, std::string* out) { } void escape_for_html(const std::string& in, std::stringstream* out) { - for (auto& c: in) { + for (auto& c : in) { switch (c) { case '<': (*out) << "&lt;"; @@ -298,5 +283,9 @@ void escape_for_html(const std::string& in, std::stringstream* out) { } } } - +std::string escape_for_html_to_string(const std::string& in) { + std::stringstream str; + escape_for_html(in, &str); + return str.str(); +} } diff --git a/be/src/util/url_coding.h b/be/src/util/url_coding.h index 7a9457e..37ca4a7 100644 --- a/be/src/util/url_coding.h +++ b/be/src/util/url_coding.h @@ -18,9 +18,9 @@ #ifndef DORIS_BE_SRC_COMMON_UTIL_URL_CODING_H #define DORIS_BE_SRC_COMMON_UTIL_URL_CODING_H +#include <boost/cstdint.hpp>
#include <string> #include <vector> -#include <boost/cstdint.hpp> namespace doris { @@ -39,8 +39,8 @@ void url_encode(const std::vector<uint8_t>& in, std::string* out); // certain characters like ' '. bool url_decode(const std::string& in, std::string* out); -void base64url_encode(const std::string& in, std::string *out); -void base64_encode(const std::string& in, std::string *out); +void base64url_encode(const std::string& in, std::string* out); +void base64_encode(const std::string& in, std::string* out); // Utility method to decode base64 encoded strings. Also not extremely // performant. @@ -54,6 +54,8 @@ bool base64_decode(const std::string& in, std::string* out); // judiciously. void escape_for_html(const std::string& in, std::stringstream* out); +// Same as above, but returns a string. +std::string escape_for_html_to_string(const std::string& in); } #endif diff --git a/webroot/be/threadz.mustache b/webroot/be/threadz.mustache new file mode 100644 index 0000000..77f575f --- /dev/null +++ b/webroot/be/threadz.mustache @@ -0,0 +1,68 @@ +{{! +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+}} + +{{#requested_thread_group}} +<h2>Thread Group: {{group_name}}</h2> +{{#requested_all}}<h3>All Threads : </h3>{{/requested_all}} +{{#found}} +<table class='table table-hover' data-sort-name='name' data-toggle='table'> + <thead> + <tr> + <th data-field='name' data-sortable='true' data-sorter='stringsSorter'>Thread name</th> + <th data-sortable='true' data-sorter='floatsSorter'>Cumulative User CPU (s)</th> + <th data-sortable='true' data-sorter='floatsSorter'>Cumulative Kernel CPU (s)</th> + <th data-sortable='true' data-sorter='floatsSorter'>Cumulative IO-wait (s)</th> + </tr> + </thead> + <tbody> + {{#threads}} + <tr> + <td>{{thread_name}}</td> + <td>{{user_sec}}</td> + <td>{{kernel_sec}}</td> + <td>{{iowait_sec}}</td> + </tr> + {{/threads}} + </tbody> +</table> +{{/found}} +{{^found}}Thread group {{group_name}} not found{{/found}} +{{/requested_thread_group}} + +{{^requested_thread_group}} +<h2>Thread Groups</h2> +<h4>{{total_threads_running}} thread(s) running</h4> +<a href='{{base_url}}/threadz?group=all'><h3>All Threads</h3></a> +<table class='table table-hover' data-sort-name='group' data-toggle='table'> + <thead> + <tr> + <th data-field='group' data-sortable='true' data-sorter='stringsSorter'>Group</th> + <th data-sortable='true' data-sorter='numericStringsSorter'>Threads running</th> + </tr> + </thead> + <tbody> + {{#groups}} + <tr> + <td><a href='{{base_url}}/threadz?group={{encoded_group_name}}'>{{group_name}}</a></td> + <td>{{threads_running}}</td> + </tr> + {{/groups}} + </tbody> +</table> +{{/requested_thread_group}} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org