This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 83040c8f25 [feature](S3FileWriter) Reduce network RTT for files where multipart upload is not applicable (#19135)
83040c8f25 is described below

commit 83040c8f250478e0986fb4fc569d53aee1d6874f
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Sat May 6 14:46:18 2023 +0800

    [feature](S3FileWriter) Reduce network RTT for files where multipart upload is not applicable (#19135)

    For files smaller than 5MB, we don't need to use multipart upload, which
    takes at least 3 network round trips (create the upload, upload the
    parts, complete the upload). Instead, we can call PutObject, which takes
    only one.
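For context, the two paths compare roughly like this against the AWS SDK for
C++ -- a minimal sketch, not code from this patch: upload_object() and
kMinPartSize are illustrative names, the multipart branch is collapsed to a
single part, and the real writer streams buffered parts asynchronously.

#include <aws/s3/S3Client.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/CompletedMultipartUpload.h>
#include <aws/s3/model/CompletedPart.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/UploadPartRequest.h>

#include <memory>
#include <sstream>
#include <string>

static constexpr size_t kMinPartSize = 5 * 1024 * 1024; // S3's minimum part size

bool upload_object(Aws::S3::S3Client& client, const std::string& bucket,
                   const std::string& key, const std::string& data) {
    if (data.size() < kMinPartSize) {
        // Small file: a single PutObject, one network round trip.
        Aws::S3::Model::PutObjectRequest req;
        req.WithBucket(bucket).WithKey(key);
        req.SetBody(std::make_shared<std::stringstream>(data));
        req.SetContentLength(static_cast<long long>(data.size()));
        req.SetContentType("application/octet-stream");
        return client.PutObject(req).IsSuccess();
    }

    // Large file: at least three round trips -- create, upload part(s), complete.
    Aws::S3::Model::CreateMultipartUploadRequest create;
    create.WithBucket(bucket).WithKey(key);
    auto created = client.CreateMultipartUpload(create);
    if (!created.IsSuccess()) {
        return false;
    }
    const auto upload_id = created.GetResult().GetUploadId();

    Aws::S3::Model::UploadPartRequest part;
    part.WithBucket(bucket).WithKey(key).WithUploadId(upload_id).WithPartNumber(1);
    part.SetBody(std::make_shared<std::stringstream>(data));
    part.SetContentLength(static_cast<long long>(data.size()));
    auto uploaded = client.UploadPart(part);
    if (!uploaded.IsSuccess()) {
        return false;
    }

    Aws::S3::Model::CompletedPart done;
    done.SetPartNumber(1);
    done.SetETag(uploaded.GetResult().GetETag());
    Aws::S3::Model::CompletedMultipartUpload completed;
    completed.AddParts(done);

    Aws::S3::Model::CompleteMultipartUploadRequest complete;
    complete.WithBucket(bucket).WithKey(key).WithUploadId(upload_id);
    complete.SetMultipartUpload(completed);
    return client.CompleteMultipartUpload(complete).IsSuccess();
}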
---
 be/src/io/fs/s3_file_writer.cpp | 94 +++++++++++++++++++++++++++++------------
 be/src/io/fs/s3_file_writer.h   | 50 +++-------------------------
 2 files changed, 75 insertions(+), 69 deletions(-)

diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 30ee8cee58..5e1313b9c6 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -32,8 +32,10 @@
 #include <aws/s3/model/CompletedPart.h>
 #include <aws/s3/model/CreateMultipartUploadRequest.h>
 #include <aws/s3/model/CreateMultipartUploadResult.h>
+#include <aws/s3/model/PutObjectRequest.h>
 #include <aws/s3/model/UploadPartRequest.h>
 #include <aws/s3/model/UploadPartResult.h>
+#include <fmt/core.h>
 #include <glog/logging.h>
 
 #include <sstream>
@@ -73,7 +75,7 @@ S3FileWriter::S3FileWriter(Path path, std::shared_ptr<S3Client> client, const S3
         : FileWriter(Path(s3_conf.endpoint) / s3_conf.bucket / path, std::move(fs)),
           _bucket(s3_conf.bucket),
           _key(std::move(path)),
-          _upload_cost_ms(std::make_shared<int64_t>()),
+          _upload_cost_ms(std::make_unique<int64_t>()),
           _client(std::move(client)) {}
 
 S3FileWriter::~S3FileWriter() {
@@ -83,36 +85,47 @@ S3FileWriter::~S3FileWriter() {
     CHECK(!_opened || _closed) << "open: " << _opened << ", closed: " << _closed;
 }
 
-Status S3FileWriter::open() {
-    VLOG_DEBUG << "S3FileWriter::open, path: " << _path.native();
+Status S3FileWriter::_create_multi_upload_request() {
     CreateMultipartUploadRequest create_request;
     create_request.WithBucket(_bucket).WithKey(_key);
-    create_request.SetContentType("text/plain");
+    create_request.SetContentType("application/octet-stream");
 
     auto outcome = _client->CreateMultipartUpload(create_request);
 
     if (outcome.IsSuccess()) {
         _upload_id = outcome.GetResult().GetUploadId();
-        _closed = false;
-        _opened = true;
         return Status::OK();
     }
     return Status::IOError("failed to create multipart upload(bucket={}, key={}, upload_id={}): {}",
                            _bucket, _path.native(), _upload_id, outcome.GetError().GetMessage());
 }
 
+void S3FileWriter::_wait_until_finish(std::string task_name) {
+    auto msg =
+            fmt::format("{} multipart upload already takes 5 min, bucket={}, key={}, upload_id={}",
+                        std::move(task_name), _bucket, _path.native(), _upload_id);
+    while (_count.timed_wait({5 * 60, 0}) < 0) {
+        LOG(WARNING) << msg;
+    }
+}
+
 Status S3FileWriter::abort() {
     _failed = true;
     if (_closed || !_opened) {
         return Status::OK();
     }
+    // we need to reclaim the memory
+    if (_pending_buf) {
+        _pending_buf->on_finished();
+        _pending_buf = nullptr;
+    }
+    // an empty upload id means there was no multipart upload to abort
+    if (_upload_id.empty()) {
+        return Status::OK();
+    }
     VLOG_DEBUG << "S3FileWriter::abort, path: " << _path.native();
     _closed = true;
-    while (!_wait.wait()) {
-        LOG(WARNING) << "Abort multipart upload already takes 5 min"
-                     << "bucket=" << _bucket << ", key=" << _path.native()
-                     << ", upload_id=" << _upload_id;
-    }
+    _wait_until_finish("Abort");
     AbortMultipartUploadRequest request;
     request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
     auto outcome = _client->AbortMultipartUpload(request);
@@ -135,7 +148,7 @@ Status S3FileWriter::close() {
     VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
     _closed = true;
     if (_pending_buf != nullptr) {
-        _wait.add();
+        _count.add_count();
         _pending_buf->submit();
         _pending_buf = nullptr;
     }
@@ -147,7 +160,9 @@
 Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
     // lazy open
     if (!_opened) {
-        RETURN_IF_ERROR(open());
+        VLOG_DEBUG << "S3FileWriter::open, path: " << _path.native();
+        _closed = false;
+        _opened = true;
     }
     DCHECK(!_closed);
     size_t buffer_size = config::s3_write_buffer_size;
@@ -164,7 +179,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
             });
             _pending_buf->set_file_offset(_bytes_appended);
             // later we might need to wait all prior tasks to be finished
-            _pending_buf->set_finish_upload([this]() { _wait.done(); });
+            _pending_buf->set_finish_upload([this]() { _count.signal(); });
             _pending_buf->set_is_cancel([this]() { return _failed.load(); });
             _pending_buf->set_on_failed([this, part_num = _cur_part_num](Status st) {
                 VLOG_NOTICE << "failed at key: " << _key << ", load part " << part_num
@@ -187,8 +202,13 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
         // satisfy that the size is larger than or equal to 5MB
        // _complete() would handle the first situation
         if (_pending_buf->get_size() == buffer_size) {
+            // only create the multipart upload request when the data spans
+            // more than one memory buffer
+            if (_cur_part_num == 1) {
+                RETURN_IF_ERROR(_create_multi_upload_request());
+            }
             _cur_part_num++;
-            _wait.add();
+            _count.add_count();
             _pending_buf->submit();
             _pending_buf = nullptr;
         }
@@ -214,6 +234,7 @@ void S3FileWriter::_upload_one_part(int64_t part_num, S3FileBuffer& buf) {
     upload_request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
 
     upload_request.SetContentLength(buf.get_size());
+    upload_request.SetContentType("application/octet-stream");
 
     auto upload_part_callable = _client->UploadPartCallable(upload_request);
 
@@ -228,37 +249,36 @@ void S3FileWriter::_upload_one_part(int64_t part_num, S3FileBuffer& buf) {
         return;
     }
 
-    std::shared_ptr<CompletedPart> completed_part = std::make_shared<CompletedPart>();
+    std::unique_ptr<CompletedPart> completed_part = std::make_unique<CompletedPart>();
     completed_part->SetPartNumber(part_num);
     auto etag = upload_part_outcome.GetResult().GetETag();
     // DCHECK(etag.empty());
     completed_part->SetETag(etag);
 
     std::unique_lock<std::mutex> lck {_completed_lock};
-    _completed_parts.emplace_back(completed_part);
+    _completed_parts.emplace_back(std::move(completed_part));
     _bytes_written += buf.get_size();
 }
 
-// TODO(AlexYue): if the whole size is less than 5MB, we can use just call put object method
-// to reduce the network IO num to just one time
 Status S3FileWriter::_complete() {
     SCOPED_RAW_TIMER(_upload_cost_ms.get());
     if (_failed) {
         return _st;
     }
+    // an empty upload id means there was no multipart upload
+    if (_upload_id.empty()) {
+        _wait_until_finish("PutObject");
+        return _st;
+    }
     CompleteMultipartUploadRequest complete_request;
     complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
-    while (!_wait.wait()) {
-        LOG(WARNING) << "Complete multipart upload already takes 5 min"
-                     << "bucket=" << _bucket << ", key=" << _path.native()
-                     << ", upload_id=" << _upload_id;
-    }
+    _wait_until_finish("Complete");
     // make sure _completed_parts are in ascending order
     std::sort(_completed_parts.begin(), _completed_parts.end(),
               [](auto& p1, auto& p2) { return p1->GetPartNumber() < p2->GetPartNumber(); });
     CompletedMultipartUpload completed_upload;
-    for (std::shared_ptr<CompletedPart> part : _completed_parts) {
+    for (auto& part : _completed_parts) {
         completed_upload.AddParts(*part);
     }
 
@@ -281,12 +301,34 @@ Status S3FileWriter::finalize() {
     // submit pending buf if it's not nullptr
     // it's the last buf, we can submit it right now
     if (_pending_buf != nullptr) {
-        _wait.add();
+        // if we only need to upload one file smaller than 5MB, we can just
+        // call PutObject to reduce the network IO
+        if (_upload_id.empty()) {
+            _pending_buf->set_upload_remote_callback(
+                    [this, buf = _pending_buf]() { _put_object(*buf); });
+        }
+        _count.add_count();
         _pending_buf->submit();
         _pending_buf = nullptr;
     }
     return Status::OK();
 }
 
+void S3FileWriter::_put_object(S3FileBuffer& buf) {
+    DCHECK(!_closed && _opened);
+    Aws::S3::Model::PutObjectRequest request;
+    request.WithBucket(_bucket).WithKey(_key);
+    request.SetBody(buf.get_stream());
+    request.SetContentLength(buf.get_size());
+    request.SetContentType("application/octet-stream");
+    auto response = _client->PutObject(request);
+    if (!response.IsSuccess()) {
+        _st = Status::InternalError("Error: [{}:{}, responseCode:{}]",
+                                    response.GetError().GetExceptionName(),
+                                    response.GetError().GetMessage(),
+                                    static_cast<int>(response.GetError().GetResponseCode()));
+    }
+}
+
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index 59942e6ac5..cb34477f54 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <aws/core/utils/memory/stl/AWSStringStream.h>
+#include <bthread/countdown_event.h>
 
 #include <cstddef>
 #include <list>
@@ -48,8 +49,6 @@ public:
                  FileSystemSPtr fs);
     ~S3FileWriter() override;
 
-    Status open();
-
     Status close() override;
 
     Status abort() override;
@@ -64,45 +63,10 @@ public:
     int64_t upload_cost_ms() const { return *_upload_cost_ms; }
 
 private:
-    class WaitGroup {
-    public:
-        WaitGroup() = default;
-
-        ~WaitGroup() = default;
-
-        WaitGroup(const WaitGroup&) = delete;
-        WaitGroup(WaitGroup&&) = delete;
-        void operator=(const WaitGroup&) = delete;
-        void operator=(WaitGroup&&) = delete;
-        // add one counter indicating one more concurrent worker
-        void add(int count = 1) { _count += count; }
-
-        // decrease count if one concurrent worker finished it's work
-        void done() {
-            _count--;
-            if (_count.load() <= 0) {
-                _cv.notify_all();
-            }
-        }
-
-        // wait for all concurrent workers finish their work and return true
-        // would return false if timeout, default timeout would be 5min
-        bool wait(int64_t timeout_seconds = 300) {
-            if (_count.load() <= 0) {
-                return true;
-            }
-            std::unique_lock<std::mutex> lck {_lock};
-            _cv.wait_for(lck, std::chrono::seconds(timeout_seconds),
-                         [this]() { return _count.load() <= 0; });
-            return _count.load() <= 0;
-        }
-
-    private:
-        std::mutex _lock;
-        std::condition_variable _cv;
-        std::atomic_int64_t _count {0};
-    };
+    void _wait_until_finish(std::string task_name);
     Status _complete();
+    Status _create_multi_upload_request();
+    void _put_object(S3FileBuffer& buf);
     void _upload_one_part(int64_t part_num, S3FileBuffer& buf);
 
     std::string _bucket;
@@ -110,7 +74,7 @@ private:
     bool _closed = true;
     bool _opened = false;
 
-    std::shared_ptr<int64_t> _upload_cost_ms;
+    std::unique_ptr<int64_t> _upload_cost_ms;
 
     std::shared_ptr<Aws::S3::S3Client> _client;
     std::string _upload_id;
@@ -119,9 +83,9 @@ private:
     // Current Part Num for CompletedPart
     int _cur_part_num = 1;
 
     std::mutex _completed_lock;
-    std::vector<std::shared_ptr<Aws::S3::Model::CompletedPart>> _completed_parts;
+    std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>> _completed_parts;
 
-    WaitGroup _wait;
+    bthread::CountdownEvent _count;
 
     std::atomic_bool _failed = false;
     Status _st = Status::OK();
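The synchronization change is worth a note as well: the hand-rolled WaitGroup
(a mutex, a condition variable and an atomic counter) is replaced by brpc's
bthread::CountdownEvent, which offers the same add/signal/wait protocol. Below
is a toy illustration of that protocol, assuming brpc is available; it is not
code from this patch. In the patch itself the count is incremented with
add_count() before each S3FileBuffer::submit(), decremented via signal() in the
finish-upload callback, and drained by timed_wait() in _wait_until_finish() so
that slow uploads can be logged every five minutes.

#include <bthread/countdown_event.h>

#include <cstdio>
#include <thread>

int main() {
    const int kParts = 3;                 // e.g. three 5MB buffers in flight
    bthread::CountdownEvent done(kParts); // one count per submitted part

    for (int i = 0; i < kParts; ++i) {
        // Stand-in for the async upload task; the real callback runs on an
        // uploader thread once its part has landed on S3.
        std::thread([&done] { done.signal(); }).detach();
    }

    done.wait(); // blocks until every part has signaled
    std::printf("all parts uploaded\n");
    return 0;
}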
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org