github-actions[bot] commented on code in PR #24364: URL: https://github.com/apache/doris/pull/24364#discussion_r1346143918
########## be/src/io/fs/s3_file_bufferpool.h: ########## @@ -0,0 +1,352 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <condition_variable> +#include <cstdint> +#include <fstream> +#include <functional> +#include <list> +#include <memory> +#include <mutex> + +#include "common/status.h" +#include "io/cache/block/block_file_segment.h" +#include "runtime/exec_env.h" +#include "util/slice.h" +#include "util/threadpool.h" + +namespace doris { +namespace io { +enum class BufferType { DOWNLOAD, UPLOAD }; +using FileBlocksHolderPtr = std::unique_ptr<FileBlocksHolder>; +struct OperationState { + OperationState(std::function<void(Status)> sync_after_complete_task, + std::function<bool()> is_cancelled) + : _sync_after_complete_task(std::move(sync_after_complete_task)), + _is_cancelled(std::move(is_cancelled)) {} + /** + * set the val of this operation state which indicates it failed or succeeded + * + * @param S the execution result + */ + void set_val(Status s = Status::OK()) { + // make sure we wouldn't sync twice + if (_value_set) [[unlikely]] { + return; + } + if (nullptr != _sync_after_complete_task) { + _sync_after_complete_task(s); + } + _value_set = true; + } + + /** + * detect whether 
the execution task is done + * + * @return is the execution task is done + */ + [[nodiscard]] bool is_cancelled() const { + DCHECK(nullptr != _is_cancelled); + return _is_cancelled(); + } + + std::function<void(Status)> _sync_after_complete_task; + std::function<bool()> _is_cancelled; + bool _value_set = false; +}; + +struct FileBuffer : public std::enable_shared_from_this<FileBuffer> { + FileBuffer(std::function<FileBlocksHolderPtr()> alloc_holder, size_t offset, + OperationState state, bool reserve = false); + virtual ~FileBuffer() { on_finish(); } + /** + * submit the correspoding task to async executor + */ + virtual void submit() = 0; + /** + * append data to the inner memory buffer + * + * @param S the content to be appended + */ + virtual Status append_data(const Slice& s) = 0; + /** + * call the reclaim callback when task is done + */ + void on_finish(); + /** + * swap memory buffer + * + * @param other which has memory buffer allocated + */ + void swap_buffer(Slice& other); + /** + * set the val of it's operation state + * + * @param S the execution result + */ + void set_val(Status s) { _state.set_val(s); } + /** + * get the start offset of this file buffer + * + * @return start offset of this file buffer + */ + size_t get_file_offset() const { return _offset; } Review Comment: warning: function 'get_file_offset' should be marked [[nodiscard]] [modernize-use-nodiscard] ```suggestion [[nodiscard]] size_t get_file_offset() const { return _offset; } ``` ########## be/src/io/fs/s3_file_bufferpool.cpp: ########## @@ -0,0 +1,334 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "s3_file_bufferpool.h" + +#include "common/config.h" +#include "common/logging.h" +#include "io/cache/block/block_file_segment.h" +#include "io/fs/s3_common.h" +#include "runtime/exec_env.h" +#include "util/defer_op.h" +#include "util/slice.h" + +namespace doris { +namespace io { + +/** + * 0. check if the inner memory buffer is empty or not + * 1. reclaim the memory buffer if it's not empty + */ +void FileBuffer::on_finish() { + if (_buffer.empty()) { + return; + } + S3FileBufferPool::GetInstance()->reclaim(Slice {_buffer.get_data(), _capacity}); + _buffer.clear(); +} + +/** + * take other buffer's memory space and refresh capacity + */ +void FileBuffer::swap_buffer(Slice& other) { + _buffer = other; + _capacity = _buffer.get_size(); + other.clear(); +} + +FileBuffer::FileBuffer(std::function<FileBlocksHolderPtr()> alloc_holder, size_t offset, + OperationState state, bool reserve) + : _alloc_holder(std::move(alloc_holder)), + _buffer(S3FileBufferPool::GetInstance()->allocate(reserve)), + _offset(offset), + _size(0), + _state(std::move(state)), + _capacity(_buffer.get_size()) {} + +/** + * 0. check if file cache holder allocated + * 1.
update the cache's type to index cache + */ +void UploadFileBuffer::set_index_offset(size_t offset) { + _index_offset = offset; + if (_holder) { + bool change_to_index_cache = false; + for (auto iter = _holder->file_segments.begin(); iter != _holder->file_segments.end(); + ++iter) { + if (iter == _cur_file_segment) { + change_to_index_cache = true; + } + if (change_to_index_cache) { + static_cast<void>((*iter)->change_cache_type_self(CacheType::INDEX)); + } + } + } +} + +/** + * 0. when there is memory preserved, directly write data to buf + * 1. write to file cache otherwise, then we'll wait for free buffer and to rob it + */ +Status UploadFileBuffer::append_data(const Slice& data) { + Defer defer {[&] { _size += data.get_size(); }}; + while (true) { + // if buf is not empty, it means there is memory preserved for this buf + if (!_buffer.empty()) { + std::memcpy((void*)(_buffer.get_data() + _size), data.get_data(), data.get_size()); + break; + } + // if the buf has no memory reserved, then write to disk first + if (!_is_cache_allocated && config::enable_file_cache && _alloc_holder != nullptr) { + _holder = _alloc_holder(); + bool cache_is_not_enough = false; + for (auto& segment : _holder->file_segments) { + DCHECK(segment->state() == FileBlock::State::SKIP_CACHE || + segment->state() == FileBlock::State::EMPTY); + if (segment->state() == FileBlock::State::SKIP_CACHE) [[unlikely]] { + cache_is_not_enough = true; + break; + } + if (_index_offset != 0) { + static_cast<void>(segment->change_cache_type_self(CacheType::INDEX)); + } + } + // if cache_is_not_enough, cannot use it ! + _cur_file_segment = _holder->file_segments.begin(); + _append_offset = (*_cur_file_segment)->range().left; + _holder = cache_is_not_enough ? 
nullptr : std::move(_holder); + if (_holder) { + (*_cur_file_segment)->get_or_set_downloader(); + } + _is_cache_allocated = true; + } + if (_holder) [[likely]] { + size_t data_remain_size = data.get_size(); + size_t pos = 0; + while (data_remain_size != 0) { + auto range = (*_cur_file_segment)->range(); + size_t segment_remain_size = range.right - _append_offset + 1; + size_t append_size = std::min(data_remain_size, segment_remain_size); + Slice append_data(data.get_data() + pos, append_size); + // When there is no available free memory buffer, the data will be written to the cache first + // and then uploaded to S3 when there is an available free memory buffer. + // However, if an error occurs during the write process to the local cache, + // continuing to upload the dirty data from the cache to S3 will result in erroneous data(Bad segment). + // Considering that local disk write failures are rare, a simple approach is chosen here, + // which is to treat the import as a failure directly when a local write failure occurs + RETURN_IF_ERROR((*_cur_file_segment)->append(append_data)); + if (segment_remain_size == append_size) { + RETURN_IF_ERROR((*_cur_file_segment)->finalize_write()); + if (++_cur_file_segment != _holder->file_segments.end()) { + (*_cur_file_segment)->get_or_set_downloader(); + } + } + data_remain_size -= append_size; + _append_offset += append_size; + pos += append_size; + } + break; + } else { + // wait allocate buffer pool + auto tmp = S3FileBufferPool::GetInstance()->allocate(true); + swap_buffer(tmp); + } + } + return Status::OK(); +} + +/** + * 0. allocate one memory buffer + * 1. 
read the content from the cache and then write + * it into memory buffer + */ +void UploadFileBuffer::read_from_cache() { + auto tmp = S3FileBufferPool::GetInstance()->allocate(true); + swap_buffer(tmp); + + DCHECK(_holder != nullptr); + DCHECK(_capacity >= _size); + size_t pos = 0; + for (auto& segment : _holder->file_segments) { + if (pos == _size) { + break; + } + if (auto s = segment->finalize_write(); !s.ok()) [[unlikely]] { + set_val(std::move(s)); + return; + } + size_t segment_size = segment->range().size(); + Slice s(_buffer.get_data() + pos, segment_size); + if (auto st = segment->read_at(s, 0); !st.ok()) [[unlikely]] { + set_val(std::move(st)); + return; + } + pos += segment_size; + } + + // the real length should be the buf.get_size() in this situation (consider it's the last part, + // size of it could be less than 5MB) + _stream_ptr = std::make_shared<StringViewStream>(_buffer.get_data(), _size); +} + +/** + * 0. construct the stream ptr if the buffer is not empty + * 1. submit the on_upload() callback to executor + */ +void UploadFileBuffer::submit() { + if (!_buffer.empty()) [[likely]] { + _stream_ptr = std::make_shared<StringViewStream>(_buffer.get_data(), _size); + } + // If the data is written into file cache + if (_holder && _cur_file_segment != _holder->file_segments.end()) { + if (auto s = (*_cur_file_segment)->finalize_write(); !s.ok()) [[unlikely]] { + set_val(std::move(s)); + return; + } + } + static_cast<void>(S3FileBufferPool::GetInstance()->thread_pool()->submit_func( + [buf = this->shared_from_this(), this]() { + // to extend buf's lifetime + // (void)buf; + on_upload(); + })); +} + +/** + * write the content of the memory buffer to local file cache + */ +void UploadFileBuffer::upload_to_local_file_cache(bool is_cancelled) { + if (!config::enable_file_cache || _alloc_holder == nullptr) { + return; + } + if (_holder) { + return; + } + if (is_cancelled) { + return; + } + // the data is already written to S3 in this situation + // so I
didn't handle the file cache write error + _holder = _alloc_holder(); + size_t pos = 0; + size_t data_remain_size = _size; + for (auto& segment : _holder->file_segments) { + if (data_remain_size == 0) { + break; + } + size_t segment_size = segment->range().size(); + size_t append_size = std::min(data_remain_size, segment_size); + if (segment->state() == FileBlock::State::EMPTY) { + if (_index_offset != 0 && segment->range().right >= _index_offset) { + // segment->change_cache_type_self(CacheType::INDEX); + } + segment->get_or_set_downloader(); + // Another thread may have started downloading due to a query + // Just skip putting to cache from UploadFileBuffer + if (segment->is_downloader()) { + Slice s(_buffer.get_data() + pos, append_size); + if (auto st = segment->append(s); !st.ok()) [[unlikely]] { + LOG_WARNING("append data to cache segmetn failed due to {}", st); + return; + } + if (auto st = segment->finalize_write(); !st.ok()) [[unlikely]] { + LOG_WARNING("finalize write to cache segmetn failed due to {}", st); + return; + } + } + } + data_remain_size -= append_size; + pos += append_size; + } +} + +FileBufferBuilder& FileBufferBuilder::set_type(BufferType type) { + _type = type; + return *this; +} +FileBufferBuilder& FileBufferBuilder::set_upload_callback( + std::function<void(UploadFileBuffer& buf)> cb) { + _upload_cb = std::move(cb); + return *this; +} +// set callback to do task sync for the caller +FileBufferBuilder& FileBufferBuilder::set_sync_after_complete_task(std::function<void(Status)> cb) { + _sync_after_complete_task = std::move(cb); + return *this; +} + +FileBufferBuilder& FileBufferBuilder::set_allocate_file_segments_holder( + std::function<FileBlocksHolderPtr()> cb) { + _alloc_holder_cb = std::move(cb); + return *this; +} + +std::shared_ptr<FileBuffer> FileBufferBuilder::build() { Review Comment: warning: method 'build' can be made const [readability-make-member-function-const] ```suggestion std::shared_ptr<FileBuffer> 
FileBufferBuilder::build() const { ``` be/src/io/fs/s3_file_bufferpool.h:214: ```diff - std::shared_ptr<FileBuffer> build(); + std::shared_ptr<FileBuffer> build() const; ``` ########## be/src/io/fs/s3_file_bufferpool.h: ########## @@ -0,0 +1,352 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <condition_variable> +#include <cstdint> +#include <fstream> +#include <functional> +#include <list> +#include <memory> +#include <mutex> + +#include "common/status.h" +#include "io/cache/block/block_file_segment.h" +#include "runtime/exec_env.h" +#include "util/slice.h" +#include "util/threadpool.h" + +namespace doris { +namespace io { +enum class BufferType { DOWNLOAD, UPLOAD }; +using FileBlocksHolderPtr = std::unique_ptr<FileBlocksHolder>; +struct OperationState { + OperationState(std::function<void(Status)> sync_after_complete_task, + std::function<bool()> is_cancelled) + : _sync_after_complete_task(std::move(sync_after_complete_task)), + _is_cancelled(std::move(is_cancelled)) {} + /** + * set the val of this operation state which indicates it failed or succeeded + * + * @param S the execution result + */ + void set_val(Status s = Status::OK()) { + // make sure we wouldn't sync twice + if (_value_set) [[unlikely]] { + return; + } + if (nullptr != _sync_after_complete_task) { + _sync_after_complete_task(s); + } + _value_set = true; + } + + /** + * detect whether the execution task is done + * + * @return is the execution task is done + */ + [[nodiscard]] bool is_cancelled() const { + DCHECK(nullptr != _is_cancelled); + return _is_cancelled(); + } + + std::function<void(Status)> _sync_after_complete_task; + std::function<bool()> _is_cancelled; + bool _value_set = false; +}; + +struct FileBuffer : public std::enable_shared_from_this<FileBuffer> { + FileBuffer(std::function<FileBlocksHolderPtr()> alloc_holder, size_t offset, + OperationState state, bool reserve = false); + virtual ~FileBuffer() { on_finish(); } + /** + * submit the correspoding task to async executor + */ + virtual void submit() = 0; + /** + * append data to the inner memory buffer + * + * @param S the content to be appended + */ + virtual Status append_data(const Slice& s) = 0; + /** + * call the reclaim callback when task is done + */ + void on_finish(); + /** + * 
swap memory buffer + * + * @param other which has memory buffer allocated + */ + void swap_buffer(Slice& other); + /** + * set the val of it's operation state + * + * @param S the execution result + */ + void set_val(Status s) { _state.set_val(s); } + /** + * get the start offset of this file buffer + * + * @return start offset of this file buffer + */ + size_t get_file_offset() const { return _offset; } + /** + * get the size of the buffered data + * + * @return the size of the buffered data + */ + size_t get_size() const { return _size; } Review Comment: warning: function 'get_size' should be marked [[nodiscard]] [modernize-use-nodiscard] ```suggestion [[nodiscard]] size_t get_size() const { return _size; } ``` ########## be/test/io/fs/s3_file_writer_test.cpp: ########## @@ -0,0 +1,581 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <aws/s3/S3Client.h> Review Comment: warning: 'aws/s3/S3Client.h' file not found [clang-diagnostic-error] ```cpp #include <aws/s3/S3Client.h> ^ ``` ########## be/src/io/fs/s3_file_bufferpool.h: ########## @@ -0,0 +1,352 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <condition_variable> +#include <cstdint> +#include <fstream> +#include <functional> +#include <list> +#include <memory> +#include <mutex> + +#include "common/status.h" +#include "io/cache/block/block_file_segment.h" +#include "runtime/exec_env.h" +#include "util/slice.h" +#include "util/threadpool.h" + +namespace doris { +namespace io { +enum class BufferType { DOWNLOAD, UPLOAD }; +using FileBlocksHolderPtr = std::unique_ptr<FileBlocksHolder>; +struct OperationState { + OperationState(std::function<void(Status)> sync_after_complete_task, + std::function<bool()> is_cancelled) + : _sync_after_complete_task(std::move(sync_after_complete_task)), + _is_cancelled(std::move(is_cancelled)) {} + /** + * set the val of this operation state which indicates it failed or succeeded + * + * @param S the execution result + */ + void set_val(Status s = Status::OK()) { + // make sure we wouldn't sync twice + if (_value_set) [[unlikely]] { + return; + } + if (nullptr != _sync_after_complete_task) { + _sync_after_complete_task(s); + } + _value_set = true; + } + + /** + * detect whether the execution task is done + * + * @return is the execution task is done + */ + [[nodiscard]] bool is_cancelled() const { + DCHECK(nullptr != _is_cancelled); + return _is_cancelled(); 
+ } + + std::function<void(Status)> _sync_after_complete_task; + std::function<bool()> _is_cancelled; + bool _value_set = false; +}; + +struct FileBuffer : public std::enable_shared_from_this<FileBuffer> { + FileBuffer(std::function<FileBlocksHolderPtr()> alloc_holder, size_t offset, + OperationState state, bool reserve = false); + virtual ~FileBuffer() { on_finish(); } + /** + * submit the correspoding task to async executor + */ + virtual void submit() = 0; + /** + * append data to the inner memory buffer + * + * @param S the content to be appended + */ + virtual Status append_data(const Slice& s) = 0; + /** + * call the reclaim callback when task is done + */ + void on_finish(); + /** + * swap memory buffer + * + * @param other which has memory buffer allocated + */ + void swap_buffer(Slice& other); + /** + * set the val of it's operation state + * + * @param S the execution result + */ + void set_val(Status s) { _state.set_val(s); } + /** + * get the start offset of this file buffer + * + * @return start offset of this file buffer + */ + size_t get_file_offset() const { return _offset; } + /** + * get the size of the buffered data + * + * @return the size of the buffered data + */ + size_t get_size() const { return _size; } + /** + * detect whether the execution task is done + * + * @return is the execution task is done + */ + bool is_cancelled() const { return _state.is_cancelled(); } Review Comment: warning: function 'is_cancelled' should be marked [[nodiscard]] [modernize-use-nodiscard] ```suggestion [[nodiscard]] bool is_cancelled() const { return _state.is_cancelled(); } ``` ########## be/src/io/fs/s3_file_bufferpool.h: ########## @@ -0,0 +1,352 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <condition_variable> +#include <cstdint> +#include <fstream> +#include <functional> +#include <list> +#include <memory> +#include <mutex> + +#include "common/status.h" +#include "io/cache/block/block_file_segment.h" +#include "runtime/exec_env.h" +#include "util/slice.h" +#include "util/threadpool.h" + +namespace doris { +namespace io { +enum class BufferType { DOWNLOAD, UPLOAD }; +using FileBlocksHolderPtr = std::unique_ptr<FileBlocksHolder>; +struct OperationState { + OperationState(std::function<void(Status)> sync_after_complete_task, + std::function<bool()> is_cancelled) + : _sync_after_complete_task(std::move(sync_after_complete_task)), + _is_cancelled(std::move(is_cancelled)) {} + /** + * set the val of this operation state which indicates it failed or succeeded + * + * @param S the execution result + */ + void set_val(Status s = Status::OK()) { + // make sure we wouldn't sync twice + if (_value_set) [[unlikely]] { + return; + } + if (nullptr != _sync_after_complete_task) { + _sync_after_complete_task(s); + } + _value_set = true; + } + + /** + * detect whether the execution task is done + * + * @return is the execution task is done + */ + [[nodiscard]] bool is_cancelled() const { + DCHECK(nullptr != _is_cancelled); + return _is_cancelled(); + } + + std::function<void(Status)> _sync_after_complete_task; + std::function<bool()> _is_cancelled; + bool 
_value_set = false; +}; + +struct FileBuffer : public std::enable_shared_from_this<FileBuffer> { + FileBuffer(std::function<FileBlocksHolderPtr()> alloc_holder, size_t offset, + OperationState state, bool reserve = false); + virtual ~FileBuffer() { on_finish(); } + /** + * submit the correspoding task to async executor + */ + virtual void submit() = 0; + /** + * append data to the inner memory buffer + * + * @param S the content to be appended + */ + virtual Status append_data(const Slice& s) = 0; + /** + * call the reclaim callback when task is done + */ + void on_finish(); + /** + * swap memory buffer + * + * @param other which has memory buffer allocated + */ + void swap_buffer(Slice& other); + /** + * set the val of it's operation state + * + * @param S the execution result + */ + void set_val(Status s) { _state.set_val(s); } + /** + * get the start offset of this file buffer + * + * @return start offset of this file buffer + */ + size_t get_file_offset() const { return _offset; } + /** + * get the size of the buffered data + * + * @return the size of the buffered data + */ + size_t get_size() const { return _size; } + /** + * detect whether the execution task is done + * + * @return is the execution task is done + */ + bool is_cancelled() const { return _state.is_cancelled(); } + + std::function<FileBlocksHolderPtr()> _alloc_holder; + Slice _buffer; + size_t _offset; + size_t _size; + OperationState _state; + size_t _capacity; +}; + +struct UploadFileBuffer final : public FileBuffer { + UploadFileBuffer(std::function<void(UploadFileBuffer&)> upload_cb, OperationState state, + size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder, + size_t index_offset) + : FileBuffer(alloc_holder, offset, state), + _upload_to_remote(std::move(upload_cb)), + _index_offset(index_offset) {} + ~UploadFileBuffer() override = default; + void submit() override; + /** + * set the index offset + * + * @param offset the index offset + */ + void set_index_offset(size_t 
offset); + Status append_data(const Slice& s) override; + /** + * read the content from local file cache + * because previously lack of memory buffer + */ + void read_from_cache(); + /** + * write the content inside memory buffer into + * local file cache + */ + void upload_to_local_file_cache(bool); + /** + * do the upload work + * 1. read from cache if the data is written to cache first + * 2. upload content of buffer to S3 + * 3. upload content to file cache if necessary + * 4. call the finish callback caller specified + * 5. reclaim self + */ + void on_upload() { + if (_buffer.empty()) { + read_from_cache(); + } + _upload_to_remote(*this); + if (config::enable_flush_file_cache_async) { + // If we call is_cancelled after set_val then the outside resource of _state might already + // be destroyed and result in heap use after free + bool cancelled = is_cancelled(); + _state.set_val(); + // this control flow means the buf and the stream share one memory + // so we can directly use buf here + upload_to_local_file_cache(cancelled); + } else { + upload_to_local_file_cache(is_cancelled()); + _state.set_val(); + } + on_finish(); + } + /** + * + * @return the stream representing the inner memory buffer + */ + std::shared_ptr<std::iostream> get_stream() const { return _stream_ptr; } Review Comment: warning: function 'get_stream' should be marked [[nodiscard]] [modernize-use-nodiscard] ```suggestion [[nodiscard]] std::shared_ptr<std::iostream> get_stream() const { return _stream_ptr; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org