This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4783fc09011 [feature](meta-service) Support querying and adjusting rpc 
qps limit on meta service (#42413)
4783fc09011 is described below

commit 4783fc0901131287983620719dcaef1517fd1e43
Author: Siyang Tang <tangsiyang2...@foxmail.com>
AuthorDate: Wed Nov 13 18:34:53 2024 +0800

    [feature](meta-service) Support querying and adjusting rpc qps limit on 
meta service (#42413)
    
    ## Proposed changes
    
    Usage
    1. adjust limit
    ```
    curl http://ms_ip:ms_port/MetaService/http/v1/adjust_rate_limit?${params}
    ```
    | Entry      | Description |
    | ----------- | ----------- |
    | param         | uint64 qps_limit  |
    |behavior   | set qps_limit global default value |
    |example|```curl
    
http://ms_ip:ms_port/MetaService/http/v1/adjust_rate_limit?qps_limit=5000000```|
    
    | Entry      | Description |
    | ----------- | ----------- |
    | param         | uint64 qps_limit, string rpc_name  |
    |behavior   | set RPC specific qps_limit  |
    |example|```curl
    
http://ms_ip:ms_port/MetaService/http/v1/adjust_rate_limit?qps_limit=5000000&rpc_name=get_cluster```|
    
    | Entry      | Description |
    | ----------- | ----------- |
    | param | uint64 qps_limit, string rpc_name, string instance_id |
    |behavior   | set instance qps_limit for specific RPC  |
    |example|```curl
    
http://ms_ip:ms_port/MetaService/http/v1/adjust_rate_limit?qps_limit=5000000&rpc_name=get_cluster&instance_id="doris-0"```|
    
    | Entry      | Description |
    | ----------- | ----------- |
    | param         | uint64 qps_limit, string instance_id  |
    |behavior   | set global qps_limit for specific instance  |
    |example|```curl
    
http://ms_ip:ms_port/MetaService/http/v1/adjust_rate_limit?qps_limit=5000000&instance_id="doris-0"```|
    
    2. query limit
    
    | Entry      | Description |
    | ----------- | ----------- |
    | param         | none  |
    |behavior   | query qps limit for all RPC interface |
    |example|```curl
    http://ms_ip:ms_port/MetaService/http/v1/list_rate_limit```|
---
 cloud/src/meta-service/meta_service_http.cpp | 121 ++++++++++++-
 cloud/src/rate-limiter/rate_limiter.cpp      | 115 ++++++++++--
 cloud/src/rate-limiter/rate_limiter.h        |  76 +++++++-
 cloud/test/meta_service_http_test.cpp        |  90 ++++++++++
 cloud/test/rate_limiter_test.cpp             | 250 +++++++++++++++++++++++----
 5 files changed, 605 insertions(+), 47 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_http.cpp 
b/cloud/src/meta-service/meta_service_http.cpp
index 95907376dd2..2f7536e9989 100644
--- a/cloud/src/meta-service/meta_service_http.cpp
+++ b/cloud/src/meta-service/meta_service_http.cpp
@@ -22,6 +22,7 @@
 #include <brpc/uri.h>
 #include <fmt/format.h>
 #include <gen_cpp/cloud.pb.h>
+#include <glog/logging.h>
 #include <google/protobuf/message.h>
 #include <google/protobuf/service.h>
 #include <google/protobuf/util/json_util.h>
@@ -30,8 +31,14 @@
 #include <rapidjson/prettywriter.h>
 #include <rapidjson/stringbuffer.h>
 
+#include <algorithm>
+#include <array>
+#include <cstdint>
+#include <functional>
 #include <memory>
 #include <optional>
+#include <string>
+#include <string_view>
 #include <type_traits>
 #include <variant>
 #include <vector>
@@ -42,6 +49,7 @@
 #include "meta-service/txn_kv.h"
 #include "meta-service/txn_kv_error.h"
 #include "meta_service.h"
+#include "rate-limiter/rate_limiter.h"
 
 namespace doris::cloud {
 
@@ -333,6 +341,113 @@ static HttpResponse process_alter_iam(MetaServiceImpl* 
service, brpc::Controller
     return http_json_reply(resp.status());
 }
 
+// Handles the `adjust_rate_limit` HTTP endpoint.
+//
+// Query params: `qps_limit` (required), `rpc_name` (optional), `instance_id` (optional).
+// Which of them are non-empty selects the scope the new limit is applied to:
+//   qps_limit                          -> set_rate_limit(qps_limit)
+//   qps_limit + rpc_name               -> set_rate_limit(qps_limit, rpc_name)
+//   qps_limit + instance_id            -> set_instance_rate_limit(qps_limit, instance_id)
+//   qps_limit + rpc_name + instance_id -> set_rate_limit(qps_limit, rpc_name, instance_id)
+// Every combination that lacks `qps_limit` is answered with INVALID_ARGUMENT.
+static HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) {
+    const auto& uri = cntl->http_request().uri();
+    auto qps_limit_str = std::string {http_query(uri, "qps_limit")};
+    auto rpc_name = std::string {http_query(uri, "rpc_name")};
+    auto instance_id = std::string {http_query(uri, "instance_id")};
+
+    // Shared tail of every setter: parse and validate `qps_limit`, then hand it to `cb`.
+    // `cb` returning false is reported as INVALID_ARGUMENT (unknown rpc/instance name).
+    auto process_set_qps_limit = [&](std::function<bool(int64_t)> cb) -> HttpResponse {
+        DCHECK(!qps_limit_str.empty());
+        int64_t qps_limit = -1;
+        try {
+            qps_limit = std::stoll(qps_limit_str);
+        } catch (const std::exception& ex) {
+            return http_json_reply(
+                    MetaServiceCode::INVALID_ARGUMENT,
+                    fmt::format("param `qps_limit` is not a legal int64 type:{}", ex.what()));
+        }
+        if (qps_limit < 0) {
+            return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
+                                   "`qps_limit` should not be less than 0");
+        }
+        if (cb(qps_limit)) {
+            return http_json_reply(MetaServiceCode::OK, "sucess to adjust rate limit");
+        }
+        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
+                               fmt::format("failed to adjust rate limit for qps_limit={}, "
+                                           "rpc_name={}, instance_id={}, plz ensure correct "
+                                           "rpc/instance name",
+                                           qps_limit_str, rpc_name, instance_id));
+    };
+
+    auto set_global_qps_limit = [process_set_qps_limit, service]() {
+        return process_set_qps_limit([service](int64_t qps_limit) {
+            return service->rate_limiter()->set_rate_limit(qps_limit);
+        });
+    };
+
+    auto set_rpc_qps_limit = [&]() {
+        return process_set_qps_limit([&](int64_t qps_limit) {
+            return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name);
+        });
+    };
+
+    auto set_instance_qps_limit = [&]() {
+        return process_set_qps_limit([&](int64_t qps_limit) {
+            return service->rate_limiter()->set_instance_rate_limit(qps_limit, instance_id);
+        });
+    };
+
+    auto set_instance_rpc_qps_limit = [&]() {
+        return process_set_qps_limit([&](int64_t qps_limit) {
+            return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name, instance_id);
+        });
+    };
+
+    auto process_invalid_arguments = [&]() -> HttpResponse {
+        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
+                               fmt::format("invalid argument: qps_limit(required)={}, "
+                                           "rpc_name(optional)={}, instance_id(optional)={}",
+                                           qps_limit_str, rpc_name, instance_id));
+    };
+
+    // We have 3 optional params and 2^3 combinations, and 4 of them are illegal.
+    // We register callbacks for them in processors according to the level, represented by
+    // 3 bits: bit 0 = qps_limit, bit 1 = rpc_name, bit 2 = instance_id.
+    std::array<std::function<HttpResponse()>, 8> processors;
+    processors.fill(process_invalid_arguments);
+    processors[0b001] = std::move(set_global_qps_limit);
+    processors[0b011] = std::move(set_rpc_qps_limit);
+    processors[0b101] = std::move(set_instance_qps_limit);
+    processors[0b111] = std::move(set_instance_rpc_qps_limit);
+
+    // A bit is set when the corresponding param is PRESENT (non-empty). Using `empty()`
+    // directly would invert the mask and route a plain `qps_limit=...` request to the
+    // invalid-argument handler.
+    const auto level = static_cast<uint8_t>((qps_limit_str.empty() ? 0 : 0b001) |
+                                            (rpc_name.empty() ? 0 : 0b010) |
+                                            (instance_id.empty() ? 0 : 0b100));
+
+    DCHECK_LT(level, 8);
+
+    return processors[level]();
+}
+
+// Handles the `list_rate_limit` HTTP endpoint.
+//
+// Replies with a pretty-printed JSON object that has one member per RPC limiter. Each member
+// holds "RPC qps limit" (the RPC-level limit, rendered as a string) and
+// "instance specific qps limit" (an object mapping instance id -> limit).
+static HttpResponse process_query_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) {
+    auto rate_limiter = service->rate_limiter();
+    rapidjson::Document d;
+    // A default-constructed rapidjson value is null; AddMember() is only valid on objects.
+    d.SetObject();
+    auto get_qps_limit = [&d](std::string_view rpc_name,
+                              std::shared_ptr<RpcRateLimiter> rpc_limiter) {
+        rapidjson::Value node(rapidjson::kObjectType);
+        rapidjson::Value sub(rapidjson::kObjectType);
+        // NOTE(review): StringRef does not copy; this assumes the string_views handed to the
+        // callbacks refer to storage that outlives serialization below -- confirm.
+        auto get_qps_token_limit = [&](std::string_view instance_id,
+                                       std::shared_ptr<RpcRateLimiter::QpsToken> qps_token) {
+            sub.AddMember(rapidjson::StringRef(instance_id.data(), instance_id.size()),
+                          qps_token->max_qps_limit(), d.GetAllocator());
+        };
+        rpc_limiter->for_each_qps_token(std::move(get_qps_token_limit));
+
+        // `max_qps_limit` is a local string, so copy its bytes into the document's allocator;
+        // a StringRef to it would dangle as soon as this callback returns.
+        auto max_qps_limit = std::to_string(rpc_limiter->max_qps_limit());
+        rapidjson::Value rpc_limit(max_qps_limit.data(),
+                                   static_cast<rapidjson::SizeType>(max_qps_limit.size()),
+                                   d.GetAllocator());
+        node.AddMember("RPC qps limit", rpc_limit, d.GetAllocator());
+        node.AddMember("instance specific qps limit", sub, d.GetAllocator());
+        d.AddMember(rapidjson::StringRef(rpc_name.data(), rpc_name.size()), node, d.GetAllocator());
+    };
+    rate_limiter->for_each_rpc_limiter(std::move(get_qps_limit));
+
+    rapidjson::StringBuffer sb;
+    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(sb);
+    d.Accept(writer);
+    return http_json_reply(MetaServiceCode::OK, sb.GetString());
+}
+
 static HttpResponse process_decode_key(MetaServiceImpl*, brpc::Controller* 
ctrl) {
     auto& uri = ctrl->http_request().uri();
     std::string_view key = http_query(uri, "key");
@@ -615,13 +730,17 @@ void 
MetaServiceImpl::http(::google::protobuf::RpcController* controller,
             {"abort_tablet_job", process_abort_tablet_job},
             {"alter_ram_user", process_alter_ram_user},
             {"alter_iam", process_alter_iam},
+            {"adjust_rate_limit", process_adjust_rate_limit},
+            {"list_rate_limit", process_query_rate_limit},
             {"v1/abort_txn", process_abort_txn},
             {"v1/abort_tablet_job", process_abort_tablet_job},
             {"v1/alter_ram_user", process_alter_ram_user},
             {"v1/alter_iam", process_alter_iam},
+            {"v1/adjust_rate_limit", process_adjust_rate_limit},
+            {"v1/list_rate_limit", process_query_rate_limit},
     };
 
-    auto cntl = static_cast<brpc::Controller*>(controller);
+    auto* cntl = static_cast<brpc::Controller*>(controller);
     brpc::ClosureGuard closure_guard(done);
 
     // Prepare input request info
diff --git a/cloud/src/rate-limiter/rate_limiter.cpp 
b/cloud/src/rate-limiter/rate_limiter.cpp
index 8988ff0560b..1d7d0a10ac8 100644
--- a/cloud/src/rate-limiter/rate_limiter.cpp
+++ b/cloud/src/rate-limiter/rate_limiter.cpp
@@ -17,11 +17,16 @@
 
 #include "rate_limiter.h"
 
+#include <bthread/mutex.h>
 #include <butil/strings/string_split.h>
 
+#include <algorithm>
 #include <chrono>
+#include <cstdint>
 #include <memory>
 #include <mutex>
+#include <ranges>
+#include <shared_mutex>
 
 #include "common/bvars.h"
 #include "common/config.h"
@@ -29,10 +34,10 @@
 
 namespace doris::cloud {
 
-void RateLimiter::init(google::protobuf::Service* service) {
-    std::map<std::string, int64_t> rpc_name_to_max_qps_limit;
+std::unordered_map<std::string, int64_t> parse_specific_qps_limit(const 
std::string& list_str) {
+    std::unordered_map<std::string, int64_t> rpc_name_to_max_qps_limit;
     std::vector<std::string> max_qps_limit_list;
-    butil::SplitString(config::specific_max_qps_limit, ';', 
&max_qps_limit_list);
+    butil::SplitString(list_str, ';', &max_qps_limit_list);
     for (const auto& v : max_qps_limit_list) {
         auto p = v.find(':');
         if (p != std::string::npos && p != (v.size() - 1)) {
@@ -41,29 +46,42 @@ void RateLimiter::init(google::protobuf::Service* service) {
                 int64_t max_qps_limit = std::stoll(v.substr(p + 1));
                 if (max_qps_limit > 0) {
                     rpc_name_to_max_qps_limit[rpc_name] = max_qps_limit;
-                    LOG(INFO) << "set rpc: " << rpc_name << " max_qps_limit: " 
<< max_qps_limit;
                 }
             } catch (...) {
-                LOG(WARNING) << "failed to set max_qps_limit to rpc: " << 
rpc_name
+                LOG(WARNING) << "failed to parse max_qps_limit to rpc: " << 
rpc_name
                              << " config: " << v;
             }
         }
     }
+    return rpc_name_to_max_qps_limit;
+}
+
+template <typename Callable>
+void for_each_rpc_name(google::protobuf::Service* service, Callable cb) {
     auto method_size = service->GetDescriptor()->method_count();
     for (auto i = 0; i < method_size; ++i) {
         std::string rpc_name = service->GetDescriptor()->method(i)->name();
-        int64_t max_qps_limit = config::default_max_qps_limit;
+        cb(rpc_name);
+    }
+}
 
-        auto it = rpc_name_to_max_qps_limit.find(rpc_name);
-        if (it != rpc_name_to_max_qps_limit.end()) {
+void RateLimiter::init(google::protobuf::Service* service) {
+    auto rpc_name_to_specific_limit = 
parse_specific_qps_limit(config::specific_max_qps_limit);
+    std::unique_lock write_lock(mutex_);
+    for_each_rpc_name(service, [&](const std::string& rpc_name) {
+        auto it = rpc_name_to_specific_limit.find(rpc_name);
+        int64_t max_qps_limit = config::default_max_qps_limit;
+        if (it != rpc_name_to_specific_limit.end()) {
             max_qps_limit = it->second;
         }
         limiters_[rpc_name] = std::make_shared<RpcRateLimiter>(rpc_name, 
max_qps_limit);
+    });
+    for (const auto& [k, _] : rpc_name_to_specific_limit) {
+        rpc_with_specific_limit_.insert(k);
     }
 }
 
 std::shared_ptr<RpcRateLimiter> RateLimiter::get_rpc_rate_limiter(const 
std::string& rpc_name) {
-    // no need to be locked, because it is only modified during initialization
     auto it = limiters_.find(rpc_name);
     if (it == limiters_.end()) {
         return nullptr;
@@ -71,6 +89,49 @@ std::shared_ptr<RpcRateLimiter> 
RateLimiter::get_rpc_rate_limiter(const std::str
     return it->second;
 }
 
+bool RateLimiter::set_rate_limit(int64_t qps_limit) {
+    std::lock_guard lock(mutex_);
+    auto filter = [this](const auto& kv) { return 
!rpc_with_specific_limit_.contains(kv.first); };
+    for (const auto& [_, v] : limiters_ | 
std::views::filter(std::move(filter))) {
+        v->set_max_qps_limit(qps_limit);
+    }
+    return true;
+}
+
+// Sets the RPC-level qps limit of `rpc_name` and records the RPC as having a specific limit.
+// Returns false when no limiter is registered under `rpc_name`.
+bool RateLimiter::set_rate_limit(int64_t qps_limit, const std::string& rpc_name) {
+    // Take the lock before touching `limiters_`, so the lookup is covered by the same mutex
+    // that guards the update below.
+    std::lock_guard lock(mutex_);
+    auto it = limiters_.find(rpc_name);
+    if (it == limiters_.end()) {
+        return false;
+    }
+    it->second->set_max_qps_limit(qps_limit);
+    rpc_with_specific_limit_.insert(rpc_name);
+    return true;
+}
+
+bool RateLimiter::set_rate_limit(int64_t qps_limit, const std::string& 
rpc_name,
+                                 const std::string& instance_id) {
+    if (!limiters_.contains(rpc_name)) {
+        return false;
+    }
+    auto limiter = limiters_.at(rpc_name);
+    return limiter->set_max_qps_limit(qps_limit, instance_id);
+}
+
+// Applies `qps_limit` to `instance_id` on every registered RPC limiter.
+// Returns true when every limiter accepted the new limit (also true when there are none);
+// all_of stops at the first limiter that reports failure.
+bool RateLimiter::set_instance_rate_limit(int64_t qps_limit, const std::string& instance_id) {
+    return std::ranges::all_of(limiters_, [&](const auto& kv) {
+        return kv.second->set_max_qps_limit(qps_limit, instance_id);
+    });
+}
+
+void RateLimiter::for_each_rpc_limiter(
+        std::function<void(std::string_view, std::shared_ptr<RpcRateLimiter>)> 
cb) {
+    for (const auto& [rpc_name, rpc_limiter] : limiters_) {
+        cb(rpc_name, rpc_limiter);
+    }
+}
+
 bool RpcRateLimiter::get_qps_token(const std::string& instance_id,
                                    std::function<int()>& get_bvar_qps) {
     if (!config::use_detailed_metrics || instance_id.empty()) {
@@ -93,6 +154,35 @@ bool RpcRateLimiter::get_qps_token(const std::string& 
instance_id,
     return qps_token->get_token(get_bvar_qps);
 }
 
+void RpcRateLimiter::set_max_qps_limit(int64_t max_qps_limit) {
+    std::lock_guard<bthread::Mutex> l(mutex_);
+    max_qps_limit_ = max_qps_limit;
+    auto filter = [this](const auto& kv) {
+        return !instance_with_specific_limit_.contains(kv.first);
+    };
+    for (auto& [k, v] : qps_limiter_ | std::views::filter(std::move(filter))) {
+        v->set_max_qps_limit(max_qps_limit);
+    }
+}
+
+bool RpcRateLimiter::set_max_qps_limit(int64_t max_qps_limit, const 
std::string& instance_id) {
+    std::lock_guard<bthread::Mutex> l(mutex_);
+    if (!qps_limiter_.contains(instance_id)) {
+        qps_limiter_[instance_id] = std::make_shared<QpsToken>(max_qps_limit);
+    } else {
+        qps_limiter_.at(instance_id)->set_max_qps_limit(max_qps_limit);
+    }
+    instance_with_specific_limit_.insert(instance_id);
+    return true;
+}
+
+void RpcRateLimiter::for_each_qps_token(
+        std::function<void(std::string_view, std::shared_ptr<QpsToken>)> cb) {
+    for (const auto& [instance_id, qps_token] : qps_limiter_) {
+        cb(instance_id, qps_token);
+    }
+}
+
 bool RpcRateLimiter::QpsToken::get_token(std::function<int()>& get_bvar_qps) {
     using namespace std::chrono;
     auto now = steady_clock::now();
@@ -110,4 +200,9 @@ bool 
RpcRateLimiter::QpsToken::get_token(std::function<int()>& get_bvar_qps) {
     return current_qps_ < max_qps_limit_;
 }
 
-} // namespace doris::cloud
\ No newline at end of file
+void RpcRateLimiter::QpsToken::set_max_qps_limit(int64_t max_qps_limit) {
+    std::lock_guard<bthread::Mutex> l(mutex_);
+    max_qps_limit_ = max_qps_limit;
+}
+
+} // namespace doris::cloud
diff --git a/cloud/src/rate-limiter/rate_limiter.h 
b/cloud/src/rate-limiter/rate_limiter.h
index df441656aa4..e557d0d8a10 100644
--- a/cloud/src/rate-limiter/rate_limiter.h
+++ b/cloud/src/rate-limiter/rate_limiter.h
@@ -19,10 +19,13 @@
 
 #include <brpc/server.h>
 #include <bthread/mutex.h>
+#include <google/protobuf/service.h>
 
 #include <cstdint>
 #include <memory>
+#include <shared_mutex>
 #include <string>
+#include <string_view>
 #include <unordered_map>
 
 #include "common/config.h"
@@ -35,12 +38,54 @@ class RateLimiter {
 public:
     RateLimiter() = default;
     ~RateLimiter() = default;
+
     void init(google::protobuf::Service* service);
+
     std::shared_ptr<RpcRateLimiter> get_rpc_rate_limiter(const std::string& 
rpc_name);
 
+    /**
+     * @brief for each rpc limiter, apply callback
+     *
+     * @param cb callback function with params rpc name and rate limiter
+     */
+    void for_each_rpc_limiter(
+            std::function<void(std::string_view, 
std::shared_ptr<RpcRateLimiter>)> cb);
+
+    /**
+     * @brief set global default rate limit, will not influence rpc and instance specific qps limit setting
+     *
+     * @return true if set successfully
+     */
+    bool set_rate_limit(int64_t qps_limit);
+
+    /**
+     * @brief set rpc level rate limit, will not influence instance specific qps limit setting
+     *
+     * @return true if set successfully
+     */
+    bool set_rate_limit(int64_t qps_limit, const std::string& rpc_name);
+
+    /**
+     * @brief set instance level rate limit for specific rpc
+     *
+     * @return true if set successfully
+     */
+    bool set_rate_limit(int64_t qps_limit, const std::string& rpc_name,
+                        const std::string& instance_id);
+
+    /**
+     * @brief set instance level rate limit globally, will influence settings for the same instance of specific rpc
+     *
+     * @return true if set successfully
+     */
+    bool set_instance_rate_limit(int64_t qps_limit, const std::string& 
instance_id);
+
 private:
     // rpc_name -> RpcRateLimiter
     std::unordered_map<std::string, std::shared_ptr<RpcRateLimiter>> limiters_;
+    // rpc names which specific limit have been set
+    std::unordered_set<std::string> rpc_with_specific_limit_;
+    bthread::Mutex mutex_;
 };
 
 class RpcRateLimiter {
@@ -58,15 +103,34 @@ public:
      */
     bool get_qps_token(const std::string& instance_id, std::function<int()>& 
get_bvar_qps);
 
-    // Todo: Recycle outdated instance_id
+    std::string_view rpc_name() const { return rpc_name_; }
+
+    int64_t max_qps_limit() const { return max_qps_limit_; }
+
+    /**
+     * @brief set max qps limit for this limiter; instances that were given a
+     *        specific limit keep their own value (no return value)
+     */
+    void set_max_qps_limit(int64_t max_qps_limit);
+
+    /**
+     * @brief set max qps limit for specific instance within this limiter
+     *
+     * @return true if set successfully
+     */
+    bool set_max_qps_limit(int64_t max_qps_limit, const std::string& instance);
 
-private:
     class QpsToken {
     public:
         QpsToken(const int64_t max_qps_limit) : max_qps_limit_(max_qps_limit) 
{}
 
         bool get_token(std::function<int()>& get_bvar_qps);
 
+        void set_max_qps_limit(int64_t max_qps_limit);
+
+        int64_t max_qps_limit() const { return max_qps_limit_; }
+
     private:
         bthread::Mutex mutex_;
         std::chrono::steady_clock::time_point last_update_time_;
@@ -75,12 +139,18 @@ private:
         int64_t max_qps_limit_;
     };
 
+    void for_each_qps_token(std::function<void(std::string_view, 
std::shared_ptr<QpsToken>)> cb);
+
+    // Todo: Recycle outdated instance_id
+
 private:
     bthread::Mutex mutex_;
     // instance_id -> QpsToken
     std::unordered_map<std::string, std::shared_ptr<QpsToken>> qps_limiter_;
+    // instance ids which specific limit have been set
+    std::unordered_set<std::string> instance_with_specific_limit_;
     std::string rpc_name_;
     int64_t max_qps_limit_;
 };
 
-} // namespace doris::cloud
\ No newline at end of file
+} // namespace doris::cloud
diff --git a/cloud/test/meta_service_http_test.cpp 
b/cloud/test/meta_service_http_test.cpp
index d1b8fd66943..4360efeb442 100644
--- a/cloud/test/meta_service_http_test.cpp
+++ b/cloud/test/meta_service_http_test.cpp
@@ -1535,4 +1535,94 @@ TEST(MetaServiceHttpTest, 
get_obj_store_info_response_sk) {
     ms->get_obj_store_info(&cntl, &req1, &res1, nullptr);
 }
 
+TEST(MetaServiceHttpTest, AdjustRateLimit) {
+    HttpContext ctx;
+    {
+        auto [status_code, content] =
+                ctx.query<std::string>("adjust_rate_limit", "qps_limit=10000");
+        ASSERT_EQ(status_code, 200);
+    }
+    {
+        auto [status_code, content] =
+                ctx.query<std::string>("adjust_rate_limit", 
"qps_limit=10000&rpc_name=get_cluster");
+        ASSERT_EQ(status_code, 200);
+    }
+    {
+        auto [status_code, content] = ctx.query<std::string>(
+                "adjust_rate_limit",
+                
"qps_limit=10000&rpc_name=get_cluster&instance_id=test_instance");
+        ASSERT_EQ(status_code, 200);
+    }
+    {
+        auto [status_code, content] = ctx.query<std::string>(
+                "adjust_rate_limit", 
"qps_limit=10000&instance_id=test_instance");
+        ASSERT_EQ(status_code, 200);
+    }
+    {
+        auto [status_code, content] =
+                ctx.query<std::string>("adjust_rate_limit", 
"qps_limit=invalid");
+        ASSERT_EQ(status_code, 400);
+        std::string msg = "param `qps_limit` is not a legal int64 type:";
+        ASSERT_NE(content.find(msg), std::string::npos);
+    }
+    {
+        auto [status_code, content] = 
ctx.query<std::string>("adjust_rate_limit", "qps_limit=-1");
+        ASSERT_EQ(status_code, 400);
+        std::string msg = "qps_limit` should not be less than 0";
+        ASSERT_NE(content.find(msg), std::string::npos);
+    }
+    {
+        auto [status_code, content] =
+                ctx.query<std::string>("adjust_rate_limit", 
"rpc_name=get_cluster");
+        ASSERT_EQ(status_code, 400);
+        std::string msg = "invalid argument:";
+        ASSERT_NE(content.find(msg), std::string::npos);
+    }
+    {
+        auto [status_code, content] =
+                ctx.query<std::string>("adjust_rate_limit", 
"instance_id=test_instance");
+        ASSERT_EQ(status_code, 400);
+        std::string msg = "invalid argument:";
+        ASSERT_NE(content.find(msg), std::string::npos);
+    }
+    {
+        auto [status_code, content] = ctx.query<std::string>(
+                "adjust_rate_limit", 
"rpc_name=get_cluster&instance_id=test_instance");
+        ASSERT_EQ(status_code, 400);
+        std::string msg = "invalid argument:";
+        ASSERT_NE(content.find(msg), std::string::npos);
+    }
+    {
+        auto [status_code, content] = 
ctx.query<std::string>("adjust_rate_limit", "");
+        ASSERT_EQ(status_code, 400);
+        std::string msg = "invalid argument:";
+        ASSERT_NE(content.find(msg), std::string::npos);
+    }
+    {
+        auto [status_code, content] =
+                ctx.query<std::string>("adjust_rate_limit", 
"qps_limit=1000&rpc_name=invalid");
+        ASSERT_EQ(status_code, 400);
+        std::string msg = "failed to adjust rate limit for qps_limit";
+        ASSERT_NE(content.find(msg), std::string::npos);
+    }
+    {
+        auto [status_code, content] =
+                ctx.query<std::string>("adjust_rate_limit", 
"qps_limit=1000&instance_id=invalid");
+        ASSERT_EQ(status_code, 200);
+    }
+    {
+        auto [status_code, content] = ctx.query<std::string>(
+                "adjust_rate_limit", 
"qps_limit=1000&rpc_name=get_cluster&instance_id=invalid");
+        ASSERT_EQ(status_code, 200);
+    }
+}
+
+TEST(MetaServiceHttpTest, QueryRateLimit) {
+    HttpContext ctx;
+    {
+        auto [status_code, content] = 
ctx.query<std::string>("list_rate_limit", "");
+        ASSERT_EQ(status_code, 200);
+    }
+}
+
 } // namespace doris::cloud
diff --git a/cloud/test/rate_limiter_test.cpp b/cloud/test/rate_limiter_test.cpp
index 2a10451a69f..cab7b01774c 100644
--- a/cloud/test/rate_limiter_test.cpp
+++ b/cloud/test/rate_limiter_test.cpp
@@ -17,8 +17,11 @@
 
 #include "rate-limiter/rate_limiter.h"
 
+#include <gen_cpp/cloud.pb.h>
 #include <gtest/gtest.h>
 
+#include <cstddef>
+
 #include "common/config.h"
 #include "common/util.h"
 #include "meta-service/keys.h"
@@ -26,6 +29,7 @@
 #include "meta-service/meta_service.h"
 #include "meta-service/txn_kv_error.h"
 #include "mock_resource_manager.h"
+#include "resource-manager/resource_manager.h"
 
 int main(int argc, char** argv) {
     doris::cloud::config::init(nullptr, true);
@@ -35,25 +39,50 @@ int main(int argc, char** argv) {
 
 using namespace doris::cloud;
 
+const std::string mock_instance_0 = "mock_instance_0";
+const std::string mock_instance_1 = "mock_instance_1";
+const std::string mock_cluster_0 = "mock_cluster_0";
+const std::string mock_cluster_1 = "mock_cluster_1";
+const std::string mock_cluster_id_0 = "mock_cluster_id_0";
+const std::string mock_cluster_id_1 = "mock_cluster_id_1";
+const std::string mock_cloud_unique_id_0 = "mock_cloud_unique_id_0";
+const std::string mock_cloud_unique_id_1 = "mock_cloud_unique_id_1";
+
+class MockMultiInstanceRsMgr : public MockResourceManager {
+public:
+    using MockResourceManager::MockResourceManager;
+
+    std::string get_node(const std::string& cloud_unique_id,
+                         std::vector<NodeInfo>* nodes) override {
+        if (cloud_unique_id == mock_cloud_unique_id_0) {
+            nodes->emplace_back(Role::COMPUTE_NODE, mock_instance_0, 
mock_cluster_0,
+                                mock_cluster_id_0);
+        } else if (cloud_unique_id == mock_cloud_unique_id_1) {
+            nodes->emplace_back(Role::COMPUTE_NODE, mock_instance_1, 
mock_cluster_1,
+                                mock_cluster_id_1);
+        }
+        return {};
+    };
+};
+
 std::unique_ptr<MetaServiceProxy> get_meta_service() {
     auto txn_kv = 
std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>());
     [&] { ASSERT_NE(txn_kv.get(), nullptr); }();
-    auto rs = std::make_shared<MockResourceManager>(txn_kv);
+    auto rs = std::make_shared<MockMultiInstanceRsMgr>(txn_kv);
     auto rl = std::make_shared<RateLimiter>();
     auto meta_service = std::make_unique<MetaServiceImpl>(txn_kv, rs, rl);
     return std::make_unique<MetaServiceProxy>(std::move(meta_service));
 }
 
-TEST(RateLimiterTest, RateLimitGetClusterTest) {
-    auto meta_service = get_meta_service();
+void mock_add_cluster(MetaServiceProxy& meta_service, std::string instance_id) 
{ // test helper: puts `val` under the instance key for instance_id and commits (part of the body is elided by the hunk boundary below)
     // add cluster first
-    InstanceKeyInfo key_info {mock_instance};
+    InstanceKeyInfo key_info {instance_id}; // the key is built from the caller-supplied instance id
     std::string key;
     std::string val;
     instance_key(key_info, &key);
 
     InstanceInfoPB instance;
-    instance.set_instance_id(mock_instance);
+    instance.set_instance_id(instance_id); // the stored record carries the same instance id
     ClusterPB c1;
     c1.set_cluster_name(mock_cluster_name);
     c1.set_cluster_id(mock_cluster_id);
@@ -63,50 +92,205 @@ TEST(RateLimiterTest, RateLimitGetClusterTest) {
 
     std::unique_ptr<Transaction> txn;
     std::string get_val;
-    ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+    ASSERT_EQ(meta_service.txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); // meta_service is a reference in this helper, hence '.' rather than '->'
     txn->put(key, val);
     ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+}
 
-    auto get_cluster = [&](MetaServiceCode code) {
-        GetClusterRequest req;
-        req.set_cloud_unique_id("test_cloud_unique_id");
-        req.set_cluster_id(mock_cluster_id);
-        req.set_cluster_name(mock_cluster_name);
-        brpc::Controller cntl;
-        GetClusterResponse res;
-        
meta_service->get_cluster(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
 &req,
-                                  &res, nullptr);
-
-        ASSERT_EQ(res.status().code(), code);
-    };
+void mock_get_cluster(MetaServiceProxy& meta_service, const std::string& 
cloud_uid,
+                      MetaServiceCode code) { // issues one get_cluster call for cloud_uid and asserts the response status code equals `code`
+    GetClusterRequest req;
+    req.set_cloud_unique_id(cloud_uid);
+    req.set_cluster_id(mock_cluster_id);
+    req.set_cluster_name(mock_cluster_name);
+    brpc::Controller cntl;
+    GetClusterResponse res;
+    
meta_service.get_cluster(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
 &req,
+                             &res, nullptr); // last argument is nullptr (presumably the completion closure; confirm against the proxy signature)
+
+    ASSERT_EQ(res.status().code(), code); // on mismatch ASSERT_EQ only returns from this helper
+}
+
+template <typename Rpc>
+void mock_parallel_rpc(Rpc rpc, MetaServiceProxy* meta_service, const 
std::string& cloud_uid,
+                       MetaServiceCode expected, size_t times) { // runs rpc(*meta_service, cloud_uid, expected) on `times` threads and joins all of them
     std::vector<std::thread> threads;
-    for (int i = 0; i < 20; ++i) {
-        threads.emplace_back(get_cluster, MetaServiceCode::OK);
+    for (size_t i = 0; i < times; ++i) {
+        threads.emplace_back([&]() { rpc(*meta_service, cloud_uid, expected); 
}); // capturing by reference is safe here: every thread is joined before this function returns
     }
     for (auto& t : threads) {
         t.join();
     }
-    threads.clear();
+}
+
+TEST(RateLimiterTest, RateLimitGetClusterTest) { // get_cluster under the original limit, then with the instance limit forced to 1, then raised to 10000
+    auto meta_service = get_meta_service();
+    mock_add_cluster(*meta_service, mock_instance_0); // store the instance record before issuing RPCs
+
+    mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                      MetaServiceCode::OK, 20); // 20 concurrent calls, all expected to succeed
 
     std::this_thread::sleep_for(std::chrono::seconds(1));
     meta_service->rate_limiter()
             ->get_rpc_rate_limiter("get_cluster")
-            ->qps_limiter_[mock_instance]
+            ->qps_limiter_[mock_instance_0]
             ->max_qps_limit_ = 1;
-    threads.emplace_back(get_cluster, MetaServiceCode::MAX_QPS_LIMIT);
-    for (auto& t : threads) {
-        t.join();
-    }
-    threads.clear();
+    mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                      MetaServiceCode::MAX_QPS_LIMIT, 1); // with max_qps_limit_ forced to 1 the single call is expected to be rejected
 
     std::this_thread::sleep_for(std::chrono::seconds(1));
     meta_service->rate_limiter()
             ->get_rpc_rate_limiter("get_cluster")
-            ->qps_limiter_[mock_instance]
+            ->qps_limiter_[mock_instance_0]
             ->max_qps_limit_ = 10000;
-    threads.emplace_back(get_cluster, MetaServiceCode::OK);
-    for (auto& t : threads) {
-        t.join();
+    mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                      MetaServiceCode::OK, 1); // with the limit raised to 10000 the call is expected to succeed again
+}
+
+TEST(RateLimiterTest, AdjustLimitInfluenceTest) { // checks which stored limits each set_rate_limit / set_instance_rate_limit call changes
+    auto meta_service = get_meta_service();
+    mock_add_cluster(*meta_service, mock_instance_0);
+    mock_add_cluster(*meta_service, mock_instance_1);
+
+    mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                      MetaServiceCode::OK, 1); // one successful get_cluster call for instance 0 before any limit is adjusted
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    {
+        ASSERT_TRUE(meta_service->rate_limiter()->set_instance_rate_limit(1, 
mock_instance_1)); // instance-wide limit for mock_instance_1
+        ASSERT_TRUE(
+                meta_service->rate_limiter()->set_rate_limit(100, 
"get_cluster", mock_instance_0)); // limit for get_cluster on mock_instance_0 only
+
+        auto limit = meta_service->rate_limiter()
+                             ->get_rpc_rate_limiter("get_cluster")
+                             ->qps_limiter_.at(mock_instance_0)
+                             ->max_qps_limit();
+        ASSERT_EQ(limit, 100);
+        limit = meta_service->rate_limiter()
+                        ->get_rpc_rate_limiter("get_cluster")
+                        ->qps_limiter_.at(mock_instance_1)
+                        ->max_qps_limit();
+        ASSERT_EQ(limit, 1); // the instance-wide limit is visible on the get_cluster limiter of mock_instance_1
+    }
+    {
+        ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1)); // single-argument form: expected below to change commit_txn but not get_cluster
+        auto limit =
+                
meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit();
+        ASSERT_EQ(limit, 1);
+        limit = 
meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit();
+        ASSERT_EQ(limit, 5000000); // get_cluster is expected to keep 5000000 (presumably an RPC-specific configured value; confirm against the config)
+    }
+    {
+        ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(5000, 
"get_cluster")); // RPC-specific limit for get_cluster; commit_txn must stay at 1
+        auto limit =
+                
meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit();
+        ASSERT_EQ(limit, 1);
+        ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1000)); // single-argument form again: commit_txn expected at 1000, get_cluster still 5000
+        limit = 
meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit();
+        ASSERT_EQ(limit, 5000);
+        limit = 
meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit();
+        ASSERT_EQ(limit, 1000);
+    }
+    {
+        ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(3000, 
"commit_txn")); // RPC-specific limit for commit_txn; get_cluster must stay at 5000
+        auto limit =
+                
meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit();
+        ASSERT_EQ(limit, 3000);
+        limit = 
meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit();
+        ASSERT_EQ(limit, 5000);
+    }
+    { // per-instance limits from the first block are expected to be unchanged by the RPC-level and single-argument calls above
+        auto limit = meta_service->rate_limiter()
+                             ->get_rpc_rate_limiter("get_cluster")
+                             ->qps_limiter_.at(mock_instance_0)
+                             ->max_qps_limit();
+        ASSERT_EQ(limit, 100);
+        limit = meta_service->rate_limiter()
+                        ->get_rpc_rate_limiter("get_cluster")
+                        ->qps_limiter_.at(mock_instance_1)
+                        ->max_qps_limit();
+        ASSERT_EQ(limit, 1);
+    }
+    {
+        ASSERT_TRUE(meta_service->rate_limiter()->set_instance_rate_limit(200, 
mock_instance_1)); // raising the limit of instance 1 must not affect instance 0
+        auto limit = meta_service->rate_limiter()
+                             ->get_rpc_rate_limiter("get_cluster")
+                             ->qps_limiter_.at(mock_instance_0)
+                             ->max_qps_limit();
+        ASSERT_EQ(limit, 100);
+        limit = meta_service->rate_limiter()
+                        ->get_rpc_rate_limiter("get_cluster")
+                        ->qps_limiter_.at(mock_instance_1)
+                        ->max_qps_limit();
+        ASSERT_EQ(limit, 200);
     }
-    threads.clear();
-}
\ No newline at end of file
+}
+
+TEST(RateLimiterTest, AdjustLimitMockRPCTest) { // adjusts limits through the setters and checks the effect on real get_cluster calls
+    auto meta_service = get_meta_service();
+    mock_add_cluster(*meta_service, mock_instance_0);
+    mock_add_cluster(*meta_service, mock_instance_1);
+    mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                      MetaServiceCode::OK, 20);
+    mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_1,
+                      MetaServiceCode::OK, 20); // 20 calls per instance before any limit is adjusted, all expected to succeed
+    std::this_thread::sleep_for(std::chrono::seconds(1)); // 1s pause between phases (presumably to start the next phase in a fresh rate-limit window; confirm)
+
+    {
+        ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(10000, 
"get_cluster")); // RPC-specific limit for get_cluster
+        ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1)); // single-argument limit of 1 is expected not to override the get_cluster-specific 10000
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                          MetaServiceCode::OK, 20);
+        auto limit =
+                
meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit();
+        ASSERT_EQ(limit, 10000);
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+    }
+    {
+        ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1, 
"get_cluster")); // now the get_cluster-specific limit itself is 1
+        ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1));
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                          MetaServiceCode::MAX_QPS_LIMIT, 1); // the single call is expected to be rejected
+        auto limit =
+                
meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit();
+        ASSERT_EQ(limit, 1);
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+    }
+    {
+        ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(10000, 
"get_cluster",
+                                                                 
mock_instance_0)); // per-instance get_cluster limit for instance 0
+        ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(10000, 
"get_cluster",
+                                                                 
mock_instance_1)); // per-instance get_cluster limit for instance 1
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                          MetaServiceCode::OK, 20);
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_1,
+                          MetaServiceCode::OK, 20); // both instances are expected to pass although the RPC-level limit was set to 1 above
+        auto limit = meta_service->rate_limiter()
+                             ->get_rpc_rate_limiter("get_cluster")
+                             ->qps_limiter_.at(mock_instance_0)
+                             ->max_qps_limit();
+        ASSERT_EQ(limit, 10000);
+        limit = meta_service->rate_limiter()
+                        ->get_rpc_rate_limiter("get_cluster")
+                        ->qps_limiter_.at(mock_instance_1)
+                        ->max_qps_limit();
+        ASSERT_EQ(limit, 10000);
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+    }
+    {
+        ASSERT_TRUE(meta_service->rate_limiter()->set_instance_rate_limit(1, 
mock_instance_0)); // throttle instance 0 only
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                          MetaServiceCode::MAX_QPS_LIMIT, 1);
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_1,
+                          MetaServiceCode::OK, 20); // instance 1 is expected to be unaffected by the limit on instance 0
+        auto limit = meta_service->rate_limiter()
+                             ->get_rpc_rate_limiter("get_cluster")
+                             ->qps_limiter_.at(mock_instance_0)
+                             ->max_qps_limit();
+        ASSERT_EQ(limit, 1);
+        limit = meta_service->rate_limiter()
+                        ->get_rpc_rate_limiter("get_cluster")
+                        ->qps_limiter_.at(mock_instance_1)
+                        ->max_qps_limit();
+        ASSERT_EQ(limit, 10000);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to