This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2aeb3b5af4a0d0339d568cfcf42750eb8d8c19bf
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Wed Jul 31 11:35:41 2024 +0800

    [enhance](RateLimit) Add bvar to monitor object storage rate limit sleep time and failure time (#38294)

    Add one bvar to monitor requests that fail because they exceed the rate limit.
---
 be/src/io/fs/azure_obj_storage_client.cpp | 90 ++++++++++++++++++++++++++-----
 be/src/io/fs/s3_file_reader.cpp           | 12 -----
 be/src/io/fs/s3_obj_storage_client.cpp    | 70 +++++++++++++++++-------
 be/src/util/s3_util.cpp                   | 40 +++++++++-----
 be/src/util/s3_util.h                     | 22 +-------
 cloud/src/recycler/azure_obj_client.cpp   | 24 +++++++--
 cloud/src/recycler/s3_accessor.cpp        | 50 +++++++++++++----
 cloud/src/recycler/s3_accessor.h          | 22 ++++++++
 cloud/src/recycler/s3_obj_client.cpp      | 26 ++++++---
 common/cpp/s3_rate_limiter.cpp            | 28 +++++-----
 common/cpp/s3_rate_limiter.h              |  6 +--
 11 files changed, 275 insertions(+), 115 deletions(-)
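The core of the change is a small template gate that both storage backends now share: before each SDK call it asks the process-wide limiter for a token and either proceeds or reports rejection. It also replaces the DO_S3_RATE_LIMIT macro that this patch deletes from s3_util.h. A minimal, self-contained C++ sketch of that pattern follows; the limiter stub and all names here are illustrative, not the Doris implementation:

    #include <iostream>
    #include <stdexcept>

    // Illustrative stand-in for S3RateLimiterHolder::add(1): returns nanoseconds
    // slept, 0 when no wait was needed, or a negative value when the request
    // would exceed the configured limit.
    long acquire_token() { return 0; }

    enum class RateLimitType { GET, PUT };

    template <typename Func>
    auto rate_limited(RateLimitType /*op*/, Func callback) -> decltype(callback()) {
        if (acquire_token() < 0) {
            // The Azure path throws; the S3 path returns an error outcome instead.
            throw std::runtime_error("exceeds request limit");
        }
        return callback();
    }

    int main() {
        auto bytes = rate_limited(RateLimitType::GET, []() { return 42; });
        std::cout << bytes << '\n';
    }

The Azure flavour of the gate throws on rejection, while the AWS flavour returns an error outcome; see the sketch after the s3_obj_storage_client.cpp hunks below.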
diff --git a/be/src/io/fs/azure_obj_storage_client.cpp b/be/src/io/fs/azure_obj_storage_client.cpp
index 043886672a2..9f33db3400a 100644
--- a/be/src/io/fs/azure_obj_storage_client.cpp
+++ b/be/src/io/fs/azure_obj_storage_client.cpp
@@ -42,6 +42,7 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "io/fs/obj_storage_client.h"
+#include "util/bvar_helper.h"
 #include "util/s3_util.h"
 
 using namespace Azure::Storage::Blobs;
@@ -57,6 +58,28 @@ auto base64_encode_part_num(int part_num) {
             {reinterpret_cast<unsigned char*>(&part_num), sizeof(part_num)});
 }
 
+template <typename Func>
+auto s3_rate_limit(doris::S3RateLimitType op, Func callback) -> decltype(callback()) {
+    if (!doris::config::enable_s3_rate_limiter) {
+        return callback();
+    }
+    auto sleep_duration = doris::S3ClientFactory::instance().rate_limiter(op)->add(1);
+    if (sleep_duration < 0) {
+        throw std::runtime_error("Azure exceeds request limit");
+    }
+    return callback();
+}
+
+template <typename Func>
+auto s3_get_rate_limit(Func callback) -> decltype(callback()) {
+    return s3_rate_limit(doris::S3RateLimitType::GET, std::move(callback));
+}
+
+template <typename Func>
+auto s3_put_rate_limit(Func callback) -> decltype(callback()) {
+    return s3_rate_limit(doris::S3RateLimitType::PUT, std::move(callback));
+}
+
 constexpr char SAS_TOKEN_URL_TEMPLATE[] = "https://{}.blob.core.windows.net/{}/{}{}";
 constexpr char BlobNotFound[] = "BlobNotFound";
 } // namespace
@@ -101,7 +124,14 @@ struct AzureBatchDeleter {
         if (deferred_resps.empty()) {
             return ObjectStorageResponse::OK();
         }
-        auto resp = do_azure_client_call([&]() { _client->SubmitBatch(_batch); }, _opts);
+        auto resp = do_azure_client_call(
+                [&]() {
+                    s3_put_rate_limit([&]() {
+                        SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_objects_latency);
+                        _client->SubmitBatch(_batch);
+                    });
+                },
+                _opts);
         if (resp.status.code != ErrorCode::OK) {
             return resp;
         }
@@ -156,7 +186,11 @@ ObjectStorageResponse AzureObjStorageClient::put_object(const ObjectStoragePathO
     auto client = _client->GetBlockBlobClient(opts.key);
     return do_azure_client_call(
             [&]() {
-                client.UploadFrom(reinterpret_cast<const uint8_t*>(stream.data()), stream.size());
+                s3_put_rate_limit([&]() {
+                    SCOPED_BVAR_LATENCY(s3_bvar::s3_put_latency);
+                    client.UploadFrom(reinterpret_cast<const uint8_t*>(stream.data()),
+                                      stream.size());
+                });
             },
             opts);
 }
@@ -169,7 +203,10 @@ ObjectStorageUploadResponse AzureObjStorageClient::upload_part(const ObjectStora
         Azure::Core::IO::MemoryBodyStream memory_body(
                 reinterpret_cast<const uint8_t*>(stream.data()), stream.size());
         // The blockId must be base64 encoded
-        client.StageBlock(base64_encode_part_num(part_num), memory_body);
+        s3_put_rate_limit([&]() {
+            SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
+            client.StageBlock(base64_encode_part_num(part_num), memory_body);
+        });
     } catch (Azure::Core::RequestFailedException& e) {
         auto msg = fmt::format(
                 "Azure request failed because {}, error msg {}, http code {}, path msg {}",
@@ -200,13 +237,22 @@ ObjectStorageResponse AzureObjStorageClient::complete_multipart_upload(
     std::ranges::transform(
             completed_parts, std::back_inserter(string_block_ids),
             [](const ObjectCompleteMultiPart& i) { return base64_encode_part_num(i.part_num); });
-    return do_azure_client_call([&]() { client.CommitBlockList(string_block_ids); }, opts);
+    return do_azure_client_call(
+            [&]() {
+                s3_put_rate_limit([&]() {
+                    SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
+                    client.CommitBlockList(string_block_ids);
+                });
+            },
+            opts);
 }
 
 ObjectStorageHeadResponse AzureObjStorageClient::head_object(const ObjectStoragePathOptions& opts) {
     try {
-        Models::BlobProperties properties =
-                _client->GetBlockBlobClient(opts.key).GetProperties().Value;
+        Models::BlobProperties properties = s3_get_rate_limit([&]() {
+            SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency);
+            return _client->GetBlockBlobClient(opts.key).GetProperties().Value;
+        });
         return {.file_size = properties.BlobSize};
     } catch (Azure::Core::RequestFailedException& e) {
         if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
@@ -238,8 +284,11 @@ ObjectStorageResponse AzureObjStorageClient::get_object(const ObjectStoragePathO
                 DownloadBlobToOptions download_opts;
                 Azure::Core::Http::HttpRange range {static_cast<int64_t>(offset), bytes_read};
                 download_opts.Range = range;
-                auto resp = client.DownloadTo(reinterpret_cast<uint8_t*>(buffer), bytes_read,
-                                              download_opts);
+                auto resp = s3_get_rate_limit([&]() {
+                    SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
+                    return client.DownloadTo(reinterpret_cast<uint8_t*>(buffer), bytes_read,
+                                             download_opts);
+                });
                 *size_return = resp.Value.ContentRange.Length.Value();
             },
             opts);
@@ -257,11 +306,17 @@ ObjectStorageResponse AzureObjStorageClient::list_objects(const ObjectStoragePat
             [&]() {
                 ListBlobsOptions list_opts;
                 list_opts.Prefix = opts.prefix;
-                auto resp = _client->ListBlobs(list_opts);
+                auto resp = s3_get_rate_limit([&]() {
+                    SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
+                    return _client->ListBlobs(list_opts);
+                });
                 get_file_file(resp);
                 while (!resp.NextPageToken->empty()) {
                     list_opts.ContinuationToken = resp.NextPageToken;
-                    resp = _client->ListBlobs(list_opts);
+                    resp = s3_get_rate_limit([&]() {
+                        SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
+                        return _client->ListBlobs(list_opts);
+                    });
                     get_file_file(resp);
                 }
             },
@@ -297,7 +352,10 @@ ObjectStorageResponse AzureObjStorageClient::delete_objects(const ObjectStorageP
 ObjectStorageResponse AzureObjStorageClient::delete_object(const ObjectStoragePathOptions& opts) {
     return do_azure_client_call(
             [&]() {
-                auto resp = _client->DeleteBlob(opts.key);
+                auto resp = s3_put_rate_limit([&]() {
+                    SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_object_latency);
+                    return _client->DeleteBlob(opts.key);
+                });
                 if (!resp.Value.Deleted) {
                     throw Exception(Status::IOError<false>("Delete azure blob failed"));
                 }
@@ -321,14 +379,20 @@ ObjectStorageResponse AzureObjStorageClient::delete_objects_recursively(
         }
         return ObjectStorageResponse::OK();
     };
-    auto resp = _client->ListBlobs(list_opts);
+    auto resp = s3_get_rate_limit([&]() {
+        SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
+        return _client->ListBlobs(list_opts);
+    });
     if (auto response = delete_func(resp.Blobs); response.status.code != ErrorCode::OK) {
         return response;
     }
     while (!resp.NextPageToken->empty()) {
         list_opts.ContinuationToken = resp.NextPageToken;
-        resp = _client->ListBlobs(list_opts);
+        resp = s3_get_rate_limit([&]() {
+            SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
+            return _client->ListBlobs(list_opts);
+        });
         if (auto response = delete_func(resp.Blobs); response.status.code != ErrorCode::OK) {
             return response;
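Rejection in the Azure client above rides on exceptions: s3_rate_limit throws, and do_azure_client_call (whose body is not part of this patch) is relied on to convert any exception into an ObjectStorageResponse. A generic sketch of that exception-boundary idiom, with a simplified Response type assumed:

    #include <exception>
    #include <functional>
    #include <iostream>
    #include <stdexcept>
    #include <string>

    struct Response {
        int code = 0;  // 0 == OK
        std::string msg;
    };

    // Run the callback and turn any thrown exception into an error response, so
    // callers see one uniform status object whether the SDK or the rate limiter
    // raised the error.
    Response call_with_boundary(const std::function<void()>& op) {
        try {
            op();
            return {};
        } catch (const std::exception& e) {
            return {-1, e.what()};
        }
    }

    int main() {
        auto r = call_with_boundary([] { throw std::runtime_error("exceeds request limit"); });
        std::cout << r.code << ' ' << r.msg << '\n';  // prints: -1 exceeds request limit
    }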
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index a5c6ec09162..ab9033e586d 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -120,18 +120,6 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
     if (!client) {
         return Status::InternalError("init s3 client error");
     }
-    // // clang-format off
-    // auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
-    //                                 to, offset, bytes_req, bytes_read);
-    // // clang-format on
-    // if (resp.status.code != ErrorCode::OK) {
-    //     return std::move(Status(resp.status.code, std::move(resp.status.msg))
-    //                              .append(fmt::format("failed to read from {}", _path.native())));
-    // }
-    // if (*bytes_read != bytes_req) {
-    //     return Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})",
-    //                                  _path.native(), *bytes_read, bytes_req);
-    // }
     SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
     int retry_count = 0;
     const int base_wait_time = config::s3_read_base_wait_time_ms; // Base wait time in milliseconds
diff --git a/be/src/io/fs/s3_obj_storage_client.cpp b/be/src/io/fs/s3_obj_storage_client.cpp
index 2bed3241e30..2c66e819833 100644
--- a/be/src/io/fs/s3_obj_storage_client.cpp
+++ b/be/src/io/fs/s3_obj_storage_client.cpp
@@ -71,6 +71,35 @@
 #include "io/fs/s3_common.h"
 #include "util/bvar_helper.h"
 
+namespace {
+inline ::Aws::Client::AWSError<::Aws::S3::S3Errors> s3_error_factory() {
+    return {::Aws::S3::S3Errors::INTERNAL_FAILURE, "exceeds limit", "exceeds limit", false};
+}
+
+template <typename Func>
+auto s3_rate_limit(doris::S3RateLimitType op, Func callback) -> decltype(callback()) {
+    using T = decltype(callback());
+    if (!doris::config::enable_s3_rate_limiter) {
+        return callback();
+    }
+    auto sleep_duration = doris::S3ClientFactory::instance().rate_limiter(op)->add(1);
+    if (sleep_duration < 0) {
+        return T(s3_error_factory());
+    }
+    return callback();
+}
+
+template <typename Func>
+auto s3_get_rate_limit(Func callback) -> decltype(callback()) {
+    return s3_rate_limit(doris::S3RateLimitType::GET, std::move(callback));
+}
+
+template <typename Func>
+auto s3_put_rate_limit(Func callback) -> decltype(callback()) {
+    return s3_rate_limit(doris::S3RateLimitType::PUT, std::move(callback));
+}
+} // namespace
+
 namespace Aws::S3::Model {
 class DeleteObjectRequest;
 } // namespace Aws::S3::Model
@@ -92,9 +121,9 @@ ObjectStorageUploadResponse S3ObjStorageClient::create_multipart_upload(
     create_request.SetContentType("application/octet-stream");
 
     SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
-    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(_client->CreateMultipartUpload(create_request),
-                                                "s3_file_writer::create_multi_part_upload",
-                                                std::cref(create_request).get());
+    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+            s3_put_rate_limit([&]() { return _client->CreateMultipartUpload(create_request); }),
+            "s3_file_writer::create_multi_part_upload", std::cref(create_request).get());
     SYNC_POINT_CALLBACK("s3_file_writer::_open", &outcome);
 
     if (outcome.IsSuccess()) {
@@ -122,9 +151,9 @@ ObjectStorageResponse S3ObjStorageClient::put_object(const ObjectStoragePathOpti
     request.SetContentLength(stream.size());
     request.SetContentType("application/octet-stream");
     SCOPED_BVAR_LATENCY(s3_bvar::s3_put_latency);
-    auto response =
-            SYNC_POINT_HOOK_RETURN_VALUE(_client->PutObject(request), "s3_file_writer::put_object",
-                                         std::cref(request).get(), &stream);
+    auto response = SYNC_POINT_HOOK_RETURN_VALUE(
+            s3_put_rate_limit([&]() { return _client->PutObject(request); }),
+            "s3_file_writer::put_object", std::cref(request).get(), &stream);
     if (!response.IsSuccess()) {
         auto st = s3fs_error(response.GetError(),
                              fmt::format("failed to put object {}", opts.path.native()));
@@ -157,8 +186,8 @@ ObjectStorageUploadResponse S3ObjStorageClient::upload_part(const ObjectStorageP
     {
         SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
         upload_part_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
-                _client->UploadPart(upload_request), "s3_file_writer::upload_part",
-                std::cref(upload_request).get(), &stream);
+                s3_put_rate_limit([&]() { return _client->UploadPart(upload_request); }),
+                "s3_file_writer::upload_part", std::cref(upload_request).get(), &stream);
     }
     TEST_SYNC_POINT_CALLBACK("S3FileWriter::_upload_one_part", &upload_part_outcome);
     if (!upload_part_outcome.IsSuccess()) {
@@ -199,7 +228,7 @@ ObjectStorageResponse S3ObjStorageClient::complete_multipart_upload(
     TEST_SYNC_POINT_RETURN_WITH_VALUE("S3FileWriter::_complete:3", ObjectStorageResponse(), this);
     SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
     auto complete_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
-            _client->CompleteMultipartUpload(complete_request),
+            s3_put_rate_limit([&]() { return _client->CompleteMultipartUpload(complete_request); }),
            "s3_file_writer::complete_multi_part", std::cref(complete_request).get());
 
     if (!complete_outcome.IsSuccess()) {
@@ -220,7 +249,8 @@ ObjectStorageHeadResponse S3ObjStorageClient::head_object(const ObjectStoragePat
     SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency);
     auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
-            _client->HeadObject(request), "s3_file_system::head_object", std::ref(request).get());
+            s3_get_rate_limit([&]() { return _client->HeadObject(request); }),
+            "s3_file_system::head_object", std::ref(request).get());
     if (outcome.IsSuccess()) {
         return {.resp = {convert_to_obj_response(Status::OK())},
                 .file_size = outcome.GetResult().GetContentLength()};
@@ -247,7 +277,7 @@ ObjectStorageResponse S3ObjStorageClient::get_object(const ObjectStoragePathOpti
     request.SetResponseStreamFactory(AwsWriteableStreamFactory(buffer, bytes_read));
 
     SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
-    auto outcome = _client->GetObject(request);
+    auto outcome = s3_get_rate_limit([&]() { return _client->GetObject(request); });
     if (!outcome.IsSuccess()) {
         return {convert_to_obj_response(
                 s3fs_error(outcome.GetError(),
@@ -273,7 +303,7 @@ ObjectStorageResponse S3ObjStorageClient::list_objects(const ObjectStoragePathOp
         Aws::S3::Model::ListObjectsV2Outcome outcome;
         {
             SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
-            outcome = _client->ListObjectsV2(request);
+            outcome = s3_get_rate_limit([&]() { return _client->ListObjectsV2(request); });
         }
         if (!outcome.IsSuccess()) {
             files->clear();
@@ -310,8 +340,9 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects(const ObjectStoragePath
         });
         del.WithObjects(std::move(objects)).SetQuiet(true);
         delete_request.SetDelete(std::move(del));
-        SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
-        auto delete_outcome = _client->DeleteObjects(delete_request);
+        SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_objects_latency);
+        auto delete_outcome =
+                s3_put_rate_limit([&]() { return _client->DeleteObjects(delete_request); });
         if (!delete_outcome.IsSuccess()) {
             return {convert_to_obj_response(
                     s3fs_error(delete_outcome.GetError(),
@@ -331,8 +362,8 @@ ObjectStorageResponse S3ObjStorageClient::delete_object(const ObjectStoragePathO
     Aws::S3::Model::DeleteObjectRequest request;
     request.WithBucket(opts.bucket).WithKey(opts.key);
 
-    SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
-    auto outcome = _client->DeleteObject(request);
+    SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_object_latency);
+    auto outcome = s3_put_rate_limit([&]() { return _client->DeleteObject(request); });
     if (outcome.IsSuccess() ||
         outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
         return ObjectStorageResponse::OK();
@@ -354,7 +385,7 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects_recursively(
         Aws::S3::Model::ListObjectsV2Outcome outcome;
         {
             SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
-            outcome = _client->ListObjectsV2(request);
+            outcome = s3_get_rate_limit([&]() { return _client->ListObjectsV2(request); });
         }
         if (!outcome.IsSuccess()) {
             return {convert_to_obj_response(s3fs_error(
@@ -373,8 +404,9 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects_recursively(
         Aws::S3::Model::Delete del;
         del.WithObjects(std::move(objects)).SetQuiet(true);
         delete_request.SetDelete(std::move(del));
-        SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
-        auto delete_outcome = _client->DeleteObjects(delete_request);
+        SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_objects_latency);
+        auto delete_outcome =
+                s3_put_rate_limit([&]() { return _client->DeleteObjects(delete_request); });
         if (!delete_outcome.IsSuccess()) {
             return {convert_to_obj_response(
                     s3fs_error(delete_outcome.GetError(),
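The S3 client cannot throw through the AWS SDK's Outcome-based API, so its copy of s3_rate_limit materializes an error outcome via s3_error_factory() when the limiter rejects. A self-contained sketch of the same trick, with a generic outcome type standing in for the AWS one:

    #include <iostream>
    #include <string>
    #include <utility>

    // Generic stand-in for an AWS-style outcome: either a result or an error.
    template <typename R>
    struct Outcome {
        bool ok = true;
        R result {};
        std::string error;
        explicit Outcome(R r) : result(std::move(r)) {}
        explicit Outcome(std::string err) : ok(false), error(std::move(err)) {}
    };

    long acquire_token() { return -1; }  // hypothetical stub: pretend the limit was hit

    template <typename Func>
    auto rate_limited(Func callback) -> decltype(callback()) {
        using T = decltype(callback());
        if (acquire_token() < 0) {
            // Same shape the AWS path uses: construct the outcome type from an
            // error, so the caller's IsSuccess()-style check handles rejection
            // uniformly with any other request failure.
            return T(std::string("exceeds limit"));
        }
        return callback();
    }

    int main() {
        auto outcome = rate_limited([]() { return Outcome<int>(200); });
        std::cout << (outcome.ok ? "ok" : outcome.error) << '\n';  // prints "exceeds limit"
    }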
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index 24e11b03b0b..d19fcdd63c2 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -56,7 +56,8 @@ namespace doris {
 namespace s3_bvar {
 bvar::LatencyRecorder s3_get_latency("s3_get");
 bvar::LatencyRecorder s3_put_latency("s3_put");
-bvar::LatencyRecorder s3_delete_latency("s3_delete");
+bvar::LatencyRecorder s3_delete_object_latency("s3_delete_object");
+bvar::LatencyRecorder s3_delete_objects_latency("s3_delete_objects");
 bvar::LatencyRecorder s3_head_latency("s3_head");
 bvar::LatencyRecorder s3_multi_part_upload_latency("s3_multi_part_upload");
 bvar::LatencyRecorder s3_list_latency("s3_list");
@@ -90,10 +91,22 @@ constexpr char S3_MAX_CONN_SIZE[] = "AWS_MAX_CONN_SIZE";
 constexpr char S3_REQUEST_TIMEOUT_MS[] = "AWS_REQUEST_TIMEOUT_MS";
 constexpr char S3_CONN_TIMEOUT_MS[] = "AWS_CONNECTION_TIMEOUT_MS";
 
+auto metric_func_factory(bvar::Adder<int64_t>& ns_bvar, bvar::Adder<int64_t>& req_num_bvar) {
+    return [&](int64_t ns) {
+        if (ns > 0) {
+            ns_bvar << ns;
+        } else {
+            req_num_bvar << 1;
+        }
+    };
+}
+
 } // namespace
 
-bvar::Adder<int64_t> get_rate_limit_ms("get_rate_limit_ms");
-bvar::Adder<int64_t> put_rate_limit_ms("put_rate_limit_ms");
+bvar::Adder<int64_t> get_rate_limit_ns("get_rate_limit_ns");
+bvar::Adder<int64_t> get_rate_limit_exceed_req_num("get_rate_limit_exceed_req_num");
+bvar::Adder<int64_t> put_rate_limit_ns("put_rate_limit_ns");
+bvar::Adder<int64_t> put_rate_limit_exceed_req_num("put_rate_limit_exceed_req_num");
 
 S3RateLimiterHolder* S3ClientFactory::rate_limiter(S3RateLimitType type) {
     CHECK(type == S3RateLimitType::GET || type == S3RateLimitType::PUT) << to_string(type);
@@ -164,18 +177,19 @@ S3ClientFactory::S3ClientFactory() {
     };
     Aws::InitAPI(_aws_options);
     _ca_cert_file_path = get_valid_ca_cert_path();
-    _rate_limiters = {std::make_unique<S3RateLimiterHolder>(
-                              S3RateLimitType::GET, config::s3_get_token_per_second,
-                              config::s3_get_bucket_tokens, config::s3_get_token_limit,
-                              [&](int64_t ms) { get_rate_limit_ms << ms; }),
-                      std::make_unique<S3RateLimiterHolder>(
-                              S3RateLimitType::PUT, config::s3_put_token_per_second,
-                              config::s3_put_bucket_tokens, config::s3_put_token_limit,
-                              [&](int64_t ms) { put_rate_limit_ms << ms; })};
+    _rate_limiters = {
+            std::make_unique<S3RateLimiterHolder>(
+                    S3RateLimitType::GET, config::s3_get_token_per_second,
+                    config::s3_get_bucket_tokens, config::s3_get_token_limit,
+                    metric_func_factory(get_rate_limit_ns, get_rate_limit_exceed_req_num)),
+            std::make_unique<S3RateLimiterHolder>(
+                    S3RateLimitType::PUT, config::s3_put_token_per_second,
+                    config::s3_put_bucket_tokens, config::s3_put_token_limit,
+                    metric_func_factory(put_rate_limit_ns, put_rate_limit_exceed_req_num))};
 }
 
-string S3ClientFactory::get_valid_ca_cert_path() {
-    vector<std::string> vec_ca_file_path = doris::split(config::ca_cert_file_paths, ";");
+std::string S3ClientFactory::get_valid_ca_cert_path() {
+    auto vec_ca_file_path = doris::split(config::ca_cert_file_paths, ";");
     auto it = vec_ca_file_path.begin();
     for (; it != vec_ca_file_path.end(); ++it) {
         if (std::filesystem::exists(*it)) {
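metric_func_factory gives the limiter one callback with two meanings: a positive argument is nanoseconds actually slept and feeds the *_rate_limit_ns adder, while anything else bumps the *_exceed_req_num counter, which is how the new rejected-request bvar gets incremented. A sketch of that dual-purpose callback with std::atomic counters standing in for bvar::Adder<int64_t>:

    #include <atomic>
    #include <cstdint>
    #include <iostream>

    // std::atomic stand-ins for bvar::Adder<int64_t>; the real code exports these
    // as bvars named get/put_rate_limit_ns and get/put_rate_limit_exceed_req_num.
    std::atomic<int64_t> sleep_ns_total {0};
    std::atomic<int64_t> exceed_req_num {0};

    // Mirrors metric_func_factory: a positive value is nanoseconds slept; anything
    // else falls through to the request counter, which is how rejected requests
    // (sleep == -1) are counted.
    auto make_metric_func(std::atomic<int64_t>& ns_sum, std::atomic<int64_t>& req_num) {
        return [&](int64_t ns) {
            if (ns > 0) {
                ns_sum += ns;
            } else {
                req_num += 1;
            }
        };
    }

    int main() {
        auto record = make_metric_func(sleep_ns_total, exceed_req_num);
        record(2'500'000);  // a throttled request that slept 2.5 ms
        record(-1);         // a rejected request
        std::cout << sleep_ns_total.load() << ' ' << exceed_req_num.load() << '\n';  // 2500000 1
    }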
diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h
index 3d8f55e7613..1a1a5ae39ca 100644
--- a/be/src/util/s3_util.h
+++ b/be/src/util/s3_util.h
@@ -50,7 +50,8 @@ namespace doris {
 namespace s3_bvar {
 extern bvar::LatencyRecorder s3_get_latency;
 extern bvar::LatencyRecorder s3_put_latency;
-extern bvar::LatencyRecorder s3_delete_latency;
+extern bvar::LatencyRecorder s3_delete_object_latency;
+extern bvar::LatencyRecorder s3_delete_objects_latency;
 extern bvar::LatencyRecorder s3_head_latency;
 extern bvar::LatencyRecorder s3_multi_part_upload_latency;
 extern bvar::LatencyRecorder s3_list_latency;
@@ -61,25 +62,6 @@ extern bvar::LatencyRecorder s3_copy_object_latency;
 
 class S3URI;
 
-inline ::Aws::Client::AWSError<::Aws::S3::S3Errors> s3_error_factory() {
-    return {::Aws::S3::S3Errors::INTERNAL_FAILURE, "exceeds limit", "exceeds limit", false};
-}
-
-#define DO_S3_RATE_LIMIT(op, code)                                                  \
-    [&]() mutable {                                                                 \
-        if (!config::enable_s3_rate_limiter) {                                      \
-            return (code);                                                          \
-        }                                                                           \
-        auto sleep_duration = S3ClientFactory::instance().rate_limiter(op)->add(1); \
-        if (sleep_duration < 0) {                                                   \
-            using T = decltype((code));                                             \
-            return T(s3_error_factory());                                           \
-        }                                                                           \
-        return (code);                                                              \
-    }()
-
-#define DO_S3_GET_RATE_LIMIT(code) DO_S3_RATE_LIMIT(S3RateLimitType::GET, code)
-
 struct S3ClientConf {
     std::string endpoint;
     std::string region;
diff --git a/cloud/src/recycler/azure_obj_client.cpp b/cloud/src/recycler/azure_obj_client.cpp
index 1ad5d5dca7d..8674768fcd8 100644
--- a/cloud/src/recycler/azure_obj_client.cpp
+++ b/cloud/src/recycler/azure_obj_client.cpp
@@ -115,7 +115,10 @@ public:
         }
 
         try {
-            auto resp = s3_get_rate_limit([&]() { return client_->ListBlobs(req_); });
+            auto resp = s3_get_rate_limit([&]() {
+                SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
+                return client_->ListBlobs(req_);
+            });
             has_more_ = resp.NextPageToken.HasValue();
             DCHECK(!(has_more_ && resp.Blobs.empty())) << has_more_ << ' ' << resp.Blobs.empty();
             req_.ContinuationToken = std::move(resp.NextPageToken);
@@ -172,6 +175,7 @@ ObjectStorageResponse AzureObjClient::put_object(ObjectStoragePathRef path,
     return do_azure_client_call(
             [&]() {
                 s3_put_rate_limit([&]() {
+                    SCOPED_BVAR_LATENCY(s3_bvar::s3_put_latency);
                     return client.UploadFrom(reinterpret_cast<const uint8_t*>(stream.data()),
                                              stream.size());
                 });
@@ -181,8 +185,10 @@ ObjectStorageResponse AzureObjClient::put_object(ObjectStoragePathRef path,
 
 ObjectStorageResponse AzureObjClient::head_object(ObjectStoragePathRef path, ObjectMeta* res) {
     try {
-        auto&& properties = s3_get_rate_limit(
-                [&]() { return client_->GetBlockBlobClient(path.key).GetProperties().Value; });
+        auto&& properties = s3_get_rate_limit([&]() {
+            SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency);
+            return client_->GetBlockBlobClient(path.key).GetProperties().Value;
+        });
         res->key = path.key;
         res->mtime_s = properties.LastModified.time_since_epoch().count();
         res->size = properties.BlobSize;
@@ -232,7 +238,12 @@ ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket,
             deferred_resps.emplace_back(batch.DeleteBlob(*it));
         }
         auto resp = do_azure_client_call(
-                [&]() { s3_put_rate_limit([&]() { return client_->SubmitBatch(batch); }); },
+                [&]() {
+                    s3_put_rate_limit([&]() {
+                        SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_objects_latency);
+                        return client_->SubmitBatch(batch);
+                    });
+                },
                 client_->GetUrl(), *begin);
         if (resp.ret != 0) {
             return resp;
@@ -266,7 +277,10 @@ ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket,
 ObjectStorageResponse AzureObjClient::delete_object(ObjectStoragePathRef path) {
     return do_azure_client_call(
             [&]() {
-                if (auto r = s3_put_rate_limit([&]() { return client_->DeleteBlob(path.key); });
+                if (auto r = s3_put_rate_limit([&]() {
+                        SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_object_latency);
+                        return client_->DeleteBlob(path.key);
+                    });
                     !r.Value.Deleted) {
                     throw std::runtime_error("Delete azure blob failed");
                 }
diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp
index 80717fac81e..c96d3336fa8 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -47,19 +47,49 @@
 #include "recycler/s3_obj_client.h"
 #include "recycler/storage_vault_accessor.h"
 
+namespace {
+auto metric_func_factory(bvar::Adder<int64_t>& ns_bvar, bvar::Adder<int64_t>& req_num_bvar) {
+    return [&](int64_t ns) {
+        if (ns > 0) {
+            ns_bvar << ns;
+        } else {
+            req_num_bvar << 1;
+        }
+    };
+}
+} // namespace
+
 namespace doris::cloud {
-bvar::Adder<int64_t> get_rate_limit_ms("get_rate_limit_ms");
-bvar::Adder<int64_t> put_rate_limit_ms("put_rate_limit_ms");
+
+namespace s3_bvar {
+bvar::LatencyRecorder s3_get_latency("s3_get");
+bvar::LatencyRecorder s3_put_latency("s3_put");
+bvar::LatencyRecorder s3_delete_object_latency("s3_delete_object");
+bvar::LatencyRecorder s3_delete_objects_latency("s3_delete_objects");
+bvar::LatencyRecorder s3_head_latency("s3_head");
+bvar::LatencyRecorder s3_multi_part_upload_latency("s3_multi_part_upload");
+bvar::LatencyRecorder s3_list_latency("s3_list");
+bvar::LatencyRecorder s3_list_object_versions_latency("s3_list_object_versions");
+bvar::LatencyRecorder s3_get_bucket_version_latency("s3_get_bucket_version");
+bvar::LatencyRecorder s3_copy_object_latency("s3_copy_object");
+}; // namespace s3_bvar
+
+bvar::Adder<int64_t> get_rate_limit_ns("get_rate_limit_ns");
+bvar::Adder<int64_t> get_rate_limit_exceed_req_num("get_rate_limit_exceed_req_num");
+bvar::Adder<int64_t> put_rate_limit_ns("put_rate_limit_ns");
+bvar::Adder<int64_t> put_rate_limit_exceed_req_num("put_rate_limit_exceed_req_num");
 
 AccessorRateLimiter::AccessorRateLimiter()
-        : _rate_limiters({std::make_unique<S3RateLimiterHolder>(
-                                  S3RateLimitType::GET, config::s3_get_token_per_second,
-                                  config::s3_get_bucket_tokens, config::s3_get_token_limit,
-                                  [&](int64_t ms) { get_rate_limit_ms << ms; }),
-                          std::make_unique<S3RateLimiterHolder>(
-                                  S3RateLimitType::PUT, config::s3_put_token_per_second,
-                                  config::s3_put_bucket_tokens, config::s3_put_token_limit,
-                                  [&](int64_t ms) { put_rate_limit_ms << ms; })}) {}
+        : _rate_limiters(
+                  {std::make_unique<S3RateLimiterHolder>(
+                           S3RateLimitType::GET, config::s3_get_token_per_second,
+                           config::s3_get_bucket_tokens, config::s3_get_token_limit,
+                           metric_func_factory(get_rate_limit_ns, get_rate_limit_exceed_req_num)),
+                   std::make_unique<S3RateLimiterHolder>(
+                           S3RateLimitType::PUT, config::s3_put_token_per_second,
+                           config::s3_put_bucket_tokens, config::s3_put_token_limit,
+                           metric_func_factory(put_rate_limit_ns,
+                                               put_rate_limit_exceed_req_num))}) {}
 
 S3RateLimiterHolder* AccessorRateLimiter::rate_limiter(S3RateLimitType type) {
     CHECK(type == S3RateLimitType::GET || type == S3RateLimitType::PUT) << to_string(type);
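The recycler now owns its own copies of these metrics: the recorders and adders are defined once here in s3_accessor.cpp and exposed to other translation units through the extern declarations added to s3_accessor.h in the next hunk. A generic sketch of that one-definition pattern, with std::atomic stand-ins for the bvar types:

    #include <atomic>
    #include <cstdint>

    // metrics.h -- declarations only; safe to include from many translation units.
    namespace recycler_metrics {
    extern std::atomic<int64_t> list_requests;  // declared here...
    }

    // metrics.cpp -- the single definition the linker resolves every use to.
    namespace recycler_metrics {
    std::atomic<int64_t> list_requests {0};     // ...defined exactly once
    }

    int main() {
        recycler_metrics::list_requests += 1;
        return 0;
    }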
diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h
index 19dc08fe83e..91f75765a9b 100644
--- a/cloud/src/recycler/s3_accessor.h
+++ b/cloud/src/recycler/s3_accessor.h
@@ -17,10 +17,13 @@
 
 #pragma once
 
+#include <bvar/latency_recorder.h>
+
 #include <array>
 #include <cstdint>
 #include <memory>
 
+#include "common/stopwatch.h"
 #include "recycler/s3_obj_client.h"
 #include "recycler/storage_vault_accessor.h"
 
@@ -35,6 +38,25 @@ enum class S3RateLimitType;
 namespace cloud {
 class ObjectStoreInfoPB;
 
+namespace s3_bvar {
+extern bvar::LatencyRecorder s3_get_latency;
+extern bvar::LatencyRecorder s3_put_latency;
+extern bvar::LatencyRecorder s3_delete_object_latency;
+extern bvar::LatencyRecorder s3_delete_objects_latency;
+extern bvar::LatencyRecorder s3_head_latency;
+extern bvar::LatencyRecorder s3_multi_part_upload_latency;
+extern bvar::LatencyRecorder s3_list_latency;
+extern bvar::LatencyRecorder s3_list_object_versions_latency;
+extern bvar::LatencyRecorder s3_get_bucket_version_latency;
+extern bvar::LatencyRecorder s3_copy_object_latency;
+}; // namespace s3_bvar
+
+// The time unit is the same with BE: us
+#define SCOPED_BVAR_LATENCY(bvar_item)                     \
+    StopWatch sw;                                          \
+    std::unique_ptr<int, std::function<void(int*)>> defer( \
+            (int*)0x01, [&](int*) { bvar_item << sw.elapsed_us(); });
+
 struct AccessorRateLimiter {
 public:
     ~AccessorRateLimiter() = default;
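The SCOPED_BVAR_LATENCY macro added above gets scope-exit behavior without a dedicated guard class: a unique_ptr owning a dummy non-null pointer runs its lambda deleter when the scope unwinds, and that deleter records the elapsed time. An equivalent, dependency-free sketch using std::chrono in place of StopWatch and a plain counter in place of a bvar recorder:

    #include <chrono>
    #include <cstdio>
    #include <functional>
    #include <memory>
    #include <thread>

    // Record elapsed microseconds into `sink` when the scope exits, mirroring the
    // macro's trick: the unique_ptr owns a dummy non-null pointer that is never
    // dereferenced, and its custom deleter is the code we actually want to defer.
    #define SCOPED_LATENCY_US(sink)                                                   \
        auto start = std::chrono::steady_clock::now();                                \
        std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&](int*) { \
            auto us = std::chrono::duration_cast<std::chrono::microseconds>(          \
                              std::chrono::steady_clock::now() - start)               \
                              .count();                                               \
            (sink) += us;                                                             \
        });

    int main() {
        long long total_us = 0;
        {
            SCOPED_LATENCY_US(total_us);
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
        }  // the deleter fires here and records ~5000 us
        std::printf("recorded %lld us\n", total_us);
    }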
diff --git a/cloud/src/recycler/s3_obj_client.cpp b/cloud/src/recycler/s3_obj_client.cpp
index c44473bb9eb..d1307de53b7 100644
--- a/cloud/src/recycler/s3_obj_client.cpp
+++ b/cloud/src/recycler/s3_obj_client.cpp
@@ -89,7 +89,10 @@ public:
             return false;
         }
 
-        auto outcome = s3_get_rate_limit([&]() { return client_->ListObjectsV2(req_); });
+        auto outcome = s3_get_rate_limit([&]() {
+            SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
+            return client_->ListObjectsV2(req_);
+        });
 
         if (!outcome.IsSuccess()) {
             LOG_WARNING("failed to list objects")
@@ -152,7 +155,10 @@ ObjectStorageResponse S3ObjClient::put_object(ObjectStoragePathRef path, std::st
     auto input = Aws::MakeShared<Aws::StringStream>("S3Accessor");
     *input << stream;
     request.SetBody(input);
-    auto outcome = s3_put_rate_limit([&]() { return s3_client_->PutObject(request); });
+    auto outcome = s3_put_rate_limit([&]() {
+        SCOPED_BVAR_LATENCY(s3_bvar::s3_put_latency);
+        return s3_client_->PutObject(request);
+    });
     if (!outcome.IsSuccess()) {
         LOG_WARNING("failed to put object")
                 .tag("endpoint", endpoint_)
@@ -168,7 +174,10 @@ ObjectStorageResponse S3ObjClient::put_object(ObjectStoragePathRef path, std::st
 ObjectStorageResponse S3ObjClient::head_object(ObjectStoragePathRef path, ObjectMeta* res) {
     Aws::S3::Model::HeadObjectRequest request;
     request.WithBucket(path.bucket).WithKey(path.key);
-    auto outcome = s3_get_rate_limit([&]() { return s3_client_->HeadObject(request); });
+    auto outcome = s3_get_rate_limit([&]() {
+        SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency);
+        return s3_client_->HeadObject(request);
+    });
     if (outcome.IsSuccess()) {
         res->key = path.key;
         res->size = outcome.GetResult().GetContentLength();
@@ -209,8 +218,10 @@ ObjectStorageResponse S3ObjClient::delete_objects(const std::string& bucket,
         Aws::S3::Model::Delete del;
         del.WithObjects(std::move(objects)).SetQuiet(true);
         delete_request.SetDelete(std::move(del));
-        auto delete_outcome =
-                s3_put_rate_limit([&]() { return s3_client_->DeleteObjects(delete_request); });
+        auto delete_outcome = s3_put_rate_limit([&]() {
+            SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_objects_latency);
+            return s3_client_->DeleteObjects(delete_request);
+        });
         if (!delete_outcome.IsSuccess()) {
             LOG_WARNING("failed to delete objects")
                     .tag("endpoint", endpoint_)
@@ -266,7 +277,10 @@ ObjectStorageResponse S3ObjClient::delete_objects(const std::string& bucket,
 ObjectStorageResponse S3ObjClient::delete_object(ObjectStoragePathRef path) {
     Aws::S3::Model::DeleteObjectRequest request;
    request.WithBucket(path.bucket).WithKey(path.key);
-    auto outcome = s3_put_rate_limit([&]() { return s3_client_->DeleteObject(request); });
+    auto outcome = s3_put_rate_limit([&]() {
+        SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_object_latency);
+        return s3_client_->DeleteObject(request);
+    });
     if (!outcome.IsSuccess()) {
         LOG_WARNING("failed to delete object")
                 .tag("endpoint", endpoint_)
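Note that both object clients apply the gate per page, not per listing, so a long ListObjectsV2 or ListBlobs walk pays one token for every continuation request. A schematic, SDK-free sketch of that loop; Page, fetch_page, and the token check are all illustrative:

    #include <iostream>
    #include <optional>
    #include <string>
    #include <vector>

    struct Page {
        std::vector<std::string> keys;
        std::optional<std::string> next_token;  // nullopt == last page
    };

    long acquire_token() { return 0; }  // hypothetical stub, as above

    // One illustrative data source: three pages chained by continuation tokens.
    Page fetch_page(const std::optional<std::string>& token) {
        if (!token) return {{"a", "b"}, std::string("t1")};
        if (*token == "t1") return {{"c"}, std::string("t2")};
        return {{"d"}, std::nullopt};
    }

    int main() {
        std::optional<std::string> token;
        do {
            if (acquire_token() < 0) {
                std::cerr << "listing aborted: rate limit exceeded\n";
                return 1;
            }
            Page page = fetch_page(token);  // each page is a separate, gated request
            for (const auto& k : page.keys) std::cout << k << '\n';
            token = page.next_token;
        } while (token);
    }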
diff --git a/common/cpp/s3_rate_limiter.cpp b/common/cpp/s3_rate_limiter.cpp
index 3bd311bc955..64ee4ce19d8 100644
--- a/common/cpp/s3_rate_limiter.cpp
+++ b/common/cpp/s3_rate_limiter.cpp
@@ -22,6 +22,7 @@
 #include <chrono>
 #include <mutex>
 #include <thread>
+
 #if defined(__APPLE__)
 #include <ctime>
 #define CURRENT_TIME std::chrono::system_clock::now()
@@ -31,7 +32,7 @@
 namespace doris {
 // Just 10^6.
-static constexpr auto MS = 1000000UL;
+static constexpr auto NS = 1000000000UL;
 
 class S3RateLimiter::SimpleSpinLock {
 public:
@@ -69,29 +70,32 @@ S3RateLimiter::~S3RateLimiter() = default;
 
 S3RateLimiterHolder::~S3RateLimiterHolder() = default;
 
-std::pair<size_t, double> S3RateLimiter::_update_remain_token(
-        std::chrono::system_clock::time_point now, size_t amount) {
+std::pair<size_t, double> S3RateLimiter::_update_remain_token(long now, size_t amount) {
     // Values obtained under lock to be checked after release
     size_t count_value;
     double tokens_value;
     {
         std::lock_guard<SimpleSpinLock> lock(*_mutex);
+        now = (now < _prev_ns_count) ? _prev_ns_count : now;
         if (_max_speed) {
-            double delta_seconds = static_cast<double>((now - _prev_ms).count()) / MS;
+            double delta_seconds =
+                    _prev_ns_count ? static_cast<double>(now - _prev_ns_count) / NS : 0;
             _remain_tokens = std::min<double>(
                     _remain_tokens + _max_speed * delta_seconds - amount, _max_burst);
         }
         _count += amount;
         count_value = _count;
         tokens_value = _remain_tokens;
-        _prev_ms = now;
+        _prev_ns_count = now;
     }
     return {count_value, tokens_value};
 }
 
 int64_t S3RateLimiter::add(size_t amount) {
     // Values obtained under lock to be checked after release
-    auto [count_value, tokens_value] = _update_remain_token(CURRENT_TIME, amount);
+    auto time = CURRENT_TIME;
+    auto time_nano_count = time.time_since_epoch().count();
+    auto [count_value, tokens_value] = _update_remain_token(time_nano_count, amount);
 
     if (_limit && count_value > _limit) {
         // CK would throw exception
@@ -99,13 +103,13 @@ int64_t S3RateLimiter::add(size_t amount) {
     }
 
     // Wait unless there is positive amount of remain_tokens - throttling
-    int64_t sleep_time_ms = 0;
+    int64_t sleep_time_ns = 0;
     if (_max_speed && tokens_value < 0) {
-        sleep_time_ms = static_cast<int64_t>(-tokens_value / _max_speed * MS);
-        std::this_thread::sleep_for(std::chrono::microseconds(sleep_time_ms));
+        sleep_time_ns = static_cast<int64_t>(-tokens_value / _max_speed * NS);
+        std::this_thread::sleep_for(std::chrono::nanoseconds(sleep_time_ns));
     }
 
-    return sleep_time_ms;
+    return sleep_time_ns;
 }
 
 S3RateLimiterHolder::S3RateLimiterHolder(S3RateLimitType type, size_t max_speed, size_t max_burst,
@@ -119,9 +123,7 @@ int64_t S3RateLimiterHolder::add(size_t amount) {
         std::shared_lock read {rate_limiter_rw_lock};
         sleep = rate_limiter->add(amount);
     }
-    if (sleep > 0) {
-        metric_func(sleep);
-    }
+    metric_func(sleep);
     return sleep;
 }
diff --git a/common/cpp/s3_rate_limiter.h b/common/cpp/s3_rate_limiter.h
index 6f01cf33226..ad2bf3db9ed 100644
--- a/common/cpp/s3_rate_limiter.h
+++ b/common/cpp/s3_rate_limiter.h
@@ -17,7 +17,6 @@
 
 #pragma once
 
-#include <chrono>
 #include <functional>
 #include <memory>
 #include <shared_mutex>
@@ -44,8 +43,7 @@ public:
     int64_t add(size_t amount);
 
 private:
-    std::pair<size_t, double> _update_remain_token(std::chrono::system_clock::time_point now,
-                                                   size_t amount);
+    std::pair<size_t, double> _update_remain_token(long now, size_t amount);
     size_t _count {0};
     const size_t _max_speed {0}; // in tokens per second. which indicates the QPS
     const size_t _max_burst {0}; // in tokens. which indicates the token bucket size
@@ -54,7 +52,7 @@ private:
     std::unique_ptr<SimpleSpinLock> _mutex;
     // Amount of remain_tokens available in token bucket. Updated in `add` method.
     double _remain_tokens {0};
-    std::chrono::system_clock::time_point _prev_ms; // Previous `add` call time (in nanoseconds).
+    long _prev_ns_count {0}; // Previous `add` call time (in nanoseconds).
 };
 
 class S3RateLimiterHolder {
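The refill arithmetic inside _update_remain_token is a standard token bucket, now carried in nanoseconds end to end: a negative balance is repaid by sleeping deficit / max_speed seconds, and add() reports that sleep (or -1 on rejection) to the metric callback, which is what feeds the new bvars. A compact single-threaded sketch of the same math; the real class adds a spinlock, a total-request limit, and the callback wiring:

    #include <algorithm>
    #include <chrono>
    #include <cstdint>
    #include <iostream>

    constexpr double NS = 1'000'000'000.0;

    struct TokenBucket {
        double max_speed;       // tokens refilled per second (the QPS target)
        double max_burst;       // bucket capacity
        double tokens;          // may go negative: a deficit that must be slept off
        long long prev_ns = 0;  // time of the previous call, epoch nanoseconds

        // Returns nanoseconds the caller should sleep; 0 means proceed at once.
        int64_t add(double amount, long long now_ns) {
            now_ns = std::max(now_ns, prev_ns);  // clamp a clock that jumped backwards
            double elapsed_s = prev_ns ? (now_ns - prev_ns) / NS : 0;
            tokens = std::min(tokens + max_speed * elapsed_s - amount, max_burst);
            prev_ns = now_ns;
            // A deficit of -t tokens is repaid by waiting t / max_speed seconds.
            return tokens < 0 ? static_cast<int64_t>(-tokens / max_speed * NS) : 0;
        }
    };

    int main() {
        TokenBucket b {/*max_speed=*/100, /*max_burst=*/10, /*tokens=*/10};
        auto t0 = std::chrono::duration_cast<std::chrono::nanoseconds>(
                          std::chrono::system_clock::now().time_since_epoch())
                          .count();
        for (int i = 0; i < 12; ++i) {
            std::cout << "req " << i << ": sleep " << b.add(1, t0) << " ns\n";
        }
        // With no time passing between calls, the last two requests overdraw the
        // bucket and come back with positive sleep times (1e7 and 2e7 ns here).
    }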