This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b0c215e694 [enhance](be)add more profile in prefetched buffered reader (#19119) b0c215e694 is described below commit b0c215e694811822898919c5cfe94e525c41bb14 Author: AlexYue <yj976240...@gmail.com> AuthorDate: Tue May 2 09:53:39 2023 +0800 [enhance](be)add more profile in prefetched buffered reader (#19119) --- be/src/io/fs/buffered_reader.cpp | 54 +++++++++++++++++++++++++++++----- be/src/io/fs/buffered_reader.h | 24 +++++++++++---- be/src/service/internal_service.cpp | 5 +++- be/test/io/fs/buffered_reader_test.cpp | 12 +++++--- 4 files changed, 77 insertions(+), 18 deletions(-) diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index dab5ab8f61..f57aa16f86 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -406,8 +406,13 @@ void PrefetchBuffer::prefetch_buffer() { buf_size = merge_small_ranges(_offset, read_range_index); } - s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, _io_ctx); + { + SCOPED_RAW_TIMER(&_statis.read_time); + s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, _io_ctx); + } g_bytes_downloaded << _len; + _statis.prefetch_request_io += 1; + _statis.prefetch_request_bytes += _len; std::unique_lock lck {_lock}; _prefetched.wait(lck, [this]() { return _buffer_status == BufferStatus::PENDING; }); if (!s.ok() && _offset < _reader->size()) { @@ -506,8 +511,13 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len, } // [0]: maximum len trying to read, [1] maximum length buffer can provide, [2] actual len buffer has size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len - off}); - memcpy((void*)out, _buf.get() + (off - _offset), read_len); + { + SCOPED_RAW_TIMER(&_statis.copy_time); + memcpy((void*)out, _buf.get() + (off - _offset), read_len); + } *bytes_read = read_len; + _statis.request_io += 1; + _statis.request_bytes += read_len; if 
(off + *bytes_read == _offset + _len) { reset_offset(_offset + _whole_buffer_size); } @@ -520,11 +530,15 @@ void PrefetchBuffer::close() { _prefetched.wait(lck, [this]() { return _buffer_status != BufferStatus::PENDING; }); _buffer_status = BufferStatus::CLOSED; _prefetched.notify_all(); + if (_sync_profile != nullptr) { + _sync_profile(*this); + } } // buffered reader -PrefetchBufferedReader::PrefetchBufferedReader(io::FileReaderSPtr reader, PrefetchRange file_range, - const IOContext* io_ctx, int64_t buffer_size) +PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::FileReaderSPtr reader, + PrefetchRange file_range, const IOContext* io_ctx, + int64_t buffer_size) : _reader(std::move(reader)), _file_range(file_range), _io_ctx(io_ctx) { if (buffer_size == -1L) { buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024; @@ -533,12 +547,35 @@ PrefetchBufferedReader::PrefetchBufferedReader(io::FileReaderSPtr reader, Prefet _whole_pre_buffer_size = buffer_size; _file_range.end_offset = std::min(_file_range.end_offset, _size); int buffer_num = buffer_size > s_max_pre_buffer_size ? 
buffer_size / s_max_pre_buffer_size : 1; + std::function<void(PrefetchBuffer&)> sync_buffer = nullptr; + if (profile != nullptr) { + const char* prefetch_buffered_reader = "PrefetchBufferedReader"; + ADD_TIMER(profile, prefetch_buffered_reader); + auto copy_time = ADD_CHILD_TIMER(profile, "CopyTime", prefetch_buffered_reader); + auto read_time = ADD_CHILD_TIMER(profile, "ReadTime", prefetch_buffered_reader); + auto prefetch_request_io = + ADD_CHILD_COUNTER(profile, "PreRequestIO", TUnit::UNIT, prefetch_buffered_reader); + auto prefetch_request_bytes = ADD_CHILD_COUNTER(profile, "PreRequestBytes", TUnit::BYTES, + prefetch_buffered_reader); + auto request_io = + ADD_CHILD_COUNTER(profile, "RequestIO", TUnit::UNIT, prefetch_buffered_reader); + auto request_bytes = + ADD_CHILD_COUNTER(profile, "RequestBytes", TUnit::BYTES, prefetch_buffered_reader); + sync_buffer = [=](PrefetchBuffer& buf) { + COUNTER_UPDATE(copy_time, buf._statis.copy_time); + COUNTER_UPDATE(read_time, buf._statis.read_time); + COUNTER_UPDATE(prefetch_request_io, buf._statis.prefetch_request_io); + COUNTER_UPDATE(prefetch_request_bytes, buf._statis.prefetch_request_bytes); + COUNTER_UPDATE(request_io, buf._statis.request_io); + COUNTER_UPDATE(request_bytes, buf._statis.request_bytes); + }; + } // set the _cur_offset of this reader as same as the inner reader's, // to make sure the buffer reader will start to read at right position. 
for (int i = 0; i < buffer_num; i++) { - _pre_buffers.emplace_back( - std::make_shared<PrefetchBuffer>(_file_range, s_max_pre_buffer_size, - _whole_pre_buffer_size, _reader.get(), _io_ctx)); + _pre_buffers.emplace_back(std::make_shared<PrefetchBuffer>( + _file_range, s_max_pre_buffer_size, _whole_pre_buffer_size, _reader.get(), _io_ctx, + sync_buffer)); } } @@ -690,7 +727,8 @@ Status DelegateReader::create_file_reader(RuntimeProfile* profile, *file_reader = std::make_shared<InMemoryFileReader>(reader); } else if (access_mode == AccessMode::SEQUENTIAL) { io::FileReaderSPtr safeReader = std::make_shared<ThreadSafeReader>(reader); - *file_reader = std::make_shared<io::PrefetchBufferedReader>(safeReader, file_range, io_ctx); + *file_reader = std::make_shared<io::PrefetchBufferedReader>(profile, safeReader, file_range, + io_ctx); } else { *file_reader = std::move(reader); } diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index 0208139ba1..f22789de8f 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -314,13 +314,15 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer> { enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED }; PrefetchBuffer(const PrefetchRange file_range, size_t buffer_size, size_t whole_buffer_size, - io::FileReader* reader, const IOContext* io_ctx) + io::FileReader* reader, const IOContext* io_ctx, + std::function<void(PrefetchBuffer&)> sync_profile) : _file_range(file_range), _size(buffer_size), _whole_buffer_size(whole_buffer_size), _reader(reader), _io_ctx(io_ctx), - _buf(new char[buffer_size]) {} + _buf(new char[buffer_size]), + _sync_profile(sync_profile) {} PrefetchBuffer(PrefetchBuffer&& other) : _offset(other._offset), @@ -330,7 +332,8 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer> { _whole_buffer_size(other._whole_buffer_size), _reader(other._reader), _io_ctx(other._io_ctx), - _buf(std::move(other._buf)) {} + 
_buf(std::move(other._buf)), + _sync_profile(std::move(other._sync_profile)) {} ~PrefetchBuffer() = default; @@ -351,6 +354,16 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer> { std::condition_variable _prefetched; Status _prefetch_status {Status::OK()}; std::atomic_bool _exceed = false; + std::function<void(PrefetchBuffer&)> _sync_profile; + struct Statistics { + int64_t copy_time {0}; + int64_t read_time {0}; + int64_t prefetch_request_io {0}; + int64_t prefetch_request_bytes {0}; + int64_t request_io {0}; + int64_t request_bytes {0}; + }; + Statistics _statis; // @brief: reset the start offset of this buffer to offset // @param: the new start offset for this buffer @@ -396,8 +409,9 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer> { */ class PrefetchBufferedReader : public io::FileReader { public: - PrefetchBufferedReader(io::FileReaderSPtr reader, PrefetchRange file_range, - const IOContext* io_ctx = nullptr, int64_t buffer_size = -1L); + PrefetchBufferedReader(RuntimeProfile* profile, io::FileReaderSPtr reader, + PrefetchRange file_range, const IOContext* io_ctx = nullptr, + int64_t buffer_size = -1L); ~PrefetchBufferedReader() override; Status close() override; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 0dbc26d6ac..ab639e6f29 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -512,8 +512,11 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c const TFileRangeDesc& range = file_scan_range.ranges.at(0); const TFileScanRangeParams& params = file_scan_range.params; + // make sure profile is destructed after reader because PrefetchBufferedReader + // might asynchronously access the profile + std::unique_ptr<RuntimeProfile> profile = + std::make_unique<RuntimeProfile>("FetchTableSchema"); std::unique_ptr<vectorized::GenericReader> reader(nullptr); - std::unique_ptr<RuntimeProfile> profile(new
RuntimeProfile("FetchTableSchema")); io::IOContext io_ctx; io::FileCacheStatistics file_cache_statis; io_ctx.file_cache_stats = &file_cache_statis; diff --git a/be/test/io/fs/buffered_reader_test.cpp b/be/test/io/fs/buffered_reader_test.cpp index 35b0cc60e1..6a281e125f 100644 --- a/be/test/io/fs/buffered_reader_test.cpp +++ b/be/test/io/fs/buffered_reader_test.cpp @@ -124,7 +124,8 @@ TEST_F(BufferedReaderTest, normal_use) { io::global_local_filesystem()->open_file( "./be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file", &local_reader); auto sync_local_reader = std::make_shared<SyncLocalFileReader>(std::move(local_reader)); - io::PrefetchBufferedReader reader(std::move(sync_local_reader), io::PrefetchRange(0, 1024)); + io::PrefetchBufferedReader reader(nullptr, std::move(sync_local_reader), + io::PrefetchRange(0, 1024)); uint8_t buf[1024]; Slice result {buf, 1024}; MonotonicStopWatch watch; @@ -143,7 +144,8 @@ TEST_F(BufferedReaderTest, test_validity) { "./be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file.txt", &local_reader); auto sync_local_reader = std::make_shared<SyncLocalFileReader>(std::move(local_reader)); - io::PrefetchBufferedReader reader(std::move(sync_local_reader), io::PrefetchRange(0, 1024)); + io::PrefetchBufferedReader reader(nullptr, std::move(sync_local_reader), + io::PrefetchRange(0, 1024)); Status st; uint8_t buf[10]; Slice result {buf, 10}; @@ -192,7 +194,8 @@ TEST_F(BufferedReaderTest, test_seek) { "./be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file.txt", &local_reader); auto sync_local_reader = std::make_shared<SyncLocalFileReader>(std::move(local_reader)); - io::PrefetchBufferedReader reader(std::move(sync_local_reader), io::PrefetchRange(0, 1024)); + io::PrefetchBufferedReader reader(nullptr, std::move(sync_local_reader), + io::PrefetchRange(0, 1024)); Status st; uint8_t buf[10]; @@ -238,7 +241,8 @@ TEST_F(BufferedReaderTest, test_miss) { 
"./be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file.txt", &local_reader); auto sync_local_reader = std::make_shared<SyncLocalFileReader>(std::move(local_reader)); - io::PrefetchBufferedReader reader(std::move(sync_local_reader), io::PrefetchRange(0, 1024)); + io::PrefetchBufferedReader reader(nullptr, std::move(sync_local_reader), + io::PrefetchRange(0, 1024)); uint8_t buf[128]; Slice result {buf, 128}; size_t bytes_read; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org