This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 2bc39d5c9c3 [enhance](PrefetchReader) Make the prefetch timeout one config (#27371) (#27530)
2bc39d5c9c3 is described below

commit 2bc39d5c9c3c5f6aa988956edca6dc1f3c7d6b00
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Fri Dec 22 14:16:06 2023 +0800

    [enhance](PrefetchReader) Make the prefetch timeout one config (#27371) (#27530)
---
 be/src/common/config.cpp         |  2 ++
 be/src/common/config.h           |  2 ++
 be/src/io/fs/buffered_reader.cpp | 37 +++++++++++++++++++------------------
 3 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 40f5a2b6b66..071422b57da 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1104,6 +1104,8 @@ DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
 // Download binlog rate limit, unit is KB/s, 0 means no limit
 DEFINE_Int32(download_binlog_rate_limit_kbs, "0");
 
+DEFINE_mInt32(buffered_reader_read_timeout_ms, "20000");
+
 DEFINE_Bool(enable_snapshot_action, "false");
 
 // clang-format off
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7d7119ecfc1..b962a79075e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1160,6 +1160,8 @@ DECLARE_Int32(ingest_binlog_work_pool_size);
 // Download binlog rate limit, unit is KB/s
 DECLARE_Int32(download_binlog_rate_limit_kbs);
 
+DECLARE_mInt32(buffered_reader_read_timeout_ms);
+
 // whether to enable /api/snapshot api
 DECLARE_Bool(enable_snapshot_action);
 
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 18e638d6d75..339ee9a8b11 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -387,20 +387,15 @@ Status MergeRangeFileReader::_fill_box(int range_index, 
size_t start_offset, siz
     return Status::OK();
 }
 
-// the condition variable would wait at most 10 seconds
-// otherwise it would quit the procedure and treat it
-// as one time out error status and would make the load
-// task failed
-constexpr static int WAIT_TIME_OUT_MS = 10000;
-
 // there exists occasions where the buffer is already closed but
 // some prior tasks are still queued in thread pool, so we have to check 
whether
 // the buffer is closed each time the condition variable is notified.
 void PrefetchBuffer::reset_offset(size_t offset) {
     {
         std::unique_lock lck {_lock};
-        if (!_prefetched.wait_for(lck, 
std::chrono::milliseconds(WAIT_TIME_OUT_MS),
-                                  [this]() { return _buffer_status != 
BufferStatus::PENDING; })) {
+        if (!_prefetched.wait_for(
+                    lck, 
std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
+                    [this]() { return _buffer_status != BufferStatus::PENDING; 
})) {
             _prefetch_status = Status::TimedOut("time out when reset prefetch 
buffer");
             return;
         }
@@ -427,10 +422,12 @@ void PrefetchBuffer::reset_offset(size_t offset) {
 void PrefetchBuffer::prefetch_buffer() {
     {
         std::unique_lock lck {_lock};
-        if (!_prefetched.wait_for(lck, 
std::chrono::milliseconds(WAIT_TIME_OUT_MS), [this]() {
-                return _buffer_status == BufferStatus::RESET ||
-                       _buffer_status == BufferStatus::CLOSED;
-            })) {
+        if (!_prefetched.wait_for(
+                    lck, 
std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
+                    [this]() {
+                        return _buffer_status == BufferStatus::RESET ||
+                               _buffer_status == BufferStatus::CLOSED;
+                    })) {
             _prefetch_status = Status::TimedOut("time out when invoking 
prefetch buffer");
             return;
         }
@@ -470,7 +467,8 @@ void PrefetchBuffer::prefetch_buffer() {
     _statis.prefetch_request_io += 1;
     _statis.prefetch_request_bytes += _len;
     std::unique_lock lck {_lock};
-    if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
+    if (!_prefetched.wait_for(lck,
+                              
std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
                               [this]() { return _buffer_status == 
BufferStatus::PENDING; })) {
         _prefetch_status = Status::TimedOut("time out when invoking prefetch 
buffer");
         return;
@@ -555,10 +553,12 @@ Status PrefetchBuffer::read_buffer(size_t off, const 
char* out, size_t buf_len,
     {
         std::unique_lock lck {_lock};
         // buffer must be prefetched or it's closed
-        if (!_prefetched.wait_for(lck, 
std::chrono::milliseconds(WAIT_TIME_OUT_MS), [this]() {
-                return _buffer_status == BufferStatus::PREFETCHED ||
-                       _buffer_status == BufferStatus::CLOSED;
-            })) {
+        if (!_prefetched.wait_for(
+                    lck, 
std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
+                    [this]() {
+                        return _buffer_status == BufferStatus::PREFETCHED ||
+                               _buffer_status == BufferStatus::CLOSED;
+                    })) {
             _prefetch_status = Status::TimedOut("time out when read prefetch 
buffer");
             return _prefetch_status;
         }
@@ -594,7 +594,8 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* 
out, size_t buf_len,
 void PrefetchBuffer::close() {
     std::unique_lock lck {_lock};
     // in case _reader still tries to write to the buf after we close the 
buffer
-    if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
+    if (!_prefetched.wait_for(lck,
+                              
std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
                               [this]() { return _buffer_status != 
BufferStatus::PENDING; })) {
         _prefetch_status = Status::TimedOut("time out when close prefetch 
buffer");
         return;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to