gavinchou commented on code in PR #42413: URL: https://github.com/apache/doris/pull/42413#discussion_r1835749363
########## cloud/src/rate-limiter/rate_limiter.h: ########## @@ -35,12 +38,32 @@ class RateLimiter { public: RateLimiter() = default; ~RateLimiter() = default; + void init(google::protobuf::Service* service); + std::shared_ptr<RpcRateLimiter> get_rpc_rate_limiter(const std::string& rpc_name); + void for_each_rpc_limiter( + std::function<void(std::string_view, std::shared_ptr<RpcRateLimiter>)> cb); + + // set global default rate limit, will not infulence rpc and instance specific qps limit setting + bool set_rate_limit(int64_t qps_limit); + + // set rpc level rate limit, will not infulence instance specific qps limit setting + bool set_rate_limit(int64_t qps_limit, const std::string& rpc_name); + + // set instance level rate limit for specific rpc + bool set_rate_limit(int64_t qps_limit, const std::string& rpc_name, + const std::string& instance_id); + + // set instance level rate limit globally, will influence settings for the same instance of specific rpc + bool set_instance_rate_limit(int64_t qps_limit, const std::string& instance_id); + private: // rpc_name -> RpcRateLimiter std::unordered_map<std::string, std::shared_ptr<RpcRateLimiter>> limiters_; + std::unordered_set<std::string> rpc_with_specific_limit_; Review Comment: add comment, it seems to be "This collection contains the RPCs that have had a limit set by the specified name" ########## cloud/src/rate-limiter/rate_limiter.h: ########## @@ -75,12 +107,17 @@ class RpcRateLimiter { int64_t max_qps_limit_; }; + void for_each_qps_token(std::function<void(std::string_view, std::shared_ptr<QpsToken>)> cb); + + // Todo: Recycle outdated instance_id + private: bthread::Mutex mutex_; // instance_id -> QpsToken std::unordered_map<std::string, std::shared_ptr<QpsToken>> qps_limiter_; + std::unordered_set<std::string> instance_with_specific_limit_; Review Comment: ditto ########## cloud/src/rate-limiter/rate_limiter.h: ########## @@ -35,12 +38,32 @@ class RateLimiter { public: RateLimiter() = default; ~RateLimiter() = default; + void init(google::protobuf::Service* service); + std::shared_ptr<RpcRateLimiter> get_rpc_rate_limiter(const std::string& rpc_name); + void for_each_rpc_limiter( Review Comment: add comment even though it seems simple, describe: behavior, assumptions, params and return value ########## cloud/src/rate-limiter/rate_limiter.cpp: ########## @@ -93,6 +154,35 @@ bool RpcRateLimiter::get_qps_token(const std::string& instance_id, return qps_token->get_token(get_bvar_qps); } +void RpcRateLimiter::set_max_qps_limit(int64_t max_qps_limit) { + std::lock_guard<bthread::Mutex> l(mutex_); + max_qps_limit_ = max_qps_limit; + auto filter = [this](const auto& kv) { + return !instance_with_specific_limit_.contains(kv.first); + }; + for (auto& [k, v] : qps_limiter_ | std::views::filter(std::move(filter))) { + v->set_max_qps_limit(max_qps_limit); + } +} + +bool RpcRateLimiter::set_max_qps_limit(int64_t max_qps_limit, const std::string& instance_id) { Review Comment: "limit" already implies "max", "max" in the name seems redundant ########## cloud/src/rate-limiter/rate_limiter.h: ########## @@ -58,15 +81,24 @@ class RpcRateLimiter { */ bool get_qps_token(const std::string& instance_id, std::function<int()>& get_bvar_qps); - // Todo: Recycle outdated instance_id + std::string_view rpc_name() const { return rpc_name_; } Review Comment: add comment for every function: behavior, assumptions, params and return value ########## cloud/src/meta-service/meta_service_http.cpp: ########## @@ -331,6 +338,128 @@ static HttpResponse process_alter_iam(MetaServiceImpl* service, brpc::Controller return http_json_reply(resp.status()); } +static HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { + const auto& uri = cntl->http_request().uri(); + auto qps_limit_str = std::string {http_query(uri, "qps_limit")}; + auto rpc_name = std::string {http_query(uri, "rpc_name")}; + auto instance_id = std::string {http_query(uri, "instance_id")}; + + auto process_invalid_arguments = [&]() -> HttpResponse { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + fmt::format("invalid argument: qps_limit(required)={}, " + "rpc_name(optional)={}, instance_id(optional)={}", + qps_limit_str, rpc_name, instance_id)); + }; + + static auto parse_qps_limit = + [](const std::string& qps_limit_str) -> std::variant<int64_t, HttpResponse> { + DCHECK(!qps_limit_str.empty()); + int64_t qps_limit = -1; + try { + qps_limit = std::stoll(qps_limit_str); + } catch (const std::exception& ex) { + return http_json_reply( + MetaServiceCode::INVALID_ARGUMENT, + fmt::format("param `qps_limit` is not a legal int64 type:{}", ex.what())); + } + if (qps_limit < 0) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "`qps_limit` should not be less than 0"); + } + return qps_limit; + }; + + auto process_set_qps_limit = [&](std::function<bool(int64_t)> cb) -> HttpResponse { Review Comment: abuse lambdas, too many indentions and hard to read -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org