morningman commented on code in PR #18301: URL: https://github.com/apache/doris/pull/18301#discussion_r1154541875
##########
be/src/io/fs/hdfs_file_reader.cpp:
##########
@@ -40,6 +40,10 @@ HdfsFileReader::~HdfsFileReader() {
 }
 
 Status HdfsFileReader::close() {
+    if (_closed) {
+        return Status::OK();
+    }
+
     std::lock_guard<std::mutex> lock(_lock);

Review Comment:
   Is it necessary to make an HDFS reader thread-safe?



##########
be/src/common/config.h:
##########
@@ -892,6 +892,9 @@ CONF_mInt16(pipeline_short_query_timeout_s, "20");
 // Will remove after fully test.
 CONF_Bool(enable_index_apply_preds_except_leafnode_of_andnode, "true");
 
+// buffered prefetch
+CONF_mInt32(prefetch_single_buffer_size_mb, "4");

Review Comment:
   I think we can remove this config and use a fixed value, if there is no need to modify it.



##########
be/src/runtime/exec_env_init.cpp:
##########
@@ -100,6 +100,11 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
     init_download_cache_required_components();
 
+    ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
+            .set_min_threads(1024)

Review Comment:
   It is too many.
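To make the sizing concern concrete: the usual pattern is a small resident core that grows under load, rather than 1024 always-on minimum threads. The sketch below reuses the `ThreadPoolBuilder` call quoted above; the specific counts and the `hardware_concurrency()` cap are illustrative assumptions, not values requested by the review.

```cpp
#include <algorithm>
#include <memory>
#include <thread>

// Illustrative sizing only -- same builder API as the quoted PR code, but the
// counts below are assumptions for this sketch, not a mandated fix.
std::unique_ptr<ThreadPool> prefetch_pool;
RETURN_IF_ERROR(ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
                        .set_min_threads(16) // small resident core
                        .set_max_threads(std::max(64u, std::thread::hardware_concurrency()))
                        .build(&prefetch_pool));
```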
##########
be/src/io/fs/buffered_reader.h:
##########
@@ -29,6 +31,107 @@ namespace doris {
 namespace io {
 
+class BufferedReader;
+struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer> {
+    enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED };
+    PrefetchBuffer() = default;
+    PrefetchBuffer(size_t start_offset, size_t end_offset, size_t buffer_size,
+                   size_t whole_buffer_size, io::FileReader* reader)
+            : _start_offset(start_offset),
+              _end_offset(end_offset),
+              _size(buffer_size),
+              _whole_buffer_size(whole_buffer_size),
+              _reader(reader),
+              _buf(buffer_size, '0') {}
+    PrefetchBuffer(PrefetchBuffer&& other)
+            : _offset(other._offset),
+              _start_offset(other._start_offset),
+              _end_offset(other._end_offset),
+              _size(other._size),
+              _whole_buffer_size(other._whole_buffer_size),
+              _reader(other._reader),
+              _buf(std::move(other._buf)) {}
+    ~PrefetchBuffer() = default;
+    size_t _offset;
+    // [_start_offset, _end_offset) is the range that can be prefetched.
+    // Note that the reader may still read outside [_start_offset, _end_offset),
+    // because FE does not align the file to format boundaries when splitting it.
+    size_t _start_offset;
+    size_t _end_offset;
+    size_t _size;
+    size_t _len {0};
+    size_t _whole_buffer_size;
+    io::FileReader* _reader;
+    std::string _buf;
+    BufferStatus _buffer_status {BufferStatus::RESET};
+    std::mutex _lock;
+    std::condition_variable _prefetched;
+    Status _prefetch_status {Status::OK()};
+    // @brief: reset the start offset of this buffer to offset
+    // @param[offset] the new start offset for this buffer
+    void reset_offset(size_t offset);
+    // @brief: start fetching the content in [_offset, _offset + _size)
+    void prefetch_buffer();
+    // @brief: used by BufferedReader to read the prefetched data
+    // @param[off] read start address
+    // @param[buf] buffer to put the actual content
+    // @param[buf_len] maximum len trying to read
+    // @param[bytes_read] actual bytes read
+    Status read_buffer(size_t off, const char* buf, size_t buf_len, size_t* bytes_read);
+    // @brief: shut down the buffer, waiting until the prior prefetching task is done
+    void close();
+    // @brief: detect whether this buffer contains off
+    // @param[off] detect offset
+    bool inline contains(size_t off) const { return _offset <= off && off < _offset + _size; }
+};
+
+class BufferedReader : public io::FileReader {
+public:
+    BufferedReader(io::FileReaderSPtr reader, int64_t offset, int64_t length,
+                   int64_t buffer_size = -1L);
+    ~BufferedReader() override;
+
+    Status close() override;
+
+    const io::Path& path() const override { return _reader->path(); }
+
+    size_t size() const override { return _size; }
+
+    bool closed() const override { return _closed; }
+
+    std::shared_ptr<io::FileSystem> fs() const override { return _reader->fs(); }
+
+protected:
+    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                        const IOContext* io_ctx) override;
+
+private:
+    size_t get_buffer_pos(int64_t position) const {
+        return (position % _whole_pre_buffer_size) / s_max_pre_buffer_size;
+    }
+    size_t get_buffer_offset(int64_t position) const {
+        return (position / s_max_pre_buffer_size) * s_max_pre_buffer_size;
+    }
+    void resetAllBuffer(size_t position) {

Review Comment:
   ```suggestion
       void reset_all_buffer(size_t position) {
   ```
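As a quick sanity check of the ring arithmetic in `get_buffer_pos`/`get_buffer_offset`, here is a self-contained example; the 4 MiB buffer size and 4-buffer ring are assumed values for illustration only:

```cpp
#include <cstdint>
#include <cstdio>

// Stand-ins for s_max_pre_buffer_size and _whole_pre_buffer_size (assumed sizes).
static constexpr int64_t kBufSize = 4LL << 20;      // 4 MiB per buffer
static constexpr int64_t kWholeSize = 4 * kBufSize; // 4 buffers in the ring

int64_t buffer_pos(int64_t position) { return (position % kWholeSize) / kBufSize; }
int64_t buffer_offset(int64_t position) { return (position / kBufSize) * kBufSize; }

int main() {
    int64_t pos = 21LL << 20; // a read at the 21 MiB mark
    // 21 MiB falls in ring slot (21 % 16) / 4 = 1, and that buffer's
    // aligned start offset is (21 / 4) * 4 = 20 MiB.
    std::printf("slot=%lld start=%lld MiB\n", (long long)buffer_pos(pos),
                (long long)(buffer_offset(pos) >> 20));
    return 0;
}
```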
##########
be/src/io/fs/buffered_reader.cpp:
##########
@@ -28,6 +28,166 @@ namespace doris {
 namespace io {
 
+// There are occasions where the buffer is already closed but some prior tasks are still
+// queued in the thread pool, so we have to check whether the buffer is closed each time
+// the condition variable is notified.
+void PrefetchBuffer::reset_offset(size_t offset) {
+    if (UNLIKELY(offset >= _end_offset)) {
+        return;
+    }
+    {
+        std::unique_lock lck {_lock};
+        _prefetched.wait(lck, [this]() { return _buffer_status != BufferStatus::PENDING; });
+        if (UNLIKELY(_buffer_status == BufferStatus::CLOSED)) {
+            _prefetched.notify_all();
+            return;
+        }
+        _buffer_status = BufferStatus::RESET;
+        _offset = offset;
+        _prefetched.notify_all();
+    }
+    ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
+            [buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); });
+}
+
+// only this function runs concurrently in another thread
+void PrefetchBuffer::prefetch_buffer() {
+    {
+        std::unique_lock lck {_lock};
+        _prefetched.wait(lck, [this]() {
+            return _buffer_status == BufferStatus::RESET || _buffer_status == BufferStatus::CLOSED;
+        });
+        // in case the buffer is already closed
+        if (UNLIKELY(_buffer_status == BufferStatus::CLOSED)) {
+            _prefetched.notify_all();
+            return;
+        }
+        _buffer_status = BufferStatus::PENDING;
+        _prefetched.notify_all();
+    }
+    _len = 0;
+    Status s;
+    IOContext io_context;
+
+    size_t buf_size = _end_offset - _offset > _size ? _size : _end_offset - _offset;
+    s = _reader->read_at(_offset, Slice {_buf.data(), buf_size}, &_len, &io_context);
+    std::unique_lock lck {_lock};
+    _prefetched.wait(lck, [this]() { return _buffer_status == BufferStatus::PENDING; });
+    if (!s.ok() && _offset < _reader->size()) {
+        _prefetch_status = std::move(s);
+    }
+    _buffer_status = BufferStatus::PREFETCHED;
+    _prefetched.notify_all();
+    // EOF comes up with _len == 0; it is handled by read_buffer
+}
+
+Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
+                                   size_t* bytes_read) {
+    if (UNLIKELY(off >= _end_offset)) {
+        // The reader can read outside [_start_offset, _end_offset) via the synchronous path.
+        IOContext io_context;
+        return _reader->read_at(off, Slice {out, buf_len}, bytes_read, &io_context);
+    }
+    {
+        std::unique_lock lck {_lock};
+        // the buffer must be prefetched, or it is closed
+        _prefetched.wait(lck, [this]() {
+            return _buffer_status == BufferStatus::PREFETCHED ||
+                   _buffer_status == BufferStatus::CLOSED;
+        });
+        if (UNLIKELY(BufferStatus::CLOSED == _buffer_status)) {
+            return Status::OK();
+        }
+    }
+    RETURN_IF_ERROR(_prefetch_status);
+    // only Parquet does non-sequential reads;
+    // it reads the end of the file first
+    if (UNLIKELY(!contains(off))) {
+        reset_offset((off / _size) * _size);
+        return read_buffer(off, out, buf_len, bytes_read);
+    }
+    if (UNLIKELY(0 == _len || _offset + _len < off)) {
+        return Status::OK();
+    }
+    // [0] maximum len trying to read, [1] maximum len the buffer can provide,
+    // [2] actual len the buffer has
+    size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len - off});
+    memcpy((void*)out, _buf.data() + (off - _offset), read_len);
+    *bytes_read = read_len;
+    if (off + *bytes_read == _offset + _len) {
+        reset_offset(_offset + _whole_buffer_size);
+    }
+    return Status::OK();
+}
+
+void PrefetchBuffer::close() {
+    std::unique_lock lck {_lock};
+    // in case _reader still tries to write to the buf after we close the buffer
+    _prefetched.wait(lck, [this]() { return _buffer_status != BufferStatus::PENDING; });
+    _buffer_status = BufferStatus::CLOSED;
+    _prefetched.notify_all();
+}
+
+// buffered reader
+BufferedReader::BufferedReader(io::FileReaderSPtr reader, int64_t offset, int64_t length,
+                               int64_t buffer_size)
+        : _reader(std::move(reader)), _start_offset(offset), _end_offset(offset + length) {
+    if (buffer_size == -1L) {
+        buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024;
+    }
+    _size = _reader->size();
+    _whole_pre_buffer_size = buffer_size;
+#ifdef BE_TEST
+    s_max_pre_buffer_size = config::prefetch_single_buffer_size_mb;

Review Comment:
   ```suggestion
       s_max_pre_buffer_size = config::prefetch_single_buffer_size_mb * 1024 * 1024;
   ```
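The "stale queued task" comment above is the crux of the locking here: every wait on `_prefetched` must re-check for `CLOSED`, since `close()` can run while a prefetch task is still sitting in the pool queue. A minimal self-contained sketch of that handshake (names mirror `PrefetchBuffer`, but this is an illustration, not the PR's code):

```cpp
#include <condition_variable>
#include <mutex>

enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED };

struct StatusGate {
    std::mutex lock;
    std::condition_variable cv;
    BufferStatus status = BufferStatus::RESET;

    // Worker side: claim the buffer for prefetching -- unless it was closed
    // while this task sat in the thread-pool queue.
    bool try_begin_prefetch() {
        std::unique_lock lck {lock};
        cv.wait(lck, [this] {
            return status == BufferStatus::RESET || status == BufferStatus::CLOSED;
        });
        if (status == BufferStatus::CLOSED) {
            return false; // stale task: the buffer was shut down while we were queued
        }
        status = BufferStatus::PENDING;
        return true;
    }

    // Owner side: close() waits out an in-flight PENDING prefetch so the
    // worker never writes into a buffer that is being torn down.
    void close() {
        std::unique_lock lck {lock};
        cv.wait(lck, [this] { return status != BufferStatus::PENDING; });
        status = BufferStatus::CLOSED;
        cv.notify_all();
    }
};
```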
##########
be/src/io/fs/buffered_reader.cpp:
##########
@@ -28,6 +28,166 @@ namespace doris {
+Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
+                                   size_t* bytes_read) {
+    if (UNLIKELY(off >= _end_offset)) {
+        // The reader can read outside [_start_offset, _end_offset) via the synchronous path.
+        IOContext io_context;
+        return _reader->read_at(off, Slice {out, buf_len}, bytes_read, &io_context);

Review Comment:
   ```suggestion
           return _reader->read_at(off, Slice {out, buf_len}, bytes_read);
   ```



##########
be/src/io/fs/buffered_reader.cpp:
##########
@@ -28,6 +28,166 @@ namespace doris {
+    _len = 0;
+    Status s;
+    IOContext io_context;

Review Comment:
   ```suggestion
   ```
##########
be/src/io/fs/buffered_reader.cpp:
##########
@@ -28,6 +28,166 @@ namespace doris {
+    size_t buf_size = _end_offset - _offset > _size ? _size : _end_offset - _offset;
+    s = _reader->read_at(_offset, Slice {_buf.data(), buf_size}, &_len, &io_context);

Review Comment:
   ```suggestion
       s = _reader->read_at(_offset, Slice {_buf.data(), buf_size}, &_len);
   ```
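Also worth noting in `reset_offset` above: the prefetch task captures `shared_from_this()`, so the queued work keeps the `PrefetchBuffer` alive even if the owning reader is torn down before the task runs. A minimal illustration, with a plain task queue standing in for the thread pool:

```cpp
#include <functional>
#include <memory>
#include <queue>

// The object must already be owned by a shared_ptr for shared_from_this()
// to be valid -- as PrefetchBuffer is in the PR.
struct Buffer : std::enable_shared_from_this<Buffer> {
    void prefetch_buffer() { /* fill the buffer... */ }

    void schedule(std::queue<std::function<void()>>& pool) {
        // The lambda owns a shared_ptr copy, so `this` stays valid until the
        // task actually runs, mirroring the submit_func call above.
        pool.push([self = shared_from_this()] { self->prefetch_buffer(); });
    }
};
```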
##########
be/src/io/fs/buffered_reader.cpp:
##########
@@ -28,6 +28,166 @@ namespace doris {
+Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
+                                   size_t* bytes_read) {
+    if (UNLIKELY(off >= _end_offset)) {
+        // The reader can read outside [_start_offset, _end_offset) via the synchronous path.
+        IOContext io_context;

Review Comment:
   ```suggestion
   ```
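For readers following the whole review, this is roughly how the reviewed pieces compose at a call site. `open_hdfs_reader` is a hypothetical helper, and the read path shown assumes the base-class `read_at` entry point, so treat this strictly as a sketch:

```cpp
// Hypothetical usage sketch -- not code from the PR.
io::FileReaderSPtr raw = open_hdfs_reader("/path/to/file.parquet"); // hypothetical helper
auto reader = std::make_shared<io::BufferedReader>(raw, /*offset=*/0,
                                                   /*length=*/raw->size());
char dst[4096];
size_t bytes_read = 0;
// Sequential reads are now served from the prefetched ring buffers instead of
// issuing a remote round trip per call.
RETURN_IF_ERROR(reader->read_at(0, Slice(dst, sizeof(dst)), &bytes_read, /*io_ctx=*/nullptr));
RETURN_IF_ERROR(reader->close());
```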