This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new bd33a794d5f branch-3.1: [Feature](recycler) Add recycler metrics for
recycler layer #51409 (#52176)
bd33a794d5f is described below
commit bd33a794d5f90fcfe7cd7af2cc92c1c6c1acca28
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 24 13:53:00 2025 +0800
branch-3.1: [Feature](recycler) Add recycler metrics for recycler layer
#51409 (#52176)
Cherry-picked from #51409
Co-authored-by: Uniqueyou <[email protected]>
---
cloud/src/common/bvars.cpp | 13 +++++
cloud/src/common/bvars.h | 107 ++++++++++++++++++++++++++++++++++++++++
cloud/src/main.cpp | 2 +
cloud/src/recycler/recycler.cpp | 25 ++++++++--
4 files changed, 144 insertions(+), 3 deletions(-)
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 6eb4c31a670..ad37a21cc15 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -17,6 +17,10 @@
#include "common/bvars.h"
+#include <bvar/multi_dimension.h>
+#include <bvar/reducer.h>
+#include <bvar/status.h>
+
#include <cstdint>
#include <stdexcept>
@@ -98,6 +102,15 @@ BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_partition_earlest_ts("recycle
BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_rowset_earlest_ts("recycler",
"recycle_rowset_earlest_ts");
BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_tmp_rowset_earlest_ts("recycler",
"recycle_tmp_rowset_earlest_ts");
BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_expired_txn_label_earlest_ts("recycler",
"recycle_expired_txn_label_earlest_ts");
+bvar::Status<int64_t>
g_bvar_recycler_task_max_concurrency("recycler_task_max_concurrency_num",0); // starts at 0; Recycler::start() sets it to config::recycle_concurrency
+bvar::Adder<int64_t> g_bvar_recycler_task_concurrency; // +1 before / -1 after do_recycle(); NOTE(review): constructed without a metric name - confirm it is exposed somewhere
+
+// recycler's mbvars: multi-dimensional metrics, each declared with the single dimension "instance_id"
+mBvarIntAdder
g_bvar_recycler_instance_running("recycler_instance_running",{"instance_id"}); // recycle_callback() adds 1 before do_recycle() and -1 after it
+mBvarLongStatus
g_bvar_recycler_instance_last_recycle_duration("recycler_instance_last_recycle_duration_ms",{"instance_id"}); // set to (now - ctime_ms), both taken in milliseconds
+mBvarLongStatus
g_bvar_recycler_instance_next_time("recycler_instance_next_time_s",{"instance_id"}); // NOTE(review): name ends in _s, but the caller stores now(ms) + recycle_interval_seconds * 1000 - confirm the intended unit
+mBvarPairStatus<int64_t>
g_bvar_recycler_instance_recycle_times("recycler_instance_recycle_times",{"instance_id"}); // pair {ctime_ms, -1} when a round starts, {ctime_ms, now} when it ends
+mBvarLongStatus
g_bvar_recycler_instance_recycle_last_success_times("recycler_instance_recycle_last_success_times",{"instance_id"}); // NOTE(review): the visible caller sets this to now without checking do_recycle()'s return value - confirm
// txn_kv's bvars
bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index dce05dc3c57..d7ff99da329 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -20,6 +20,8 @@
#include <bthread/mutex.h>
#include <bvar/bvar.h>
#include <bvar/latency_recorder.h>
+#include <bvar/multi_dimension.h>
+#include <bvar/reducer.h>
#include <cstdint>
#include <map>
@@ -27,6 +29,7 @@
#include <mutex>
#include <string>
#include <type_traits>
+#include <utility>
/**
* Manage bvars that with similar names (identical prefix)
@@ -97,6 +100,102 @@ template <typename T>
requires std::is_integral_v<T>
using BvarStatusWithTag = BvarWithTag<bvar::Status<T>, true>;
+/**
+ * @brief A wrapper class for multidimensional bvar metrics.
+ *
+ * This template class provides a convenient interface for managing multidimensional
+ * bvar metrics. It supports various bvar types including Adder, IntRecorder,
+ * LatencyRecorder, Maxer, and Status.
+ *
+ * @tparam BvarType The type of bvar metric to use (must be one of the supported types;
+ *         enforced by a static_assert in the constructor)
+ *
+ * Output: based on the bvar multidimensional counter implementation, the metrics
+ * output format would typically follow this structure:
+ *   {metric_name}{dimension1="value1",dimension2="value2",...} value
+ *
+ * Example: basic usage with an Adder:
+ *   // Create a 2-dimensional counter with dimensions "region" and "service"
+ *   mBvarWrapper<bvar::Adder<int>> request_counter("xxx_request_count", {"region", "service"});
+ *   // Increment the counter for specific dimension values
+ *   request_counter.put({"east", "login"}, 1);
+ *   request_counter.put({"west", "search"}, 1);
+ *   request_counter.put({"east", "login"}, 1); // Now east/login has value 2
+ *   // the output of above metrics:
+ *   xxx_request_count{region="east",service="login"} 2
+ *   xxx_request_count{region="west",service="search"} 1
+ *
+ * @note The dimensions provided in the constructor and the values provided to
+ * put() and get() methods must match in count. Also, all supported bvar types
+ * have different behaviors for how values are processed and retrieved.
+ */
+template <typename BvarType>
+class mBvarWrapper {
+public:
+    /**
+     * @param metric_name name passed to the underlying bvar::MultiDimension
+     * @param dim_names   names of the dimensions (labels) of this metric
+     */
+    mBvarWrapper(const std::string& metric_name,
+                 const std::initializer_list<std::string>& dim_names)
+            : counter_(metric_name, std::list<std::string>(dim_names)) {
+        static_assert(is_valid_bvar_type<BvarType>::value,
+                      "BvarType must be one of the supported bvar types (Adder, IntRecorder, "
+                      "LatencyRecorder, Maxer, Status)");
+    }
+
+    /**
+     * Record `value` for the series selected by `dim_values`.
+     *
+     * Any bvar::Status<T> is overwritten via set_value(); every other supported
+     * type receives the value through operator<<. Nothing happens when
+     * get_stats() yields no stats object.
+     *
+     * @param dim_values one value per dimension name given to the constructor
+     * @param value      the value to store or feed into the bvar
+     */
+    template <typename ValType>
+    void put(const std::initializer_list<std::string>& dim_values, ValType value) {
+        BvarType* stats = counter_.get_stats(std::list<std::string>(dim_values));
+        if (stats == nullptr) {
+            return;
+        }
+        // Status has no operator<<, so it must be routed to set_value() for every
+        // value type, not only for long/double/pair.
+        if constexpr (is_status<BvarType>::value) {
+            stats->set_value(value);
+        } else {
+            *stats << value;
+        }
+    }
+
+    /**
+     * Read the current value of the series selected by `dim_values`.
+     *
+     * @param dim_values one value per dimension name given to the constructor
+     * @return stats->get_value() for that series, or a value-initialized result
+     *         when get_stats() yields no stats object
+     */
+    auto get(const std::initializer_list<std::string>& dim_values) {
+        BvarType* stats = counter_.get_stats(std::list<std::string>(dim_values));
+        // Both return statements must deduce the same type for `auto`; decay_t
+        // mirrors what `auto` deduces from stats->get_value().
+        using ValueType = std::decay_t<decltype(stats->get_value())>;
+        if (stats != nullptr) {
+            return stats->get_value();
+        }
+        return ValueType {};
+    }
+
+private:
+    // True for the bvar types this wrapper accepts. The two non-template types are
+    // matched in the primary template with std::is_same_v so that no explicit full
+    // specialization is needed at class scope (not every compiler accepts those).
+    template <typename T>
+    struct is_valid_bvar_type
+            : std::bool_constant<std::is_same_v<T, bvar::IntRecorder> ||
+                                 std::is_same_v<T, bvar::LatencyRecorder>> {};
+    template <typename T>
+    struct is_valid_bvar_type<bvar::Adder<T>> : std::true_type {};
+    template <typename T>
+    struct is_valid_bvar_type<bvar::Maxer<T>> : std::true_type {};
+    template <typename T>
+    struct is_valid_bvar_type<bvar::Status<T>> : std::true_type {};
+
+    // True for any bvar::Status<T> (including Status<std::pair<T, T>>).
+    template <typename T>
+    struct is_status : std::false_type {};
+    template <typename T>
+    struct is_status<bvar::Status<T>> : std::true_type {};
+
+    bvar::MultiDimension<BvarType> counter_;
+};
+
+using mBvarIntAdder = mBvarWrapper<bvar::Adder<int>>; // put() feeds the value to the Adder via operator<<
+using mBvarDoubleAdder = mBvarWrapper<bvar::Adder<double>>; // same as above with double values
+using mBvarIntRecorder = mBvarWrapper<bvar::IntRecorder>; // put() feeds the value via operator<<
+using mBvarLatencyRecorder = mBvarWrapper<bvar::LatencyRecorder>; // put() feeds the value via operator<<
+using mBvarIntMaxer = mBvarWrapper<bvar::Maxer<int>>; // put() feeds the value via operator<<
+using mBvarDoubleMaxer = mBvarWrapper<bvar::Maxer<double>>; // same as above with double values
+using mBvarLongStatus = mBvarWrapper<bvar::Status<long>>; // put() overwrites the value via set_value()
+using mBvarDoubleStatus = mBvarWrapper<bvar::Status<double>>; // put() overwrites the value via set_value()
+
+namespace std { // NOTE(review): the C++ standard makes adding declarations to namespace std undefined behavior; presumably placed here so ADL finds it for std::pair - confirm
+template <typename T1, typename T2>
+inline std::ostream& operator<<(std::ostream& os, const std::pair<T1, T2>& p) { // writes the pair as "{first,second}" and returns the stream
+ return os << "{" << p.first << "," << p.second << "}";
+}
+} // namespace std
+
+template <typename T>
+using mBvarPairStatus = mBvarWrapper<bvar::Status<std::pair<T, T>>>; // Status holding a std::pair<T, T>; put() stores it via set_value()
+
// meta-service's bvars
extern BvarLatencyRecorderWithTag g_bvar_ms_begin_txn;
extern BvarLatencyRecorderWithTag g_bvar_ms_precommit_txn;
@@ -171,6 +270,14 @@ extern BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_rowset_earlest_ts;
extern BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_tmp_rowset_earlest_ts;
extern BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_expired_txn_label_earlest_ts;
+extern bvar::Status<int64_t> g_bvar_recycler_task_max_concurrency;
+extern bvar::Adder<int64_t> g_bvar_recycler_task_concurrency;
+extern mBvarIntAdder g_bvar_recycler_instance_running;
+extern mBvarLongStatus g_bvar_recycler_instance_last_recycle_duration;
+extern mBvarLongStatus g_bvar_recycler_instance_next_time;
+extern mBvarPairStatus<int64_t> g_bvar_recycler_instance_recycle_times;
+extern mBvarLongStatus g_bvar_recycler_instance_recycle_last_success_times;
+
// txn_kv's bvars
extern bvar::LatencyRecorder g_bvar_txn_kv_get;
extern bvar::LatencyRecorder g_bvar_txn_kv_range_get;
diff --git a/cloud/src/main.cpp b/cloud/src/main.cpp
index 9115158743f..18cf98720e9 100644
--- a/cloud/src/main.cpp
+++ b/cloud/src/main.cpp
@@ -236,6 +236,8 @@ int main(int argc, char** argv) {
std::cout << "try to start meta_service, recycler" << std::endl;
}
+
google::SetCommandLineOption("bvar_max_dump_multi_dimension_metric_number",
"2000");
+
brpc::Server server;
brpc::FLAGS_max_body_size = config::brpc_max_body_size;
brpc::FLAGS_socket_max_unwritten_bytes =
config::brpc_socket_max_unwritten_bytes;
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index a263d747b60..e63f1016db3 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -17,8 +17,10 @@
#include "recycler/recycler.h"
+#include <brpc/builtin_service.pb.h>
#include <brpc/server.h>
#include <butil/endpoint.h>
+#include <bvar/status.h>
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
@@ -27,9 +29,11 @@
#include <cstddef>
#include <cstdint>
#include <deque>
+#include <initializer_list>
#include <numeric>
#include <string>
#include <string_view>
+#include <utility>
#include "common/stopwatch.h"
#include "meta-service/meta_service.h"
@@ -275,7 +279,12 @@ void Recycler::recycle_callback() {
if (stopped()) return;
LOG_INFO("begin to recycle instance").tag("instance_id", instance_id);
auto ctime_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+ g_bvar_recycler_task_concurrency << 1;
+ g_bvar_recycler_instance_running.put({instance_id}, 1);
+ g_bvar_recycler_instance_recycle_times.put({instance_id},
std::make_pair(ctime_ms, -1));
ret = instance_recycler->do_recycle();
+ g_bvar_recycler_task_concurrency << -1;
+ g_bvar_recycler_instance_running.put({instance_id}, -1);
// If instance recycler has been aborted, don't finish this job
if (!instance_recycler->stopped()) {
finish_instance_recycle_job(txn_kv_.get(), recycle_job_key,
instance_id, ip_port_,
@@ -285,9 +294,18 @@ void Recycler::recycle_callback() {
std::lock_guard lock(mtx_);
recycling_instance_map_.erase(instance_id);
}
- auto elpased_ms =
-
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() -
- ctime_ms;
+ auto now =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+ auto elpased_ms = now - ctime_ms;
+ g_bvar_recycler_instance_recycle_times.put({instance_id},
std::make_pair(ctime_ms, now));
+ g_bvar_recycler_instance_last_recycle_duration.put({instance_id},
elpased_ms);
+ g_bvar_recycler_instance_next_time.put({instance_id},
+ now +
config::recycle_interval_seconds * 1000);
+ LOG(INFO) << "recycle instance done, "
+ << "instance_id=" << instance_id << " ret=" << ret << "
ctime_ms: " << ctime_ms
+ << " now: " << now;
+
+ g_bvar_recycler_instance_recycle_last_success_times.put({instance_id},
now);
+
LOG_INFO("finish recycle instance")
.tag("instance_id", instance_id)
.tag("cost_ms", elpased_ms);
@@ -344,6 +362,7 @@ void Recycler::check_recycle_tasks() {
int Recycler::start(brpc::Server* server) {
instance_filter_.reset(config::recycle_whitelist,
config::recycle_blacklist);
+
g_bvar_recycler_task_max_concurrency.set_value(config::recycle_concurrency);
if (config::enable_checker) {
checker_ = std::make_unique<Checker>(txn_kv_);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]