This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 493dc26d743 [opt](s3) auto retry when meeting 429 error (#35396)
493dc26d743 is described below

commit 493dc26d7435d9e78fe87a873b5174df9e64a886
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Thu Jun 27 16:01:23 2024 +0800

    [opt](s3) auto retry when meeting 429 error (#35396)
    
    Sometimes the S3 SDK returns an error like:
    ```
    QpsLimitExceeded
    Unable to parse ExceptionName: QpsLimitExceeded
    Message: Please reduce your request rate.
    code=429 type=100, request_id=66516C288EC49
    ```
    We should slow down the request rate by sleeping for a while.
    
    - Add 2 new BE configs: `s3_read_base_wait_time_ms` and `s3_read_max_wait_time_ms`.
      When an S3 429 error occurs, the "get" request sleeps for
      `s3_read_base_wait_time_ms * 2^n` ms and then tries again.
      The sleep time is capped at `s3_read_max_wait_time_ms`,
      and the maximum retry count is `max_s3_client_retry`.
    - Add more metrics for the S3 file reader:
      - `s3_file_reader_too_many_request`: counter of 429 errors.
      - `s3_file_reader_s3_get_request`: QPS of S3 get requests.
      - `TotalGetRequest`: get request counter in the profile.
      - `TooManyRequestErr`: 429 error counter in the profile.
      - `TooManyRequestSleepTime`: total sleep time after 429 errors in the profile.
      - `TotalBytesRead`: total bytes read from S3 in the profile.
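To see what that schedule looks like in practice, here is a minimal standalone C++ sketch of the wait-time arithmetic. This is not the committed code, just the formula it implements; the constants mirror the defaults of `s3_read_base_wait_time_ms`, `s3_read_max_wait_time_ms`, and `max_s3_client_retry` introduced below:

```
#include <algorithm>
#include <cstdio>

int main() {
    const int base_wait_time_ms = 100; // default s3_read_base_wait_time_ms
    const int max_wait_time_ms = 800;  // default s3_read_max_wait_time_ms
    const int max_retries = 10;        // default max_s3_client_retry

    // After the n-th 429 error the reader sleeps base * 2^n ms, capped at max.
    for (int retry_count = 1; retry_count <= max_retries; ++retry_count) {
        int wait_time_ms = std::min(base_wait_time_ms * (1 << retry_count), max_wait_time_ms);
        std::printf("retry %d: sleep %d ms\n", retry_count, wait_time_ms);
    }
    return 0;
}
```

With the defaults this prints 200, 400, 800, 800, ... ms: the backoff doubles on every retry until it hits the cap.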
---
 be/src/common/config.cpp        |   2 +
 be/src/common/config.h          |   7 +++
 be/src/io/file_factory.cpp      |   2 +-
 be/src/io/fs/s3_file_reader.cpp | 109 ++++++++++++++++++++++++++++++++--------
 be/src/io/fs/s3_file_reader.h   |  24 +++++++--
 be/src/io/fs/s3_file_system.cpp |   2 +-
 6 files changed, 120 insertions(+), 26 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 4460e477c8f..a08bb43db56 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1222,6 +1222,8 @@ DEFINE_Int32(spill_io_thread_pool_queue_size, "102400");
 DEFINE_mBool(check_segment_when_build_rowset_meta, "false");
 
 DEFINE_mInt32(max_s3_client_retry, "10");
+DEFINE_mInt32(s3_read_base_wait_time_ms, "100");
+DEFINE_mInt32(s3_read_max_wait_time_ms, "800");
 
 DEFINE_mBool(enable_s3_rate_limiter, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index dbf18002704..6942e316dcd 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1304,6 +1304,13 @@ DECLARE_mBool(check_segment_when_build_rowset_meta);
 DECLARE_mBool(enable_s3_rate_limiter);
 // max s3 client retry times
 DECLARE_mInt32(max_s3_client_retry);
+// When an s3 429 error occurs, the "get" request will
+// sleep s3_read_base_wait_time_ms * 2^n ms
+// and then try again.
+// The max sleep time is s3_read_max_wait_time_ms,
+// and the max retry count is max_s3_client_retry.
+DECLARE_mInt32(s3_read_base_wait_time_ms);
+DECLARE_mInt32(s3_read_max_wait_time_ms);
 
 // write as inverted index tmp directory
 DECLARE_String(tmp_file_dir);
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index e61ed144486..0c84c2eb74c 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -156,7 +156,7 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader(
     auto client_holder = std::make_shared<io::ObjClientHolder>(s3_conf.client_conf);
     RETURN_IF_ERROR_RESULT(client_holder->init());
     return io::S3FileReader::create(std::move(client_holder), s3_conf.bucket, s3_uri.get_key(),
-                                    file_description.file_size)
+                                    file_description.file_size, profile)
             .and_then([&](auto&& reader) {
                 return io::create_cached_file_reader(std::move(reader), reader_options);
             });
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index e7775803198..a5c6ec09162 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -37,6 +37,7 @@
 #include "io/fs/s3_common.h"
 #include "util/bvar_helper.h"
 #include "util/doris_metrics.h"
+#include "util/runtime_profile.h"
 #include "util/s3_util.h"
 
 namespace doris::io {
@@ -45,13 +46,18 @@ bvar::Adder<uint64_t> s3_file_reader_read_counter("s3_file_reader", "read_at");
 bvar::Adder<uint64_t> s3_file_reader_total("s3_file_reader", "total_num");
 bvar::Adder<uint64_t> s3_bytes_read_total("s3_file_reader", "bytes_read");
 bvar::Adder<uint64_t> s3_file_being_read("s3_file_reader", "file_being_read");
+bvar::Adder<uint64_t> s3_file_reader_too_many_request_counter("s3_file_reader", "too_many_request");
 bvar::LatencyRecorder s3_bytes_per_read("s3_file_reader", "bytes_per_read"); // also QPS
 bvar::PerSecond<bvar::Adder<uint64_t>> s3_read_througthput("s3_file_reader", "s3_read_throughput",
                                                            &s3_bytes_read_total);
+// Although we can get QPS from s3_bytes_per_read, s3_bytes_per_read only
+// records successful requests, while s3_get_request_qps records all requests.
+bvar::PerSecond<bvar::Adder<uint64_t>> s3_get_request_qps("s3_file_reader", "s3_get_request",
+                                                          &s3_file_reader_read_counter);
 
 Result<FileReaderSPtr> S3FileReader::create(std::shared_ptr<const ObjClientHolder> client,
-                                            std::string bucket, std::string key,
-                                            int64_t file_size) {
+                                            std::string bucket, std::string key, int64_t file_size,
+                                            RuntimeProfile* profile) {
     if (file_size < 0) {
         auto res = client->object_file_size(bucket, key);
         if (!res.has_value()) {
@@ -62,16 +68,17 @@ Result<FileReaderSPtr> S3FileReader::create(std::shared_ptr<const ObjClientHolde
     }
 
     return std::make_shared<S3FileReader>(std::move(client), std::move(bucket), std::move(key),
-                                          file_size);
+                                          file_size, profile);
 }
 
 S3FileReader::S3FileReader(std::shared_ptr<const ObjClientHolder> client, std::string bucket,
-                           std::string key, size_t file_size)
+                           std::string key, size_t file_size, RuntimeProfile* profile)
         : _path(fmt::format("s3://{}/{}", bucket, key)),
           _file_size(file_size),
           _bucket(std::move(bucket)),
          _key(std::move(key)),
-          _client(std::move(client)) {
+          _client(std::move(client)),
+          _profile(profile) {
     DorisMetrics::instance()->s3_file_open_reading->increment(1);
     DorisMetrics::instance()->s3_file_reader_total->increment(1);
     s3_file_reader_total << 1;
@@ -113,23 +120,85 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
     if (!client) {
         return Status::InternalError("init s3 client error");
     }
-    // clang-format off
-    auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
-            to, offset, bytes_req, bytes_read);
-    // clang-format on
-    if (resp.status.code != ErrorCode::OK) {
-        return std::move(Status(resp.status.code, std::move(resp.status.msg))
-                                 .append(fmt::format("failed to read from {}", _path.native())));
+    // // clang-format off
+    // auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
+    //         to, offset, bytes_req, bytes_read);
+    // // clang-format on
+    // if (resp.status.code != ErrorCode::OK) {
+    //     return std::move(Status(resp.status.code, std::move(resp.status.msg))
+    //                              .append(fmt::format("failed to read from {}", _path.native())));
+    // }
+    // if (*bytes_read != bytes_req) {
+    //     return Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})",
+    //                                  _path.native(), *bytes_read, bytes_req);
+    // }
+    SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
+
+    int retry_count = 0;
+    const int base_wait_time = config::s3_read_base_wait_time_ms; // Base wait time in milliseconds
+    const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum wait time in milliseconds
+    const int max_retries = config::max_s3_client_retry; // Maximum number of retries on 429
+
+    int total_sleep_time = 0;
+    while (retry_count <= max_retries) {
+        s3_file_reader_read_counter << 1;
+        // clang-format off
+        auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
+                to, offset, bytes_req, bytes_read);
+        // clang-format on
+        _s3_stats.total_get_request_counter++;
+        if (resp.status.code != ErrorCode::OK) {
+            if (resp.http_code ==
+                static_cast<int>(Aws::Http::HttpResponseCode::TOO_MANY_REQUESTS)) {
+                s3_file_reader_too_many_request_counter << 1;
+                retry_count++;
+                int wait_time = std::min(base_wait_time * (1 << retry_count),
+                                         max_wait_time); // Exponential backoff, capped at max_wait_time
+                std::this_thread::sleep_for(std::chrono::milliseconds(wait_time));
+                _s3_stats.too_many_request_err_counter++;
+                _s3_stats.too_many_request_sleep_time_ms += wait_time;
+                total_sleep_time += wait_time;
+                continue;
+            } else {
+                // Handle other errors
+                return std::move(Status(resp.status.code, std::move(resp.status.msg))
+                                         .append(fmt::format("failed to read from {}",
+                                                             _path.native())));
+            }
+        }
+        if (*bytes_read != bytes_req) {
+            return Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})",
+                                         _path.native(), *bytes_read, bytes_req);
+        }
+        _s3_stats.total_bytes_read += bytes_req;
+        s3_bytes_read_total << bytes_req;
+        s3_bytes_per_read << bytes_req;
+        DorisMetrics::instance()->s3_bytes_read_total->increment(bytes_req);
+        if (retry_count > 0) {
+            LOG(INFO) << fmt::format("read s3 file {} succeed after {} times with {} ms sleeping",
+                                     _path.native(), retry_count, total_sleep_time);
+        }
+        return Status::OK();
     }
-    if (*bytes_read != bytes_req) {
-        return Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})",
-                                     _path.native(), *bytes_read, bytes_req);
+    return Status::InternalError("failed to read from s3, exceeded maximum retries");
+}
+
+void S3FileReader::_collect_profile_before_close() {
+    if (_profile != nullptr) {
+        const char* s3_profile_name = "S3Profile";
+        ADD_TIMER(_profile, s3_profile_name);
+        RuntimeProfile::Counter* total_get_request_counter =
+                ADD_CHILD_COUNTER(_profile, "TotalGetRequest", TUnit::UNIT, s3_profile_name);
+        RuntimeProfile::Counter* too_many_request_err_counter =
+                ADD_CHILD_COUNTER(_profile, "TooManyRequestErr", TUnit::UNIT, s3_profile_name);
+        RuntimeProfile::Counter* too_many_request_sleep_time = ADD_CHILD_COUNTER(
+                _profile, "TooManyRequestSleepTime", TUnit::TIME_MS, s3_profile_name);
+        RuntimeProfile::Counter* total_bytes_read =
+                ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, s3_profile_name);
+
+        COUNTER_UPDATE(total_get_request_counter, _s3_stats.total_get_request_counter);
+        COUNTER_UPDATE(too_many_request_err_counter, _s3_stats.too_many_request_err_counter);
+        COUNTER_UPDATE(too_many_request_sleep_time, _s3_stats.too_many_request_sleep_time_ms);
+        COUNTER_UPDATE(total_bytes_read, _s3_stats.total_bytes_read);
     }
-    s3_bytes_read_total << *bytes_read;
-    s3_bytes_per_read << *bytes_read;
-    s3_file_reader_read_counter << 1;
-    DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read);
-    return Status::OK();
 }
 
 } // namespace doris::io
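To make the retry control flow above easier to follow outside the reader, here is a minimal standalone sketch of the same pattern. The `FakeResponse` and `fake_get_object` names are invented for illustration and are not part of the Doris codebase; only the loop structure mirrors the committed `read_at_impl`:

```
#include <algorithm>
#include <chrono>
#include <cstdio>
#include <thread>

struct FakeResponse {
    int http_code; // 200 on success, 429 when throttled
};

// Hypothetical stand-in for client->get_object(): fails with 429 twice, then succeeds.
FakeResponse fake_get_object() {
    static int calls = 0;
    return {++calls <= 2 ? 429 : 200};
}

bool read_with_retry(int base_wait_ms, int max_wait_ms, int max_retries) {
    int retry_count = 0;
    while (retry_count <= max_retries) {
        FakeResponse resp = fake_get_object();
        if (resp.http_code == 429) {
            retry_count++;
            // Same backoff as the commit: base * 2^retry_count, capped at max.
            int wait_ms = std::min(base_wait_ms * (1 << retry_count), max_wait_ms);
            std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
            std::printf("429 received, retry %d after %d ms\n", retry_count, wait_ms);
            continue;
        }
        return true; // success; any other error would return immediately
    }
    return false; // exceeded maximum retries
}

int main() {
    std::printf("result: %s\n", read_with_retry(100, 800, 10) ? "ok" : "failed");
    return 0;
}
```

Returning `false` after the loop corresponds to the "exceeded maximum retries" status in the real reader; any non-429 error aborts immediately instead of sleeping.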
diff --git a/be/src/io/fs/s3_file_reader.h b/be/src/io/fs/s3_file_reader.h
index d681161ebed..36fe67b342c 100644
--- a/be/src/io/fs/s3_file_reader.h
+++ b/be/src/io/fs/s3_file_reader.h
@@ -28,16 +28,20 @@
 #include "io/fs/s3_file_system.h"
 #include "util/slice.h"
 
-namespace doris::io {
+namespace doris {
+class RuntimeProfile;
+
+namespace io {
 
 struct IOContext;
 
 class S3FileReader final : public FileReader {
 public:
     static Result<FileReaderSPtr> create(std::shared_ptr<const ObjClientHolder> client,
-                                         std::string bucket, std::string key, int64_t file_size);
+                                         std::string bucket, std::string key, int64_t file_size,
+                                         RuntimeProfile* profile);
 
     S3FileReader(std::shared_ptr<const ObjClientHolder> client, std::string bucket, std::string key,
-                 size_t file_size);
+                 size_t file_size, RuntimeProfile* profile);
 
     ~S3FileReader() override;
 
@@ -53,7 +57,15 @@ protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
 
+    void _collect_profile_before_close() override;
+
 private:
+    struct S3Statistics {
+        int64_t total_get_request_counter = 0;
+        int64_t too_many_request_err_counter = 0;
+        int64_t too_many_request_sleep_time_ms = 0;
+        int64_t total_bytes_read = 0;
+    };
     Path _path;
     size_t _file_size;
 
@@ -62,6 +74,10 @@ private:
     std::shared_ptr<const ObjClientHolder> _client;
 
     std::atomic<bool> _closed = false;
+
+    RuntimeProfile* _profile = nullptr;
+    S3Statistics _s3_stats;
 };
 
-} // namespace doris::io
+} // namespace io
+} // namespace doris
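The header changes above follow a common pattern: counters are accumulated in a plain struct on the hot path and published to the profile only once, when the reader closes. A rough standalone sketch of that pattern follows; the `Profile` class here is a hypothetical stand-in for Doris's `RuntimeProfile`, not its real API:

```
#include <cstdint>
#include <cstdio>
#include <string>

// Hypothetical stand-in for RuntimeProfile: just prints named counters.
struct Profile {
    void update(const std::string& name, int64_t value) {
        std::printf("%s: %lld\n", name.c_str(), static_cast<long long>(value));
    }
};

struct S3Statistics {
    int64_t total_get_request_counter = 0;
    int64_t too_many_request_err_counter = 0;
    int64_t too_many_request_sleep_time_ms = 0;
    int64_t total_bytes_read = 0;
};

class Reader {
public:
    explicit Reader(Profile* profile) : _profile(profile) {}

    void read_once(bool throttled, int64_t bytes) {
        _stats.total_get_request_counter++;        // every attempt is counted
        if (throttled) {
            _stats.too_many_request_err_counter++; // 429 path
            _stats.too_many_request_sleep_time_ms += 200;
        } else {
            _stats.total_bytes_read += bytes;      // success path
        }
    }

    // Publish the accumulated counters once, as _collect_profile_before_close() does.
    void close() {
        if (_profile != nullptr) {
            _profile->update("TotalGetRequest", _stats.total_get_request_counter);
            _profile->update("TooManyRequestErr", _stats.too_many_request_err_counter);
            _profile->update("TooManyRequestSleepTime", _stats.too_many_request_sleep_time_ms);
            _profile->update("TotalBytesRead", _stats.total_bytes_read);
        }
    }

private:
    Profile* _profile = nullptr;
    S3Statistics _stats;
};

int main() {
    Profile profile;
    Reader reader(&profile);
    reader.read_once(true, 0);     // throttled attempt
    reader.read_once(false, 4096); // successful attempt
    reader.close();
    return 0;
}
```

Accumulating into plain `int64_t` fields keeps the read path free of profile bookkeeping; a single publish at close is enough because the counters are only consumed after the reader finishes.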
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index 2cc82d2da0f..93f36429485 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -196,7 +196,7 @@ Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
 Status S3FileSystem::open_file_internal(const Path& file, FileReaderSPtr* reader,
                                         const FileReaderOptions& opts) {
     auto key = DORIS_TRY(get_key(file));
-    *reader = DORIS_TRY(S3FileReader::create(_client, _bucket, key, opts.file_size));
+    *reader = DORIS_TRY(S3FileReader::create(_client, _bucket, key, opts.file_size, nullptr));
 
     return Status::OK();
 }