This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch cs_opt_version-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/cs_opt_version-3.1 by this push:
     new 824ac04e9f2 [opt](cloud) Support prefetch parallel for single file (#57524)
824ac04e9f2 is described below

commit 824ac04e9f2d5052f7b7b182f63917190fe50724
Author: Gavin Chou <[email protected]>
AuthorDate: Tue Nov 4 15:13:08 2025 +0800

    [opt](cloud) Support prefetch parallel for single file (#57524)
---
 be/src/common/config.cpp                      |  7 +++
 be/src/common/config.h                        |  2 +
 be/src/io/cache/cached_remote_file_reader.cpp | 79 ++++++++++++++++++++++-----
 be/src/io/cache/cached_remote_file_reader.h   |  4 ++
 be/src/olap/like_column_predicate.h           |  7 +--
 5 files changed, 82 insertions(+), 17 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 243cd65351a..d19f6381028 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1143,6 +1143,13 @@ DEFINE_mInt64(file_cache_background_lru_dump_interval_ms, "60000");
 DEFINE_mInt64(file_cache_background_lru_dump_update_cnt_threshold, "1000");
 DEFINE_mInt64(file_cache_background_lru_dump_tail_record_num, "5000000");
 DEFINE_mInt64(file_cache_background_lru_log_replay_interval_ms, "1000");
+// number of prefetch parallel when the read is missed
+DEFINE_mInt32(file_cache_num_parallel_prefetch, "0");
+// if we read the tail data less than `threshold` we extend this read with extra
+// block of data, e.g. by default, if the read is the tail 10KB, the actual IO is
+// config::file_cache_each_block_size + 10KB
+// if tail read is 101KB, the actual IO is 101KB
+DEFINE_mInt64(file_cache_tail_read_extra_bytes_threshold, "102400");
 DEFINE_mBool(enable_evaluate_shadow_queue_diff, "false");
 
 DEFINE_Int32(file_cache_downloader_thread_num_min, "32");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 55591fc36c7..a17768fae6e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1180,6 +1180,8 @@ DECLARE_mInt64(file_cache_background_lru_dump_interval_ms);
 DECLARE_mInt64(file_cache_background_lru_dump_update_cnt_threshold);
 DECLARE_mInt64(file_cache_background_lru_dump_tail_record_num);
 DECLARE_mInt64(file_cache_background_lru_log_replay_interval_ms);
+DECLARE_mInt32(file_cache_num_parallel_prefetch);
+DECLARE_mInt64(file_cache_tail_read_extra_bytes_threshold);
 DECLARE_mBool(enable_evaluate_shadow_queue_diff);
 
 // inverted index searcher cache
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp
index b89bdcf2f6d..429a1f625f9 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -36,9 +36,11 @@
 #include "io/fs/file_reader.h"
 #include "io/fs/local_file_system.h"
 #include "io/io_common.h"
+#include "runtime/exec_env.h"
 #include "util/bit_util.h"
 #include "util/doris_metrics.h"
 #include "util/runtime_profile.h"
+#include "vec/exec/scan/scanner_scheduler.h"
 
 namespace doris::io {
 
@@ -47,6 +49,7 @@ bvar::LatencyRecorder g_skip_cache_num("cached_remote_reader_skip_cache_num");
 bvar::Adder<uint64_t> g_skip_cache_sum("cached_remote_reader_skip_cache_sum");
 bvar::Adder<uint64_t> g_skip_local_cache_io_sum_bytes(
         "cached_remote_reader_skip_local_cache_io_sum_bytes");
+bvar::LatencyRecorder 
g_read_at_req_bytes("cached_remote_reader_read_at_req_bytes");
 
 CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr 
remote_file_reader,
                                                const FileReaderOptions& opts)
@@ -86,6 +89,10 @@ void CachedRemoteFileReader::_insert_file_reader(FileBlockSPtr file_block) {
 }
 
 CachedRemoteFileReader::~CachedRemoteFileReader() {
+    {
+        std::unique_lock l(_parallel_mtx);
+        _parallel_cv.wait(l, [this] { return _parallel_ref == 0; });
+    }
     static_cast<void>(close());
 }
 
@@ -94,24 +101,41 @@ Status CachedRemoteFileReader::close() {
 }
 
 std::pair<size_t, size_t> CachedRemoteFileReader::s_align_size(size_t offset, 
size_t read_size,
-                                                               size_t length) {
-    size_t left = offset;
-    size_t right = offset + read_size - 1;
-    size_t align_left =
-            (left / config::file_cache_each_block_size) * 
config::file_cache_each_block_size;
-    size_t align_right =
-            (right / config::file_cache_each_block_size + 1) * 
config::file_cache_each_block_size;
-    align_right = align_right < length ? align_right : length;
-    size_t align_size = align_right - align_left;
-    if (align_size < config::file_cache_each_block_size && align_left != 0) {
-        align_size += config::file_cache_each_block_size;
-        align_left -= config::file_cache_each_block_size;
+                                                               size_t 
file_length) {
+    const static size_t block_size = config::file_cache_each_block_size;
+
+    // Calculate the original read range [start, end)
+    size_t start_pos = offset;
+    size_t end_pos = offset + read_size;
+
+    // Align start position to the previous block boundary
+    size_t aligned_start = (start_pos / block_size) * block_size;
+
+    // Align end position to the next block boundary
+    size_t aligned_end = ((end_pos - 1) / block_size + 1) * block_size;
+
+    // Ensure we don't exceed file boundaries
+    aligned_end = std::min(aligned_end, file_length);
+
+    size_t aligned_size = aligned_end - aligned_start;
+
+    if (aligned_size > config::file_cache_tail_read_extra_bytes_threshold) {
+        return {aligned_start, aligned_size};
     }
-    return std::make_pair(align_left, align_size);
+
+    // Special case: if aligned size is smaller than a block and we're not at file start,
+    // extend backwards to include a full block
+    if (aligned_size < block_size && aligned_start > 0) {
+        aligned_start -= block_size;
+        aligned_size += block_size;
+    }
+
+    return {aligned_start, aligned_size};
 }
 
 Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, 
size_t* bytes_read,
                                             const IOContext* io_ctx) {
+    g_read_at_req_bytes << result.size;
     const bool is_dryrun = io_ctx->is_dryrun;
     DCHECK(!closed());
     DCHECK(io_ctx);
@@ -183,6 +207,35 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
     }
     // read from cache or remote
     auto [align_left, align_size] = s_align_size(offset, bytes_req, size());
+    if (config::file_cache_num_parallel_prefetch > 0 && !is_dryrun && 
_is_doris_table) {
+        auto off = align_left + align_size;
+        if (off < _remote_file_reader->size()) { // there may be more to read
+            auto ioctx = *io_ctx;
+            ioctx.is_dryrun = true;
+            ioctx.query_id = nullptr;
+            ioctx.file_cache_stats = nullptr;
+            ioctx.file_reader_stats = nullptr;
+            auto pool = 
ExecEnv::GetInstance()->scanner_scheduler()->get_remote_scan_thread_pool();
+            {
+                std::unique_lock l(_parallel_mtx);
+                _parallel_ref++;
+            }
+            auto st = pool->submit_scan_task(vectorized::SimplifiedScanTask(
+                    [ioctx, off, this] {
+                        size_t bytesread;
+                        Slice r((char*)0x0, 
config::file_cache_each_block_size);
+                        (void)read_at_impl(off, r, &bytesread, &ioctx);
+                        std::unique_lock l(_parallel_mtx);
+                        _parallel_ref--;
+                        _parallel_cv.notify_one();
+                    },
+                    nullptr));
+            if (!st.ok()) {
+                std::unique_lock l(_parallel_mtx);
+                _parallel_ref--;
+            }
+        }
+    }
     CacheContext cache_context(io_ctx);
     cache_context.stats = &stats;
     MonotonicStopWatch sw;
diff --git a/be/src/io/cache/cached_remote_file_reader.h b/be/src/io/cache/cached_remote_file_reader.h
index 94e8a5807ba..04d8cc69af8 100644
--- a/be/src/io/cache/cached_remote_file_reader.h
+++ b/be/src/io/cache/cached_remote_file_reader.h
@@ -69,6 +69,10 @@ private:
 
     void _update_stats(const ReadStatistics& stats, FileCacheStatistics* state,
                        bool is_inverted_index) const;
+
+    std::atomic<int> _parallel_ref {0};
+    std::mutex _parallel_mtx;
+    std::condition_variable _parallel_cv;
 };
 
 } // namespace doris::io
diff --git a/be/src/olap/like_column_predicate.h b/be/src/olap/like_column_predicate.h
index de5ed6767b4..52ace5807b2 100644
--- a/be/src/olap/like_column_predicate.h
+++ b/be/src/olap/like_column_predicate.h
@@ -149,8 +149,7 @@ private:
             const vectorized::ColumnDictI32& column) const {
         std::vector<bool>* res = nullptr;
         if (_segment_id_to_cached_res_flags.if_contains(
-                    column.get_rowset_segment_id(),
-                    [&res](auto& pair) { res = &pair.second; })) {
+                    column.get_rowset_segment_id(), [&res](auto& pair) { res = 
&pair.second; })) {
             return res;
         }
 
@@ -175,8 +174,8 @@ private:
                     std::pair {column.get_rowset_segment_id(), tmp_res});
         }
 
-        _segment_id_to_cached_res_flags.if_contains(
-                column.get_rowset_segment_id(), [&res](auto& pair) { res = 
&pair.second; });
+        
_segment_id_to_cached_res_flags.if_contains(column.get_rowset_segment_id(),
+                                                    [&res](auto& pair) { res = 
&pair.second; });
         return res;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to