This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 83040c8f25 [feature](S3FileWriter) Reduce network RTT for files where multipart upload is not applicable (#19135)
83040c8f25 is described below
commit 83040c8f250478e0986fb4fc569d53aee1d6874f
Author: AlexYue <[email protected]>
AuthorDate: Sat May 6 14:46:18 2023 +0800
[feature](S3FileWriter) Reduce network RTT for files where multipart upload is not applicable (#19135)
For files smaller than 5MB, we don't need to use multipart upload, which takes
at least 3 network round trips. Instead, we can just call PutObject, which
completes in a single request.
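A rough sketch of the idea (illustrative only, not code from this patch;
kMinPartSize and put_whole_object are hypothetical names):

    #include <aws/s3/S3Client.h>
    #include <aws/s3/model/PutObjectRequest.h>
    #include <cassert>

    // S3 requires every part except the last to be at least 5MB, so any
    // object below that size can skip multipart upload entirely.
    constexpr size_t kMinPartSize = 5 * 1024 * 1024;

    // Upload a small object in one network round trip with PutObject,
    // instead of CreateMultipartUpload + UploadPart + CompleteMultipartUpload.
    bool put_whole_object(Aws::S3::S3Client& client, const Aws::String& bucket,
                          const Aws::String& key,
                          const std::shared_ptr<Aws::IOStream>& body, size_t size) {
        assert(size < kMinPartSize);
        Aws::S3::Model::PutObjectRequest request;
        request.WithBucket(bucket).WithKey(key);
        request.SetBody(body);
        request.SetContentLength(size);
        request.SetContentType("application/octet-stream");
        return client.PutObject(request).IsSuccess();
    }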
---
be/src/io/fs/s3_file_writer.cpp | 94 +++++++++++++++++++++++++++++------------
be/src/io/fs/s3_file_writer.h | 50 +++-------------------
2 files changed, 75 insertions(+), 69 deletions(-)
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 30ee8cee58..5e1313b9c6 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -32,8 +32,10 @@
#include <aws/s3/model/CompletedPart.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/CreateMultipartUploadResult.h>
+#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/UploadPartRequest.h>
#include <aws/s3/model/UploadPartResult.h>
+#include <fmt/core.h>
#include <glog/logging.h>
#include <sstream>
@@ -73,7 +75,7 @@ S3FileWriter::S3FileWriter(Path path, std::shared_ptr<S3Client> client, const S3
        : FileWriter(Path(s3_conf.endpoint) / s3_conf.bucket / path, std::move(fs)),
_bucket(s3_conf.bucket),
_key(std::move(path)),
- _upload_cost_ms(std::make_shared<int64_t>()),
+ _upload_cost_ms(std::make_unique<int64_t>()),
_client(std::move(client)) {}
S3FileWriter::~S3FileWriter() {
@@ -83,36 +85,47 @@ S3FileWriter::~S3FileWriter() {
    CHECK(!_opened || _closed) << "open: " << _opened << ", closed: " << _closed;
}
-Status S3FileWriter::open() {
- VLOG_DEBUG << "S3FileWriter::open, path: " << _path.native();
+Status S3FileWriter::_create_multi_upload_request() {
CreateMultipartUploadRequest create_request;
create_request.WithBucket(_bucket).WithKey(_key);
- create_request.SetContentType("text/plain");
+ create_request.SetContentType("application/octet-stream");
auto outcome = _client->CreateMultipartUpload(create_request);
if (outcome.IsSuccess()) {
_upload_id = outcome.GetResult().GetUploadId();
- _closed = false;
- _opened = true;
return Status::OK();
}
    return Status::IOError("failed to create multipart upload(bucket={}, key={}, upload_id={}): {}",
                           _bucket, _path.native(), _upload_id, outcome.GetError().GetMessage());
}
+void S3FileWriter::_wait_until_finish(std::string task_name) {
+    auto msg = fmt::format("{} multipart upload already takes 5 min, bucket={}, key={}, upload_id={}",
+                           std::move(task_name), _bucket, _path.native(), _upload_id);
+ while (_count.timed_wait({5 * 60, 0}) < 0) {
+ LOG(WARNING) << msg;
+ }
+}
+
Status S3FileWriter::abort() {
_failed = true;
if (_closed || !_opened) {
return Status::OK();
}
+ // we need to reclaim the memory
+ if (_pending_buf) {
+ _pending_buf->on_finished();
+ _pending_buf = nullptr;
+ }
+    // an empty upload id means no multipart upload was created
+ if (_upload_id.empty()) {
+ return Status::OK();
+ }
VLOG_DEBUG << "S3FileWriter::abort, path: " << _path.native();
_closed = true;
- while (!_wait.wait()) {
- LOG(WARNING) << "Abort multipart upload already takes 5 min"
- << "bucket=" << _bucket << ", key=" << _path.native()
- << ", upload_id=" << _upload_id;
- }
+ _wait_until_finish("Abort");
AbortMultipartUploadRequest request;
request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
auto outcome = _client->AbortMultipartUpload(request);
@@ -135,7 +148,7 @@ Status S3FileWriter::close() {
VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
_closed = true;
if (_pending_buf != nullptr) {
- _wait.add();
+ _count.add_count();
_pending_buf->submit();
_pending_buf = nullptr;
}
@@ -147,7 +160,9 @@ Status S3FileWriter::close() {
Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
// lazy open
if (!_opened) {
- RETURN_IF_ERROR(open());
+ VLOG_DEBUG << "S3FileWriter::open, path: " << _path.native();
+ _closed = false;
+ _opened = true;
}
DCHECK(!_closed);
size_t buffer_size = config::s3_write_buffer_size;
@@ -164,7 +179,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
});
_pending_buf->set_file_offset(_bytes_appended);
// later we might need to wait all prior tasks to be finished
- _pending_buf->set_finish_upload([this]() { _wait.done(); });
+ _pending_buf->set_finish_upload([this]() { _count.signal(); });
        _pending_buf->set_is_cancel([this]() { return _failed.load(); });
        _pending_buf->set_on_failed([this, part_num = _cur_part_num](Status st) {
            VLOG_NOTICE << "failed at key: " << _key << ", load part " << part_num
@@ -187,8 +202,13 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
    // satisfy that the size is larger than or equal to 5MB
// _complete() would handle the first situation
if (_pending_buf->get_size() == buffer_size) {
+            // only create the multipart upload request when the data spans
+            // more than one memory buffer
+ if (_cur_part_num == 1) {
+ RETURN_IF_ERROR(_create_multi_upload_request());
+ }
_cur_part_num++;
- _wait.add();
+ _count.add_count();
_pending_buf->submit();
_pending_buf = nullptr;
}
@@ -214,6 +234,7 @@ void S3FileWriter::_upload_one_part(int64_t part_num, S3FileBuffer& buf) {
upload_request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
upload_request.SetContentLength(buf.get_size());
+ upload_request.SetContentType("application/octet-stream");
auto upload_part_callable = _client->UploadPartCallable(upload_request);
@@ -228,37 +249,36 @@ void S3FileWriter::_upload_one_part(int64_t part_num, S3FileBuffer& buf) {
return;
}
-    std::shared_ptr<CompletedPart> completed_part = std::make_shared<CompletedPart>();
+    std::unique_ptr<CompletedPart> completed_part = std::make_unique<CompletedPart>();
completed_part->SetPartNumber(part_num);
auto etag = upload_part_outcome.GetResult().GetETag();
// DCHECK(etag.empty());
completed_part->SetETag(etag);
std::unique_lock<std::mutex> lck {_completed_lock};
- _completed_parts.emplace_back(completed_part);
+ _completed_parts.emplace_back(std::move(completed_part));
_bytes_written += buf.get_size();
}
-// TODO(AlexYue): if the whole size is less than 5MB, we can use just call put object method
-// to reduce the network IO num to just one time
Status S3FileWriter::_complete() {
SCOPED_RAW_TIMER(_upload_cost_ms.get());
if (_failed) {
return _st;
}
+    // an empty upload id means there was no multipart upload
+ if (_upload_id.empty()) {
+ _wait_until_finish("PutObject");
+ return _st;
+ }
CompleteMultipartUploadRequest complete_request;
complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
- while (!_wait.wait()) {
- LOG(WARNING) << "Complete multipart upload already takes 5 min"
- << "bucket=" << _bucket << ", key=" << _path.native()
- << ", upload_id=" << _upload_id;
- }
+ _wait_until_finish("Complete");
// make sure _completed_parts are ascending order
std::sort(_completed_parts.begin(), _completed_parts.end(),
              [](auto& p1, auto& p2) { return p1->GetPartNumber() < p2->GetPartNumber(); });
CompletedMultipartUpload completed_upload;
- for (std::shared_ptr<CompletedPart> part : _completed_parts) {
+ for (auto& part : _completed_parts) {
completed_upload.AddParts(*part);
}
@@ -281,12 +301,34 @@ Status S3FileWriter::finalize() {
// submit pending buf if it's not nullptr
// it's the last buf, we can submit it right now
if (_pending_buf != nullptr) {
- _wait.add();
+        // if the whole file is smaller than 5MB, we can just call
+        // PutObject to reduce the network IO
+ if (_upload_id.empty()) {
+ _pending_buf->set_upload_remote_callback(
+ [this, buf = _pending_buf]() { _put_object(*buf); });
+ }
+ _count.add_count();
_pending_buf->submit();
_pending_buf = nullptr;
}
return Status::OK();
}
+void S3FileWriter::_put_object(S3FileBuffer& buf) {
+ DCHECK(!_closed && _opened);
+ Aws::S3::Model::PutObjectRequest request;
+ request.WithBucket(_bucket).WithKey(_key);
+ request.SetBody(buf.get_stream());
+ request.SetContentLength(buf.get_size());
+ request.SetContentType("application/octet-stream");
+ auto response = _client->PutObject(request);
+ if (!response.IsSuccess()) {
+        _st = Status::InternalError("Error: [{}:{}, responseCode:{}]",
+                                    response.GetError().GetExceptionName(),
+                                    response.GetError().GetMessage(),
+                                    static_cast<int>(response.GetError().GetResponseCode()));
+ }
+}
+
} // namespace io
} // namespace doris
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index 59942e6ac5..cb34477f54 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -18,6 +18,7 @@
#pragma once
#include <aws/core/utils/memory/stl/AWSStringStream.h>
+#include <bthread/countdown_event.h>
#include <cstddef>
#include <list>
@@ -48,8 +49,6 @@ public:
FileSystemSPtr fs);
~S3FileWriter() override;
- Status open();
-
Status close() override;
Status abort() override;
@@ -64,45 +63,10 @@ public:
int64_t upload_cost_ms() const { return *_upload_cost_ms; }
private:
- class WaitGroup {
- public:
- WaitGroup() = default;
-
- ~WaitGroup() = default;
-
- WaitGroup(const WaitGroup&) = delete;
- WaitGroup(WaitGroup&&) = delete;
- void operator=(const WaitGroup&) = delete;
- void operator=(WaitGroup&&) = delete;
- // add one counter indicating one more concurrent worker
- void add(int count = 1) { _count += count; }
-
- // decrease count if one concurrent worker finished it's work
- void done() {
- _count--;
- if (_count.load() <= 0) {
- _cv.notify_all();
- }
- }
-
- // wait for all concurrent workers finish their work and return true
- // would return false if timeout, default timeout would be 5min
- bool wait(int64_t timeout_seconds = 300) {
- if (_count.load() <= 0) {
- return true;
- }
- std::unique_lock<std::mutex> lck {_lock};
- _cv.wait_for(lck, std::chrono::seconds(timeout_seconds),
- [this]() { return _count.load() <= 0; });
- return _count.load() <= 0;
- }
-
- private:
- std::mutex _lock;
- std::condition_variable _cv;
- std::atomic_int64_t _count {0};
- };
+ void _wait_until_finish(std::string task_name);
Status _complete();
+ Status _create_multi_upload_request();
+ void _put_object(S3FileBuffer& buf);
void _upload_one_part(int64_t part_num, S3FileBuffer& buf);
std::string _bucket;
@@ -110,7 +74,7 @@ private:
bool _closed = true;
bool _opened = false;
- std::shared_ptr<int64_t> _upload_cost_ms;
+ std::unique_ptr<int64_t> _upload_cost_ms;
std::shared_ptr<Aws::S3::S3Client> _client;
std::string _upload_id;
@@ -119,9 +83,9 @@ private:
// Current Part Num for CompletedPart
int _cur_part_num = 1;
std::mutex _completed_lock;
-    std::vector<std::shared_ptr<Aws::S3::Model::CompletedPart>> _completed_parts;
+    std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>> _completed_parts;
- WaitGroup _wait;
+ bthread::CountdownEvent _count;
std::atomic_bool _failed = false;
Status _st = Status::OK();
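For reference, the bthread::CountdownEvent that replaces the hand-rolled
WaitGroup is brpc's countdown primitive: add_count() before submitting a task,
signal() when a task finishes, timed_wait() to block with a deadline. A minimal
usage sketch under those assumptions (wait_for_uploads is an illustrative name,
and the deadline handling here is not copied from the patch):

    #include <bthread/countdown_event.h>
    #include <butil/time.h>
    #include <glog/logging.h>

    // Block until every task that called add_count() has called signal(),
    // logging a warning each time a 5-minute deadline passes.
    void wait_for_uploads(bthread::CountdownEvent& count) {
        // timed_wait() takes an absolute deadline and returns non-zero if
        // the count has not reached zero by then.
        while (count.timed_wait(butil::seconds_from_now(5 * 60)) != 0) {
            LOG(WARNING) << "upload tasks still running after 5 min";
        }
    }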
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]