This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 2059fcf4f8c branch-4.1: [Opt](cloud) Add segment prefetcher #59482
#60870 (#61421)
2059fcf4f8c is described below
commit 2059fcf4f8c7952edee0a14ee7d6943393dba3c0
Author: bobhan1 <[email protected]>
AuthorDate: Wed Apr 1 10:50:52 2026 +0800
branch-4.1: [Opt](cloud) Add segment prefetcher #59482 #60870 (#61421)
pick https://github.com/apache/doris/pull/59482 and
https://github.com/apache/doris/pull/60870
---
be/src/common/config.cpp | 26 ++
be/src/common/config.h | 19 ++
be/src/exec/operator/olap_scan_operator.cpp | 2 +
be/src/exec/operator/olap_scan_operator.h | 1 +
be/src/exec/scan/scanner.cpp | 2 +
be/src/io/cache/block_file_cache.cpp | 3 +
be/src/io/cache/cached_remote_file_reader.cpp | 139 ++++++++---
be/src/io/cache/cached_remote_file_reader.h | 15 +-
be/src/io/fs/s3_file_reader.cpp | 3 +
be/src/runtime/exec_env.h | 3 +
be/src/runtime/exec_env_init.cpp | 9 +
be/src/service/doris_main.cpp | 4 +
be/src/storage/index/ordinal_page_index.h | 3 +-
be/src/storage/olap_common.h | 1 +
be/src/storage/segment/column_reader.cpp | 124 ++++++++++
be/src/storage/segment/column_reader.h | 40 ++++
be/src/storage/segment/page_io.cpp | 4 +
be/src/storage/segment/segment_iterator.cpp | 94 ++++++++
be/src/storage/segment/segment_iterator.h | 2 +
be/src/storage/segment/segment_prefetcher.cpp | 262 +++++++++++++++++++++
be/src/storage/segment/segment_prefetcher.h | 154 ++++++++++++
.../segment/variant/hierarchical_data_iterator.cpp | 33 +++
.../segment/variant/hierarchical_data_iterator.h | 5 +
.../segment/variant/variant_column_reader.cpp | 10 +
.../segment/variant/variant_column_reader.h | 5 +
be/src/util/concurrency_stats.cpp | 130 ++++++++++
be/src/util/concurrency_stats.h | 127 ++++++++++
27 files changed, 1182 insertions(+), 38 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 6b24fbe707f..76b881c0349 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1550,6 +1550,21 @@ DEFINE_mInt64(string_overflow_size, "4294967295"); //
std::numic_limits<uint32_t
DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread, "16");
// The max thread num for BufferedReaderPrefetchThreadPool
DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread, "64");
+
+DEFINE_mBool(enable_segment_prefetch_verbose_log, "false");
+// The thread num for SegmentPrefetchThreadPool
+DEFINE_Int64(segment_prefetch_thread_pool_thread_num_min, "32");
+DEFINE_Int64(segment_prefetch_thread_pool_thread_num_max, "2000");
+
+DEFINE_mInt32(segment_file_cache_consume_rowids_batch_size, "8000");
+// Enable segment file cache block prefetch for query
+DEFINE_mBool(enable_query_segment_file_cache_prefetch, "false");
+// Number of blocks to prefetch ahead in segment iterator for query
+DEFINE_mInt32(query_segment_file_cache_prefetch_block_size, "2");
+// Enable segment file cache block prefetch for compaction
+DEFINE_mBool(enable_compaction_segment_file_cache_prefetch, "false");
+// Number of blocks to prefetch ahead in segment iterator for compaction
+DEFINE_mInt32(compaction_segment_file_cache_prefetch_block_size, "2");
// The min thread num for S3FileUploadThreadPool
DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread, "16");
// The max thread num for S3FileUploadThreadPool
@@ -1701,6 +1716,12 @@ DEFINE_Validator(aws_credentials_provider_version,
[](const std::string& config)
return config == "v1" || config == "v2";
});
+// Concurrency stats dump configuration
+DEFINE_mBool(enable_concurrency_stats_dump, "false");
+DEFINE_mInt32(concurrency_stats_dump_interval_ms, "100");
+DEFINE_Validator(concurrency_stats_dump_interval_ms,
+ [](const int32_t config) -> bool { return config >= 10; });
+
// clang-format off
#ifdef BE_TEST
// test s3
@@ -2170,6 +2191,11 @@ Status set_fuzzy_configs() {
fuzzy_field_and_value["segments_key_bounds_truncation_threshold"] =
std::to_string(distribution2(*generator));
+ fuzzy_field_and_value["enable_query_segment_file_cache_prefetch"] =
+ ((distribution(*generator) % 2) == 0) ? "true" : "false";
+ fuzzy_field_and_value["enable_compaction_segment_file_cache_prefetch"] =
+ ((distribution(*generator) % 2) == 0) ? "true" : "false";
+
// external
if (config::fuzzy_test_type == "external") {
std::uniform_int_distribution<int64_t> distribution3(0, 2);
diff --git a/be/src/common/config.h b/be/src/common/config.h
index ab36e426f81..6197d08ebe1 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1612,6 +1612,21 @@ DECLARE_mInt64(string_overflow_size);
DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread);
// The max thread num for BufferedReaderPrefetchThreadPool
DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread);
+
+DECLARE_mBool(enable_segment_prefetch_verbose_log);
+// The thread num for SegmentPrefetchThreadPool
+DECLARE_Int64(segment_prefetch_thread_pool_thread_num_min);
+DECLARE_Int64(segment_prefetch_thread_pool_thread_num_max);
+
+DECLARE_mInt32(segment_file_cache_consume_rowids_batch_size);
+// Enable segment file cache block prefetch for query
+DECLARE_mBool(enable_query_segment_file_cache_prefetch);
+// Number of blocks to prefetch ahead in segment iterator for query
+DECLARE_mInt32(query_segment_file_cache_prefetch_block_size);
+// Enable segment file cache block prefetch for compaction
+DECLARE_mBool(enable_compaction_segment_file_cache_prefetch);
+// Number of blocks to prefetch ahead in segment iterator for compaction
+DECLARE_mInt32(compaction_segment_file_cache_prefetch_block_size);
// The min thread num for S3FileUploadThreadPool
DECLARE_Int64(num_s3_file_upload_thread_pool_min_thread);
// The max thread num for S3FileUploadThreadPool
@@ -1756,6 +1771,10 @@ DECLARE_mBool(read_cluster_cache_opt_verbose_log);
DECLARE_mString(aws_credentials_provider_version);
+// Concurrency stats dump configuration
+DECLARE_mBool(enable_concurrency_stats_dump);
+DECLARE_mInt32(concurrency_stats_dump_interval_ms);
+
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
diff --git a/be/src/exec/operator/olap_scan_operator.cpp
b/be/src/exec/operator/olap_scan_operator.cpp
index 17ac84e3554..5ad3e99ff1d 100644
--- a/be/src/exec/operator/olap_scan_operator.cpp
+++ b/be/src/exec/operator/olap_scan_operator.cpp
@@ -305,6 +305,8 @@ Status OlapScanLocalState::_init_profile() {
ADD_TIMER(_scanner_profile,
"SegmentIteratorInitReturnColumnIteratorsTimer");
_segment_iterator_init_index_iterators_timer =
ADD_TIMER(_scanner_profile,
"SegmentIteratorInitIndexIteratorsTimer");
+ _segment_iterator_init_segment_prefetchers_timer =
+ ADD_TIMER(_scanner_profile,
"SegmentIteratorInitSegmentPrefetchersTimer");
_segment_create_column_readers_timer =
ADD_TIMER(_scanner_profile, "SegmentCreateColumnReadersTimer");
diff --git a/be/src/exec/operator/olap_scan_operator.h
b/be/src/exec/operator/olap_scan_operator.h
index c3b4e8838cf..f22c7f5ccad 100644
--- a/be/src/exec/operator/olap_scan_operator.h
+++ b/be/src/exec/operator/olap_scan_operator.h
@@ -286,6 +286,7 @@ private:
RuntimeProfile::Counter* _segment_iterator_init_timer = nullptr;
RuntimeProfile::Counter*
_segment_iterator_init_return_column_iterators_timer = nullptr;
RuntimeProfile::Counter* _segment_iterator_init_index_iterators_timer =
nullptr;
+ RuntimeProfile::Counter* _segment_iterator_init_segment_prefetchers_timer
= nullptr;
RuntimeProfile::Counter* _segment_create_column_readers_timer = nullptr;
RuntimeProfile::Counter* _segment_load_index_timer = nullptr;
diff --git a/be/src/exec/scan/scanner.cpp b/be/src/exec/scan/scanner.cpp
index 521b7cd317f..a98c67d0fc8 100644
--- a/be/src/exec/scan/scanner.cpp
+++ b/be/src/exec/scan/scanner.cpp
@@ -28,6 +28,7 @@
#include "exprs/vexpr_context.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_profile.h"
+#include "util/concurrency_stats.h"
#include "util/defer_op.h"
namespace doris {
@@ -77,6 +78,7 @@ Status Scanner::init(RuntimeState* state, const
VExprContextSPtrs& conjuncts) {
}
Status Scanner::get_block_after_projects(RuntimeState* state, Block* block,
bool* eos) {
+
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().vscanner_get_block);
auto& row_descriptor = _local_state->_parent->row_descriptor();
if (_output_row_descriptor) {
if (_alreay_eos) {
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index f6f641a0a25..afbd20a4816 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -51,6 +51,7 @@
#include "io/cache/fs_file_cache_storage.h"
#include "io/cache/mem_file_cache_storage.h"
#include "runtime/runtime_profile.h"
+#include "util/concurrency_stats.h"
#include "util/stack_util.h"
#include "util/stopwatch.hpp"
#include "util/thread.h"
@@ -823,7 +824,9 @@ FileBlocksHolder BlockFileCache::get_or_set(const
UInt128Wrapper& hash, size_t o
const bool async_touch_on_get_or_set =
config::enable_file_cache_async_touch_on_get_or_set;
int64_t duration = 0;
{
+
ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment();
std::lock_guard cache_lock(_mutex);
+
ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement();
stats->lock_wait_timer += sw.elapsed_time();
SCOPED_RAW_TIMER(&duration);
/// Get all blocks which intersect with the given range.
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp
b/be/src/io/cache/cached_remote_file_reader.cpp
index 8e4efaed97d..da4758e8b99 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -59,6 +59,7 @@
#include "util/bit_util.h"
#include "util/brpc_client_cache.h" // BrpcClientCache
#include "util/client_cache.h"
+#include "util/concurrency_stats.h"
#include "util/debug_points.h"
namespace doris::io {
@@ -285,6 +286,8 @@ Status CachedRemoteFileReader::_execute_remote_read(const
std::vector<FileBlockS
Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result,
size_t* bytes_read,
const IOContext* io_ctx) {
size_t already_read = 0;
+
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().cached_remote_reader_read_at);
+
const bool is_dryrun = io_ctx->is_dryrun;
DCHECK(!closed());
DCHECK(io_ctx);
@@ -390,8 +393,12 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
cache_context.tablet_id = tablet_id.value_or(0);
MonotonicStopWatch sw;
sw.start();
+
+
ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->increment();
FileBlocksHolder holder =
_cache->get_or_set(_cache_hash, align_left, align_size,
cache_context);
+
ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->decrement();
+
stats.cache_get_or_set_timer += sw.elapsed_time();
std::vector<FileBlockSPtr> empty_blocks;
for (auto& block : holder.file_blocks) {
@@ -445,23 +452,28 @@ Status CachedRemoteFileReader::read_at_impl(size_t
offset, Slice result, size_t*
RETURN_IF_ERROR(
_execute_remote_read(empty_blocks, empty_start, size, buffer,
stats, io_ctx));
- for (auto& block : empty_blocks) {
- if (block->state() == FileBlock::State::SKIP_CACHE) {
- continue;
- }
- SCOPED_RAW_TIMER(&stats.local_write_timer);
- char* cur_ptr = buffer.get() + block->range().left - empty_start;
- size_t block_size = block->range().size();
- Status st = block->append(Slice(cur_ptr, block_size));
- if (st.ok()) {
- st = block->finalize();
- }
- if (!st.ok()) {
- LOG_EVERY_N(WARNING, 100) << "Write data to file cache failed.
err=" << st.msg();
- } else {
- _insert_file_reader(block);
+ {
+ SCOPED_CONCURRENCY_COUNT(
+
ConcurrencyStatsManager::instance().cached_remote_reader_write_back);
+ for (auto& block : empty_blocks) {
+ if (block->state() == FileBlock::State::SKIP_CACHE) {
+ continue;
+ }
+ SCOPED_RAW_TIMER(&stats.local_write_timer);
+ char* cur_ptr = buffer.get() + block->range().left -
empty_start;
+ size_t block_size = block->range().size();
+ Status st = block->append(Slice(cur_ptr, block_size));
+ if (st.ok()) {
+ st = block->finalize();
+ }
+ if (!st.ok()) {
+ LOG_EVERY_N(WARNING, 100)
+ << "Write data to file cache failed. err=" <<
st.msg();
+ } else {
+ _insert_file_reader(block);
+ }
+ stats.bytes_write_into_file_cache += block_size;
}
- stats.bytes_write_into_file_cache += block_size;
}
// copy from memory directly
size_t right_offset = offset + bytes_req - 1;
@@ -501,6 +513,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
static int64_t max_wait_time = 10;
TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::max_wait_time",
&max_wait_time);
if (block_state != FileBlock::State::DOWNLOADED) {
+ SCOPED_CONCURRENCY_COUNT(
+
ConcurrencyStatsManager::instance().cached_remote_reader_blocking);
do {
SCOPED_RAW_TIMER(&stats.remote_wait_timer);
SCOPED_RAW_TIMER(&stats.remote_read_timer);
@@ -527,33 +541,42 @@ Status CachedRemoteFileReader::read_at_impl(size_t
offset, Slice result, size_t*
} else {
size_t file_offset = current_offset - left;
SCOPED_RAW_TIMER(&stats.local_read_timer);
+ SCOPED_CONCURRENCY_COUNT(
+
ConcurrencyStatsManager::instance().cached_remote_reader_local_read);
st = block->read(Slice(result.data + (current_offset -
offset), read_size),
file_offset);
indirect_read_bytes += read_size;
}
}
if (!st || block_state != FileBlock::State::DOWNLOADED) {
- if (block_state == FileBlock::State::DOWNLOADED &&
st.is<ErrorCode::NOT_FOUND>()) {
- need_self_heal = true;
- g_read_cache_self_heal_on_not_found << 1;
- LOG_EVERY_N(WARNING, 100)
- << "Cache block file is missing, will self-heal by
clearing cache "
- "hash. "
- << "path=" << path().native() << ", hash=" <<
_cache_hash.to_string()
- << ", offset=" << left << ", err=" << st.msg();
+ if (is_dryrun) [[unlikely]] {
+ // dryrun mode uses a null buffer, skip actual remote IO
+ } else {
+ if (block_state == FileBlock::State::DOWNLOADED &&
+ st.is<ErrorCode::NOT_FOUND>()) {
+ need_self_heal = true;
+ g_read_cache_self_heal_on_not_found << 1;
+ LOG_EVERY_N(WARNING, 100)
+ << "Cache block file is missing, will
self-heal by clearing cache "
+ "hash. "
+ << "path=" << path().native()
+ << ", hash=" << _cache_hash.to_string() << ",
offset=" << left
+ << ", err=" << st.msg();
+ }
+ LOG(WARNING) << "Read data failed from file cache
downloaded by others. err="
+ << st.msg() << ", block state=" <<
block_state;
+ size_t bytes_read {0};
+ stats.hit_cache = false;
+ stats.from_peer_cache = false;
+ s3_read_counter << 1;
+ SCOPED_RAW_TIMER(&stats.remote_read_timer);
+ RETURN_IF_ERROR(_remote_file_reader->read_at(
+ current_offset,
+ Slice(result.data + (current_offset - offset),
read_size),
+ &bytes_read));
+ indirect_read_bytes += read_size;
+ DCHECK(bytes_read == read_size);
}
- LOG(WARNING) << "Read data failed from file cache downloaded
by others. err="
- << st.msg() << ", block state=" << block_state;
- size_t bytes_read {0};
- stats.hit_cache = false;
- stats.from_peer_cache = false;
- s3_read_counter << 1;
- SCOPED_RAW_TIMER(&stats.remote_read_timer);
- RETURN_IF_ERROR(_remote_file_reader->read_at(
- current_offset, Slice(result.data + (current_offset -
offset), read_size),
- &bytes_read));
- indirect_read_bytes += read_size;
- DCHECK(bytes_read == read_size);
}
}
*bytes_read += read_size;
@@ -622,4 +645,48 @@ void CachedRemoteFileReader::_update_stats(const
ReadStatistics& read_stats,
g_skip_cache_sum << read_stats.skip_cache;
}
+void CachedRemoteFileReader::prefetch_range(size_t offset, size_t size, const
IOContext* io_ctx) {
+ if (offset >= this->size() || size == 0) {
+ return;
+ }
+
+ size = std::min(size, this->size() - offset);
+
+ ThreadPool* pool = ExecEnv::GetInstance()->segment_prefetch_thread_pool();
+ if (pool == nullptr) {
+ return;
+ }
+
+ IOContext dryrun_ctx;
+ if (io_ctx != nullptr) {
+ dryrun_ctx = *io_ctx;
+ }
+ dryrun_ctx.is_dryrun = true;
+ dryrun_ctx.query_id = nullptr;
+ dryrun_ctx.file_cache_stats = nullptr;
+ dryrun_ctx.file_reader_stats = nullptr;
+
+ LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
+ << fmt::format("[verbose] Submitting prefetch task for offset={}
size={}, file={}",
+ offset, size, path().filename().native());
+ std::weak_ptr<CachedRemoteFileReader> weak_this = shared_from_this();
+ auto st = pool->submit_func([weak_this, offset, size, dryrun_ctx]() {
+ auto self = weak_this.lock();
+ if (self == nullptr) {
+ return;
+ }
+ size_t bytes_read;
+ Slice dummy_buffer((char*)nullptr, size);
+ (void)self->read_at_impl(offset, dummy_buffer, &bytes_read,
&dryrun_ctx);
+ LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
+ << fmt::format("[verbose] Prefetch task completed for
offset={} size={}, file={}",
+ offset, size, self->path().filename().native());
+ });
+
+ if (!st.ok()) {
+ VLOG_DEBUG << "Failed to submit prefetch task for offset=" << offset
<< " size=" << size
+ << " error=" << st.to_string();
+ }
+}
+
} // namespace doris::io
diff --git a/be/src/io/cache/cached_remote_file_reader.h
b/be/src/io/cache/cached_remote_file_reader.h
index 20c1a47ce88..a0037d42c64 100644
--- a/be/src/io/cache/cached_remote_file_reader.h
+++ b/be/src/io/cache/cached_remote_file_reader.h
@@ -37,7 +37,8 @@ namespace doris::io {
struct IOContext;
struct FileCacheStatistics;
-class CachedRemoteFileReader final : public FileReader {
+class CachedRemoteFileReader final : public FileReader,
+ public
std::enable_shared_from_this<CachedRemoteFileReader> {
public:
CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const
FileReaderOptions& opts);
@@ -57,6 +58,18 @@ public:
int64_t mtime() const override { return _remote_file_reader->mtime(); }
+ // Asynchronously prefetch a range of file cache blocks.
+ // This method triggers read file cache in dryrun mode to warm up the cache
+ // without actually reading the data into user buffers.
+ //
+ // Parameters:
+ // offset: Starting offset in the file
+ // size: Number of bytes to prefetch
+ // io_ctx: IO context (can be nullptr, will create a dryrun context
internally)
+ //
+ // Note: This is a best-effort operation. Errors are logged but not
returned.
+ void prefetch_range(size_t offset, size_t size, const IOContext* io_ctx =
nullptr);
+
protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index d601818de01..70e34ced6d2 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -41,6 +41,7 @@
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/bvar_helper.h"
+#include "util/concurrency_stats.h"
#include "util/debug_points.h"
#include "util/s3_util.h"
@@ -128,6 +129,8 @@ Status S3FileReader::read_at_impl(size_t offset, Slice
result, size_t* bytes_rea
return Status::InternalError("init s3 client error");
}
+
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().s3_file_reader_read);
+
int retry_count = 0;
const int base_wait_time = config::s3_read_base_wait_time_ms; // Base wait
time in milliseconds
const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum
wait time in milliseconds
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index f5afa76aa9f..858f63336ac 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -261,6 +261,7 @@ public:
ThreadPool* non_block_close_thread_pool();
ThreadPool* s3_file_system_thread_pool() { return
_s3_file_system_thread_pool.get(); }
ThreadPool* udf_close_workers_pool() { return
_udf_close_workers_thread_pool.get(); }
+ ThreadPool* segment_prefetch_thread_pool() { return
_segment_prefetch_thread_pool.get(); }
void init_file_cache_factory(std::vector<doris::CachePath>& cache_paths);
io::FileCacheFactory* file_cache_factory() { return _file_cache_factory; }
@@ -486,6 +487,8 @@ private:
std::unique_ptr<ThreadPool> _s3_file_system_thread_pool;
// for java-udf to close
std::unique_ptr<ThreadPool> _udf_close_workers_thread_pool;
+ // Threadpool used to prefetch segment file cache blocks
+ std::unique_ptr<ThreadPool> _segment_prefetch_thread_pool;
FragmentMgr* _fragment_mgr = nullptr;
WorkloadGroupMgr* _workload_group_manager = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index d914ba1d73e..0c081a9d9d8 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -266,6 +266,13 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
.set_max_threads(cast_set<int>(buffered_reader_max_threads))
.build(&_buffered_reader_prefetch_thread_pool));
+ static_cast<void>(ThreadPoolBuilder("SegmentPrefetchThreadPool")
+ .set_min_threads(cast_set<int>(
+
config::segment_prefetch_thread_pool_thread_num_min))
+ .set_max_threads(cast_set<int>(
+
config::segment_prefetch_thread_pool_thread_num_max))
+ .build(&_segment_prefetch_thread_pool));
+
static_cast<void>(ThreadPoolBuilder("SendTableStatsThreadPool")
.set_min_threads(8)
.set_max_threads(32)
@@ -839,6 +846,7 @@ void ExecEnv::destroy() {
_runtime_query_statistics_mgr->stop_report_thread();
}
SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool);
+ SAFE_SHUTDOWN(_segment_prefetch_thread_pool);
SAFE_SHUTDOWN(_s3_file_upload_thread_pool);
SAFE_SHUTDOWN(_lazy_release_obj_pool);
SAFE_SHUTDOWN(_non_block_close_thread_pool);
@@ -900,6 +908,7 @@ void ExecEnv::destroy() {
_s3_file_system_thread_pool.reset(nullptr);
_send_table_stats_thread_pool.reset(nullptr);
_buffered_reader_prefetch_thread_pool.reset(nullptr);
+ _segment_prefetch_thread_pool.reset(nullptr);
_s3_file_upload_thread_pool.reset(nullptr);
_send_batch_thread_pool.reset(nullptr);
_udf_close_workers_thread_pool.reset(nullptr);
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 5c8db2597e7..8a18071caec 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -51,6 +51,7 @@
#include "runtime/memory/mem_tracker_limiter.h"
#include "storage/tablet/tablet_schema_cache.h"
#include "storage/utils.h"
+#include "util/concurrency_stats.h"
#include "util/jni-util.h"
#if defined(LEAK_SANITIZER)
@@ -533,6 +534,9 @@ int main(int argc, char** argv) {
return 0;
}
+ // Start concurrency stats manager
+ doris::ConcurrencyStatsManager::instance().start();
+
// begin to start services
doris::ThriftRpcHelper::setup(exec_env);
// 1. thrift server with be_port
diff --git a/be/src/storage/index/ordinal_page_index.h
b/be/src/storage/index/ordinal_page_index.h
index 29cdd699d53..f17d80c20ed 100644
--- a/be/src/storage/index/ordinal_page_index.h
+++ b/be/src/storage/index/ordinal_page_index.h
@@ -98,7 +98,8 @@ private:
OlapReaderStatistics* index_load_stats);
private:
- friend OrdinalPageIndexIterator;
+ friend class OrdinalPageIndexIterator;
+ friend class SegmentPrefetcher;
io::FileReaderSPtr _file_reader;
DorisCallOnce<Status> _load_once;
diff --git a/be/src/storage/olap_common.h b/be/src/storage/olap_common.h
index f0cd3d75a28..ac8634e16ab 100644
--- a/be/src/storage/olap_common.h
+++ b/be/src/storage/olap_common.h
@@ -435,6 +435,7 @@ struct OlapReaderStatistics {
int64_t segment_iterator_init_timer_ns = 0;
int64_t segment_iterator_init_return_column_iterators_timer_ns = 0;
int64_t segment_iterator_init_index_iterators_timer_ns = 0;
+ int64_t segment_iterator_init_segment_prefetchers_timer_ns = 0;
int64_t segment_create_column_readers_timer_ns = 0;
int64_t segment_load_index_timer_ns = 0;
diff --git a/be/src/storage/segment/column_reader.cpp
b/be/src/storage/segment/column_reader.cpp
index 3908b8019f3..a5a98675e48 100644
--- a/be/src/storage/segment/column_reader.cpp
+++ b/be/src/storage/segment/column_reader.cpp
@@ -70,11 +70,13 @@
#include "storage/segment/page_pointer.h" // for PagePointer
#include "storage/segment/row_ranges.h"
#include "storage/segment/segment.h"
+#include "storage/segment/segment_prefetcher.h"
#include "storage/segment/variant/variant_column_reader.h"
#include "storage/tablet/tablet_schema.h"
#include "storage/types.h" // for TypeInfo
#include "util/bitmap.h"
#include "util/block_compression.h"
+#include "util/concurrency_stats.h"
#include "util/rle_encoding.h" // for RleDecoder
#include "util/slice.h"
@@ -413,6 +415,7 @@ Status ColumnReader::new_index_iterator(const
std::shared_ptr<IndexFileReader>&
Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const
PagePointer& pp,
PageHandle* handle, Slice* page_body,
PageFooterPB* footer,
BlockCompressionCodec* codec, bool
is_dict_page) const {
+
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().column_reader_read_page);
iter_opts.sanity_check();
PageReadOptions opts(iter_opts.io_ctx);
opts.verify_checksum = _opts.verify_checksum;
@@ -704,6 +707,16 @@ Status ColumnReader::seek_at_or_before(ordinal_t ordinal,
OrdinalPageIndexIterat
return Status::OK();
}
+Status ColumnReader::get_ordinal_index_reader(OrdinalIndexReader*& reader,
+ OlapReaderStatistics*
index_load_stats) {
+ CHECK(_ordinal_index) << fmt::format("ordinal index is null for column
reader of type {}",
+ std::to_string(int(_meta_type)));
+ RETURN_IF_ERROR(
+ _ordinal_index->load(_use_index_page_cache, _opts.kept_in_memory,
index_load_stats));
+ reader = _ordinal_index.get();
+ return Status::OK();
+}
+
Status ColumnReader::new_iterator(ColumnIteratorUPtr* iterator, const
TabletColumn* tablet_column) {
return new_iterator(iterator, tablet_column, nullptr);
}
@@ -909,6 +922,29 @@ Status MapFileColumnIterator::seek_to_ordinal(ordinal_t
ord) {
return Status::OK();
}
+Status MapFileColumnIterator::init_prefetcher(const SegmentPrefetchParams&
params) {
+ RETURN_IF_ERROR(_offsets_iterator->init_prefetcher(params));
+ if (_map_reader->is_nullable()) {
+ RETURN_IF_ERROR(_null_iterator->init_prefetcher(params));
+ }
+ RETURN_IF_ERROR(_key_iterator->init_prefetcher(params));
+ RETURN_IF_ERROR(_val_iterator->init_prefetcher(params));
+ return Status::OK();
+}
+
+void MapFileColumnIterator::collect_prefetchers(
+ std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>&
prefetchers,
+ PrefetcherInitMethod init_method) {
+ _offsets_iterator->collect_prefetchers(prefetchers, init_method);
+ if (_map_reader->is_nullable()) {
+ _null_iterator->collect_prefetchers(prefetchers, init_method);
+ }
+ // the actual data pages to read of key/value column depends on the read
result of offset column,
+ // so we can't init prefetch blocks according to rowids, just prefetch all
data blocks here.
+ _key_iterator->collect_prefetchers(prefetchers,
PrefetcherInitMethod::ALL_DATA_BLOCKS);
+ _val_iterator->collect_prefetchers(prefetchers,
PrefetcherInitMethod::ALL_DATA_BLOCKS);
+}
+
Status MapFileColumnIterator::next_batch(size_t* n, MutableColumnPtr& dst,
bool* has_null) {
if (_reading_flag == ReadingFlag::SKIP_READING) {
DLOG(INFO) << "Map column iterator column " << _column_name << " skip
reading.";
@@ -1297,6 +1333,27 @@ Status
StructFileColumnIterator::seek_to_ordinal(ordinal_t ord) {
return Status::OK();
}
+Status StructFileColumnIterator::init_prefetcher(const SegmentPrefetchParams&
params) {
+ for (auto& column_iterator : _sub_column_iterators) {
+ RETURN_IF_ERROR(column_iterator->init_prefetcher(params));
+ }
+ if (_struct_reader->is_nullable()) {
+ RETURN_IF_ERROR(_null_iterator->init_prefetcher(params));
+ }
+ return Status::OK();
+}
+
+void StructFileColumnIterator::collect_prefetchers(
+ std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>&
prefetchers,
+ PrefetcherInitMethod init_method) {
+ for (auto& column_iterator : _sub_column_iterators) {
+ column_iterator->collect_prefetchers(prefetchers, init_method);
+ }
+ if (_struct_reader->is_nullable()) {
+ _null_iterator->collect_prefetchers(prefetchers, init_method);
+ }
+}
+
Status StructFileColumnIterator::read_by_rowids(const rowid_t* rowids, const
size_t count,
MutableColumnPtr& dst) {
if (_reading_flag == ReadingFlag::SKIP_READING) {
@@ -1441,6 +1498,16 @@ Status
OffsetFileColumnIterator::_peek_one_offset(ordinal_t* offset) {
return Status::OK();
}
+Status OffsetFileColumnIterator::init_prefetcher(const SegmentPrefetchParams&
params) {
+ return _offset_iterator->init_prefetcher(params);
+}
+
+void OffsetFileColumnIterator::collect_prefetchers(
+ std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>&
prefetchers,
+ PrefetcherInitMethod init_method) {
+ _offset_iterator->collect_prefetchers(prefetchers, init_method);
+}
+
/**
* first_storage_offset read from page should smaller than
next_storage_offset which here call _peek_one_offset from page,
and first_column_offset is keep in memory data which is different
dimension with (first_storage_offset and next_storage_offset)
@@ -1570,6 +1637,27 @@ Status ArrayFileColumnIterator::next_batch(size_t* n,
MutableColumnPtr& dst, boo
return Status::OK();
}
+Status ArrayFileColumnIterator::init_prefetcher(const SegmentPrefetchParams&
params) {
+ RETURN_IF_ERROR(_offset_iterator->init_prefetcher(params));
+ RETURN_IF_ERROR(_item_iterator->init_prefetcher(params));
+ if (_array_reader->is_nullable()) {
+ RETURN_IF_ERROR(_null_iterator->init_prefetcher(params));
+ }
+ return Status::OK();
+}
+
+void ArrayFileColumnIterator::collect_prefetchers(
+ std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>&
prefetchers,
+ PrefetcherInitMethod init_method) {
+ _offset_iterator->collect_prefetchers(prefetchers, init_method);
+ // the actual data pages to read of item column depends on the read result
of offset column,
+ // so we can't init prefetch blocks according to rowids, just prefetch all
data blocks here.
+ _item_iterator->collect_prefetchers(prefetchers,
PrefetcherInitMethod::ALL_DATA_BLOCKS);
+ if (_array_reader->is_nullable()) {
+ _null_iterator->collect_prefetchers(prefetchers, init_method);
+ }
+}
+
Status ArrayFileColumnIterator::read_by_rowids(const rowid_t* rowids, const
size_t count,
MutableColumnPtr& dst) {
if (_reading_flag == ReadingFlag::SKIP_READING) {
@@ -1679,12 +1767,28 @@ Status FileColumnIterator::init(const
ColumnIteratorOptions& opts) {
FileColumnIterator::~FileColumnIterator() = default;
+void FileColumnIterator::_trigger_prefetch_if_eligible(ordinal_t ord) {
+ std::vector<BlockRange> ranges;
+ if (_prefetcher->need_prefetch(cast_set<uint32_t>(ord), &ranges)) {
+ for (const auto& range : ranges) {
+ _cached_remote_file_reader->prefetch_range(range.offset,
range.size, &_opts.io_ctx);
+ }
+ }
+}
+
Status FileColumnIterator::seek_to_ordinal(ordinal_t ord) {
if (_reading_flag == ReadingFlag::SKIP_READING) {
DLOG(INFO) << "File column iterator column " << _column_name << " skip
reading.";
return Status::OK();
}
+ LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
+ "[verbose] FileColumnIterator::seek_to_ordinal seek to ordinal {},
enable_prefetch={}",
+ ord, _enable_prefetch);
+ if (_enable_prefetch) {
+ _trigger_prefetch_if_eligible(ord);
+ }
+
// if current page contains this row, we don't need to seek
if (!_page || !_page.contains(ord) || !_page_iter.valid()) {
RETURN_IF_ERROR(_reader->seek_at_or_before(ord, &_page_iter, _opts));
@@ -1987,6 +2091,26 @@ Status FileColumnIterator::get_row_ranges_by_dict(const
AndBlockColumnPredicate*
return Status::OK();
}
+Status FileColumnIterator::init_prefetcher(const SegmentPrefetchParams&
params) {
+ if (_cached_remote_file_reader =
+
std::dynamic_pointer_cast<io::CachedRemoteFileReader>(_reader->_file_reader);
+ !_cached_remote_file_reader) {
+ return Status::OK();
+ }
+ _enable_prefetch = true;
+ _prefetcher = std::make_unique<SegmentPrefetcher>(params.config);
+ RETURN_IF_ERROR(_prefetcher->init(_reader, params.read_options));
+ return Status::OK();
+}
+
+void FileColumnIterator::collect_prefetchers(
+ std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>&
prefetchers,
+ PrefetcherInitMethod init_method) {
+ if (_prefetcher) {
+ prefetchers[init_method].emplace_back(_prefetcher.get());
+ }
+}
+
Status DefaultValueColumnIterator::init(const ColumnIteratorOptions& opts) {
_opts = opts;
// be consistent with segment v1
diff --git a/be/src/storage/segment/column_reader.h
b/be/src/storage/segment/column_reader.h
index 056049bb6c8..f51f98504df 100644
--- a/be/src/storage/segment/column_reader.h
+++ b/be/src/storage/segment/column_reader.h
@@ -48,6 +48,7 @@
#include "storage/segment/page_handle.h" // for PageHandle
#include "storage/segment/page_pointer.h"
#include "storage/segment/parsed_page.h" // for ParsedPage
+#include "storage/segment/segment_prefetcher.h"
#include "storage/segment/stream_reader.h"
#include "storage/tablet/tablet_schema.h"
#include "storage/types.h"
@@ -171,6 +172,8 @@ public:
Status seek_at_or_before(ordinal_t ordinal, OrdinalPageIndexIterator* iter,
const ColumnIteratorOptions& iter_opts);
+ Status get_ordinal_index_reader(OrdinalIndexReader*& reader,
+ OlapReaderStatistics* index_load_stats);
// read a page from file into a page handle
Status read_page(const ColumnIteratorOptions& iter_opts, const
PagePointer& pp,
@@ -237,6 +240,8 @@ public:
private:
friend class VariantColumnReader;
+ friend class FileColumnIterator;
+ friend class SegmentPrefetcher;
ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
uint64_t num_rows,
io::FileReaderSPtr file_reader);
@@ -401,6 +406,12 @@ public:
virtual void remove_pruned_sub_iterators() {};
+ virtual Status init_prefetcher(const SegmentPrefetchParams& params) {
return Status::OK(); }
+
+ virtual void collect_prefetchers(
+ std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>&
prefetchers,
+ PrefetcherInitMethod init_method) {}
+
protected:
Result<TColumnAccessPaths> _get_sub_access_paths(const TColumnAccessPaths&
access_paths);
ColumnIteratorOptions _opts;
@@ -451,11 +462,17 @@ public:
bool is_all_dict_encoding() const override { return _is_all_dict_encoding;
}
+ Status init_prefetcher(const SegmentPrefetchParams& params) override;
+ void collect_prefetchers(
+ std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>&
prefetchers,
+ PrefetcherInitMethod init_method) override;
+
private:
Status _seek_to_pos_in_page(ParsedPage* page, ordinal_t offset_in_page)
const;
Status _load_next_page(bool* eos);
Status _read_data_page(const OrdinalPageIndexIterator& iter);
Status _read_dict_data();
+ void _trigger_prefetch_if_eligible(ordinal_t ord);
std::shared_ptr<ColumnReader> _reader = nullptr;
@@ -483,6 +500,10 @@ private:
bool _is_all_dict_encoding = false;
std::unique_ptr<StringRef[]> _dict_word_info;
+
+ bool _enable_prefetch {false};
+ std::unique_ptr<SegmentPrefetcher> _prefetcher;
+ std::shared_ptr<io::CachedRemoteFileReader> _cached_remote_file_reader
{nullptr};
};
class EmptyFileColumnIterator final : public ColumnIterator {
@@ -520,6 +541,11 @@ public:
return _offset_iterator->read_by_rowids(rowids, count, dst);
}
+ Status init_prefetcher(const SegmentPrefetchParams& params) override;
+ void collect_prefetchers(
+ std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>&
prefetchers,
+ PrefetcherInitMethod init_method) override;
+
private:
std::unique_ptr<FileColumnIterator> _offset_iterator;
// reuse a tiny column for peek to avoid frequent allocations
@@ -549,6 +575,10 @@ public:
ordinal_t get_current_ordinal() const override {
return _offsets_iterator->get_current_ordinal();
}
+ Status init_prefetcher(const SegmentPrefetchParams& params) override;
+ void collect_prefetchers(
+ std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>&
prefetchers,
+ PrefetcherInitMethod init_method) override;
Status set_access_paths(const TColumnAccessPaths& all_access_paths,
const TColumnAccessPaths& predicate_access_paths)
override;
@@ -593,6 +623,11 @@ public:
void remove_pruned_sub_iterators() override;
+ Status init_prefetcher(const SegmentPrefetchParams& params) override;
+ void collect_prefetchers(
+ std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>&
prefetchers,
+ PrefetcherInitMethod init_method) override;
+
private:
std::shared_ptr<ColumnReader> _struct_reader = nullptr;
ColumnIteratorUPtr _null_iterator;
@@ -627,6 +662,11 @@ public:
void remove_pruned_sub_iterators() override;
+ Status init_prefetcher(const SegmentPrefetchParams& params) override;
+ void collect_prefetchers(
+ std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>&
prefetchers,
+ PrefetcherInitMethod init_method) override;
+
private:
std::shared_ptr<ColumnReader> _array_reader = nullptr;
std::unique_ptr<OffsetFileColumnIterator> _offset_iterator;
diff --git a/be/src/storage/segment/page_io.cpp
b/be/src/storage/segment/page_io.cpp
index 49ac6ebccc2..f8dec23998a 100644
--- a/be/src/storage/segment/page_io.cpp
+++ b/be/src/storage/segment/page_io.cpp
@@ -43,6 +43,7 @@
#include "storage/segment/page_handle.h"
#include "util/block_compression.h"
#include "util/coding.h"
+#include "util/concurrency_stats.h"
#include "util/faststring.h"
namespace doris {
@@ -206,6 +207,7 @@ Status PageIO::read_and_decompress_page_(const
PageReadOptions& opts, PageHandle
"Bad page: page is compressed but codec is NO_COMPRESSION,
file={}",
opts.file_reader->path().native());
}
+
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_decompress);
SCOPED_RAW_TIMER(&opts.stats->decompress_ns);
std::unique_ptr<DataPage> decompressed_page =
std::make_unique<DataPage>(
footer->uncompressed_size() + footer_size + 4,
opts.use_page_cache, opts.type);
@@ -240,6 +242,7 @@ Status PageIO::read_and_decompress_page_(const
PageReadOptions& opts, PageHandle
if (encoding_info) {
auto* pre_decoder = encoding_info->get_data_page_pre_decoder();
if (pre_decoder) {
+
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_pre_decode);
RETURN_IF_ERROR(pre_decoder->decode(
&page, &page_slice,
footer->data_page_footer().nullmap_size() +
footer_size + 4,
@@ -255,6 +258,7 @@ Status PageIO::read_and_decompress_page_(const
PageReadOptions& opts, PageHandle
// just before add it to pagecache, it will be consistency with reading
data from page cache.
opts.stats->uncompressed_bytes_read += body->size;
if (opts.use_page_cache && cache) {
+
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_insert_page_cache);
// insert this page into cache and return the cache handle
cache->insert(cache_key, page.get(), &cache_handle, opts.type,
opts.kept_in_memory);
*handle = PageHandle(std::move(cache_handle));
diff --git a/be/src/storage/segment/segment_iterator.cpp
b/be/src/storage/segment/segment_iterator.cpp
index 3b2adc5f68c..512e1a466fe 100644
--- a/be/src/storage/segment/segment_iterator.cpp
+++ b/be/src/storage/segment/segment_iterator.cpp
@@ -33,6 +33,7 @@
#include <utility>
#include <vector>
+#include "cloud/config.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/consts.h"
@@ -97,11 +98,13 @@
#include "storage/segment/condition_cache.h"
#include "storage/segment/row_ranges.h"
#include "storage/segment/segment.h"
+#include "storage/segment/segment_prefetcher.h"
#include "storage/segment/variant/variant_column_reader.h"
#include "storage/segment/virtual_column_iterator.h"
#include "storage/tablet/tablet_schema.h"
#include "storage/types.h"
#include "storage/utils.h"
+#include "util/concurrency_stats.h"
#include "util/defer_op.h"
#include "util/simd/bits.h"
@@ -545,9 +548,93 @@ Status SegmentIterator::_lazy_init(Block* block) {
}
_lazy_inited = true;
+
+ _init_segment_prefetchers();
+
return Status::OK();
}
// Creates a SegmentPrefetcher for every materialized column iterator of this
// segment and builds each prefetcher's block sequence. Only active in cloud
// mode, for query/compaction reader types, and when the matching config
// switch is enabled. Best effort: failures are logged and reading proceeds
// without prefetch.
void SegmentIterator::_init_segment_prefetchers() {
    SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_segment_prefetchers_timer_ns);
    if (!config::is_cloud_mode()) {
        return;
    }
    // Prefetch applies only to these reader types; everything else returns early.
    static std::vector<ReaderType> supported_reader_types {
            ReaderType::READER_QUERY, ReaderType::READER_BASE_COMPACTION,
            ReaderType::READER_CUMULATIVE_COMPACTION, ReaderType::READER_FULL_COMPACTION};
    if (std::ranges::none_of(supported_reader_types,
                             [&](ReaderType t) { return _opts.io_ctx.reader_type == t; })) {
        return;
    }
    // Initialize segment prefetcher for predicate and non-predicate columns
    bool is_query = (_opts.io_ctx.reader_type == ReaderType::READER_QUERY);
    // Query and compaction paths are toggled by separate config switches.
    bool enable_prefetch = is_query ? config::enable_query_segment_file_cache_prefetch
                                    : config::enable_compaction_segment_file_cache_prefetch;
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
            "[verbose] SegmentIterator _init_segment_prefetchers, is_query={}, enable_prefetch={}, "
            "_row_bitmap.isEmpty()={}, row_bitmap.cardinality()={}, tablet={}, rowset={}, "
            "segment={}, predicate_column_ids={}, non_predicate_column_ids={}",
            is_query, enable_prefetch, _row_bitmap.isEmpty(), _row_bitmap.cardinality(),
            _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(),
            fmt::join(_predicate_column_ids, ","), fmt::join(_non_predicate_column_ids, ","));
    if (enable_prefetch && !_row_bitmap.isEmpty()) {
        // +1 because the window also counts the block currently being read.
        int window_size =
                1 + (is_query ? config::query_segment_file_cache_prefetch_block_size
                              : config::compaction_segment_file_cache_prefetch_block_size);
        LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
                "[verbose] SegmentIterator prefetch config: window_size={}", window_size);
        if (window_size > 0 &&
            !_column_iterators.empty()) { // ensure init_iterators has been called
            SegmentPrefetcherConfig prefetch_config(window_size,
                                                    config::file_cache_each_block_size);
            // Create one prefetcher per non-null column iterator.
            for (auto cid : _schema->column_ids()) {
                auto& column_iter = _column_iterators[cid];
                if (column_iter == nullptr) {
                    continue;
                }
                const auto* tablet_column = _schema->column(cid);
                SegmentPrefetchParams params {
                        .config = prefetch_config,
                        .read_options = _opts,
                };
                LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
                        "[verbose] SegmentIterator init_segment_prefetchers, "
                        "tablet={}, rowset={}, segment={}, column_id={}, col_name={}, type={}",
                        _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(), cid,
                        tablet_column->name(), tablet_column->type());
                Status st = column_iter->init_prefetcher(params);
                if (!st.ok()) {
                    // Best effort: a column without a prefetcher still reads correctly.
                    LOG_IF(WARNING, config::enable_segment_prefetch_verbose_log) << fmt::format(
                            "[verbose] failed to init prefetcher for column_id={}, "
                            "tablet={}, rowset={}, segment={}, error={}",
                            cid, _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(),
                            st.to_string());
                }
            }

            // for compaction, it's guaranteed that all rows are read, so we can prefetch all data blocks
            PrefetcherInitMethod init_method = (is_query && _row_bitmap.cardinality() < num_rows())
                                                       ? PrefetcherInitMethod::FROM_ROWIDS
                                                       : PrefetcherInitMethod::ALL_DATA_BLOCKS;
            std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>> prefetchers;
            for (const auto& column_iter : _column_iterators) {
                if (column_iter != nullptr) {
                    column_iter->collect_prefetchers(prefetchers, init_method);
                }
            }
            // Build each prefetcher's block sequence. FROM_ROWIDS prefetchers
            // share a single batched pass over the row bitmap.
            for (auto& [method, prefetcher_vec] : prefetchers) {
                if (method == PrefetcherInitMethod::ALL_DATA_BLOCKS) {
                    for (auto* prefetcher : prefetcher_vec) {
                        prefetcher->build_all_data_blocks();
                    }
                } else if (method == PrefetcherInitMethod::FROM_ROWIDS && !prefetcher_vec.empty()) {
                    SegmentPrefetcher::build_blocks_by_rowids(_row_bitmap, prefetcher_vec);
                }
            }
        }
    }
}
+
Status SegmentIterator::_get_row_ranges_by_keys() {
SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_keys_ns);
DorisMetrics::instance()->segment_row_total->increment(num_rows());
@@ -2110,6 +2197,11 @@ Status SegmentIterator::_read_columns_by_index(uint32_t
nrows_read_limit, uint16
"[{}]",
nrows_read, is_continuous, fmt::join(_predicate_column_ids, ","));
+ LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
+ "[verbose] SegmentIterator::_read_columns_by_index read {} rowids,
continuous: {}, "
+ "rowids: [{}...{}]",
+ nrows_read, is_continuous, nrows_read > 0 ? _block_rowids[0] : 0,
+ nrows_read > 0 ? _block_rowids[nrows_read - 1] : 0);
for (auto cid : _predicate_column_ids) {
auto& column = _current_return_columns[cid];
VLOG_DEBUG << fmt::format("Reading column {}, col_name {}", cid,
@@ -2473,6 +2565,8 @@ Status
SegmentIterator::copy_column_data_by_selector(IColumn* input_col_ptr,
}
Status SegmentIterator::_next_batch_internal(Block* block) {
+
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().segment_iterator_next_batch);
+
bool is_mem_reuse = block->mem_reuse();
DCHECK(is_mem_reuse);
diff --git a/be/src/storage/segment/segment_iterator.h
b/be/src/storage/segment/segment_iterator.h
index 4edef9c77d6..142d252af13 100644
--- a/be/src/storage/segment/segment_iterator.h
+++ b/be/src/storage/segment/segment_iterator.h
@@ -342,6 +342,8 @@ private:
void _init_row_bitmap_by_condition_cache();
+ void _init_segment_prefetchers();
+
class BitmapRangeIterator;
class BackwardBitmapRangeIterator;
diff --git a/be/src/storage/segment/segment_prefetcher.cpp
b/be/src/storage/segment/segment_prefetcher.cpp
new file mode 100644
index 00000000000..2405066d2e5
--- /dev/null
+++ b/be/src/storage/segment/segment_prefetcher.cpp
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "storage/segment/segment_prefetcher.h"
+
+#include <algorithm>
+#include <ranges>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "storage/index/ordinal_page_index.h"
+#include "storage/iterators.h"
+#include "storage/segment/column_reader.h"
+
+namespace doris::segment_v2 {
+
// Consumes the next batch of monotonically non-decreasing rowids and extends
// the block sequence: each rowid is mapped to its data page via the ordinal
// index, the page to a file-cache block, and a BlockInfo is appended whenever
// the block changes. Scan state (page_idx, last_block_id,
// current_block_first_rowid) persists across calls within one build pass
// (begin_build_blocks_by_rowids .. finish_build_blocks_by_rowids).
void SegmentPrefetcher::add_rowids(const rowid_t* rowids, uint32_t num) {
    if (ordinal_index == nullptr) {
        return;
    }
    const auto& ordinals = ordinal_index->_ordinals; // ordinals[i] = first ordinal of page i
    const auto& pages = ordinal_index->_pages;       // pages[i] = page pointer of page i
    const int num_pages = ordinal_index->_num_pages;
    for (uint32_t i = 0; i < num; ++i) {
        rowid_t rowid = rowids[i];

        if (_is_forward) {
            // Advance to the page containing rowid (rowids are non-decreasing,
            // so page_idx only moves forward).
            while (page_idx < num_pages - 1 && ordinals[page_idx + 1] <= rowid) {
                page_idx++;
            }

            const auto& page = pages[page_idx];
            size_t page_start_block = _offset_to_block_id(page.offset);
            size_t page_end_block = _offset_to_block_id(page.offset + page.size - 1);

            // If page spans two blocks, assign it to the next block (page_end_block)
            size_t block_id =
                    (page_start_block != page_end_block) ? page_end_block : page_start_block;

            if (block_id != last_block_id) {
                // Flush the finished block before starting the new one.
                if (last_block_id != static_cast<size_t>(-1)) {
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
                }
                last_block_id = block_id;
                current_block_first_rowid = rowid;
            }
        } else {
            // Backward reading: we need the last rowid in each block as the "first" rowid
            // (because when reading backwards, we encounter the largest rowid first)
            //
            // Strategy: iterate forward through bitmap, but for each block,
            // keep updating current_block_first_rowid to the latest (largest) rowid in that block
            while (page_idx < num_pages - 1 && ordinals[page_idx + 1] <= rowid) {
                page_idx++;
            }
            size_t block_id = _offset_to_block_id(pages[page_idx].offset);

            if (block_id != last_block_id) {
                if (last_block_id != static_cast<size_t>(-1)) {
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
                }
                last_block_id = block_id;
            }
            // Updated for every rowid so the block keeps its largest rowid.
            current_block_first_rowid = rowid;
        }
    }
}
+
// Builds the block sequence covering every data page of the column, used when
// all rows will be read (compaction, or a query touching the whole segment).
// Unlike the rowid-driven build, this walks the ordinal index directly.
void SegmentPrefetcher::build_all_data_blocks() {
    if (ordinal_index == nullptr) {
        return;
    }
    reset_blocks();
    const auto& ordinals = ordinal_index->_ordinals; // ordinals[i] = first ordinal of page i
    const auto& pages = ordinal_index->_pages;       // pages[i] = page pointer of page i
    const int num_pages = ordinal_index->_num_pages;

    // Reset the incremental scan state before walking all pages.
    last_block_id = static_cast<size_t>(-1);
    current_block_first_rowid = 0;

    for (page_idx = 0; page_idx < num_pages; ++page_idx) {
        const auto& page = pages[page_idx];

        if (_is_forward) {
            size_t page_start_block = _offset_to_block_id(page.offset);
            size_t page_end_block = _offset_to_block_id(page.offset + page.size - 1);

            // If page spans two blocks, assign it to the next block (page_end_block)
            size_t block_id =
                    (page_start_block != page_end_block) ? page_end_block : page_start_block;

            if (block_id != last_block_id) {
                if (last_block_id != static_cast<size_t>(-1)) {
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
                }
                last_block_id = block_id;
                current_block_first_rowid = static_cast<rowid_t>(ordinals[page_idx]);
            }
        } else {
            // Backward: use the last ordinal in each block as first_rowid
            size_t block_id = _offset_to_block_id(page.offset);
            if (block_id != last_block_id) {
                if (last_block_id != static_cast<size_t>(-1)) {
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
                }
                last_block_id = block_id;
            }
            current_block_first_rowid = static_cast<rowid_t>(ordinals[page_idx]);
        }
    }

    // Add the last block
    if (last_block_id != static_cast<size_t>(-1)) {
        _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
    }

    // Reverse for backward reading
    if (!_is_forward && !_block_sequence.empty()) {
        std::ranges::reverse(_block_sequence);
    }
}
+
+void SegmentPrefetcher::build_blocks_by_rowids(const roaring::Roaring&
row_bitmap,
+ const
std::vector<SegmentPrefetcher*>& prefetchers) {
+ for (auto* prefetcher : prefetchers) {
+ prefetcher->begin_build_blocks_by_rowids();
+ }
+
+ int batch_size = config::segment_file_cache_consume_rowids_batch_size;
+ std::vector<rowid_t> rowids(batch_size);
+ roaring::api::roaring_uint32_iterator_t iter;
+ roaring::api::roaring_init_iterator(&row_bitmap.roaring, &iter);
+ uint32_t num = roaring::api::roaring_read_uint32_iterator(&iter,
rowids.data(), batch_size);
+
+ for (; num > 0;
+ num = roaring::api::roaring_read_uint32_iterator(&iter,
rowids.data(), batch_size)) {
+ for (auto* prefetcher : prefetchers) {
+ prefetcher->add_rowids(rowids.data(), num);
+ }
+ }
+
+ for (auto* prefetcher : prefetchers) {
+ prefetcher->finish_build_blocks_by_rowids();
+ }
+}
+
+void SegmentPrefetcher::begin_build_blocks_by_rowids() {
+ reset_blocks();
+ page_idx = 0;
+}
+
// Flushes the final pending block accumulated by add_rowids and, for backward
// reading, reverses the sequence so blocks appear in actual read order.
void SegmentPrefetcher::finish_build_blocks_by_rowids() {
    if (ordinal_index == nullptr) {
        return;
    }
    // Emit the last open block (sentinel -1 means no block was ever started).
    if (last_block_id != static_cast<size_t>(-1)) {
        _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
    }

    if (!_is_forward && !_block_sequence.empty()) {
        std::ranges::reverse(_block_sequence);
    }

    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
            "[verbose] SegmentPrefetcher initialized with block count={}, is_forward={}, "
            "num_pages={}, path={}, blocks: (block_id, first_rowid)=[{}]",
            _block_sequence.size(), _is_forward, ordinal_index->_num_pages, _path,
            fmt::join(_block_sequence | std::views::transform([](const auto& b) {
                          return fmt::format("({}, {})", b.block_id, b.first_rowid);
                      }),
                      ","));
}
+
+void SegmentPrefetcher::reset_blocks() {
+ _block_sequence.clear();
+ _current_block_index = 0;
+ _prefetched_index = -1;
+}
+
+Status SegmentPrefetcher::init(std::shared_ptr<ColumnReader> column_reader,
+ const StorageReadOptions& read_options) {
+ DCHECK(column_reader != nullptr);
+
+ reset_blocks();
+ _is_forward = !read_options.read_orderby_key_reverse;
+ _path = column_reader->_file_reader->path().filename().native();
+
+ RETURN_IF_ERROR(column_reader->get_ordinal_index_reader(ordinal_index,
read_options.stats));
+ return Status::OK();
+}
+
// Advances the read cursor (_current_block_index) to the block containing
// `current_rowid`, then tops up the prefetch window: appends byte ranges for
// blocks beyond _prefetched_index until the window holds
// _config.prefetch_window_size blocks. Returns true when at least one range
// was produced. Relies on rowids arriving monotonically (forward or backward).
bool SegmentPrefetcher::need_prefetch(rowid_t current_rowid, std::vector<BlockRange>* out_ranges) {
    DCHECK(out_ranges != nullptr);
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
            << fmt::format("[verbose] SegmentPrefetcher need_prefetch enter current_rowid={}, {}",
                           current_rowid, debug_string());
    // Nothing to do if no sequence was built or every block was already issued.
    if (_block_sequence.empty() ||
        _prefetched_index >= static_cast<int>(_block_sequence.size()) - 1) {
        return false;
    }

    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
            "[verbose] SegmentPrefetcher need_prefetch called with current_rowid={}, {}, "
            "block=(id={}, first_rowid={})",
            current_rowid, debug_string(), _block_sequence[_current_block_index].block_id,
            _block_sequence[_current_block_index].first_rowid);
    // Move the cursor to the block whose first_rowid bracket contains
    // current_rowid; comparison direction depends on read order.
    if (_is_forward) {
        while (_current_block_index + 1 < _block_sequence.size() &&
               _block_sequence[_current_block_index + 1].first_rowid <= current_rowid) {
            _current_block_index++;
        }
    } else {
        while (_current_block_index + 1 < _block_sequence.size() &&
               _block_sequence[_current_block_index + 1].first_rowid >= current_rowid) {
            _current_block_index++;
        }
    }

    out_ranges->clear();
    // for non-predicate column, some rowids in row_bitmap may be filtered out after vec evaluation of predicate columns,
    // so we should not prefetch for these rows
    _prefetched_index = std::max(_prefetched_index, _current_block_index - 1);
    // Issue ranges until the window ahead of the cursor is full.
    while (_prefetched_index + 1 < _block_sequence.size() &&
           window_size() < _config.prefetch_window_size) {
        out_ranges->push_back(_block_id_to_range(_block_sequence[++_prefetched_index].block_id));
    }

    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
            "[verbose] SegmentPrefetcher need_prefetch after calc with current_rowid={}, {}, "
            "block=(id={}, first_rowid={})",
            current_rowid, debug_string(), _block_sequence[_current_block_index].block_id,
            _block_sequence[_current_block_index].first_rowid);

    bool triggered = !out_ranges->empty();
    if (triggered) {
        LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
                "[verbose] SegmentPrefetcher prefetch triggered at rowid={}, {}, prefetch {} "
                "blocks: (offset, size)=[{}]",
                current_rowid, debug_string(), out_ranges->size(),
                fmt::join(*out_ranges | std::views::transform([](const auto& b) {
                              return fmt::format("({}, {})", b.offset, b.size);
                          }),
                          ","));
    }
    return triggered;
}
+
+} // namespace doris::segment_v2
diff --git a/be/src/storage/segment/segment_prefetcher.h
b/be/src/storage/segment/segment_prefetcher.h
new file mode 100644
index 00000000000..659f80c3b0d
--- /dev/null
+++ b/be/src/storage/segment/segment_prefetcher.h
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <roaring/roaring.hh>
+#include <vector>
+
+#include "common/status.h"
+#include "storage/segment/common.h"
+
+namespace doris {
+namespace io {
+class FileReader;
+} // namespace io
+class StorageReadOptions;
+
+namespace segment_v2 {
+class OrdinalIndexReader;
+class ColumnReader;
+
+enum class PrefetcherInitMethod : int { FROM_ROWIDS = 0, ALL_DATA_BLOCKS = 1 };
+
// Tuning knobs for SegmentPrefetcher.
struct SegmentPrefetcherConfig {
    // How many file cache blocks to keep prefetched ahead of the reader.
    size_t prefetch_window_size = 4;

    // Size of one file cache block in bytes (default 1MB).
    size_t block_size = 1024 * 1024;

    SegmentPrefetcherConfig(size_t window_size, size_t blk_size)
            : prefetch_window_size(window_size), block_size(blk_size) {}
};
+
// A half-open byte range [offset, offset + size) inside the segment file.
struct BlockRange {
    uint64_t offset;
    uint64_t size;

    BlockRange(uint64_t off, uint64_t sz) : offset(off), size(sz) {}

    // Two ranges are equal iff both endpoints match.
    bool operator==(const BlockRange& other) const {
        if (offset != other.offset) {
            return false;
        }
        return size == other.size;
    }
};
+
// Represents a block with its first rowid for reading
struct BlockInfo {
    size_t block_id;     // file cache block index (byte offset / block_size)
    rowid_t first_rowid; // first rowid encountered in this block, in read order
    BlockInfo(size_t bid, rowid_t rid) : block_id(bid), first_rowid(rid) {}
};
+
// Everything a column iterator needs to construct and initialize its
// SegmentPrefetcher.
struct SegmentPrefetchParams {
    SegmentPrefetcherConfig config;
    // NOTE(review): reference member — the referenced StorageReadOptions must
    // outlive every prefetcher initialized from these params; verify at the
    // call sites that pass iterator-owned options.
    const StorageReadOptions& read_options;
};
+
+// SegmentPrefetcher maintains block sequence and triggers prefetch to keep
+// N blocks ahead of current reading position.
+//
+// Key design:
+// - Monotonic reading: rowids are read in order (forward or backward)
+// - Trigger condition: when current_rowid reaches a block boundary, prefetch
next N blocks
+// - No deduplication needed: reading is monotonic, blocks are naturally
processed in order
+class SegmentPrefetcher {
+public:
+ explicit SegmentPrefetcher(const SegmentPrefetcherConfig& config) :
_config(config) {}
+
+ ~SegmentPrefetcher() = default;
+
+ Status init(std::shared_ptr<ColumnReader> column_reader,
+ const StorageReadOptions& read_options);
+
+ bool need_prefetch(rowid_t current_rowid, std::vector<BlockRange>*
out_ranges);
+
+ static void build_blocks_by_rowids(const roaring::Roaring& row_bitmap,
+ const std::vector<SegmentPrefetcher*>&
prefetchers);
+ void begin_build_blocks_by_rowids();
+ void add_rowids(const rowid_t* rowids, uint32_t num);
+ void finish_build_blocks_by_rowids();
+
+ void build_all_data_blocks();
+
+private:
+ // Parameters:
+ // row_bitmap: The complete bitmap of rowids to scan
+ // ordinal_index: Ordinal index reader (must be loaded)
+ //
+ // For forward reading: first_rowid is the first rowid we need to read in
each block
+ // For backward reading: first_rowid is the last rowid we need to read in
each block
+ // (since we read backwards, this is the first one we'll encounter)
+ void _build_block_sequence_from_bitmap(const roaring::Roaring& row_bitmap,
+ OrdinalIndexReader* ordinal_index);
+ size_t _offset_to_block_id(uint64_t offset) const { return offset /
_config.block_size; }
+
+ BlockRange _block_id_to_range(size_t block_id) const {
+ return {block_id * _config.block_size, _config.block_size};
+ }
+
+ int window_size() const { return _prefetched_index - _current_block_index
+ 1; }
+
+ std::string debug_string() const {
+ return fmt::format(
+ "[internal state] _is_forward={}, _prefetched_index={},
_current_block_index={}, "
+ "window_size={}, block.size()={}, path={}",
+ _is_forward, _prefetched_index, _current_block_index,
window_size(),
+ _block_sequence.size(), _path);
+ }
+
+ void reset_blocks();
+
+private:
+ SegmentPrefetcherConfig _config;
+ std::string _path;
+
+ // Sequence of blocks with their first rowid (in reading order)
+ std::vector<BlockInfo> _block_sequence;
+
+ bool _is_forward = true;
+
+ int _prefetched_index = -1;
+ int _current_block_index = 0;
+
+ int page_idx = 0;
+ // For each page, track the first rowid we need to read
+ // For forward: the smallest rowid in this page
+ // For backward: the largest rowid in this page (first one we'll encounter
when reading backwards)
+ size_t last_block_id = static_cast<size_t>(-1);
+ rowid_t current_block_first_rowid = 0;
+
+ OrdinalIndexReader* ordinal_index = nullptr;
+};
+
+} // namespace segment_v2
+} // namespace doris
diff --git a/be/src/storage/segment/variant/hierarchical_data_iterator.cpp
b/be/src/storage/segment/variant/hierarchical_data_iterator.cpp
index cfd1c7a26d1..8c219b2f982 100644
--- a/be/src/storage/segment/variant/hierarchical_data_iterator.cpp
+++ b/be/src/storage/segment/variant/hierarchical_data_iterator.cpp
@@ -168,6 +168,39 @@ ordinal_t HierarchicalDataIterator::get_current_ordinal()
const {
return (*_substream_reader.begin())->data.iterator->get_current_ordinal();
}
// Initializes a prefetcher on every substream iterator of the variant
// column, plus the root and binary column readers when present. Fails fast
// on the first error.
Status HierarchicalDataIterator::init_prefetcher(const SegmentPrefetchParams& params) {
    RETURN_IF_ERROR(tranverse([&](SubstreamReaderTree::Node& node) {
        RETURN_IF_ERROR(node.data.iterator->init_prefetcher(params));
        return Status::OK();
    }));
    if (_root_reader) {
        DCHECK(_root_reader->inited);
        RETURN_IF_ERROR(_root_reader->iterator->init_prefetcher(params));
    }
    if (_binary_column_reader) {
        DCHECK(_binary_column_reader->inited);
        RETURN_IF_ERROR(_binary_column_reader->iterator->init_prefetcher(params));
    }
    return Status::OK();
}
+
// Collects the prefetchers of all substream iterators, plus the root and
// binary column readers when present, into `prefetchers` grouped under
// `init_method`.
void HierarchicalDataIterator::collect_prefetchers(
        std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
        PrefetcherInitMethod init_method) {
    // tranverse returns Status but this callback never fails; discard it.
    static_cast<void>(tranverse([&](SubstreamReaderTree::Node& node) {
        node.data.iterator->collect_prefetchers(prefetchers, init_method);
        return Status::OK();
    }));
    if (_root_reader) {
        DCHECK(_root_reader->inited);
        _root_reader->iterator->collect_prefetchers(prefetchers, init_method);
    }
    if (_binary_column_reader) {
        DCHECK(_binary_column_reader->inited);
        _binary_column_reader->iterator->collect_prefetchers(prefetchers, init_method);
    }
}
+
Status HierarchicalDataIterator::_process_sub_columns(
ColumnVariant& container_variant, const PathsWithColumnAndType&
non_nested_subcolumns) {
for (const auto& entry : non_nested_subcolumns) {
diff --git a/be/src/storage/segment/variant/hierarchical_data_iterator.h
b/be/src/storage/segment/variant/hierarchical_data_iterator.h
index 8dd90475b75..eb5e29093e3 100644
--- a/be/src/storage/segment/variant/hierarchical_data_iterator.h
+++ b/be/src/storage/segment/variant/hierarchical_data_iterator.h
@@ -91,6 +91,11 @@ public:
Status add_stream(int32_t col_uid, const SubcolumnColumnMetaInfo::Node*
node,
ColumnReaderCache* column_reader_cache,
OlapReaderStatistics* stats);
+ Status init_prefetcher(const SegmentPrefetchParams& params) override;
+ void collect_prefetchers(
+ std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>&
prefetchers,
+ PrefetcherInitMethod init_method) override;
+
private:
SubstreamReaderTree _substream_reader;
std::unique_ptr<SubstreamIterator> _root_reader;
diff --git a/be/src/storage/segment/variant/variant_column_reader.cpp
b/be/src/storage/segment/variant/variant_column_reader.cpp
index 31fbfe74a04..93f896c80b5 100644
--- a/be/src/storage/segment/variant/variant_column_reader.cpp
+++ b/be/src/storage/segment/variant/variant_column_reader.cpp
@@ -1466,6 +1466,16 @@ Status VariantRootColumnIterator::read_by_rowids(const
rowid_t* rowids, const si
return _process_root_column(dst, root_column, most_common_type);
}
// Delegates prefetcher setup to the wrapped inner column iterator.
Status VariantRootColumnIterator::init_prefetcher(const SegmentPrefetchParams& params) {
    return _inner_iter->init_prefetcher(params);
}
+
// Delegates prefetcher collection to the wrapped inner column iterator.
void VariantRootColumnIterator::collect_prefetchers(
        std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
        PrefetcherInitMethod init_method) {
    _inner_iter->collect_prefetchers(prefetchers, init_method);
}
+
static void fill_nested_with_defaults(MutableColumnPtr& dst, MutableColumnPtr&
sibling_column,
size_t nrows) {
const auto* sibling_array =
diff --git a/be/src/storage/segment/variant/variant_column_reader.h
b/be/src/storage/segment/variant/variant_column_reader.h
index a7f079ce0dc..d9f35730e62 100644
--- a/be/src/storage/segment/variant/variant_column_reader.h
+++ b/be/src/storage/segment/variant/variant_column_reader.h
@@ -462,6 +462,11 @@ public:
ordinal_t get_current_ordinal() const override { return
_inner_iter->get_current_ordinal(); }
+ Status init_prefetcher(const SegmentPrefetchParams& params) override;
+ void collect_prefetchers(
+ std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>&
prefetchers,
+ PrefetcherInitMethod init_method) override;
+
private:
Status _process_root_column(MutableColumnPtr& dst, MutableColumnPtr&
root_column,
const DataTypePtr& most_common_type);
diff --git a/be/src/util/concurrency_stats.cpp
b/be/src/util/concurrency_stats.cpp
new file mode 100644
index 00000000000..7c741cbae5b
--- /dev/null
+++ b/be/src/util/concurrency_stats.cpp
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/concurrency_stats.h"
+
+#include <chrono>
+#include <sstream>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "util/thread.h"
+
+namespace doris {
+ConcurrencyStatsManager::ConcurrencyStatsManager() : _running(false) {
+ // Initialize all counters in the order of read path (top to bottom)
+ vscanner_get_block = new ConcurrencyCounter("vscanner");
+ segment_iterator_next_batch = new ConcurrencyCounter("segment_iterator");
+ column_reader_read_page = new ConcurrencyCounter("column_reader");
+ page_io_decompress = new ConcurrencyCounter("page_io.decompress");
+ page_io_pre_decode = new ConcurrencyCounter("page_io.pre_decode");
+ page_io_insert_page_cache = new
ConcurrencyCounter("page_io.insert_page_cache");
+ cached_remote_reader_read_at = new
ConcurrencyCounter("file_cache.read_at");
+ cached_remote_reader_get_or_set = new
ConcurrencyCounter("file_cache.get_or_set");
+ cached_remote_reader_get_or_set_wait_lock =
+ new ConcurrencyCounter("file_cache.get_or_set_wait_lock");
+ cached_remote_reader_write_back = new
ConcurrencyCounter("file_cache.write_back");
+ cached_remote_reader_blocking = new
ConcurrencyCounter("file_cache.blocking");
+ cached_remote_reader_local_read = new
ConcurrencyCounter("file_cache.local_read");
+ s3_file_reader_read = new ConcurrencyCounter("s3.read");
+
+ // Add to vector in the order they should be printed
+ _counters.push_back(vscanner_get_block);
+ _counters.push_back(segment_iterator_next_batch);
+ _counters.push_back(column_reader_read_page);
+ _counters.push_back(page_io_decompress);
+ _counters.push_back(page_io_pre_decode);
+ _counters.push_back(page_io_insert_page_cache);
+ _counters.push_back(cached_remote_reader_read_at);
+ _counters.push_back(cached_remote_reader_get_or_set);
+ _counters.push_back(cached_remote_reader_get_or_set_wait_lock);
+ _counters.push_back(cached_remote_reader_write_back);
+ _counters.push_back(cached_remote_reader_blocking);
+ _counters.push_back(cached_remote_reader_local_read);
+ _counters.push_back(s3_file_reader_read);
+}
+
+ConcurrencyStatsManager::~ConcurrencyStatsManager() {
+ stop();
+
+ // Clean up counters
+ for (auto* counter : _counters) {
+ delete counter;
+ }
+ _counters.clear();
+}
+
// Meyers singleton: constructed on first use, destroyed at program exit.
// C++11 guarantees thread-safe initialization of the local static.
ConcurrencyStatsManager& ConcurrencyStatsManager::instance() {
    static ConcurrencyStatsManager instance;
    return instance;
}
+
+void ConcurrencyStatsManager::start() {
+ if (_running.exchange(true)) {
+ return; // Already running
+ }
+
+ _dump_thread = std::make_unique<std::thread>([this]() {
_dump_thread_func(); });
+}
+
+void ConcurrencyStatsManager::stop() {
+ if (!_running.exchange(false)) {
+ return; // Not running
+ }
+
+ if (_dump_thread && _dump_thread->joinable()) {
+ _dump_thread->join();
+ }
+ _dump_thread.reset();
+}
+
+void ConcurrencyStatsManager::dump_to_log() {
+ if (_counters.empty()) {
+ return;
+ }
+
+ // Build single line output: CONCURRENCY_STATS name1=value1 name2=value2
...
+ std::stringstream ss;
+ ss << "CONCURRENCY_STATS";
+
+ for (const auto* counter : _counters) {
+ int64_t value = counter->value();
+ ss << " " << counter->name() << "=" << value;
+ }
+
+ LOG(INFO) << ss.str();
+}
+
+void ConcurrencyStatsManager::_dump_thread_func() {
+ Thread::set_self_name("ConcurrencyStatsManager_dump_thread");
+ while (_running.load(std::memory_order_relaxed)) {
+ // Check if dumping is enabled
+ if (config::enable_concurrency_stats_dump) {
+ dump_to_log();
+ }
+
+ // Sleep for the configured interval
+ int32_t interval_ms = config::concurrency_stats_dump_interval_ms;
+ if (interval_ms <= 0) {
+ interval_ms = 100; // Default to 100ms if invalid
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(interval_ms));
+ }
+}
+
+} // namespace doris
diff --git a/be/src/util/concurrency_stats.h b/be/src/util/concurrency_stats.h
new file mode 100644
index 00000000000..f5d4489ea3d
--- /dev/null
+++ b/be/src/util/concurrency_stats.h
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+namespace doris {
+
// A thread-safe counter tracking how many operations are currently in flight.
// Backed by a single relaxed atomic, so increment/decrement stay cheap enough
// for hot read-path code.
class ConcurrencyCounter {
public:
    explicit ConcurrencyCounter(std::string name) : _name(std::move(name)) {}

    /// Record one more in-flight operation.
    void increment() { _count.fetch_add(1, std::memory_order_relaxed); }

    /// Record the completion of one in-flight operation.
    void decrement() { _count.fetch_sub(1, std::memory_order_relaxed); }

    /// Snapshot of the current in-flight count.
    int64_t value() const { return _count.load(std::memory_order_relaxed); }

    const std::string& name() const { return _name; }

    // RAII helper: increments the counter (when non-null) on construction and
    // decrements it on destruction. Non-copyable; a null counter is a no-op.
    class Guard {
    public:
        explicit Guard(ConcurrencyCounter* counter) : _counter(counter) {
            if (_counter != nullptr) {
                _counter->increment();
            }
        }

        ~Guard() {
            if (_counter != nullptr) {
                _counter->decrement();
            }
        }

        Guard(const Guard&) = delete;
        Guard& operator=(const Guard&) = delete;

    private:
        ConcurrencyCounter* _counter;
    };

private:
    std::string _name;
    std::atomic<int64_t> _count {0};
};
+
// Singleton manager for all concurrency counters.
// All counters are defined here in order. The manager owns every counter
// (allocated in the constructor, deleted in the destructor) and can run a
// background thread that periodically dumps all values to the log.
class ConcurrencyStatsManager {
public:
    // Process-wide singleton accessor.
    static ConcurrencyStatsManager& instance();

    // Start the background thread for periodic logging (no-op if already running)
    void start();

    // Stop the background thread (no-op if not running)
    void stop();

    // Manually dump all counters to log
    void dump_to_log();

    // Access to individual counters (defined in order of read path from top to bottom).
    // Owned by this manager; callers must not delete them or cache them beyond
    // the singleton's lifetime.
    ConcurrencyCounter* vscanner_get_block;
    ConcurrencyCounter* segment_iterator_next_batch;
    ConcurrencyCounter* column_reader_read_page;
    ConcurrencyCounter* page_io_decompress;
    ConcurrencyCounter* page_io_pre_decode;
    ConcurrencyCounter* page_io_insert_page_cache;
    ConcurrencyCounter* cached_remote_reader_read_at;
    ConcurrencyCounter* cached_remote_reader_get_or_set;
    ConcurrencyCounter* cached_remote_reader_get_or_set_wait_lock;
    ConcurrencyCounter* cached_remote_reader_write_back;
    ConcurrencyCounter* cached_remote_reader_blocking;
    ConcurrencyCounter* cached_remote_reader_local_read;
    ConcurrencyCounter* s3_file_reader_read;

private:
    ConcurrencyStatsManager();
    ~ConcurrencyStatsManager();

    // Non-copyable singleton.
    ConcurrencyStatsManager(const ConcurrencyStatsManager&) = delete;
    ConcurrencyStatsManager& operator=(const ConcurrencyStatsManager&) = delete;

    // Body of the background dump thread.
    void _dump_thread_func();

    // All counters in the order they should be printed
    std::vector<ConcurrencyCounter*> _counters;

    // Set by start()/stop(); the dump thread's loop condition.
    std::atomic<bool> _running;
    std::unique_ptr<std::thread> _dump_thread;
};
+
// Macro for scoped counting: places a ConcurrencyCounter::Guard on the stack
// so the counter is incremented here and decremented when the scope exits.
#define SCOPED_CONCURRENCY_COUNT_IMPL(counter_ptr, unique_id) \
    doris::ConcurrencyCounter::Guard _concurrency_guard_##unique_id(counter_ptr)

// Extra indirection so __LINE__ is macro-expanded before token pasting,
// yielding a unique guard variable name per source line.
#define SCOPED_CONCURRENCY_COUNT_HELPER(counter_ptr, line) \
    SCOPED_CONCURRENCY_COUNT_IMPL(counter_ptr, line)

#define SCOPED_CONCURRENCY_COUNT(counter_ptr) SCOPED_CONCURRENCY_COUNT_HELPER(counter_ptr, __LINE__)
+
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]