This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 2bc39d5c9c3 [enhance](PrefetchReader) Make the prefetch timeout one config (#27371) (#27530)
2bc39d5c9c3 is described below

commit 2bc39d5c9c3c5f6aa988956edca6dc1f3c7d6b00
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Fri Dec 22 14:16:06 2023 +0800

    [enhance](PrefetchReader) Make the prefetch timeout one config (#27371) (#27530)
---
 be/src/common/config.cpp         |  2 ++
 be/src/common/config.h           |  2 ++
 be/src/io/fs/buffered_reader.cpp | 37 +++++++++++++++++++------------------
 3 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 40f5a2b6b66..071422b57da 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1104,6 +1104,8 @@ DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
 // Download binlog rate limit, unit is KB/s, 0 means no limit
 DEFINE_Int32(download_binlog_rate_limit_kbs, "0");
 
+DEFINE_mInt32(buffered_reader_read_timeout_ms, "20000");
+
 DEFINE_Bool(enable_snapshot_action, "false");
 
 // clang-format off
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7d7119ecfc1..b962a79075e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1160,6 +1160,8 @@ DECLARE_Int32(ingest_binlog_work_pool_size);
 // Download binlog rate limit, unit is KB/s
 DECLARE_Int32(download_binlog_rate_limit_kbs);
 
+DECLARE_mInt32(buffered_reader_read_timeout_ms);
+
 // whether to enable /api/snapshot api
 DECLARE_Bool(enable_snapshot_action);
 
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 18e638d6d75..339ee9a8b11 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -387,20 +387,15 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz
     return Status::OK();
 }
 
-// the condition variable would wait at most 10 seconds
-// otherwise it would quit the procedure and treat it
-// as one time out error status and would make the load
-// task failed
-constexpr static int WAIT_TIME_OUT_MS = 10000;
-
 // there exists occasions where the buffer is already closed but
 // some prior tasks are still queued in thread pool, so we have to check whether
 // the buffer is closed each time the condition variable is notified.
 void PrefetchBuffer::reset_offset(size_t offset) {
     {
         std::unique_lock lck {_lock};
-        if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
-                                  [this]() { return _buffer_status != BufferStatus::PENDING; })) {
+        if (!_prefetched.wait_for(
+                    lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
+                    [this]() { return _buffer_status != BufferStatus::PENDING; })) {
             _prefetch_status = Status::TimedOut("time out when reset prefetch buffer");
             return;
         }
@@ -427,10 +422,12 @@ void PrefetchBuffer::reset_offset(size_t offset) {
 void PrefetchBuffer::prefetch_buffer() {
     {
         std::unique_lock lck {_lock};
-        if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS), [this]() {
-                return _buffer_status == BufferStatus::RESET ||
-                       _buffer_status == BufferStatus::CLOSED;
-            })) {
+        if (!_prefetched.wait_for(
+                    lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
+                    [this]() {
+                        return _buffer_status == BufferStatus::RESET ||
+                               _buffer_status == BufferStatus::CLOSED;
+                    })) {
             _prefetch_status = Status::TimedOut("time out when invoking prefetch buffer");
             return;
         }
@@ -470,7 +467,8 @@ void PrefetchBuffer::prefetch_buffer() {
     _statis.prefetch_request_io += 1;
     _statis.prefetch_request_bytes += _len;
     std::unique_lock lck {_lock};
-    if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
+    if (!_prefetched.wait_for(lck,
+                              std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
                               [this]() { return _buffer_status == BufferStatus::PENDING; })) {
         _prefetch_status = Status::TimedOut("time out when invoking prefetch buffer");
         return;
@@ -555,10 +553,12 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
     {
         std::unique_lock lck {_lock};
         // buffer must be prefetched or it's closed
-        if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS), [this]() {
-                return _buffer_status == BufferStatus::PREFETCHED ||
-                       _buffer_status == BufferStatus::CLOSED;
-            })) {
+        if (!_prefetched.wait_for(
+                    lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
+                    [this]() {
+                        return _buffer_status == BufferStatus::PREFETCHED ||
+                               _buffer_status == BufferStatus::CLOSED;
+                    })) {
             _prefetch_status = Status::TimedOut("time out when read prefetch buffer");
             return _prefetch_status;
         }
@@ -594,7 +594,8 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
 void PrefetchBuffer::close() {
     std::unique_lock lck {_lock};
     // in case _reader still tries to write to the buf after we close the buffer
-    if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
+    if (!_prefetched.wait_for(lck,
+                              std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
                               [this]() { return _buffer_status != BufferStatus::PENDING; })) {
         _prefetch_status = Status::TimedOut("time out when close prefetch buffer");
         return;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org