This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 3734fcdd17b branch-3.0: [fix](warmup) refresh tablet location cache and retry on error #54755 (#54757)
3734fcdd17b is described below

commit 3734fcdd17b4a691070a382267fe50502d6b2357
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Aug 15 21:07:53 2025 +0800

    branch-3.0: [fix](warmup) refresh tablet location cache and retry on error #54755 (#54757)
    
    backport #54755
---
 be/src/cloud/cloud_internal_service.cpp |  7 +++-
 be/src/cloud/cloud_warm_up_manager.cpp  | 68 +++++++++++++++++++++++----------
 be/src/cloud/cloud_warm_up_manager.h    |  7 +++-
 gensrc/proto/internal_service.proto     |  3 ++
 4 files changed, 63 insertions(+), 22 deletions(-)

diff --git a/be/src/cloud/cloud_internal_service.cpp 
b/be/src/cloud/cloud_internal_service.cpp
index 39b51b06dd4..9705aca7fdc 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -171,12 +171,17 @@ void 
CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
             continue;
         }
         int64_t tablet_id = rs_meta.tablet_id();
+        bool local_only = !(request->has_skip_existence_check() && 
request->skip_existence_check());
         auto res = _engine.tablet_mgr().get_tablet(tablet_id, /* warmup_data = 
*/ false,
                                                    /* sync_delete_bitmap = */ 
true,
                                                    /* sync_stats = */ nullptr,
-                                                   /* local_only = */ true);
+                                                   /* local_only = */ 
local_only);
         if (!res.has_value()) {
             LOG_WARNING("Warm up error ").tag("tablet_id", 
tablet_id).error(res.error());
+            if (res.error().msg().find("local_only=true") != 
std::string::npos) {
+                res.error().set_code(ErrorCode::TABLE_NOT_FOUND);
+            }
+            res.error().to_protobuf(response->mutable_status());
             continue;
         }
         auto tablet = res.value();
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp 
b/be/src/cloud/cloud_warm_up_manager.cpp
index 57e0a9ad275..5b23f8ebe88 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -415,30 +415,39 @@ Status CloudWarmUpManager::set_event(int64_t job_id, 
TWarmUpEventType::type even
     return st;
 }
 
-std::vector<TReplicaInfo> CloudWarmUpManager::get_replica_info(int64_t 
tablet_id) {
+std::vector<TReplicaInfo> CloudWarmUpManager::get_replica_info(int64_t 
tablet_id, bool bypass_cache,
+                                                               bool& 
cache_hit) {
     std::vector<TReplicaInfo> replicas;
     std::vector<int64_t> cancelled_jobs;
     std::lock_guard<std::mutex> lock(_mtx);
+    cache_hit = false;
     for (auto& [job_id, cache] : _tablet_replica_cache) {
-        auto it = cache.find(tablet_id);
-        if (it != cache.end()) {
-            // check ttl expire
-            auto now = std::chrono::steady_clock::now();
-            auto sec = std::chrono::duration_cast<std::chrono::seconds>(now - 
it->second.first);
-            if (sec.count() < 
config::warmup_tablet_replica_info_cache_ttl_sec) {
-                replicas.push_back(it->second.second);
-                LOG(INFO) << "get_replica_info: cache hit, tablet_id=" << 
tablet_id
-                          << ", job_id=" << job_id;
-                continue;
-            } else {
-                LOG(INFO) << "get_replica_info: cache expired, tablet_id=" << 
tablet_id
-                          << ", job_id=" << job_id;
-                cache.erase(it);
+        if (!bypass_cache) {
+            auto it = cache.find(tablet_id);
+            if (it != cache.end()) {
+                // check ttl expire
+                auto now = std::chrono::steady_clock::now();
+                auto sec = 
std::chrono::duration_cast<std::chrono::seconds>(now - it->second.first);
+                if (sec.count() < 
config::warmup_tablet_replica_info_cache_ttl_sec) {
+                    replicas.push_back(it->second.second);
+                    LOG(INFO) << "get_replica_info: cache hit, tablet_id=" << 
tablet_id
+                              << ", job_id=" << job_id;
+                    cache_hit = true;
+                    continue;
+                } else {
+                    LOG(INFO) << "get_replica_info: cache expired, tablet_id=" 
<< tablet_id
+                              << ", job_id=" << job_id;
+                    cache.erase(it);
+                }
             }
+            LOG(INFO) << "get_replica_info: cache miss, tablet_id=" << 
tablet_id
+                      << ", job_id=" << job_id;
         }
-        LOG(INFO) << "get_replica_info: cache miss, tablet_id=" << tablet_id
-                  << ", job_id=" << job_id;
 
+        if (!cache_hit) {
+            // We are trying to save one retry by refresh all the remaining 
caches
+            bypass_cache = true;
+        }
         ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info();
         if (cluster_info == nullptr) {
             LOG(WARNING) << "get_replica_info: have not get FE Master 
heartbeat yet, job_id="
@@ -506,20 +515,37 @@ std::vector<TReplicaInfo> 
CloudWarmUpManager::get_replica_info(int64_t tablet_id
 }
 
 void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta) {
-    auto replicas = get_replica_info(rs_meta.tablet_id());
+    bool cache_hit = false;
+    auto replicas = get_replica_info(rs_meta.tablet_id(), false, cache_hit);
     if (replicas.empty()) {
         LOG(INFO) << "There is no need to warmup tablet=" << 
rs_meta.tablet_id()
                   << ", skipping rowset=" << rs_meta.rowset_id().to_string();
         return;
     }
+    Status st = _do_warm_up_rowset(rs_meta, replicas, !cache_hit);
+    if (cache_hit && !st.ok() && st.is<ErrorCode::TABLE_NOT_FOUND>()) {
+        replicas = get_replica_info(rs_meta.tablet_id(), true, cache_hit);
+        st = _do_warm_up_rowset(rs_meta, replicas, true);
+    }
+    if (!st.ok()) {
+        LOG(WARNING) << "Failed to warm up rowset, tablet_id=" << 
rs_meta.tablet_id()
+                     << ", rowset_id=" << rs_meta.rowset_id().to_string() << 
", status=" << st;
+    }
+}
+
+Status CloudWarmUpManager::_do_warm_up_rowset(RowsetMeta& rs_meta,
+                                              std::vector<TReplicaInfo>& 
replicas,
+                                              bool skip_existence_check) {
     int64_t now_ts = std::chrono::duration_cast<std::chrono::microseconds>(
                              
std::chrono::system_clock::now().time_since_epoch())
                              .count();
     g_file_cache_warm_up_rowset_last_call_unix_ts.set_value(now_ts);
+    auto ret_st = Status::OK();
 
     PWarmUpRowsetRequest request;
     request.add_rowset_metas()->CopyFrom(rs_meta.get_rowset_pb());
     request.set_unix_ts_us(now_ts);
+    request.set_skip_existence_check(skip_existence_check);
     for (auto& replica : replicas) {
         // send sync request
         std::string host = replica.host;
@@ -531,7 +557,7 @@ void CloudWarmUpManager::warm_up_rowset(RowsetMeta& 
rs_meta) {
             if (!status.ok()) {
                 LOG(WARNING) << "failed to get ip from host " << replica.host 
<< ": "
                              << status.to_string();
-                return;
+                continue;
             }
         }
         std::string brpc_addr = get_host_port(host, replica.brpc_port);
@@ -587,12 +613,14 @@ void CloudWarmUpManager::warm_up_rowset(RowsetMeta& 
rs_meta) {
         PWarmUpRowsetResponse response;
         brpc_stub->warm_up_rowset(&cntl, &request, &response, nullptr);
     }
+    return ret_st;
 }
 
 void CloudWarmUpManager::recycle_cache(int64_t tablet_id,
                                        const std::vector<RecycledRowsets>& 
rowsets) {
     LOG(INFO) << "recycle_cache: tablet_id=" << tablet_id << ", num_rowsets=" 
<< rowsets.size();
-    auto replicas = get_replica_info(tablet_id);
+    bool cache_hit = false;
+    auto replicas = get_replica_info(tablet_id, false, cache_hit);
     if (replicas.empty()) {
         return;
     }
diff --git a/be/src/cloud/cloud_warm_up_manager.h 
b/be/src/cloud/cloud_warm_up_manager.h
index 6feef0e9d42..02ad4a7b6e0 100644
--- a/be/src/cloud/cloud_warm_up_manager.h
+++ b/be/src/cloud/cloud_warm_up_manager.h
@@ -78,7 +78,12 @@ public:
 
 private:
     void handle_jobs();
-    std::vector<TReplicaInfo> get_replica_info(int64_t tablet_id);
+
+    Status _do_warm_up_rowset(RowsetMeta& rs_meta, std::vector<TReplicaInfo>& 
replicas,
+                              bool skip_existence_check);
+
+    std::vector<TReplicaInfo> get_replica_info(int64_t tablet_id, bool 
bypass_cache,
+                                               bool& cache_hit);
 
     void submit_download_tasks(io::Path path, int64_t file_size, 
io::FileSystemSPtr file_system,
                                int64_t expiration_time,
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 108f7d76138..5fdce934879 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -840,9 +840,12 @@ message PGetFileCacheMetaResponse {
 message PWarmUpRowsetRequest {
     repeated RowsetMetaPB rowset_metas = 1;
     optional int64 unix_ts_us = 2;
+    optional int64 sync_wait_timeout_ms = 3;
+    optional bool skip_existence_check = 4;
 }
 
 message PWarmUpRowsetResponse {
+    optional PStatus status = 1;
 }
 
 message RecycleCacheMeta {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to