This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new f4a37c3deac branch-3.1: [fix](warmup) refresh tablet location cache and retry on error #53852 (#54445)
f4a37c3deac is described below

commit f4a37c3deac4502a6d222b863125a52fee7db862
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Aug 14 11:51:13 2025 +0800

    branch-3.1: [fix](warmup) refresh tablet location cache and retry on error #53852 (#54445)
    
    ### What problem does this PR solve?
    
    Related PR: #53852
    
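    When warming up a rowset on an event, if the tablet is not found on the
    target BE, the target BE now returns an error to the source BE.
    The source BE then refreshes its tablet location cache and retries the
    warm-up with the `skip_existence_check` flag set. (The retry only happens
    when the replica info was served from the cache; on a cache miss the first
    request is already sent with the flag set.)
    When the `skip_existence_check` flag is set, the target BE performs the
    warm-up regardless of whether the tablet already exists locally.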
---
 be/src/cloud/cloud_internal_service.cpp |  7 ++-
 be/src/cloud/cloud_warm_up_manager.cpp  | 81 ++++++++++++++++++++++++---------
 be/src/cloud/cloud_warm_up_manager.h    |  7 ++-
 gensrc/proto/internal_service.proto     |  2 +
 4 files changed, 73 insertions(+), 24 deletions(-)

diff --git a/be/src/cloud/cloud_internal_service.cpp 
b/be/src/cloud/cloud_internal_service.cpp
index bc6689a4635..08690bbbc8b 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -186,12 +186,17 @@ void 
CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
             continue;
         }
         int64_t tablet_id = rs_meta.tablet_id();
+        bool local_only = !(request->has_skip_existence_check() && 
request->skip_existence_check());
         auto res = _engine.tablet_mgr().get_tablet(tablet_id, /* warmup_data = 
*/ false,
                                                    /* sync_delete_bitmap = */ 
true,
                                                    /* sync_stats = */ nullptr,
-                                                   /* local_only = */ true);
+                                                   /* local_only = */ 
local_only);
         if (!res.has_value()) {
             LOG_WARNING("Warm up error ").tag("tablet_id", 
tablet_id).error(res.error());
+            if (res.error().msg().find("local_only=true") != 
std::string::npos) {
+                res.error().set_code(ErrorCode::TABLE_NOT_FOUND);
+            }
+            res.error().to_protobuf(response->mutable_status());
             continue;
         }
         auto tablet = res.value();
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp 
b/be/src/cloud/cloud_warm_up_manager.cpp
index 833cb18d95c..bc14e607a6b 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -402,30 +402,39 @@ Status CloudWarmUpManager::set_event(int64_t job_id, 
TWarmUpEventType::type even
     return st;
 }
 
-std::vector<TReplicaInfo> CloudWarmUpManager::get_replica_info(int64_t 
tablet_id) {
+std::vector<TReplicaInfo> CloudWarmUpManager::get_replica_info(int64_t 
tablet_id, bool bypass_cache,
+                                                               bool& 
cache_hit) {
     std::vector<TReplicaInfo> replicas;
     std::vector<int64_t> cancelled_jobs;
     std::lock_guard<std::mutex> lock(_mtx);
+    cache_hit = false;
     for (auto& [job_id, cache] : _tablet_replica_cache) {
-        auto it = cache.find(tablet_id);
-        if (it != cache.end()) {
-            // check ttl expire
-            auto now = std::chrono::steady_clock::now();
-            auto sec = std::chrono::duration_cast<std::chrono::seconds>(now - 
it->second.first);
-            if (sec.count() < 
config::warmup_tablet_replica_info_cache_ttl_sec) {
-                replicas.push_back(it->second.second);
-                LOG(INFO) << "get_replica_info: cache hit, tablet_id=" << 
tablet_id
-                          << ", job_id=" << job_id;
-                continue;
-            } else {
-                LOG(INFO) << "get_replica_info: cache expired, tablet_id=" << 
tablet_id
-                          << ", job_id=" << job_id;
-                cache.erase(it);
+        if (!bypass_cache) {
+            auto it = cache.find(tablet_id);
+            if (it != cache.end()) {
+                // check ttl expire
+                auto now = std::chrono::steady_clock::now();
+                auto sec = 
std::chrono::duration_cast<std::chrono::seconds>(now - it->second.first);
+                if (sec.count() < 
config::warmup_tablet_replica_info_cache_ttl_sec) {
+                    replicas.push_back(it->second.second);
+                    LOG(INFO) << "get_replica_info: cache hit, tablet_id=" << 
tablet_id
+                              << ", job_id=" << job_id;
+                    cache_hit = true;
+                    continue;
+                } else {
+                    LOG(INFO) << "get_replica_info: cache expired, tablet_id=" 
<< tablet_id
+                              << ", job_id=" << job_id;
+                    cache.erase(it);
+                }
             }
+            LOG(INFO) << "get_replica_info: cache miss, tablet_id=" << 
tablet_id
+                      << ", job_id=" << job_id;
         }
-        LOG(INFO) << "get_replica_info: cache miss, tablet_id=" << tablet_id
-                  << ", job_id=" << job_id;
 
+        if (!cache_hit) {
+            // We are trying to save one retry by refresh all the remaining 
caches
+            bypass_cache = true;
+        }
         ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info();
         if (cluster_info == nullptr) {
             LOG(WARNING) << "get_replica_info: have not get FE Master 
heartbeat yet, job_id="
@@ -493,22 +502,40 @@ std::vector<TReplicaInfo> 
CloudWarmUpManager::get_replica_info(int64_t tablet_id
 }
 
 void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta, int64_t 
sync_wait_timeout_ms) {
-    auto tablet_id = rs_meta.tablet_id();
-    auto replicas = get_replica_info(tablet_id);
+    bool cache_hit = false;
+    auto replicas = get_replica_info(rs_meta.tablet_id(), false, cache_hit);
     if (replicas.empty()) {
         LOG(INFO) << "There is no need to warmup tablet=" << 
rs_meta.tablet_id()
                   << ", skipping rowset=" << rs_meta.rowset_id().to_string();
         return;
     }
+    Status st = _do_warm_up_rowset(rs_meta, replicas, sync_wait_timeout_ms, 
!cache_hit);
+    if (cache_hit && !st.ok() && st.is<ErrorCode::TABLE_NOT_FOUND>()) {
+        replicas = get_replica_info(rs_meta.tablet_id(), true, cache_hit);
+        st = _do_warm_up_rowset(rs_meta, replicas, sync_wait_timeout_ms, true);
+    }
+    if (!st.ok()) {
+        LOG(WARNING) << "Failed to warm up rowset, tablet_id=" << 
rs_meta.tablet_id()
+                     << ", rowset_id=" << rs_meta.rowset_id().to_string() << 
", status=" << st;
+    }
+}
+
+Status CloudWarmUpManager::_do_warm_up_rowset(RowsetMeta& rs_meta,
+                                              std::vector<TReplicaInfo>& 
replicas,
+                                              int64_t sync_wait_timeout_ms,
+                                              bool skip_existence_check) {
+    auto tablet_id = rs_meta.tablet_id();
     int64_t now_ts = std::chrono::duration_cast<std::chrono::microseconds>(
                              
std::chrono::system_clock::now().time_since_epoch())
                              .count();
     g_file_cache_warm_up_rowset_last_call_unix_ts.set_value(now_ts);
+    auto ret_st = Status::OK();
 
     PWarmUpRowsetRequest request;
     request.add_rowset_metas()->CopyFrom(rs_meta.get_rowset_pb());
     request.set_unix_ts_us(now_ts);
     request.set_sync_wait_timeout_ms(sync_wait_timeout_ms);
+    request.set_skip_existence_check(skip_existence_check);
     for (auto& replica : replicas) {
         // send sync request
         std::string host = replica.host;
@@ -520,7 +547,7 @@ void CloudWarmUpManager::warm_up_rowset(RowsetMeta& 
rs_meta, int64_t sync_wait_t
             if (!status.ok()) {
                 LOG(WARNING) << "failed to get ip from host " << replica.host 
<< ": "
                              << status.to_string();
-                return;
+                continue;
             }
         }
         std::string brpc_addr = get_host_port(host, replica.brpc_port);
@@ -583,7 +610,7 @@ void CloudWarmUpManager::warm_up_rowset(RowsetMeta& 
rs_meta, int64_t sync_wait_t
         if (cntl.Failed()) {
             LOG_WARNING("warm up rowset {} for tablet {} failed, rpc error: 
{}",
                         rs_meta.rowset_id().to_string(), tablet_id, 
cntl.ErrorText());
-            return;
+            return Status::RpcError(cntl.ErrorText());
         }
         if (sync_wait_timeout_ms > 0) {
             auto cost_us = watch.elapsed_time_microseconds();
@@ -595,13 +622,23 @@ void CloudWarmUpManager::warm_up_rowset(RowsetMeta& 
rs_meta, int64_t sync_wait_t
             }
             g_file_cache_warm_up_rowset_wait_for_compaction_latency << cost_us;
         }
+        auto status = Status::create(response.status());
+        if (response.has_status() && !status.ok()) {
+            LOG(INFO) << "warm_up_rowset failed, tablet_id=" << 
rs_meta.tablet_id()
+                      << ", rowset_id=" << rs_meta.rowset_id().to_string()
+                      << ", target=" << replica.host << ", 
skip_existence_check"
+                      << skip_existence_check << ", status=" << status;
+            ret_st = status;
+        }
     }
+    return ret_st;
 }
 
 void CloudWarmUpManager::recycle_cache(int64_t tablet_id,
                                        const std::vector<RecycledRowsets>& 
rowsets) {
     LOG(INFO) << "recycle_cache: tablet_id=" << tablet_id << ", num_rowsets=" 
<< rowsets.size();
-    auto replicas = get_replica_info(tablet_id);
+    bool cache_hit = false;
+    auto replicas = get_replica_info(tablet_id, false, cache_hit);
     if (replicas.empty()) {
         return;
     }
diff --git a/be/src/cloud/cloud_warm_up_manager.h 
b/be/src/cloud/cloud_warm_up_manager.h
index 8c64499dce7..c801e77acc7 100644
--- a/be/src/cloud/cloud_warm_up_manager.h
+++ b/be/src/cloud/cloud_warm_up_manager.h
@@ -86,7 +86,12 @@ public:
 
 private:
     void handle_jobs();
-    std::vector<TReplicaInfo> get_replica_info(int64_t tablet_id);
+
+    Status _do_warm_up_rowset(RowsetMeta& rs_meta, std::vector<TReplicaInfo>& 
replicas,
+                              int64_t sync_wait_timeout_ms, bool 
skip_existence_check);
+
+    std::vector<TReplicaInfo> get_replica_info(int64_t tablet_id, bool 
bypass_cache,
+                                               bool& cache_hit);
 
     void submit_download_tasks(io::Path path, int64_t file_size, 
io::FileSystemSPtr file_system,
                                int64_t expiration_time,
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 2f23eeae453..5fdce934879 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -841,9 +841,11 @@ message PWarmUpRowsetRequest {
     repeated RowsetMetaPB rowset_metas = 1;
     optional int64 unix_ts_us = 2;
     optional int64 sync_wait_timeout_ms = 3;
+    optional bool skip_existence_check = 4;
 }
 
 message PWarmUpRowsetResponse {
+    optional PStatus status = 1;
 }
 
 message RecycleCacheMeta {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to