This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 8721197c774 branch-4.1: [feat](cloud) Add system rate limit for 
meta-service #61516 (#63932)
8721197c774 is described below

commit 8721197c774ed4d06f2ea4315016c6c5d5247436
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 2 13:18:23 2026 +0800

    branch-4.1: [feat](cloud) Add system rate limit for meta-service #61516 
(#63932)
    
    Cherry-picked from #61516
    
    Co-authored-by: Yixuan Wang <[email protected]>
---
 be/src/cloud/cloud_meta_mgr.cpp                    |  12 +
 cloud/src/common/bvars.cpp                         |   1 +
 cloud/src/common/bvars.h                           |   1 +
 cloud/src/common/config.h                          |  14 +
 cloud/src/common/http_helper.cpp                   |  61 ++
 cloud/src/common/http_helper.h                     |   6 +
 cloud/src/common/metric.cpp                        |  16 +
 cloud/src/meta-service/CMakeLists.txt              |   1 +
 cloud/src/meta-service/meta_service_helper.h       |  16 +
 .../meta_service_rate_limit_helper.cpp             | 843 +++++++++++++++++++++
 .../meta-service/meta_service_rate_limit_helper.h  |  80 ++
 cloud/test/CMakeLists.txt                          |   2 +-
 cloud/test/meta_service_helper_test.cpp            | 151 ++++
 13 files changed, 1203 insertions(+), 1 deletion(-)

diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index f04ff8c34e7..b3b2df60321 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -159,6 +159,18 @@ bvar::LatencyRecorder 
g_cloud_commit_txn_resp_redirect_latency("cloud_table_stat
 bvar::Adder<uint64_t> 
g_cloud_meta_mgr_rpc_timeout_count("cloud_meta_mgr_rpc_timeout_count");
 bvar::Window<bvar::Adder<uint64_t>> g_cloud_ms_rpc_timeout_count_window(
         "cloud_meta_mgr_rpc_timeout_qps", &g_cloud_meta_mgr_rpc_timeout_count, 
30);
+bvar::Adder<uint64_t> g_cloud_meta_mgr_ms_too_busy_reason_total_count(
+        "cloud_meta_mgr_ms_too_busy_reason", "total");
+bvar::Adder<uint64_t> g_cloud_meta_mgr_ms_too_busy_reason_fdb_cluster_count(
+        "cloud_meta_mgr_ms_too_busy_reason", "fdb_cluster");
+bvar::Adder<uint64_t> 
g_cloud_meta_mgr_ms_too_busy_reason_fdb_client_thread_count(
+        "cloud_meta_mgr_ms_too_busy_reason", "fdb_client_thread");
+bvar::Adder<uint64_t> g_cloud_meta_mgr_ms_too_busy_reason_ms_resource_count(
+        "cloud_meta_mgr_ms_too_busy_reason", "ms_resource");
+bvar::Adder<uint64_t> g_cloud_meta_mgr_ms_too_busy_reason_test_injection_count(
+        "cloud_meta_mgr_ms_too_busy_reason", "test_injection");
+bvar::Adder<uint64_t> 
g_cloud_meta_mgr_ms_too_busy_reason_no_stress_condition_matched_count(
+        "cloud_meta_mgr_ms_too_busy_reason", "no_stress_condition_matched");
 bvar::LatencyRecorder g_cloud_be_mow_get_dbm_lock_backoff_sleep_time(
         "cloud_be_mow_get_dbm_lock_backoff_sleep_time");
 bvar::Adder<uint64_t> 
g_cloud_version_hole_filled_count("cloud_version_hole_filled_count");
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index e2f587e151a..f22495e0ecc 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -284,6 +284,7 @@ bvar::Status<int64_t> 
g_bvar_fdb_incompatible_connections("fdb_incompatible_conn
 bvar::Status<int64_t> 
g_bvar_fdb_latency_probe_transaction_start_ns("fdb_latency_probe_transaction_start_ns",
 BVAR_FDB_INVALID_VALUE);
 bvar::Status<int64_t> 
g_bvar_fdb_latency_probe_commit_ns("fdb_latency_probe_commit_ns", 
BVAR_FDB_INVALID_VALUE);
 bvar::Status<int64_t> 
g_bvar_fdb_latency_probe_read_ns("fdb_latency_probe_read_ns", 
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_performance_limited_by_name("fdb_performance_limited_by_name", 
BVAR_FDB_INVALID_VALUE);
 bvar::Status<int64_t> g_bvar_fdb_machines_count("fdb_machines_count", 
BVAR_FDB_INVALID_VALUE);
 bvar::Status<int64_t> g_bvar_fdb_process_count("fdb_process_count", 
BVAR_FDB_INVALID_VALUE);
 bvar::Status<int64_t> 
g_bvar_fdb_qos_worst_data_lag_storage_server_ns("fdb_qos_worst_data_lag_storage_server_ns",
 BVAR_FDB_INVALID_VALUE);
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 5497cf6a754..d1a7bb6c8c7 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -750,6 +750,7 @@ extern bvar::Status<int64_t> 
g_bvar_fdb_incompatible_connections;
 extern bvar::Status<int64_t> g_bvar_fdb_latency_probe_transaction_start_ns;
 extern bvar::Status<int64_t> g_bvar_fdb_latency_probe_commit_ns;
 extern bvar::Status<int64_t> g_bvar_fdb_latency_probe_read_ns;
+extern bvar::Status<int64_t> g_bvar_fdb_performance_limited_by_name;
 extern bvar::Status<int64_t> g_bvar_fdb_machines_count;
 extern bvar::Status<int64_t> g_bvar_fdb_process_count;
 extern bvar::Status<int64_t> g_bvar_fdb_qos_worst_data_lag_storage_server_ns;
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index b03219b062b..85aaaf6f0d2 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -193,6 +193,20 @@ CONF_Int64(default_max_qps_limit, "1000000");
 CONF_String(specific_max_qps_limit, "get_cluster:5000000;begin_txn:5000000");
 CONF_Bool(enable_rate_limit, "true");
 CONF_Int64(bvar_qps_update_second, "5");
+CONF_mBool(enable_ms_rate_limit, "true");
+// Fault injection: randomly return meta service rate limit error for testing.
+// ms_rate_limit_injection_probability is the probability (0-100) of injecting 
a rate limit error.
+CONF_mBool(enable_ms_rate_limit_injection, "false");
+CONF_mInt32(ms_rate_limit_injection_probability, "5");
+CONF_Validator(ms_rate_limit_injection_probability,
+               [](int32_t config) -> bool { return config >= 0 && config <= 
100; });
+CONF_mInt64(ms_rate_limit_window_seconds, "60");
+CONF_mInt64(ms_rate_limit_fdb_commit_latency_ms, "50");
+CONF_mInt64(ms_rate_limit_fdb_read_latency_ms, "5");
+CONF_mInt64(ms_rate_limit_fdb_client_thread_busyness_avg_percent, "70");
+CONF_mInt64(ms_rate_limit_fdb_client_thread_busyness_instant_percent, "90");
+CONF_mInt64(ms_rate_limit_cpu_usage_percent, "95");
+CONF_mInt64(ms_rate_limit_memory_usage_percent, "95");
 
 CONF_mInt32(copy_job_max_retention_second, "259200"); //3 * 24 * 3600 seconds
 CONF_String(arn_id, "");
diff --git a/cloud/src/common/http_helper.cpp b/cloud/src/common/http_helper.cpp
index 5ddd711dda6..c7f1bd488ad 100644
--- a/cloud/src/common/http_helper.cpp
+++ b/cloud/src/common/http_helper.cpp
@@ -36,12 +36,14 @@
 #include <cstdint>
 #include <string>
 #include <type_traits>
+#include <vector>
 
 #include "common/metric.h"
 #include "cpp/s3_rate_limiter.h"
 #include "meta-service/meta_service.h"
 #include "meta-service/meta_service_helper.h"
 #include "meta-service/meta_service_http.h"
+#include "meta-service/meta_service_rate_limit_helper.h"
 #include "recycler/recycler.h"
 #include "recycler/recycler_service.h"
 namespace doris::cloud {
@@ -287,6 +289,18 @@ const std::unordered_map<std::string_view, 
HttpHandlerInfo>& get_http_handlers()
                               return process_get_cluster_status((MS*)s, c);
                           },
                   .role = HttpRole::META_SERVICE}},
+                {"set_rpc_rate_limit_whitelist",
+                 {.handler =
+                          [](void* s, brpc::Controller* c) {
+                              return 
process_set_rpc_rate_limit_whitelist((MS*)s, c);
+                          },
+                  .role = HttpRole::META_SERVICE}},
+                {"get_rpc_rate_limit_whitelist",
+                 {.handler =
+                          [](void* s, brpc::Controller* c) {
+                              return 
process_get_rpc_rate_limit_whitelist((MS*)s, c);
+                          },
+                  .role = HttpRole::META_SERVICE}},
 
                 {"list_snapshot",
                  {.handler = [](void* s,
@@ -995,6 +1009,53 @@ HttpResponse process_get_cluster_status(MetaServiceImpl* 
service, brpc::Controll
     return http_json_reply_message(resp.status(), resp);
 }
 
+HttpResponse process_set_rpc_rate_limit_whitelist(MetaServiceImpl*, 
brpc::Controller* ctrl) {
+    rapidjson::Document doc;
+    std::string body = ctrl->request_attachment().to_string();
+    doc.Parse(body.c_str());
+
+    if (doc.HasParseError()) {
+        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
+                               fmt::format("parse json failed: {}",
+                                           
rapidjson::GetParseError_En(doc.GetParseError())));
+    }
+
+    if (!doc.IsObject() || !doc.HasMember("rpcs") || !doc["rpcs"].IsArray()) {
+        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
+                               "invalid request, need {\"rpcs\": [\"rpc1\", 
\"rpc2\"]}");
+    }
+
+    std::vector<std::string> rpcs;
+    for (auto& rpc : doc["rpcs"].GetArray()) {
+        if (rpc.IsString()) {
+            rpcs.emplace_back(rpc.GetString());
+        }
+    }
+
+    RpcRateLimitWhitelist::instance().set_whitelist(rpcs);
+    return http_json_reply(MetaServiceCode::OK, "success");
+}
+
+HttpResponse process_get_rpc_rate_limit_whitelist(MetaServiceImpl*, 
brpc::Controller*) {
+    auto rpcs = RpcRateLimitWhitelist::instance().get_whitelist();
+
+    rapidjson::Document doc;
+    doc.SetObject();
+    auto& allocator = doc.GetAllocator();
+
+    rapidjson::Value rpc_array(rapidjson::kArrayType);
+    for (const auto& rpc : rpcs) {
+        rpc_array.PushBack(rapidjson::Value(rpc.c_str(), allocator), 
allocator);
+    }
+    doc.AddMember("rpcs", rpc_array, allocator);
+
+    rapidjson::StringBuffer buffer;
+    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
+    doc.Accept(writer);
+
+    return http_json_reply(MetaServiceCode::OK, "success", buffer.GetString());
+}
+
 HttpResponse process_txn_lazy_commit(MetaServiceImpl* service, 
brpc::Controller* ctrl) {
     auto& uri = ctrl->http_request().uri();
     std::string instance_id(http_query(uri, "instance_id"));
diff --git a/cloud/src/common/http_helper.h b/cloud/src/common/http_helper.h
index 378f1fcc5c9..7224510bc1d 100644
--- a/cloud/src/common/http_helper.h
+++ b/cloud/src/common/http_helper.h
@@ -155,6 +155,12 @@ const std::unordered_map<std::string_view, 
HttpHandlerInfo>& get_http_handlers()
 [[maybe_unused]] HttpResponse process_get_cluster_status(MetaServiceImpl* 
service,
                                                          brpc::Controller* 
ctrl);
 
+[[maybe_unused]] HttpResponse 
process_set_rpc_rate_limit_whitelist(MetaServiceImpl* service,
+                                                                   
brpc::Controller* ctrl);
+
+[[maybe_unused]] HttpResponse 
process_get_rpc_rate_limit_whitelist(MetaServiceImpl* service,
+                                                                   
brpc::Controller* ctrl);
+
 [[maybe_unused]] HttpResponse process_txn_lazy_commit(MetaServiceImpl* service,
                                                       brpc::Controller* ctrl);
 
diff --git a/cloud/src/common/metric.cpp b/cloud/src/common/metric.cpp
index d41e7ea6e0f..a4c91cae271 100644
--- a/cloud/src/common/metric.cpp
+++ b/cloud/src/common/metric.cpp
@@ -130,6 +130,16 @@ static void export_fdb_status_details(const std::string& 
status_str) {
         if (node->value.IsArray()) return node->value.Size();
         return BVAR_FDB_INVALID_VALUE;
     };
+    auto get_string_value = [&](const std::vector<const char*>& v) -> 
std::string {
+        if (v.empty()) return "invalid";
+        auto node = document.FindMember("cluster");
+        for (const auto& name : v) {
+            if (!node->value.HasMember(name)) return "invalid";
+            node = node->value.FindMember(name);
+        }
+        if (node->value.IsString()) return node->value.GetString();
+        return "invalid";
+    };
     auto get_nanoseconds = [&](const std::vector<const char*>& v) -> int64_t {
         constexpr double NANOSECONDS = 1e9;
         auto node = document.FindMember("cluster");
@@ -195,6 +205,12 @@ static void export_fdb_status_details(const std::string& 
status_str) {
 
     // Backup and DR
 
+    // Performance Limited By
+    // invalid or not-workload, the final value is -1
+    int64_t performance_val =
+            get_string_value({"qos", "performance_limited_by", "name"}) == 
"workload" ? 0 : -1;
+    g_bvar_fdb_performance_limited_by_name.set_value(performance_val);
+
     // Client Count
     g_bvar_fdb_client_count.set_value(get_value({"clients", "count"}));
 
diff --git a/cloud/src/meta-service/CMakeLists.txt 
b/cloud/src/meta-service/CMakeLists.txt
index 3623d99713e..4c5aa3c26b2 100644
--- a/cloud/src/meta-service/CMakeLists.txt
+++ b/cloud/src/meta-service/CMakeLists.txt
@@ -26,6 +26,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/meta-service")
 add_library(MetaService
     meta_server.cpp
     meta_service.cpp
+    meta_service_rate_limit_helper.cpp
     meta_service_http.cpp
     injection_point_http.cpp
     meta_service_job.cpp
diff --git a/cloud/src/meta-service/meta_service_helper.h 
b/cloud/src/meta-service/meta_service_helper.h
index 2f9ebd4fcc2..3ee903e3dfd 100644
--- a/cloud/src/meta-service/meta_service_helper.h
+++ b/cloud/src/meta-service/meta_service_helper.h
@@ -34,6 +34,7 @@
 #include "common/stopwatch.h"
 #include "common/util.h"
 #include "cpp/sync_point.h"
+#include "meta-service/meta_service_rate_limit_helper.h"
 #include "meta-store/keys.h"
 #include "meta-store/txn_kv.h"
 #include "meta-store/txn_kv_error.h"
@@ -311,6 +312,21 @@ inline MetaServiceCode cast_as(TxnErrorCode code) {
     [[maybe_unused]] std::string instance_id;                                  
               \
     [[maybe_unused]] bool drop_request = false;                                
               \
     [[maybe_unused]] KVStats stats;                                            
               \
+    [[maybe_unused]] MsStressDecision ms_stress_decision;                      
               \
+    if (config::enable_ms_rate_limit || 
config::enable_ms_rate_limit_injection) {             \
+        ms_stress_decision = get_ms_stress_decision();                         
               \
+    }                                                                          
               \
+    if ((config::enable_ms_rate_limit || 
config::enable_ms_rate_limit_injection) &&           \
+        RpcRateLimitWhitelist::instance().should_rate_limit(#func_name) &&     
               \
+        ms_stress_decision.under_great_stress()) {                             
               \
+        drop_request = true;                                                   
               \
+        code = MetaServiceCode::MS_TOO_BUSY;                                   
               \
+        msg = ms_stress_decision.debug_string();                               
               \
+        response->mutable_status()->set_code(code);                            
               \
+        response->mutable_status()->set_msg(msg);                              
               \
+        finish_rpc(#func_name, ctrl, request, response);                       
               \
+        return;                                                                
               \
+    }                                                                          
               \
     DORIS_CLOUD_DEFER {                                                        
               \
         response->mutable_status()->set_code(code);                            
               \
         response->mutable_status()->set_msg(msg);                              
               \
diff --git a/cloud/src/meta-service/meta_service_rate_limit_helper.cpp 
b/cloud/src/meta-service/meta_service_rate_limit_helper.cpp
new file mode 100644
index 00000000000..0f8c750be6c
--- /dev/null
+++ b/cloud/src/meta-service/meta_service_rate_limit_helper.cpp
@@ -0,0 +1,843 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "meta-service/meta_service_rate_limit_helper.h"
+
+#include <fmt/format.h>
+#include <sched.h>
+#include <sys/resource.h>
+
+#include <algorithm>
+#include <atomic>
+#include <bit>
+#include <chrono>
+#include <condition_variable>
+#include <cstdint>
+#include <deque>
+#include <filesystem>
+#include <fstream>
+#include <limits>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <random>
+#include <sstream>
+#include <string>
+#include <string_view>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+#include "common/config.h"
+#include "common/logging.h"
+
+namespace doris::cloud {
+namespace internal {
+namespace {
+constexpr std::string_view kProcSelfCgroupPath = "/proc/self/cgroup";
+constexpr std::string_view kProcSelfMountInfoPath = "/proc/self/mountinfo";
+constexpr std::string_view kCgroupRootPath = "/sys/fs/cgroup";
+} // namespace
+
+struct CgroupMemoryInfo {
+    int64_t limit_bytes;
+    int64_t usage_bytes;
+};
+
+std::optional<std::string> read_first_line(const std::filesystem::path& path) {
+    std::ifstream stream(path);
+    if (!stream.is_open()) {
+        return std::nullopt;
+    }
+    std::string line;
+    std::getline(stream, line);
+    if (stream.fail() || stream.bad()) {
+        return std::nullopt;
+    }
+    return line;
+}
+
+std::optional<int64_t> read_int64_line(const std::filesystem::path& path) {
+    auto line = read_first_line(path);
+    if (!line.has_value() || line->empty()) {
+        return std::nullopt;
+    }
+    try {
+        return std::stoll(*line);
+    } catch (...) {
+        return std::nullopt;
+    }
+}
+
+std::unordered_map<std::string, int64_t> read_metrics_map(const 
std::filesystem::path& path) {
+    std::unordered_map<std::string, int64_t> metrics;
+    std::ifstream stream(path);
+    if (!stream.is_open()) {
+        return metrics;
+    }
+
+    std::string key;
+    int64_t value = 0;
+    while (stream >> key >> value) {
+        metrics[key] = value;
+    }
+    return metrics;
+}
+
+std::vector<std::string> split(std::string_view text, char delimiter) {
+    std::vector<std::string> parts;
+    size_t start = 0;
+    while (start <= text.size()) {
+        size_t end = text.find(delimiter, start);
+        if (end == std::string_view::npos) {
+            end = text.size();
+        }
+        parts.emplace_back(text.substr(start, end - start));
+        start = end + 1;
+    }
+    return parts;
+}
+
+bool cgroups_v2_enabled() {
+    return std::filesystem::exists(std::filesystem::path(kCgroupRootPath) / 
"cgroup.controllers");
+}
+
+std::optional<std::string> cgroup_v2_relative_path_of_process() {
+    auto line = read_first_line(std::filesystem::path(kProcSelfCgroupPath));
+    if (!line.has_value() || !line->starts_with("0::")) {
+        return std::nullopt;
+    }
+    return line->substr(3);
+}
+
+std::optional<std::filesystem::path> get_cgroup_v2_dir(const std::string& 
subsystem_file) {
+    if (!cgroups_v2_enabled()) {
+        return std::nullopt;
+    }
+    auto relative_path = cgroup_v2_relative_path_of_process();
+    if (!relative_path.has_value()) {
+        return std::nullopt;
+    }
+
+    const std::filesystem::path cgroup_root(kCgroupRootPath);
+    std::filesystem::path current =
+            cgroup_root / 
relative_path->substr(relative_path->starts_with('/') ? 1 : 0);
+    while (current != cgroup_root.parent_path()) {
+        if (std::filesystem::exists(current / subsystem_file)) {
+            return current;
+        }
+        current = current.parent_path();
+    }
+    return std::nullopt;
+}
+
+std::optional<std::string> get_cgroup_v1_process_path(const std::string& 
subsystem) {
+    std::ifstream stream(kProcSelfCgroupPath.data());
+    if (!stream.is_open()) {
+        return std::nullopt;
+    }
+
+    std::string line;
+    while (std::getline(stream, line)) {
+        auto fields = split(line, ':');
+        if (fields.size() != 3) {
+            continue;
+        }
+        auto controllers = split(fields[1], ',');
+        if (std::find(controllers.begin(), controllers.end(), subsystem) != 
controllers.end()) {
+            return fields[2];
+        }
+    }
+    return std::nullopt;
+}
+
+std::optional<std::pair<std::string, std::string>> get_cgroup_v1_mount(
+        const std::string& subsystem) {
+    std::ifstream stream(kProcSelfMountInfoPath.data());
+    if (!stream.is_open()) {
+        return std::nullopt;
+    }
+
+    std::string line;
+    while (std::getline(stream, line)) {
+        auto separator = line.find(" - ");
+        if (separator == std::string::npos) {
+            continue;
+        }
+        auto left = split(std::string_view(line).substr(0, separator), ' ');
+        auto right = split(std::string_view(line).substr(separator + 3), ' ');
+        if (left.size() < 5 || right.size() < 3 || right[0] != "cgroup") {
+            continue;
+        }
+        auto options = split(right[2], ',');
+        if (std::find(options.begin(), options.end(), subsystem) == 
options.end()) {
+            continue;
+        }
+        std::string system_path = left[3];
+        if (system_path.size() > 1 && system_path.back() == '/') {
+            system_path.pop_back();
+        }
+        return std::make_pair(left[4], system_path);
+    }
+    return std::nullopt;
+}
+
+std::optional<std::filesystem::path> get_cgroup_v1_dir(const std::string& 
subsystem) {
+    auto process_path = get_cgroup_v1_process_path(subsystem);
+    auto mount = get_cgroup_v1_mount(subsystem);
+    if (!process_path.has_value() || !mount.has_value()) {
+        return std::nullopt;
+    }
+
+    const auto& [mount_path, system_path] = *mount;
+    if (!process_path->starts_with(system_path)) {
+        return std::nullopt;
+    }
+
+    std::string absolute = *process_path;
+    absolute.replace(0, system_path.size(), mount_path);
+    return std::filesystem::path(absolute);
+}
+
+int parse_cpuset_cpu_count(std::string_view cpuset_line) {
+    if (cpuset_line.empty()) {
+        return -1;
+    }
+
+    int cpu_count = 0;
+    for (const auto& range : split(cpuset_line, ',')) {
+        if (range.empty()) {
+            return -1;
+        }
+        auto cpu_values = split(range, '-');
+        try {
+            if (cpu_values.size() == 2) {
+                const int start = std::stoi(cpu_values[0]);
+                const int end = std::stoi(cpu_values[1]);
+                if (end < start) {
+                    return -1;
+                }
+                cpu_count += end - start + 1;
+            } else if (cpu_values.size() == 1) {
+                static_cast<void>(std::stoi(cpu_values[0]));
+                cpu_count += 1;
+            } else {
+                return -1;
+            }
+        } catch (...) {
+            return -1;
+        }
+    }
+    return cpu_count;
+}
+
+std::optional<double> parse_cgroup_v2_cpu_limit(std::string_view cpu_max_line) 
{
+    std::istringstream input {std::string(cpu_max_line)};
+    std::string quota;
+    double period = 0;
+    if (!(input >> quota >> period) || quota == "max" || period <= 0) {
+        return std::nullopt;
+    }
+    try {
+        const double quota_value = std::stod(quota);
+        if (quota_value <= 0) {
+            return std::nullopt;
+        }
+        return quota_value / period;
+    } catch (...) {
+        return std::nullopt;
+    }
+}
+
+std::optional<double> parse_cgroup_v1_cpu_limit(int64_t quota_us, int64_t 
period_us) {
+    if (quota_us <= 0 || period_us <= 0) {
+        return std::nullopt;
+    }
+    return static_cast<double>(quota_us) / static_cast<double>(period_us);
+}
+
+double get_process_affinity_cpu_limit() {
+    cpu_set_t cpu_set;
+    CPU_ZERO(&cpu_set);
+    if (sched_getaffinity(0, sizeof(cpu_set), &cpu_set) == 0) {
+        const int cpu_count = CPU_COUNT(&cpu_set);
+        if (cpu_count > 0) {
+            return static_cast<double>(cpu_count);
+        }
+    }
+    const uint32_t fallback = std::max<uint32_t>(1, 
std::thread::hardware_concurrency());
+    return static_cast<double>(fallback);
+}
+
+std::optional<double> get_cgroup_cpu_quota_limit() {
+    if (auto dir = get_cgroup_v2_dir("cpu.max"); dir.has_value()) {
+        const std::filesystem::path cgroup_root(kCgroupRootPath);
+        std::optional<double> limit;
+        auto current = *dir;
+        while (current != cgroup_root.parent_path()) {
+            if (auto line = read_first_line(current / "cpu.max"); 
line.has_value()) {
+                if (auto quota = parse_cgroup_v2_cpu_limit(*line); 
quota.has_value()) {
+                    limit = limit.has_value() ? std::min(*limit, *quota) : 
quota;
+                }
+            }
+            current = current.parent_path();
+        }
+        if (limit.has_value()) {
+            return limit;
+        }
+    }
+
+    if (auto dir = get_cgroup_v1_dir("cpu"); dir.has_value()) {
+        std::optional<double> limit;
+        auto current = *dir;
+        while (current != current.parent_path()) {
+            auto quota = read_int64_line(current / "cpu.cfs_quota_us");
+            auto period = read_int64_line(current / "cpu.cfs_period_us");
+            if (quota.has_value() && period.has_value()) {
+                if (auto parsed = parse_cgroup_v1_cpu_limit(*quota, *period); 
parsed.has_value()) {
+                    limit = limit.has_value() ? std::min(*limit, *parsed) : 
parsed;
+                }
+            }
+            current = current.parent_path();
+        }
+        if (limit.has_value()) {
+            return limit;
+        }
+    }
+
+    return std::nullopt;
+}
+
+double get_effective_process_cpu_limit() {
+    double limit = get_process_affinity_cpu_limit();
+    if (auto quota = get_cgroup_cpu_quota_limit(); quota.has_value() && *quota 
> 0) {
+        limit = std::min(limit, *quota);
+    }
+    return std::max(0.001, limit);
+}
+
+int64_t calculate_usage_percent(int64_t usage_bytes, int64_t limit_bytes) {
+    if (usage_bytes < 0 || limit_bytes <= 0) {
+        return -1;
+    }
+    return static_cast<int64_t>(static_cast<double>(usage_bytes) * 100.0 /
+                                static_cast<double>(limit_bytes));
+}
+
+int64_t calculate_cpu_usage_percent(double delta_cpu_ns, double delta_wall_ns, 
double cpu_limit) {
+    if (delta_cpu_ns < 0 || delta_wall_ns <= 0 || cpu_limit <= 0) {
+        return -1;
+    }
+    return static_cast<int64_t>(delta_cpu_ns * 100.0 / delta_wall_ns / 
cpu_limit);
+}
+
+std::optional<CgroupMemoryInfo> get_cgroup_memory_info() {
+    if (auto dir = get_cgroup_v2_dir("memory.current"); dir.has_value()) {
+        auto limit_line = read_first_line(*dir / "memory.max");
+        auto usage = read_int64_line(*dir / "memory.current");
+        if (limit_line.has_value() && usage.has_value()) {
+            int64_t limit_bytes = std::numeric_limits<int64_t>::max();
+            if (*limit_line != "max") {
+                try {
+                    limit_bytes = std::stoll(*limit_line);
+                } catch (...) {
+                    return std::nullopt;
+                }
+            }
+            auto metrics = read_metrics_map(*dir / "memory.stat");
+            int64_t adjusted_usage = *usage;
+            adjusted_usage -= metrics["inactive_file"];
+            adjusted_usage -= metrics["slab_reclaimable"];
+            adjusted_usage = std::max<int64_t>(0, adjusted_usage);
+            return CgroupMemoryInfo {limit_bytes, adjusted_usage};
+        }
+    }
+
+    if (auto dir = get_cgroup_v1_dir("memory"); dir.has_value()) {
+        auto limit = read_int64_line(*dir / "memory.limit_in_bytes");
+        if (limit.has_value()) {
+            auto metrics = read_metrics_map(*dir / "memory.stat");
+            return CgroupMemoryInfo {*limit, metrics["rss"]};
+        }
+    }
+
+    return std::nullopt;
+}
+} // namespace internal
+
+namespace {
+constexpr int64_t kNanosecondsPerMillisecond = 1000 * 1000;
+constexpr int64_t kInvalidPercent = -1;
+
+struct WindowSample {
+    int64_t second {0};
+    int64_t fdb_client_thread_busyness_percent {BVAR_FDB_INVALID_VALUE};
+    int64_t ms_cpu_usage_percent {kInvalidPercent};
+    int64_t ms_memory_usage_percent {kInvalidPercent};
+};
+
+struct ProcessResourceSample {
+    int64_t cpu_usage_percent {kInvalidPercent};
+    int64_t memory_usage_percent {kInvalidPercent};
+};
+
+class LatestDecisionStorage {
+public:
+    void store(const MsStressDecision& decision) {
+        version_.fetch_add(1, std::memory_order_acq_rel);
+
+        fdb_cluster_under_pressure_.store(decision.fdb_cluster_under_pressure,
+                                          std::memory_order_relaxed);
+        
fdb_client_thread_under_pressure_.store(decision.fdb_client_thread_under_pressure,
+                                                std::memory_order_relaxed);
+        ms_resource_under_pressure_.store(decision.ms_resource_under_pressure,
+                                          std::memory_order_relaxed);
+        
rate_limit_injected_for_test_.store(decision.rate_limit_injected_for_test,
+                                            std::memory_order_relaxed);
+        fdb_commit_latency_ns_.store(decision.fdb_commit_latency_ns, 
std::memory_order_relaxed);
+        fdb_read_latency_ns_.store(decision.fdb_read_latency_ns, 
std::memory_order_relaxed);
+        
fdb_performance_limited_by_name_.store(decision.fdb_performance_limited_by_name,
+                                               std::memory_order_relaxed);
+        
fdb_client_thread_busyness_percent_.store(decision.fdb_client_thread_busyness_percent,
+                                                  std::memory_order_relaxed);
+        fdb_client_thread_busyness_avg_percent_bits_.store(
+                encode_double(decision.fdb_client_thread_busyness_avg_percent),
+                std::memory_order_relaxed);
+        ms_cpu_usage_percent_.store(decision.ms_cpu_usage_percent, 
std::memory_order_relaxed);
+        
ms_cpu_usage_avg_percent_bits_.store(encode_double(decision.ms_cpu_usage_avg_percent),
+                                             std::memory_order_relaxed);
+        ms_memory_usage_percent_.store(decision.ms_memory_usage_percent, 
std::memory_order_relaxed);
+        
ms_memory_usage_avg_percent_bits_.store(encode_double(decision.ms_memory_usage_avg_percent),
+                                                std::memory_order_relaxed);
+        
rate_limit_injected_random_value_.store(decision.rate_limit_injected_random_value,
+                                                std::memory_order_relaxed);
+
+        version_.fetch_add(1, std::memory_order_release);
+    }
+
+    MsStressDecision load() const {
+        MsStressDecision decision;
+        while (true) {
+            const uint64_t version_before = 
version_.load(std::memory_order_acquire);
+            if ((version_before & 1) != 0) {
+                continue;
+            }
+
+            decision.fdb_cluster_under_pressure =
+                    
fdb_cluster_under_pressure_.load(std::memory_order_relaxed);
+            decision.fdb_client_thread_under_pressure =
+                    
fdb_client_thread_under_pressure_.load(std::memory_order_relaxed);
+            decision.ms_resource_under_pressure =
+                    
ms_resource_under_pressure_.load(std::memory_order_relaxed);
+            decision.rate_limit_injected_for_test =
+                    
rate_limit_injected_for_test_.load(std::memory_order_relaxed);
+            decision.fdb_commit_latency_ns = 
fdb_commit_latency_ns_.load(std::memory_order_relaxed);
+            decision.fdb_read_latency_ns = 
fdb_read_latency_ns_.load(std::memory_order_relaxed);
+            decision.fdb_performance_limited_by_name =
+                    
fdb_performance_limited_by_name_.load(std::memory_order_relaxed);
+            decision.fdb_client_thread_busyness_percent =
+                    
fdb_client_thread_busyness_percent_.load(std::memory_order_relaxed);
+            decision.fdb_client_thread_busyness_avg_percent = decode_double(
+                    
fdb_client_thread_busyness_avg_percent_bits_.load(std::memory_order_relaxed));
+            decision.ms_cpu_usage_percent = 
ms_cpu_usage_percent_.load(std::memory_order_relaxed);
+            decision.ms_cpu_usage_avg_percent =
+                    
decode_double(ms_cpu_usage_avg_percent_bits_.load(std::memory_order_relaxed));
+            decision.ms_memory_usage_percent =
+                    ms_memory_usage_percent_.load(std::memory_order_relaxed);
+            decision.ms_memory_usage_avg_percent = decode_double(
+                    
ms_memory_usage_avg_percent_bits_.load(std::memory_order_relaxed));
+            decision.rate_limit_injected_random_value =
+                    
rate_limit_injected_random_value_.load(std::memory_order_relaxed);
+
+            const uint64_t version_after = 
version_.load(std::memory_order_acquire);
+            if (version_before == version_after) {
+                return decision;
+            }
+        }
+    }
+
+private:
+    static constexpr uint64_t encode_double(double value) { return 
std::bit_cast<uint64_t>(value); }
+
+    static constexpr double decode_double(uint64_t bits) { return 
std::bit_cast<double>(bits); }
+
+    std::atomic<uint64_t> version_ {0};
+    std::atomic<bool> fdb_cluster_under_pressure_ {false};
+    std::atomic<bool> fdb_client_thread_under_pressure_ {false};
+    std::atomic<bool> ms_resource_under_pressure_ {false};
+    std::atomic<bool> rate_limit_injected_for_test_ {false};
+    std::atomic<int64_t> fdb_commit_latency_ns_ {BVAR_FDB_INVALID_VALUE};
+    std::atomic<int64_t> fdb_read_latency_ns_ {BVAR_FDB_INVALID_VALUE};
+    std::atomic<int64_t> fdb_performance_limited_by_name_ 
{BVAR_FDB_INVALID_VALUE};
+    std::atomic<int64_t> fdb_client_thread_busyness_percent_ 
{BVAR_FDB_INVALID_VALUE};
+    std::atomic<uint64_t> fdb_client_thread_busyness_avg_percent_bits_ 
{encode_double(-1)};
+    std::atomic<int64_t> ms_cpu_usage_percent_ {-1};
+    std::atomic<uint64_t> ms_cpu_usage_avg_percent_bits_ {encode_double(-1)};
+    std::atomic<int64_t> ms_memory_usage_percent_ {-1};
+    std::atomic<uint64_t> ms_memory_usage_avg_percent_bits_ 
{encode_double(-1)};
+    std::atomic<int32_t> rate_limit_injected_random_value_ {-1};
+};
+
+class ProcessResourceSampler {
+public:
+    ProcessResourceSample sample() {
+        using namespace std::chrono;
+
+        const auto now = steady_clock::now();
+        const int64_t current_cpu_time_ns = get_process_cpu_time_ns();
+        ProcessResourceSample sample;
+        sample.memory_usage_percent = get_process_memory_usage_percent();
+
+        const auto current_wall_time_ns =
+                duration_cast<nanoseconds>(now.time_since_epoch()).count();
+        std::lock_guard lock(mutex_);
+        if (last_cpu_time_ns_ != kInvalidPercent && current_cpu_time_ns != 
kInvalidPercent &&
+            current_wall_time_ns > last_wall_time_ns_) {
+            const double delta_cpu_ns = current_cpu_time_ns - 
last_cpu_time_ns_;
+            const double delta_wall_ns = current_wall_time_ns - 
last_wall_time_ns_;
+            sample.cpu_usage_percent = internal::calculate_cpu_usage_percent(
+                    delta_cpu_ns, delta_wall_ns, 
internal::get_effective_process_cpu_limit());
+        }
+        last_cpu_time_ns_ = current_cpu_time_ns;
+        last_wall_time_ns_ = current_wall_time_ns;
+        return sample;
+    }
+
+private:
+    static int64_t get_process_cpu_time_ns() {
+        rusage usage {};
+        if (getrusage(RUSAGE_SELF, &usage) != 0) {
+            return kInvalidPercent;
+        }
+        return usage.ru_utime.tv_sec * 1000L * 1000 * 1000 + 
usage.ru_utime.tv_usec * 1000L +
+               usage.ru_stime.tv_sec * 1000L * 1000 * 1000 + 
usage.ru_stime.tv_usec * 1000L;
+    }
+
+    static int64_t get_process_memory_usage_percent() {
+        if (auto cgroup_memory = internal::get_cgroup_memory_info(); 
cgroup_memory.has_value()) {
+            if (cgroup_memory->limit_bytes > 0 &&
+                cgroup_memory->limit_bytes < 
std::numeric_limits<int64_t>::max()) {
+                return 
internal::calculate_usage_percent(cgroup_memory->usage_bytes,
+                                                         
cgroup_memory->limit_bytes);
+            }
+        }
+
+        return kInvalidPercent;
+    }
+
+    std::mutex mutex_;
+    int64_t last_cpu_time_ns_ {kInvalidPercent};
+    int64_t last_wall_time_ns_ {0};
+};
+
+MsStressMetrics collect_ms_stress_metrics(ProcessResourceSampler* sampler) {
+    MsStressMetrics metrics;
+    metrics.fdb_commit_latency_ns = 
g_bvar_fdb_latency_probe_commit_ns.get_value();
+    metrics.fdb_read_latency_ns = g_bvar_fdb_latency_probe_read_ns.get_value();
+    metrics.fdb_performance_limited_by_name = 
g_bvar_fdb_performance_limited_by_name.get_value();
+    metrics.fdb_client_thread_busyness_percent =
+            g_bvar_fdb_client_thread_busyness_percent.get_value();
+    const auto resource_sample = sampler->sample();
+    metrics.ms_cpu_usage_percent = resource_sample.cpu_usage_percent;
+    metrics.ms_memory_usage_percent = resource_sample.memory_usage_percent;
+    return metrics;
+}
+
+class MsStressDetector {
+public:
+    ~MsStressDetector() { stop(); }
+
+    // Compute decision from metrics and store it in latest_decision_.
+    // Called by the background thread or synchronously in tests.
+    void update(int64_t now_ms, const MsStressMetrics& metrics) {
+        MsStressDecision decision;
+        decision.fdb_commit_latency_ns = metrics.fdb_commit_latency_ns;
+        decision.fdb_read_latency_ns = metrics.fdb_read_latency_ns;
+        decision.fdb_performance_limited_by_name = 
metrics.fdb_performance_limited_by_name;
+        decision.fdb_client_thread_busyness_percent = 
metrics.fdb_client_thread_busyness_percent;
+        decision.ms_cpu_usage_percent = metrics.ms_cpu_usage_percent;
+        decision.ms_memory_usage_percent = metrics.ms_memory_usage_percent;
+        const bool commit_latency_high =
+                metrics.fdb_commit_latency_ns != BVAR_FDB_INVALID_VALUE &&
+                metrics.fdb_commit_latency_ns >
+                        config::ms_rate_limit_fdb_commit_latency_ms * 
kNanosecondsPerMillisecond;
+        const bool read_latency_high =
+                metrics.fdb_read_latency_ns != BVAR_FDB_INVALID_VALUE &&
+                metrics.fdb_read_latency_ns >
+                        config::ms_rate_limit_fdb_read_latency_ms * 
kNanosecondsPerMillisecond;
+        decision.fdb_cluster_under_pressure =
+                (commit_latency_high || read_latency_high) &&
+                metrics.fdb_performance_limited_by_name != 
BVAR_FDB_INVALID_VALUE &&
+                metrics.fdb_performance_limited_by_name != 0;
+
+        const int64_t current_second = now_ms / 1000;
+        // No mutex needed: update() is only called from a single thread
+        // (background thread in production, test thread in tests).
+        record_sample(current_second, metrics);
+
+        const double avg_busyness =
+                get_window_avg(current_second, 
&WindowSample::fdb_client_thread_busyness_percent,
+                               BVAR_FDB_INVALID_VALUE);
+        decision.fdb_client_thread_busyness_avg_percent = avg_busyness;
+        if (avg_busyness >= 0 &&
+            metrics.fdb_client_thread_busyness_percent != 
BVAR_FDB_INVALID_VALUE) {
+            decision.fdb_client_thread_under_pressure =
+                    avg_busyness > 
config::ms_rate_limit_fdb_client_thread_busyness_avg_percent &&
+                    metrics.fdb_client_thread_busyness_percent >
+                            
config::ms_rate_limit_fdb_client_thread_busyness_instant_percent;
+        }
+
+        const double avg_cpu = get_window_avg(current_second, 
&WindowSample::ms_cpu_usage_percent,
+                                              kInvalidPercent);
+        const double avg_memory = get_window_avg(
+                current_second, &WindowSample::ms_memory_usage_percent, 
kInvalidPercent);
+        decision.ms_cpu_usage_avg_percent = avg_cpu;
+        decision.ms_memory_usage_avg_percent = avg_memory;
+        if (avg_cpu >= 0 && metrics.ms_cpu_usage_percent != kInvalidPercent) {
+            decision.ms_resource_under_pressure =
+                    metrics.ms_cpu_usage_percent > 
config::ms_rate_limit_cpu_usage_percent &&
+                    avg_cpu > config::ms_rate_limit_cpu_usage_percent;
+        }
+        if (avg_memory >= 0 && metrics.ms_memory_usage_percent != 
kInvalidPercent) {
+            decision.ms_resource_under_pressure =
+                    decision.ms_resource_under_pressure ||
+                    (metrics.ms_memory_usage_percent > 
config::ms_rate_limit_memory_usage_percent &&
+                     avg_memory > config::ms_rate_limit_memory_usage_percent);
+        }
+        latest_decision_.store(decision);
+    }
+
+    MsStressDecision get_latest_decision() const { return 
latest_decision_.load(); }
+
+    void reset() {
+        samples_.clear();
+        latest_decision_.store(MsStressDecision {});
+    }
+
+    // Start the background thread that periodically collects metrics and 
updates.
+    void start() {
+        if (running_.load() != 0) {
+            return;
+        }
+
+        std::unique_lock lock(mtx_);
+        if (running_.load() != 0) {
+            return;
+        }
+        running_.store(1);
+        bg_thread_ = std::make_unique<std::thread>([this] {
+            pthread_setname_np(pthread_self(), "ms_stress_det");
+            LOG(INFO) << "MsStressDetector background thread started";
+            ProcessResourceSampler sampler;
+            while (running_.load() == 1) {
+                const auto now_ms = 
std::chrono::duration_cast<std::chrono::milliseconds>(
+                                            
std::chrono::steady_clock::now().time_since_epoch())
+                                            .count();
+                const auto metrics = collect_ms_stress_metrics(&sampler);
+                update(now_ms, metrics);
+                std::unique_lock l(mtx_);
+                cv_.wait_for(l, std::chrono::seconds(1), [this]() { return 
running_.load() != 1; });
+            }
+            LOG(INFO) << "MsStressDetector background thread stopped";
+        });
+    }
+
+    void stop() {
+        {
+            std::unique_lock lock(mtx_);
+            if (running_.load() != 1) {
+                return;
+            }
+            running_.store(2);
+            cv_.notify_all();
+        }
+        if (bg_thread_ && bg_thread_->joinable()) {
+            bg_thread_->join();
+            bg_thread_.reset();
+        }
+    }
+
+private:
+    using SampleField = int64_t WindowSample::*;
+
+    void record_sample(int64_t current_second, const MsStressMetrics& metrics) 
{
+        WindowSample sample;
+        sample.second = current_second;
+        sample.fdb_client_thread_busyness_percent = 
metrics.fdb_client_thread_busyness_percent;
+        sample.ms_cpu_usage_percent = metrics.ms_cpu_usage_percent;
+        sample.ms_memory_usage_percent = metrics.ms_memory_usage_percent;
+        if (!samples_.empty() && samples_.back().second == current_second) {
+            samples_.back() = sample;
+        } else {
+            samples_.push_back(sample);
+        }
+
+        const int64_t window_start =
+                current_second - std::max<int64_t>(1, 
config::ms_rate_limit_window_seconds) + 1;
+        while (!samples_.empty() && samples_.front().second < window_start) {
+            samples_.pop_front();
+        }
+    }
+
+    double get_window_avg(int64_t current_second, SampleField field, int64_t 
invalid_value) const {
+        if (samples_.empty()) {
+            return -1;
+        }
+        const int64_t required_span =
+                std::max<int64_t>(1, config::ms_rate_limit_window_seconds) - 1;
+        if (samples_.back().second != current_second ||
+            current_second - samples_.front().second < required_span) {
+            return -1;
+        }
+
+        double sum = 0;
+        int64_t valid_count = 0;
+        for (const auto& sample : samples_) {
+            if (sample.*field == invalid_value) {
+                continue;
+            }
+            sum += sample.*field;
+            ++valid_count;
+        }
+        if (valid_count == 0) {
+            return -1;
+        }
+        return sum / valid_count;
+    }
+
+    LatestDecisionStorage latest_decision_;
+    std::deque<WindowSample> samples_;
+
+    // Background thread lifecycle
+    std::atomic<int> running_ {0};
+    mutable std::mutex mtx_;
+    std::condition_variable cv_;
+    std::unique_ptr<std::thread> bg_thread_;
+};
+
+MsStressDetector& global_ms_stress_detector() {
+    static MsStressDetector detector;
+    // Auto-start background thread on first access.
+    // start() is idempotent: subsequent calls are no-ops.
+    detector.start();
+    return detector;
+}
+
+int32_t get_ms_rate_limit_injection_random_value() {
+    thread_local std::mt19937 gen(std::random_device {}());
+    thread_local std::uniform_int_distribution<int32_t> dist(0, 99);
+    return dist(gen);
+}
+
+void maybe_apply_ms_rate_limit_injection(MsStressDecision* decision, int32_t 
random_value) {
+    if (!config::enable_ms_rate_limit_injection) {
+        return;
+    }
+    if (random_value < 0 || random_value >= 
config::ms_rate_limit_injection_probability) {
+        return;
+    }
+    decision->rate_limit_injected_for_test = true;
+    decision->rate_limit_injected_random_value = random_value;
+}
+} // namespace
+
+std::string MsStressDecision::debug_string() const {
+    if (!under_great_stress()) {
+        return "meta service rate limited: no stress condition matched";
+    }
+
+    std::vector<std::string> reasons;
+    if (fdb_cluster_under_pressure) {
+        reasons.push_back(fmt::format(
+                "fdb_cluster(commit_latency_ms={}, read_latency_ms={}, 
performance_limited_by={})",
+                fdb_commit_latency_ns == BVAR_FDB_INVALID_VALUE ? -1
+                                                                : 
fdb_commit_latency_ns / 1000000,
+                fdb_read_latency_ns == BVAR_FDB_INVALID_VALUE ? -1 : 
fdb_read_latency_ns / 1000000,
+                fdb_performance_limited_by_name));
+    }
+    if (fdb_client_thread_under_pressure) {
+        reasons.push_back(fmt::format(
+                "fdb_client_thread(busyness_avg={:.2f}%, busyness_instant={}%, 
thresholds=avg>{}% "
+                "and instant>{}%)",
+                fdb_client_thread_busyness_avg_percent, 
fdb_client_thread_busyness_percent,
+                config::ms_rate_limit_fdb_client_thread_busyness_avg_percent,
+                
config::ms_rate_limit_fdb_client_thread_busyness_instant_percent));
+    }
+    if (ms_resource_under_pressure) {
+        reasons.push_back(
+                fmt::format("ms_resource(cpu_current={}%, cpu_avg={:.2f}%, 
memory_current={}%, "
+                            "memory_avg={:.2f}%, thresholds=cpu>{}% or 
memory>{}%)",
+                            ms_cpu_usage_percent, ms_cpu_usage_avg_percent, 
ms_memory_usage_percent,
+                            ms_memory_usage_avg_percent, 
config::ms_rate_limit_cpu_usage_percent,
+                            config::ms_rate_limit_memory_usage_percent));
+    }
+    if (rate_limit_injected_for_test) {
+        reasons.push_back(fmt::format("test_injection(random_value={}, 
probability<{}%)",
+                                      rate_limit_injected_random_value,
+                                      
config::ms_rate_limit_injection_probability));
+    }
+    return fmt::format("meta service rate limited by {}", fmt::join(reasons, 
"; "));
+}
+
+MsStressDecision get_ms_stress_decision() {
+    MsStressDecision decision = 
global_ms_stress_detector().get_latest_decision();
+    // Rate limit injection is per-request (random), so apply it here, not in 
the background thread.
+    maybe_apply_ms_rate_limit_injection(&decision, 
get_ms_rate_limit_injection_random_value());
+    return decision;
+}
+
+MsStressDecision update_ms_stress_detector_for_test(int64_t now_ms, const 
MsStressMetrics& metrics,
+                                                    bool reset,
+                                                    int32_t 
rate_limit_injected_random_value) {
+    // Separate detector instance for tests — no background thread, 
synchronous updates.
+    static MsStressDetector detector;
+    if (reset) {
+        detector.reset();
+    }
+    detector.update(now_ms, metrics);
+    MsStressDecision decision = detector.get_latest_decision();
+    maybe_apply_ms_rate_limit_injection(&decision, 
rate_limit_injected_random_value);
+    return decision;
+}
+
+RpcRateLimitWhitelist& RpcRateLimitWhitelist::instance() {
+    static RpcRateLimitWhitelist inst;
+    static std::once_flag init_flag;
+    std::call_once(init_flag, []() {
+        inst.set_whitelist({"prepare_rowset", "commit_rowset", 
"update_tmp_rowset",
+                            "update_delete_bitmap", 
"update_packed_file_info"});
+    });
+    return inst;
+}
+
+bool RpcRateLimitWhitelist::should_rate_limit(const std::string& rpc_name) 
const {
+    std::lock_guard lock(mutex_);
+    return whitelist_.empty() || whitelist_.contains(rpc_name);
+}
+
+void RpcRateLimitWhitelist::set_whitelist(const std::vector<std::string>& 
rpcs) {
+    std::lock_guard lock(mutex_);
+    whitelist_.clear();
+    whitelist_.insert(rpcs.begin(), rpcs.end());
+}
+
+std::vector<std::string> RpcRateLimitWhitelist::get_whitelist() const {
+    std::lock_guard lock(mutex_);
+    return {whitelist_.begin(), whitelist_.end()};
+}
+
+} // namespace doris::cloud
diff --git a/cloud/src/meta-service/meta_service_rate_limit_helper.h 
b/cloud/src/meta-service/meta_service_rate_limit_helper.h
new file mode 100644
index 00000000000..365c53e9aaa
--- /dev/null
+++ b/cloud/src/meta-service/meta_service_rate_limit_helper.h
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <mutex>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "common/bvars.h"
+
+namespace doris::cloud {
+
+struct MsStressMetrics {
+    int64_t fdb_commit_latency_ns {BVAR_FDB_INVALID_VALUE};
+    int64_t fdb_read_latency_ns {BVAR_FDB_INVALID_VALUE};
+    int64_t fdb_performance_limited_by_name {BVAR_FDB_INVALID_VALUE};
+    int64_t fdb_client_thread_busyness_percent {BVAR_FDB_INVALID_VALUE};
+    int64_t ms_cpu_usage_percent {-1};
+    int64_t ms_memory_usage_percent {-1};
+};
+
+struct MsStressDecision {
+    bool fdb_cluster_under_pressure {false};
+    bool fdb_client_thread_under_pressure {false};
+    bool ms_resource_under_pressure {false};
+    bool rate_limit_injected_for_test {false};
+    int64_t fdb_commit_latency_ns {BVAR_FDB_INVALID_VALUE};
+    int64_t fdb_read_latency_ns {BVAR_FDB_INVALID_VALUE};
+    int64_t fdb_performance_limited_by_name {BVAR_FDB_INVALID_VALUE};
+    int64_t fdb_client_thread_busyness_percent {BVAR_FDB_INVALID_VALUE};
+    double fdb_client_thread_busyness_avg_percent {-1};
+    int64_t ms_cpu_usage_percent {-1};
+    double ms_cpu_usage_avg_percent {-1};
+    int64_t ms_memory_usage_percent {-1};
+    double ms_memory_usage_avg_percent {-1};
+    int32_t rate_limit_injected_random_value {-1};
+
+    [[nodiscard]] bool under_great_stress() const {
+        return fdb_cluster_under_pressure || fdb_client_thread_under_pressure 
||
+               ms_resource_under_pressure || rate_limit_injected_for_test;
+    }
+
+    [[nodiscard]] std::string debug_string() const;
+};
+
+MsStressDecision get_ms_stress_decision();
+MsStressDecision update_ms_stress_detector_for_test(int64_t now_ms, const 
MsStressMetrics& metrics,
+                                                    bool reset = false,
+                                                    int32_t 
rate_limit_injected_random_value = -1);
+
+class RpcRateLimitWhitelist {
+public:
+    static RpcRateLimitWhitelist& instance();
+    bool should_rate_limit(const std::string& rpc_name) const;
+    void set_whitelist(const std::vector<std::string>& rpcs);
+    std::vector<std::string> get_whitelist() const;
+
+private:
+    mutable std::mutex mutex_;
+    std::unordered_set<std::string> whitelist_;
+};
+
+} // namespace doris::cloud
diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt
index 0d40bd5c2d6..7ed83887e10 100644
--- a/cloud/test/CMakeLists.txt
+++ b/cloud/test/CMakeLists.txt
@@ -56,6 +56,7 @@ add_executable(meta_reader_test meta_reader_test.cpp)
 
 add_executable(meta_service_test
    meta_service_test.cpp
+   meta_service_helper_test.cpp
    meta_service_job_test.cpp
    meta_service_http_test.cpp
    meta_service_operation_log_test.cpp
@@ -223,4 +224,3 @@ install(FILES
     GROUP_READ GROUP_WRITE GROUP_EXECUTE
     WORLD_READ WORLD_EXECUTE
     DESTINATION ${BUILD_DIR}/test)
-
diff --git a/cloud/test/meta_service_helper_test.cpp 
b/cloud/test/meta_service_helper_test.cpp
new file mode 100644
index 00000000000..73b3d37de5b
--- /dev/null
+++ b/cloud/test/meta_service_helper_test.cpp
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <limits>
+#include <optional>
+#include <string_view>
+
+#include "common/config.h"
+#include "meta-service/meta_service_rate_limit_helper.h"
+
+namespace doris::cloud {
+namespace internal {
+int parse_cpuset_cpu_count(std::string_view cpuset_line);
+std::optional<double> parse_cgroup_v2_cpu_limit(std::string_view cpu_max_line);
+std::optional<double> parse_cgroup_v1_cpu_limit(int64_t quota_us, int64_t 
period_us);
+int64_t calculate_usage_percent(int64_t usage_bytes, int64_t limit_bytes);
+int64_t calculate_cpu_usage_percent(double delta_cpu_ns, double delta_wall_ns, 
double cpu_limit);
+} // namespace internal
+
+namespace {
+struct MsRateLimitInjectionConfigGuard {
+    ~MsRateLimitInjectionConfigGuard() {
+        config::enable_ms_rate_limit_injection = original_enable;
+        config::ms_rate_limit_injection_probability = original_probability;
+    }
+
+    bool original_enable {config::enable_ms_rate_limit_injection};
+    int32_t original_probability {config::ms_rate_limit_injection_probability};
+};
+} // namespace
+
+TEST(MetaServiceHelperTest, FdbClusterPressureNeedsLatencyAndNonWorkload) {
+    MsStressMetrics metrics;
+    metrics.fdb_commit_latency_ns = 51L * 1000 * 1000;
+    metrics.fdb_performance_limited_by_name = -1;
+
+    auto decision = update_ms_stress_detector_for_test(0, metrics, true);
+    ASSERT_TRUE(decision.fdb_cluster_under_pressure);
+    ASSERT_TRUE(decision.under_great_stress());
+    std::cout << decision.debug_string() << std::endl;
+    ASSERT_NE(decision.debug_string().find("fdb_cluster"), std::string::npos);
+
+    metrics.fdb_performance_limited_by_name = 0;
+    decision = update_ms_stress_detector_for_test(1000, metrics, true);
+    ASSERT_FALSE(decision.fdb_cluster_under_pressure);
+    ASSERT_FALSE(decision.under_great_stress());
+}
+
+TEST(MetaServiceHelperTest, 
FdbClientThreadPressureNeedsWindowAverageAndInstantValue) {
+    MsStressMetrics metrics;
+    for (int second = 0; second < 60; ++second) {
+        metrics.fdb_client_thread_busyness_percent = 71;
+        auto decision = update_ms_stress_detector_for_test(second * 1000, 
metrics, second == 0);
+        ASSERT_FALSE(decision.fdb_client_thread_under_pressure);
+    }
+
+    metrics.fdb_client_thread_busyness_percent = 91;
+    auto decision = update_ms_stress_detector_for_test(60 * 1000, metrics);
+    ASSERT_TRUE(decision.fdb_client_thread_under_pressure);
+    ASSERT_TRUE(decision.under_great_stress());
+    std::cout << decision.debug_string() << std::endl;
+    ASSERT_NE(decision.debug_string().find("fdb_client_thread"), 
std::string::npos);
+}
+
+TEST(MetaServiceHelperTest, 
MsResourcePressureNeedsCurrentAndWindowAverageHigh) {
+    MsStressMetrics metrics;
+    for (int second = 0; second < 59; ++second) {
+        metrics.ms_cpu_usage_percent = 96;
+        auto decision = update_ms_stress_detector_for_test(second * 1000, 
metrics, second == 0);
+        ASSERT_FALSE(decision.ms_resource_under_pressure);
+    }
+
+    metrics.ms_cpu_usage_percent = 96;
+    auto decision = update_ms_stress_detector_for_test(59 * 1000, metrics);
+    ASSERT_TRUE(decision.ms_resource_under_pressure);
+    ASSERT_TRUE(decision.under_great_stress());
+    std::cout << decision.debug_string() << std::endl;
+    ASSERT_NE(decision.debug_string().find("ms_resource"), std::string::npos);
+
+    metrics.ms_cpu_usage_percent = 50;
+    decision = update_ms_stress_detector_for_test(60 * 1000, metrics);
+    ASSERT_FALSE(decision.ms_resource_under_pressure);
+}
+
+TEST(MetaServiceHelperTest, 
MsRateLimitInjectionRequiresSwitchAndProbabilityHit) {
+    MsRateLimitInjectionConfigGuard guard;
+
+    MsStressMetrics metrics;
+    config::enable_ms_rate_limit_injection = false;
+    config::ms_rate_limit_injection_probability = 100;
+    auto decision = update_ms_stress_detector_for_test(0, metrics, true, 0);
+    ASSERT_FALSE(decision.rate_limit_injected_for_test);
+    ASSERT_FALSE(decision.under_great_stress());
+
+    config::enable_ms_rate_limit_injection = true;
+    config::ms_rate_limit_injection_probability = 30;
+    decision = update_ms_stress_detector_for_test(1000, metrics, true, 30);
+    ASSERT_FALSE(decision.rate_limit_injected_for_test);
+    ASSERT_FALSE(decision.under_great_stress());
+
+    decision = update_ms_stress_detector_for_test(2000, metrics, true, 29);
+    ASSERT_TRUE(decision.rate_limit_injected_for_test);
+    ASSERT_TRUE(decision.under_great_stress());
+    ASSERT_NE(decision.debug_string().find("test_injection"), 
std::string::npos);
+}
+
+TEST(MetaServiceHelperTest, ParseCpusetCpuCount) {
+    ASSERT_EQ(internal::parse_cpuset_cpu_count("0-3,5,7-8"), 7);
+    ASSERT_EQ(internal::parse_cpuset_cpu_count("2"), 1);
+    ASSERT_EQ(internal::parse_cpuset_cpu_count(""), -1);
+    ASSERT_EQ(internal::parse_cpuset_cpu_count("3-1"), -1);
+}
+
+TEST(MetaServiceHelperTest, ParseCgroupCpuQuota) {
+    auto v2_limit = internal::parse_cgroup_v2_cpu_limit("50000 100000");
+    ASSERT_TRUE(v2_limit.has_value());
+    ASSERT_DOUBLE_EQ(*v2_limit, 0.5);
+    ASSERT_FALSE(internal::parse_cgroup_v2_cpu_limit("max 
100000").has_value());
+
+    auto v1_limit = internal::parse_cgroup_v1_cpu_limit(150000, 100000);
+    ASSERT_TRUE(v1_limit.has_value());
+    ASSERT_DOUBLE_EQ(*v1_limit, 1.5);
+    ASSERT_FALSE(internal::parse_cgroup_v1_cpu_limit(-1, 100000).has_value());
+}
+
+TEST(MetaServiceHelperTest, UsagePercentCalculationUsesEffectiveLimit) {
+    ASSERT_EQ(internal::calculate_usage_percent(512, 1024), 50);
+    ASSERT_EQ(internal::calculate_usage_percent(-1, 1024), -1);
+    ASSERT_EQ(internal::calculate_usage_percent(512, 
std::numeric_limits<int64_t>::max()), 0);
+
+    ASSERT_EQ(internal::calculate_cpu_usage_percent(5e8, 1e9, 0.5), 100);
+    ASSERT_EQ(internal::calculate_cpu_usage_percent(15e8, 1e9, 2.0), 75);
+    ASSERT_EQ(internal::calculate_cpu_usage_percent(1, 0, 2.0), -1);
+}
+} // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to