This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 3734fcdd17b branch-3.0: [fix](warmup) refresh tablet location cache
and retry on error #54755 (#54757)
3734fcdd17b is described below
commit 3734fcdd17b4a691070a382267fe50502d6b2357
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Aug 15 21:07:53 2025 +0800
branch-3.0: [fix](warmup) refresh tablet location cache and retry on error
#54755 (#54757)
backport #54755
---
be/src/cloud/cloud_internal_service.cpp | 7 +++-
be/src/cloud/cloud_warm_up_manager.cpp | 68 +++++++++++++++++++++++----------
be/src/cloud/cloud_warm_up_manager.h | 7 +++-
gensrc/proto/internal_service.proto | 3 ++
4 files changed, 63 insertions(+), 22 deletions(-)
diff --git a/be/src/cloud/cloud_internal_service.cpp
b/be/src/cloud/cloud_internal_service.cpp
index 39b51b06dd4..9705aca7fdc 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -171,12 +171,17 @@ void
CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
continue;
}
int64_t tablet_id = rs_meta.tablet_id();
+ bool local_only = !(request->has_skip_existence_check() &&
request->skip_existence_check());
auto res = _engine.tablet_mgr().get_tablet(tablet_id, /* warmup_data =
*/ false,
/* sync_delete_bitmap = */
true,
/* sync_stats = */ nullptr,
- /* local_only = */ true);
+ /* local_only = */
local_only);
if (!res.has_value()) {
LOG_WARNING("Warm up error ").tag("tablet_id",
tablet_id).error(res.error());
+ if (res.error().msg().find("local_only=true") !=
std::string::npos) {
+ res.error().set_code(ErrorCode::TABLE_NOT_FOUND);
+ }
+ res.error().to_protobuf(response->mutable_status());
continue;
}
auto tablet = res.value();
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp
b/be/src/cloud/cloud_warm_up_manager.cpp
index 57e0a9ad275..5b23f8ebe88 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -415,30 +415,39 @@ Status CloudWarmUpManager::set_event(int64_t job_id,
TWarmUpEventType::type even
return st;
}
-std::vector<TReplicaInfo> CloudWarmUpManager::get_replica_info(int64_t
tablet_id) {
+std::vector<TReplicaInfo> CloudWarmUpManager::get_replica_info(int64_t
tablet_id, bool bypass_cache,
+ bool&
cache_hit) {
std::vector<TReplicaInfo> replicas;
std::vector<int64_t> cancelled_jobs;
std::lock_guard<std::mutex> lock(_mtx);
+ cache_hit = false;
for (auto& [job_id, cache] : _tablet_replica_cache) {
- auto it = cache.find(tablet_id);
- if (it != cache.end()) {
- // check ttl expire
- auto now = std::chrono::steady_clock::now();
- auto sec = std::chrono::duration_cast<std::chrono::seconds>(now -
it->second.first);
- if (sec.count() <
config::warmup_tablet_replica_info_cache_ttl_sec) {
- replicas.push_back(it->second.second);
- LOG(INFO) << "get_replica_info: cache hit, tablet_id=" <<
tablet_id
- << ", job_id=" << job_id;
- continue;
- } else {
- LOG(INFO) << "get_replica_info: cache expired, tablet_id=" <<
tablet_id
- << ", job_id=" << job_id;
- cache.erase(it);
+ if (!bypass_cache) {
+ auto it = cache.find(tablet_id);
+ if (it != cache.end()) {
+ // check ttl expire
+ auto now = std::chrono::steady_clock::now();
+ auto sec =
std::chrono::duration_cast<std::chrono::seconds>(now - it->second.first);
+ if (sec.count() <
config::warmup_tablet_replica_info_cache_ttl_sec) {
+ replicas.push_back(it->second.second);
+ LOG(INFO) << "get_replica_info: cache hit, tablet_id=" <<
tablet_id
+ << ", job_id=" << job_id;
+ cache_hit = true;
+ continue;
+ } else {
+ LOG(INFO) << "get_replica_info: cache expired, tablet_id="
<< tablet_id
+ << ", job_id=" << job_id;
+ cache.erase(it);
+ }
}
+ LOG(INFO) << "get_replica_info: cache miss, tablet_id=" <<
tablet_id
+ << ", job_id=" << job_id;
}
- LOG(INFO) << "get_replica_info: cache miss, tablet_id=" << tablet_id
- << ", job_id=" << job_id;
+ if (!cache_hit) {
+ // We are trying to save one retry by refreshing all the remaining
caches
+ bypass_cache = true;
+ }
ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info();
if (cluster_info == nullptr) {
LOG(WARNING) << "get_replica_info: have not get FE Master
heartbeat yet, job_id="
@@ -506,20 +515,37 @@ std::vector<TReplicaInfo>
CloudWarmUpManager::get_replica_info(int64_t tablet_id
}
void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta) {
- auto replicas = get_replica_info(rs_meta.tablet_id());
+ bool cache_hit = false;
+ auto replicas = get_replica_info(rs_meta.tablet_id(), false, cache_hit);
if (replicas.empty()) {
LOG(INFO) << "There is no need to warmup tablet=" <<
rs_meta.tablet_id()
<< ", skipping rowset=" << rs_meta.rowset_id().to_string();
return;
}
+ Status st = _do_warm_up_rowset(rs_meta, replicas, !cache_hit);
+ if (cache_hit && !st.ok() && st.is<ErrorCode::TABLE_NOT_FOUND>()) {
+ replicas = get_replica_info(rs_meta.tablet_id(), true, cache_hit);
+ st = _do_warm_up_rowset(rs_meta, replicas, true);
+ }
+ if (!st.ok()) {
+ LOG(WARNING) << "Failed to warm up rowset, tablet_id=" <<
rs_meta.tablet_id()
+ << ", rowset_id=" << rs_meta.rowset_id().to_string() <<
", status=" << st;
+ }
+}
+
+Status CloudWarmUpManager::_do_warm_up_rowset(RowsetMeta& rs_meta,
+ std::vector<TReplicaInfo>&
replicas,
+ bool skip_existence_check) {
int64_t now_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
g_file_cache_warm_up_rowset_last_call_unix_ts.set_value(now_ts);
+ auto ret_st = Status::OK();
PWarmUpRowsetRequest request;
request.add_rowset_metas()->CopyFrom(rs_meta.get_rowset_pb());
request.set_unix_ts_us(now_ts);
+ request.set_skip_existence_check(skip_existence_check);
for (auto& replica : replicas) {
// send sync request
std::string host = replica.host;
@@ -531,7 +557,7 @@ void CloudWarmUpManager::warm_up_rowset(RowsetMeta&
rs_meta) {
if (!status.ok()) {
LOG(WARNING) << "failed to get ip from host " << replica.host
<< ": "
<< status.to_string();
- return;
+ continue;
}
}
std::string brpc_addr = get_host_port(host, replica.brpc_port);
@@ -587,12 +613,14 @@ void CloudWarmUpManager::warm_up_rowset(RowsetMeta&
rs_meta) {
PWarmUpRowsetResponse response;
brpc_stub->warm_up_rowset(&cntl, &request, &response, nullptr);
}
+ return ret_st;
}
void CloudWarmUpManager::recycle_cache(int64_t tablet_id,
const std::vector<RecycledRowsets>&
rowsets) {
LOG(INFO) << "recycle_cache: tablet_id=" << tablet_id << ", num_rowsets="
<< rowsets.size();
- auto replicas = get_replica_info(tablet_id);
+ bool cache_hit = false;
+ auto replicas = get_replica_info(tablet_id, false, cache_hit);
if (replicas.empty()) {
return;
}
diff --git a/be/src/cloud/cloud_warm_up_manager.h
b/be/src/cloud/cloud_warm_up_manager.h
index 6feef0e9d42..02ad4a7b6e0 100644
--- a/be/src/cloud/cloud_warm_up_manager.h
+++ b/be/src/cloud/cloud_warm_up_manager.h
@@ -78,7 +78,12 @@ public:
private:
void handle_jobs();
- std::vector<TReplicaInfo> get_replica_info(int64_t tablet_id);
+
+ Status _do_warm_up_rowset(RowsetMeta& rs_meta, std::vector<TReplicaInfo>&
replicas,
+ bool skip_existence_check);
+
+ std::vector<TReplicaInfo> get_replica_info(int64_t tablet_id, bool
bypass_cache,
+ bool& cache_hit);
void submit_download_tasks(io::Path path, int64_t file_size,
io::FileSystemSPtr file_system,
int64_t expiration_time,
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 108f7d76138..5fdce934879 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -840,9 +840,12 @@ message PGetFileCacheMetaResponse {
message PWarmUpRowsetRequest {
repeated RowsetMetaPB rowset_metas = 1;
optional int64 unix_ts_us = 2;
+ optional int64 sync_wait_timeout_ms = 3;
+ optional bool skip_existence_check = 4;
}
message PWarmUpRowsetResponse {
+ optional PStatus status = 1;
}
message RecycleCacheMeta {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]