This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b0c215e694 [enhance](be)add more profile in prefetched buffered reader 
(#19119)
b0c215e694 is described below

commit b0c215e694811822898919c5cfe94e525c41bb14
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Tue May 2 09:53:39 2023 +0800

    [enhance](be)add more profile in prefetched buffered reader (#19119)
---
 be/src/io/fs/buffered_reader.cpp       | 54 +++++++++++++++++++++++++++++-----
 be/src/io/fs/buffered_reader.h         | 24 +++++++++++----
 be/src/service/internal_service.cpp    |  5 +++-
 be/test/io/fs/buffered_reader_test.cpp | 12 +++++---
 4 files changed, 77 insertions(+), 18 deletions(-)

diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index dab5ab8f61..f57aa16f86 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -406,8 +406,13 @@ void PrefetchBuffer::prefetch_buffer() {
         buf_size = merge_small_ranges(_offset, read_range_index);
     }
 
-    s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, 
_io_ctx);
+    {
+        SCOPED_RAW_TIMER(&_statis.read_time);
+        s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, 
_io_ctx);
+    }
     g_bytes_downloaded << _len;
+    _statis.prefetch_request_io += 1;
+    _statis.prefetch_request_bytes += _len;
     std::unique_lock lck {_lock};
     _prefetched.wait(lck, [this]() { return _buffer_status == 
BufferStatus::PENDING; });
     if (!s.ok() && _offset < _reader->size()) {
@@ -506,8 +511,13 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* 
out, size_t buf_len,
     }
     // [0]: maximum len trying to read, [1] maximum length buffer can provide, 
[2] actual len buffer has
     size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len 
- off});
-    memcpy((void*)out, _buf.get() + (off - _offset), read_len);
+    {
+        SCOPED_RAW_TIMER(&_statis.copy_time);
+        memcpy((void*)out, _buf.get() + (off - _offset), read_len);
+    }
     *bytes_read = read_len;
+    _statis.request_io += 1;
+    _statis.request_bytes += read_len;
     if (off + *bytes_read == _offset + _len) {
         reset_offset(_offset + _whole_buffer_size);
     }
@@ -520,11 +530,15 @@ void PrefetchBuffer::close() {
     _prefetched.wait(lck, [this]() { return _buffer_status != 
BufferStatus::PENDING; });
     _buffer_status = BufferStatus::CLOSED;
     _prefetched.notify_all();
+    if (_sync_profile != nullptr) {
+        _sync_profile(*this);
+    }
 }
 
 // buffered reader
-PrefetchBufferedReader::PrefetchBufferedReader(io::FileReaderSPtr reader, 
PrefetchRange file_range,
-                                               const IOContext* io_ctx, 
int64_t buffer_size)
+PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, 
io::FileReaderSPtr reader,
+                                               PrefetchRange file_range, const 
IOContext* io_ctx,
+                                               int64_t buffer_size)
         : _reader(std::move(reader)), _file_range(file_range), _io_ctx(io_ctx) 
{
     if (buffer_size == -1L) {
         buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024;
@@ -533,12 +547,35 @@ 
PrefetchBufferedReader::PrefetchBufferedReader(io::FileReaderSPtr reader, Prefet
     _whole_pre_buffer_size = buffer_size;
     _file_range.end_offset = std::min(_file_range.end_offset, _size);
     int buffer_num = buffer_size > s_max_pre_buffer_size ? buffer_size / 
s_max_pre_buffer_size : 1;
+    std::function<void(PrefetchBuffer&)> sync_buffer = nullptr;
+    if (profile != nullptr) {
+        const char* prefetch_buffered_reader = "PrefetchBufferedReader";
+        ADD_TIMER(profile, prefetch_buffered_reader);
+        auto copy_time = ADD_CHILD_TIMER(profile, "CopyTime", 
prefetch_buffered_reader);
+        auto read_time = ADD_CHILD_TIMER(profile, "ReadTime", 
prefetch_buffered_reader);
+        auto prefetch_request_io =
+                ADD_CHILD_COUNTER(profile, "PreRequestIO", TUnit::UNIT, 
prefetch_buffered_reader);
+        auto prefetch_request_bytes = ADD_CHILD_COUNTER(profile, 
"PreRequestBytes", TUnit::BYTES,
+                                                        
prefetch_buffered_reader);
+        auto request_io =
+                ADD_CHILD_COUNTER(profile, "RequestIO", TUnit::UNIT, 
prefetch_buffered_reader);
+        auto request_bytes =
+                ADD_CHILD_COUNTER(profile, "RequestBytes", TUnit::BYTES, 
prefetch_buffered_reader);
+        sync_buffer = [=](PrefetchBuffer& buf) {
+            COUNTER_UPDATE(copy_time, buf._statis.copy_time);
+            COUNTER_UPDATE(read_time, buf._statis.read_time);
+            COUNTER_UPDATE(prefetch_request_io, 
buf._statis.prefetch_request_io);
+            COUNTER_UPDATE(prefetch_request_bytes, 
buf._statis.prefetch_request_bytes);
+            COUNTER_UPDATE(request_io, buf._statis.request_io);
+            COUNTER_UPDATE(request_bytes, buf._statis.request_bytes);
+        };
+    }
     // set the _cur_offset of this reader as same as the inner reader's,
     // to make sure the buffer reader will start to read at right position.
     for (int i = 0; i < buffer_num; i++) {
-        _pre_buffers.emplace_back(
-                std::make_shared<PrefetchBuffer>(_file_range, 
s_max_pre_buffer_size,
-                                                 _whole_pre_buffer_size, 
_reader.get(), _io_ctx));
+        _pre_buffers.emplace_back(std::make_shared<PrefetchBuffer>(
+                _file_range, s_max_pre_buffer_size, _whole_pre_buffer_size, 
_reader.get(), _io_ctx,
+                sync_buffer));
     }
 }
 
@@ -690,7 +727,8 @@ Status DelegateReader::create_file_reader(RuntimeProfile* 
profile,
         *file_reader = std::make_shared<InMemoryFileReader>(reader);
     } else if (access_mode == AccessMode::SEQUENTIAL) {
         io::FileReaderSPtr safeReader = 
std::make_shared<ThreadSafeReader>(reader);
-        *file_reader = 
std::make_shared<io::PrefetchBufferedReader>(safeReader, file_range, io_ctx);
+        *file_reader = std::make_shared<io::PrefetchBufferedReader>(profile, 
safeReader, file_range,
+                                                                    io_ctx);
     } else {
         *file_reader = std::move(reader);
     }
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 0208139ba1..f22789de8f 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -314,13 +314,15 @@ struct PrefetchBuffer : 
std::enable_shared_from_this<PrefetchBuffer> {
     enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED };
 
     PrefetchBuffer(const PrefetchRange file_range, size_t buffer_size, size_t 
whole_buffer_size,
-                   io::FileReader* reader, const IOContext* io_ctx)
+                   io::FileReader* reader, const IOContext* io_ctx,
+                   std::function<void(PrefetchBuffer&)> sync_profile)
             : _file_range(file_range),
               _size(buffer_size),
               _whole_buffer_size(whole_buffer_size),
               _reader(reader),
               _io_ctx(io_ctx),
-              _buf(new char[buffer_size]) {}
+              _buf(new char[buffer_size]),
+              _sync_profile(sync_profile) {}
 
     PrefetchBuffer(PrefetchBuffer&& other)
             : _offset(other._offset),
@@ -330,7 +332,8 @@ struct PrefetchBuffer : 
std::enable_shared_from_this<PrefetchBuffer> {
               _whole_buffer_size(other._whole_buffer_size),
               _reader(other._reader),
               _io_ctx(other._io_ctx),
-              _buf(std::move(other._buf)) {}
+              _buf(std::move(other._buf)),
+              _sync_profile(std::move(other._sync_profile)) {}
 
     ~PrefetchBuffer() = default;
 
@@ -351,6 +354,16 @@ struct PrefetchBuffer : 
std::enable_shared_from_this<PrefetchBuffer> {
     std::condition_variable _prefetched;
     Status _prefetch_status {Status::OK()};
     std::atomic_bool _exceed = false;
+    std::function<void(PrefetchBuffer&)> _sync_profile;
+    struct Statistics {
+        int64_t copy_time {0};
+        int64_t read_time {0};
+        int64_t prefetch_request_io {0};
+        int64_t prefetch_request_bytes {0};
+        int64_t request_io {0};
+        int64_t request_bytes {0};
+    };
+    Statistics _statis;
 
     // @brief: reset the start offset of this buffer to offset
     // @param: the new start offset for this buffer
@@ -396,8 +409,9 @@ struct PrefetchBuffer : 
std::enable_shared_from_this<PrefetchBuffer> {
  */
 class PrefetchBufferedReader : public io::FileReader {
 public:
-    PrefetchBufferedReader(io::FileReaderSPtr reader, PrefetchRange file_range,
-                           const IOContext* io_ctx = nullptr, int64_t 
buffer_size = -1L);
+    PrefetchBufferedReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
+                           PrefetchRange file_range, const IOContext* io_ctx = 
nullptr,
+                           int64_t buffer_size = -1L);
     ~PrefetchBufferedReader() override;
 
     Status close() override;
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 0dbc26d6ac..ab639e6f29 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -512,8 +512,11 @@ void 
PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
         const TFileRangeDesc& range = file_scan_range.ranges.at(0);
         const TFileScanRangeParams& params = file_scan_range.params;
 
+        // make sure profile is destructed after reader because 
PrefetchBufferedReader
+        // might asynchronously access the profile
+        std::unique_ptr<RuntimeProfile> profile =
+                std::make_unique<RuntimeProfile>("FetchTableSchema");
         std::unique_ptr<vectorized::GenericReader> reader(nullptr);
-        std::unique_ptr<RuntimeProfile> profile(new 
RuntimeProfile("FetchTableSchema"));
         io::IOContext io_ctx;
         io::FileCacheStatistics file_cache_statis;
         io_ctx.file_cache_stats = &file_cache_statis;
diff --git a/be/test/io/fs/buffered_reader_test.cpp 
b/be/test/io/fs/buffered_reader_test.cpp
index 35b0cc60e1..6a281e125f 100644
--- a/be/test/io/fs/buffered_reader_test.cpp
+++ b/be/test/io/fs/buffered_reader_test.cpp
@@ -124,7 +124,8 @@ TEST_F(BufferedReaderTest, normal_use) {
     io::global_local_filesystem()->open_file(
             
"./be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file", 
&local_reader);
     auto sync_local_reader = 
std::make_shared<SyncLocalFileReader>(std::move(local_reader));
-    io::PrefetchBufferedReader reader(std::move(sync_local_reader), 
io::PrefetchRange(0, 1024));
+    io::PrefetchBufferedReader reader(nullptr, std::move(sync_local_reader),
+                                      io::PrefetchRange(0, 1024));
     uint8_t buf[1024];
     Slice result {buf, 1024};
     MonotonicStopWatch watch;
@@ -143,7 +144,8 @@ TEST_F(BufferedReaderTest, test_validity) {
             
"./be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file.txt",
             &local_reader);
     auto sync_local_reader = 
std::make_shared<SyncLocalFileReader>(std::move(local_reader));
-    io::PrefetchBufferedReader reader(std::move(sync_local_reader), 
io::PrefetchRange(0, 1024));
+    io::PrefetchBufferedReader reader(nullptr, std::move(sync_local_reader),
+                                      io::PrefetchRange(0, 1024));
     Status st;
     uint8_t buf[10];
     Slice result {buf, 10};
@@ -192,7 +194,8 @@ TEST_F(BufferedReaderTest, test_seek) {
             
"./be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file.txt",
             &local_reader);
     auto sync_local_reader = 
std::make_shared<SyncLocalFileReader>(std::move(local_reader));
-    io::PrefetchBufferedReader reader(std::move(sync_local_reader), 
io::PrefetchRange(0, 1024));
+    io::PrefetchBufferedReader reader(nullptr, std::move(sync_local_reader),
+                                      io::PrefetchRange(0, 1024));
 
     Status st;
     uint8_t buf[10];
@@ -238,7 +241,8 @@ TEST_F(BufferedReaderTest, test_miss) {
             
"./be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file.txt",
             &local_reader);
     auto sync_local_reader = 
std::make_shared<SyncLocalFileReader>(std::move(local_reader));
-    io::PrefetchBufferedReader reader(std::move(sync_local_reader), 
io::PrefetchRange(0, 1024));
+    io::PrefetchBufferedReader reader(nullptr, std::move(sync_local_reader),
+                                      io::PrefetchRange(0, 1024));
     uint8_t buf[128];
     Slice result {buf, 128};
     size_t bytes_read;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to