This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 83040c8f25 [feature](S3FileWriter) Reduce network RTT for files for which
multipart upload is not applicable (#19135)
83040c8f25 is described below

commit 83040c8f250478e0986fb4fc569d53aee1d6874f
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Sat May 6 14:46:18 2023 +0800

    [feature](S3FileWriter) Reduce network RTT for files for which multipart upload is not
applicable (#19135)
    
    For files smaller than 5MB, we don't need to use multipart upload, which takes
at least 3 network round trips.
    Instead, we can just call PutObject, which takes only one.
---
 be/src/io/fs/s3_file_writer.cpp | 94 +++++++++++++++++++++++++++++------------
 be/src/io/fs/s3_file_writer.h   | 50 +++-------------------
 2 files changed, 75 insertions(+), 69 deletions(-)

diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 30ee8cee58..5e1313b9c6 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -32,8 +32,10 @@
 #include <aws/s3/model/CompletedPart.h>
 #include <aws/s3/model/CreateMultipartUploadRequest.h>
 #include <aws/s3/model/CreateMultipartUploadResult.h>
+#include <aws/s3/model/PutObjectRequest.h>
 #include <aws/s3/model/UploadPartRequest.h>
 #include <aws/s3/model/UploadPartResult.h>
+#include <fmt/core.h>
 #include <glog/logging.h>
 
 #include <sstream>
@@ -73,7 +75,7 @@ S3FileWriter::S3FileWriter(Path path, 
std::shared_ptr<S3Client> client, const S3
         : FileWriter(Path(s3_conf.endpoint) / s3_conf.bucket / path, 
std::move(fs)),
           _bucket(s3_conf.bucket),
           _key(std::move(path)),
-          _upload_cost_ms(std::make_shared<int64_t>()),
+          _upload_cost_ms(std::make_unique<int64_t>()),
           _client(std::move(client)) {}
 
 S3FileWriter::~S3FileWriter() {
@@ -83,36 +85,47 @@ S3FileWriter::~S3FileWriter() {
     CHECK(!_opened || _closed) << "open: " << _opened << ", closed: " << 
_closed;
 }
 
-Status S3FileWriter::open() {
-    VLOG_DEBUG << "S3FileWriter::open, path: " << _path.native();
+Status S3FileWriter::_create_multi_upload_request() {
     CreateMultipartUploadRequest create_request;
     create_request.WithBucket(_bucket).WithKey(_key);
-    create_request.SetContentType("text/plain");
+    create_request.SetContentType("application/octet-stream");
 
     auto outcome = _client->CreateMultipartUpload(create_request);
 
     if (outcome.IsSuccess()) {
         _upload_id = outcome.GetResult().GetUploadId();
-        _closed = false;
-        _opened = true;
         return Status::OK();
     }
     return Status::IOError("failed to create multipart upload(bucket={}, 
key={}, upload_id={}): {}",
                            _bucket, _path.native(), _upload_id, 
outcome.GetError().GetMessage());
 }
 
+void S3FileWriter::_wait_until_finish(std::string task_name) {
+    auto msg =
+            fmt::format("{} multipart upload already takes 5 min, bucket={}, 
key={}, upload_id={}",
+                        std::move(task_name), _bucket, _path.native(), 
_upload_id);
+    while (_count.timed_wait({5 * 60, 0}) < 0) {
+        LOG(WARNING) << msg;
+    }
+}
+
 Status S3FileWriter::abort() {
     _failed = true;
     if (_closed || !_opened) {
         return Status::OK();
     }
+    // we need to reclaim the memory
+    if (_pending_buf) {
+        _pending_buf->on_finished();
+        _pending_buf = nullptr;
+    }
+    // upload id is empty means there was no create multi upload
+    if (_upload_id.empty()) {
+        return Status::OK();
+    }
     VLOG_DEBUG << "S3FileWriter::abort, path: " << _path.native();
     _closed = true;
-    while (!_wait.wait()) {
-        LOG(WARNING) << "Abort multipart upload already takes 5 min"
-                     << "bucket=" << _bucket << ", key=" << _path.native()
-                     << ", upload_id=" << _upload_id;
-    }
+    _wait_until_finish("Abort");
     AbortMultipartUploadRequest request;
     request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
     auto outcome = _client->AbortMultipartUpload(request);
@@ -135,7 +148,7 @@ Status S3FileWriter::close() {
     VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
     _closed = true;
     if (_pending_buf != nullptr) {
-        _wait.add();
+        _count.add_count();
         _pending_buf->submit();
         _pending_buf = nullptr;
     }
@@ -147,7 +160,9 @@ Status S3FileWriter::close() {
 Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
     // lazy open
     if (!_opened) {
-        RETURN_IF_ERROR(open());
+        VLOG_DEBUG << "S3FileWriter::open, path: " << _path.native();
+        _closed = false;
+        _opened = true;
     }
     DCHECK(!_closed);
     size_t buffer_size = config::s3_write_buffer_size;
@@ -164,7 +179,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t 
data_cnt) {
                         });
                 _pending_buf->set_file_offset(_bytes_appended);
                 // later we might need to wait all prior tasks to be finished
-                _pending_buf->set_finish_upload([this]() { _wait.done(); });
+                _pending_buf->set_finish_upload([this]() { _count.signal(); });
                 _pending_buf->set_is_cancel([this]() { return _failed.load(); 
});
                 _pending_buf->set_on_failed([this, part_num = 
_cur_part_num](Status st) {
                     VLOG_NOTICE << "failed at key: " << _key << ", load part " 
<< part_num
@@ -187,8 +202,13 @@ Status S3FileWriter::appendv(const Slice* data, size_t 
data_cnt) {
             // satisfy that the size is larger than or euqal to 5MB
             // _complete() would handle the first situation
             if (_pending_buf->get_size() == buffer_size) {
+                // only create multiple upload request when the data is more
+                // than one memory buffer
+                if (_cur_part_num == 1) {
+                    RETURN_IF_ERROR(_create_multi_upload_request());
+                }
                 _cur_part_num++;
-                _wait.add();
+                _count.add_count();
                 _pending_buf->submit();
                 _pending_buf = nullptr;
             }
@@ -214,6 +234,7 @@ void S3FileWriter::_upload_one_part(int64_t part_num, 
S3FileBuffer& buf) {
     
upload_request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
 
     upload_request.SetContentLength(buf.get_size());
+    upload_request.SetContentType("application/octet-stream");
 
     auto upload_part_callable = _client->UploadPartCallable(upload_request);
 
@@ -228,37 +249,36 @@ void S3FileWriter::_upload_one_part(int64_t part_num, 
S3FileBuffer& buf) {
         return;
     }
 
-    std::shared_ptr<CompletedPart> completed_part = 
std::make_shared<CompletedPart>();
+    std::unique_ptr<CompletedPart> completed_part = 
std::make_unique<CompletedPart>();
     completed_part->SetPartNumber(part_num);
     auto etag = upload_part_outcome.GetResult().GetETag();
     // DCHECK(etag.empty());
     completed_part->SetETag(etag);
 
     std::unique_lock<std::mutex> lck {_completed_lock};
-    _completed_parts.emplace_back(completed_part);
+    _completed_parts.emplace_back(std::move(completed_part));
     _bytes_written += buf.get_size();
 }
 
-// TODO(AlexYue): if the whole size is less than 5MB, we can use just call put 
object method
-// to reduce the network IO num to just one time
 Status S3FileWriter::_complete() {
     SCOPED_RAW_TIMER(_upload_cost_ms.get());
     if (_failed) {
         return _st;
     }
+    // upload id is empty means there was no multipart upload
+    if (_upload_id.empty()) {
+        _wait_until_finish("PutObject");
+        return _st;
+    }
     CompleteMultipartUploadRequest complete_request;
     
complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
 
-    while (!_wait.wait()) {
-        LOG(WARNING) << "Complete multipart upload already takes 5 min"
-                     << "bucket=" << _bucket << ", key=" << _path.native()
-                     << ", upload_id=" << _upload_id;
-    }
+    _wait_until_finish("Complete");
     // make sure _completed_parts are ascending order
     std::sort(_completed_parts.begin(), _completed_parts.end(),
               [](auto& p1, auto& p2) { return p1->GetPartNumber() < 
p2->GetPartNumber(); });
     CompletedMultipartUpload completed_upload;
-    for (std::shared_ptr<CompletedPart> part : _completed_parts) {
+    for (auto& part : _completed_parts) {
         completed_upload.AddParts(*part);
     }
 
@@ -281,12 +301,34 @@ Status S3FileWriter::finalize() {
     // submit pending buf if it's not nullptr
     // it's the last buf, we can submit it right now
     if (_pending_buf != nullptr) {
-        _wait.add();
+        // if we only need to upload one file less than 5MB, we can just
+        // call PutObject to reduce the network IO
+        if (_upload_id.empty()) {
+            _pending_buf->set_upload_remote_callback(
+                    [this, buf = _pending_buf]() { _put_object(*buf); });
+        }
+        _count.add_count();
         _pending_buf->submit();
         _pending_buf = nullptr;
     }
     return Status::OK();
 }
 
+void S3FileWriter::_put_object(S3FileBuffer& buf) {
+    DCHECK(!_closed && _opened);
+    Aws::S3::Model::PutObjectRequest request;
+    request.WithBucket(_bucket).WithKey(_key);
+    request.SetBody(buf.get_stream());
+    request.SetContentLength(buf.get_size());
+    request.SetContentType("application/octet-stream");
+    auto response = _client->PutObject(request);
+    if (!response.IsSuccess()) {
+        _st = Status::InternalError("Error: [{}:{}, responseCode:{}]",
+                                    response.GetError().GetExceptionName(),
+                                    response.GetError().GetMessage(),
+                                    
static_cast<int>(response.GetError().GetResponseCode()));
+    }
+}
+
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index 59942e6ac5..cb34477f54 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <aws/core/utils/memory/stl/AWSStringStream.h>
+#include <bthread/countdown_event.h>
 
 #include <cstddef>
 #include <list>
@@ -48,8 +49,6 @@ public:
                  FileSystemSPtr fs);
     ~S3FileWriter() override;
 
-    Status open();
-
     Status close() override;
 
     Status abort() override;
@@ -64,45 +63,10 @@ public:
     int64_t upload_cost_ms() const { return *_upload_cost_ms; }
 
 private:
-    class WaitGroup {
-    public:
-        WaitGroup() = default;
-
-        ~WaitGroup() = default;
-
-        WaitGroup(const WaitGroup&) = delete;
-        WaitGroup(WaitGroup&&) = delete;
-        void operator=(const WaitGroup&) = delete;
-        void operator=(WaitGroup&&) = delete;
-        // add one counter indicating one more concurrent worker
-        void add(int count = 1) { _count += count; }
-
-        // decrease count if one concurrent worker finished it's work
-        void done() {
-            _count--;
-            if (_count.load() <= 0) {
-                _cv.notify_all();
-            }
-        }
-
-        // wait for all concurrent workers finish their work and return true
-        // would return false if timeout, default timeout would be 5min
-        bool wait(int64_t timeout_seconds = 300) {
-            if (_count.load() <= 0) {
-                return true;
-            }
-            std::unique_lock<std::mutex> lck {_lock};
-            _cv.wait_for(lck, std::chrono::seconds(timeout_seconds),
-                         [this]() { return _count.load() <= 0; });
-            return _count.load() <= 0;
-        }
-
-    private:
-        std::mutex _lock;
-        std::condition_variable _cv;
-        std::atomic_int64_t _count {0};
-    };
+    void _wait_until_finish(std::string task_name);
     Status _complete();
+    Status _create_multi_upload_request();
+    void _put_object(S3FileBuffer& buf);
     void _upload_one_part(int64_t part_num, S3FileBuffer& buf);
 
     std::string _bucket;
@@ -110,7 +74,7 @@ private:
     bool _closed = true;
     bool _opened = false;
 
-    std::shared_ptr<int64_t> _upload_cost_ms;
+    std::unique_ptr<int64_t> _upload_cost_ms;
 
     std::shared_ptr<Aws::S3::S3Client> _client;
     std::string _upload_id;
@@ -119,9 +83,9 @@ private:
     // Current Part Num for CompletedPart
     int _cur_part_num = 1;
     std::mutex _completed_lock;
-    std::vector<std::shared_ptr<Aws::S3::Model::CompletedPart>> 
_completed_parts;
+    std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>> 
_completed_parts;
 
-    WaitGroup _wait;
+    bthread::CountdownEvent _count;
 
     std::atomic_bool _failed = false;
     Status _st = Status::OK();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to