This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2aeb3b5af4a0d0339d568cfcf42750eb8d8c19bf
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Wed Jul 31 11:35:41 2024 +0800

    [enhance](RateLimit) Add bvar to monitor object storage rate limit sleep time and failure time (#38294)
    
    Add one bvar to count requests that fail because they exceed the rate limit.
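    
    For reference, the callback produced by metric_func_factory routes one
    value into the two new bvars. A minimal sketch in C++ (the bvar and
    factory names are taken from this patch; the standalone lambda form is
    illustrative only):
    
        auto metric_func = [](int64_t ns) {
            if (ns > 0) {
                get_rate_limit_ns << ns;  // time slept while throttled, in nanoseconds
            } else {
                // ns <= 0: no throttling sleep; a negative value marks a
                // request rejected for exceeding the hard limit
                get_rate_limit_exceed_req_num << 1;
            }
        };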
---
 be/src/io/fs/azure_obj_storage_client.cpp | 90 ++++++++++++++++++++++++++-----
 be/src/io/fs/s3_file_reader.cpp           | 12 -----
 be/src/io/fs/s3_obj_storage_client.cpp    | 70 +++++++++++++++++-------
 be/src/util/s3_util.cpp                   | 40 +++++++++-----
 be/src/util/s3_util.h                     | 22 +-------
 cloud/src/recycler/azure_obj_client.cpp   | 24 +++++++--
 cloud/src/recycler/s3_accessor.cpp        | 50 +++++++++++++----
 cloud/src/recycler/s3_accessor.h          | 22 ++++++++
 cloud/src/recycler/s3_obj_client.cpp      | 26 ++++++---
 common/cpp/s3_rate_limiter.cpp            | 28 +++++-----
 common/cpp/s3_rate_limiter.h              |  6 +--
 11 files changed, 275 insertions(+), 115 deletions(-)

diff --git a/be/src/io/fs/azure_obj_storage_client.cpp b/be/src/io/fs/azure_obj_storage_client.cpp
index 043886672a2..9f33db3400a 100644
--- a/be/src/io/fs/azure_obj_storage_client.cpp
+++ b/be/src/io/fs/azure_obj_storage_client.cpp
@@ -42,6 +42,7 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "io/fs/obj_storage_client.h"
+#include "util/bvar_helper.h"
 #include "util/s3_util.h"
 
 using namespace Azure::Storage::Blobs;
@@ -57,6 +58,28 @@ auto base64_encode_part_num(int part_num) {
             {reinterpret_cast<unsigned char*>(&part_num), sizeof(part_num)});
 }
 
+template <typename Func>
+auto s3_rate_limit(doris::S3RateLimitType op, Func callback) -> decltype(callback()) {
+    if (!doris::config::enable_s3_rate_limiter) {
+        return callback();
+    }
+    auto sleep_duration = doris::S3ClientFactory::instance().rate_limiter(op)->add(1);
+    if (sleep_duration < 0) {
+        throw std::runtime_error("Azure exceeds request limit");
+    }
+    return callback();
+}
+
+template <typename Func>
+auto s3_get_rate_limit(Func callback) -> decltype(callback()) {
+    return s3_rate_limit(doris::S3RateLimitType::GET, std::move(callback));
+}
+
+template <typename Func>
+auto s3_put_rate_limit(Func callback) -> decltype(callback()) {
+    return s3_rate_limit(doris::S3RateLimitType::PUT, std::move(callback));
+}
+
 constexpr char SAS_TOKEN_URL_TEMPLATE[] = "https://{}.blob.core.windows.net/{}/{}{}";
 constexpr char BlobNotFound[] = "BlobNotFound";
 } // namespace
@@ -101,7 +124,14 @@ struct AzureBatchDeleter {
         if (deferred_resps.empty()) {
             return ObjectStorageResponse::OK();
         }
-        auto resp = do_azure_client_call([&]() { _client->SubmitBatch(_batch); }, _opts);
+        auto resp = do_azure_client_call(
+                [&]() {
+                    s3_put_rate_limit([&]() {
+                        SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_objects_latency);
+                        _client->SubmitBatch(_batch);
+                    });
+                },
+                _opts);
         if (resp.status.code != ErrorCode::OK) {
             return resp;
         }
@@ -156,7 +186,11 @@ ObjectStorageResponse AzureObjStorageClient::put_object(const ObjectStoragePathO
     auto client = _client->GetBlockBlobClient(opts.key);
     return do_azure_client_call(
             [&]() {
-                client.UploadFrom(reinterpret_cast<const uint8_t*>(stream.data()), stream.size());
+                s3_put_rate_limit([&]() {
+                    SCOPED_BVAR_LATENCY(s3_bvar::s3_put_latency);
+                    client.UploadFrom(reinterpret_cast<const uint8_t*>(stream.data()),
+                                      stream.size());
+                });
             },
             opts);
 }
@@ -169,7 +203,10 @@ ObjectStorageUploadResponse AzureObjStorageClient::upload_part(const ObjectStora
         Azure::Core::IO::MemoryBodyStream memory_body(
                 reinterpret_cast<const uint8_t*>(stream.data()), stream.size());
         // The blockId must be base64 encoded
-        client.StageBlock(base64_encode_part_num(part_num), memory_body);
+        s3_put_rate_limit([&]() {
+            SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
+            client.StageBlock(base64_encode_part_num(part_num), memory_body);
+        });
     } catch (Azure::Core::RequestFailedException& e) {
         auto msg = fmt::format(
                 "Azure request failed because {}, error msg {}, http code {}, 
path msg {}",
@@ -200,13 +237,22 @@ ObjectStorageResponse AzureObjStorageClient::complete_multipart_upload(
     std::ranges::transform(
             completed_parts, std::back_inserter(string_block_ids),
             [](const ObjectCompleteMultiPart& i) { return base64_encode_part_num(i.part_num); });
-    return do_azure_client_call([&]() { client.CommitBlockList(string_block_ids); }, opts);
+    return do_azure_client_call(
+            [&]() {
+                s3_put_rate_limit([&]() {
+                    SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
+                    client.CommitBlockList(string_block_ids);
+                });
+            },
+            opts);
 }
 
 ObjectStorageHeadResponse AzureObjStorageClient::head_object(const ObjectStoragePathOptions& opts) {
     try {
-        Models::BlobProperties properties =
-                _client->GetBlockBlobClient(opts.key).GetProperties().Value;
+        Models::BlobProperties properties = s3_get_rate_limit([&]() {
+            SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency);
+            return _client->GetBlockBlobClient(opts.key).GetProperties().Value;
+        });
         return {.file_size = properties.BlobSize};
     } catch (Azure::Core::RequestFailedException& e) {
         if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
@@ -238,8 +284,11 @@ ObjectStorageResponse AzureObjStorageClient::get_object(const ObjectStoragePathO
                 DownloadBlobToOptions download_opts;
                 Azure::Core::Http::HttpRange range {static_cast<int64_t>(offset), bytes_read};
                 download_opts.Range = range;
-                auto resp = client.DownloadTo(reinterpret_cast<uint8_t*>(buffer), bytes_read,
-                                              download_opts);
+                auto resp = s3_get_rate_limit([&]() {
+                    SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
+                    return client.DownloadTo(reinterpret_cast<uint8_t*>(buffer), bytes_read,
+                                             download_opts);
+                });
                 *size_return = resp.Value.ContentRange.Length.Value();
             },
             opts);
@@ -257,11 +306,17 @@ ObjectStorageResponse AzureObjStorageClient::list_objects(const ObjectStoragePat
             [&]() {
                 ListBlobsOptions list_opts;
                 list_opts.Prefix = opts.prefix;
-                auto resp = _client->ListBlobs(list_opts);
+                auto resp = s3_get_rate_limit([&]() {
+                    SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
+                    return _client->ListBlobs(list_opts);
+                });
                 get_file_file(resp);
                 while (!resp.NextPageToken->empty()) {
                     list_opts.ContinuationToken = resp.NextPageToken;
-                    resp = _client->ListBlobs(list_opts);
+                    resp = s3_get_rate_limit([&]() {
+                        SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
+                        return _client->ListBlobs(list_opts);
+                    });
                     get_file_file(resp);
                 }
             },
@@ -297,7 +352,10 @@ ObjectStorageResponse AzureObjStorageClient::delete_objects(const ObjectStorageP
 ObjectStorageResponse AzureObjStorageClient::delete_object(const ObjectStoragePathOptions& opts) {
     return do_azure_client_call(
             [&]() {
-                auto resp = _client->DeleteBlob(opts.key);
+                auto resp = s3_put_rate_limit([&]() {
+                    SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_object_latency);
+                    return _client->DeleteBlob(opts.key);
+                });
                 if (!resp.Value.Deleted) {
                     throw Exception(Status::IOError<false>("Delete azure blob failed"));
                 }
@@ -321,14 +379,20 @@ ObjectStorageResponse AzureObjStorageClient::delete_objects_recursively(
         }
         return ObjectStorageResponse::OK();
     };
-    auto resp = _client->ListBlobs(list_opts);
+    auto resp = s3_get_rate_limit([&]() {
+        SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
+        return _client->ListBlobs(list_opts);
+    });
     if (auto response = delete_func(resp.Blobs); response.status.code != ErrorCode::OK) {
         return response;
     }
 
     while (!resp.NextPageToken->empty()) {
         list_opts.ContinuationToken = resp.NextPageToken;
-        resp = _client->ListBlobs(list_opts);
+        resp = s3_get_rate_limit([&]() {
+            SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
+            return _client->ListBlobs(list_opts);
+        });
 
         if (auto response = delete_func(resp.Blobs); response.status.code != ErrorCode::OK) {
             return response;
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index a5c6ec09162..ab9033e586d 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -120,18 +120,6 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
     if (!client) {
         return Status::InternalError("init s3 client error");
     }
-    // // clang-format off
-    // auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
-    //         to, offset, bytes_req, bytes_read);
-    // // clang-format on
-    // if (resp.status.code != ErrorCode::OK) {
-    //     return std::move(Status(resp.status.code, std::move(resp.status.msg))
-    //                              .append(fmt::format("failed to read from {}", _path.native())));
-    // }
-    // if (*bytes_read != bytes_req) {
-    //     return Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})",
-    //                                  _path.native(), *bytes_read, bytes_req);
-    SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
 
     int retry_count = 0;
     const int base_wait_time = config::s3_read_base_wait_time_ms; // Base wait time in milliseconds
diff --git a/be/src/io/fs/s3_obj_storage_client.cpp b/be/src/io/fs/s3_obj_storage_client.cpp
index 2bed3241e30..2c66e819833 100644
--- a/be/src/io/fs/s3_obj_storage_client.cpp
+++ b/be/src/io/fs/s3_obj_storage_client.cpp
@@ -71,6 +71,35 @@
 #include "io/fs/s3_common.h"
 #include "util/bvar_helper.h"
 
+namespace {
+inline ::Aws::Client::AWSError<::Aws::S3::S3Errors> s3_error_factory() {
+    return {::Aws::S3::S3Errors::INTERNAL_FAILURE, "exceeds limit", "exceeds limit", false};
+}
+
+template <typename Func>
+auto s3_rate_limit(doris::S3RateLimitType op, Func callback) -> decltype(callback()) {
+    using T = decltype(callback());
+    if (!doris::config::enable_s3_rate_limiter) {
+        return callback();
+    }
+    auto sleep_duration = doris::S3ClientFactory::instance().rate_limiter(op)->add(1);
+    if (sleep_duration < 0) {
+        return T(s3_error_factory());
+    }
+    return callback();
+}
+
+template <typename Func>
+auto s3_get_rate_limit(Func callback) -> decltype(callback()) {
+    return s3_rate_limit(doris::S3RateLimitType::GET, std::move(callback));
+}
+
+template <typename Func>
+auto s3_put_rate_limit(Func callback) -> decltype(callback()) {
+    return s3_rate_limit(doris::S3RateLimitType::PUT, std::move(callback));
+}
+} // namespace
+
 namespace Aws::S3::Model {
 class DeleteObjectRequest;
 } // namespace Aws::S3::Model
@@ -92,9 +121,9 @@ ObjectStorageUploadResponse S3ObjStorageClient::create_multipart_upload(
     create_request.SetContentType("application/octet-stream");
 
     SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
-    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(_client->CreateMultipartUpload(create_request),
-                                                "s3_file_writer::create_multi_part_upload",
-                                                std::cref(create_request).get());
+    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+            s3_put_rate_limit([&]() { return _client->CreateMultipartUpload(create_request); }),
+            "s3_file_writer::create_multi_part_upload", std::cref(create_request).get());
     SYNC_POINT_CALLBACK("s3_file_writer::_open", &outcome);
 
     if (outcome.IsSuccess()) {
@@ -122,9 +151,9 @@ ObjectStorageResponse S3ObjStorageClient::put_object(const ObjectStoragePathOpti
     request.SetContentLength(stream.size());
     request.SetContentType("application/octet-stream");
     SCOPED_BVAR_LATENCY(s3_bvar::s3_put_latency);
-    auto response =
-            SYNC_POINT_HOOK_RETURN_VALUE(_client->PutObject(request), "s3_file_writer::put_object",
-                                         std::cref(request).get(), &stream);
+    auto response = SYNC_POINT_HOOK_RETURN_VALUE(
+            s3_put_rate_limit([&]() { return _client->PutObject(request); }),
+            "s3_file_writer::put_object", std::cref(request).get(), &stream);
     if (!response.IsSuccess()) {
         auto st = s3fs_error(response.GetError(),
                              fmt::format("failed to put object {}", 
opts.path.native()));
@@ -157,8 +186,8 @@ ObjectStorageUploadResponse S3ObjStorageClient::upload_part(const ObjectStorageP
     {
         SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
         upload_part_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
-                _client->UploadPart(upload_request), "s3_file_writer::upload_part",
-                std::cref(upload_request).get(), &stream);
+                s3_put_rate_limit([&]() { return _client->UploadPart(upload_request); }),
+                "s3_file_writer::upload_part", std::cref(upload_request).get(), &stream);
     }
     TEST_SYNC_POINT_CALLBACK("S3FileWriter::_upload_one_part", &upload_part_outcome);
     if (!upload_part_outcome.IsSuccess()) {
@@ -199,7 +228,7 @@ ObjectStorageResponse S3ObjStorageClient::complete_multipart_upload(
     TEST_SYNC_POINT_RETURN_WITH_VALUE("S3FileWriter::_complete:3", ObjectStorageResponse(), this);
     SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
     auto complete_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
-            _client->CompleteMultipartUpload(complete_request),
+            s3_put_rate_limit([&]() { return _client->CompleteMultipartUpload(complete_request); }),
             "s3_file_writer::complete_multi_part", std::cref(complete_request).get());
 
     if (!complete_outcome.IsSuccess()) {
@@ -220,7 +249,8 @@ ObjectStorageHeadResponse S3ObjStorageClient::head_object(const ObjectStoragePat
 
     SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency);
     auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
-            _client->HeadObject(request), "s3_file_system::head_object", std::ref(request).get());
+            s3_get_rate_limit([&]() { return _client->HeadObject(request); }),
+            "s3_file_system::head_object", std::ref(request).get());
     if (outcome.IsSuccess()) {
         return {.resp = {convert_to_obj_response(Status::OK())},
                 .file_size = outcome.GetResult().GetContentLength()};
@@ -247,7 +277,7 @@ ObjectStorageResponse S3ObjStorageClient::get_object(const ObjectStoragePathOpti
     request.SetResponseStreamFactory(AwsWriteableStreamFactory(buffer, bytes_read));
 
     SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
-    auto outcome = _client->GetObject(request);
+    auto outcome = s3_get_rate_limit([&]() { return _client->GetObject(request); });
     if (!outcome.IsSuccess()) {
         return {convert_to_obj_response(
                         s3fs_error(outcome.GetError(),
@@ -273,7 +303,7 @@ ObjectStorageResponse S3ObjStorageClient::list_objects(const ObjectStoragePathOp
         Aws::S3::Model::ListObjectsV2Outcome outcome;
         {
             SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
-            outcome = _client->ListObjectsV2(request);
+            outcome = s3_get_rate_limit([&]() { return _client->ListObjectsV2(request); });
         }
         if (!outcome.IsSuccess()) {
             files->clear();
@@ -310,8 +340,9 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects(const ObjectStoragePath
     });
     del.WithObjects(std::move(objects)).SetQuiet(true);
     delete_request.SetDelete(std::move(del));
-    SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
-    auto delete_outcome = _client->DeleteObjects(delete_request);
+    SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_objects_latency);
+    auto delete_outcome =
+            s3_put_rate_limit([&]() { return _client->DeleteObjects(delete_request); });
     if (!delete_outcome.IsSuccess()) {
         return {convert_to_obj_response(
                         s3fs_error(delete_outcome.GetError(),
@@ -331,8 +362,8 @@ ObjectStorageResponse S3ObjStorageClient::delete_object(const ObjectStoragePathO
     Aws::S3::Model::DeleteObjectRequest request;
     request.WithBucket(opts.bucket).WithKey(opts.key);
 
-    SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
-    auto outcome = _client->DeleteObject(request);
+    SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_object_latency);
+    auto outcome = s3_put_rate_limit([&]() { return _client->DeleteObject(request); });
     if (outcome.IsSuccess() ||
         outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
         return ObjectStorageResponse::OK();
@@ -354,7 +385,7 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects_recursively(
         Aws::S3::Model::ListObjectsV2Outcome outcome;
         {
             SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
-            outcome = _client->ListObjectsV2(request);
+            outcome = s3_get_rate_limit([&]() { return _client->ListObjectsV2(request); });
         }
         if (!outcome.IsSuccess()) {
             return {convert_to_obj_response(s3fs_error(
@@ -373,8 +404,9 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects_recursively(
             Aws::S3::Model::Delete del;
             del.WithObjects(std::move(objects)).SetQuiet(true);
             delete_request.SetDelete(std::move(del));
-            SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
-            auto delete_outcome = _client->DeleteObjects(delete_request);
+            SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_objects_latency);
+            auto delete_outcome =
+                    s3_put_rate_limit([&]() { return _client->DeleteObjects(delete_request); });
             if (!delete_outcome.IsSuccess()) {
                 return {convert_to_obj_response(
                                 s3fs_error(delete_outcome.GetError(),
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index 24e11b03b0b..d19fcdd63c2 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -56,7 +56,8 @@ namespace doris {
 namespace s3_bvar {
 bvar::LatencyRecorder s3_get_latency("s3_get");
 bvar::LatencyRecorder s3_put_latency("s3_put");
-bvar::LatencyRecorder s3_delete_latency("s3_delete");
+bvar::LatencyRecorder s3_delete_object_latency("s3_delete_object");
+bvar::LatencyRecorder s3_delete_objects_latency("s3_delete_objects");
 bvar::LatencyRecorder s3_head_latency("s3_head");
 bvar::LatencyRecorder s3_multi_part_upload_latency("s3_multi_part_upload");
 bvar::LatencyRecorder s3_list_latency("s3_list");
@@ -90,10 +91,22 @@ constexpr char S3_MAX_CONN_SIZE[] = "AWS_MAX_CONN_SIZE";
 constexpr char S3_REQUEST_TIMEOUT_MS[] = "AWS_REQUEST_TIMEOUT_MS";
 constexpr char S3_CONN_TIMEOUT_MS[] = "AWS_CONNECTION_TIMEOUT_MS";
 
+auto metric_func_factory(bvar::Adder<int64_t>& ns_bvar, bvar::Adder<int64_t>& req_num_bvar) {
+    return [&](int64_t ns) {
+        if (ns > 0) {
+            ns_bvar << ns;
+        } else {
+            req_num_bvar << 1;
+        }
+    };
+}
+
 } // namespace
 
-bvar::Adder<int64_t> get_rate_limit_ms("get_rate_limit_ms");
-bvar::Adder<int64_t> put_rate_limit_ms("put_rate_limit_ms");
+bvar::Adder<int64_t> get_rate_limit_ns("get_rate_limit_ns");
+bvar::Adder<int64_t> get_rate_limit_exceed_req_num("get_rate_limit_exceed_req_num");
+bvar::Adder<int64_t> put_rate_limit_ns("put_rate_limit_ns");
+bvar::Adder<int64_t> put_rate_limit_exceed_req_num("put_rate_limit_exceed_req_num");
 
 S3RateLimiterHolder* S3ClientFactory::rate_limiter(S3RateLimitType type) {
     CHECK(type == S3RateLimitType::GET || type == S3RateLimitType::PUT) << to_string(type);
@@ -164,18 +177,19 @@ S3ClientFactory::S3ClientFactory() {
     };
     Aws::InitAPI(_aws_options);
     _ca_cert_file_path = get_valid_ca_cert_path();
-    _rate_limiters = {std::make_unique<S3RateLimiterHolder>(
-                              S3RateLimitType::GET, config::s3_get_token_per_second,
-                              config::s3_get_bucket_tokens, config::s3_get_token_limit,
-                              [&](int64_t ms) { get_rate_limit_ms << ms; }),
-                      std::make_unique<S3RateLimiterHolder>(
-                              S3RateLimitType::PUT, config::s3_put_token_per_second,
-                              config::s3_put_bucket_tokens, config::s3_put_token_limit,
-                              [&](int64_t ms) { put_rate_limit_ms << ms; })};
+    _rate_limiters = {
+            std::make_unique<S3RateLimiterHolder>(
+                    S3RateLimitType::GET, config::s3_get_token_per_second,
+                    config::s3_get_bucket_tokens, config::s3_get_token_limit,
+                    metric_func_factory(get_rate_limit_ns, get_rate_limit_exceed_req_num)),
+            std::make_unique<S3RateLimiterHolder>(
+                    S3RateLimitType::PUT, config::s3_put_token_per_second,
+                    config::s3_put_bucket_tokens, config::s3_put_token_limit,
+                    metric_func_factory(put_rate_limit_ns, put_rate_limit_exceed_req_num))};
 }
 
-string S3ClientFactory::get_valid_ca_cert_path() {
-    vector<std::string> vec_ca_file_path = doris::split(config::ca_cert_file_paths, ";");
+std::string S3ClientFactory::get_valid_ca_cert_path() {
+    auto vec_ca_file_path = doris::split(config::ca_cert_file_paths, ";");
     auto it = vec_ca_file_path.begin();
     for (; it != vec_ca_file_path.end(); ++it) {
         if (std::filesystem::exists(*it)) {
diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h
index 3d8f55e7613..1a1a5ae39ca 100644
--- a/be/src/util/s3_util.h
+++ b/be/src/util/s3_util.h
@@ -50,7 +50,8 @@ namespace doris {
 namespace s3_bvar {
 extern bvar::LatencyRecorder s3_get_latency;
 extern bvar::LatencyRecorder s3_put_latency;
-extern bvar::LatencyRecorder s3_delete_latency;
+extern bvar::LatencyRecorder s3_delete_object_latency;
+extern bvar::LatencyRecorder s3_delete_objects_latency;
 extern bvar::LatencyRecorder s3_head_latency;
 extern bvar::LatencyRecorder s3_multi_part_upload_latency;
 extern bvar::LatencyRecorder s3_list_latency;
@@ -61,25 +62,6 @@ extern bvar::LatencyRecorder s3_copy_object_latency;
 
 class S3URI;
 
-inline ::Aws::Client::AWSError<::Aws::S3::S3Errors> s3_error_factory() {
-    return {::Aws::S3::S3Errors::INTERNAL_FAILURE, "exceeds limit", "exceeds 
limit", false};
-}
-
-#define DO_S3_RATE_LIMIT(op, code)                                                  \
-    [&]() mutable {                                                                 \
-        if (!config::enable_s3_rate_limiter) {                                      \
-            return (code);                                                          \
-        }                                                                           \
-        auto sleep_duration = S3ClientFactory::instance().rate_limiter(op)->add(1); \
-        if (sleep_duration < 0) {                                                   \
-            using T = decltype((code));                                             \
-            return T(s3_error_factory());                                           \
-        }                                                                           \
-        return (code);                                                              \
-    }()
-
-#define DO_S3_GET_RATE_LIMIT(code) DO_S3_RATE_LIMIT(S3RateLimitType::GET, code)
-
 struct S3ClientConf {
     std::string endpoint;
     std::string region;
diff --git a/cloud/src/recycler/azure_obj_client.cpp b/cloud/src/recycler/azure_obj_client.cpp
index 1ad5d5dca7d..8674768fcd8 100644
--- a/cloud/src/recycler/azure_obj_client.cpp
+++ b/cloud/src/recycler/azure_obj_client.cpp
@@ -115,7 +115,10 @@ public:
         }
 
         try {
-            auto resp = s3_get_rate_limit([&]() { return client_->ListBlobs(req_); });
+            auto resp = s3_get_rate_limit([&]() {
+                SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
+                return client_->ListBlobs(req_);
+            });
             has_more_ = resp.NextPageToken.HasValue();
             DCHECK(!(has_more_ && resp.Blobs.empty())) << has_more_ << ' ' << resp.Blobs.empty();
             req_.ContinuationToken = std::move(resp.NextPageToken);
@@ -172,6 +175,7 @@ ObjectStorageResponse AzureObjClient::put_object(ObjectStoragePathRef path,
     return do_azure_client_call(
             [&]() {
                 s3_put_rate_limit([&]() {
+                    SCOPED_BVAR_LATENCY(s3_bvar::s3_put_latency);
                     return client.UploadFrom(reinterpret_cast<const uint8_t*>(stream.data()),
                                              stream.size());
                 });
@@ -181,8 +185,10 @@ ObjectStorageResponse AzureObjClient::put_object(ObjectStoragePathRef path,
 
 ObjectStorageResponse AzureObjClient::head_object(ObjectStoragePathRef path, ObjectMeta* res) {
     try {
-        auto&& properties = s3_get_rate_limit(
-                [&]() { return client_->GetBlockBlobClient(path.key).GetProperties().Value; });
+        auto&& properties = s3_get_rate_limit([&]() {
+            SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency);
+            return client_->GetBlockBlobClient(path.key).GetProperties().Value;
+        });
         res->key = path.key;
         res->mtime_s = properties.LastModified.time_since_epoch().count();
         res->size = properties.BlobSize;
@@ -232,7 +238,12 @@ ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket,
             deferred_resps.emplace_back(batch.DeleteBlob(*it));
         }
         auto resp = do_azure_client_call(
-                [&]() { s3_put_rate_limit([&]() { return client_->SubmitBatch(batch); }); },
+                [&]() {
+                    s3_put_rate_limit([&]() {
+                        SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_objects_latency);
+                        return client_->SubmitBatch(batch);
+                    });
+                },
                 client_->GetUrl(), *begin);
         if (resp.ret != 0) {
             return resp;
@@ -266,7 +277,10 @@ ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket,
 ObjectStorageResponse AzureObjClient::delete_object(ObjectStoragePathRef path) {
     return do_azure_client_call(
             [&]() {
-                if (auto r = s3_put_rate_limit([&]() { return client_->DeleteBlob(path.key); });
+                if (auto r = s3_put_rate_limit([&]() {
+                        SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_object_latency);
+                        return client_->DeleteBlob(path.key);
+                    });
                     !r.Value.Deleted) {
                     throw std::runtime_error("Delete azure blob failed");
                 }
diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp
index 80717fac81e..c96d3336fa8 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -47,19 +47,49 @@
 #include "recycler/s3_obj_client.h"
 #include "recycler/storage_vault_accessor.h"
 
+namespace {
+auto metric_func_factory(bvar::Adder<int64_t>& ns_bvar, bvar::Adder<int64_t>& req_num_bvar) {
+    return [&](int64_t ns) {
+        if (ns > 0) {
+            ns_bvar << ns;
+        } else {
+            req_num_bvar << 1;
+        }
+    };
+}
+} // namespace
+
 namespace doris::cloud {
-bvar::Adder<int64_t> get_rate_limit_ms("get_rate_limit_ms");
-bvar::Adder<int64_t> put_rate_limit_ms("put_rate_limit_ms");
+
+namespace s3_bvar {
+bvar::LatencyRecorder s3_get_latency("s3_get");
+bvar::LatencyRecorder s3_put_latency("s3_put");
+bvar::LatencyRecorder s3_delete_object_latency("s3_delete_object");
+bvar::LatencyRecorder s3_delete_objects_latency("s3_delete_objects");
+bvar::LatencyRecorder s3_head_latency("s3_head");
+bvar::LatencyRecorder s3_multi_part_upload_latency("s3_multi_part_upload");
+bvar::LatencyRecorder s3_list_latency("s3_list");
+bvar::LatencyRecorder s3_list_object_versions_latency("s3_list_object_versions");
+bvar::LatencyRecorder s3_get_bucket_version_latency("s3_get_bucket_version");
+bvar::LatencyRecorder s3_copy_object_latency("s3_copy_object");
+}; // namespace s3_bvar
+
+bvar::Adder<int64_t> get_rate_limit_ns("get_rate_limit_ns");
+bvar::Adder<int64_t> get_rate_limit_exceed_req_num("get_rate_limit_exceed_req_num");
+bvar::Adder<int64_t> put_rate_limit_ns("put_rate_limit_ns");
+bvar::Adder<int64_t> put_rate_limit_exceed_req_num("put_rate_limit_exceed_req_num");
 
 AccessorRateLimiter::AccessorRateLimiter()
-        : _rate_limiters({std::make_unique<S3RateLimiterHolder>(
-                                  S3RateLimitType::GET, config::s3_get_token_per_second,
-                                  config::s3_get_bucket_tokens, config::s3_get_token_limit,
-                                  [&](int64_t ms) { get_rate_limit_ms << ms; }),
-                          std::make_unique<S3RateLimiterHolder>(
-                                  S3RateLimitType::PUT, config::s3_put_token_per_second,
-                                  config::s3_put_bucket_tokens, config::s3_put_token_limit,
-                                  [&](int64_t ms) { put_rate_limit_ms << ms; })}) {}
+        : _rate_limiters(
+                  {std::make_unique<S3RateLimiterHolder>(
+                           S3RateLimitType::GET, config::s3_get_token_per_second,
+                           config::s3_get_bucket_tokens, config::s3_get_token_limit,
+                           metric_func_factory(get_rate_limit_ns, get_rate_limit_exceed_req_num)),
+                   std::make_unique<S3RateLimiterHolder>(
+                           S3RateLimitType::PUT, config::s3_put_token_per_second,
+                           config::s3_put_bucket_tokens, config::s3_put_token_limit,
+                           metric_func_factory(put_rate_limit_ns,
+                                               put_rate_limit_exceed_req_num))}) {}
 
 S3RateLimiterHolder* AccessorRateLimiter::rate_limiter(S3RateLimitType type) {
     CHECK(type == S3RateLimitType::GET || type == S3RateLimitType::PUT) << to_string(type);
diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h
index 19dc08fe83e..91f75765a9b 100644
--- a/cloud/src/recycler/s3_accessor.h
+++ b/cloud/src/recycler/s3_accessor.h
@@ -17,10 +17,13 @@
 
 #pragma once
 
+#include <bvar/latency_recorder.h>
+
 #include <array>
 #include <cstdint>
 #include <memory>
 
+#include "common/stopwatch.h"
 #include "recycler/s3_obj_client.h"
 #include "recycler/storage_vault_accessor.h"
 
@@ -35,6 +38,25 @@ enum class S3RateLimitType;
 namespace cloud {
 class ObjectStoreInfoPB;
 
+namespace s3_bvar {
+extern bvar::LatencyRecorder s3_get_latency;
+extern bvar::LatencyRecorder s3_put_latency;
+extern bvar::LatencyRecorder s3_delete_object_latency;
+extern bvar::LatencyRecorder s3_delete_objects_latency;
+extern bvar::LatencyRecorder s3_head_latency;
+extern bvar::LatencyRecorder s3_multi_part_upload_latency;
+extern bvar::LatencyRecorder s3_list_latency;
+extern bvar::LatencyRecorder s3_list_object_versions_latency;
+extern bvar::LatencyRecorder s3_get_bucket_version_latency;
+extern bvar::LatencyRecorder s3_copy_object_latency;
+}; // namespace s3_bvar
+
+// The time unit is the same with BE: us
+#define SCOPED_BVAR_LATENCY(bvar_item)                     \
+    StopWatch sw;                                          \
+    std::unique_ptr<int, std::function<void(int*)>> defer( \
+            (int*)0x01, [&](int*) { bvar_item << sw.elapsed_us(); });
+
 struct AccessorRateLimiter {
 public:
     ~AccessorRateLimiter() = default;
diff --git a/cloud/src/recycler/s3_obj_client.cpp b/cloud/src/recycler/s3_obj_client.cpp
index c44473bb9eb..d1307de53b7 100644
--- a/cloud/src/recycler/s3_obj_client.cpp
+++ b/cloud/src/recycler/s3_obj_client.cpp
@@ -89,7 +89,10 @@ public:
             return false;
         }
 
-        auto outcome = s3_get_rate_limit([&]() { return client_->ListObjectsV2(req_); });
+        auto outcome = s3_get_rate_limit([&]() {
+            SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
+            return client_->ListObjectsV2(req_);
+        });
 
         if (!outcome.IsSuccess()) {
             LOG_WARNING("failed to list objects")
@@ -152,7 +155,10 @@ ObjectStorageResponse S3ObjClient::put_object(ObjectStoragePathRef path, std::st
     auto input = Aws::MakeShared<Aws::StringStream>("S3Accessor");
     *input << stream;
     request.SetBody(input);
-    auto outcome = s3_put_rate_limit([&]() { return s3_client_->PutObject(request); });
+    auto outcome = s3_put_rate_limit([&]() {
+        SCOPED_BVAR_LATENCY(s3_bvar::s3_put_latency);
+        return s3_client_->PutObject(request);
+    });
     if (!outcome.IsSuccess()) {
         LOG_WARNING("failed to put object")
                 .tag("endpoint", endpoint_)
@@ -168,7 +174,10 @@ ObjectStorageResponse S3ObjClient::put_object(ObjectStoragePathRef path, std::st
 ObjectStorageResponse S3ObjClient::head_object(ObjectStoragePathRef path, ObjectMeta* res) {
     Aws::S3::Model::HeadObjectRequest request;
     request.WithBucket(path.bucket).WithKey(path.key);
-    auto outcome = s3_get_rate_limit([&]() { return s3_client_->HeadObject(request); });
+    auto outcome = s3_get_rate_limit([&]() {
+        SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency);
+        return s3_client_->HeadObject(request);
+    });
     if (outcome.IsSuccess()) {
         res->key = path.key;
         res->size = outcome.GetResult().GetContentLength();
@@ -209,8 +218,10 @@ ObjectStorageResponse S3ObjClient::delete_objects(const std::string& bucket,
         Aws::S3::Model::Delete del;
         del.WithObjects(std::move(objects)).SetQuiet(true);
         delete_request.SetDelete(std::move(del));
-        auto delete_outcome =
-                s3_put_rate_limit([&]() { return s3_client_->DeleteObjects(delete_request); });
+        auto delete_outcome = s3_put_rate_limit([&]() {
+            SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_objects_latency);
+            return s3_client_->DeleteObjects(delete_request);
+        });
         if (!delete_outcome.IsSuccess()) {
             LOG_WARNING("failed to delete objects")
                     .tag("endpoint", endpoint_)
@@ -266,7 +277,10 @@ ObjectStorageResponse S3ObjClient::delete_objects(const std::string& bucket,
 ObjectStorageResponse S3ObjClient::delete_object(ObjectStoragePathRef path) {
     Aws::S3::Model::DeleteObjectRequest request;
     request.WithBucket(path.bucket).WithKey(path.key);
-    auto outcome = s3_put_rate_limit([&]() { return s3_client_->DeleteObject(request); });
+    auto outcome = s3_put_rate_limit([&]() {
+        SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_object_latency);
+        return s3_client_->DeleteObject(request);
+    });
     if (!outcome.IsSuccess()) {
         LOG_WARNING("failed to delete object")
                 .tag("endpoint", endpoint_)
diff --git a/common/cpp/s3_rate_limiter.cpp b/common/cpp/s3_rate_limiter.cpp
index 3bd311bc955..64ee4ce19d8 100644
--- a/common/cpp/s3_rate_limiter.cpp
+++ b/common/cpp/s3_rate_limiter.cpp
@@ -22,6 +22,7 @@
 #include <chrono>
 #include <mutex>
 #include <thread>
+
 #if defined(__APPLE__)
 #include <ctime>
 #define CURRENT_TIME std::chrono::system_clock::now()
@@ -31,7 +32,7 @@
 
 namespace doris {
 // Just 10^6.
-static constexpr auto MS = 1000000UL;
+static constexpr auto NS = 1000000000UL;
 
 class S3RateLimiter::SimpleSpinLock {
 public:
@@ -69,29 +70,32 @@ S3RateLimiter::~S3RateLimiter() = default;
 
 S3RateLimiterHolder::~S3RateLimiterHolder() = default;
 
-std::pair<size_t, double> S3RateLimiter::_update_remain_token(
-        std::chrono::system_clock::time_point now, size_t amount) {
+std::pair<size_t, double> S3RateLimiter::_update_remain_token(long now, size_t amount) {
     // Values obtained under lock to be checked after release
     size_t count_value;
     double tokens_value;
     {
         std::lock_guard<SimpleSpinLock> lock(*_mutex);
+        now = (now < _prev_ns_count) ? _prev_ns_count : now;
         if (_max_speed) {
-            double delta_seconds = static_cast<double>((now - _prev_ms).count()) / MS;
+            double delta_seconds =
+                    _prev_ns_count ? static_cast<double>(now - _prev_ns_count) / NS : 0;
             _remain_tokens = std::min<double>(_remain_tokens + _max_speed * delta_seconds - amount,
                                               _max_burst);
         }
         _count += amount;
         count_value = _count;
         tokens_value = _remain_tokens;
-        _prev_ms = now;
+        _prev_ns_count = now;
     }
     return {count_value, tokens_value};
 }
 
 int64_t S3RateLimiter::add(size_t amount) {
     // Values obtained under lock to be checked after release
-    auto [count_value, tokens_value] = _update_remain_token(CURRENT_TIME, amount);
+    auto time = CURRENT_TIME;
+    auto time_nano_count = time.time_since_epoch().count();
+    auto [count_value, tokens_value] = _update_remain_token(time_nano_count, amount);
 
     if (_limit && count_value > _limit) {
         // CK would throw exception
@@ -99,13 +103,13 @@ int64_t S3RateLimiter::add(size_t amount) {
     }
 
     // Wait unless there is positive amount of remain_tokens - throttling
-    int64_t sleep_time_ms = 0;
+    int64_t sleep_time_ns = 0;
     if (_max_speed && tokens_value < 0) {
-        sleep_time_ms = static_cast<int64_t>(-tokens_value / _max_speed * MS);
-        std::this_thread::sleep_for(std::chrono::microseconds(sleep_time_ms));
+        sleep_time_ns = static_cast<int64_t>(-tokens_value / _max_speed * NS);
+        std::this_thread::sleep_for(std::chrono::nanoseconds(sleep_time_ns));
     }
 
-    return sleep_time_ms;
+    return sleep_time_ns;
 }
 
 S3RateLimiterHolder::S3RateLimiterHolder(S3RateLimitType type, size_t max_speed, size_t max_burst,
@@ -119,9 +123,7 @@ int64_t S3RateLimiterHolder::add(size_t amount) {
         std::shared_lock read {rate_limiter_rw_lock};
         sleep = rate_limiter->add(amount);
     }
-    if (sleep > 0) {
-        metric_func(sleep);
-    }
+    metric_func(sleep);
     return sleep;
 }
 
diff --git a/common/cpp/s3_rate_limiter.h b/common/cpp/s3_rate_limiter.h
index 6f01cf33226..ad2bf3db9ed 100644
--- a/common/cpp/s3_rate_limiter.h
+++ b/common/cpp/s3_rate_limiter.h
@@ -17,7 +17,6 @@
 
 #pragma once
 
-#include <chrono>
 #include <functional>
 #include <memory>
 #include <shared_mutex>
@@ -44,8 +43,7 @@ public:
     int64_t add(size_t amount);
 
 private:
-    std::pair<size_t, double> _update_remain_token(std::chrono::system_clock::time_point now,
-                                                   size_t amount);
+    std::pair<size_t, double> _update_remain_token(long now, size_t amount);
     size_t _count {0};
     const size_t _max_speed {0}; // in tokens per second. which indicates the QPS
     const size_t _max_burst {0}; // in tokens. which indicates the token bucket size
@@ -54,7 +52,7 @@ private:
     std::unique_ptr<SimpleSpinLock> _mutex;
     // Amount of remain_tokens available in token bucket. Updated in `add` method.
     double _remain_tokens {0};
-    std::chrono::system_clock::time_point _prev_ms; // Previous `add` call time (in nanoseconds).
+    long _prev_ns_count {0}; // Previous `add` call time (in nanoseconds).
 };
 
 class S3RateLimiterHolder {

