This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 8721197c774 branch-4.1: [feat](cloud) Add system rate limit for
meta-service #61516 (#63932)
8721197c774 is described below
commit 8721197c774ed4d06f2ea4315016c6c5d5247436
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 2 13:18:23 2026 +0800
branch-4.1: [feat](cloud) Add system rate limit for meta-service #61516
(#63932)
Cherry-picked from #61516
Co-authored-by: Yixuan Wang <[email protected]>
---
be/src/cloud/cloud_meta_mgr.cpp | 12 +
cloud/src/common/bvars.cpp | 1 +
cloud/src/common/bvars.h | 1 +
cloud/src/common/config.h | 14 +
cloud/src/common/http_helper.cpp | 61 ++
cloud/src/common/http_helper.h | 6 +
cloud/src/common/metric.cpp | 16 +
cloud/src/meta-service/CMakeLists.txt | 1 +
cloud/src/meta-service/meta_service_helper.h | 16 +
.../meta_service_rate_limit_helper.cpp | 843 +++++++++++++++++++++
.../meta-service/meta_service_rate_limit_helper.h | 80 ++
cloud/test/CMakeLists.txt | 2 +-
cloud/test/meta_service_helper_test.cpp | 151 ++++
13 files changed, 1203 insertions(+), 1 deletion(-)
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index f04ff8c34e7..b3b2df60321 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -159,6 +159,18 @@ bvar::LatencyRecorder
g_cloud_commit_txn_resp_redirect_latency("cloud_table_stat
bvar::Adder<uint64_t>
g_cloud_meta_mgr_rpc_timeout_count("cloud_meta_mgr_rpc_timeout_count");
bvar::Window<bvar::Adder<uint64_t>> g_cloud_ms_rpc_timeout_count_window(
"cloud_meta_mgr_rpc_timeout_qps", &g_cloud_meta_mgr_rpc_timeout_count,
30);
// Per-reason counters for MS_TOO_BUSY responses from meta-service. Each bvar::Adder is built
// from a (prefix, name) pair, so it is exposed as "cloud_meta_mgr_ms_too_busy_reason_<name>".
// The reason names appear to correspond to the pressure flags meta-service reports in
// MsStressDecision (fdb_cluster / fdb_client_thread / ms_resource / test injection), plus a
// total and a "no condition matched" bucket.
// NOTE(review): this change only declares these counters; nothing in this patch increments
// them. Confirm they get wired up where the BE handles MS_TOO_BUSY.
+bvar::Adder<uint64_t> g_cloud_meta_mgr_ms_too_busy_reason_total_count(
+ "cloud_meta_mgr_ms_too_busy_reason", "total");
+bvar::Adder<uint64_t> g_cloud_meta_mgr_ms_too_busy_reason_fdb_cluster_count(
+ "cloud_meta_mgr_ms_too_busy_reason", "fdb_cluster");
+bvar::Adder<uint64_t>
g_cloud_meta_mgr_ms_too_busy_reason_fdb_client_thread_count(
+ "cloud_meta_mgr_ms_too_busy_reason", "fdb_client_thread");
+bvar::Adder<uint64_t> g_cloud_meta_mgr_ms_too_busy_reason_ms_resource_count(
+ "cloud_meta_mgr_ms_too_busy_reason", "ms_resource");
+bvar::Adder<uint64_t> g_cloud_meta_mgr_ms_too_busy_reason_test_injection_count(
+ "cloud_meta_mgr_ms_too_busy_reason", "test_injection");
+bvar::Adder<uint64_t>
g_cloud_meta_mgr_ms_too_busy_reason_no_stress_condition_matched_count(
+ "cloud_meta_mgr_ms_too_busy_reason", "no_stress_condition_matched");
bvar::LatencyRecorder g_cloud_be_mow_get_dbm_lock_backoff_sleep_time(
"cloud_be_mow_get_dbm_lock_backoff_sleep_time");
bvar::Adder<uint64_t>
g_cloud_version_hole_filled_count("cloud_version_hole_filled_count");
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index e2f587e151a..f22495e0ecc 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -284,6 +284,7 @@ bvar::Status<int64_t>
g_bvar_fdb_incompatible_connections("fdb_incompatible_conn
bvar::Status<int64_t>
g_bvar_fdb_latency_probe_transaction_start_ns("fdb_latency_probe_transaction_start_ns",
BVAR_FDB_INVALID_VALUE);
bvar::Status<int64_t>
g_bvar_fdb_latency_probe_commit_ns("fdb_latency_probe_commit_ns",
BVAR_FDB_INVALID_VALUE);
bvar::Status<int64_t>
g_bvar_fdb_latency_probe_read_ns("fdb_latency_probe_read_ns",
BVAR_FDB_INVALID_VALUE);
// FDB status field "cluster.qos.performance_limited_by.name" reduced to a number by
// export_fdb_status_details() in metric.cpp: 0 when the name is "workload", -1 for any other
// or missing name. Holds BVAR_FDB_INVALID_VALUE until the first status export.
+bvar::Status<int64_t>
g_bvar_fdb_performance_limited_by_name("fdb_performance_limited_by_name",
BVAR_FDB_INVALID_VALUE);
bvar::Status<int64_t> g_bvar_fdb_machines_count("fdb_machines_count",
BVAR_FDB_INVALID_VALUE);
bvar::Status<int64_t> g_bvar_fdb_process_count("fdb_process_count",
BVAR_FDB_INVALID_VALUE);
bvar::Status<int64_t>
g_bvar_fdb_qos_worst_data_lag_storage_server_ns("fdb_qos_worst_data_lag_storage_server_ns",
BVAR_FDB_INVALID_VALUE);
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 5497cf6a754..d1a7bb6c8c7 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -750,6 +750,7 @@ extern bvar::Status<int64_t>
g_bvar_fdb_incompatible_connections;
extern bvar::Status<int64_t> g_bvar_fdb_latency_probe_transaction_start_ns;
extern bvar::Status<int64_t> g_bvar_fdb_latency_probe_commit_ns;
extern bvar::Status<int64_t> g_bvar_fdb_latency_probe_read_ns;
// 0 when FDB reports its performance as limited by "workload", -1 otherwise
// (set in export_fdb_status_details(), metric.cpp).
+extern bvar::Status<int64_t> g_bvar_fdb_performance_limited_by_name;
extern bvar::Status<int64_t> g_bvar_fdb_machines_count;
extern bvar::Status<int64_t> g_bvar_fdb_process_count;
extern bvar::Status<int64_t> g_bvar_fdb_qos_worst_data_lag_storage_server_ns;
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index b03219b062b..85aaaf6f0d2 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -193,6 +193,20 @@ CONF_Int64(default_max_qps_limit, "1000000");
CONF_String(specific_max_qps_limit, "get_cluster:5000000;begin_txn:5000000");
CONF_Bool(enable_rate_limit, "true");
CONF_Int64(bvar_qps_update_second, "5");
// Meta-service self-protection. While this (or the injection switch below) is on, each RPC
// that is not on the rate-limit whitelist is rejected with MS_TOO_BUSY whenever the latest
// stress decision reports great stress.
+CONF_mBool(enable_ms_rate_limit, "true");
+// Fault injection: randomly return meta service rate limit error for testing.
+// ms_rate_limit_injection_probability is the probability (0-100) of injecting
a rate limit error.
+CONF_mBool(enable_ms_rate_limit_injection, "false");
+CONF_mInt32(ms_rate_limit_injection_probability, "5");
+CONF_Validator(ms_rate_limit_injection_probability,
+ [](int32_t config) -> bool { return config >= 0 && config <=
100; });
// NOTE(review): presumably the length of the sliding window used for the averaged FDB
// client-thread busyness / MS CPU / MS memory metrics (see MsStressDetector) — confirm.
+CONF_mInt64(ms_rate_limit_window_seconds, "60");
// FDB cluster pressure needs two things at once: the probed commit or read latency exceeds
// its threshold, AND FDB reports its performance as limited by something other than
// "workload".
+CONF_mInt64(ms_rate_limit_fdb_commit_latency_ms, "50");
+CONF_mInt64(ms_rate_limit_fdb_read_latency_ms, "5");
// NOTE(review): thresholds for FDB client thread busyness (windowed average / single sample)
// and for MS process CPU / memory usage. How MsStressDetector combines them is not visible
// here — confirm there.
+CONF_mInt64(ms_rate_limit_fdb_client_thread_busyness_avg_percent, "70");
+CONF_mInt64(ms_rate_limit_fdb_client_thread_busyness_instant_percent, "90");
+CONF_mInt64(ms_rate_limit_cpu_usage_percent, "95");
+CONF_mInt64(ms_rate_limit_memory_usage_percent, "95");
CONF_mInt32(copy_job_max_retention_second, "259200"); //3 * 24 * 3600 seconds
CONF_String(arn_id, "");
diff --git a/cloud/src/common/http_helper.cpp b/cloud/src/common/http_helper.cpp
index 5ddd711dda6..c7f1bd488ad 100644
--- a/cloud/src/common/http_helper.cpp
+++ b/cloud/src/common/http_helper.cpp
@@ -36,12 +36,14 @@
#include <cstdint>
#include <string>
#include <type_traits>
+#include <vector>
#include "common/metric.h"
#include "cpp/s3_rate_limiter.h"
#include "meta-service/meta_service.h"
#include "meta-service/meta_service_helper.h"
#include "meta-service/meta_service_http.h"
+#include "meta-service/meta_service_rate_limit_helper.h"
#include "recycler/recycler.h"
#include "recycler/recycler_service.h"
namespace doris::cloud {
@@ -287,6 +289,18 @@ const std::unordered_map<std::string_view,
HttpHandlerInfo>& get_http_handlers()
return process_get_cluster_status((MS*)s, c);
},
.role = HttpRole::META_SERVICE}},
+ {"set_rpc_rate_limit_whitelist",
+ {.handler =
+ [](void* s, brpc::Controller* c) {
+ return
process_set_rpc_rate_limit_whitelist((MS*)s, c);
+ },
+ .role = HttpRole::META_SERVICE}},
+ {"get_rpc_rate_limit_whitelist",
+ {.handler =
+ [](void* s, brpc::Controller* c) {
+ return
process_get_rpc_rate_limit_whitelist((MS*)s, c);
+ },
+ .role = HttpRole::META_SERVICE}},
{"list_snapshot",
{.handler = [](void* s,
@@ -995,6 +1009,53 @@ HttpResponse process_get_cluster_status(MetaServiceImpl*
service, brpc::Controll
return http_json_reply_message(resp.status(), resp);
}
+HttpResponse process_set_rpc_rate_limit_whitelist(MetaServiceImpl*,
brpc::Controller* ctrl) {
+ rapidjson::Document doc;
+ std::string body = ctrl->request_attachment().to_string();
+ doc.Parse(body.c_str());
+
+ if (doc.HasParseError()) {
+ return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
+ fmt::format("parse json failed: {}",
+
rapidjson::GetParseError_En(doc.GetParseError())));
+ }
+
+ if (!doc.IsObject() || !doc.HasMember("rpcs") || !doc["rpcs"].IsArray()) {
+ return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
+ "invalid request, need {\"rpcs\": [\"rpc1\",
\"rpc2\"]}");
+ }
+
+ std::vector<std::string> rpcs;
+ for (auto& rpc : doc["rpcs"].GetArray()) {
+ if (rpc.IsString()) {
+ rpcs.emplace_back(rpc.GetString());
+ }
+ }
+
+ RpcRateLimitWhitelist::instance().set_whitelist(rpcs);
+ return http_json_reply(MetaServiceCode::OK, "success");
+}
+
+HttpResponse process_get_rpc_rate_limit_whitelist(MetaServiceImpl*,
brpc::Controller*) {
+ auto rpcs = RpcRateLimitWhitelist::instance().get_whitelist();
+
+ rapidjson::Document doc;
+ doc.SetObject();
+ auto& allocator = doc.GetAllocator();
+
+ rapidjson::Value rpc_array(rapidjson::kArrayType);
+ for (const auto& rpc : rpcs) {
+ rpc_array.PushBack(rapidjson::Value(rpc.c_str(), allocator),
allocator);
+ }
+ doc.AddMember("rpcs", rpc_array, allocator);
+
+ rapidjson::StringBuffer buffer;
+ rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
+ doc.Accept(writer);
+
+ return http_json_reply(MetaServiceCode::OK, "success", buffer.GetString());
+}
+
HttpResponse process_txn_lazy_commit(MetaServiceImpl* service,
brpc::Controller* ctrl) {
auto& uri = ctrl->http_request().uri();
std::string instance_id(http_query(uri, "instance_id"));
diff --git a/cloud/src/common/http_helper.h b/cloud/src/common/http_helper.h
index 378f1fcc5c9..7224510bc1d 100644
--- a/cloud/src/common/http_helper.h
+++ b/cloud/src/common/http_helper.h
@@ -155,6 +155,12 @@ const std::unordered_map<std::string_view,
HttpHandlerInfo>& get_http_handlers()
[[maybe_unused]] HttpResponse process_get_cluster_status(MetaServiceImpl*
service,
brpc::Controller*
ctrl);
// HTTP handler "set_rpc_rate_limit_whitelist": reads a JSON body {"rpcs": ["rpc1", "rpc2"]}
// and hands the RPC names to RpcRateLimitWhitelist::set_whitelist(). Replies
// INVALID_ARGUMENT when the body is not valid JSON or has no "rpcs" array.
+[[maybe_unused]] HttpResponse
process_set_rpc_rate_limit_whitelist(MetaServiceImpl* service,
+
brpc::Controller* ctrl);
+
// HTTP handler "get_rpc_rate_limit_whitelist": replies with the current whitelist as a
// pretty-printed JSON object {"rpcs": [...]}.
+[[maybe_unused]] HttpResponse
process_get_rpc_rate_limit_whitelist(MetaServiceImpl* service,
+
brpc::Controller* ctrl);
+
[[maybe_unused]] HttpResponse process_txn_lazy_commit(MetaServiceImpl* service,
brpc::Controller* ctrl);
diff --git a/cloud/src/common/metric.cpp b/cloud/src/common/metric.cpp
index d41e7ea6e0f..a4c91cae271 100644
--- a/cloud/src/common/metric.cpp
+++ b/cloud/src/common/metric.cpp
@@ -130,6 +130,16 @@ static void export_fdb_status_details(const std::string&
status_str) {
if (node->value.IsArray()) return node->value.Size();
return BVAR_FDB_INVALID_VALUE;
};
+ auto get_string_value = [&](const std::vector<const char*>& v) ->
std::string {
+ if (v.empty()) return "invalid";
+ auto node = document.FindMember("cluster");
+ for (const auto& name : v) {
+ if (!node->value.HasMember(name)) return "invalid";
+ node = node->value.FindMember(name);
+ }
+ if (node->value.IsString()) return node->value.GetString();
+ return "invalid";
+ };
auto get_nanoseconds = [&](const std::vector<const char*>& v) -> int64_t {
constexpr double NANOSECONDS = 1e9;
auto node = document.FindMember("cluster");
@@ -195,6 +205,12 @@ static void export_fdb_status_details(const std::string&
status_str) {
// Backup and DR
+ // Performance Limited By
+ // invalid or not-workload, the final value is -1
+ int64_t performance_val =
+ get_string_value({"qos", "performance_limited_by", "name"}) ==
"workload" ? 0 : -1;
+ g_bvar_fdb_performance_limited_by_name.set_value(performance_val);
+
// Client Count
g_bvar_fdb_client_count.set_value(get_value({"clients", "count"}));
diff --git a/cloud/src/meta-service/CMakeLists.txt
b/cloud/src/meta-service/CMakeLists.txt
index 3623d99713e..4c5aa3c26b2 100644
--- a/cloud/src/meta-service/CMakeLists.txt
+++ b/cloud/src/meta-service/CMakeLists.txt
@@ -26,6 +26,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/meta-service")
add_library(MetaService
meta_server.cpp
meta_service.cpp
+ meta_service_rate_limit_helper.cpp
meta_service_http.cpp
injection_point_http.cpp
meta_service_job.cpp
diff --git a/cloud/src/meta-service/meta_service_helper.h
b/cloud/src/meta-service/meta_service_helper.h
index 2f9ebd4fcc2..3ee903e3dfd 100644
--- a/cloud/src/meta-service/meta_service_helper.h
+++ b/cloud/src/meta-service/meta_service_helper.h
@@ -34,6 +34,7 @@
#include "common/stopwatch.h"
#include "common/util.h"
#include "cpp/sync_point.h"
+#include "meta-service/meta_service_rate_limit_helper.h"
#include "meta-store/keys.h"
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"
@@ -311,6 +312,21 @@ inline MetaServiceCode cast_as(TxnErrorCode code) {
[[maybe_unused]] std::string instance_id;
\
[[maybe_unused]] bool drop_request = false;
\
[[maybe_unused]] KVStats stats;
\
+ [[maybe_unused]] MsStressDecision ms_stress_decision;
\
+ if (config::enable_ms_rate_limit ||
config::enable_ms_rate_limit_injection) { \
+ ms_stress_decision = get_ms_stress_decision();
\
+ }
\
+ if ((config::enable_ms_rate_limit ||
config::enable_ms_rate_limit_injection) && \
+ RpcRateLimitWhitelist::instance().should_rate_limit(#func_name) &&
\
+ ms_stress_decision.under_great_stress()) {
\
+ drop_request = true;
\
+ code = MetaServiceCode::MS_TOO_BUSY;
\
+ msg = ms_stress_decision.debug_string();
\
+ response->mutable_status()->set_code(code);
\
+ response->mutable_status()->set_msg(msg);
\
+ finish_rpc(#func_name, ctrl, request, response);
\
+ return;
\
+ }
\
DORIS_CLOUD_DEFER {
\
response->mutable_status()->set_code(code);
\
response->mutable_status()->set_msg(msg);
\
diff --git a/cloud/src/meta-service/meta_service_rate_limit_helper.cpp
b/cloud/src/meta-service/meta_service_rate_limit_helper.cpp
new file mode 100644
index 00000000000..0f8c750be6c
--- /dev/null
+++ b/cloud/src/meta-service/meta_service_rate_limit_helper.cpp
@@ -0,0 +1,843 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "meta-service/meta_service_rate_limit_helper.h"
+
+#include <fmt/format.h>
+#include <sched.h>
+#include <sys/resource.h>
+
+#include <algorithm>
+#include <atomic>
+#include <bit>
+#include <chrono>
+#include <condition_variable>
+#include <cstdint>
+#include <deque>
+#include <filesystem>
+#include <fstream>
+#include <limits>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <random>
+#include <sstream>
+#include <string>
+#include <string_view>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+#include "common/config.h"
+#include "common/logging.h"
+
+namespace doris::cloud {
+namespace internal {
+namespace {
// procfs file listing the cgroup membership of this process.
+constexpr std::string_view kProcSelfCgroupPath = "/proc/self/cgroup";
// procfs mount table of this process; scanned to locate cgroup v1 hierarchies.
+constexpr std::string_view kProcSelfMountInfoPath = "/proc/self/mountinfo";
// Mount point assumed for the cgroup v2 (unified) hierarchy.
+constexpr std::string_view kCgroupRootPath = "/sys/fs/cgroup";
+} // namespace
+
// Memory accounting for the cgroup of this process, as produced by get_cgroup_memory_info().
// limit_bytes is INT64_MAX when cgroup v2 reports "max". usage_bytes is memory.current minus
// inactive_file and slab_reclaimable on v2, and the memory.stat "rss" value on v1.
+struct CgroupMemoryInfo {
+ int64_t limit_bytes;
+ int64_t usage_bytes;
+};
+
// Returns the first line of `path` without its trailing newline. Returns nullopt when the
// file cannot be opened or no line can be extracted (e.g. the file is empty).
std::optional<std::string> read_first_line(const std::filesystem::path& path) {
    std::ifstream file(path);
    if (!file.is_open()) {
        return std::nullopt;
    }
    std::string first_line;
    // getline() sets failbit when it extracts nothing, which covers empty files; fail() is
    // also true on badbit, so read errors are covered too.
    if (!std::getline(file, first_line)) {
        return std::nullopt;
    }
    return first_line;
}
+
+std::optional<int64_t> read_int64_line(const std::filesystem::path& path) {
+ auto line = read_first_line(path);
+ if (!line.has_value() || line->empty()) {
+ return std::nullopt;
+ }
+ try {
+ return std::stoll(*line);
+ } catch (...) {
+ return std::nullopt;
+ }
+}
+
// Reads a whitespace-separated "<key> <int64 value>" file (e.g. memory.stat) into a map.
// Parsing stops at the first pair whose value is not an int64. An unreadable file yields an
// empty map.
std::unordered_map<std::string, int64_t> read_metrics_map(const std::filesystem::path& path) {
    std::unordered_map<std::string, int64_t> result;
    if (std::ifstream in(path); in.is_open()) {
        std::string name;
        for (;;) {
            int64_t amount = 0;
            if (!(in >> name >> amount)) {
                break;
            }
            result[name] = amount;
        }
    }
    return result;
}
+
// Splits `text` on every occurrence of `delimiter` and keeps empty fields, so
// "" -> [""] and "a," -> ["a", ""]. The result always has at least one element.
std::vector<std::string> split(std::string_view text, char delimiter) {
    std::vector<std::string> pieces;
    std::string_view rest = text;
    for (;;) {
        const auto pos = rest.find(delimiter);
        if (pos == std::string_view::npos) {
            pieces.emplace_back(rest);
            return pieces;
        }
        pieces.emplace_back(rest.substr(0, pos));
        rest.remove_prefix(pos + 1);
    }
}
+
+bool cgroups_v2_enabled() {
+ return std::filesystem::exists(std::filesystem::path(kCgroupRootPath) /
"cgroup.controllers");
+}
+
+std::optional<std::string> cgroup_v2_relative_path_of_process() {
+ auto line = read_first_line(std::filesystem::path(kProcSelfCgroupPath));
+ if (!line.has_value() || !line->starts_with("0::")) {
+ return std::nullopt;
+ }
+ return line->substr(3);
+}
+
+std::optional<std::filesystem::path> get_cgroup_v2_dir(const std::string&
subsystem_file) {
+ if (!cgroups_v2_enabled()) {
+ return std::nullopt;
+ }
+ auto relative_path = cgroup_v2_relative_path_of_process();
+ if (!relative_path.has_value()) {
+ return std::nullopt;
+ }
+
+ const std::filesystem::path cgroup_root(kCgroupRootPath);
+ std::filesystem::path current =
+ cgroup_root /
relative_path->substr(relative_path->starts_with('/') ? 1 : 0);
+ while (current != cgroup_root.parent_path()) {
+ if (std::filesystem::exists(current / subsystem_file)) {
+ return current;
+ }
+ current = current.parent_path();
+ }
+ return std::nullopt;
+}
+
+std::optional<std::string> get_cgroup_v1_process_path(const std::string&
subsystem) {
+ std::ifstream stream(kProcSelfCgroupPath.data());
+ if (!stream.is_open()) {
+ return std::nullopt;
+ }
+
+ std::string line;
+ while (std::getline(stream, line)) {
+ auto fields = split(line, ':');
+ if (fields.size() != 3) {
+ continue;
+ }
+ auto controllers = split(fields[1], ',');
+ if (std::find(controllers.begin(), controllers.end(), subsystem) !=
controllers.end()) {
+ return fields[2];
+ }
+ }
+ return std::nullopt;
+}
+
+std::optional<std::pair<std::string, std::string>> get_cgroup_v1_mount(
+ const std::string& subsystem) {
+ std::ifstream stream(kProcSelfMountInfoPath.data());
+ if (!stream.is_open()) {
+ return std::nullopt;
+ }
+
+ std::string line;
+ while (std::getline(stream, line)) {
+ auto separator = line.find(" - ");
+ if (separator == std::string::npos) {
+ continue;
+ }
+ auto left = split(std::string_view(line).substr(0, separator), ' ');
+ auto right = split(std::string_view(line).substr(separator + 3), ' ');
+ if (left.size() < 5 || right.size() < 3 || right[0] != "cgroup") {
+ continue;
+ }
+ auto options = split(right[2], ',');
+ if (std::find(options.begin(), options.end(), subsystem) ==
options.end()) {
+ continue;
+ }
+ std::string system_path = left[3];
+ if (system_path.size() > 1 && system_path.back() == '/') {
+ system_path.pop_back();
+ }
+ return std::make_pair(left[4], system_path);
+ }
+ return std::nullopt;
+}
+
+std::optional<std::filesystem::path> get_cgroup_v1_dir(const std::string&
subsystem) {
+ auto process_path = get_cgroup_v1_process_path(subsystem);
+ auto mount = get_cgroup_v1_mount(subsystem);
+ if (!process_path.has_value() || !mount.has_value()) {
+ return std::nullopt;
+ }
+
+ const auto& [mount_path, system_path] = *mount;
+ if (!process_path->starts_with(system_path)) {
+ return std::nullopt;
+ }
+
+ std::string absolute = *process_path;
+ absolute.replace(0, system_path.size(), mount_path);
+ return std::filesystem::path(absolute);
+}
+
+int parse_cpuset_cpu_count(std::string_view cpuset_line) {
+ if (cpuset_line.empty()) {
+ return -1;
+ }
+
+ int cpu_count = 0;
+ for (const auto& range : split(cpuset_line, ',')) {
+ if (range.empty()) {
+ return -1;
+ }
+ auto cpu_values = split(range, '-');
+ try {
+ if (cpu_values.size() == 2) {
+ const int start = std::stoi(cpu_values[0]);
+ const int end = std::stoi(cpu_values[1]);
+ if (end < start) {
+ return -1;
+ }
+ cpu_count += end - start + 1;
+ } else if (cpu_values.size() == 1) {
+ static_cast<void>(std::stoi(cpu_values[0]));
+ cpu_count += 1;
+ } else {
+ return -1;
+ }
+ } catch (...) {
+ return -1;
+ }
+ }
+ return cpu_count;
+}
+
// Parses a cgroup v2 "cpu.max" line ("<quota> <period>") into a CPU count (quota / period).
// Returns nullopt for:
//   - an unlimited quota ("max");
//   - a non-positive quota or period;
//   - text that cannot be parsed.
std::optional<double> parse_cgroup_v2_cpu_limit(std::string_view cpu_max_line) {
    std::string quota_text;
    double period_us = 0;
    std::istringstream fields {std::string(cpu_max_line)};
    const bool parsed = static_cast<bool>(fields >> quota_text >> period_us);
    if (!parsed || quota_text == "max" || period_us <= 0) {
        return std::nullopt;
    }

    double quota_us = 0;
    try {
        quota_us = std::stod(quota_text);
    } catch (...) {
        return std::nullopt;
    }
    if (quota_us <= 0) {
        return std::nullopt;
    }
    return quota_us / period_us;
}
+
// Converts cgroup v1 cpu.cfs_quota_us / cpu.cfs_period_us into a CPU count. A non-positive
// quota (e.g. -1 = unlimited) or period yields nullopt.
std::optional<double> parse_cgroup_v1_cpu_limit(int64_t quota_us, int64_t period_us) {
    const bool limited = quota_us > 0 && period_us > 0;
    if (!limited) {
        return std::nullopt;
    }
    const auto quota = static_cast<double>(quota_us);
    const auto period = static_cast<double>(period_us);
    return quota / period;
}
+
// Number of CPUs this process may run on according to its scheduler affinity mask. If the
// mask cannot be read or is empty, falls back to std::thread::hardware_concurrency(), which
// is clamped to at least 1 because it may report 0.
double get_process_affinity_cpu_limit() {
    cpu_set_t allowed;
    CPU_ZERO(&allowed);
    const bool affinity_ok = sched_getaffinity(0, sizeof(allowed), &allowed) == 0;
    if (affinity_ok) {
        if (const int n = CPU_COUNT(&allowed); n > 0) {
            return static_cast<double>(n);
        }
    }
    const unsigned hardware_threads = std::thread::hardware_concurrency();
    return static_cast<double>(std::max<uint32_t>(1, hardware_threads));
}
+
+std::optional<double> get_cgroup_cpu_quota_limit() {
+ if (auto dir = get_cgroup_v2_dir("cpu.max"); dir.has_value()) {
+ const std::filesystem::path cgroup_root(kCgroupRootPath);
+ std::optional<double> limit;
+ auto current = *dir;
+ while (current != cgroup_root.parent_path()) {
+ if (auto line = read_first_line(current / "cpu.max");
line.has_value()) {
+ if (auto quota = parse_cgroup_v2_cpu_limit(*line);
quota.has_value()) {
+ limit = limit.has_value() ? std::min(*limit, *quota) :
quota;
+ }
+ }
+ current = current.parent_path();
+ }
+ if (limit.has_value()) {
+ return limit;
+ }
+ }
+
+ if (auto dir = get_cgroup_v1_dir("cpu"); dir.has_value()) {
+ std::optional<double> limit;
+ auto current = *dir;
+ while (current != current.parent_path()) {
+ auto quota = read_int64_line(current / "cpu.cfs_quota_us");
+ auto period = read_int64_line(current / "cpu.cfs_period_us");
+ if (quota.has_value() && period.has_value()) {
+ if (auto parsed = parse_cgroup_v1_cpu_limit(*quota, *period);
parsed.has_value()) {
+ limit = limit.has_value() ? std::min(*limit, *parsed) :
parsed;
+ }
+ }
+ current = current.parent_path();
+ }
+ if (limit.has_value()) {
+ return limit;
+ }
+ }
+
+ return std::nullopt;
+}
+
+double get_effective_process_cpu_limit() {
+ double limit = get_process_affinity_cpu_limit();
+ if (auto quota = get_cgroup_cpu_quota_limit(); quota.has_value() && *quota
> 0) {
+ limit = std::min(limit, *quota);
+ }
+ return std::max(0.001, limit);
+}
+
// usage_bytes as a truncated integer percentage of limit_bytes. Returns -1 when usage is
// negative or the limit is not positive. Values above 100 are not clamped.
int64_t calculate_usage_percent(int64_t usage_bytes, int64_t limit_bytes) {
    const bool valid = usage_bytes >= 0 && limit_bytes > 0;
    if (!valid) {
        return -1;
    }
    const double percent =
            static_cast<double>(usage_bytes) * 100.0 / static_cast<double>(limit_bytes);
    return static_cast<int64_t>(percent);
}
+
// Converts CPU time consumed over a wall-clock interval into a truncated percentage of
// `cpu_limit` CPUs (100 = every allowed CPU busy for the whole interval). Returns -1 for a
// negative CPU delta or a non-positive wall delta / CPU limit.
int64_t calculate_cpu_usage_percent(double delta_cpu_ns, double delta_wall_ns, double cpu_limit) {
    const bool valid = delta_cpu_ns >= 0 && delta_wall_ns > 0 && cpu_limit > 0;
    if (!valid) {
        return -1;
    }
    const double percent = delta_cpu_ns * 100.0 / delta_wall_ns / cpu_limit;
    return static_cast<int64_t>(percent);
}
+
+std::optional<CgroupMemoryInfo> get_cgroup_memory_info() {
+ if (auto dir = get_cgroup_v2_dir("memory.current"); dir.has_value()) {
+ auto limit_line = read_first_line(*dir / "memory.max");
+ auto usage = read_int64_line(*dir / "memory.current");
+ if (limit_line.has_value() && usage.has_value()) {
+ int64_t limit_bytes = std::numeric_limits<int64_t>::max();
+ if (*limit_line != "max") {
+ try {
+ limit_bytes = std::stoll(*limit_line);
+ } catch (...) {
+ return std::nullopt;
+ }
+ }
+ auto metrics = read_metrics_map(*dir / "memory.stat");
+ int64_t adjusted_usage = *usage;
+ adjusted_usage -= metrics["inactive_file"];
+ adjusted_usage -= metrics["slab_reclaimable"];
+ adjusted_usage = std::max<int64_t>(0, adjusted_usage);
+ return CgroupMemoryInfo {limit_bytes, adjusted_usage};
+ }
+ }
+
+ if (auto dir = get_cgroup_v1_dir("memory"); dir.has_value()) {
+ auto limit = read_int64_line(*dir / "memory.limit_in_bytes");
+ if (limit.has_value()) {
+ auto metrics = read_metrics_map(*dir / "memory.stat");
+ return CgroupMemoryInfo {*limit, metrics["rss"]};
+ }
+ }
+
+ return std::nullopt;
+}
+} // namespace internal
+
+namespace {
// Converts the ms_rate_limit_*_latency_ms configs to the nanosecond units of the FDB latency
// probes.
+constexpr int64_t kNanosecondsPerMillisecond = 1000 * 1000;
// Sentinel meaning "not measured". ProcessResourceSampler also reuses it to mean "no previous
// CPU time" and "getrusage() failed".
+constexpr int64_t kInvalidPercent = -1;
+
// One per-second sample kept by MsStressDetector for windowed averages. Each metric starts
// out as its "not measured" sentinel.
+struct WindowSample {
+ int64_t second {0};
+ int64_t fdb_client_thread_busyness_percent {BVAR_FDB_INVALID_VALUE};
+ int64_t ms_cpu_usage_percent {kInvalidPercent};
+ int64_t ms_memory_usage_percent {kInvalidPercent};
+};
+
// Result of one ProcessResourceSampler::sample() call; kInvalidPercent means unavailable.
+struct ProcessResourceSample {
+ int64_t cpu_usage_percent {kInvalidPercent};
+ int64_t memory_usage_percent {kInvalidPercent};
+};
+
+class LatestDecisionStorage {
+public:
+ void store(const MsStressDecision& decision) {
+ version_.fetch_add(1, std::memory_order_acq_rel);
+
+ fdb_cluster_under_pressure_.store(decision.fdb_cluster_under_pressure,
+ std::memory_order_relaxed);
+
fdb_client_thread_under_pressure_.store(decision.fdb_client_thread_under_pressure,
+ std::memory_order_relaxed);
+ ms_resource_under_pressure_.store(decision.ms_resource_under_pressure,
+ std::memory_order_relaxed);
+
rate_limit_injected_for_test_.store(decision.rate_limit_injected_for_test,
+ std::memory_order_relaxed);
+ fdb_commit_latency_ns_.store(decision.fdb_commit_latency_ns,
std::memory_order_relaxed);
+ fdb_read_latency_ns_.store(decision.fdb_read_latency_ns,
std::memory_order_relaxed);
+
fdb_performance_limited_by_name_.store(decision.fdb_performance_limited_by_name,
+ std::memory_order_relaxed);
+
fdb_client_thread_busyness_percent_.store(decision.fdb_client_thread_busyness_percent,
+ std::memory_order_relaxed);
+ fdb_client_thread_busyness_avg_percent_bits_.store(
+ encode_double(decision.fdb_client_thread_busyness_avg_percent),
+ std::memory_order_relaxed);
+ ms_cpu_usage_percent_.store(decision.ms_cpu_usage_percent,
std::memory_order_relaxed);
+
ms_cpu_usage_avg_percent_bits_.store(encode_double(decision.ms_cpu_usage_avg_percent),
+ std::memory_order_relaxed);
+ ms_memory_usage_percent_.store(decision.ms_memory_usage_percent,
std::memory_order_relaxed);
+
ms_memory_usage_avg_percent_bits_.store(encode_double(decision.ms_memory_usage_avg_percent),
+ std::memory_order_relaxed);
+
rate_limit_injected_random_value_.store(decision.rate_limit_injected_random_value,
+ std::memory_order_relaxed);
+
+ version_.fetch_add(1, std::memory_order_release);
+ }
+
+ MsStressDecision load() const {
+ MsStressDecision decision;
+ while (true) {
+ const uint64_t version_before =
version_.load(std::memory_order_acquire);
+ if ((version_before & 1) != 0) {
+ continue;
+ }
+
+ decision.fdb_cluster_under_pressure =
+
fdb_cluster_under_pressure_.load(std::memory_order_relaxed);
+ decision.fdb_client_thread_under_pressure =
+
fdb_client_thread_under_pressure_.load(std::memory_order_relaxed);
+ decision.ms_resource_under_pressure =
+
ms_resource_under_pressure_.load(std::memory_order_relaxed);
+ decision.rate_limit_injected_for_test =
+
rate_limit_injected_for_test_.load(std::memory_order_relaxed);
+ decision.fdb_commit_latency_ns =
fdb_commit_latency_ns_.load(std::memory_order_relaxed);
+ decision.fdb_read_latency_ns =
fdb_read_latency_ns_.load(std::memory_order_relaxed);
+ decision.fdb_performance_limited_by_name =
+
fdb_performance_limited_by_name_.load(std::memory_order_relaxed);
+ decision.fdb_client_thread_busyness_percent =
+
fdb_client_thread_busyness_percent_.load(std::memory_order_relaxed);
+ decision.fdb_client_thread_busyness_avg_percent = decode_double(
+
fdb_client_thread_busyness_avg_percent_bits_.load(std::memory_order_relaxed));
+ decision.ms_cpu_usage_percent =
ms_cpu_usage_percent_.load(std::memory_order_relaxed);
+ decision.ms_cpu_usage_avg_percent =
+
decode_double(ms_cpu_usage_avg_percent_bits_.load(std::memory_order_relaxed));
+ decision.ms_memory_usage_percent =
+ ms_memory_usage_percent_.load(std::memory_order_relaxed);
+ decision.ms_memory_usage_avg_percent = decode_double(
+
ms_memory_usage_avg_percent_bits_.load(std::memory_order_relaxed));
+ decision.rate_limit_injected_random_value =
+
rate_limit_injected_random_value_.load(std::memory_order_relaxed);
+
+ const uint64_t version_after =
version_.load(std::memory_order_acquire);
+ if (version_before == version_after) {
+ return decision;
+ }
+ }
+ }
+
+private:
+ static constexpr uint64_t encode_double(double value) { return
std::bit_cast<uint64_t>(value); }
+
+ static constexpr double decode_double(uint64_t bits) { return
std::bit_cast<double>(bits); }
+
+ std::atomic<uint64_t> version_ {0};
+ std::atomic<bool> fdb_cluster_under_pressure_ {false};
+ std::atomic<bool> fdb_client_thread_under_pressure_ {false};
+ std::atomic<bool> ms_resource_under_pressure_ {false};
+ std::atomic<bool> rate_limit_injected_for_test_ {false};
+ std::atomic<int64_t> fdb_commit_latency_ns_ {BVAR_FDB_INVALID_VALUE};
+ std::atomic<int64_t> fdb_read_latency_ns_ {BVAR_FDB_INVALID_VALUE};
+ std::atomic<int64_t> fdb_performance_limited_by_name_
{BVAR_FDB_INVALID_VALUE};
+ std::atomic<int64_t> fdb_client_thread_busyness_percent_
{BVAR_FDB_INVALID_VALUE};
+ std::atomic<uint64_t> fdb_client_thread_busyness_avg_percent_bits_
{encode_double(-1)};
+ std::atomic<int64_t> ms_cpu_usage_percent_ {-1};
+ std::atomic<uint64_t> ms_cpu_usage_avg_percent_bits_ {encode_double(-1)};
+ std::atomic<int64_t> ms_memory_usage_percent_ {-1};
+ std::atomic<uint64_t> ms_memory_usage_avg_percent_bits_
{encode_double(-1)};
+ std::atomic<int32_t> rate_limit_injected_random_value_ {-1};
+};
+
+class ProcessResourceSampler {
+public:
+ ProcessResourceSample sample() {
+ using namespace std::chrono;
+
+ const auto now = steady_clock::now();
+ const int64_t current_cpu_time_ns = get_process_cpu_time_ns();
+ ProcessResourceSample sample;
+ sample.memory_usage_percent = get_process_memory_usage_percent();
+
+ const auto current_wall_time_ns =
+ duration_cast<nanoseconds>(now.time_since_epoch()).count();
+ std::lock_guard lock(mutex_);
+ if (last_cpu_time_ns_ != kInvalidPercent && current_cpu_time_ns !=
kInvalidPercent &&
+ current_wall_time_ns > last_wall_time_ns_) {
+ const double delta_cpu_ns = current_cpu_time_ns -
last_cpu_time_ns_;
+ const double delta_wall_ns = current_wall_time_ns -
last_wall_time_ns_;
+ sample.cpu_usage_percent = internal::calculate_cpu_usage_percent(
+ delta_cpu_ns, delta_wall_ns,
internal::get_effective_process_cpu_limit());
+ }
+ last_cpu_time_ns_ = current_cpu_time_ns;
+ last_wall_time_ns_ = current_wall_time_ns;
+ return sample;
+ }
+
+private:
+ static int64_t get_process_cpu_time_ns() {
+ rusage usage {};
+ if (getrusage(RUSAGE_SELF, &usage) != 0) {
+ return kInvalidPercent;
+ }
+ return usage.ru_utime.tv_sec * 1000L * 1000 * 1000 +
usage.ru_utime.tv_usec * 1000L +
+ usage.ru_stime.tv_sec * 1000L * 1000 * 1000 +
usage.ru_stime.tv_usec * 1000L;
+ }
+
+ static int64_t get_process_memory_usage_percent() {
+ if (auto cgroup_memory = internal::get_cgroup_memory_info();
cgroup_memory.has_value()) {
+ if (cgroup_memory->limit_bytes > 0 &&
+ cgroup_memory->limit_bytes <
std::numeric_limits<int64_t>::max()) {
+ return
internal::calculate_usage_percent(cgroup_memory->usage_bytes,
+
cgroup_memory->limit_bytes);
+ }
+ }
+
+ return kInvalidPercent;
+ }
+
+ std::mutex mutex_;
+ int64_t last_cpu_time_ns_ {kInvalidPercent};
+ int64_t last_wall_time_ns_ {0};
+};
+
+MsStressMetrics collect_ms_stress_metrics(ProcessResourceSampler* sampler) {
+ MsStressMetrics metrics;
+ metrics.fdb_commit_latency_ns =
g_bvar_fdb_latency_probe_commit_ns.get_value();
+ metrics.fdb_read_latency_ns = g_bvar_fdb_latency_probe_read_ns.get_value();
+ metrics.fdb_performance_limited_by_name =
g_bvar_fdb_performance_limited_by_name.get_value();
+ metrics.fdb_client_thread_busyness_percent =
+ g_bvar_fdb_client_thread_busyness_percent.get_value();
+ const auto resource_sample = sampler->sample();
+ metrics.ms_cpu_usage_percent = resource_sample.cpu_usage_percent;
+ metrics.ms_memory_usage_percent = resource_sample.memory_usage_percent;
+ return metrics;
+}
+
+class MsStressDetector {
+public:
+ ~MsStressDetector() { stop(); }
+
+ // Compute decision from metrics and store it in latest_decision_.
+ // Called by the background thread or synchronously in tests.
+ void update(int64_t now_ms, const MsStressMetrics& metrics) {
+ MsStressDecision decision;
+ decision.fdb_commit_latency_ns = metrics.fdb_commit_latency_ns;
+ decision.fdb_read_latency_ns = metrics.fdb_read_latency_ns;
+ decision.fdb_performance_limited_by_name =
metrics.fdb_performance_limited_by_name;
+ decision.fdb_client_thread_busyness_percent =
metrics.fdb_client_thread_busyness_percent;
+ decision.ms_cpu_usage_percent = metrics.ms_cpu_usage_percent;
+ decision.ms_memory_usage_percent = metrics.ms_memory_usage_percent;
+ const bool commit_latency_high =
+ metrics.fdb_commit_latency_ns != BVAR_FDB_INVALID_VALUE &&
+ metrics.fdb_commit_latency_ns >
+ config::ms_rate_limit_fdb_commit_latency_ms *
kNanosecondsPerMillisecond;
+ const bool read_latency_high =
+ metrics.fdb_read_latency_ns != BVAR_FDB_INVALID_VALUE &&
+ metrics.fdb_read_latency_ns >
+ config::ms_rate_limit_fdb_read_latency_ms *
kNanosecondsPerMillisecond;
+ decision.fdb_cluster_under_pressure =
+ (commit_latency_high || read_latency_high) &&
+ metrics.fdb_performance_limited_by_name !=
BVAR_FDB_INVALID_VALUE &&
+ metrics.fdb_performance_limited_by_name != 0;
+
+ const int64_t current_second = now_ms / 1000;
+ // No mutex needed: update() is only called from a single thread
+ // (background thread in production, test thread in tests).
+ record_sample(current_second, metrics);
+
+ const double avg_busyness =
+ get_window_avg(current_second,
&WindowSample::fdb_client_thread_busyness_percent,
+ BVAR_FDB_INVALID_VALUE);
+ decision.fdb_client_thread_busyness_avg_percent = avg_busyness;
+ if (avg_busyness >= 0 &&
+ metrics.fdb_client_thread_busyness_percent !=
BVAR_FDB_INVALID_VALUE) {
+ decision.fdb_client_thread_under_pressure =
+ avg_busyness >
config::ms_rate_limit_fdb_client_thread_busyness_avg_percent &&
+ metrics.fdb_client_thread_busyness_percent >
+
config::ms_rate_limit_fdb_client_thread_busyness_instant_percent;
+ }
+
+ const double avg_cpu = get_window_avg(current_second,
&WindowSample::ms_cpu_usage_percent,
+ kInvalidPercent);
+ const double avg_memory = get_window_avg(
+ current_second, &WindowSample::ms_memory_usage_percent,
kInvalidPercent);
+ decision.ms_cpu_usage_avg_percent = avg_cpu;
+ decision.ms_memory_usage_avg_percent = avg_memory;
+ if (avg_cpu >= 0 && metrics.ms_cpu_usage_percent != kInvalidPercent) {
+ decision.ms_resource_under_pressure =
+ metrics.ms_cpu_usage_percent >
config::ms_rate_limit_cpu_usage_percent &&
+ avg_cpu > config::ms_rate_limit_cpu_usage_percent;
+ }
+ if (avg_memory >= 0 && metrics.ms_memory_usage_percent !=
kInvalidPercent) {
+ decision.ms_resource_under_pressure =
+ decision.ms_resource_under_pressure ||
+ (metrics.ms_memory_usage_percent >
config::ms_rate_limit_memory_usage_percent &&
+ avg_memory > config::ms_rate_limit_memory_usage_percent);
+ }
+ latest_decision_.store(decision);
+ }
+
+ MsStressDecision get_latest_decision() const { return
latest_decision_.load(); }
+
+ void reset() {
+ samples_.clear();
+ latest_decision_.store(MsStressDecision {});
+ }
+
+ // Start the background thread that periodically collects metrics and
updates.
+ void start() {
+ if (running_.load() != 0) {
+ return;
+ }
+
+ std::unique_lock lock(mtx_);
+ if (running_.load() != 0) {
+ return;
+ }
+ running_.store(1);
+ bg_thread_ = std::make_unique<std::thread>([this] {
+ pthread_setname_np(pthread_self(), "ms_stress_det");
+ LOG(INFO) << "MsStressDetector background thread started";
+ ProcessResourceSampler sampler;
+ while (running_.load() == 1) {
+ const auto now_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
+
std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ const auto metrics = collect_ms_stress_metrics(&sampler);
+ update(now_ms, metrics);
+ std::unique_lock l(mtx_);
+ cv_.wait_for(l, std::chrono::seconds(1), [this]() { return
running_.load() != 1; });
+ }
+ LOG(INFO) << "MsStressDetector background thread stopped";
+ });
+ }
+
+ void stop() {
+ {
+ std::unique_lock lock(mtx_);
+ if (running_.load() != 1) {
+ return;
+ }
+ running_.store(2);
+ cv_.notify_all();
+ }
+ if (bg_thread_ && bg_thread_->joinable()) {
+ bg_thread_->join();
+ bg_thread_.reset();
+ }
+ }
+
+private:
+ using SampleField = int64_t WindowSample::*;
+
+ void record_sample(int64_t current_second, const MsStressMetrics& metrics)
{
+ WindowSample sample;
+ sample.second = current_second;
+ sample.fdb_client_thread_busyness_percent =
metrics.fdb_client_thread_busyness_percent;
+ sample.ms_cpu_usage_percent = metrics.ms_cpu_usage_percent;
+ sample.ms_memory_usage_percent = metrics.ms_memory_usage_percent;
+ if (!samples_.empty() && samples_.back().second == current_second) {
+ samples_.back() = sample;
+ } else {
+ samples_.push_back(sample);
+ }
+
+ const int64_t window_start =
+ current_second - std::max<int64_t>(1,
config::ms_rate_limit_window_seconds) + 1;
+ while (!samples_.empty() && samples_.front().second < window_start) {
+ samples_.pop_front();
+ }
+ }
+
+ double get_window_avg(int64_t current_second, SampleField field, int64_t
invalid_value) const {
+ if (samples_.empty()) {
+ return -1;
+ }
+ const int64_t required_span =
+ std::max<int64_t>(1, config::ms_rate_limit_window_seconds) - 1;
+ if (samples_.back().second != current_second ||
+ current_second - samples_.front().second < required_span) {
+ return -1;
+ }
+
+ double sum = 0;
+ int64_t valid_count = 0;
+ for (const auto& sample : samples_) {
+ if (sample.*field == invalid_value) {
+ continue;
+ }
+ sum += sample.*field;
+ ++valid_count;
+ }
+ if (valid_count == 0) {
+ return -1;
+ }
+ return sum / valid_count;
+ }
+
+ LatestDecisionStorage latest_decision_;
+ std::deque<WindowSample> samples_;
+
+ // Background thread lifecycle
+ std::atomic<int> running_ {0};
+ mutable std::mutex mtx_;
+ std::condition_variable cv_;
+ std::unique_ptr<std::thread> bg_thread_;
+};
+
+MsStressDetector& global_ms_stress_detector() {
+ static MsStressDetector detector;
+ // Auto-start background thread on first access.
+ // start() is idempotent: subsequent calls are no-ops.
+ detector.start();
+ return detector;
+}
+
// Uniform draw in [0, 99], used as the per-request dice roll for rate-limit injection.
// Engine and distribution are thread_local, so concurrent callers never share state.
int32_t get_ms_rate_limit_injection_random_value() {
    thread_local std::mt19937 engine {std::random_device {}()};
    thread_local std::uniform_int_distribution<int32_t> percent_roll {0, 99};
    return percent_roll(engine);
}
+
+void maybe_apply_ms_rate_limit_injection(MsStressDecision* decision, int32_t
random_value) {
+ if (!config::enable_ms_rate_limit_injection) {
+ return;
+ }
+ if (random_value < 0 || random_value >=
config::ms_rate_limit_injection_probability) {
+ return;
+ }
+ decision->rate_limit_injected_for_test = true;
+ decision->rate_limit_injected_random_value = random_value;
+}
+} // namespace
+
+std::string MsStressDecision::debug_string() const {
+ if (!under_great_stress()) {
+ return "meta service rate limited: no stress condition matched";
+ }
+
+ std::vector<std::string> reasons;
+ if (fdb_cluster_under_pressure) {
+ reasons.push_back(fmt::format(
+ "fdb_cluster(commit_latency_ms={}, read_latency_ms={},
performance_limited_by={})",
+ fdb_commit_latency_ns == BVAR_FDB_INVALID_VALUE ? -1
+ :
fdb_commit_latency_ns / 1000000,
+ fdb_read_latency_ns == BVAR_FDB_INVALID_VALUE ? -1 :
fdb_read_latency_ns / 1000000,
+ fdb_performance_limited_by_name));
+ }
+ if (fdb_client_thread_under_pressure) {
+ reasons.push_back(fmt::format(
+ "fdb_client_thread(busyness_avg={:.2f}%, busyness_instant={}%,
thresholds=avg>{}% "
+ "and instant>{}%)",
+ fdb_client_thread_busyness_avg_percent,
fdb_client_thread_busyness_percent,
+ config::ms_rate_limit_fdb_client_thread_busyness_avg_percent,
+
config::ms_rate_limit_fdb_client_thread_busyness_instant_percent));
+ }
+ if (ms_resource_under_pressure) {
+ reasons.push_back(
+ fmt::format("ms_resource(cpu_current={}%, cpu_avg={:.2f}%,
memory_current={}%, "
+ "memory_avg={:.2f}%, thresholds=cpu>{}% or
memory>{}%)",
+ ms_cpu_usage_percent, ms_cpu_usage_avg_percent,
ms_memory_usage_percent,
+ ms_memory_usage_avg_percent,
config::ms_rate_limit_cpu_usage_percent,
+ config::ms_rate_limit_memory_usage_percent));
+ }
+ if (rate_limit_injected_for_test) {
+ reasons.push_back(fmt::format("test_injection(random_value={},
probability<{}%)",
+ rate_limit_injected_random_value,
+
config::ms_rate_limit_injection_probability));
+ }
+ return fmt::format("meta service rate limited by {}", fmt::join(reasons,
"; "));
+}
+
+MsStressDecision get_ms_stress_decision() {
+ MsStressDecision decision =
global_ms_stress_detector().get_latest_decision();
+ // Rate limit injection is per-request (random), so apply it here, not in
the background thread.
+ maybe_apply_ms_rate_limit_injection(&decision,
get_ms_rate_limit_injection_random_value());
+ return decision;
+}
+
+MsStressDecision update_ms_stress_detector_for_test(int64_t now_ms, const
MsStressMetrics& metrics,
+ bool reset,
+ int32_t
rate_limit_injected_random_value) {
+ // Separate detector instance for tests — no background thread,
synchronous updates.
+ static MsStressDetector detector;
+ if (reset) {
+ detector.reset();
+ }
+ detector.update(now_ms, metrics);
+ MsStressDecision decision = detector.get_latest_decision();
+ maybe_apply_ms_rate_limit_injection(&decision,
rate_limit_injected_random_value);
+ return decision;
+}
+
+RpcRateLimitWhitelist& RpcRateLimitWhitelist::instance() {
+ static RpcRateLimitWhitelist inst;
+ static std::once_flag init_flag;
+ std::call_once(init_flag, []() {
+ inst.set_whitelist({"prepare_rowset", "commit_rowset",
"update_tmp_rowset",
+ "update_delete_bitmap",
"update_packed_file_info"});
+ });
+ return inst;
+}
+
+bool RpcRateLimitWhitelist::should_rate_limit(const std::string& rpc_name)
const {
+ std::lock_guard lock(mutex_);
+ return whitelist_.empty() || whitelist_.contains(rpc_name);
+}
+
+void RpcRateLimitWhitelist::set_whitelist(const std::vector<std::string>&
rpcs) {
+ std::lock_guard lock(mutex_);
+ whitelist_.clear();
+ whitelist_.insert(rpcs.begin(), rpcs.end());
+}
+
+std::vector<std::string> RpcRateLimitWhitelist::get_whitelist() const {
+ std::lock_guard lock(mutex_);
+ return {whitelist_.begin(), whitelist_.end()};
+}
+
+} // namespace doris::cloud
diff --git a/cloud/src/meta-service/meta_service_rate_limit_helper.h
b/cloud/src/meta-service/meta_service_rate_limit_helper.h
new file mode 100644
index 00000000000..365c53e9aaa
--- /dev/null
+++ b/cloud/src/meta-service/meta_service_rate_limit_helper.h
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <mutex>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "common/bvars.h"
+
+namespace doris::cloud {
+
+struct MsStressMetrics {
+ int64_t fdb_commit_latency_ns {BVAR_FDB_INVALID_VALUE};
+ int64_t fdb_read_latency_ns {BVAR_FDB_INVALID_VALUE};
+ int64_t fdb_performance_limited_by_name {BVAR_FDB_INVALID_VALUE};
+ int64_t fdb_client_thread_busyness_percent {BVAR_FDB_INVALID_VALUE};
+ int64_t ms_cpu_usage_percent {-1};
+ int64_t ms_memory_usage_percent {-1};
+};
+
+struct MsStressDecision {
+ bool fdb_cluster_under_pressure {false};
+ bool fdb_client_thread_under_pressure {false};
+ bool ms_resource_under_pressure {false};
+ bool rate_limit_injected_for_test {false};
+ int64_t fdb_commit_latency_ns {BVAR_FDB_INVALID_VALUE};
+ int64_t fdb_read_latency_ns {BVAR_FDB_INVALID_VALUE};
+ int64_t fdb_performance_limited_by_name {BVAR_FDB_INVALID_VALUE};
+ int64_t fdb_client_thread_busyness_percent {BVAR_FDB_INVALID_VALUE};
+ double fdb_client_thread_busyness_avg_percent {-1};
+ int64_t ms_cpu_usage_percent {-1};
+ double ms_cpu_usage_avg_percent {-1};
+ int64_t ms_memory_usage_percent {-1};
+ double ms_memory_usage_avg_percent {-1};
+ int32_t rate_limit_injected_random_value {-1};
+
+ [[nodiscard]] bool under_great_stress() const {
+ return fdb_cluster_under_pressure || fdb_client_thread_under_pressure
||
+ ms_resource_under_pressure || rate_limit_injected_for_test;
+ }
+
+ [[nodiscard]] std::string debug_string() const;
+};
+
// Returns the decision most recently published by the process-wide stress detector (whose
// background thread is started on first use), with per-request test injection applied.
MsStressDecision get_ms_stress_decision();
// Test hook: feeds `metrics` observed at `now_ms` into a separate detector that has no
// background thread and returns its decision. `reset` clears that detector's window first;
// `rate_limit_injected_random_value` stands in for the random draw used by injection.
MsStressDecision update_ms_stress_detector_for_test(int64_t now_ms, const MsStressMetrics& metrics,
                                                    bool reset = false,
                                                    int32_t rate_limit_injected_random_value = -1);
+
// Set of RPC names eligible for stress-based rate limiting.
// Note the sense of "whitelist": listed RPCs are the ones that MAY be rate limited, and an
// empty set makes every RPC eligible. All state is guarded by mutex_.
class RpcRateLimitWhitelist {
public:
    // Process-wide singleton, seeded on first call with a fixed default list
    // (prepare_rowset, commit_rowset, update_tmp_rowset, update_delete_bitmap,
    // update_packed_file_info).
    static RpcRateLimitWhitelist& instance();
    // True if `rpc_name` is listed, or if the list is empty.
    bool should_rate_limit(const std::string& rpc_name) const;
    // Replaces the entire list.
    void set_whitelist(const std::vector<std::string>& rpcs);
    // Snapshot of the list, in unspecified order.
    std::vector<std::string> get_whitelist() const;

private:
    mutable std::mutex mutex_;
    std::unordered_set<std::string> whitelist_;
};
+
+} // namespace doris::cloud
diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt
index 0d40bd5c2d6..7ed83887e10 100644
--- a/cloud/test/CMakeLists.txt
+++ b/cloud/test/CMakeLists.txt
@@ -56,6 +56,7 @@ add_executable(meta_reader_test meta_reader_test.cpp)
add_executable(meta_service_test
meta_service_test.cpp
+ meta_service_helper_test.cpp
meta_service_job_test.cpp
meta_service_http_test.cpp
meta_service_operation_log_test.cpp
@@ -223,4 +224,3 @@ install(FILES
GROUP_READ GROUP_WRITE GROUP_EXECUTE
WORLD_READ WORLD_EXECUTE
DESTINATION ${BUILD_DIR}/test)
-
diff --git a/cloud/test/meta_service_helper_test.cpp
b/cloud/test/meta_service_helper_test.cpp
new file mode 100644
index 00000000000..73b3d37de5b
--- /dev/null
+++ b/cloud/test/meta_service_helper_test.cpp
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
#include <gtest/gtest.h>

#include <iostream>
#include <limits>
#include <optional>
#include <string>
#include <string_view>

#include "common/config.h"
#include "meta-service/meta_service_rate_limit_helper.h"
+
+namespace doris::cloud {
namespace internal {
// Re-declarations of helpers from the rate-limit implementation (presumably
// meta_service_rate_limit_helper.cpp) so the tests can call them directly. They must stay
// in sync with the definitions, or the test binary will fail to link.
int parse_cpuset_cpu_count(std::string_view cpuset_line);
std::optional<double> parse_cgroup_v2_cpu_limit(std::string_view cpu_max_line);
std::optional<double> parse_cgroup_v1_cpu_limit(int64_t quota_us, int64_t period_us);
int64_t calculate_usage_percent(int64_t usage_bytes, int64_t limit_bytes);
int64_t calculate_cpu_usage_percent(double delta_cpu_ns, double delta_wall_ns, double cpu_limit);
} // namespace internal
+
+namespace {
+struct MsRateLimitInjectionConfigGuard {
+ ~MsRateLimitInjectionConfigGuard() {
+ config::enable_ms_rate_limit_injection = original_enable;
+ config::ms_rate_limit_injection_probability = original_probability;
+ }
+
+ bool original_enable {config::enable_ms_rate_limit_injection};
+ int32_t original_probability {config::ms_rate_limit_injection_probability};
+};
+} // namespace
+
+TEST(MetaServiceHelperTest, FdbClusterPressureNeedsLatencyAndNonWorkload) {
+ MsStressMetrics metrics;
+ metrics.fdb_commit_latency_ns = 51L * 1000 * 1000;
+ metrics.fdb_performance_limited_by_name = -1;
+
+ auto decision = update_ms_stress_detector_for_test(0, metrics, true);
+ ASSERT_TRUE(decision.fdb_cluster_under_pressure);
+ ASSERT_TRUE(decision.under_great_stress());
+ std::cout << decision.debug_string() << std::endl;
+ ASSERT_NE(decision.debug_string().find("fdb_cluster"), std::string::npos);
+
+ metrics.fdb_performance_limited_by_name = 0;
+ decision = update_ms_stress_detector_for_test(1000, metrics, true);
+ ASSERT_FALSE(decision.fdb_cluster_under_pressure);
+ ASSERT_FALSE(decision.under_great_stress());
+}
+
+TEST(MetaServiceHelperTest,
FdbClientThreadPressureNeedsWindowAverageAndInstantValue) {
+ MsStressMetrics metrics;
+ for (int second = 0; second < 60; ++second) {
+ metrics.fdb_client_thread_busyness_percent = 71;
+ auto decision = update_ms_stress_detector_for_test(second * 1000,
metrics, second == 0);
+ ASSERT_FALSE(decision.fdb_client_thread_under_pressure);
+ }
+
+ metrics.fdb_client_thread_busyness_percent = 91;
+ auto decision = update_ms_stress_detector_for_test(60 * 1000, metrics);
+ ASSERT_TRUE(decision.fdb_client_thread_under_pressure);
+ ASSERT_TRUE(decision.under_great_stress());
+ std::cout << decision.debug_string() << std::endl;
+ ASSERT_NE(decision.debug_string().find("fdb_client_thread"),
std::string::npos);
+}
+
+TEST(MetaServiceHelperTest,
MsResourcePressureNeedsCurrentAndWindowAverageHigh) {
+ MsStressMetrics metrics;
+ for (int second = 0; second < 59; ++second) {
+ metrics.ms_cpu_usage_percent = 96;
+ auto decision = update_ms_stress_detector_for_test(second * 1000,
metrics, second == 0);
+ ASSERT_FALSE(decision.ms_resource_under_pressure);
+ }
+
+ metrics.ms_cpu_usage_percent = 96;
+ auto decision = update_ms_stress_detector_for_test(59 * 1000, metrics);
+ ASSERT_TRUE(decision.ms_resource_under_pressure);
+ ASSERT_TRUE(decision.under_great_stress());
+ std::cout << decision.debug_string() << std::endl;
+ ASSERT_NE(decision.debug_string().find("ms_resource"), std::string::npos);
+
+ metrics.ms_cpu_usage_percent = 50;
+ decision = update_ms_stress_detector_for_test(60 * 1000, metrics);
+ ASSERT_FALSE(decision.ms_resource_under_pressure);
+}
+
+TEST(MetaServiceHelperTest,
MsRateLimitInjectionRequiresSwitchAndProbabilityHit) {
+ MsRateLimitInjectionConfigGuard guard;
+
+ MsStressMetrics metrics;
+ config::enable_ms_rate_limit_injection = false;
+ config::ms_rate_limit_injection_probability = 100;
+ auto decision = update_ms_stress_detector_for_test(0, metrics, true, 0);
+ ASSERT_FALSE(decision.rate_limit_injected_for_test);
+ ASSERT_FALSE(decision.under_great_stress());
+
+ config::enable_ms_rate_limit_injection = true;
+ config::ms_rate_limit_injection_probability = 30;
+ decision = update_ms_stress_detector_for_test(1000, metrics, true, 30);
+ ASSERT_FALSE(decision.rate_limit_injected_for_test);
+ ASSERT_FALSE(decision.under_great_stress());
+
+ decision = update_ms_stress_detector_for_test(2000, metrics, true, 29);
+ ASSERT_TRUE(decision.rate_limit_injected_for_test);
+ ASSERT_TRUE(decision.under_great_stress());
+ ASSERT_NE(decision.debug_string().find("test_injection"),
std::string::npos);
+}
+
+TEST(MetaServiceHelperTest, ParseCpusetCpuCount) {
+ ASSERT_EQ(internal::parse_cpuset_cpu_count("0-3,5,7-8"), 7);
+ ASSERT_EQ(internal::parse_cpuset_cpu_count("2"), 1);
+ ASSERT_EQ(internal::parse_cpuset_cpu_count(""), -1);
+ ASSERT_EQ(internal::parse_cpuset_cpu_count("3-1"), -1);
+}
+
+TEST(MetaServiceHelperTest, ParseCgroupCpuQuota) {
+ auto v2_limit = internal::parse_cgroup_v2_cpu_limit("50000 100000");
+ ASSERT_TRUE(v2_limit.has_value());
+ ASSERT_DOUBLE_EQ(*v2_limit, 0.5);
+ ASSERT_FALSE(internal::parse_cgroup_v2_cpu_limit("max
100000").has_value());
+
+ auto v1_limit = internal::parse_cgroup_v1_cpu_limit(150000, 100000);
+ ASSERT_TRUE(v1_limit.has_value());
+ ASSERT_DOUBLE_EQ(*v1_limit, 1.5);
+ ASSERT_FALSE(internal::parse_cgroup_v1_cpu_limit(-1, 100000).has_value());
+}
+
+TEST(MetaServiceHelperTest, UsagePercentCalculationUsesEffectiveLimit) {
+ ASSERT_EQ(internal::calculate_usage_percent(512, 1024), 50);
+ ASSERT_EQ(internal::calculate_usage_percent(-1, 1024), -1);
+ ASSERT_EQ(internal::calculate_usage_percent(512,
std::numeric_limits<int64_t>::max()), 0);
+
+ ASSERT_EQ(internal::calculate_cpu_usage_percent(5e8, 1e9, 0.5), 100);
+ ASSERT_EQ(internal::calculate_cpu_usage_percent(15e8, 1e9, 2.0), 75);
+ ASSERT_EQ(internal::calculate_cpu_usage_percent(1, 0, 2.0), -1);
+}
+} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]