This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4783fc09011 [feature](meta-service) Support querying and adjusting rpc qps limit on meta service (#42413)
4783fc09011 is described below

commit 4783fc0901131287983620719dcaef1517fd1e43
Author: Siyang Tang <tangsiyang2...@foxmail.com>
AuthorDate: Wed Nov 13 18:34:53 2024 +0800

[feature](meta-service) Support querying and adjusting rpc qps limit on meta service (#42413)

## Proposed changes

Usage

1. adjust limit

```
curl "http://ms_ip:ms_port/MetaService/http/v1/adjust_rate_limit?${params}"
```

(Quote the URL: an unquoted `&` is a shell control operator and would cut the query string.)

| Entry | Description |
| ----------- | ----------- |
| param | int64 qps_limit (must be >= 0) |
| behavior | set the global default qps_limit (RPCs that already have an RPC-specific limit are not affected) |
| example | `curl "http://ms_ip:ms_port/MetaService/http/v1/adjust_rate_limit?qps_limit=5000000"` |

| Entry | Description |
| ----------- | ----------- |
| param | int64 qps_limit (must be >= 0), string rpc_name |
| behavior | set the qps_limit of a specific RPC |
| example | `curl "http://ms_ip:ms_port/MetaService/http/v1/adjust_rate_limit?qps_limit=5000000&rpc_name=get_cluster"` |

| Entry | Description |
| ----------- | ----------- |
| param | int64 qps_limit (must be >= 0), string rpc_name, string instance_id |
| behavior | set the qps_limit of a specific instance for a specific RPC |
| example | `curl "http://ms_ip:ms_port/MetaService/http/v1/adjust_rate_limit?qps_limit=5000000&rpc_name=get_cluster&instance_id=doris-0"` |

| Entry | Description |
| ----------- | ----------- |
| param | int64 qps_limit (must be >= 0), string instance_id |
| behavior | set the qps_limit of a specific instance for all RPCs |
| example | `curl "http://ms_ip:ms_port/MetaService/http/v1/adjust_rate_limit?qps_limit=5000000&instance_id=doris-0"` |

2.
query limit | Entry | Description | | ----------- | ----------- | | param | none | |behavior | query qps limit for all RPC interface | |example|```curl http://ms_ip:ms_port/MetaService/http/v1/list_rate_limit```| --- cloud/src/meta-service/meta_service_http.cpp | 121 ++++++++++++- cloud/src/rate-limiter/rate_limiter.cpp | 115 ++++++++++-- cloud/src/rate-limiter/rate_limiter.h | 76 +++++++- cloud/test/meta_service_http_test.cpp | 90 ++++++++++ cloud/test/rate_limiter_test.cpp | 250 +++++++++++++++++++++++---- 5 files changed, 605 insertions(+), 47 deletions(-) diff --git a/cloud/src/meta-service/meta_service_http.cpp b/cloud/src/meta-service/meta_service_http.cpp index 95907376dd2..2f7536e9989 100644 --- a/cloud/src/meta-service/meta_service_http.cpp +++ b/cloud/src/meta-service/meta_service_http.cpp @@ -22,6 +22,7 @@ #include <brpc/uri.h> #include <fmt/format.h> #include <gen_cpp/cloud.pb.h> +#include <glog/logging.h> #include <google/protobuf/message.h> #include <google/protobuf/service.h> #include <google/protobuf/util/json_util.h> @@ -30,8 +31,14 @@ #include <rapidjson/prettywriter.h> #include <rapidjson/stringbuffer.h> +#include <algorithm> +#include <array> +#include <cstdint> +#include <functional> #include <memory> #include <optional> +#include <string> +#include <string_view> #include <type_traits> #include <variant> #include <vector> @@ -42,6 +49,7 @@ #include "meta-service/txn_kv.h" #include "meta-service/txn_kv_error.h" #include "meta_service.h" +#include "rate-limiter/rate_limiter.h" namespace doris::cloud { @@ -333,6 +341,113 @@ static HttpResponse process_alter_iam(MetaServiceImpl* service, brpc::Controller return http_json_reply(resp.status()); } +static HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { + const auto& uri = cntl->http_request().uri(); + auto qps_limit_str = std::string {http_query(uri, "qps_limit")}; + auto rpc_name = std::string {http_query(uri, "rpc_name")}; + auto instance_id = std::string 
{http_query(uri, "instance_id")}; + + auto process_set_qps_limit = [&](std::function<bool(int64_t)> cb) -> HttpResponse { + DCHECK(!qps_limit_str.empty()); + int64_t qps_limit = -1; + try { + qps_limit = std::stoll(qps_limit_str); + } catch (const std::exception& ex) { + return http_json_reply( + MetaServiceCode::INVALID_ARGUMENT, + fmt::format("param `qps_limit` is not a legal int64 type:{}", ex.what())); + } + if (qps_limit < 0) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "`qps_limit` should not be less than 0"); + } + if (cb(qps_limit)) { + return http_json_reply(MetaServiceCode::OK, "sucess to adjust rate limit"); + } + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + fmt::format("failed to adjust rate limit for qps_limit={}, " + "rpc_name={}, instance_id={}, plz ensure correct " + "rpc/instance name", + qps_limit_str, rpc_name, instance_id)); + }; + + auto set_global_qps_limit = [process_set_qps_limit, service]() { + return process_set_qps_limit([service](int64_t qps_limit) { + return service->rate_limiter()->set_rate_limit(qps_limit); + }); + }; + + auto set_rpc_qps_limit = [&]() { + return process_set_qps_limit([&](int64_t qps_limit) { + return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name); + }); + }; + + auto set_instance_qps_limit = [&]() { + return process_set_qps_limit([&](int64_t qps_limit) { + return service->rate_limiter()->set_instance_rate_limit(qps_limit, instance_id); + }); + }; + + auto set_instance_rpc_qps_limit = [&]() { + return process_set_qps_limit([&](int64_t qps_limit) { + return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name, instance_id); + }); + }; + + auto process_invalid_arguments = [&]() -> HttpResponse { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + fmt::format("invalid argument: qps_limit(required)={}, " + "rpc_name(optional)={}, instance_id(optional)={}", + qps_limit_str, rpc_name, instance_id)); + }; + + // We have 3 optional params and 2^3 combination, 
and 4 of them are illegal. + // We register callbacks for them in porcessors accordings to the level, represented by 3 bits. + std::array<std::function<HttpResponse()>, 8> processors; + std::fill_n(processors.begin(), 8, std::move(process_invalid_arguments)); + processors[0b001] = std::move(set_global_qps_limit); + processors[0b011] = std::move(set_rpc_qps_limit); + processors[0b101] = std::move(set_instance_qps_limit); + processors[0b111] = std::move(set_instance_rpc_qps_limit); + + uint8_t level = (0x01 & qps_limit_str.empty()) | ((0x01 & rpc_name.empty()) << 1) | + ((0x01 & instance_id.empty()) << 2); + + DCHECK_LT(level, 8); + + return processors[level](); +} + +static HttpResponse process_query_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { + auto rate_limiter = service->rate_limiter(); + rapidjson::Document d; + auto get_qps_limit = [&d](std::string_view rpc_name, + std::shared_ptr<RpcRateLimiter> rpc_limiter) { + rapidjson::Document node; + rapidjson::Document sub; + auto get_qps_token_limit = [&](std::string_view instance_id, + std::shared_ptr<RpcRateLimiter::QpsToken> qps_token) { + sub.AddMember(rapidjson::StringRef(instance_id.data(), instance_id.size()), + qps_token->max_qps_limit(), d.GetAllocator()); + }; + rpc_limiter->for_each_qps_token(std::move(get_qps_token_limit)); + + auto max_qps_limit = std::to_string(rpc_limiter->max_qps_limit()); + node.AddMember("RPC qps limit", + rapidjson::StringRef(max_qps_limit.data(), max_qps_limit.size()), + d.GetAllocator()); + node.AddMember("instance specific qps limit", sub, d.GetAllocator()); + d.AddMember(rapidjson::StringRef(rpc_name.data(), rpc_name.size()), node, d.GetAllocator()); + }; + rate_limiter->for_each_rpc_limiter(std::move(get_qps_limit)); + + rapidjson::StringBuffer sb; + rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(sb); + d.Accept(writer); + return http_json_reply(MetaServiceCode::OK, sb.GetString()); +} + static HttpResponse process_decode_key(MetaServiceImpl*, 
brpc::Controller* ctrl) { auto& uri = ctrl->http_request().uri(); std::string_view key = http_query(uri, "key"); @@ -615,13 +730,17 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller, {"abort_tablet_job", process_abort_tablet_job}, {"alter_ram_user", process_alter_ram_user}, {"alter_iam", process_alter_iam}, + {"adjust_rate_limit", process_adjust_rate_limit}, + {"list_rate_limit", process_query_rate_limit}, {"v1/abort_txn", process_abort_txn}, {"v1/abort_tablet_job", process_abort_tablet_job}, {"v1/alter_ram_user", process_alter_ram_user}, {"v1/alter_iam", process_alter_iam}, + {"v1/adjust_rate_limit", process_adjust_rate_limit}, + {"v1/list_rate_limit", process_query_rate_limit}, }; - auto cntl = static_cast<brpc::Controller*>(controller); + auto* cntl = static_cast<brpc::Controller*>(controller); brpc::ClosureGuard closure_guard(done); // Prepare input request info diff --git a/cloud/src/rate-limiter/rate_limiter.cpp b/cloud/src/rate-limiter/rate_limiter.cpp index 8988ff0560b..1d7d0a10ac8 100644 --- a/cloud/src/rate-limiter/rate_limiter.cpp +++ b/cloud/src/rate-limiter/rate_limiter.cpp @@ -17,11 +17,16 @@ #include "rate_limiter.h" +#include <bthread/mutex.h> #include <butil/strings/string_split.h> +#include <algorithm> #include <chrono> +#include <cstdint> #include <memory> #include <mutex> +#include <ranges> +#include <shared_mutex> #include "common/bvars.h" #include "common/config.h" @@ -29,10 +34,10 @@ namespace doris::cloud { -void RateLimiter::init(google::protobuf::Service* service) { - std::map<std::string, int64_t> rpc_name_to_max_qps_limit; +std::unordered_map<std::string, int64_t> parse_specific_qps_limit(const std::string& list_str) { + std::unordered_map<std::string, int64_t> rpc_name_to_max_qps_limit; std::vector<std::string> max_qps_limit_list; - butil::SplitString(config::specific_max_qps_limit, ';', &max_qps_limit_list); + butil::SplitString(list_str, ';', &max_qps_limit_list); for (const auto& v : max_qps_limit_list) { 
auto p = v.find(':'); if (p != std::string::npos && p != (v.size() - 1)) { @@ -41,29 +46,42 @@ void RateLimiter::init(google::protobuf::Service* service) { int64_t max_qps_limit = std::stoll(v.substr(p + 1)); if (max_qps_limit > 0) { rpc_name_to_max_qps_limit[rpc_name] = max_qps_limit; - LOG(INFO) << "set rpc: " << rpc_name << " max_qps_limit: " << max_qps_limit; } } catch (...) { - LOG(WARNING) << "failed to set max_qps_limit to rpc: " << rpc_name + LOG(WARNING) << "failed to parse max_qps_limit to rpc: " << rpc_name << " config: " << v; } } } + return rpc_name_to_max_qps_limit; +} + +template <typename Callable> +void for_each_rpc_name(google::protobuf::Service* service, Callable cb) { auto method_size = service->GetDescriptor()->method_count(); for (auto i = 0; i < method_size; ++i) { std::string rpc_name = service->GetDescriptor()->method(i)->name(); - int64_t max_qps_limit = config::default_max_qps_limit; + cb(rpc_name); + } +} - auto it = rpc_name_to_max_qps_limit.find(rpc_name); - if (it != rpc_name_to_max_qps_limit.end()) { +void RateLimiter::init(google::protobuf::Service* service) { + auto rpc_name_to_specific_limit = parse_specific_qps_limit(config::specific_max_qps_limit); + std::unique_lock write_lock(mutex_); + for_each_rpc_name(service, [&](const std::string& rpc_name) { + auto it = rpc_name_to_specific_limit.find(rpc_name); + int64_t max_qps_limit = config::default_max_qps_limit; + if (it != rpc_name_to_specific_limit.end()) { max_qps_limit = it->second; } limiters_[rpc_name] = std::make_shared<RpcRateLimiter>(rpc_name, max_qps_limit); + }); + for (const auto& [k, _] : rpc_name_to_specific_limit) { + rpc_with_specific_limit_.insert(k); } } std::shared_ptr<RpcRateLimiter> RateLimiter::get_rpc_rate_limiter(const std::string& rpc_name) { - // no need to be locked, because it is only modified during initialization auto it = limiters_.find(rpc_name); if (it == limiters_.end()) { return nullptr; @@ -71,6 +89,49 @@ std::shared_ptr<RpcRateLimiter> 
RateLimiter::get_rpc_rate_limiter(const std::str return it->second; } +bool RateLimiter::set_rate_limit(int64_t qps_limit) { + std::lock_guard lock(mutex_); + auto filter = [this](const auto& kv) { return !rpc_with_specific_limit_.contains(kv.first); }; + for (const auto& [_, v] : limiters_ | std::views::filter(std::move(filter))) { + v->set_max_qps_limit(qps_limit); + } + return true; +} + +bool RateLimiter::set_rate_limit(int64_t qps_limit, const std::string& rpc_name) { + if (!limiters_.contains(rpc_name)) { + return false; + } + auto limiter = limiters_.at(rpc_name); + std::lock_guard lock(mutex_); + limiter->set_max_qps_limit(qps_limit); + rpc_with_specific_limit_.insert(rpc_name); + return true; +} + +bool RateLimiter::set_rate_limit(int64_t qps_limit, const std::string& rpc_name, + const std::string& instance_id) { + if (!limiters_.contains(rpc_name)) { + return false; + } + auto limiter = limiters_.at(rpc_name); + return limiter->set_max_qps_limit(qps_limit, instance_id); +} + +bool RateLimiter::set_instance_rate_limit(int64_t qps_limit, const std::string& instance_id) { + return std::ranges::all_of(limiters_, [&](const auto& kv) { + return kv.second->set_max_qps_limit(qps_limit, instance_id); + }); + return true; +} + +void RateLimiter::for_each_rpc_limiter( + std::function<void(std::string_view, std::shared_ptr<RpcRateLimiter>)> cb) { + for (const auto& [rpc_name, rpc_limiter] : limiters_) { + cb(rpc_name, rpc_limiter); + } +} + bool RpcRateLimiter::get_qps_token(const std::string& instance_id, std::function<int()>& get_bvar_qps) { if (!config::use_detailed_metrics || instance_id.empty()) { @@ -93,6 +154,35 @@ bool RpcRateLimiter::get_qps_token(const std::string& instance_id, return qps_token->get_token(get_bvar_qps); } +void RpcRateLimiter::set_max_qps_limit(int64_t max_qps_limit) { + std::lock_guard<bthread::Mutex> l(mutex_); + max_qps_limit_ = max_qps_limit; + auto filter = [this](const auto& kv) { + return 
!instance_with_specific_limit_.contains(kv.first); + }; + for (auto& [k, v] : qps_limiter_ | std::views::filter(std::move(filter))) { + v->set_max_qps_limit(max_qps_limit); + } +} + +bool RpcRateLimiter::set_max_qps_limit(int64_t max_qps_limit, const std::string& instance_id) { + std::lock_guard<bthread::Mutex> l(mutex_); + if (!qps_limiter_.contains(instance_id)) { + qps_limiter_[instance_id] = std::make_shared<QpsToken>(max_qps_limit); + } else { + qps_limiter_.at(instance_id)->set_max_qps_limit(max_qps_limit); + } + instance_with_specific_limit_.insert(instance_id); + return true; +} + +void RpcRateLimiter::for_each_qps_token( + std::function<void(std::string_view, std::shared_ptr<QpsToken>)> cb) { + for (const auto& [instance_id, qps_token] : qps_limiter_) { + cb(instance_id, qps_token); + } +} + bool RpcRateLimiter::QpsToken::get_token(std::function<int()>& get_bvar_qps) { using namespace std::chrono; auto now = steady_clock::now(); @@ -110,4 +200,9 @@ bool RpcRateLimiter::QpsToken::get_token(std::function<int()>& get_bvar_qps) { return current_qps_ < max_qps_limit_; } -} // namespace doris::cloud \ No newline at end of file +void RpcRateLimiter::QpsToken::set_max_qps_limit(int64_t max_qps_limit) { + std::lock_guard<bthread::Mutex> l(mutex_); + max_qps_limit_ = max_qps_limit; +} + +} // namespace doris::cloud diff --git a/cloud/src/rate-limiter/rate_limiter.h b/cloud/src/rate-limiter/rate_limiter.h index df441656aa4..e557d0d8a10 100644 --- a/cloud/src/rate-limiter/rate_limiter.h +++ b/cloud/src/rate-limiter/rate_limiter.h @@ -19,10 +19,13 @@ #include <brpc/server.h> #include <bthread/mutex.h> +#include <google/protobuf/service.h> #include <cstdint> #include <memory> +#include <shared_mutex> #include <string> +#include <string_view> #include <unordered_map> #include "common/config.h" @@ -35,12 +38,54 @@ class RateLimiter { public: RateLimiter() = default; ~RateLimiter() = default; + void init(google::protobuf::Service* service); + 
std::shared_ptr<RpcRateLimiter> get_rpc_rate_limiter(const std::string& rpc_name); + /** + * @brief for each rpc limiter, apply callback + * + * @param cb callback function with params rpc name and rate limiter + */ + void for_each_rpc_limiter( + std::function<void(std::string_view, std::shared_ptr<RpcRateLimiter>)> cb); + + /** + * @brief set global default rate limit, will not infulence rpc and instance specific qps limit setting + * + * @return true if set sucessfully + */ + bool set_rate_limit(int64_t qps_limit); + + /** + * @brief set rpc level rate limit, will not infulence instance specific qps limit setting + * + * @return true if set sucessfully + */ + bool set_rate_limit(int64_t qps_limit, const std::string& rpc_name); + + /** + * @brief set instance level rate limit for specific rpc + * + * @return true if set sucessfully + */ + bool set_rate_limit(int64_t qps_limit, const std::string& rpc_name, + const std::string& instance_id); + + /** + * @brief set instance level rate limit globally, will influence settings for the same instance of specific rpc + * + * @return true if set sucessfully + */ + bool set_instance_rate_limit(int64_t qps_limit, const std::string& instance_id); + private: // rpc_name -> RpcRateLimiter std::unordered_map<std::string, std::shared_ptr<RpcRateLimiter>> limiters_; + // rpc names which specific limit have been set + std::unordered_set<std::string> rpc_with_specific_limit_; + bthread::Mutex mutex_; }; class RpcRateLimiter { @@ -58,15 +103,34 @@ public: */ bool get_qps_token(const std::string& instance_id, std::function<int()>& get_bvar_qps); - // Todo: Recycle outdated instance_id + std::string_view rpc_name() const { return rpc_name_; } + + int64_t max_qps_limit() const { return max_qps_limit_; } + + /** + * @brief set max qps limit for this limiter + * + * @return true if set sucessfully + */ + void set_max_qps_limit(int64_t max_qps_limit); + + /** + * @brief set max qps limit for specific instance within this limiter + * + * 
@return true if set sucessfully + */ + bool set_max_qps_limit(int64_t max_qps_limit, const std::string& instance); -private: class QpsToken { public: QpsToken(const int64_t max_qps_limit) : max_qps_limit_(max_qps_limit) {} bool get_token(std::function<int()>& get_bvar_qps); + void set_max_qps_limit(int64_t max_qps_limit); + + int64_t max_qps_limit() const { return max_qps_limit_; } + private: bthread::Mutex mutex_; std::chrono::steady_clock::time_point last_update_time_; @@ -75,12 +139,18 @@ private: int64_t max_qps_limit_; }; + void for_each_qps_token(std::function<void(std::string_view, std::shared_ptr<QpsToken>)> cb); + + // Todo: Recycle outdated instance_id + private: bthread::Mutex mutex_; // instance_id -> QpsToken std::unordered_map<std::string, std::shared_ptr<QpsToken>> qps_limiter_; + // instance ids which specific limit have been set + std::unordered_set<std::string> instance_with_specific_limit_; std::string rpc_name_; int64_t max_qps_limit_; }; -} // namespace doris::cloud \ No newline at end of file +} // namespace doris::cloud diff --git a/cloud/test/meta_service_http_test.cpp b/cloud/test/meta_service_http_test.cpp index d1b8fd66943..4360efeb442 100644 --- a/cloud/test/meta_service_http_test.cpp +++ b/cloud/test/meta_service_http_test.cpp @@ -1535,4 +1535,94 @@ TEST(MetaServiceHttpTest, get_obj_store_info_response_sk) { ms->get_obj_store_info(&cntl, &req1, &res1, nullptr); } +TEST(MetaServiceHttpTest, AdjustRateLimit) { + HttpContext ctx; + { + auto [status_code, content] = + ctx.query<std::string>("adjust_rate_limit", "qps_limit=10000"); + ASSERT_EQ(status_code, 200); + } + { + auto [status_code, content] = + ctx.query<std::string>("adjust_rate_limit", "qps_limit=10000&rpc_name=get_cluster"); + ASSERT_EQ(status_code, 200); + } + { + auto [status_code, content] = ctx.query<std::string>( + "adjust_rate_limit", + "qps_limit=10000&rpc_name=get_cluster&instance_id=test_instance"); + ASSERT_EQ(status_code, 200); + } + { + auto [status_code, content] = 
ctx.query<std::string>( + "adjust_rate_limit", "qps_limit=10000&instance_id=test_instance"); + ASSERT_EQ(status_code, 200); + } + { + auto [status_code, content] = + ctx.query<std::string>("adjust_rate_limit", "qps_limit=invalid"); + ASSERT_EQ(status_code, 400); + std::string msg = "param `qps_limit` is not a legal int64 type:"; + ASSERT_NE(content.find(msg), std::string::npos); + } + { + auto [status_code, content] = ctx.query<std::string>("adjust_rate_limit", "qps_limit=-1"); + ASSERT_EQ(status_code, 400); + std::string msg = "qps_limit` should not be less than 0"; + ASSERT_NE(content.find(msg), std::string::npos); + } + { + auto [status_code, content] = + ctx.query<std::string>("adjust_rate_limit", "rpc_name=get_cluster"); + ASSERT_EQ(status_code, 400); + std::string msg = "invalid argument:"; + ASSERT_NE(content.find(msg), std::string::npos); + } + { + auto [status_code, content] = + ctx.query<std::string>("adjust_rate_limit", "instance_id=test_instance"); + ASSERT_EQ(status_code, 400); + std::string msg = "invalid argument:"; + ASSERT_NE(content.find(msg), std::string::npos); + } + { + auto [status_code, content] = ctx.query<std::string>( + "adjust_rate_limit", "rpc_name=get_cluster&instance_id=test_instance"); + ASSERT_EQ(status_code, 400); + std::string msg = "invalid argument:"; + ASSERT_NE(content.find(msg), std::string::npos); + } + { + auto [status_code, content] = ctx.query<std::string>("adjust_rate_limit", ""); + ASSERT_EQ(status_code, 400); + std::string msg = "invalid argument:"; + ASSERT_NE(content.find(msg), std::string::npos); + } + { + auto [status_code, content] = + ctx.query<std::string>("adjust_rate_limit", "qps_limit=1000&rpc_name=invalid"); + ASSERT_EQ(status_code, 400); + std::string msg = "failed to adjust rate limit for qps_limit"; + ASSERT_NE(content.find(msg), std::string::npos); + } + { + auto [status_code, content] = + ctx.query<std::string>("adjust_rate_limit", "qps_limit=1000&instance_id=invalid"); + ASSERT_EQ(status_code, 200); + } 
+ { + auto [status_code, content] = ctx.query<std::string>( + "adjust_rate_limit", "qps_limit=1000&rpc_name=get_cluster&instance_id=invalid"); + ASSERT_EQ(status_code, 200); + } +} + +TEST(MetaServiceHttpTest, QueryRateLimit) { + HttpContext ctx; + { + auto [status_code, content] = ctx.query<std::string>("list_rate_limit", ""); + ASSERT_EQ(status_code, 200); + } +} + } // namespace doris::cloud diff --git a/cloud/test/rate_limiter_test.cpp b/cloud/test/rate_limiter_test.cpp index 2a10451a69f..cab7b01774c 100644 --- a/cloud/test/rate_limiter_test.cpp +++ b/cloud/test/rate_limiter_test.cpp @@ -17,8 +17,11 @@ #include "rate-limiter/rate_limiter.h" +#include <gen_cpp/cloud.pb.h> #include <gtest/gtest.h> +#include <cstddef> + #include "common/config.h" #include "common/util.h" #include "meta-service/keys.h" @@ -26,6 +29,7 @@ #include "meta-service/meta_service.h" #include "meta-service/txn_kv_error.h" #include "mock_resource_manager.h" +#include "resource-manager/resource_manager.h" int main(int argc, char** argv) { doris::cloud::config::init(nullptr, true); @@ -35,25 +39,50 @@ int main(int argc, char** argv) { using namespace doris::cloud; +const std::string mock_instance_0 = "mock_instance_0"; +const std::string mock_instance_1 = "mock_instance_1"; +const std::string mock_cluster_0 = "mock_cluster_0"; +const std::string mock_cluster_1 = "mock_cluster_1"; +const std::string mock_cluster_id_0 = "mock_cluster_id_0"; +const std::string mock_cluster_id_1 = "mock_cluster_id_1"; +const std::string mock_cloud_unique_id_0 = "mock_cloud_unique_id_0"; +const std::string mock_cloud_unique_id_1 = "mock_cloud_unique_id_1"; + +class MockMultiInstanceRsMgr : public MockResourceManager { +public: + using MockResourceManager::MockResourceManager; + + std::string get_node(const std::string& cloud_unique_id, + std::vector<NodeInfo>* nodes) override { + if (cloud_unique_id == mock_cloud_unique_id_0) { + nodes->emplace_back(Role::COMPUTE_NODE, mock_instance_0, mock_cluster_0, + 
mock_cluster_id_0); + } else if (cloud_unique_id == mock_cloud_unique_id_1) { + nodes->emplace_back(Role::COMPUTE_NODE, mock_instance_1, mock_cluster_1, + mock_cluster_id_1); + } + return {}; + }; +}; + std::unique_ptr<MetaServiceProxy> get_meta_service() { auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); [&] { ASSERT_NE(txn_kv.get(), nullptr); }(); - auto rs = std::make_shared<MockResourceManager>(txn_kv); + auto rs = std::make_shared<MockMultiInstanceRsMgr>(txn_kv); auto rl = std::make_shared<RateLimiter>(); auto meta_service = std::make_unique<MetaServiceImpl>(txn_kv, rs, rl); return std::make_unique<MetaServiceProxy>(std::move(meta_service)); } -TEST(RateLimiterTest, RateLimitGetClusterTest) { - auto meta_service = get_meta_service(); +void mock_add_cluster(MetaServiceProxy& meta_service, std::string instance_id) { // add cluster first - InstanceKeyInfo key_info {mock_instance}; + InstanceKeyInfo key_info {instance_id}; std::string key; std::string val; instance_key(key_info, &key); InstanceInfoPB instance; - instance.set_instance_id(mock_instance); + instance.set_instance_id(instance_id); ClusterPB c1; c1.set_cluster_name(mock_cluster_name); c1.set_cluster_id(mock_cluster_id); @@ -63,50 +92,205 @@ TEST(RateLimiterTest, RateLimitGetClusterTest) { std::unique_ptr<Transaction> txn; std::string get_val; - ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(meta_service.txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); txn->put(key, val); ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); +} - auto get_cluster = [&](MetaServiceCode code) { - GetClusterRequest req; - req.set_cloud_unique_id("test_cloud_unique_id"); - req.set_cluster_id(mock_cluster_id); - req.set_cluster_name(mock_cluster_name); - brpc::Controller cntl; - GetClusterResponse res; - meta_service->get_cluster(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, - &res, nullptr); - - ASSERT_EQ(res.status().code(), code); - }; 
+void mock_get_cluster(MetaServiceProxy& meta_service, const std::string& cloud_uid, + MetaServiceCode code) { + GetClusterRequest req; + req.set_cloud_unique_id(cloud_uid); + req.set_cluster_id(mock_cluster_id); + req.set_cluster_name(mock_cluster_name); + brpc::Controller cntl; + GetClusterResponse res; + meta_service.get_cluster(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, + &res, nullptr); + + ASSERT_EQ(res.status().code(), code); +} + +template <typename Rpc> +void mock_parallel_rpc(Rpc rpc, MetaServiceProxy* meta_service, const std::string& cloud_uid, + MetaServiceCode expected, size_t times) { std::vector<std::thread> threads; - for (int i = 0; i < 20; ++i) { - threads.emplace_back(get_cluster, MetaServiceCode::OK); + for (size_t i = 0; i < times; ++i) { + threads.emplace_back([&]() { rpc(*meta_service, cloud_uid, expected); }); } for (auto& t : threads) { t.join(); } - threads.clear(); +} + +TEST(RateLimiterTest, RateLimitGetClusterTest) { + auto meta_service = get_meta_service(); + mock_add_cluster(*meta_service, mock_instance_0); + + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0, + MetaServiceCode::OK, 20); std::this_thread::sleep_for(std::chrono::seconds(1)); meta_service->rate_limiter() ->get_rpc_rate_limiter("get_cluster") - ->qps_limiter_[mock_instance] + ->qps_limiter_[mock_instance_0] ->max_qps_limit_ = 1; - threads.emplace_back(get_cluster, MetaServiceCode::MAX_QPS_LIMIT); - for (auto& t : threads) { - t.join(); - } - threads.clear(); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0, + MetaServiceCode::MAX_QPS_LIMIT, 1); std::this_thread::sleep_for(std::chrono::seconds(1)); meta_service->rate_limiter() ->get_rpc_rate_limiter("get_cluster") - ->qps_limiter_[mock_instance] + ->qps_limiter_[mock_instance_0] ->max_qps_limit_ = 10000; - threads.emplace_back(get_cluster, MetaServiceCode::OK); - for (auto& t : threads) { - t.join(); + mock_parallel_rpc(mock_get_cluster, 
meta_service.get(), mock_cloud_unique_id_0, + MetaServiceCode::OK, 1); +} + +TEST(RateLimiterTest, AdjustLimitInfluenceTest) { + auto meta_service = get_meta_service(); + mock_add_cluster(*meta_service, mock_instance_0); + mock_add_cluster(*meta_service, mock_instance_1); + + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0, + MetaServiceCode::OK, 1); + std::this_thread::sleep_for(std::chrono::seconds(1)); + { + ASSERT_TRUE(meta_service->rate_limiter()->set_instance_rate_limit(1, mock_instance_1)); + ASSERT_TRUE( + meta_service->rate_limiter()->set_rate_limit(100, "get_cluster", mock_instance_0)); + + auto limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_0) + ->max_qps_limit(); + ASSERT_EQ(limit, 100); + limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_1) + ->max_qps_limit(); + ASSERT_EQ(limit, 1); + } + { + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1)); + auto limit = + meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit(); + ASSERT_EQ(limit, 1); + limit = meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + ASSERT_EQ(limit, 5000000); + } + { + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(5000, "get_cluster")); + auto limit = + meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit(); + ASSERT_EQ(limit, 1); + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1000)); + limit = meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + ASSERT_EQ(limit, 5000); + limit = meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit(); + ASSERT_EQ(limit, 1000); + } + { + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(3000, "commit_txn")); + auto limit = + meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit(); + 
ASSERT_EQ(limit, 3000); + limit = meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + ASSERT_EQ(limit, 5000); + } + { + auto limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_0) + ->max_qps_limit(); + ASSERT_EQ(limit, 100); + limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_1) + ->max_qps_limit(); + ASSERT_EQ(limit, 1); + } + { + ASSERT_TRUE(meta_service->rate_limiter()->set_instance_rate_limit(200, mock_instance_1)); + auto limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_0) + ->max_qps_limit(); + ASSERT_EQ(limit, 100); + limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_1) + ->max_qps_limit(); + ASSERT_EQ(limit, 200); } - threads.clear(); -} \ No newline at end of file +} + +TEST(RateLimiterTest, AdjustLimitMockRPCTest) { + auto meta_service = get_meta_service(); + mock_add_cluster(*meta_service, mock_instance_0); + mock_add_cluster(*meta_service, mock_instance_1); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0, + MetaServiceCode::OK, 20); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_1, + MetaServiceCode::OK, 20); + std::this_thread::sleep_for(std::chrono::seconds(1)); + + { + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(10000, "get_cluster")); + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1)); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0, + MetaServiceCode::OK, 20); + auto limit = + meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + ASSERT_EQ(limit, 10000); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + { + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1, "get_cluster")); + 
ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1)); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0, + MetaServiceCode::MAX_QPS_LIMIT, 1); + auto limit = + meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + ASSERT_EQ(limit, 1); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + { + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(10000, "get_cluster", + mock_instance_0)); + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(10000, "get_cluster", + mock_instance_1)); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0, + MetaServiceCode::OK, 20); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_1, + MetaServiceCode::OK, 20); + auto limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_0) + ->max_qps_limit(); + ASSERT_EQ(limit, 10000); + limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_1) + ->max_qps_limit(); + ASSERT_EQ(limit, 10000); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + { + ASSERT_TRUE(meta_service->rate_limiter()->set_instance_rate_limit(1, mock_instance_0)); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0, + MetaServiceCode::MAX_QPS_LIMIT, 1); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_1, + MetaServiceCode::OK, 20); + auto limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_0) + ->max_qps_limit(); + ASSERT_EQ(limit, 1); + limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_1) + ->max_qps_limit(); + ASSERT_EQ(limit, 10000); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional 
commands, e-mail: commits-h...@doris.apache.org