This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 493dc26d743 [opt](s3) auto retry when meeting 429 error (#35396)
493dc26d743 is described below

commit 493dc26d7435d9e78fe87a873b5174df9e64a886
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Thu Jun 27 16:01:23 2024 +0800

    [opt](s3) auto retry when meeting 429 error (#35396)
    
    Sometimes the S3 SDK returns an error like:
    ```
    QpsLimitExceeded Unable to parse ExceptionName: QpsLimitExceeded Message: Please reduce your request rate. code=429 type=100, request_id=66516C288EC49
    ```
    We should slow down the request rate by sleeping for a while before retrying.
    
    - Add 2 new BE configs
    
            - `s3_read_base_wait_time_ms` and `s3_read_max_wait_time_ms`
    
                    When an S3 429 error occurs, the "get" request sleeps
                    for an exponentially increasing multiple of
                    `s3_read_base_wait_time_ms` and then tries again
                    (see the sketch after this list).
                    The sleep time is capped at `s3_read_max_wait_time_ms`,
                    and the number of retries is capped at `max_s3_client_retry`.
    
    - Add more metrics for s3 file reader
    
            - `s3_file_reader_too_many_request`: counter of 429 errors.
            - `s3_file_reader_s3_get_request`: the QPS of S3 get requests.
    
            - `TotalGetRequest`: get request counter in profile
            - `TooManyRequestErr`: 429 error counter in profile
            - `TooManyRequestSleepTime`: total sleep time after 429 errors in profile
            - `TotalBytesRead`: total bytes read from S3 in profile
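    
    For reference, a minimal standalone sketch of the capped exponential
    backoff applied to the "get" request. This is an illustration only:
    `get_with_backoff` and `do_get` are hypothetical stand-ins for the real
    S3 client call inside `read_at_impl`, and the parameters mirror the new
    BE configs (100 ms base, 800 ms cap, 10 retries by default).
    
    ```
    #include <algorithm>
    #include <chrono>
    #include <functional>
    #include <thread>
    
    // Retry a "get" with capped exponential backoff: after the k-th 429,
    // sleep min(base_wait_ms * 2^k, max_wait_ms) milliseconds, for at most
    // max_retries retries. `do_get` returns an HTTP status code, where
    // 429 == Too Many Requests (QpsLimitExceeded).
    bool get_with_backoff(const std::function<int()>& do_get, int base_wait_ms,
                          int max_wait_ms, int max_retries) {
        int retry_count = 0;
        while (retry_count <= max_retries) {
            int http_code = do_get();
            if (http_code == 429) {
                ++retry_count;
                int wait_ms = std::min(base_wait_ms * (1 << retry_count), max_wait_ms);
                std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
                continue;
            }
            return http_code == 200; // done: success, or a non-retriable error
        }
        return false; // exceeded the maximum number of retries
    }
    ```
    
    With the default configs, successive sleeps are 200 ms, then 400 ms,
    then 800 ms per retry once the cap is reached.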
    
    
    
![image](https://github.com/apache/doris/assets/2899462/2e8f5837-270b-48c7-9397-160aeac143eb)
---
 be/src/common/config.cpp        |   2 +
 be/src/common/config.h          |   7 +++
 be/src/io/file_factory.cpp      |   2 +-
 be/src/io/fs/s3_file_reader.cpp | 109 ++++++++++++++++++++++++++++++++--------
 be/src/io/fs/s3_file_reader.h   |  24 +++++++--
 be/src/io/fs/s3_file_system.cpp |   2 +-
 6 files changed, 120 insertions(+), 26 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 4460e477c8f..a08bb43db56 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1222,6 +1222,8 @@ DEFINE_Int32(spill_io_thread_pool_queue_size, "102400");
 DEFINE_mBool(check_segment_when_build_rowset_meta, "false");
 
 DEFINE_mInt32(max_s3_client_retry, "10");
+DEFINE_mInt32(s3_read_base_wait_time_ms, "100");
+DEFINE_mInt32(s3_read_max_wait_time_ms, "800");
 
 DEFINE_mBool(enable_s3_rate_limiter, "false");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index dbf18002704..6942e316dcd 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1304,6 +1304,13 @@ DECLARE_mBool(check_segment_when_build_rowset_meta);
 DECLARE_mBool(enable_s3_rate_limiter);
 // max s3 client retry times
 DECLARE_mInt32(max_s3_client_retry);
+// When an S3 429 error occurs, the "get" request will
+// sleep for an exponentially increasing multiple of
+// s3_read_base_wait_time_ms and then try again.
+// The max sleep time is s3_read_max_wait_time_ms,
+// and the max number of retries is max_s3_client_retry.
+DECLARE_mInt32(s3_read_base_wait_time_ms);
+DECLARE_mInt32(s3_read_max_wait_time_ms);
 
 // write as inverted index tmp directory
 DECLARE_String(tmp_file_dir);
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index e61ed144486..0c84c2eb74c 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -156,7 +156,7 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader(
         auto client_holder = std::make_shared<io::ObjClientHolder>(s3_conf.client_conf);
         RETURN_IF_ERROR_RESULT(client_holder->init());
         return io::S3FileReader::create(std::move(client_holder), s3_conf.bucket, s3_uri.get_key(),
-                                        file_description.file_size)
+                                        file_description.file_size, profile)
                 .and_then([&](auto&& reader) {
                     return io::create_cached_file_reader(std::move(reader), reader_options);
                 });
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index e7775803198..a5c6ec09162 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -37,6 +37,7 @@
 #include "io/fs/s3_common.h"
 #include "util/bvar_helper.h"
 #include "util/doris_metrics.h"
+#include "util/runtime_profile.h"
 #include "util/s3_util.h"
 
 namespace doris::io {
@@ -45,13 +46,18 @@ bvar::Adder<uint64_t> s3_file_reader_read_counter("s3_file_reader", "read_at");
 bvar::Adder<uint64_t> s3_file_reader_total("s3_file_reader", "total_num");
 bvar::Adder<uint64_t> s3_bytes_read_total("s3_file_reader", "bytes_read");
 bvar::Adder<uint64_t> s3_file_being_read("s3_file_reader", "file_being_read");
+bvar::Adder<uint64_t> s3_file_reader_too_many_request_counter("s3_file_reader", "too_many_request");
 bvar::LatencyRecorder s3_bytes_per_read("s3_file_reader", "bytes_per_read"); // also QPS
 bvar::PerSecond<bvar::Adder<uint64_t>> s3_read_througthput("s3_file_reader", "s3_read_throughput",
                                                            &s3_bytes_read_total);
+// Although we can get QPS from s3_bytes_per_read, s3_bytes_per_read only
+// records successful requests, while s3_get_request_qps records all requests.
+bvar::PerSecond<bvar::Adder<uint64_t>> s3_get_request_qps("s3_file_reader", "s3_get_request",
+                                                          &s3_file_reader_read_counter);
 
 Result<FileReaderSPtr> S3FileReader::create(std::shared_ptr<const ObjClientHolder> client,
-                                            std::string bucket, std::string key,
-                                            int64_t file_size) {
+                                            std::string bucket, std::string key, int64_t file_size,
+                                            RuntimeProfile* profile) {
     if (file_size < 0) {
         auto res = client->object_file_size(bucket, key);
         if (!res.has_value()) {
@@ -62,16 +68,17 @@ Result<FileReaderSPtr> S3FileReader::create(std::shared_ptr<const ObjClientHolde
     }
 
     return std::make_shared<S3FileReader>(std::move(client), std::move(bucket), std::move(key),
-                                          file_size);
+                                          file_size, profile);
 }
 
 S3FileReader::S3FileReader(std::shared_ptr<const ObjClientHolder> client, std::string bucket,
-                           std::string key, size_t file_size)
+                           std::string key, size_t file_size, RuntimeProfile* profile)
         : _path(fmt::format("s3://{}/{}", bucket, key)),
           _file_size(file_size),
           _bucket(std::move(bucket)),
           _key(std::move(key)),
-          _client(std::move(client)) {
+          _client(std::move(client)),
+          _profile(profile) {
     DorisMetrics::instance()->s3_file_open_reading->increment(1);
     DorisMetrics::instance()->s3_file_reader_total->increment(1);
     s3_file_reader_total << 1;
@@ -113,23 +120,85 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
     if (!client) {
         return Status::InternalError("init s3 client error");
     }
-    // clang-format off
-    auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
-            to, offset, bytes_req, bytes_read);
-    // clang-format on
-    if (resp.status.code != ErrorCode::OK) {
-        return std::move(Status(resp.status.code, std::move(resp.status.msg))
-                                 .append(fmt::format("failed to read from {}", _path.native())));
+    // // clang-format off
+    // auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
+    //         to, offset, bytes_req, bytes_read);
+    // // clang-format on
+    // if (resp.status.code != ErrorCode::OK) {
+    //     return std::move(Status(resp.status.code, std::move(resp.status.msg))
+    //                              .append(fmt::format("failed to read from {}", _path.native())));
+    // }
+    // if (*bytes_read != bytes_req) {
+    //     return Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})",
+    //                                  _path.native(), *bytes_read, bytes_req);
+    SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
+
+    int retry_count = 0;
+    const int base_wait_time = config::s3_read_base_wait_time_ms; // Base wait time in milliseconds
+    const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum wait time in milliseconds
+    const int max_retries = config::max_s3_client_retry; // Maximum number of retries
+
+    int total_sleep_time = 0;
+    while (retry_count <= max_retries) {
+        s3_file_reader_read_counter << 1;
+        // clang-format off
+        auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
+                to, offset, bytes_req, bytes_read);
+        // clang-format on
+        _s3_stats.total_get_request_counter++;
+        if (resp.status.code != ErrorCode::OK) {
+            if (resp.http_code ==
+                static_cast<int>(Aws::Http::HttpResponseCode::TOO_MANY_REQUESTS)) {
+                s3_file_reader_too_many_request_counter << 1;
+                retry_count++;
+                int wait_time = std::min(base_wait_time * (1 << retry_count),
+                                         max_wait_time); // Exponential backoff
+                std::this_thread::sleep_for(std::chrono::milliseconds(wait_time));
+                _s3_stats.too_many_request_err_counter++;
+                _s3_stats.too_many_request_sleep_time_ms += wait_time;
+                total_sleep_time += wait_time;
+                continue;
+            } else {
+                // Handle other errors
+                return std::move(Status(resp.status.code, std::move(resp.status.msg))
+                                         .append("failed to read"));
+            }
+        }
+        if (*bytes_read != bytes_req) {
+            return Status::InternalError("failed to read (bytes read: {}, bytes req: {})",
+                                         *bytes_read, bytes_req);
+        }
+        _s3_stats.total_bytes_read += bytes_req;
+        s3_bytes_read_total << bytes_req;
+        s3_bytes_per_read << bytes_req;
+        DorisMetrics::instance()->s3_bytes_read_total->increment(bytes_req);
+        if (retry_count > 0) {
+            LOG(INFO) << fmt::format("read s3 file {} succeeded after {} retries with {} ms sleeping",
+                                     _path.native(), retry_count, total_sleep_time);
+        }
+        return Status::OK();
     }
-    if (*bytes_read != bytes_req) {
-        return Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})",
-                                     _path.native(), *bytes_read, bytes_req);
+    return Status::InternalError("failed to read from s3, exceeded maximum retries");
+}
+
+void S3FileReader::_collect_profile_before_close() {
+    if (_profile != nullptr) {
+        const char* s3_profile_name = "S3Profile";
+        ADD_TIMER(_profile, s3_profile_name);
+        RuntimeProfile::Counter* total_get_request_counter =
+                ADD_CHILD_COUNTER(_profile, "TotalGetRequest", TUnit::UNIT, 
s3_profile_name);
+        RuntimeProfile::Counter* too_many_request_err_counter =
+                ADD_CHILD_COUNTER(_profile, "TooManyRequestErr", TUnit::UNIT, 
s3_profile_name);
+        RuntimeProfile::Counter* too_many_request_sleep_time = ADD_CHILD_COUNTER(
+                _profile, "TooManyRequestSleepTime", TUnit::TIME_MS, s3_profile_name);
+        RuntimeProfile::Counter* total_bytes_read =
+                ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, 
s3_profile_name);
+
+        COUNTER_UPDATE(total_get_request_counter, _s3_stats.total_get_request_counter);
+        COUNTER_UPDATE(too_many_request_err_counter, _s3_stats.too_many_request_err_counter);
+        COUNTER_UPDATE(too_many_request_sleep_time, _s3_stats.too_many_request_sleep_time_ms);
+        COUNTER_UPDATE(total_bytes_read, _s3_stats.total_bytes_read);
     }
-    s3_bytes_read_total << *bytes_read;
-    s3_bytes_per_read << *bytes_read;
-    s3_file_reader_read_counter << 1;
-    DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read);
-    return Status::OK();
 }
 
 } // namespace doris::io
diff --git a/be/src/io/fs/s3_file_reader.h b/be/src/io/fs/s3_file_reader.h
index d681161ebed..36fe67b342c 100644
--- a/be/src/io/fs/s3_file_reader.h
+++ b/be/src/io/fs/s3_file_reader.h
@@ -28,16 +28,20 @@
 #include "io/fs/s3_file_system.h"
 #include "util/slice.h"
 
-namespace doris::io {
+namespace doris {
+class RuntimeProfile;
+
+namespace io {
 struct IOContext;
 
 class S3FileReader final : public FileReader {
 public:
     static Result<FileReaderSPtr> create(std::shared_ptr<const ObjClientHolder> client,
-                                         std::string bucket, std::string key, int64_t file_size);
+                                         std::string bucket, std::string key, int64_t file_size,
+                                         RuntimeProfile* profile);
 
     S3FileReader(std::shared_ptr<const ObjClientHolder> client, std::string bucket, std::string key,
-                 size_t file_size);
+                 size_t file_size, RuntimeProfile* profile);
 
     ~S3FileReader() override;
 
@@ -53,7 +57,15 @@ protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
 
+    void _collect_profile_before_close() override;
+
 private:
+    struct S3Statistics {
+        int64_t total_get_request_counter = 0;
+        int64_t too_many_request_err_counter = 0;
+        int64_t too_many_request_sleep_time_ms = 0;
+        int64_t total_bytes_read = 0;
+    };
     Path _path;
     size_t _file_size;
 
@@ -62,6 +74,10 @@ private:
     std::shared_ptr<const ObjClientHolder> _client;
 
     std::atomic<bool> _closed = false;
+
+    RuntimeProfile* _profile = nullptr;
+    S3Statistics _s3_stats;
 };
 
-} // namespace doris::io
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index 2cc82d2da0f..93f36429485 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -196,7 +196,7 @@ Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
 Status S3FileSystem::open_file_internal(const Path& file, FileReaderSPtr* reader,
                                         const FileReaderOptions& opts) {
     auto key = DORIS_TRY(get_key(file));
-    *reader = DORIS_TRY(S3FileReader::create(_client, _bucket, key, opts.file_size));
+    *reader = DORIS_TRY(S3FileReader::create(_client, _bucket, key, opts.file_size, nullptr));
     return Status::OK();
 }
 

