platoneko commented on code in PR #24364: URL: https://github.com/apache/doris/pull/24364#discussion_r1360201116
########## be/src/io/fs/s3_file_writer.cpp: ########## @@ -205,30 +221,58 @@ Status S3FileWriter::close() { Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) { DCHECK(!_closed); size_t buffer_size = config::s3_write_buffer_size; + DBUG_EXECUTE_IF("s3_file_writer::appendv", + { return Status::InternalError("failed to append data"); }); for (size_t i = 0; i < data_cnt; i++) { size_t data_size = data[i].get_size(); for (size_t pos = 0, data_size_to_append = 0; pos < data_size; pos += data_size_to_append) { if (_failed) { return _st; } if (!_pending_buf) { - _pending_buf = S3FileBufferPool::GetInstance()->allocate(); - // capture part num by value along with the value of the shared ptr - _pending_buf->set_upload_remote_callback( - [part_num = _cur_part_num, this, cur_buf = _pending_buf]() { - _upload_one_part(part_num, *cur_buf); - }); - _pending_buf->set_file_offset(_bytes_appended); - // later we might need to wait all prior tasks to be finished - _pending_buf->set_finish_upload([this]() { _countdown_event.signal(); }); - _pending_buf->set_is_cancel([this]() { return _failed.load(); }); - _pending_buf->set_on_failed([this, part_num = _cur_part_num](Status st) { - VLOG_NOTICE << "failed at key: " << _key << ", load part " << part_num - << ", st " << st.to_string(); - std::unique_lock<std::mutex> _lck {_completed_lock}; - this->_st = std::move(st); - _failed = true; - }); + auto builder = FileBufferBuilder(); + builder.set_type(BufferType::UPLOAD) + .set_upload_callback( + [part_num = _cur_part_num, this](UploadFileBuffer& buf) { + _upload_one_part(part_num, buf); + }) + .set_file_offset(_bytes_appended) + .set_index_offset(_index_offset) + .set_sync_after_complete_task([this, part_num = _cur_part_num](Status s) { + bool ret = false; + if (!s.ok()) [[unlikely]] { + VLOG_NOTICE << "failed at key: " << _key << ", load part " + << part_num << ", st " << s; + std::unique_lock<std::mutex> _lck {_completed_lock}; + _failed = true; + ret = true; + this->_st = 
std::move(s); + } + // After the signal, there is a scenario where the previous invocation of _wait_until_finish + // returns to the caller, and subsequently, the S3 file writer is destructed. + // This means that accessing _failed afterwards would result in a heap use after free vulnerability. + _countdown_event.signal(); + return ret; + }) + .set_is_cancelled([this]() { return _failed.load(); }); + if (!_write_file_cache) { Review Comment: ```suggestion if (_write_file_cache) { ``` ########## be/src/io/cache/block/block_file_cache.cpp: ########## @@ -65,6 +65,8 @@ std::string IFileCache::cache_type_to_string(CacheType type) { return "_disposable"; case CacheType::NORMAL: return ""; + case CacheType::TTL: + return "_ttl"; Review Comment: Consider returning `std::string_view` here instead of `std::string`. ########## be/src/io/fs/s3_file_writer.h: ########## @@ -68,20 +69,26 @@ class S3FileWriter final : public FileWriter { std::shared_ptr<Aws::S3::S3Client> _client; std::string _upload_id; + size_t _index_offset {0}; // Current Part Num for CompletedPart int _cur_part_num = 1; std::mutex _completed_lock; std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>> _completed_parts; + IFileCache::Key _cache_key; + IFileCache* _cache; Review Comment: 建议初始化一下 (English: I suggest initializing this member.) ########## be/src/io/fs/s3_file_writer.h: ########## @@ -68,20 +69,26 @@ class S3FileWriter final : public FileWriter { std::shared_ptr<Aws::S3::S3Client> _client; std::string _upload_id; + size_t _index_offset {0}; // Current Part Num for CompletedPart int _cur_part_num = 1; std::mutex _completed_lock; std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>> _completed_parts; + IFileCache::Key _cache_key; + IFileCache* _cache; Review Comment: 建议初始化一下 (English: I suggest initializing this member.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org