This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new f05a582ab24 branch-4.0: [feature](cloud) Read peer be cache when
balance in same cluster #56384 (#58089)
f05a582ab24 is described below
commit f05a582ab24177ac462e53f86d293a36c3f120cc
Author: deardeng <[email protected]>
AuthorDate: Tue Nov 18 16:02:56 2025 +0800
branch-4.0: [feature](cloud) Read peer be cache when balance in same
cluster #56384 (#58089)
Cherry-picked from #56384.
---
be/src/cloud/cloud_backend_service.cpp | 24 ++-
be/src/cloud/cloud_internal_service.cpp | 122 +++++++++++
be/src/cloud/cloud_internal_service.h | 5 +
be/src/cloud/cloud_tablet_mgr.cpp | 2 +-
be/src/cloud/cloud_warm_up_manager.cpp | 84 ++++++++
be/src/cloud/cloud_warm_up_manager.h | 28 +++
be/src/cloud/config.cpp | 6 +
be/src/cloud/config.h | 4 +
be/src/http/action/file_cache_action.cpp | 5 +
be/src/io/cache/block_file_cache_downloader.cpp | 43 +++-
be/src/io/cache/block_file_cache_factory.cpp | 35 +++
be/src/io/cache/block_file_cache_factory.h | 6 +
be/src/io/cache/block_file_cache_profile.cpp | 25 ++-
be/src/io/cache/block_file_cache_profile.h | 7 +
be/src/io/cache/cached_remote_file_reader.cpp | 184 ++++++++++++++--
be/src/io/cache/cached_remote_file_reader.h | 13 +-
be/src/io/cache/file_cache_common.h | 2 +
be/src/io/cache/peer_file_cache_reader.cpp | 167 +++++++++++++++
be/src/io/cache/peer_file_cache_reader.h | 82 ++++++++
be/src/io/fs/file_reader.h | 2 +
be/src/io/fs/s3_file_reader.cpp | 17 +-
be/src/io/io_common.h | 6 +
be/src/olap/rowset/beta_rowset.cpp | 1 +
be/src/olap/storage_policy.cpp | 86 ++++++++
be/src/olap/storage_policy.h | 3 +
be/src/util/doris_metrics.cpp | 2 +
be/src/util/doris_metrics.h | 1 +
be/test/olap/storage_resource_test.cpp | 109 ++++++++++
.../java/org/apache/doris/system/HeartbeatMgr.java | 3 +-
gensrc/proto/internal_service.proto | 35 +++
.../doris/regression/action/ProfileAction.groovy | 5 +-
.../cloud_p0/balance/test_balance_warm_up.groovy | 38 +++-
.../test_balance_warm_up_use_peer_cache.groovy | 223 ++++++++++++++++++++
...e_warm_up_with_compaction_use_peer_cache.groovy | 234 +++++++++++++++++++++
.../read_from_peer/test_read_from_peer.groovy | 178 ++++++++++++++++
35 files changed, 1755 insertions(+), 32 deletions(-)
diff --git a/be/src/cloud/cloud_backend_service.cpp
b/be/src/cloud/cloud_backend_service.cpp
index a5caf89e207..62f72f0bb76 100644
--- a/be/src/cloud/cloud_backend_service.cpp
+++ b/be/src/cloud/cloud_backend_service.cpp
@@ -179,6 +179,12 @@ void
CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":"
<< request.brpc_port
<< ", tablets num=" << request.tablet_ids.size() << ",
tablet_ids=" << oss.str();
+ auto& manager =
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
+ // Record each tablet in manager
+ for (int64_t tablet_id : request.tablet_ids) {
+ manager.record_balanced_tablet(tablet_id, request.host,
request.brpc_port);
+ }
+
std::string host = request.host;
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
if (dns_cache == nullptr) {
@@ -188,6 +194,8 @@ void
CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
if (!status.ok()) {
LOG(WARNING) << "failed to get ip from host " << request.host <<
": "
<< status.to_string();
+ // Remove failed tablets from tracking
+ manager.remove_balanced_tablets(request.tablet_ids);
return;
}
}
@@ -199,6 +207,8 @@ void
CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
if (!brpc_stub) {
st = Status::RpcError("Address {} is wrong", brpc_addr);
LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr
" << brpc_addr;
+ // Remove failed tablets from tracking
+ manager.remove_balanced_tablets(request.tablet_ids);
return;
}
brpc::Controller cntl;
@@ -213,10 +223,20 @@ void
CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
if (!cntl.Failed()) {
g_file_cache_warm_up_cache_async_submitted_segment_num
<< brpc_response.file_cache_block_metas().size();
- _engine.file_cache_block_downloader().submit_download_task(
- std::move(*brpc_response.mutable_file_cache_block_metas()));
+ auto& file_cache_block_metas =
*brpc_response.mutable_file_cache_block_metas();
+ if (!file_cache_block_metas.empty()) {
+ _engine.file_cache_block_downloader().submit_download_task(
+ std::move(file_cache_block_metas));
+ LOG(INFO) << "warm_up_cache_async: successfully submitted download
task for tablets="
+ << oss.str();
+ } else {
+ LOG(INFO) << "warm_up_cache_async: no file cache block meta found,
addr=" << brpc_addr;
+ manager.remove_balanced_tablets(request.tablet_ids);
+ }
} else {
st = Status::RpcError("{} isn't connected", brpc_addr);
+ // Remove failed tablets from tracking
+ manager.remove_balanced_tablets(request.tablet_ids);
LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" <<
brpc_addr
<< ", error=" << cntl.ErrorText();
}
diff --git a/be/src/cloud/cloud_internal_service.cpp
b/be/src/cloud/cloud_internal_service.cpp
index 240ffe56c3c..94bb951b95d 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -19,6 +19,9 @@
#include <bthread/countdown_event.h>
+#include <algorithm>
+#include <thread>
+
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_mgr.h"
@@ -27,12 +30,24 @@
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
+#include "runtime/thread_context.h"
+#include "runtime/workload_management/io_throttle.h"
+#include "util/async_io.h"
#include "util/debug_points.h"
namespace doris {
#include "common/compile_check_avoid_begin.h"
#include "common/compile_check_begin.h"
+// Server-side metrics for fetch_peer_data (a peer BE reading this BE's file cache).
+// Number of fetch_peer_data requests received.
+bvar::Adder<uint64_t> g_file_cache_get_by_peer_num("file_cache_get_by_peer_num");
+// Number of cache blocks returned for PEER_FILE_CACHE_BLOCK requests.
+bvar::Adder<uint64_t> g_file_cache_get_by_peer_blocks_num("file_cache_get_by_peer_blocks_num");
+// Requests that reached the end of the handler without taking an error path.
+bvar::Adder<uint64_t> g_file_cache_get_by_peer_success_num("file_cache_get_by_peer_success_num");
+// Requests that ended on an error path (missing cache instance, block not downloaded, read error).
+bvar::Adder<uint64_t> g_file_cache_get_by_peer_failed_num("file_cache_get_by_peer_failed_num");
+// Handler latency in microseconds, recorded only for requests that complete successfully.
+bvar::LatencyRecorder g_file_cache_get_by_peer_server_latency(
+        "file_cache_get_by_peer_server_latency");
+// Latency in microseconds of reading one cache block from local disk.
+bvar::LatencyRecorder g_file_cache_get_by_peer_read_cache_file_latency(
+        "file_cache_get_by_peer_read_cache_file_latency");
+
CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine,
ExecEnv* exec_env)
: PInternalService(exec_env), _engine(engine) {}
@@ -154,6 +169,113 @@ void
CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
VLOG_DEBUG << "warm up get meta request=" << request->DebugString()
<< ", response=" << response->DebugString();
}
+
+void CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController* controller
+                                               [[maybe_unused]],
+                                               const PFetchPeerDataRequest* request,
+                                               PFetchPeerDataResponse* response,
+                                               google::protobuf::Closure* done) {
+    // Serves data from this BE's file cache to a peer BE.
+    //  - PEER_FILE_RANGE: returns every cached block of `request->path()`. Blocks that cannot
+    //    be read are still reported (offset/size) but carry no data.
+    //  - PEER_FILE_CACHE_BLOCK: returns the blocks covering each entry of `cache_req()`. The
+    //    whole request fails if any block is not DOWNLOADED or cannot be read.
+    // Failures are reported through response->status() (INTERNAL_ERROR + error_msgs).
+    // TODO(dx): use async thread pool to handle the request, not AsyncIO
+    brpc::ClosureGuard closure_guard(done);
+    g_file_cache_get_by_peer_num << 1;
+    auto now_us = [] {
+        return std::chrono::duration_cast<std::chrono::microseconds>(
+                       std::chrono::steady_clock::now().time_since_epoch())
+                .count();
+    };
+    // Marks the whole response as failed and bumps the failure counter.
+    auto fail = [&](const char* msg) {
+        g_file_cache_get_by_peer_failed_num << 1;
+        response->mutable_status()->add_error_msgs(msg);
+        response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
+    };
+    if (!config::enable_file_cache) {
+        LOG_WARNING("try to access file cache data, but file cache not enabled");
+        // Report the failure explicitly: an unset status would look like an empty success.
+        fail("file cache not enabled");
+        return;
+    }
+    const int64_t begin_ts = now_us();
+    const auto type = request->type();
+    const auto& path = request->path();
+    response->mutable_status()->set_status_code(TStatusCode::OK);
+    if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
+        // Return all cached blocks of this file. get_cache_data_by_path() reads local cache
+        // files, so offload it exactly like the PEER_FILE_CACHE_BLOCK branch below
+        // (file_reader.cpp:33] Check failed: bthread_self() == 0).
+        std::vector<doris::CacheBlockPB> datas;
+        AsyncIO::run_task(
+                [&] {
+                    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+                    datas = io::FileCacheFactory::instance()->get_cache_data_by_path(path);
+                },
+                io::FileSystemType::LOCAL);
+        for (auto& cb : datas) {
+            *(response->add_datas()) = std::move(cb);
+        }
+    } else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
+        // Multiple specific blocks
+        auto hash = io::BlockFileCache::hash(path);
+        auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
+        if (cache == nullptr) {
+            fail("can't get file cache instance");
+            return;
+        }
+        io::CacheContext ctx {};
+        // ensure a valid stats pointer is provided to cache layer
+        io::ReadStatistics local_stats;
+        ctx.stats = &local_stats;
+        for (const auto& cb_req : request->cache_req()) {
+            const auto offset =
+                    static_cast<size_t>(std::max<int64_t>(0, cb_req.block_offset()));
+            const auto size = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_size()));
+            if (size == 0) {
+                // Nothing to read; don't ask the cache for a zero-length range.
+                continue;
+            }
+            auto holder = cache->get_or_set(hash, offset, size, ctx);
+            for (auto& fb : holder.file_blocks) {
+                auto state = fb->state();
+                if (state != io::FileBlock::State::DOWNLOADED) {
+                    LOG(WARNING) << "read cache block failed, state=" << state;
+                    fail("read cache file error");
+                    return;
+                }
+                g_file_cache_get_by_peer_blocks_num << 1;
+                doris::CacheBlockPB* out = response->add_datas();
+                out->set_block_offset(static_cast<int64_t>(fb->offset()));
+                out->set_block_size(static_cast<int64_t>(fb->range().size()));
+                std::string data;
+                data.resize(fb->range().size());
+                Status read_st = Status::OK();
+                const int64_t begin_read_file_ts = now_us();
+                // Offload the local file read to a dedicated OS thread: reading in a bthread
+                // fails with "file_reader.cpp:33] Check failed: bthread_self() == 0".
+                AsyncIO::run_task(
+                        [&] {
+                            // The worker thread has no ThreadContext yet; attach one first.
+                            SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+                            Slice slice(data.data(), data.size());
+                            read_st = fb->read(slice, /*read_offset=*/0);
+                        },
+                        io::FileSystemType::LOCAL);
+                g_file_cache_get_by_peer_read_cache_file_latency
+                        << (now_us() - begin_read_file_ts);
+                if (!read_st.ok()) {
+                    LOG(WARNING) << "read cache block failed: " << read_st;
+                    fail("read cache file error");
+                    return;
+                }
+                out->set_data(std::move(data));
+            }
+        }
+    } else {
+        // Previously an unknown type silently returned OK with no data.
+        LOG(WARNING) << "fetch_peer_data: unknown request type=" << static_cast<int>(type);
+        fail("unknown fetch peer data type");
+        return;
+    }
+    DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", {
+        int st_us = dp->param<int>("sleep", 1000);
+        LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", st_us);
+        // sleep us
+        bthread_usleep(st_us);
+    });
+
+    g_file_cache_get_by_peer_server_latency << (now_us() - begin_ts);
+    g_file_cache_get_by_peer_success_num << 1;
+
+    VLOG_DEBUG << "fetch cache request=" << request->DebugString()
+               << ", response=" << response->DebugString();
+}
+
#include "common/compile_check_end.h"
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_submitted_segment_num(
diff --git a/be/src/cloud/cloud_internal_service.h
b/be/src/cloud/cloud_internal_service.h
index 59d8739cbf4..db4916313fe 100644
--- a/be/src/cloud/cloud_internal_service.h
+++ b/be/src/cloud/cloud_internal_service.h
@@ -48,6 +48,11 @@ public:
const PRecycleCacheRequest* request,
PRecycleCacheResponse* response,
google::protobuf::Closure* done) override;
+ // Get file cached data about the path in file cache
+ void fetch_peer_data(google::protobuf::RpcController* controller,
+ const PFetchPeerDataRequest* request,
PFetchPeerDataResponse* response,
+ google::protobuf::Closure* done) override;
+
private:
CloudStorageEngine& _engine;
};
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp
b/be/src/cloud/cloud_tablet_mgr.cpp
index a3270e0dcf9..03c882e0352 100644
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ b/be/src/cloud/cloud_tablet_mgr.cpp
@@ -487,7 +487,7 @@ void
CloudTabletMgr::build_all_report_tablets_info(std::map<TTabletId, TTablet>*
tablet->build_tablet_report_info(&tablet_info);
using namespace std::chrono;
int64_t now =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
- if (now - g_tablet_report_inactive_duration_ms * 1000 <
tablet->last_access_time_ms) {
+ if (now - g_tablet_report_inactive_duration_ms <
tablet->last_access_time_ms) {
// the tablet is still being accessed and used in recently, so not
report it
return;
}
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp
b/be/src/cloud/cloud_warm_up_manager.cpp
index 5c2d52f9b84..fb1a005c22a 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -82,6 +82,7 @@ bvar::Adder<uint64_t>
g_file_cache_recycle_cache_requested_index_num(
bvar::Status<int64_t> g_file_cache_warm_up_rowset_last_call_unix_ts(
"file_cache_warm_up_rowset_last_call_unix_ts", 0);
bvar::Adder<uint64_t> file_cache_warm_up_failed_task_num("file_cache_warm_up",
"failed_task_num");
+bvar::Adder<uint64_t>
g_balance_tablet_be_mapping_size("balance_tablet_be_mapping_size");
bvar::LatencyRecorder g_file_cache_warm_up_rowset_wait_for_compaction_latency(
"file_cache_warm_up_rowset_wait_for_compaction_latency");
@@ -104,6 +105,11 @@ CloudWarmUpManager::~CloudWarmUpManager() {
if (_download_thread.joinable()) {
_download_thread.join();
}
+
+ for (auto& shard : _balanced_tablets_shards) {
+ std::lock_guard<std::mutex> lock(shard.mtx);
+ shard.tablets.clear();
+ }
}
std::unordered_map<std::string, RowsetMetaSharedPtr>
snapshot_rs_metas(BaseTablet* tablet) {
@@ -783,5 +789,83 @@ void CloudWarmUpManager::_recycle_cache(int64_t tablet_id,
}
}
+// Balance warm up cache management methods implementation
+// Records that `tablet_id` was balanced from the BE at `host`:`brpc_port`, so that peer can
+// later be looked up via get_balanced_tablet_info(). Re-recording a tablet replaces the peer.
+void CloudWarmUpManager::record_balanced_tablet(int64_t tablet_id, const std::string& host,
+                                                int32_t brpc_port) {
+    auto& shard = get_shard(tablet_id);
+    std::lock_guard<std::mutex> lock(shard.mtx);
+    JobMeta meta;
+    meta.be_ip = host;
+    meta.brpc_port = brpc_port;
+    // insert_or_assign rather than emplace: emplace kept a stale peer when a tablet was
+    // balanced again, yet the size bvar was bumped anyway, so it drifted above the real size.
+    const bool inserted = shard.tablets.insert_or_assign(tablet_id, std::move(meta)).second;
+    if (inserted) {
+        g_balance_tablet_be_mapping_size << 1;
+    }
+    VLOG_DEBUG << "Recorded balanced warm up cache tablet: tablet_id=" << tablet_id
+               << ", host=" << host << ":" << brpc_port;
+}
+
+// Returns the (peer BE host, brpc port) recorded for `tablet_id`, or std::nullopt if the
+// tablet is not currently tracked.
+std::optional<std::pair<std::string, int32_t>> CloudWarmUpManager::get_balanced_tablet_info(
+        int64_t tablet_id) {
+    auto& shard = get_shard(tablet_id);
+    std::lock_guard<std::mutex> lock(shard.mtx);
+    if (const auto found = shard.tablets.find(tablet_id); found != shard.tablets.end()) {
+        const auto& meta = found->second;
+        return std::make_pair(meta.be_ip, meta.brpc_port);
+    }
+    return std::nullopt;
+}
+
+// Stops tracking `tablet_id` (used by the expiry timer callback). No-op if it is not tracked.
+void CloudWarmUpManager::remove_balanced_tablet(int64_t tablet_id) {
+    auto& shard = get_shard(tablet_id);
+    std::lock_guard<std::mutex> lock(shard.mtx);
+    const auto found = shard.tablets.find(tablet_id);
+    if (found == shard.tablets.end()) {
+        return;
+    }
+    shard.tablets.erase(found);
+    g_balance_tablet_be_mapping_size << -1;
+    VLOG_DEBUG << "Removed balanced warm up cache tablet by timer, tablet_id=" << tablet_id;
+}
+
+// Stops tracking every tablet in `tablet_ids`; ids that are not tracked are ignored.
+// Ids are grouped by shard first so each shard mutex is taken at most once.
+void CloudWarmUpManager::remove_balanced_tablets(const std::vector<int64_t>& tablet_ids) {
+    // Only materialize the shards actually touched. A std::array of SHARD_COUNT (10240)
+    // vectors put roughly 240KB on the stack and was scanned in full on every call.
+    std::unordered_map<size_t, std::vector<int64_t>> shard_groups;
+    for (int64_t tablet_id : tablet_ids) {
+        shard_groups[get_shard_index(tablet_id)].push_back(tablet_id);
+    }
+
+    for (const auto& [shard_idx, ids] : shard_groups) {
+        auto& shard = _balanced_tablets_shards[shard_idx];
+        std::lock_guard<std::mutex> lock(shard.mtx);
+        for (int64_t tablet_id : ids) {
+            auto it = shard.tablets.find(tablet_id);
+            if (it != shard.tablets.end()) {
+                shard.tablets.erase(it);
+                g_balance_tablet_be_mapping_size << -1;
+                VLOG_DEBUG << "Removed balanced warm up cache tablet: tablet_id=" << tablet_id;
+            }
+        }
+    }
+}
+
+// Returns a copy of every tracked tablet -> (peer BE host, brpc port) mapping.
+// All shard locks are held at once so the result is a consistent snapshot across shards.
+std::unordered_map<int64_t, std::pair<std::string, int32_t>>
+CloudWarmUpManager::get_all_balanced_tablets() const {
+    std::unordered_map<int64_t, std::pair<std::string, int32_t>> result;
+
+    // Keep the SHARD_COUNT lock guards on the heap: a std::array of 10240 unique_locks is
+    // roughly 160KB, which can overflow a small (e.g. bthread) stack.
+    std::vector<std::unique_lock<std::mutex>> locks;
+    locks.reserve(SHARD_COUNT);
+    for (const auto& shard : _balanced_tablets_shards) {
+        locks.emplace_back(shard.mtx);
+    }
+
+    for (const auto& shard : _balanced_tablets_shards) {
+        for (const auto& [tablet_id, entry] : shard.tablets) {
+            result.emplace(tablet_id, std::make_pair(entry.be_ip, entry.brpc_port));
+        }
+    }
+    return result;
+}
+
#include "common/compile_check_end.h"
} // namespace doris
diff --git a/be/src/cloud/cloud_warm_up_manager.h
b/be/src/cloud/cloud_warm_up_manager.h
index dfa4f6e2be6..8725dc06939 100644
--- a/be/src/cloud/cloud_warm_up_manager.h
+++ b/be/src/cloud/cloud_warm_up_manager.h
@@ -48,6 +48,10 @@ struct JobMeta {
std::vector<int64_t> tablet_ids;
};
+// manager for
+// table warm up
+// cluster warm up
+// balance peer addr cache
class CloudWarmUpManager {
public:
explicit CloudWarmUpManager(CloudStorageEngine& engine);
@@ -85,6 +89,14 @@ public:
void recycle_cache(int64_t tablet_id, const std::vector<RecycledRowsets>&
rowsets);
+ // Balance warm up cache management methods
+ void record_balanced_tablet(int64_t tablet_id, const std::string& host,
int32_t brpc_port);
+ std::optional<std::pair<std::string, int32_t>>
get_balanced_tablet_info(int64_t tablet_id);
+ void remove_balanced_tablet(int64_t tablet_id);
+ void remove_balanced_tablets(const std::vector<int64_t>& tablet_ids);
+ bool is_balanced_tablet_expired(const
std::chrono::system_clock::time_point& ctime) const;
+ std::unordered_map<int64_t, std::pair<std::string, int32_t>>
get_all_balanced_tablets() const;
+
private:
void handle_jobs();
@@ -120,6 +132,22 @@ private:
std::unordered_map<int64_t, Cache> _tablet_replica_cache;
std::unique_ptr<ThreadPool> _thread_pool;
std::unique_ptr<ThreadPoolToken> _thread_pool_token;
+
+ // Sharded lock for better performance
+ static constexpr size_t SHARD_COUNT = 10240;
+ struct Shard {
+ mutable std::mutex mtx;
+ std::unordered_map<int64_t, JobMeta> tablets;
+ };
+ std::array<Shard, SHARD_COUNT> _balanced_tablets_shards;
+
+ // Helper methods for shard operations
+ size_t get_shard_index(int64_t tablet_id) const {
+ return std::hash<int64_t> {}(tablet_id) % SHARD_COUNT;
+ }
+ Shard& get_shard(int64_t tablet_id) {
+ return _balanced_tablets_shards[get_shard_index(tablet_id)];
+ }
};
} // namespace doris
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index 5f7c94428a5..ab31b9868f5 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -135,5 +135,11 @@ DEFINE_mBool(enable_standby_passive_compaction, "true");
DEFINE_mDouble(standby_compaction_version_ratio, "0.8");
+// Feature switch for reading file-cache data from a peer BE in the same cluster.
+// NOTE(review): exact reader-side semantics are not visible here; confirm before relying on it.
+DEFINE_mBool(enable_cache_read_from_peer, "false");
+
+// Expiration time, in seconds, of the cached tablet -> peer BE address mapping.
+// It can be configured to be less than `rehash_tablet_after_be_dead_seconds` in the `fe`
+// configuration. If the value is -1 (the default), `rehash_tablet_after_be_dead_seconds`
+// from the `fe` configuration is used as the expiration time.
+// The download path only honors values in (0, g_tablet_report_inactive_duration_ms / 1000];
+// anything else (including values larger than that) falls back to the default above.
+DEFINE_mInt64(cache_read_from_peer_expired_seconds, "-1");
#include "common/compile_check_end.h"
} // namespace doris::config
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index 3542808992b..ce4c7e0cd8c 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -178,5 +178,9 @@ DECLARE_mBool(enable_standby_passive_compaction);
DECLARE_mDouble(standby_compaction_version_ratio);
+DECLARE_mBool(enable_cache_read_from_peer);
+
+DECLARE_mInt64(cache_read_from_peer_expired_seconds);
+
#include "common/compile_check_end.h"
} // namespace doris::config
diff --git a/be/src/http/action/file_cache_action.cpp
b/be/src/http/action/file_cache_action.cpp
index dbd8dcc3b5e..882dc895480 100644
--- a/be/src/http/action/file_cache_action.cpp
+++ b/be/src/http/action/file_cache_action.cpp
@@ -73,6 +73,11 @@ Status FileCacheAction::_handle_header(HttpRequest* req,
std::string* json_metri
json[RELEASED_ELEMENTS.data()] = released;
*json_metrics = json.ToString();
} else if (operation == CLEAR) {
+ DBUG_EXECUTE_IF("FileCacheAction._handle_header.ignore_clear", {
+ LOG_WARNING("debug point
FileCacheAction._handle_header.ignore_clear");
+ st = Status::OK();
+ return st;
+ });
const std::string& sync = req->param(SYNC.data());
const std::string& segment_path = req->param(VALUE.data());
if (segment_path.empty()) {
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp
b/be/src/io/cache/block_file_cache_downloader.cpp
index 46ce90aafcc..851ff386e4d 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -20,7 +20,9 @@
#include "io/cache/block_file_cache_downloader.h"
+#include <bthread/bthread.h>
#include <bthread/countdown_event.h>
+#include <bthread/unstable.h>
#include <bvar/bvar.h>
#include <fmt/core.h>
#include <gen_cpp/internal_service.pb.h>
@@ -31,6 +33,7 @@
#include <variant>
#include "cloud/cloud_tablet_mgr.h"
+#include "cloud/cloud_warm_up_manager.h"
#include "common/config.h"
#include "common/logging.h"
#include "cpp/sync_point.h"
@@ -170,6 +173,14 @@ std::unordered_map<std::string, RowsetMetaSharedPtr>
snapshot_rs_metas(BaseTable
return id_to_rowset_meta_map;
}
+// bthread timer callback that drops the balanced-tablet -> peer BE mapping once it expires.
+// `arg` is a heap-allocated int64_t tablet id; this callback takes ownership and frees it.
+static void clean_up_expired_mappings(void* arg) {
+    const std::unique_ptr<int64_t> owned_id(static_cast<int64_t*>(arg));
+    const int64_t tablet_id = *owned_id;
+    ExecEnv::GetInstance()
+            ->storage_engine()
+            .to_cloud()
+            .cloud_warm_up_manager()
+            .remove_balanced_tablet(tablet_id);
+    VLOG_DEBUG << "Removed expired balanced warm up cache tablet: tablet_id=" << tablet_id;
+}
+
void FileCacheBlockDownloader::download_file_cache_block(
const DownloadTask::FileCacheBlockMetaVec& metas) {
std::unordered_set<int64_t> synced_tablets;
@@ -225,8 +236,27 @@ void FileCacheBlockDownloader::download_file_cache_block(
<< "]";
}
}
+ // Use std::make_unique to avoid raw pointer allocation
+ auto tablet_id_ptr = std::make_unique<int64_t>(tablet_id);
+ unsigned long expired_ms = g_tablet_report_inactive_duration_ms;
+ if (doris::config::cache_read_from_peer_expired_seconds > 0 &&
+ doris::config::cache_read_from_peer_expired_seconds <=
+ g_tablet_report_inactive_duration_ms / 1000) {
+ expired_ms =
doris::config::cache_read_from_peer_expired_seconds * 1000;
+ }
+ bthread_timer_t timer_id;
+ // ATTN: The timer callback will reclaim ownership of the
tablet_id_ptr, so we need to release it after the timer is added.
+ if (const int rc =
+ bthread_timer_add(&timer_id,
butil::milliseconds_from_now(expired_ms),
+ clean_up_expired_mappings,
tablet_id_ptr.get());
+ rc == 0) {
+ tablet_id_ptr.release();
+ } else {
+ LOG(WARNING) << "Fail to add timer for clean up expired
mappings for tablet_id="
+ << tablet_id << " rc=" << rc;
+ }
LOG(INFO) << "download_file_cache_block: download_done,
tablet_Id=" << tablet_id
- << "status=" << st.to_string();
+ << " status=" << st.to_string() << " expired_ms=" <<
expired_ms;
};
std::string path;
@@ -323,9 +353,16 @@ void FileCacheBlockDownloader::download_segment_file(const
DownloadFileMeta& met
void FileCacheBlockDownloader::download_blocks(DownloadTask& task) {
switch (task.task_message.index()) {
- case 0:
- download_file_cache_block(std::get<0>(task.task_message));
+ case 0: {
+ bool should_balance_task = true;
+
DBUG_EXECUTE_IF("FileCacheBlockDownloader.download_blocks.balance_task",
+ { should_balance_task = false; });
+ if (should_balance_task) {
+ download_file_cache_block(std::get<0>(task.task_message));
+ }
+
break;
+ }
case 1:
download_segment_file(std::get<1>(task.task_message));
break;
diff --git a/be/src/io/cache/block_file_cache_factory.cpp
b/be/src/io/cache/block_file_cache_factory.cpp
index 598baa8e857..f99a9112d09 100644
--- a/be/src/io/cache/block_file_cache_factory.cpp
+++ b/be/src/io/cache/block_file_cache_factory.cpp
@@ -41,6 +41,7 @@
#include "io/fs/local_file_system.h"
#include "runtime/exec_env.h"
#include "service/backend_options.h"
+#include "util/slice.h"
#include "vec/core/block.h"
namespace doris {
@@ -119,6 +120,40 @@ Status FileCacheFactory::create_file_cache(const
std::string& cache_base_path,
return Status::OK();
}
+// Convenience overload: hashes `path` into a cache key and delegates to the hash-based lookup.
+std::vector<doris::CacheBlockPB> FileCacheFactory::get_cache_data_by_path(const std::string& path) {
+    return get_cache_data_by_path(BlockFileCache::hash(path));
+}
+
+// Collects every cached block of the file identified by `hash`, in get_blocks_by_key() order.
+// Each entry carries the block's offset and size. `data` is set only for blocks that are
+// DOWNLOADED and read back successfully, so callers must not assume it is present.
+// Returns an empty vector when no cache instance owns `hash`.
+// NOTE: reads local cache files synchronously; callers on a bthread should offload the call.
+std::vector<doris::CacheBlockPB> FileCacheFactory::get_cache_data_by_path(
+        const UInt128Wrapper& hash) {
+    std::vector<doris::CacheBlockPB> ret;
+    BlockFileCache* cache = get_by_path(hash);
+    if (cache == nullptr) {
+        return ret;
+    }
+    auto blocks = cache->get_blocks_by_key(hash);
+    ret.reserve(blocks.size());
+    for (auto& [offset, fb] : blocks) {
+        doris::CacheBlockPB cb;
+        cb.set_block_offset(static_cast<int64_t>(offset));
+        cb.set_block_size(static_cast<int64_t>(fb->range().size()));
+        // Only fully downloaded blocks hold complete bytes. For others, report meta only,
+        // matching the DOWNLOADED check done for PEER_FILE_CACHE_BLOCK requests.
+        if (fb->state() != FileBlock::State::DOWNLOADED) {
+            VLOG_DEBUG << "skip reading cache block not downloaded, state=" << fb->state();
+            ret.emplace_back(std::move(cb));
+            continue;
+        }
+        std::string data(fb->range().size(), '\0');
+        Slice slice(data.data(), data.size());
+        // read from beginning of this block
+        Status st = fb->read(slice, /*read_offset=*/0);
+        if (st.ok()) {
+            // Move, not copy: blocks can be large.
+            cb.set_data(std::move(data));
+        } else {
+            // On read failure, skip setting data but still report meta
+            VLOG_DEBUG << "read cache block failed: " << st;
+        }
+        ret.emplace_back(std::move(cb));
+    }
+    return ret;
+}
+
std::vector<std::string> FileCacheFactory::get_cache_file_by_path(const
UInt128Wrapper& hash) {
io::BlockFileCache* cache =
io::FileCacheFactory::instance()->get_by_path(hash);
auto blocks = cache->get_blocks_by_key(hash);
diff --git a/be/src/io/cache/block_file_cache_factory.h
b/be/src/io/cache/block_file_cache_factory.h
index 8b9f5ae3ccb..837feac7f68 100644
--- a/be/src/io/cache/block_file_cache_factory.h
+++ b/be/src/io/cache/block_file_cache_factory.h
@@ -27,6 +27,7 @@
#include <vector>
#include "common/status.h"
+#include "gen_cpp/internal_service.pb.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/file_cache_common.h"
namespace doris {
@@ -65,6 +66,11 @@ public:
std::vector<std::string> get_cache_file_by_path(const UInt128Wrapper&
hash);
int64_t get_cache_file_size_by_path(const UInt128Wrapper& hash);
+ // Return cached blocks data for a given key hash
+ std::vector<doris::CacheBlockPB> get_cache_data_by_path(const
UInt128Wrapper& hash);
+ // Convenience overload: compute hash from path and return cached blocks
data
+ std::vector<doris::CacheBlockPB> get_cache_data_by_path(const std::string&
path);
+
BlockFileCache* get_by_path(const UInt128Wrapper& hash);
BlockFileCache* get_by_path(const std::string& cache_base_path);
std::vector<BlockFileCache::QueryFileCacheContextHolderPtr>
get_query_context_holders(
diff --git a/be/src/io/cache/block_file_cache_profile.cpp
b/be/src/io/cache/block_file_cache_profile.cpp
index fe6414b7878..6a9676fbeee 100644
--- a/be/src/io/cache/block_file_cache_profile.cpp
+++ b/be/src/io/cache/block_file_cache_profile.cpp
@@ -30,6 +30,7 @@ std::shared_ptr<AtomicStatistics> FileCacheMetrics::report() {
std::lock_guard lock(_mtx);
output_stats->num_io_bytes_read_from_cache +=
_statistics->num_io_bytes_read_from_cache;
output_stats->num_io_bytes_read_from_remote +=
_statistics->num_io_bytes_read_from_remote;
+ output_stats->num_io_bytes_read_from_peer +=
_statistics->num_io_bytes_read_from_peer;
return output_stats;
}
@@ -43,6 +44,7 @@ void FileCacheMetrics::update(FileCacheStatistics*
input_stats) {
}
_statistics->num_io_bytes_read_from_cache +=
input_stats->bytes_read_from_local;
_statistics->num_io_bytes_read_from_remote +=
input_stats->bytes_read_from_remote;
+ _statistics->num_io_bytes_read_from_peer +=
input_stats->bytes_read_from_peer;
}
void FileCacheMetrics::register_entity() {
@@ -56,8 +58,11 @@ void FileCacheMetrics::update_metrics_callback() {
stats->num_io_bytes_read_from_cache);
DorisMetrics::instance()->num_io_bytes_read_from_remote->set_value(
stats->num_io_bytes_read_from_remote);
+ DorisMetrics::instance()->num_io_bytes_read_from_peer->set_value(
+ stats->num_io_bytes_read_from_peer);
DorisMetrics::instance()->num_io_bytes_read_total->set_value(
- stats->num_io_bytes_read_from_cache +
stats->num_io_bytes_read_from_remote);
+ stats->num_io_bytes_read_from_cache +
stats->num_io_bytes_read_from_remote +
+ stats->num_io_bytes_read_from_peer);
}
FileCacheProfileReporter::FileCacheProfileReporter(RuntimeProfile* profile) {
@@ -67,8 +72,11 @@
FileCacheProfileReporter::FileCacheProfileReporter(RuntimeProfile* profile) {
ADD_CHILD_COUNTER_WITH_LEVEL(profile, "NumLocalIOTotal",
TUnit::UNIT, cache_profile, 1);
num_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(profile,
"NumRemoteIOTotal", TUnit::UNIT,
cache_profile, 1);
+ num_peer_io_total =
+ ADD_CHILD_COUNTER_WITH_LEVEL(profile, "NumPeerIOTotal",
TUnit::UNIT, cache_profile, 1);
local_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "LocalIOUseTimer",
cache_profile, 1);
remote_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "RemoteIOUseTimer",
cache_profile, 1);
+ peer_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "PeerIOUseTimer",
cache_profile, 1);
remote_wait_timer =
ADD_CHILD_TIMER_WITH_LEVEL(profile, "WaitOtherDownloaderTimer",
cache_profile, 1);
write_cache_io_timer =
@@ -81,6 +89,8 @@
FileCacheProfileReporter::FileCacheProfileReporter(RuntimeProfile* profile) {
TUnit::BYTES,
cache_profile, 1);
bytes_scanned_from_remote = ADD_CHILD_COUNTER_WITH_LEVEL(profile,
"BytesScannedFromRemote",
TUnit::BYTES,
cache_profile, 1);
+ bytes_scanned_from_peer = ADD_CHILD_COUNTER_WITH_LEVEL(profile,
"BytesScannedFromPeer",
+ TUnit::BYTES,
cache_profile, 1);
read_cache_file_directly_timer =
ADD_CHILD_TIMER_WITH_LEVEL(profile, "ReadCacheFileDirectlyTimer",
cache_profile, 1);
cache_get_or_set_timer =
@@ -93,14 +103,20 @@
FileCacheProfileReporter::FileCacheProfileReporter(RuntimeProfile* profile) {
profile, "InvertedIndexNumLocalIOTotal", TUnit::UNIT,
cache_profile, 1);
inverted_index_num_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "InvertedIndexNumRemoteIOTotal", TUnit::UNIT,
cache_profile, 1);
+ inverted_index_num_peer_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(
+ profile, "InvertedIndexNumPeerIOTotal", TUnit::UNIT,
cache_profile, 1);
inverted_index_bytes_scanned_from_cache = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "InvertedIndexBytesScannedFromCache", TUnit::BYTES,
cache_profile, 1);
inverted_index_bytes_scanned_from_remote = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "InvertedIndexBytesScannedFromRemote", TUnit::BYTES,
cache_profile, 1);
+ inverted_index_bytes_scanned_from_peer = ADD_CHILD_COUNTER_WITH_LEVEL(
+ profile, "InvertedIndexBytesScannedFromPeer", TUnit::BYTES,
cache_profile, 1);
inverted_index_local_io_timer =
ADD_CHILD_TIMER_WITH_LEVEL(profile,
"InvertedIndexLocalIOUseTimer", cache_profile, 1);
inverted_index_remote_io_timer =
ADD_CHILD_TIMER_WITH_LEVEL(profile,
"InvertedIndexRemoteIOUseTimer", cache_profile, 1);
+ inverted_index_peer_io_timer =
+ ADD_CHILD_TIMER_WITH_LEVEL(profile, "InvertedIndexPeerIOUseTimer",
cache_profile, 1);
inverted_index_io_timer =
ADD_CHILD_TIMER_WITH_LEVEL(profile, "InvertedIndexIOTimer",
cache_profile, 1);
}
@@ -108,14 +124,17 @@
FileCacheProfileReporter::FileCacheProfileReporter(RuntimeProfile* profile) {
void FileCacheProfileReporter::update(const FileCacheStatistics* statistics)
const {
COUNTER_UPDATE(num_local_io_total, statistics->num_local_io_total);
COUNTER_UPDATE(num_remote_io_total, statistics->num_remote_io_total);
+ COUNTER_UPDATE(num_peer_io_total, statistics->num_peer_io_total);
COUNTER_UPDATE(local_io_timer, statistics->local_io_timer);
COUNTER_UPDATE(remote_io_timer, statistics->remote_io_timer);
+ COUNTER_UPDATE(peer_io_timer, statistics->peer_io_timer);
COUNTER_UPDATE(remote_wait_timer, statistics->remote_wait_timer);
COUNTER_UPDATE(write_cache_io_timer, statistics->write_cache_io_timer);
COUNTER_UPDATE(bytes_write_into_cache, statistics->bytes_write_into_cache);
COUNTER_UPDATE(num_skip_cache_io_total,
statistics->num_skip_cache_io_total);
COUNTER_UPDATE(bytes_scanned_from_cache,
statistics->bytes_read_from_local);
COUNTER_UPDATE(bytes_scanned_from_remote,
statistics->bytes_read_from_remote);
+ COUNTER_UPDATE(bytes_scanned_from_peer, statistics->bytes_read_from_peer);
COUNTER_UPDATE(read_cache_file_directly_timer,
statistics->read_cache_file_directly_timer);
COUNTER_UPDATE(cache_get_or_set_timer, statistics->cache_get_or_set_timer);
COUNTER_UPDATE(lock_wait_timer, statistics->lock_wait_timer);
@@ -126,12 +145,16 @@ void FileCacheProfileReporter::update(const
FileCacheStatistics* statistics) con
statistics->inverted_index_num_local_io_total);
COUNTER_UPDATE(inverted_index_num_remote_io_total,
statistics->inverted_index_num_remote_io_total);
+ COUNTER_UPDATE(inverted_index_num_peer_io_total,
statistics->inverted_index_num_peer_io_total);
COUNTER_UPDATE(inverted_index_bytes_scanned_from_cache,
statistics->inverted_index_bytes_read_from_local);
COUNTER_UPDATE(inverted_index_bytes_scanned_from_remote,
statistics->inverted_index_bytes_read_from_remote);
+ COUNTER_UPDATE(inverted_index_bytes_scanned_from_peer,
+ statistics->inverted_index_bytes_read_from_peer);
COUNTER_UPDATE(inverted_index_local_io_timer,
statistics->inverted_index_local_io_timer);
COUNTER_UPDATE(inverted_index_remote_io_timer,
statistics->inverted_index_remote_io_timer);
+ COUNTER_UPDATE(inverted_index_peer_io_timer,
statistics->inverted_index_peer_io_timer);
COUNTER_UPDATE(inverted_index_io_timer,
statistics->inverted_index_io_timer);
}
diff --git a/be/src/io/cache/block_file_cache_profile.h
b/be/src/io/cache/block_file_cache_profile.h
index 903f45a8663..2feb3f26923 100644
--- a/be/src/io/cache/block_file_cache_profile.h
+++ b/be/src/io/cache/block_file_cache_profile.h
@@ -37,6 +37,7 @@ namespace io {
struct AtomicStatistics {
std::atomic<int64_t> num_io_bytes_read_from_cache = 0;
std::atomic<int64_t> num_io_bytes_read_from_remote = 0;
+ std::atomic<int64_t> num_io_bytes_read_from_peer = 0; // peer-BE counterpart of the two counters above; its writers are not visible in this diff
};
class FileCacheMetrics {
public:
@@ -66,10 +67,13 @@ private:
struct FileCacheProfileReporter {
RuntimeProfile::Counter* num_local_io_total = nullptr;
RuntimeProfile::Counter* num_remote_io_total = nullptr;
+ RuntimeProfile::Counter* num_peer_io_total = nullptr;
RuntimeProfile::Counter* local_io_timer = nullptr;
RuntimeProfile::Counter* bytes_scanned_from_cache = nullptr;
RuntimeProfile::Counter* bytes_scanned_from_remote = nullptr;
+ RuntimeProfile::Counter* bytes_scanned_from_peer = nullptr;
RuntimeProfile::Counter* remote_io_timer = nullptr;
+ RuntimeProfile::Counter* peer_io_timer = nullptr;
RuntimeProfile::Counter* remote_wait_timer = nullptr;
RuntimeProfile::Counter* write_cache_io_timer = nullptr;
RuntimeProfile::Counter* bytes_write_into_cache = nullptr;
@@ -82,10 +86,13 @@ struct FileCacheProfileReporter {
RuntimeProfile::Counter* inverted_index_num_local_io_total = nullptr;
RuntimeProfile::Counter* inverted_index_num_remote_io_total = nullptr;
+ RuntimeProfile::Counter* inverted_index_num_peer_io_total = nullptr;
RuntimeProfile::Counter* inverted_index_bytes_scanned_from_cache = nullptr;
RuntimeProfile::Counter* inverted_index_bytes_scanned_from_remote =
nullptr;
+ RuntimeProfile::Counter* inverted_index_bytes_scanned_from_peer = nullptr;
RuntimeProfile::Counter* inverted_index_local_io_timer = nullptr;
RuntimeProfile::Counter* inverted_index_remote_io_timer = nullptr;
+ RuntimeProfile::Counter* inverted_index_peer_io_timer = nullptr;
RuntimeProfile::Counter* inverted_index_io_timer = nullptr;
FileCacheProfileReporter(RuntimeProfile* profile);
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp
b/be/src/io/cache/cached_remote_file_reader.cpp
index d8e960d913e..59ed51a6f6b 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -17,15 +17,24 @@
#include "io/cache/cached_remote_file_reader.h"
+#include <brpc/controller.h>
#include <fmt/format.h>
#include <gen_cpp/Types_types.h>
+#include <gen_cpp/internal_service.pb.h>
#include <glog/logging.h>
-#include <string.h>
#include <algorithm>
+#include <atomic>
+#include <condition_variable>
+#include <cstring>
+#include <functional>
#include <list>
+#include <memory>
+#include <mutex>
+#include <thread>
#include <vector>
+#include "cloud/cloud_warm_up_manager.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "cpp/sync_point.h"
@@ -33,16 +42,26 @@
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/block_file_cache_profile.h"
#include "io/cache/file_block.h"
+#include "io/cache/peer_file_cache_reader.h"
#include "io/fs/file_reader.h"
#include "io/fs/local_file_system.h"
#include "io/io_common.h"
+#include "olap/storage_policy.h"
+#include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/thread_context.h"
+#include "runtime/workload_management/io_throttle.h"
+#include "service/backend_options.h"
#include "util/bit_util.h"
+#include "util/brpc_client_cache.h" // BrpcClientCache
+#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
namespace doris::io {
-bvar::Adder<uint64_t> remote_read_counter("cached_remote_reader_remote_read");
+bvar::Adder<uint64_t> s3_read_counter("cached_remote_reader_s3_read");
+bvar::Adder<uint64_t> peer_read_counter("cached_remote_reader_peer_read");
bvar::LatencyRecorder g_skip_cache_num("cached_remote_reader_skip_cache_num");
bvar::Adder<uint64_t> g_skip_cache_sum("cached_remote_reader_skip_cache_sum");
bvar::Adder<uint64_t> g_skip_local_cache_io_sum_bytes(
@@ -63,6 +82,8 @@ bvar::Window<bvar::Adder<uint64_t>>
g_read_cache_indirect_bytes_1min_window(
bvar::Window<bvar::Adder<uint64_t>>
g_read_cache_indirect_total_bytes_1min_window(
"cached_remote_reader_indirect_total_bytes_1min_window",
&g_read_cache_indirect_total_bytes,
60);
+bvar::Adder<uint64_t> g_failed_get_peer_addr_counter(
+ "cached_remote_reader_failed_get_peer_addr_counter"); // +1 whenever execute_peer_read() has no peer host/port to contact
CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr
remote_file_reader,
const FileReaderOptions& opts)
@@ -131,6 +152,126 @@ std::pair<size_t, size_t>
CachedRemoteFileReader::s_align_size(size_t offset, si
return std::make_pair(align_left, align_size);
}
+namespace {
+
+// Parse the tablet id out of a remote segment/index path.
+// Thin wrapper over StorageResource::parse_tablet_id_from_path(); returns
+// std::nullopt when the path is not recognized.
+std::optional<int64_t> extract_tablet_id(const std::string& file_path) {
+    return StorageResource::parse_tablet_id_from_path(file_path);
+}
+
+// Resolve the peer BE (host, brpc port) whose file cache is expected to hold `file_path`.
+//
+// Returns {"", 0} when no peer is known: the BE is not in cloud mode, the tablet id
+// cannot be parsed from the path, or the warm-up manager has no balance record for the
+// tablet. A debug point may override the result in tests.
+//
+// This is called for every remote read while peer reads are enabled, and "no balance
+// record" is presumably the common case, so those outcomes are logged sampled.
+std::pair<std::string, int> get_peer_connection_info(const std::string& file_path) {
+    std::string host;
+    int port = 0;
+
+    // storage_engine().to_cloud() is only valid on a cloud-mode BE.
+    if (doris::config::is_cloud_mode()) {
+        if (auto tablet_id = extract_tablet_id(file_path)) {
+            auto& manager =
+                    ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
+            if (auto tablet_info = manager.get_balanced_tablet_info(*tablet_id)) {
+                host = tablet_info->first;
+                port = tablet_info->second;
+            } else {
+                LOG_EVERY_N(WARNING, 100) << "get peer connection info not found"
+                                          << ", tablet_id=" << *tablet_id
+                                          << ", file_path=" << file_path;
+            }
+        } else {
+            LOG_EVERY_N(WARNING, 100) << "parse tablet id from path failed"
+                                      << ", tablet_id=null, file_path=" << file_path;
+        }
+    }
+
+    DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", {
+        host = dp->param<std::string>("host", "127.0.0.1");
+        port = dp->param("port", 9060);
+        LOG_WARNING("debug point PeerFileCacheReader::_fetch_from_peer_cache_blocks")
+                .tag("host", host)
+                .tag("port", port);
+    });
+
+    return {host, port};
+}
+
+// Read [empty_start, empty_start + size) for `empty_blocks` from the peer BE's file
+// cache into `buffer`.
+//
+// `size` is the requested length. It is only written when the peer read succeeds; on
+// any failure it keeps the requested value so the caller can re-read the same range
+// from S3. PeerFileCacheReader::fetch_blocks() resets its output count on entry, so it
+// is given a separate local instead of `&size`.
+// Returns InternalError when no peer address is known, otherwise fetch_blocks()'s status.
+Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, size_t empty_start,
+                         size_t& size, std::unique_ptr<char[]>& buffer,
+                         const std::string& file_path, bool is_doris_table, ReadStatistics& stats,
+                         const IOContext* io_ctx) {
+    auto [host, port] = get_peer_connection_info(file_path);
+    VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ", port=" << port
+               << ", file_path=" << file_path;
+
+    if (host.empty() || port == 0) {
+        g_failed_get_peer_addr_counter << 1;
+        LOG_EVERY_N(WARNING, 100) << "PeerFileCacheReader host or port is empty"
+                                  << ", host=" << host << ", port=" << port
+                                  << ", file_path=" << file_path;
+        return Status::InternalError("host or port is empty");
+    }
+    SCOPED_RAW_TIMER(&stats.peer_read_timer);
+    peer_read_counter << 1;
+    PeerFileCacheReader peer_reader(file_path, is_doris_table, host, port);
+    size_t peer_bytes_read = 0;
+    auto st = peer_reader.fetch_blocks(empty_blocks, empty_start, Slice(buffer.get(), size),
+                                       &peer_bytes_read, io_ctx);
+    if (!st.ok()) {
+        LOG_WARNING("PeerFileCacheReader read from peer failed")
+                .tag("host", host)
+                .tag("port", port)
+                .tag("error", st.msg());
+        return st;
+    }
+    size = peer_bytes_read;
+    stats.from_peer_cache = true;
+    return st;
+}
+
+// Read [empty_start, empty_start + size) directly through `remote_file_reader`
+// (the object-store reader); `size` is updated to the number of bytes actually read.
+Status execute_s3_read(size_t empty_start, size_t& size, std::unique_ptr<char[]>& buffer,
+                       ReadStatistics& stats, const IOContext* io_ctx,
+                       const FileReaderSPtr& remote_file_reader) {
+    s3_read_counter << 1;
+    SCOPED_RAW_TIMER(&stats.remote_read_timer);
+    stats.from_peer_cache = false;
+    return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), &size, io_ctx);
+}
+
+} // anonymous namespace
+
+// Fill `buffer` with [empty_start, empty_start + size) for the cache-missing
+// `empty_blocks`, choosing the source:
+// - non-Doris tables, or enable_cache_read_from_peer == false: S3 only;
+// - otherwise: the peer BE first, falling back to S3 if the peer read fails.
+// `size` is the requested length on entry and the number of bytes read on return.
+// The "CachedRemoteFileReader.read_at_impl.change_type" debug point forces a single
+// source ("s3" or "peer") with no fallback, for tests.
+Status CachedRemoteFileReader::_execute_remote_read(const std::vector<FileBlockSPtr>& empty_blocks,
+                                                    size_t empty_start, size_t& size,
+                                                    std::unique_ptr<char[]>& buffer,
+                                                    ReadStatistics& stats,
+                                                    const IOContext* io_ctx) {
+    DBUG_EXECUTE_IF("CachedRemoteFileReader.read_at_impl.change_type", {
+        const std::string read_type = dp->param<std::string>("type", "s3");
+        LOG_WARNING("CachedRemoteFileReader.read_at_impl.change_type")
+                .tag("path", path().native())
+                .tag("off", empty_start)
+                .tag("size", size)
+                .tag("type", read_type);
+        if (read_type == "s3") {
+            return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader);
+        }
+        return execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(),
+                                 _is_doris_table, stats, io_ctx);
+    });
+
+    if (!_is_doris_table || !doris::config::enable_cache_read_from_peer) {
+        return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader);
+    }
+
+    // Try the peer first; its RPC timeout is 5 seconds (set in PeerFileCacheReader).
+    // TODO(dx): here peer and s3 reader need to get data in parallel, and take the one that
+    // is correct and returns first
+    const size_t requested_size = size;
+    auto st = execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(),
+                                _is_doris_table, stats, io_ctx);
+    if (st.ok()) {
+        return st;
+    }
+    // `size` is an in/out parameter shared with the peer path; restore the requested
+    // length so the S3 fallback re-reads the whole range rather than 0 or a partial count.
+    size = requested_size;
+    return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader);
+}
+
Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result,
size_t* bytes_read,
const IOContext* io_ctx) {
size_t already_read = 0;
@@ -148,6 +289,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
*bytes_read = 0;
return Status::OK();
}
+
ReadStatistics stats;
stats.bytes_read += bytes_req;
MonotonicStopWatch read_at_sw;
@@ -274,12 +416,11 @@ Status CachedRemoteFileReader::read_at_impl(size_t
offset, Slice result, size_t*
empty_end = empty_blocks.back()->range().right;
size_t size = empty_end - empty_start + 1;
std::unique_ptr<char[]> buffer(new char[size]);
- {
- remote_read_counter << 1;
- SCOPED_RAW_TIMER(&stats.remote_read_timer);
- RETURN_IF_ERROR(_remote_file_reader->read_at(empty_start,
Slice(buffer.get(), size),
- &size, io_ctx));
- }
+
+ // Determine read type and execute remote read
+ RETURN_IF_ERROR(
+ _execute_remote_read(empty_blocks, empty_start, size, buffer,
stats, io_ctx));
+
for (auto& block : empty_blocks) {
if (block->state() == FileBlock::State::SKIP_CACHE) {
continue;
@@ -371,7 +512,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
<< st.msg() << ", block state=" << block_state;
size_t bytes_read {0};
stats.hit_cache = false;
- remote_read_counter << 1;
+ stats.from_peer_cache = false;
+ s3_read_counter << 1;
SCOPED_RAW_TIMER(&stats.remote_read_timer);
RETURN_IF_ERROR(_remote_file_reader->read_at(
current_offset, Slice(result.data + (current_offset -
offset), read_size),
@@ -400,10 +542,16 @@ void CachedRemoteFileReader::_update_stats(const
ReadStatistics& read_stats,
statis->num_local_io_total++;
statis->bytes_read_from_local += read_stats.bytes_read;
} else {
- statis->num_remote_io_total++;
- statis->bytes_read_from_remote += read_stats.bytes_read;
+ if (read_stats.from_peer_cache) {
+ statis->num_peer_io_total++;
+ statis->bytes_read_from_peer += read_stats.bytes_read;
+ statis->peer_io_timer += read_stats.peer_read_timer;
+ } else {
+ statis->num_remote_io_total++;
+ statis->bytes_read_from_remote += read_stats.bytes_read;
+ statis->remote_io_timer += read_stats.remote_read_timer;
+ }
}
- statis->remote_io_timer += read_stats.remote_read_timer;
statis->remote_wait_timer += read_stats.remote_wait_timer;
statis->local_io_timer += read_stats.local_read_timer;
statis->num_skip_cache_io_total += read_stats.skip_cache;
@@ -421,11 +569,17 @@ void CachedRemoteFileReader::_update_stats(const
ReadStatistics& read_stats,
statis->inverted_index_num_local_io_total++;
statis->inverted_index_bytes_read_from_local +=
read_stats.bytes_read;
} else {
- statis->inverted_index_num_remote_io_total++;
- statis->inverted_index_bytes_read_from_remote +=
read_stats.bytes_read;
+ if (read_stats.from_peer_cache) {
+ statis->inverted_index_num_peer_io_total++;
+ statis->inverted_index_bytes_read_from_peer +=
read_stats.bytes_read;
+ statis->inverted_index_peer_io_timer +=
read_stats.peer_read_timer;
+ } else {
+ statis->inverted_index_num_remote_io_total++;
+ statis->inverted_index_bytes_read_from_remote +=
read_stats.bytes_read;
+ statis->inverted_index_remote_io_timer +=
read_stats.remote_read_timer;
+ }
}
statis->inverted_index_local_io_timer += read_stats.local_read_timer;
- statis->inverted_index_remote_io_timer += read_stats.remote_read_timer;
}
g_skip_cache_sum << read_stats.skip_cache;
diff --git a/be/src/io/cache/cached_remote_file_reader.h
b/be/src/io/cache/cached_remote_file_reader.h
index 94e8a5807ba..939471b62ea 100644
--- a/be/src/io/cache/cached_remote_file_reader.h
+++ b/be/src/io/cache/cached_remote_file_reader.h
@@ -22,6 +22,7 @@
#include <map>
#include <shared_mutex>
#include <utility>
+#include <vector>
#include "common/status.h"
#include "io/cache/block_file_cache.h"
@@ -60,15 +61,21 @@ protected:
private:
void _insert_file_reader(FileBlockSPtr file_block);
+
+ // Execute remote read (S3 or peer).
+ Status _execute_remote_read(const std::vector<FileBlockSPtr>&
empty_blocks, size_t empty_start,
+ size_t& size, std::unique_ptr<char[]>& buffer,
+ ReadStatistics& stats, const IOContext*
io_ctx);
+
+ void _update_stats(const ReadStatistics& stats, FileCacheStatistics* state,
+ bool is_inverted_index) const;
+
bool _is_doris_table;
FileReaderSPtr _remote_file_reader;
UInt128Wrapper _cache_hash;
BlockFileCache* _cache;
std::shared_mutex _mtx;
std::map<size_t, FileBlockSPtr> _cache_file_readers;
-
- void _update_stats(const ReadStatistics& stats, FileCacheStatistics* state,
- bool is_inverted_index) const;
};
} // namespace doris::io
diff --git a/be/src/io/cache/file_cache_common.h
b/be/src/io/cache/file_cache_common.h
index 03d8e6f13cd..090c219236b 100644
--- a/be/src/io/cache/file_cache_common.h
+++ b/be/src/io/cache/file_cache_common.h
@@ -61,10 +61,12 @@ struct UInt128Wrapper {
struct ReadStatistics {
bool hit_cache = true;
+ bool from_peer_cache = false;
bool skip_cache = false;
int64_t bytes_read = 0;
int64_t bytes_write_into_file_cache = 0;
int64_t remote_read_timer = 0;
+ int64_t peer_read_timer = 0;
int64_t remote_wait_timer = 0; // wait for other downloader
int64_t local_read_timer = 0;
int64_t local_write_timer = 0;
diff --git a/be/src/io/cache/peer_file_cache_reader.cpp
b/be/src/io/cache/peer_file_cache_reader.cpp
new file mode 100644
index 00000000000..c034cdce110
--- /dev/null
+++ b/be/src/io/cache/peer_file_cache_reader.cpp
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "io/cache/peer_file_cache_reader.h"
+
+#include <brpc/controller.h>
+#include <bvar/latency_recorder.h>
+#include <bvar/reducer.h>
+#include <fmt/format.h>
+#include <gen_cpp/internal_service.pb.h>
+#include <glog/logging.h>
+
+#include <algorithm>
+#include <utility>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+#include "runtime/exec_env.h"
+#include "runtime/thread_context.h"
+#include "util/brpc_client_cache.h"
+#include "util/bvar_helper.h"
+#include "util/debug_points.h"
+#include "util/defer_op.h"
+#include "util/doris_metrics.h"
+#include "util/network_util.h"
+#include "util/runtime_profile.h"
+
+namespace doris::io {
+// read from peer
+
+bvar::Adder<uint64_t> peer_cache_reader_failed_counter("peer_cache_reader",
"failed_counter"); // failures counted inside fetch_blocks()
+bvar::Adder<uint64_t> peer_cache_reader_succ_counter("peer_cache_reader",
"succ_counter"); // fetch_blocks() calls that filled the whole destination buffer
+bvar::LatencyRecorder peer_bytes_per_read("peer_cache_reader",
"bytes_per_read"); // bytes copied per fetch_blocks() call; also QPS
+bvar::Adder<uint64_t> peer_cache_reader_total("peer_cache_reader",
"total_num"); // PeerFileCacheReader instances constructed
+bvar::Adder<uint64_t> peer_cache_being_read("peer_cache_reader",
"file_being_read"); // live readers: +1 in ctor, -1 in dtor. NOTE(review): `<< -1` on an Adder<uint64_t> relies on unsigned wrap-around
+bvar::Adder<uint64_t> peer_cache_reader_read_counter("peer_cache_reader",
"read_at"); // fetch_peer_data RPCs issued
+bvar::LatencyRecorder peer_cache_reader_latency("peer_cache_reader",
"peer_latency"); // microseconds, measured in fetch_blocks() from after stub creation to return
+bvar::PerSecond<bvar::Adder<uint64_t>>
peer_get_request_qps("peer_cache_reader", "peer_get_request",
+
&peer_cache_reader_read_counter);
+bvar::Adder<uint64_t> peer_bytes_read_total("peer_cache_reader", "bytes_read"); // bytes copied out of peer responses
+bvar::PerSecond<bvar::Adder<uint64_t>>
peer_read_througthput("peer_cache_reader",
+
"peer_read_throughput",
+
&peer_bytes_read_total);
+
+// Bind the reader to one file and one peer endpoint. `host` is taken by value and
+// moved into the member. Every instance is counted in the lifetime total and in the
+// live-reader gauge; the destructor decrements the gauge.
+PeerFileCacheReader::PeerFileCacheReader(const io::Path& file_path, bool is_doris_table,
+                                         std::string host, int port)
+        : _path(file_path), _is_doris_table(is_doris_table), _host(std::move(host)), _port(port) {
+    peer_cache_reader_total << 1;
+    peer_cache_being_read << 1;
+}
+
+PeerFileCacheReader::~PeerFileCacheReader() {
+ peer_cache_being_read << -1; // balances the `peer_cache_being_read << 1` in the constructor
+}
+
+// Ask the peer BE for the cached contents of `blocks` and copy them into `s`, where
+// s.data corresponds to file offset `off`. `*bytes_read` is reset to 0 on entry and set
+// to the number of copied bytes once the response has been consumed. Succeeds only if
+// all s.size bytes were filled; any malformed response (empty block, block outside
+// [off, off + s.size)) is rejected rather than copied, so callers can fall back safely.
+Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& blocks, size_t off,
+                                         Slice s, size_t* bytes_read, const IOContext* ctx) {
+    VLOG_DEBUG << "enter PeerFileCacheReader::fetch_blocks, off=" << off
+               << " bytes_read=" << *bytes_read;
+    *bytes_read = 0;
+    if (blocks.empty()) {
+        return Status::OK();
+    }
+    if (!_is_doris_table) {
+        return Status::NotSupported("peer cache fetch only supports doris table segments");
+    }
+
+    // Only the file name (not the full path) is sent to the peer.
+    PFetchPeerDataRequest req;
+    req.set_type(PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK);
+    req.set_path(_path.filename().native());
+    for (const auto& blk : blocks) {
+        auto* cb = req.add_cache_req();
+        cb->set_block_offset(static_cast<int64_t>(blk->range().left));
+        cb->set_block_size(static_cast<int64_t>(blk->range().size()));
+    }
+
+    std::string realhost = _host;
+    auto dns_cache = ExecEnv::GetInstance()->dns_cache();
+    if (dns_cache == nullptr) {
+        LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
+    } else if (!is_valid_ip(realhost)) {
+        Status status = dns_cache->get(_host, &realhost);
+        if (!status.ok()) {
+            peer_cache_reader_failed_counter << 1;
+            LOG(WARNING) << "failed to get ip from host " << _host << ": " << status.to_string();
+            return Status::InternalError("failed to get ip from host {}", _host);
+        }
+    }
+    const std::string brpc_addr = get_host_port(realhost, _port);
+    std::shared_ptr<PBackendService_Stub> brpc_stub =
+            ExecEnv::GetInstance()->brpc_internal_client_cache()->get_new_client_no_cache(
+                    brpc_addr);
+    if (!brpc_stub) {
+        peer_cache_reader_failed_counter << 1;
+        LOG(WARNING) << "failed to get brpc stub " << brpc_addr;
+        return Status::RpcError("Address {} is wrong", brpc_addr);
+    }
+    LIMIT_REMOTE_SCAN_IO(bytes_read);
+    int64_t begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+                               std::chrono::system_clock::now().time_since_epoch())
+                               .count();
+    Defer defer_latency {[&]() {
+        int64_t end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+                                 std::chrono::system_clock::now().time_since_epoch())
+                                 .count();
+        peer_cache_reader_latency << (end_ts - begin_ts);
+    }};
+
+    brpc::Controller cntl;
+    cntl.set_timeout_ms(5000);
+    PFetchPeerDataResponse resp;
+    peer_cache_reader_read_counter << 1;
+    brpc_stub->fetch_peer_data(&cntl, &req, &resp, nullptr);
+    if (cntl.Failed()) {
+        peer_cache_reader_failed_counter << 1;
+        // ErrorText() is passed as an argument, never as the format string.
+        return Status::RpcError("fetch_peer_data from {} failed: {}", brpc_addr,
+                                cntl.ErrorText());
+    }
+    if (resp.has_status()) {
+        Status peer_status = Status::create(resp.status());
+        if (!peer_status.ok()) {
+            peer_cache_reader_failed_counter << 1;
+            return peer_status;
+        }
+    }
+
+    const auto base = static_cast<int64_t>(off);
+    size_t filled = 0;
+    for (const auto& data : resp.datas()) {
+        const int64_t block_off = data.block_offset();
+        if (data.data().empty()) {
+            peer_cache_reader_failed_counter << 1;
+            LOG(WARNING) << "peer cache read empty data, block_offset=" << block_off;
+            return Status::InternalError("peer cache read empty data");
+        }
+        // A block starting before `off`, or at/after off + s.size, has no place in `s`.
+        // Copying it would either put bytes at the wrong position or underflow
+        // `s.size - rel` and overrun the destination buffer.
+        if (block_off < base || static_cast<uint64_t>(block_off - base) >= s.size) {
+            peer_cache_reader_failed_counter << 1;
+            LOG(WARNING) << "peer cache returned block outside requested range, block_offset="
+                         << block_off << " off=" << off << " size=" << s.size;
+            return Status::InternalError(
+                    "peer cache returned block outside requested range: block_offset={}, "
+                    "off={}, size={}",
+                    block_off, off, s.size);
+        }
+        const auto rel = static_cast<size_t>(block_off - base);
+        const size_t can_copy = std::min(s.size - rel, data.data().size());
+        VLOG_DEBUG << "peer cache read data=" << block_off << " size=" << data.data().size()
+                   << " off=" << rel << " can_copy=" << can_copy;
+        std::memcpy(s.data + rel, data.data().data(), can_copy);
+        filled += can_copy;
+    }
+    VLOG_DEBUG << "peer cache read filled=" << filled;
+    peer_bytes_read_total << filled;
+    *bytes_read = filled;
+    peer_bytes_per_read << filled;
+    if (filled != s.size) {
+        peer_cache_reader_failed_counter << 1;
+        return Status::InternalError("peer cache read incomplete: need={}, got={}", s.size,
+                                     filled);
+    }
+    peer_cache_reader_succ_counter << 1;
+    return Status::OK();
+}
+
+} // namespace doris::io
diff --git a/be/src/io/cache/peer_file_cache_reader.h
b/be/src/io/cache/peer_file_cache_reader.h
new file mode 100644
index 00000000000..f028bc2cd91
--- /dev/null
+++ b/be/src/io/cache/peer_file_cache_reader.h
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <memory>
+#include <string>
+
+#include "common/status.h"
+#include "io/cache/file_block.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_system.h"
+#include "io/fs/path.h"
+#include "io/fs/s3_file_system.h"
+#include "util/slice.h"
+
+namespace doris {
+class RuntimeProfile;
+
+namespace io {
+struct IOContext;
+
+class PeerFileCacheReader final {
+public:
+    /**
+     * Construct a peer file cache reader bound to a specific file and peer endpoint.
+     *
+     * Params:
+     * - file_path: Path of the target file whose cache blocks will be fetched from a peer.
+     * - is_doris_table: Whether the target file is a Doris table segment; only true is supported.
+     * - host: Peer hostname or IP address to fetch from.
+     * - port: Peer BRPC service port.
+     */
+    PeerFileCacheReader(const io::Path& file_path, bool is_doris_table, std::string host, int port);
+    ~PeerFileCacheReader();
+
+    // Non-copyable: the constructor and destructor keep a process-wide count of live
+    // readers in step, and an implicitly generated copy would be destroyed (decrementing
+    // the count) without ever having been counted.
+    PeerFileCacheReader(const PeerFileCacheReader&) = delete;
+    PeerFileCacheReader& operator=(const PeerFileCacheReader&) = delete;
+
+    /**
+     * Fetch data blocks from a peer and write them into the provided buffer.
+     *
+     * Behavior:
+     * - Supports only Doris table segment files (is_doris_table=true); otherwise returns NotSupported.
+     * - Builds a BRPC request to invoke peer fetch_peer_data using the given blocks.
+     * - Copies returned block data into the contiguous buffer Slice s, using 'off' as the base offset.
+     * - Succeeds only if exactly s.size bytes are written.
+     *
+     * Params:
+     * - blocks: List of file blocks to fetch (global file offsets, inclusive ranges).
+     * - off: Base file offset corresponding to the start of Slice s.
+     * - s: Destination buffer; must be large enough to hold all requested block bytes.
+     * - bytes_read: Output. Reset to 0 on entry, then set to the number of bytes copied
+     *   into s once a response has been consumed. Do not alias it with a value that is
+     *   still needed after a failed call.
+     * - ctx: IO context (currently unused by the implementation).
+     *
+     * Returns:
+     * - OK: Successfully wrote exactly s.size bytes into the buffer.
+     * - NotSupported: The file is not a Doris table segment.
+     * - RpcError: No BRPC stub for the peer address, or the RPC itself failed/timed out.
+     * - InternalError: Hostname resolution failed, the peer returned malformed data, or
+     *   fewer than s.size bytes were written.
+     * - Any non-OK status reported by the peer in its response.
+     */
+    Status fetch_blocks(const std::vector<FileBlockSPtr>& blocks, size_t off, Slice s,
+                        size_t* bytes_read, const IOContext* ctx);
+
+private:
+    io::Path _path;
+    bool _is_doris_table {false};
+    std::string _host = "127.0.0.1";
+    int _port = 9060;
+};
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h
index 79efa500c06..e6d8527e831 100644
--- a/be/src/io/fs/file_reader.h
+++ b/be/src/io/fs/file_reader.h
@@ -57,6 +57,8 @@ struct FileReaderOptions {
int64_t file_size = -1;
// Use modification time to determine whether the file is changed
int64_t mtime = 0;
+ // Used to query the location of the file cache
+ int64_t tablet_id = -1;
static const FileReaderOptions DEFAULT;
};
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index f89c3f0196e..710ec428b5b 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -57,6 +57,7 @@ bvar::PerSecond<bvar::Adder<uint64_t>>
s3_read_througthput("s3_file_reader", "s3
// record successfull request, and s3_get_request_qps will record all request.
bvar::PerSecond<bvar::Adder<uint64_t>> s3_get_request_qps("s3_file_reader",
"s3_get_request",
&s3_file_reader_read_counter);
+bvar::LatencyRecorder s3_file_reader_latency("s3_file_reader", "s3_latency");
Result<FileReaderSPtr> S3FileReader::create(std::shared_ptr<const
ObjClientHolder> client,
std::string bucket, std::string
key, int64_t file_size,
@@ -114,6 +115,8 @@ Status S3FileReader::read_at_impl(size_t offset, Slice
result, size_t* bytes_rea
size_t bytes_req = result.size;
char* to = result.data;
bytes_req = std::min(bytes_req, _file_size - offset);
+ VLOG_DEBUG << "enter s3 read_at_impl, off=" << offset << " n=" << bytes_req
+ << " req=" << result.size << " file size=" << _file_size;
if (UNLIKELY(bytes_req == 0)) {
*bytes_read = 0;
return Status::OK();
@@ -129,15 +132,23 @@ Status S3FileReader::read_at_impl(size_t offset, Slice
result, size_t* bytes_rea
const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum
wait time in milliseconds
const int max_retries = config::max_s3_client_retry; // wait 1s, 2s, 4s,
8s for each backoff
+ int64_t begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
LIMIT_REMOTE_SCAN_IO(bytes_read);
-
DBUG_EXECUTE_IF("S3FileReader::read_at_impl.io_slow", {
auto sleep_time = dp->param("sleep", 3);
- LOG_INFO("S3FileReader::read_at_impl.io_slow inject sleep {} s",
sleep_time)
+ LOG_INFO("S3FileReader::read_at_impl.io_slow inject microseconds {}
s", sleep_time)
.tag("bucket", _bucket)
.tag("key", _key);
- std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
+ std::this_thread::sleep_for(std::chrono::microseconds(sleep_time));
});
+ Defer defer_latency {[&]() {
+ int64_t end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ s3_file_reader_latency << (end_ts - begin_ts);
+ }};
SCOPED_RAW_TIMER(&_s3_stats.total_get_request_time_ns);
int total_sleep_time = 0;
diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h
index 4ce57b4954e..311fa1b01de 100644
--- a/be/src/io/io_common.h
+++ b/be/src/io/io_common.h
@@ -45,10 +45,13 @@ struct FileReaderStats {
struct FileCacheStatistics {
int64_t num_local_io_total = 0;
int64_t num_remote_io_total = 0;
+ int64_t num_peer_io_total = 0;
int64_t local_io_timer = 0;
int64_t bytes_read_from_local = 0;
int64_t bytes_read_from_remote = 0;
+ int64_t bytes_read_from_peer = 0;
int64_t remote_io_timer = 0;
+ int64_t peer_io_timer = 0;
int64_t remote_wait_timer = 0;
int64_t write_cache_io_timer = 0;
int64_t bytes_write_into_cache = 0;
@@ -61,10 +64,13 @@ struct FileCacheStatistics {
int64_t inverted_index_num_local_io_total = 0;
int64_t inverted_index_num_remote_io_total = 0;
+ int64_t inverted_index_num_peer_io_total = 0;
int64_t inverted_index_bytes_read_from_local = 0;
int64_t inverted_index_bytes_read_from_remote = 0;
+ int64_t inverted_index_bytes_read_from_peer = 0;
int64_t inverted_index_local_io_timer = 0;
int64_t inverted_index_remote_io_timer = 0;
+ int64_t inverted_index_peer_io_timer = 0;
int64_t inverted_index_io_timer = 0;
};
diff --git a/be/src/olap/rowset/beta_rowset.cpp
b/be/src/olap/rowset/beta_rowset.cpp
index 3032e0b7cbc..3765ac24220 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -198,6 +198,7 @@ Status BetaRowset::load_segment(int64_t seg_id,
OlapReaderStatistics* stats,
.is_doris_table = true,
.cache_base_path = "",
.file_size =
_rowset_meta->segment_file_size(static_cast<int>(seg_id)),
+ .tablet_id = _rowset_meta->tablet_id(),
};
auto s = segment_v2::Segment::open(
diff --git a/be/src/olap/storage_policy.cpp b/be/src/olap/storage_policy.cpp
index ac03f17c7a9..bf66bd064ae 100644
--- a/be/src/olap/storage_policy.cpp
+++ b/be/src/olap/storage_policy.cpp
@@ -21,7 +21,9 @@
#include <algorithm>
#include <cstdlib>
+#include <cstring>
#include <mutex>
+#include <ranges>
#include <unordered_map>
#include "gen_cpp/cloud.pb.h"
@@ -198,6 +200,90 @@ std::string StorageResource::remote_segment_path(const
RowsetMeta& rowset, int64
}
}
+// Returns the value of `text` if it is a non-empty run of ASCII decimal digits that fits in
+// int64_t, std::nullopt otherwise. Unlike std::stoll this rejects signs, whitespace and
+// trailing garbage (e.g. "12abc" is not parsed as 12) and never throws.
+static std::optional<int64_t> parse_decimal_tablet_id(std::string_view text) {
+    if (text.empty()) {
+        return std::nullopt;
+    }
+    int64_t value = 0;
+    for (char c : text) {
+        if (c < '0' || c > '9') {
+            return std::nullopt;
+        }
+        const int64_t digit = c - '0';
+        if (value > (INT64_MAX - digit) / 10) {
+            return std::nullopt; // value * 10 + digit would overflow int64_t
+        }
+        value = value * 10 + digit;
+    }
+    return value;
+}
+
+// TODO(dx): this is a tricky function. The upper layer should pass the tablet ID down to the
+// IO layer instead of recovering it from the object path like this.
+//
+// Parses the tablet_id out of a remote segment / inverted index object path, as produced by
+// StorageResource::remote_segment_path, remote_idx_v1_path and remote_idx_v2_path.
+// The part after the last "{DATA_PREFIX}/" path segment must end in ".dat" or ".idx" and be:
+//   Version 0: {tablet_id}/{rowset_id}_{seg_id}.dat              (2 components)
+//   Version 1: {shard}/{tablet_id}/{rowset_id}/{seg_id}.dat      (4 components)
+// Anything else yields std::nullopt and the function never throws. That includes a missing
+// data segment, another extension, an empty component such as "//", or a tablet_id that is
+// not a plain decimal number.
+// Examples: storage_resource_test, StorageResourceTest.ParseTabletIdFromPath.
+std::optional<int64_t> StorageResource::parse_tablet_id_from_path(const std::string& path) {
+    const std::string_view path_view = path;
+    const std::string_view data_prefix = DATA_PREFIX;
+
+    // Find the last occurrence of DATA_PREFIX that is a whole path segment followed by '/'.
+    // Whole-segment matching avoids false hits inside names like "metadata" or "database".
+    // It also never indexes past the end of the path (e.g. "bucket/data"). Taking the last
+    // match tolerates user prefixes that themselves contain a "data" segment.
+    size_t rest_begin = std::string_view::npos;
+    for (size_t pos = path_view.find(data_prefix); pos != std::string_view::npos;
+         pos = path_view.find(data_prefix, pos + 1)) {
+        const size_t end = pos + data_prefix.size();
+        const bool starts_segment = pos == 0 || path_view[pos - 1] == '/';
+        const bool followed_by_slash = end < path_view.size() && path_view[end] == '/';
+        if (starts_segment && followed_by_slash) {
+            rest_begin = end + 1;
+        }
+    }
+    if (rest_begin == std::string_view::npos) {
+        return std::nullopt;
+    }
+    const std::string_view rest = path_view.substr(rest_begin);
+
+    // Only segment data files and inverted index files are supported.
+    if (!rest.ends_with(".dat") && !rest.ends_with(".idx")) {
+        return std::nullopt;
+    }
+
+    // Split on '/', keeping empty components so that "//" is rejected instead of skipped.
+    std::vector<std::string_view> parts;
+    size_t start = 0;
+    for (size_t slash = rest.find('/'); slash != std::string_view::npos;
+         slash = rest.find('/', start)) {
+        parts.push_back(rest.substr(start, slash - start));
+        start = slash + 1;
+    }
+    parts.push_back(rest.substr(start));
+    if (std::any_of(parts.begin(), parts.end(),
+                    [](std::string_view part) { return part.empty(); })) {
+        return std::nullopt;
+    }
+
+    switch (parts.size()) {
+    case 2: // Version 0: parts[0] is the tablet_id
+        return parse_decimal_tablet_id(parts[0]);
+    case 4: // Version 1: parts[0] is the shard, parts[1] is the tablet_id
+        return parse_decimal_tablet_id(parts[1]);
+    default:
+        return std::nullopt;
+    }
+}
+
std::string StorageResource::remote_idx_v1_path(const RowsetMeta& rowset,
int64_t seg_id,
int64_t index_id,
std::string_view
index_path_suffix) const {
diff --git a/be/src/olap/storage_policy.h b/be/src/olap/storage_policy.h
index 3d0700633ef..b3d8058eb6d 100644
--- a/be/src/olap/storage_policy.h
+++ b/be/src/olap/storage_policy.h
@@ -85,6 +85,9 @@ struct StorageResource {
std::string cooldown_tablet_meta_path(int64_t tablet_id, int64_t
replica_id,
int64_t cooldown_term) const;
+
+ // Static function to parse tablet_id from remote segment path
+ static std::optional<int64_t> parse_tablet_id_from_path(const std::string&
path);
};
// return nullptr if not found
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 0c3a713aab7..625fbe1d264 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -225,6 +225,7 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_writing,
MetricUnit::FILESYSTEM)
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_total,
MetricUnit::OPERATIONS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_cache,
MetricUnit::OPERATIONS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_remote,
MetricUnit::OPERATIONS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_peer,
MetricUnit::OPERATIONS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(udf_close_bthread_count,
MetricUnit::OPERATIONS);
@@ -394,6 +395,7 @@ DorisMetrics::DorisMetrics() :
_metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
num_io_bytes_read_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
num_io_bytes_read_from_cache);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
num_io_bytes_read_from_remote);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
num_io_bytes_read_from_peer);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
udf_close_bthread_count);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 6e1c3e1a30e..5be9b40109f 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -243,6 +243,7 @@ public:
IntCounter* num_io_bytes_read_total = nullptr;
IntCounter* num_io_bytes_read_from_cache = nullptr;
IntCounter* num_io_bytes_read_from_remote = nullptr;
+ IntCounter* num_io_bytes_read_from_peer = nullptr;
IntCounter* udf_close_bthread_count = nullptr;
diff --git a/be/test/olap/storage_resource_test.cpp
b/be/test/olap/storage_resource_test.cpp
index 2db0f59644b..62123705e4d 100644
--- a/be/test/olap/storage_resource_test.cpp
+++ b/be/test/olap/storage_resource_test.cpp
@@ -81,4 +81,113 @@ TEST(StorageResourceTest, RemotePath) {
ASSERT_DEATH(StorageResource(res.value(), storage_vault_pb.path_format()),
"unknown");
}
+TEST(StorageResourceTest, ParseTabletIdFromPath) {
+    // Table of (remote object path, expected tablet_id) pairs for
+    // StorageResource::parse_tablet_id_from_path; std::nullopt means the path is rejected.
+    struct PathCase {
+        std::string path;
+        std::optional<int64_t> expected;
+    };
+    const PathCase cases[] = {
+            // Version 0 segment, see StorageResource::remote_segment_path:
+            // fmt::format("{}/{}/{}_{}.dat", DATA_PREFIX, tablet_id, rowset_id, seg_id)
+            {"prefix_xxx/data/10005/0200000000001cc2224124562e7dfd4834d031b13c0210be_5.dat",
+             10005},
+            {"//data/12345/rowset_001_0.dat", 12345},
+            {"data/999999/rowset_abc_10.dat", 999999},
+
+            // Version 0 inverted index v1, see StorageResource::remote_idx_v1_path:
+            // fmt::format("{}/{}/{}_{}_{}{}.idx", DATA_PREFIX, tablet_id, rowset_id, seg_id,
+            //             index_id, suffix)
+            {"//data/10005/0200000000001cc2224124562e7_6_6666_suffix.idx", 10005},
+            {"bucket_xxx/data/12345/rowsetid_1_666_suffix.idx", 12345},
+            {"data/999999/rowsetid_10_8888_suffix.idx", 999999},
+
+            // Version 0 inverted index v2, see StorageResource::remote_idx_v2_path:
+            // fmt::format("{}/{}/{}_{}.idx", DATA_PREFIX, tablet_id, rowset_id, seg_id)
+            {"s3://prefix_bucket/data/10005/0200000000001cc2224124562e7_5.idx", 10005},
+            {"/data/12345/rowset001_0.idx", 12345},
+            {"data/999999/rowsetabc_10.idx", 999999},
+
+            // Version 1 segment, see StorageResource::remote_segment_path:
+            // fmt::format("{}/{}/{}/{}/{}.dat", DATA_PREFIX, shard_fn(tablet_id), tablet_id,
+            //             rowset_id, seg_id)
+            {"prefix_xxxx/data/611/10005/0200000000001cc2224124562e7dfd4834d031b13c0210be/"
+             "5.dat",
+             10005},
+            {"data/0/12345/rowset_001/0.dat", 12345},
+            {"s3:///data/999/999999/rowset_abc/10.dat", 999999},
+
+            // Version 1 inverted index v1, see StorageResource::remote_idx_v1_path:
+            // fmt::format("{}/{}/{}/{}/{}_{}{}.idx", DATA_PREFIX, shard_fn(tablet_id),
+            //             tablet_id, rowset_id, seg_id, index_id, suffix)
+            {"s3:///data/611/10005/0200000000001cc2224124562e7dfd4834d031b13c0210be/"
+             "5_6666_suffix.idx",
+             10005},
+            {"prefix_bucket/data/0/12345/rowsetid/1_666_suffix.idx", 12345},
+            {"data/999/999999/rowsetid/10_8888_suffix.idx", 999999},
+
+            // Version 1 inverted index v2, see StorageResource::remote_idx_v2_path:
+            // fmt::format("{}/{}/{}/{}/{}.idx", DATA_PREFIX, shard_fn(tablet_id), tablet_id,
+            //             rowset_id, seg_id)
+            {"s3://prefix_bucket/data/611/10005/"
+             "0200000000001cc2224124562e7dfd4834d031b13c0210be/5.idx",
+             10005},
+            {"/data/0/12345/rowset001/0.idx", 12345},
+            {"data/999/999999/rowsetabc/10.idx", 999999},
+
+            // Edge cases: tablet_id 0 in both layouts
+            {"prefix_bucket/data/0/rowset001_0.dat", 0},
+            {"/data/0/0/rowset001/0.dat", 0},
+
+            // Invalid paths
+            {"", std::nullopt},
+            {"invalid_path", std::nullopt},
+            {"data/", std::nullopt},
+            {"/data/abc/rowset_001_0.dat", std::nullopt},
+            {"s3://prefix_bucket/data/0/abc/rowset_001/0.dat", std::nullopt},
+            {"data/10005/rowset_001_0.txt", std::nullopt},
+            {"data/10005/rowset_001_0", std::nullopt},
+
+            // Unsupported number of components after the data prefix
+            {"data/10005/rowset_001/extra/0.dat", std::nullopt},
+            {"/data/10005/rowset_001/extra/0.idx", std::nullopt},
+            {"prefix_bucket/data/10005/rowset_001/extra/0.dat", std::nullopt},
+            {"data/10005.dat", std::nullopt},
+
+            // No data prefix at all
+            {"10005/rowset_001_0.dat", std::nullopt},
+            {"0/12345/rowset_001/0.dat", std::nullopt},
+
+            // Extra slash right after the data prefix
+            {"data//10005/rowset_001_0.dat", std::nullopt},
+            {"data//0/12345/rowset_001/0.dat", std::nullopt},
+    };
+
+    for (const auto& [path, expected] : cases) {
+        SCOPED_TRACE(path);
+        EXPECT_EQ(StorageResource::parse_tablet_id_from_path(path), expected);
+    }
+}
+
} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 3cf51e31613..8b0e351f306 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -267,7 +267,8 @@ public class HeartbeatMgr extends MasterDaemon {
if (Config.isCloudMode()) {
String cloudUniqueId =
backend.getTagMap().get(Tag.CLOUD_UNIQUE_ID);
copiedMasterInfo.setCloudUniqueId(cloudUniqueId);
-
copiedMasterInfo.setTabletReportInactiveDurationMs(Config.rehash_tablet_after_be_dead_seconds);
+ long reportInterval =
Config.rehash_tablet_after_be_dead_seconds * 1000L;
+
copiedMasterInfo.setTabletReportInactiveDurationMs(reportInterval);
TCloudClusterInfo clusterInfo = new TCloudClusterInfo();
clusterInfo.setIsStandby(backend.isInStandbyCluster());
copiedMasterInfo.setCloudClusterInfo(clusterInfo);
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 1ddfbcf2502..f800e924e2a 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -1081,6 +1081,40 @@ message PGetTabletRowsetsResponse {
optional DeleteBitmapPB delete_bitmap = 3;
}
+// One cache block to fetch from a peer BE, addressed by offset and size.
+// Used in PFetchPeerDataRequest.cache_req when type is PEER_FILE_CACHE_BLOCK.
+// NOTE(review): "Reqest" is misspelled. Renaming the message changes the generated C++
+// class name, so all users would have to be updated together if this is ever fixed.
+message CacheBlockReqest {
+ // Only meaningful for PFetchPeerDataRequest.Type PEER_FILE_CACHE_BLOCK.
+ optional int64 block_offset = 1;
+ optional int64 block_size = 2;
+}
+
+// Request of PBackendService.fetch_peer_data: asks a peer BE for data of one remote object.
+message PFetchPeerDataRequest {
+ // Selects which of the type-specific fields below are meaningful.
+ enum Type {
+ PEER_FILE_RANGE = 1;
+ PEER_FILE_CACHE_BLOCK = 2;
+ }
+ optional Type type = 1;
+ // Remote object path. The peer calculates the file cache hash from it and serves the
+ // corresponding file cache data. Used by both PEER_FILE_RANGE and PEER_FILE_CACHE_BLOCK.
+ optional string path = 2;
+
+ // PEER_FILE_RANGE only: presumably the byte range starting at file_offset with length
+ // file_size — confirm against the fetch_peer_data handler.
+ optional int64 file_offset = 3;
+ optional int64 file_size = 4;
+ // PEER_FILE_CACHE_BLOCK only: the cache blocks to return.
+ repeated CacheBlockReqest cache_req = 5;
+}
+
+// One block returned by the peer together with its payload.
+// NOTE(review): block_offset/block_size presumably echo the requested CacheBlockReqest
+// entry — confirm against the fetch_peer_data handler.
+message CacheBlockPB {
+ optional int64 block_offset = 1;
+ optional int64 block_size = 2;
+ optional bytes data = 3;
+}
+
+// Response of PBackendService.fetch_peer_data.
+message PFetchPeerDataResponse {
+ optional PStatus status = 1;
+ // Returned blocks, each carrying its data.
+ repeated CacheBlockPB datas = 2;
+}
+
service PBackendService {
// If #fragments of a query is < 3, use exec_plan_fragment directly.
// If #fragments of a query is >=3, use exec_plan_fragment_prepare +
exec_plan_fragment_start
@@ -1137,5 +1171,6 @@ service PBackendService {
rpc commit_refresh_dictionary(PCommitRefreshDictionaryRequest) returns
(PCommitRefreshDictionaryResponse);
rpc abort_refresh_dictionary(PAbortRefreshDictionaryRequest) returns
(PAbortRefreshDictionaryResponse);
rpc get_tablet_rowsets(PGetTabletRowsetsRequest) returns
(PGetTabletRowsetsResponse);
+ rpc fetch_peer_data(PFetchPeerDataRequest) returns
(PFetchPeerDataResponse);
};
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/ProfileAction.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/ProfileAction.groovy
index 2e24ae1f7bc..824ac4098e1 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/ProfileAction.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/ProfileAction.groovy
@@ -65,7 +65,8 @@ class ProfileAction implements SuiteAction {
}
def httpCli = new HttpCliAction(context)
- httpCli.endpoint(context.config.feHttpAddress)
+ def addr = context.getFeHttpAddress()
+ httpCli.endpoint("${addr.hostString}:${addr.port}")
httpCli.uri("/rest/v1/query_profile")
httpCli.op("get")
httpCli.printResponse(false)
@@ -89,7 +90,7 @@ class ProfileAction implements SuiteAction {
def profileId = profileItem["Profile ID"].toString()
def profileCli = new HttpCliAction(context)
- profileCli.endpoint(context.config.feHttpAddress)
+ profileCli.endpoint("${addr.hostString}:${addr.port}")
profileCli.uri("/rest/v1/query_profile/${profileId}")
profileCli.op("get")
profileCli.printResponse(false)
diff --git
a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
index 50476800d75..56eace67c8e 100644
--- a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
@@ -35,13 +35,26 @@ suite('test_balance_warm_up', 'docker') {
'report_tablet_interval_seconds=1',
'schedule_sync_tablets_interval_s=18000',
'disable_auto_compaction=true',
- 'sys_log_verbose_modules=*'
+ 'sys_log_verbose_modules=*',
+ 'cache_read_from_peer_expired_seconds=100',
+ 'enable_cache_read_from_peer=true'
]
options.setFeNum(1)
options.setBeNum(1)
options.cloudMode = true
options.enableDebugPoints()
+ def getBrpcMetrics = {ip, port, name ->
+ def url = "http://${ip}:${port}/brpc_metrics"
+ def metrics = new URL(url).text
+ def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+ if (matcher.find()) {
+ return matcher[0][1] as long
+ } else {
+ throw new RuntimeException("${name} not found for ${ip}:${port}")
+ }
+ }
+
def testCase = { table ->
def ms = cluster.getAllMetaservices().get(0)
def msHttpPort = ms.host + ":" + ms.httpPort
@@ -74,6 +87,22 @@ suite('test_balance_warm_up', 'docker') {
insert into $table values (44, 1, 'comment', 'spez', '2006-10-11
23:00:48', 'Welcome back, Randall', 0, 43, 0, [454465], '', 0, '', [], 0),
(46, 0, 'story', 'goldfish', '2006-10-11 23:39:28', '', 0, 0, 0, [454470],
'http://www.rentometer.com/', 0, ' VCs Prefer to Fund Nearby Firms - New York
Times', [], 0);
"""
+ // more tablets accessed. for test metrics
`balance_tablet_be_mapping_size`
+ sql """CREATE TABLE more_tablets_warm_up_test_tbl (
+ `k1` int(11) NULL,
+ `v1` VARCHAR(2048)
+ )
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 10
+ PROPERTIES (
+ "replication_num"="1"
+ );
+ """
+ sql """
+ insert into more_tablets_warm_up_test_tbl values (1, 'value1'),
(2, 'value2'), (3, 'value3'), (4, 'value4'), (5, 'value5'), (6, 'value6'), (7,
'value7'), (8, 'value8'), (9, 'value9'), (10, 'value10'), (11, 'value11'), (12,
'value12'), (13, 'value13'), (14, 'value14'), (15, 'value15'), (16, 'value16'),
(17, 'value17'), (18, 'value18'), (19, 'value19'), (20, 'value20');
+ """
+
// before add be
def beforeGetFromFe = getTabletAndBeHostFromFe(table)
def beforeGetFromBe =
getTabletAndBeHostFromBe(cluster.getAllBackends())
@@ -128,6 +157,8 @@ suite('test_balance_warm_up', 'docker') {
}
}
+ sql """select count(*) from more_tablets_warm_up_test_tbl"""
+
// from be1 -> be2, warm up this tablet
// after add be
def afterGetFromFe = getTabletAndBeHostFromFe(table)
@@ -179,6 +210,11 @@ suite('test_balance_warm_up', 'docker') {
"Expected cache file pattern ${hashFile} not found in BE
${newAddBe.Host}'s file_cache directory. " +
"Available subdirs: ${subDirs}")
}
+
+ sleep(105 * 1000)
+ // test expired be tablet cache info be removed
+ // after cache_read_from_peer_expired_seconds = 100s
+ assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort,
"balance_tablet_be_mapping_size"))
}
docker(options) {
diff --git
a/regression-test/suites/cloud_p0/balance/test_balance_warm_up_use_peer_cache.groovy
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_use_peer_cache.groovy
new file mode 100644
index 00000000000..e2209931a93
--- /dev/null
+++
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_use_peer_cache.groovy
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+
+// Cloud-mode docker suite. Starts 1 FE + 1 BE with enable_cache_read_from_peer=true, loads
+// data, then adds a second BE so tablets are rebalanced with async warm-up. It then checks:
+//   * the new BE has no cached files for the table's versions 2-4 before querying;
+//   * a query on the new BE reports NumPeerIOTotal > 0 in its profile;
+//   * afterwards those cache files exist on the new BE;
+//   * brpc metrics show peer reads but no S3 reads.
+suite('test_balance_warm_up_use_peer_cache', 'docker') {
+ if (!isCloudMode()) {
+ return;
+ }
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ 'sys_log_verbose_modules=org',
+ 'heartbeat_interval_second=1',
+ 'rehash_tablet_after_be_dead_seconds=3600',
+ 'cloud_warm_up_for_rebalance_type=async_warmup',
+ 'cloud_pre_heating_time_limit_sec=30',
+ // disable Auto Analysis Job Executor
+ 'auto_check_statistics_in_minutes=60',
+ ]
+ options.beConfigs += [
+ 'report_tablet_interval_seconds=1',
+ 'schedule_sync_tablets_interval_s=18000',
+ 'disable_auto_compaction=true',
+ 'sys_log_verbose_modules=*',
+ 'cache_read_from_peer_expired_seconds=100',
+ 'enable_cache_read_from_peer=true'
+ ]
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.cloudMode = true
+ options.enableDebugPoints()
+
+ // Merges two maps of host -> cache hash-file names; lists of a host present in both are concatenated.
+ def mergeDirs = { base, add ->
+ base + add.collectEntries { host, hashFiles ->
+ [(host): base[host] ? (base[host] + hashFiles) : hashFiles]
+ }
+ }
+
+ // Reads http://ip:port/brpc_metrics and returns the first integer that follows `name` and
+ // whitespace; throws if no match. NOTE(review): the pattern is not anchored at the start
+ // of the metric name, so a metric whose name merely ends with `name` would also match.
+ def getBrpcMetrics = {ip, port, name ->
+ def url = "http://${ip}:${port}/brpc_metrics"
+ def metrics = new URL(url).text
+ def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+ if (matcher.find()) {
+ return matcher[0][1] as long
+ } else {
+ throw new RuntimeException("${name} not found for ${ip}:${port}")
+ }
+ }
+
+ def testCase = { table ->
+ def ms = cluster.getAllMetaservices().get(0)
+ def msHttpPort = ms.host + ":" + ms.httpPort
+ sql """CREATE TABLE $table (
+ `k1` int(11) NULL,
+ `v1` VARCHAR(2048)
+ )
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+ PROPERTIES (
+ "replication_num"="1"
+ );
+ """
+ // Two inserts produce rowset versions 2 and 3.
+ sql """
+ insert into $table values (10, '1'), (20, '2')
+ """
+ sql """
+ insert into $table values (30, '3'), (40, '4')
+ """
+
+ // before add be
+ def beforeGetFromFe = getTabletAndBeHostFromFe(table)
+ def beforeGetFromBe =
getTabletAndBeHostFromBe(cluster.getAllBackends())
+ // version 2
+ def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort,
table, 2)
+ logger.info("cache dir version 2 {}", beforeCacheDirVersion2)
+ // version 3
+ def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort,
table, 3)
+ logger.info("cache dir version 3 {}", beforeCacheDirVersion3)
+
+ def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA
DISTRIBUTION FROM $table"""
+ logger.info("before warm up result {}", beforeWarmUpResult)
+
+ // disable cloud balance
+ // NOTE(review): the debug point below presumably keeps the balance warm-up task from
+ // downloading blocks; the later "should not found" assertions rely on that. Confirm
+ // against the BE debug-point implementation.
+ setFeConfig('enable_cloud_multi_replica', true)
+ cluster.addBackend(1, "compute_cluster")
+
GetDebugPoint().enableDebugPointForAllBEs("FileCacheBlockDownloader.download_blocks.balance_task")
+ setFeConfig('enable_cloud_multi_replica', false)
+
+ sleep(5 * 1000)
+ sql """
+ insert into $table values (50, '4'), (60, '6')
+ """
+ // version 4, new rs after warm up task
+ def beforeCacheDirVersion4 = getTabletFileCacheDirFromBe(msHttpPort,
table, 4)
+ logger.info("cache dir version 4 {}", beforeCacheDirVersion4)
+ // Despite the name, this merges the cache files of versions 2, 3 and 4 on every host.
+ def afterMerged23CacheDir = [beforeCacheDirVersion2,
beforeCacheDirVersion3, beforeCacheDirVersion4]
+ .inject([:]) { acc, m -> mergeDirs(acc, m) }
+ logger.info("after version 4 fe tablets {}, be tablets {}, cache dir
{}", beforeGetFromFe, beforeGetFromBe, afterMerged23CacheDir)
+
+ // Wait past cloud_pre_heating_time_limit_sec (30s) so the warm-up task times out.
+ sleep(40 * 1000)
+ // after 30s task timeout, check tablet in new be
+
+ // Assumes 'show backends' lists the original BE first and the added BE second.
+ def oldBe = sql_return_maparray('show backends').get(0)
+ def newAddBe = sql_return_maparray('show backends').get(1)
+ // Wait until some backend holds exactly one replica, i.e. the tablets have been balanced.
+ awaitUntil(500) {
+ def afterWarmUpTaskTimeoutResult = sql_return_maparray """ADMIN
SHOW REPLICA DISTRIBUTION FROM $table"""
+ logger.info("after warm up result {}",
afterWarmUpTaskTimeoutResult)
+ afterWarmUpTaskTimeoutResult.any { row ->
+ Integer.valueOf((String) row.ReplicaNum) == 1
+ }
+ }
+
+ // from be1 -> be2, warm up this tablet
+ // after add be
+ def afterGetFromFe = getTabletAndBeHostFromFe(table)
+ def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
+ // version 2
+ def afterCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort,
table, 2)
+ logger.info("after cache dir version 2 {}", afterCacheDirVersion2)
+ // version 3
+ def afterCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort,
table, 3)
+ logger.info("after cache dir version 3 {}", afterCacheDirVersion3)
+ sleep(5 * 1000)
+ // version 4
+ def afterCacheDirVersion4 = getTabletFileCacheDirFromBe(msHttpPort,
table, 4)
+ logger.info("after cache dir version 4 {}", afterCacheDirVersion4)
+
+ def afterMergedCacheDir = [afterCacheDirVersion2,
afterCacheDirVersion3, afterCacheDirVersion4]
+ .inject([:]) { acc, m -> mergeDirs(acc, m) }
+
+ logger.info("after fe tablets {}, be tablets {}, cache dir {}",
afterGetFromFe, afterGetFromBe, afterMergedCacheDir)
+ // Cache files the new BE is expected to own after balancing; they must be a subset of
+ // what the old BE had cached before balancing.
+ def newAddBeCacheDir = afterMergedCacheDir.get(newAddBe.Host)
+ logger.info("new add be cache dir {}", newAddBeCacheDir)
+ assert newAddBeCacheDir.size() != 0
+ assert
afterMerged23CacheDir[oldBe.Host].containsAll(afterMergedCacheDir[newAddBe.Host])
+
+ def be = cluster.getBeByBackendId(newAddBe.BackendId.toLong())
+ def dataPath = new File("${be.path}/storage/file_cache")
+ logger.info("Checking file_cache directory: {}", dataPath.absolutePath)
+ logger.info("Directory exists: {}", dataPath.exists())
+
+ def subDirs = []
+
+ // Recursively collects the names of all directories under `dir` into subDirs.
+ def collectDirs
+ collectDirs = { File dir ->
+ if (dir.exists()) {
+ dir.eachDir { subDir ->
+ logger.info("Found subdir: {}", subDir.name)
+ subDirs << subDir.name
+ collectDirs(subDir)
+ }
+ }
+ }
+
+ collectDirs(dataPath)
+ logger.info("BE {} file_cache subdirs: {}", newAddBe.Host, subDirs)
+
+ // Before any query, the new BE must not have cache files for versions 2, 3 and 4.
+ newAddBeCacheDir.each { hashFile ->
+ assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) },
+ "Expected cache file pattern ${hashFile} should not found in BE
${newAddBe.Host}'s file_cache directory. " +
+ "Available subdirs: ${subDirs}")
+ }
+
+ // The query triggers reading the file cache from the peer
+ profile("test_balance_warm_up_use_peer_cache_profile") {
+ sql """ set enable_profile = true;"""
+ sql """ set profile_level = 2;"""
+ run {
+ sql """/* test_balance_warm_up_use_peer_cache_profile */
select * from $table"""
+ sleep(1000)
+ }
+
+ check { profileString, exception ->
+ log.info(profileString)
+ // Sum the integer that follows every "- NumPeerIOTotal:" counter in the profile.
+ def matcher = (profileString =~ /- NumPeerIOTotal:\s+(\d+)/)
+ def total = 0
+ while (matcher.find()) {
+ total += matcher.group(1).toInteger()
+ logger.info("NumPeerIOTotal: {}", matcher.group(1))
+ }
+ assertTrue(total > 0)
+ }
+ }
+
+ subDirs.clear()
+ collectDirs(dataPath)
+ logger.info("after query, BE {} file_cache subdirs: {}",
newAddBe.Host, subDirs)
+ // After the peer read, the new BE must have cached the version 2, 3 and 4 files itself.
+ newAddBeCacheDir.each { hashFile ->
+ assertTrue(subDirs.any { subDir -> subDir.startsWith(hashFile) },
+ "Expected cache file pattern ${hashFile} should found in BE
${newAddBe.Host}'s file_cache directory. " +
+ "Available subdirs: ${subDirs}")
+ }
+ // Data came from the peer BE, not from object storage.
+ assert(0 != getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort,
"cached_remote_reader_peer_read"))
+ assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort,
"cached_remote_reader_s3_read"))
+ }
+
+ docker(options) {
+ testCase("test_balance_warm_up_use_peer_cache_tbl")
+ }
+}
diff --git
a/regression-test/suites/cloud_p0/balance/test_balance_warm_up_with_compaction_use_peer_cache.groovy
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_with_compaction_use_peer_cache.groovy
new file mode 100644
index 00000000000..a232a14dea1
--- /dev/null
+++
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_with_compaction_use_peer_cache.groovy
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+
+suite('test_balance_warm_up_with_compaction_use_peer_cache', 'docker') {
+ // This suite needs a cloud-mode cluster; it is a no-op otherwise.
+ if (!isCloudMode()) {
+ return;
+ }
+ // Docker cluster definition: 1 FE + 1 BE, cloud mode, debug points enabled.
+ def options = new ClusterOptions()
+ // FE: 1s cluster-check / rebalance / heartbeat intervals so the added BE is
+ // noticed quickly; rebalance uses async warm-up, with
+ // cloud_pre_heating_time_limit_sec=30 (presumably the warm-up time limit in
+ // seconds; the scenario sleeps longer than this before checking placement).
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ 'sys_log_verbose_modules=org',
+ 'heartbeat_interval_second=1',
+ 'rehash_tablet_after_be_dead_seconds=3600',
+ 'cloud_warm_up_for_rebalance_type=async_warmup',
+ 'cloud_pre_heating_time_limit_sec=30',
+ // disable Auto Analysis Job Executor
+ 'auto_check_statistics_in_minutes=60',
+ ]
+ // BE: auto compaction is off because the scenario triggers a cumulative
+ // compaction explicitly; reading the file cache from a peer BE is enabled.
+ options.beConfigs += [
+ 'report_tablet_interval_seconds=1',
+ 'schedule_sync_tablets_interval_s=18000',
+ 'disable_auto_compaction=true',
+ 'sys_log_verbose_modules=*',
+ 'cumulative_compaction_min_deltas=5',
+ 'cache_read_from_peer_expired_seconds=100',
+ 'enable_cache_read_from_peer=true'
+ ]
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.cloudMode = true
+ options.enableDebugPoints()
+
+ // Merge two host -> list-of-hash-dirs maps into a new map without touching
+ // either input: hosts present in both get base's list followed by add's list,
+ // hosts only in `add` are appended, and base's host order is kept.
+ def mergeDirs = { base, add ->
+ def merged = [:]
+ merged.putAll(base)
+ add.each { host, hashFiles ->
+ def existing = base[host]
+ merged[host] = existing ? (existing + hashFiles) : hashFiles
+ }
+ merged
+ }
+
+ // Fetch http://ip:port/brpc_metrics and return, as a long, the first run of
+ // digits that follows `name` plus whitespace; throw RuntimeException if the
+ // metric does not appear on the page.
+ // NOTE(review): the pattern is not anchored, so a metric whose name merely
+ // ends with `name` would also match — confirm no such collisions exist.
+ def getBrpcMetrics = {ip, port, name ->
+ def metricsPage = new URL("http://${ip}:${port}/brpc_metrics").text
+ def hit = metricsPage =~ ~"${name}\\s+(\\d+)"
+ if (!hit.find()) {
+ throw new RuntimeException("${name} not found for ${ip}:${port}")
+ }
+ return hit.group(1) as long
+ }
+
+ // End-to-end scenario for reading a peer BE's file cache after rebalance,
+ // with data that was compacted before the move:
+ // 1. create `table` (2 buckets, inverted index on `comment`) and run six inserts,
+ // 2. trigger a cumulative compaction and record the expected file-cache hash
+ // dirs for version 7 (assumption: the six loads are versions 2..7 and the
+ // compacted rowset ends at 7 — confirm against getTabletFileCacheDirFromBe),
+ // 3. add a second BE to compute_cluster with the balance-task download debug
+ // point enabled, then wait until the two tablets are split across the BEs,
+ // 4. check the new BE has none of the expected cache dirs yet,
+ // 5. run a MATCH_ALL query with profiling and require the summed
+ // InvertedIndexNumPeerIOTotal counters to be > 0,
+ // 6. check the cache dirs now exist on the new BE, and that its brpc counters
+ // report peer reads and zero S3 reads.
+ // @param table name of the table to create and query
+ def testCase = { table ->
+ def ms = cluster.getAllMetaservices().get(0)
+ def msHttpPort = ms.host + ":" + ms.httpPort
+ // Enable the file cache for this session.
+ sql """set enable_file_cache=true"""
+ // Two buckets with one replica each, so after rebalancing each of the two BEs
+ // can own one tablet (this is what the awaitUntil below waits for).
+ sql """CREATE TABLE $table (
+ `id` BIGINT,
+ `deleted` TINYINT,
+ `type` String,
+ `author` String,
+ `timestamp` DateTimeV2,
+ `comment` String,
+ `dead` TINYINT,
+ `parent` BIGINT,
+ `poll` BIGINT,
+ `children` Array<BIGINT>,
+ `url` String,
+ `score` INT,
+ `title` String,
+ `parts` Array<INT>,
+ `descendants` INT,
+ INDEX idx_comment (`comment`) USING INVERTED
PROPERTIES("parser" = "english") COMMENT 'inverted index for comment'
+ )
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 2
+ PROPERTIES ("replication_num" = "1");
+ """
+ // Six separate insert statements; each is expected to produce its own rowset
+ // (auto compaction is disabled in this suite's BE configs).
+ sql """
+ insert into $table values (344083, 1, 'comment', 'spez',
'2008-10-26 13:49:29', 'Stay tuned...', 0, 343906, 0, [31, 454446], '', 0, '',
[], 0), (33, 0, 'comment', 'spez', '2006-10-10 23:50:40', 'winnar winnar
chicken dinnar!', 0, 31, 0, [34, 454450], '', 0, '', [], 0);
+ """
+ sql """
+ insert into $table values (44, 1, 'comment', 'spez', '2006-10-11
23:00:48', 'Welcome back, Randall', 0, 43, 0, [454465], '', 0, '', [], 0),
(46, 0, 'story', 'goldfish', '2006-10-11 23:39:28', '', 0, 0, 0, [454470],
'http://www.rentometer.com/', 0, ' VCs Prefer to Fund Nearby Firms - New York
Times', [], 0);
+ """
+ sql """
+ insert into $table values (344089, 1, 'comment', 'spez',
'2008-10-26 13:49:29', 'Stay tuned...', 0, 343906, 0, [31, 454446], '', 0, '',
[], 0), (33, 0, 'comment', 'spez', '2006-10-10 23:50:40', 'winnar winnar
chicken dinnar!', 0, 31, 0, [34, 454450], '', 0, '', [], 0);
+ """
+ sql """
+ insert into $table values (449, 1, 'comment', 'spez', '2006-10-11
23:00:48', 'Welcome back, Randall', 0, 43, 0, [454465], '', 0, '', [], 0),
(469, 0, 'story', 'goldfish', '2006-10-11 23:39:28', '', 0, 0, 0, [454470],
'http://www.rentometer.com/', 0, ' VCs Prefer to Fund Nearby Firms - New York
Times', [], 0);
+ """
+ sql """
+ insert into $table values (344084, 1, 'comment', 'spez',
'2008-10-26 13:49:29', 'Stay tuned...', 0, 343906, 0, [31, 454446], '', 0, '',
[], 0), (33, 0, 'comment', 'spez', '2006-10-10 23:50:40', 'winnar winnar
chicken dinnar!', 0, 31, 0, [34, 454450], '', 0, '', [], 0);
+ """
+ sql """
+ insert into $table values (849, 1, 'comment', 'spez', '2006-10-11
23:00:48', 'Welcome back, Randall', 0, 43, 0, [454465], '', 0, '', [], 0),
(869, 0, 'story', 'goldfish', '2006-10-11 23:39:28', '', 0, 0, 0, [454470],
'http://www.rentometer.com/', 0, ' VCs Prefer to Fund Nearby Firms - New York
Times', [], 0);
+ """
+ // trigger compaction to generate some cache files
+ trigger_and_wait_compaction(table, "cumulative")
+ sleep(5 * 1000)
+
+ // Expected cache dirs (per BE host) for version 7, taken while only the
+ // original BE exists.
+ def beforeCacheDirVersion7 = getTabletFileCacheDirFromBe(msHttpPort,
table, 7)
+
+ // before add be
+ def beforeGetFromFe = getTabletAndBeHostFromFe(table)
+ def beforeGetFromBe =
getTabletAndBeHostFromBe(cluster.getAllBackends())
+
+ def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA
DISTRIBUTION FROM $table"""
+ logger.info("before warm up result {}", beforeWarmUpResult)
+
+ // disable cloud balance
+ // NOTE(review): enable_cloud_multi_replica is set to true around addBackend and
+ // reset to false right after; confirm this is what pauses rebalancing here.
+ setFeConfig('enable_cloud_multi_replica', true)
+ cluster.addBackend(1, "compute_cluster")
+ // Enable the balance_task download debug point on all BEs (presumably so the
+ // rebalance warm-up does not download blocks and the new BE starts cold — confirm).
+
GetDebugPoint().enableDebugPointForAllBEs("FileCacheBlockDownloader.download_blocks.balance_task")
+ setFeConfig('enable_cloud_multi_replica', false)
+
+
+ // Fold the per-host maps into a single host -> hash-dir list map (one input here).
+ def afterMergedVersion7CacheDir = [beforeCacheDirVersion7]
+ .inject([:]) { acc, m -> mergeDirs(acc, m) }
+ logger.info("after version 7 fe tablets {}, be tablets {}, cache dir
{}", beforeGetFromFe, beforeGetFromBe, afterMergedVersion7CacheDir)
+
+ // Wait longer than cloud_pre_heating_time_limit_sec (30s in the FE configs).
+ sleep(40 * 1000)
+ // after 30s task timeout, check tablet in new be
+
+ // NOTE(review): assumes `show backends` lists the original BE first and the
+ // newly added BE second; confirm that ordering is stable.
+ def oldBe = sql_return_maparray('show backends').get(0)
+ def newAddBe = sql_return_maparray('show backends').get(1)
+ // balance tablet: wait until some BE reports exactly one replica, i.e. the
+ // two tablets of the 2-bucket table are split across the two BEs.
+ awaitUntil(500) {
+ def afterWarmUpTaskTimeoutResult = sql_return_maparray """ADMIN
SHOW REPLICA DISTRIBUTION FROM $table"""
+ logger.info("after warm up result {}",
afterWarmUpTaskTimeoutResult)
+ afterWarmUpTaskTimeoutResult.any { row ->
+ Integer.valueOf((String) row.ReplicaNum) == 1
+ }
+ }
+
+ // from be1 -> be2, warm up this tablet
+ // after add be
+ def afterGetFromFe = getTabletAndBeHostFromFe(table)
+ def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
+ // version 7
+ def afterCacheDirVersion7 = getTabletFileCacheDirFromBe(msHttpPort,
table, 7)
+ logger.info("after cache dir version 7 {}", afterCacheDirVersion7)
+
+ def afterMergedCacheDir = [afterCacheDirVersion7]
+ .inject([:]) { acc, m -> mergeDirs(acc, m) }
+
+ logger.info("after fe tablets {}, be tablets {}, cache dir {}",
afterGetFromFe, afterGetFromBe, afterMergedCacheDir)
+ // Expected cache dirs for the tablet now placed on the new BE; these cache
+ // files should not exist on the new BE yet.
+ def newAddBeCacheDir = afterMergedCacheDir.get(newAddBe.Host)
+ logger.info("new add be cache dir {}", newAddBeCacheDir)
+ // The new BE must expect at least one dir, and every dir it expects must have
+ // been expected on the old BE before the move.
+ assert newAddBeCacheDir.size() != 0
+ assert
afterMergedVersion7CacheDir[oldBe.Host].containsAll(afterMergedCacheDir[newAddBe.Host])
+
+ // Locate the new BE's local file_cache directory in the docker deployment.
+ def be = cluster.getBeByBackendId(newAddBe.BackendId.toLong())
+ def dataPath = new File("${be.path}/storage/file_cache")
+ logger.info("Checking file_cache directory: {}", dataPath.absolutePath)
+ logger.info("Directory exists: {}", dataPath.exists())
+
+ def subDirs = []
+
+ // Recursively append the name of every directory under `dir` to subDirs.
+ def collectDirs
+ collectDirs = { File dir ->
+ if (dir.exists()) {
+ dir.eachDir { subDir ->
+ logger.info("Found subdir: {}", subDir.name)
+ subDirs << subDir.name
+ collectDirs(subDir)
+ }
+ }
+ }
+
+ collectDirs(dataPath)
+ logger.info("BE {} file_cache subdirs: {}", newAddBe.Host, subDirs)
+
+ // Before any query, none of the expected version 7 cache dirs may exist on the new BE.
+ newAddBeCacheDir.each { hashFile ->
+ assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) },
+ "Expected cache file pattern ${hashFile} should not found in BE
${newAddBe.Host}'s file_cache directory. " +
+ "Available subdirs: ${subDirs}")
+ }
+
+ // The query triggers reading the file cache from the peer; MATCH_ALL on the
+ // indexed `comment` column makes it read inverted index files.
+ profile("test_balance_warm_up_with_compaction_use_peer_cache_profile")
{
+ sql """ set enable_profile = true;"""
+ sql """ set profile_level = 2;"""
+ run {
+ sql """/*
test_balance_warm_up_with_compaction_use_peer_cache_profile */ SELECT count()
FROM $table WHERE comment MATCH_ALL 'Welcome'"""
+ sleep(1000)
+ }
+
+ check { profileString, exception ->
+ log.info(profileString)
+ // Sum every "- InvertedIndexNumPeerIOTotal: <n>" counter in the profile;
+ // at least one inverted index read must have been served by a peer BE.
+ // NOTE(review): the slashy regex below is split across two lines (apparently
+ // by mail wrapping); a multi-line slashy string embeds that newline, so confirm
+ // it matches the intended single-line pattern.
+ def matcher = (profileString =~ /-
InvertedIndexNumPeerIOTotal:\s+(\d+)/)
+ def total = 0
+ while (matcher.find()) {
+ total += matcher.group(1).toInteger()
+ logger.info("InvertedIndexNumPeerIOTotal: {}",
matcher.group(1))
+ }
+ assertTrue(total > 0)
+ }
+ }
+ subDirs.clear()
+ collectDirs(dataPath)
+ logger.info("after query, BE {} file_cache subdirs: {}",
newAddBe.Host, subDirs)
+ // The peer read populates the new BE's local cache, so every expected
+ // version 7 cache dir should now exist there.
+ newAddBeCacheDir.each { hashFile ->
+ assertTrue(subDirs.any { subDir -> subDir.startsWith(hashFile) },
+ "Expected cache file pattern ${hashFile} should found in BE
${newAddBe.Host}'s file_cache directory. " +
+ "Available subdirs: ${subDirs}")
+ }
+ // brpc counters on the new BE: some peer reads, and no reads from S3.
+ assert(0 != getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort,
"cached_remote_reader_peer_read"))
+ assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort,
"cached_remote_reader_s3_read"))
+ }
+
+ // Start the configured docker cluster and run the scenario on a fresh table.
+ docker(options, {
+ testCase("test_balance_warm_up_with_compaction_use_peer_cache_tbl")
+ })
+}
diff --git
a/regression-test/suites/cloud_p0/cache/read_from_peer/test_read_from_peer.groovy
b/regression-test/suites/cloud_p0/cache/read_from_peer/test_read_from_peer.groovy
new file mode 100644
index 00000000000..7ccf118b65f
--- /dev/null
+++
b/regression-test/suites/cloud_p0/cache/read_from_peer/test_read_from_peer.groovy
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+import org.awaitility.Awaitility;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import org.apache.doris.regression.util.NodeType
+
+suite('test_read_from_peer', 'docker') {
+ // This suite needs a cloud-mode cluster; it is a no-op otherwise.
+ if (!isCloudMode()) {
+ return;
+ }
+ // Docker cluster definition: 1 FE + 1 BE in cloud mode with debug points
+ // enabled; two more single-BE clusters are added inside docker {} below.
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'sys_log_verbose_modules=org',
+ 'heartbeat_interval_second=1',
+ 'workload_group_check_interval_ms=1'
+ ]
+ // BE: 131072-byte (128 KiB) file_cache_each_block_size (presumably the file
+ // cache block size) and reading the file cache from a peer BE enabled.
+ options.beConfigs += [
+ 'file_cache_each_block_size=131072',
+ // 'sys_log_verbose_modules=*',
+ 'enable_cache_read_from_peer=true'
+ ]
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.cloudMode = true
+ options.enableDebugPoints()
+
+ // Table loaded via compute_cluster and queried from every cluster.
+ def tableName = "test_read_from_table"
+
+ // Return the first `show backends` row whose Tag text contains clusterName,
+ // or null when no backend matches.
+ // NOTE(review): this is a substring match on the raw Tag string — confirm no
+ // cluster name used here is a substring of another.
+ def clusterBe = { clusterName ->
+ def allBes = sql_return_maparray("show backends")
+ def inCluster = allBes.findAll { row -> row.Tag.contains(clusterName) }
+ logger.info("cluster {}, bes {}", clusterName, inCluster)
+ return inCluster[0]
+ }
+
+ // Run `select count(*)` on tableName through compute cluster clusterName.
+ // runType chooses the read path under test:
+ // "s3" - only the all-BE change_type debug point is set,
+ // "peer" - additionally the PeerFileCacheReader::_fetch_from_peer_cache_blocks
+ // debug point on this cluster's BE is given compute_cluster's BE host and
+ // brpc port (presumably so blocks are fetched from that peer — confirm).
+ // Any other runType throws IllegalArgumentException. The query time is logged;
+ // on failure the elapsed time is logged and the exception is rethrown.
+ // NOTE(review): the count is only logged, never asserted, and neither debug point
+ // is disabled afterwards (the all-BE one is re-enabled with a new type per call).
+ def testCase = { String clusterName, String runType ->
+ def startTime = System.currentTimeMillis()
+ // Hand runType to every BE via the change_type debug point. This runs before
+ // the runType validation below, so an invalid type still leaves it enabled.
+
GetDebugPoint().enableDebugPointForAllBEs("CachedRemoteFileReader.read_at_impl.change_type",
[type: runType])
+
+ try {
+ // Route subsequent queries in this session to the cluster under test.
+ sql """
+ use @$clusterName
+ """
+
+ // be: BE of the cluster under test; haveCacheBe: compute_cluster's BE,
+ // which loaded the data (used only in the "peer" case).
+ def be = clusterBe(clusterName)
+ def haveCacheBe = clusterBe("compute_cluster")
+
+ switch (runType) {
+ case "peer":
+ GetDebugPoint().enableDebugPoint(be.Host, be.HttpPort as
int, NodeType.BE, "PeerFileCacheReader::_fetch_from_peer_cache_blocks",
+ [host: haveCacheBe.Host, port: haveCacheBe.BrpcPort])
+ break
+ case "s3":
+ break
+ default:
+ throw new IllegalArgumentException("Invalid type:
$runType. Expected: peer, s3")
+ }
+
+ // Execute the query and measure time
+ def queryStartTime = System.currentTimeMillis()
+ def ret = sql """
+ select count(*) from $tableName
+ """
+ logger.info("select ret={}", ret)
+ def queryTime = System.currentTimeMillis() - queryStartTime
+ logger.info("Test completed - Type:{}, Cluster: {}, Query
execution time: {}ms", runType, clusterName, queryTime)
+ } catch (Exception e) {
+ def totalTime = System.currentTimeMillis() - startTime
+ logger.info("Test failed after {}ms - Type: {}, Cluster: {} Error:
{}", totalTime, runType, clusterName, e.message)
+ throw e
+ }
+ }
+
+ docker(options) {
+ // Add a new cluster whose BE is expected to read only from S3.
+ cluster.addBackend(1, "readS3cluster")
+
+ // Add a new cluster whose BE is expected to read only from a peer BE's file cache.
+ cluster.addBackend(1, "readPeercluster")
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ ss_sold_date_sk bigint,
+ ss_sold_time_sk bigint,
+ ss_item_sk bigint,
+ ss_customer_sk bigint,
+ ss_cdemo_sk bigint,
+ ss_hdemo_sk bigint,
+ ss_addr_sk bigint,
+ ss_store_sk bigint,
+ ss_promo_sk bigint,
+ ss_ticket_number bigint,
+ ss_quantity integer,
+ ss_wholesale_cost decimal(7,2),
+ ss_list_price decimal(7,2),
+ ss_sales_price decimal(7,2),
+ ss_ext_discount_amt decimal(7,2),
+ ss_ext_sales_price decimal(7,2),
+ ss_ext_wholesale_cost decimal(7,2),
+ ss_ext_list_price decimal(7,2),
+ ss_ext_tax decimal(7,2),
+ ss_coupon_amt decimal(7,2),
+ ss_net_paid decimal(7,2),
+ ss_net_paid_inc_tax decimal(7,2),
+ ss_net_profit decimal(7,2)
+ )
+ DUPLICATE KEY(ss_sold_date_sk, ss_sold_time_sk, ss_item_sk,
ss_customer_sk)
+ DISTRIBUTED BY HASH(ss_customer_sk) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true"
+ )
+ """
+
+ sql """
+ use @compute_cluster
+ """
+
+ // Load through compute_cluster so its BE (be-1) caches all data in its file cache.
+ // version 2
+ streamLoad {
+ table "${tableName}"
+
+ // default label is UUID:
+ // set 'label' UUID.randomUUID().toString()
+
+ // Stream load's default column_separator is '\t'; store_sales.dat is
+ // '|'-delimited and gzip-compressed.
+ set 'column_separator', '|'
+ set 'compress_type', 'GZ'
+
+ file """${getS3Url()}/regression/tpcds/sf1/store_sales.dat.gz"""
+ // file """store_sales.dat.gz"""
+
+ time 10000 // limit inflight 10s
+ setFeAddr cluster.getAllFrontends().get(0).host,
cluster.getAllFrontends().get(0).httpPort
+
+ // Fail on transport errors, and require every row to be loaded.
+ check { res, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${res}".toString())
+ def json = parseJson(res)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+
+ def ret = sql """
+ select count(*) from $tableName
+ """
+ logger.info("ret after load, ret {}", ret)
+
+ // Query from the loading cluster, the S3-only cluster, and the peer-read cluster.
+ testCase("compute_cluster", "s3")
+ testCase("readS3cluster", "s3")
+ testCase("readPeercluster", "peer")
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]