This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 97ea1738dba branch-4.0: [fix](cloud) Fix cloud warm up balance slow 
scheduling #58962 (#59337)
97ea1738dba is described below

commit 97ea1738dbaa0f18de0c8c7a6b31866972863730
Author: deardeng <[email protected]>
AuthorDate: Thu Dec 25 14:29:09 2025 +0800

    branch-4.0: [fix](cloud) Fix cloud warm up balance slow scheduling #58962 
(#59337)
    
    cherry pick from #58962
---
 be/src/cloud/cloud_backend_service.cpp             | 144 ++++----
 be/src/cloud/cloud_internal_service.cpp            |  27 +-
 be/src/cloud/cloud_storage_engine.cpp              |  17 +-
 be/src/cloud/cloud_storage_engine.h                |   3 +
 be/src/cloud/config.cpp                            |   2 +
 be/src/cloud/config.h                              |   2 +
 .../main/java/org/apache/doris/common/Config.java  |  31 +-
 .../doris/cloud/catalog/CloudTabletRebalancer.java | 408 +++++++++++++++++----
 .../doris/cloud/catalog/CloudUpgradeMgr.java       |   2 +-
 .../balance/test_expanding_node_balance.groovy     |   2 +-
 10 files changed, 485 insertions(+), 153 deletions(-)

diff --git a/be/src/cloud/cloud_backend_service.cpp 
b/be/src/cloud/cloud_backend_service.cpp
index 62f72f0bb76..4865a4ae06d 100644
--- a/be/src/cloud/cloud_backend_service.cpp
+++ b/be/src/cloud/cloud_backend_service.cpp
@@ -38,6 +38,10 @@ namespace doris {
 
 bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_segment_num(
         "file_cache_warm_up_cache_async_submitted_segment_num");
+bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_task_num(
+        "file_cache_warm_up_cache_async_submitted_task_num");
+bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_tablet_num(
+        "file_cache_warm_up_cache_async_submitted_tablet_num");
 
 CloudBackendService::CloudBackendService(CloudStorageEngine& engine, ExecEnv* 
exec_env)
         : BaseBackendService(exec_env), _engine(engine) {}
@@ -169,79 +173,89 @@ void 
CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
 
 void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& 
response,
                                               const TWarmUpCacheAsyncRequest& 
request) {
-    std::ostringstream oss;
-    oss << "[";
-    for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) {
-        if (i > 0) oss << ",";
-        oss << request.tablet_ids[i];
-    }
-    oss << "]";
-    LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":" 
<< request.brpc_port
-              << ", tablets num=" << request.tablet_ids.size() << ", 
tablet_ids=" << oss.str();
+    // just submit the task to the thread pool, no need to wait for the result
+    auto do_warm_up = [this, request]() {
+        std::ostringstream oss;
+        oss << "[";
+        for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) {
+            if (i > 0) oss << ",";
+            oss << request.tablet_ids[i];
+        }
+        oss << "]";
+        g_file_cache_warm_up_cache_async_submitted_tablet_num << 
request.tablet_ids.size();
+        LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << 
":"
+                  << request.brpc_port << ", tablets num=" << 
request.tablet_ids.size()
+                  << ", tablet_ids=" << oss.str();
 
-    auto& manager = 
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
-    // Record each tablet in manager
-    for (int64_t tablet_id : request.tablet_ids) {
-        manager.record_balanced_tablet(tablet_id, request.host, 
request.brpc_port);
-    }
+        auto& manager = 
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
+        // Record each tablet in manager
+        for (int64_t tablet_id : request.tablet_ids) {
+            manager.record_balanced_tablet(tablet_id, request.host, 
request.brpc_port);
+        }
 
-    std::string host = request.host;
-    auto dns_cache = ExecEnv::GetInstance()->dns_cache();
-    if (dns_cache == nullptr) {
-        LOG(WARNING) << "DNS cache is not initialized, skipping hostname 
resolve";
-    } else if (!is_valid_ip(request.host)) {
-        Status status = dns_cache->get(request.host, &host);
-        if (!status.ok()) {
-            LOG(WARNING) << "failed to get ip from host " << request.host << 
": "
-                         << status.to_string();
-            // Remove failed tablets from tracking
-            manager.remove_balanced_tablets(request.tablet_ids);
+        std::string host = request.host;
+        auto dns_cache = ExecEnv::GetInstance()->dns_cache();
+        if (dns_cache == nullptr) {
+            LOG(WARNING) << "DNS cache is not initialized, skipping hostname 
resolve";
+        } else if (!is_valid_ip(request.host)) {
+            Status status = dns_cache->get(request.host, &host);
+            if (!status.ok()) {
+                LOG(WARNING) << "failed to get ip from host " << request.host 
<< ": "
+                             << status.to_string();
+                return;
+            }
+        }
+        std::string brpc_addr = get_host_port(host, request.brpc_port);
+        std::shared_ptr<PBackendService_Stub> brpc_stub =
+                
_exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
+        if (!brpc_stub) {
+            LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for 
addr " << brpc_addr;
             return;
         }
-    }
-    std::string brpc_addr = get_host_port(host, request.brpc_port);
-    Status st = Status::OK();
-    TStatus t_status;
-    std::shared_ptr<PBackendService_Stub> brpc_stub =
-            
_exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
-    if (!brpc_stub) {
-        st = Status::RpcError("Address {} is wrong", brpc_addr);
-        LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr 
" << brpc_addr;
-        // Remove failed tablets from tracking
-        manager.remove_balanced_tablets(request.tablet_ids);
-        return;
-    }
-    brpc::Controller cntl;
-    PGetFileCacheMetaRequest brpc_request;
-    std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(),
-                  [&](int64_t tablet_id) { 
brpc_request.add_tablet_ids(tablet_id); });
-    PGetFileCacheMetaResponse brpc_response;
+        PGetFileCacheMetaRequest brpc_request;
+        std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(),
+                      [&](int64_t tablet_id) { 
brpc_request.add_tablet_ids(tablet_id); });
 
-    brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request, 
&brpc_response, nullptr);
-    VLOG_DEBUG << "warm_up_cache_async: request=" << brpc_request.DebugString()
-               << ", response=" << brpc_response.DebugString();
-    if (!cntl.Failed()) {
-        g_file_cache_warm_up_cache_async_submitted_segment_num
-                << brpc_response.file_cache_block_metas().size();
-        auto& file_cache_block_metas = 
*brpc_response.mutable_file_cache_block_metas();
-        if (!file_cache_block_metas.empty()) {
+        auto run_rpc = [this, brpc_stub,
+                        brpc_addr](PGetFileCacheMetaRequest request_copy) -> 
Status {
+            brpc::Controller cntl;
+            cntl.set_timeout_ms(20 * 1000); // 20s
+            PGetFileCacheMetaResponse brpc_response;
+            brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &request_copy, 
&brpc_response,
+                                                        nullptr);
+            if (cntl.Failed()) {
+                LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" 
<< brpc_addr
+                             << ", error=" << cntl.ErrorText()
+                             << ", error code=" << cntl.ErrorCode();
+                return Status::RpcError("{} isn't connected, error code={}", 
brpc_addr,
+                                        cntl.ErrorCode());
+            }
+            VLOG_DEBUG << "warm_up_cache_async: request=" << 
request_copy.DebugString()
+                       << ", response=" << brpc_response.DebugString();
+            g_file_cache_warm_up_cache_async_submitted_segment_num
+                    << brpc_response.file_cache_block_metas().size();
             _engine.file_cache_block_downloader().submit_download_task(
-                    std::move(file_cache_block_metas));
-            LOG(INFO) << "warm_up_cache_async: successfully submitted download 
task for tablets="
-                      << oss.str();
-        } else {
-            LOG(INFO) << "warm_up_cache_async: no file cache block meta found, 
addr=" << brpc_addr;
-            manager.remove_balanced_tablets(request.tablet_ids);
+                    
std::move(*brpc_response.mutable_file_cache_block_metas()));
+            return Status::OK();
+        };
+
+        Status rpc_status = run_rpc(std::move(brpc_request));
+        if (!rpc_status.ok()) {
+            LOG(WARNING) << "warm_up_cache_async: rpc failed for addr=" << 
brpc_addr
+                         << ", status=" << rpc_status;
         }
-    } else {
-        st = Status::RpcError("{} isn't connected", brpc_addr);
-        // Remove failed tablets from tracking
-        manager.remove_balanced_tablets(request.tablet_ids);
-        LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << 
brpc_addr
-                     << ", error=" << cntl.ErrorText();
+    };
+    g_file_cache_warm_up_cache_async_submitted_task_num << 1;
+    Status submit_st = 
_engine.warmup_cache_async_thread_pool().submit_func(std::move(do_warm_up));
+    if (!submit_st.ok()) {
+        LOG(WARNING) << "warm_up_cache_async: fail to submit heavy task to "
+                        "warmup_cache_async_thread_pool, status="
+                     << submit_st.to_string() << ", execute synchronously";
+        do_warm_up();
     }
-    st.to_thrift(&t_status);
-    response.status = t_status;
+    TStatus t_status;
+    submit_st.to_thrift(&t_status);
+    response.status = std::move(t_status);
 }
 
 void 
CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& 
response,
diff --git a/be/src/cloud/cloud_internal_service.cpp 
b/be/src/cloud/cloud_internal_service.cpp
index 2584ce8146b..d242f90a2ec 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -47,6 +47,8 @@ bvar::LatencyRecorder g_file_cache_get_by_peer_server_latency(
         "file_cache_get_by_peer_server_latency");
 bvar::LatencyRecorder g_file_cache_get_by_peer_read_cache_file_latency(
         "file_cache_get_by_peer_read_cache_file_latency");
+bvar::LatencyRecorder 
g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency(
+        "cloud_internal_service_get_file_cache_meta_by_tablet_id_latency");
 
 CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, 
ExecEnv* exec_env)
         : PInternalService(exec_env), _engine(engine) {}
@@ -95,13 +97,26 @@ void 
CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
         LOG_WARNING("try to access tablet file cache meta, but file cache not 
enabled");
         return;
     }
-    LOG(INFO) << "warm up get meta from this be, tablets num=" << 
request->tablet_ids().size();
+    auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+                            
std::chrono::steady_clock::now().time_since_epoch())
+                            .count();
+    std::ostringstream tablet_ids_stream;
+    int count = 0;
+    for (const auto& tablet_id : request->tablet_ids()) {
+        tablet_ids_stream << tablet_id << ", ";
+        count++;
+        if (count >= 10) {
+            break;
+        }
+    }
+    LOG(INFO) << "warm up get meta from this be, tablets num=" << 
request->tablet_ids().size()
+              << ", first 10 tablet_ids=[ " << tablet_ids_stream.str() << " ]";
     for (const auto& tablet_id : request->tablet_ids()) {
         auto res = _engine.tablet_mgr().get_tablet(tablet_id);
         if (!res.has_value()) {
             LOG(ERROR) << "failed to get tablet: " << tablet_id
                        << " err msg: " << res.error().msg();
-            return;
+            continue;
         }
         CloudTabletSPtr tablet = std::move(res.value());
         auto st = tablet->sync_rowsets();
@@ -166,7 +181,13 @@ void 
CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
             }
         }
     }
-    VLOG_DEBUG << "warm up get meta request=" << request->DebugString()
+    auto end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+                          std::chrono::steady_clock::now().time_since_epoch())
+                          .count();
+    g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency << 
(end_ts - begin_ts);
+    LOG(INFO) << "get file cache meta by tablet ids = [ " << 
tablet_ids_stream.str() << " ] took "
+              << end_ts - begin_ts << " us";
+    VLOG_DEBUG << "get file cache meta by tablet id request=" << 
request->DebugString()
                << ", response=" << response->DebugString();
 }
 
diff --git a/be/src/cloud/cloud_storage_engine.cpp 
b/be/src/cloud/cloud_storage_engine.cpp
index 8529fb0ed92..f8a7ac68515 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -238,10 +238,19 @@ Status CloudStorageEngine::open() {
     // check cluster id
     RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to 
check cluster id");
 
-    return ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
-            .set_max_threads(config::sync_load_for_tablets_thread)
-            .set_min_threads(config::sync_load_for_tablets_thread)
-            .build(&_sync_load_for_tablets_thread_pool);
+    
RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
+                                           
.set_max_threads(config::sync_load_for_tablets_thread)
+                                           
.set_min_threads(config::sync_load_for_tablets_thread)
+                                           
.build(&_sync_load_for_tablets_thread_pool),
+                                   "fail to build 
SyncLoadForTabletsThreadPool");
+
+    
RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("WarmupCacheAsyncThreadPool")
+                                           
.set_max_threads(config::warmup_cache_async_thread)
+                                           
.set_min_threads(config::warmup_cache_async_thread)
+                                           
.build(&_warmup_cache_async_thread_pool),
+                                   "fail to build WarmupCacheAsyncThreadPool");
+
+    return Status::OK();
 }
 
 void CloudStorageEngine::stop() {
diff --git a/be/src/cloud/cloud_storage_engine.h 
b/be/src/cloud/cloud_storage_engine.h
index 0b61fe20762..6948d8c3594 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -152,6 +152,8 @@ public:
         return *_sync_load_for_tablets_thread_pool;
     }
 
+    ThreadPool& warmup_cache_async_thread_pool() const { return 
*_warmup_cache_async_thread_pool; }
+
     Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t 
initiator);
 
     Status unregister_compaction_stop_token(CloudTabletSPtr tablet, bool 
clear_ms);
@@ -204,6 +206,7 @@ private:
     std::unique_ptr<CloudWarmUpManager> _cloud_warm_up_manager;
     std::unique_ptr<TabletHotspot> _tablet_hotspot;
     std::unique_ptr<ThreadPool> _sync_load_for_tablets_thread_pool;
+    std::unique_ptr<ThreadPool> _warmup_cache_async_thread_pool;
     std::unique_ptr<CloudSnapshotMgr> _cloud_snapshot_mgr;
 
     // FileSystem with latest shared storage info, new data will be written to 
this fs.
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index b915c1e0034..30cecb7a405 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -69,6 +69,8 @@ DEFINE_mBool(use_public_endpoint_for_error_log, "true");
 
 DEFINE_mInt32(sync_load_for_tablets_thread, "32");
 
+DEFINE_Int32(warmup_cache_async_thread, "16");
+
 DEFINE_mBool(enable_new_tablet_do_compaction, "true");
 
 // Empty rowset compaction strategy configurations
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index ce4c7e0cd8c..b12ff464700 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -113,6 +113,8 @@ DECLARE_mBool(use_public_endpoint_for_error_log);
 // the theads which sync the datas which loaded in other clusters
 DECLARE_mInt32(sync_load_for_tablets_thread);
 
+DECLARE_Int32(warmup_cache_async_thread);
+
 DECLARE_mInt32(delete_bitmap_lock_expiration_seconds);
 
 DECLARE_mInt32(get_delete_bitmap_lock_max_retry_times);
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 7f91044f2df..655df8d0c81 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3372,16 +3372,13 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static double cloud_balance_tablet_percent_per_run = 0.05;
 
-    @ConfField(mutable = true, masterOnly = true)
-    public static int cloud_min_balance_tablet_num_per_run = 2;
-
-    @ConfField(mutable = true, masterOnly = true, description = 
{"指定存算分离模式下所有Compute group的扩缩容预热方式。"
-            + "without_warmup: 直接修改tablet分片映射,首次读从S3拉取,均衡最快但性能波动最大;"
-            + "async_warmup: 异步预热,尽力而为拉取cache,均衡较快但可能cache miss;"
-            + "sync_warmup: 同步预热,确保cache迁移完成,均衡较慢但无cache miss;"
-            + "peer_read_async_warmup: 直接修改tablet分片映射,首次读从Peer 
BE拉取,均衡最快可能会影响同计算组中其他BE性能。"
-            + "注意:此为全局FE配置,也可通过SQL(ALTER COMPUTE GROUP cg PROPERTIES)"
-            + "设置compute group维度的balance类型,compute group维度配置优先级更高",
+    @ConfField(mutable = true, masterOnly = true, description = {"指定存算分离模式下所有 
Compute group 的扩缩容预热方式。"
+            + "without_warmup: 直接修改 tablet 分片映射,首次读从 S3 拉取,均衡最快但性能波动最大;"
+            + "async_warmup: 异步预热,尽力而为拉取 cache,均衡较快但可能 cache miss;"
+            + "sync_warmup: 同步预热,确保 cache 迁移完成,均衡较慢但无 cache miss;"
+            + "peer_read_async_warmup: 直接修改 tablet 分片映射,首次读从 Peer BE 
拉取,均衡最快可能会影响同计算组中其他 BE 性能。"
+            + "注意:此为全局 FE 配置,也可通过 SQL(ALTER COMPUTE GROUP cg PROPERTIES)"
+            + "设置 compute group 维度的 balance 类型,compute group 维度配置优先级更高",
         "Specify the scaling and warming methods for all Compute groups in a 
cloud mode. "
             + "without_warmup: Directly modify shard mapping, first read from 
S3,"
             + "fastest re-balance but largest fluctuation; "
@@ -3396,6 +3393,20 @@ public class Config extends ConfigBase {
             options = {"without_warmup", "async_warmup", "sync_warmup", 
"peer_read_async_warmup"})
     public static String cloud_warm_up_for_rebalance_type = "async_warmup";
 
+    @ConfField(mutable = true, masterOnly = true, description = {"云上tablet均衡时,"
+            + "同一个host内预热批次的最大tablet个数,默认10", "The max number of tablets per 
host "
+            + "when batching warm-up requests during cloud tablet rebalancing, 
default 10"})
+    public static int cloud_warm_up_batch_size = 10;
+
+    @ConfField(mutable = true, masterOnly = true, description = {"云上tablet均衡时,"
+            + "预热批次最长等待时间,单位毫秒,默认50ms", "Maximum wait time in milliseconds 
before a "
+            + "pending warm-up batch is flushed, default 50ms"})
+    public static int cloud_warm_up_batch_flush_interval_ms = 50;
+
+    @ConfField(mutable = true, masterOnly = true, description = 
{"云上tablet均衡预热rpc异步线程池大小,默认4",
+        "Thread pool size for asynchronous warm-up RPC dispatch during cloud 
tablet rebalancing, default 4"})
+    public static int cloud_warm_up_rpc_async_pool_size = 4;
+
     @ConfField(mutable = true, masterOnly = false)
     public static String security_checker_class_name = "";
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 019fb33aeff..bbdec086030 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -34,6 +34,7 @@ import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.MasterDaemon;
@@ -64,7 +65,12 @@ import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class CloudTabletRebalancer extends MasterDaemon {
@@ -80,21 +86,22 @@ public class CloudTabletRebalancer extends MasterDaemon {
     private volatile ConcurrentHashMap<Long, Set<Tablet>> 
beToTabletsGlobalInSecondary =
             new ConcurrentHashMap<Long, Set<Tablet>>();
 
-    private Map<Long, Set<Tablet>> futureBeToTabletsGlobal;
+    private volatile ConcurrentHashMap<Long, Set<Tablet>> 
futureBeToTabletsGlobal;
 
     private Map<String, List<Long>> clusterToBes;
 
     private Set<Long> allBes;
 
     // partitionId -> indexId -> be -> tablet
-    private Map<Long, Map<Long, Map<Long, Set<Tablet>>>> partitionToTablets;
+    private ConcurrentHashMap<Long, ConcurrentHashMap<Long, 
ConcurrentHashMap<Long, Set<Tablet>>>> partitionToTablets;
 
-    private Map<Long, Map<Long, Map<Long, Set<Tablet>>>> 
futurePartitionToTablets;
+    private ConcurrentHashMap<Long, ConcurrentHashMap<Long, 
ConcurrentHashMap<Long, Set<Tablet>>>>
+            futurePartitionToTablets;
 
     // tableId -> be -> tablet
-    private Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTable;
+    private ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> 
beToTabletsInTable;
 
-    private Map<Long, Map<Long, Set<Tablet>>> futureBeToTabletsInTable;
+    private ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> 
futureBeToTabletsInTable;
 
     private Map<Long, Long> beToDecommissionedTime = new HashMap<Long, Long>();
 
@@ -108,10 +115,22 @@ public class CloudTabletRebalancer extends MasterDaemon {
 
     private LinkedBlockingQueue<Pair<Long, Long>> tabletsMigrateTasks = new 
LinkedBlockingQueue<Pair<Long, Long>>();
 
-    private Map<InfightTablet, InfightTask> tabletToInfightTask = new 
HashMap<>();
+    private Map<InfightTablet, InfightTask> tabletToInfightTask = new 
ConcurrentHashMap<>();
+
+    private final ConcurrentHashMap<WarmupBatchKey, WarmupBatch> warmupBatches 
= new ConcurrentHashMap<>();
+
+    private volatile ScheduledExecutorService warmupBatchScheduler;
+
+    private volatile ScheduledExecutorService warmupCheckScheduler;
+
+    private volatile ExecutorService warmupRpcExecutor;
+
+    private final ConcurrentLinkedQueue<WarmupTabletTask> failedWarmupTasks = 
new ConcurrentLinkedQueue<>();
 
     private CloudSystemInfoService cloudSystemInfoService;
 
+    private final Object warmupExecutorInitLock = new Object();
+
     private BalanceTypeEnum globalBalanceTypeEnum = 
BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum();
 
     /**
@@ -165,6 +184,49 @@ public class CloudTabletRebalancer extends MasterDaemon {
         this.cloudSystemInfoService = cloudSystemInfoService;
     }
 
+    private void initializeWarmupExecutorsIfNeeded() {
+        if (warmupRpcExecutor != null) {
+            return; // Already initialized
+        }
+        synchronized (warmupExecutorInitLock) {
+            if (warmupRpcExecutor != null) {
+                return; // Double check
+            }
+            Env env = Env.getCurrentEnv();
+            if (env == null || !env.isMaster()) {
+                LOG.info("Env not initialized or not master, skip start warmup 
batch scheduler");
+                return;
+            }
+            warmupRpcExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
+                Math.max(1, Config.cloud_warm_up_rpc_async_pool_size), 1000,
+                "cloud-warmup-rpc-dispatch", true);
+            warmupBatchScheduler = 
Executors.newSingleThreadScheduledExecutor(r -> {
+                Thread t = new Thread(r, "cloud-warmup-batch-flusher");
+                t.setDaemon(true);
+                return t;
+            });
+            long flushInterval = Math.max(1L, 
Config.cloud_warm_up_batch_flush_interval_ms);
+            
warmupBatchScheduler.scheduleAtFixedRate(this::flushExpiredWarmupBatches,
+                    flushInterval, flushInterval, TimeUnit.MILLISECONDS);
+
+            warmupCheckScheduler = 
Executors.newSingleThreadScheduledExecutor(r -> {
+                Thread t = new Thread(r, "cloud-warmup-checker");
+                t.setDaemon(true);
+                return t;
+            });
+            long warmupCheckInterval = 10L;
+            warmupCheckScheduler.scheduleAtFixedRate(() -> {
+                try {
+                    // send check rpc to be, 10s check once
+                    checkInflightWarmUpCacheAsync();
+                } catch (Throwable t) {
+                    LOG.warn("unexpected error when checking inflight warm up 
cache async", t);
+                }
+            }, warmupCheckInterval, warmupCheckInterval, TimeUnit.SECONDS);
+            LOG.info("Warmup executors initialized successfully");
+        }
+    }
+
     private interface Operator {
         void op(Database db, Table table, Partition partition, 
MaterializedIndex index, String cluster);
     }
@@ -212,6 +274,88 @@ public class CloudTabletRebalancer extends MasterDaemon {
         BalanceType balanceType;
     }
 
+    @Getter
+    private static class WarmupBatchKey {
+        private final long srcBe;
+        private final long destBe;
+
+        WarmupBatchKey(long srcBe, long destBe) {
+            this.srcBe = srcBe;
+            this.destBe = destBe;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof WarmupBatchKey)) {
+                return false;
+            }
+            WarmupBatchKey that = (WarmupBatchKey) o;
+            return srcBe == that.srcBe && destBe == that.destBe;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(srcBe, destBe);
+        }
+    }
+
+    private static class WarmupTabletTask {
+        private final Tablet pickedTablet;
+        private final long srcBe;
+        private final long destBe;
+        private final String clusterId;
+
+        WarmupTabletTask(Tablet pickedTablet, long srcBe, long destBe, String 
clusterId) {
+            this.pickedTablet = pickedTablet;
+            this.srcBe = srcBe;
+            this.destBe = destBe;
+            this.clusterId = clusterId;
+        }
+    }
+
+    private static class WarmupBatch {
+        private final WarmupBatchKey key;
+        private final List<WarmupTabletTask> tasks = new ArrayList<>();
+        private long lastUpdateMs = System.currentTimeMillis();
+
+        WarmupBatch(WarmupBatchKey key) {
+            this.key = key;
+        }
+
+        synchronized List<WarmupTabletTask> addTask(WarmupTabletTask task, int 
batchSize) {
+            tasks.add(task);
+            lastUpdateMs = System.currentTimeMillis();
+            if (tasks.size() >= batchSize) {
+                return drain();
+            }
+            return Collections.emptyList();
+        }
+
+        synchronized List<WarmupTabletTask> drainIfExpired(long 
flushIntervalMs) {
+            if (tasks.isEmpty()) {
+                return Collections.emptyList();
+            }
+            if (System.currentTimeMillis() - lastUpdateMs >= flushIntervalMs) {
+                return drain();
+            }
+            return Collections.emptyList();
+        }
+
+        synchronized boolean isEmpty() {
+            return tasks.isEmpty();
+        }
+
+        private List<WarmupTabletTask> drain() {
+            List<WarmupTabletTask> copy = new ArrayList<>(tasks);
+            tasks.clear();
+            lastUpdateMs = System.currentTimeMillis();
+            return copy;
+        }
+    }
+
     private class TransferPairInfo {
         public long srcBe;
         public long destBe;
@@ -260,7 +404,13 @@ public class CloudTabletRebalancer extends MasterDaemon {
     }
 
     public int getTabletNumByBackendId(long beId) {
-        Set<Tablet> tablets = beToTabletsGlobal.get(beId);
+        Map<Long, Set<Tablet>> sourceMap = beToTabletsGlobal;
+        ConcurrentHashMap<Long, Set<Tablet>> futureMap = 
futureBeToTabletsGlobal;
+        if (futureMap != null && !futureMap.isEmpty()) {
+            sourceMap = futureMap;
+        }
+
+        Set<Tablet> tablets = sourceMap.get(beId);
         Set<Tablet> colocateTablets = beToColocateTabletsGlobal.get(beId);
 
         int tabletsSize = (tablets == null) ? 0 : tablets.size();
@@ -280,6 +430,9 @@ public class CloudTabletRebalancer extends MasterDaemon {
     // 9 check whether all tablets of decomission node have been migrated
     @Override
     protected void runAfterCatalogReady() {
+        // Initialize warmup executors when catalog is ready
+        initializeWarmupExecutorsIfNeeded();
+
         if (Config.enable_cloud_multi_replica) {
             LOG.info("Tablet balance is temporarily not supported when multi 
replica enabled");
             return;
@@ -294,7 +447,6 @@ public class CloudTabletRebalancer extends MasterDaemon {
             return;
         }
 
-        checkInflightWarmUpCacheAsync();
         statRouteInfo();
         migrateTabletsForSmoothUpgrade();
         statRouteInfo();
@@ -728,42 +880,118 @@ public class CloudTabletRebalancer extends MasterDaemon {
     }
 
     public void fillBeToTablets(long be, long tableId, long partId, long 
indexId, Tablet tablet,
-            Map<Long, Set<Tablet>> globalBeToTablets,
-            Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTable,
-            Map<Long, Map<Long, Map<Long, Set<Tablet>>>> partToTablets) {
+                                ConcurrentHashMap<Long, Set<Tablet>> 
globalBeToTablets,
+                                ConcurrentHashMap<Long, 
ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTable,
+                                ConcurrentHashMap<Long, 
ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
+                                    partToTablets) {
         // global
-        globalBeToTablets.putIfAbsent(be, new HashSet<Tablet>());
+        globalBeToTablets.putIfAbsent(be, ConcurrentHashMap.newKeySet());
         globalBeToTablets.get(be).add(tablet);
 
         // table
-        beToTabletsInTable.putIfAbsent(tableId, new HashMap<Long, 
Set<Tablet>>());
-        Map<Long, Set<Tablet>> beToTabletsOfTable = 
beToTabletsInTable.get(tableId);
-        beToTabletsOfTable.putIfAbsent(be, new HashSet<Tablet>());
+        beToTabletsInTable.putIfAbsent(tableId, new ConcurrentHashMap<Long, 
Set<Tablet>>());
+        ConcurrentHashMap<Long, Set<Tablet>> beToTabletsOfTable = 
beToTabletsInTable.get(tableId);
+        beToTabletsOfTable.putIfAbsent(be, ConcurrentHashMap.newKeySet());
         beToTabletsOfTable.get(be).add(tablet);
 
         // partition
-        partToTablets.putIfAbsent(partId, new HashMap<Long, Map<Long, 
Set<Tablet>>>());
-        Map<Long, Map<Long, Set<Tablet>>> indexToTablets = 
partToTablets.get(partId);
-        indexToTablets.putIfAbsent(indexId, new HashMap<Long, Set<Tablet>>());
-        Map<Long, Set<Tablet>> beToTabletsOfIndex = 
indexToTablets.get(indexId);
-        beToTabletsOfIndex.putIfAbsent(be, new HashSet<Tablet>());
+        partToTablets.putIfAbsent(partId, new ConcurrentHashMap<Long, 
ConcurrentHashMap<Long, Set<Tablet>>>());
+        ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> 
indexToTablets = partToTablets.get(partId);
+        indexToTablets.putIfAbsent(indexId, new ConcurrentHashMap<Long, 
Set<Tablet>>());
+        ConcurrentHashMap<Long, Set<Tablet>> beToTabletsOfIndex = 
indexToTablets.get(indexId);
+        beToTabletsOfIndex.putIfAbsent(be, ConcurrentHashMap.newKeySet());
         beToTabletsOfIndex.get(be).add(tablet);
     }
 
+    private void enqueueWarmupTask(WarmupTabletTask task) {
+        WarmupBatchKey key = new WarmupBatchKey(task.srcBe, task.destBe);
+        WarmupBatch batch = warmupBatches.computeIfAbsent(key, 
WarmupBatch::new);
+        List<WarmupTabletTask> readyTasks = batch.addTask(task, Math.max(1, 
Config.cloud_warm_up_batch_size));
+        if (!readyTasks.isEmpty()) {
+            dispatchWarmupBatch(key, readyTasks);
+        }
+    }
+
+    private void dispatchWarmupBatch(WarmupBatchKey key, 
List<WarmupTabletTask> tasks) {
+        if (tasks.isEmpty()) {
+            return;
+        }
+        initializeWarmupExecutorsIfNeeded();
+        if (warmupRpcExecutor != null) {
+            warmupRpcExecutor.submit(() -> sendWarmupBatch(key, tasks));
+        } else {
+            LOG.warn("warmupRpcExecutor is not initialized, skip dispatching 
warmup batch");
+        }
+    }
+
+    private void sendWarmupBatch(WarmupBatchKey key, List<WarmupTabletTask> 
tasks) {
+        Backend srcBackend = cloudSystemInfoService.getBackend(key.getSrcBe());
+        Backend destBackend = 
cloudSystemInfoService.getBackend(key.getDestBe());
+        if (srcBackend == null || destBackend == null || 
!destBackend.isAlive()) {
+            handleWarmupBatchFailure(tasks, new IllegalStateException(
+                    String.format("backend missing or dead, src %s dest %s", 
srcBackend, destBackend)));
+            return;
+        }
+        List<Long> tabletIds = tasks.stream().map(task -> 
task.pickedTablet.getId()).collect(Collectors.toList());
+        try {
+            sendPreHeatingRpc(tabletIds, key.getSrcBe(), key.getDestBe());
+        } catch (Exception e) {
+            handleWarmupBatchFailure(tasks, e);
+            return;
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("dispatch preheat batch {} from {} to {}, tablet num {}",
+                    tabletIds, key.getSrcBe(), key.getDestBe(), 
tabletIds.size());
+        }
+    }
+
+    private void handleWarmupBatchFailure(List<WarmupTabletTask> tasks, 
Exception e) {
+        if (e != null) {
+            LOG.warn("preheat batch failed, size {}", tasks.size(), e);
+        }
+        for (WarmupTabletTask task : tasks) {
+            failedWarmupTasks.offer(task);
+        }
+    }
+
+    private void revertWarmupState(WarmupTabletTask task) {
+        updateBeToTablets(task.pickedTablet, task.destBe, task.srcBe,
+                futureBeToTabletsGlobal, futureBeToTabletsInTable, 
futurePartitionToTablets);
+        tabletToInfightTask.remove(new 
InfightTablet(task.pickedTablet.getId(), task.clusterId));
+    }
+
+    private void processFailedWarmupTasks() {
+        WarmupTabletTask task;
+        while ((task = failedWarmupTasks.poll()) != null) {
+            revertWarmupState(task);
+        }
+    }
+
+    private void flushExpiredWarmupBatches() {
+        long flushInterval = Math.max(1L, 
Config.cloud_warm_up_batch_flush_interval_ms);
+        for (Map.Entry<WarmupBatchKey, WarmupBatch> entry : 
warmupBatches.entrySet()) {
+            List<WarmupTabletTask> readyTasks = 
entry.getValue().drainIfExpired(flushInterval);
+            if (!readyTasks.isEmpty()) {
+                dispatchWarmupBatch(entry.getKey(), readyTasks);
+            }
+        }
+    }
+
     public void statRouteInfo() {
         ConcurrentHashMap<Long, Set<Tablet>> tmpBeToTabletsGlobal = new 
ConcurrentHashMap<Long, Set<Tablet>>();
+        ConcurrentHashMap<Long, Set<Tablet>> tmpFutureBeToTabletsGlobal = new 
ConcurrentHashMap<Long, Set<Tablet>>();
         ConcurrentHashMap<Long, Set<Tablet>> tmpBeToTabletsGlobalInSecondary
                 = new ConcurrentHashMap<Long, Set<Tablet>>();
         ConcurrentHashMap<Long, Set<Tablet>> tmpBeToColocateTabletsGlobal
                 = new ConcurrentHashMap<Long, Set<Tablet>>();
 
-        futureBeToTabletsGlobal = new HashMap<Long, Set<Tablet>>();
-
-        partitionToTablets = new HashMap<Long, Map<Long, Map<Long, 
Set<Tablet>>>>();
-        futurePartitionToTablets = new HashMap<Long, Map<Long, Map<Long, 
Set<Tablet>>>>();
+        partitionToTablets = new ConcurrentHashMap<Long,
+            ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>();
+        futurePartitionToTablets =
+                new ConcurrentHashMap<Long, ConcurrentHashMap<Long, 
ConcurrentHashMap<Long, Set<Tablet>>>>();
 
-        beToTabletsInTable = new HashMap<Long, Map<Long, Set<Tablet>>>();
-        futureBeToTabletsInTable = new HashMap<Long, Map<Long, Set<Tablet>>>();
+        beToTabletsInTable = new ConcurrentHashMap<Long, 
ConcurrentHashMap<Long, Set<Tablet>>>();
+        futureBeToTabletsInTable = new ConcurrentHashMap<Long, 
ConcurrentHashMap<Long, Set<Tablet>>>();
 
         loopCloudReplica((Database db, Table table, Partition partition, 
MaterializedIndex index, String cluster) -> {
             boolean isColocated = 
Env.getCurrentColocateIndex().isColocateTable(table.getId());
@@ -806,12 +1034,13 @@ public class CloudTabletRebalancer extends MasterDaemon {
                             tmpBeToTabletsGlobal, beToTabletsInTable, 
this.partitionToTablets);
 
                     fillBeToTablets(futureBeId, table.getId(), 
partition.getId(), index.getId(), tablet,
-                            futureBeToTabletsGlobal, futureBeToTabletsInTable, 
futurePartitionToTablets);
+                            tmpFutureBeToTabletsGlobal, 
futureBeToTabletsInTable, futurePartitionToTablets);
                 }
             }
         });
 
         beToTabletsGlobal = tmpBeToTabletsGlobal;
+        futureBeToTabletsGlobal = tmpFutureBeToTabletsGlobal;
         beToTabletsGlobalInSecondary = tmpBeToTabletsGlobalInSecondary;
         beToColocateTabletsGlobal = tmpBeToColocateTabletsGlobal;
     }
@@ -848,10 +1077,11 @@ public class CloudTabletRebalancer extends MasterDaemon {
 
     public void balanceInPartition(List<Long> bes, String clusterId, 
List<UpdateCloudReplicaInfo> infos) {
         // balance all partition
-        for (Map.Entry<Long, Map<Long, Map<Long, Set<Tablet>>>> partitionEntry 
: futurePartitionToTablets.entrySet()) {
-            Map<Long, Map<Long, Set<Tablet>>> indexToTablets = 
partitionEntry.getValue();
+        for (Map.Entry<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, 
Set<Tablet>>>> partitionEntry
+                : futurePartitionToTablets.entrySet()) {
+            Map<Long, ConcurrentHashMap<Long, Set<Tablet>>> indexToTablets = 
partitionEntry.getValue();
             // balance all index of a partition
-            for (Map.Entry<Long, Map<Long, Set<Tablet>>> entry : 
indexToTablets.entrySet()) {
+            for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>> entry : 
indexToTablets.entrySet()) {
                 // balance a index
                 balanceImpl(bes, clusterId, entry.getValue(), 
BalanceType.PARTITION, infos);
             }
@@ -860,12 +1090,16 @@ public class CloudTabletRebalancer extends MasterDaemon {
 
     public void balanceInTable(List<Long> bes, String clusterId, 
List<UpdateCloudReplicaInfo> infos) {
         // balance all tables
-        for (Map.Entry<Long, Map<Long, Set<Tablet>>> entry : 
futureBeToTabletsInTable.entrySet()) {
+        for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>> entry : 
futureBeToTabletsInTable.entrySet()) {
             balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE, 
infos);
         }
     }
 
     private void sendPreHeatingRpc(Tablet pickedTablet, long srcBe, long 
destBe) throws Exception {
+        sendPreHeatingRpc(Collections.singletonList(pickedTablet.getId()), 
srcBe, destBe);
+    }
+
+    private void sendPreHeatingRpc(List<Long> tabletIds, long srcBe, long 
destBe) throws Exception {
         BackendService.Client client = null;
         TNetworkAddress address = null;
         Backend srcBackend = cloudSystemInfoService.getBackend(srcBe);
@@ -877,9 +1111,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
             TWarmUpCacheAsyncRequest req = new TWarmUpCacheAsyncRequest();
             req.setHost(srcBackend.getHost());
             req.setBrpcPort(srcBackend.getBrpcPort());
-            List<Long> tablets = new ArrayList<Long>();
-            tablets.add(pickedTablet.getId());
-            req.setTabletIds(tablets);
+            req.setTabletIds(new ArrayList<>(tabletIds));
             TWarmUpCacheAsyncResponse result = client.warmUpCacheAsync(req);
             if (result.getStatus().getStatusCode() != TStatusCode.OK) {
                 LOG.warn("pre cache failed status {} {}", 
result.getStatus().getStatusCode(),
@@ -995,9 +1227,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
     }
 
     private void updateBeToTablets(Tablet pickedTablet, long srcBe, long 
destBe,
-            Map<Long, Set<Tablet>> globalBeToTablets,
-            Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTable,
-            Map<Long, Map<Long, Map<Long, Set<Tablet>>>> partToTablets) {
+                                   ConcurrentHashMap<Long, Set<Tablet>> 
globalBeToTablets,
+                                   ConcurrentHashMap<Long, 
ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTable,
+                                   ConcurrentHashMap<Long, 
ConcurrentHashMap<Long, ConcurrentHashMap<Long,
+                                       Set<Tablet>>>> partToTablets) {
         CloudReplica replica = (CloudReplica) 
pickedTablet.getReplicas().get(0);
         long tableId = replica.getTableId();
         long partId = replica.getPartitionId();
@@ -1073,11 +1306,11 @@ public class CloudTabletRebalancer extends MasterDaemon 
{
 
             // Check if the backend is decommissioned
             if (backend != null) {
-                if (backend.isDecommissioning() && tabletNum > 0) {
+                if ((backend.isDecommissioning() || 
backend.isDecommissioned()) && tabletNum > 0) {
                     srcBe = be; // Mark as source if decommissioned and has 
tablets
                     break; // Exit early if we found a decommissioned backend
                 }
-                if (!backend.isDecommissioning() && tabletNum > maxTabletsNum) 
{
+                if (!backend.isDecommissioning() && 
!backend.isDecommissioned() && tabletNum > maxTabletsNum) {
                     srcBe = be;
                     maxTabletsNum = tabletNum;
                 }
@@ -1089,24 +1322,38 @@ public class CloudTabletRebalancer extends MasterDaemon 
{
     }
 
     private long findDestinationBackend(List<Long> bes, Map<Long, Set<Tablet>> 
beToTablets, long srcBe) {
-        long destBe = -1;
         long minTabletsNum = Long.MAX_VALUE;
+        List<Long> candidateBes = new ArrayList<>();
 
         for (Long be : bes) {
             long tabletNum = beToTablets.getOrDefault(be, 
Collections.emptySet()).size();
             Backend backend = cloudSystemInfoService.getBackend(be);
-            if (backend != null && backend.isAlive() && 
!backend.isDecommissioning() && !backend.isSmoothUpgradeSrc()) {
+            if (backend != null && backend.isAlive() && 
!backend.isDecommissioning()
+                    && !backend.isDecommissioned() && 
!backend.isSmoothUpgradeSrc()) {
                 if (tabletNum < minTabletsNum) {
-                    destBe = be;
+                    // Found a BE with fewer tablets, reset candidates
                     minTabletsNum = tabletNum;
+                    candidateBes.clear();
+                    candidateBes.add(be);
+                } else if (tabletNum == minTabletsNum) {
+                    // Found a BE with the same minimum tablet count, add to 
candidates
+                    candidateBes.add(be);
                 }
             }
         }
-        return destBe;
+
+        if (candidateBes.isEmpty()) {
+            return -1;
+        }
+
+        // Shuffle candidates with the same tablet count for better load 
balancing
+        Collections.shuffle(candidateBes, rand);
+        return candidateBes.get(0);
     }
 
     private boolean isTransferValid(long srcBe, long minTabletsNum, long 
maxTabletsNum, long avgNum) {
-        boolean srcDecommissioned = 
cloudSystemInfoService.getBackend(srcBe).isDecommissioning();
+        boolean srcDecommissioned = 
cloudSystemInfoService.getBackend(srcBe).isDecommissioning()
+                || cloudSystemInfoService.getBackend(srcBe).isDecommissioned();
 
         if (!srcDecommissioned) {
             if ((maxTabletsNum < avgNum * (1 + 
Config.cloud_rebalance_percent_threshold)
@@ -1119,9 +1366,11 @@ public class CloudTabletRebalancer extends MasterDaemon {
     }
 
     private boolean isConflict(long srcBe, long destBe, CloudReplica 
cloudReplica, BalanceType balanceType,
-                           Map<Long, Map<Long, Map<Long, Set<Tablet>>>> 
beToTabletsInParts,
-                           Map<Long, Map<Long, Set<Tablet>>> 
beToTabletsInTables) {
-        if (cloudSystemInfoService.getBackend(srcBe).isDecommissioning()) {
+                               ConcurrentHashMap<Long, ConcurrentHashMap<Long, 
ConcurrentHashMap<Long, Set<Tablet>>>>
+                                   beToTabletsInParts,
+                               ConcurrentHashMap<Long, ConcurrentHashMap<Long, 
Set<Tablet>>> beToTabletsInTables) {
+        if (cloudSystemInfoService.getBackend(srcBe).isDecommissioning()
+                || 
cloudSystemInfoService.getBackend(srcBe).isDecommissioned()) {
             return false; // If source BE is decommissioned, no conflict
         }
 
@@ -1135,8 +1384,11 @@ public class CloudTabletRebalancer extends MasterDaemon {
     }
 
     private boolean checkGlobalBalanceConflict(long srcBe, long destBe, 
CloudReplica cloudReplica,
-                                               Map<Long, Map<Long, Map<Long, 
Set<Tablet>>>> beToTabletsInParts,
-                                               Map<Long, Map<Long, 
Set<Tablet>>> beToTabletsInTables) {
+                                               ConcurrentHashMap<Long,
+                                                   ConcurrentHashMap<Long, 
ConcurrentHashMap<Long, Set<Tablet>>>>
+                                                   beToTabletsInParts,
+                                               ConcurrentHashMap<Long, 
ConcurrentHashMap<Long, Set<Tablet>>>
+                                                   beToTabletsInTables) {
         long maxBeSize = getTabletSizeInParts(srcBe, cloudReplica, 
beToTabletsInParts);
         long minBeSize = getTabletSizeInParts(destBe, cloudReplica, 
beToTabletsInParts);
 
@@ -1151,7 +1403,9 @@ public class CloudTabletRebalancer extends MasterDaemon {
     }
 
     private boolean checkTableBalanceConflict(long srcBe, long destBe, 
CloudReplica cloudReplica,
-                                              Map<Long, Map<Long, Map<Long, 
Set<Tablet>>>> beToTabletsInParts) {
+                                              ConcurrentHashMap<Long,
+                                                  ConcurrentHashMap<Long, 
ConcurrentHashMap<Long, Set<Tablet>>>>
+                                                  beToTabletsInParts) {
         long maxBeSize = getTabletSizeInParts(srcBe, cloudReplica, 
beToTabletsInParts);
         long minBeSize = getTabletSizeInParts(destBe, cloudReplica, 
beToTabletsInParts);
 
@@ -1159,15 +1413,29 @@ public class CloudTabletRebalancer extends MasterDaemon 
{
     }
 
     private long getTabletSizeInParts(long beId, CloudReplica cloudReplica,
-                                         Map<Long, Map<Long, Map<Long, 
Set<Tablet>>>> beToTabletsInParts) {
-        Set<Tablet> tablets = 
beToTabletsInParts.get(cloudReplica.getPartitionId())
-                .get(cloudReplica.getIndexId()).get(beId);
+                                      ConcurrentHashMap<Long,
+                                          ConcurrentHashMap<Long, 
ConcurrentHashMap<Long, Set<Tablet>>>>
+                                          beToTabletsInParts) {
+        ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> 
indexToTablets
+                = beToTabletsInParts.get(cloudReplica.getPartitionId());
+        if (indexToTablets == null) {
+            return 0;
+        }
+        ConcurrentHashMap<Long, Set<Tablet>> beToTablets = 
indexToTablets.get(cloudReplica.getIndexId());
+        if (beToTablets == null) {
+            return 0;
+        }
+        Set<Tablet> tablets = beToTablets.get(beId);
         return tablets == null ? 0 : tablets.size();
     }
 
     private long getTabletSizeInBes(long beId, CloudReplica cloudReplica,
-                                    Map<Long, Map<Long, Set<Tablet>>> 
beToTabletsInTables) {
-        Set<Tablet> tablets = 
beToTabletsInTables.get(cloudReplica.getTableId()).get(beId);
+                                    ConcurrentHashMap<Long, 
ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTables) {
+        ConcurrentHashMap<Long, Set<Tablet>> beToTablets = 
beToTabletsInTables.get(cloudReplica.getTableId());
+        if (beToTablets == null) {
+            return 0;
+        }
+        Set<Tablet> tablets = beToTablets.get(beId);
         return tablets == null ? 0 : tablets.size();
     }
 
@@ -1178,6 +1446,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
             return;
         }
 
+        processFailedWarmupTasks();
+
         long totalTabletsNum = calculateTotalTablets(bes, beToTablets);
         long beNum = countActiveBackends(bes);
 
@@ -1187,7 +1457,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
         }
 
         long avgNum = totalTabletsNum / beNum;
-        long transferNum = calculateTransferNum(avgNum);
+        long transferNum = calculateTransferNum(avgNum, beNum);
 
         BalanceTypeEnum currentBalanceType = getCurrentBalanceType(clusterId);
         LOG.debug("balance type {}, be num {}, total tablets num {}, avg num 
{}, transfer num {}",
@@ -1254,14 +1524,13 @@ public class CloudTabletRebalancer extends MasterDaemon 
{
         return bes.stream()
                 .filter(be -> {
                     Backend backend = cloudSystemInfoService.getBackend(be);
-                    return backend != null && !backend.isDecommissioning();
+                    return backend != null && !backend.isDecommissioning() && 
!backend.isDecommissioned();
                 })
                 .count();
     }
 
-    private long calculateTransferNum(long avgNum) {
-        return Math.max(Math.round(avgNum * 
Config.cloud_balance_tablet_percent_per_run),
-                        Config.cloud_min_balance_tablet_num_per_run);
+    private long calculateTransferNum(long avgNum, long beNum) {
+        return Math.max(Math.round(avgNum * 
Config.cloud_balance_tablet_percent_per_run), beNum);
     }
 
     private void updateBalanceStatus(BalanceType balanceType) {
@@ -1282,12 +1551,11 @@ public class CloudTabletRebalancer extends MasterDaemon 
{
 
     private void preheatAndUpdateTablet(Tablet pickedTablet, long srcBe, long 
destBe, String clusterId,
                                      BalanceType balanceType, Map<Long, 
Set<Tablet>> beToTablets) {
-        try {
-            sendPreHeatingRpc(pickedTablet, srcBe, destBe);
-        } catch (Exception e) {
-            LOG.warn("Failed to preheat tablet {} from {} to {}, "
-                    + "help msg change fe config 
cloud_warm_up_for_rebalance_type to without_warmup ",
-                    pickedTablet.getId(), srcBe, destBe, e);
+        Backend srcBackend = cloudSystemInfoService.getBackend(srcBe);
+        Backend destBackend = cloudSystemInfoService.getBackend(destBe);
+        if (srcBackend == null || destBackend == null) {
+            LOG.warn("backend missing when preheating tablet {} from {} to {}, 
cluster {}",
+                    pickedTablet.getId(), srcBe, destBe, clusterId);
             return;
         }
 
@@ -1298,16 +1566,18 @@ public class CloudTabletRebalancer extends MasterDaemon 
{
         task.balanceType = balanceType;
         task.beToTablets = beToTablets;
         task.startTimestamp = System.currentTimeMillis() / 1000;
-        tabletToInfightTask.put(new InfightTablet(pickedTablet.getId(), 
clusterId), task);
+        InfightTablet key = new InfightTablet(pickedTablet.getId(), clusterId);
 
-        LOG.info("pre cache {} from {} to {}, cluster {}", 
pickedTablet.getId(), srcBe, destBe, clusterId);
+        tabletToInfightTask.put(key, task);
         updateBeToTablets(pickedTablet, srcBe, destBe,
                 futureBeToTabletsGlobal, futureBeToTabletsInTable, 
futurePartitionToTablets);
+        LOG.debug("pre cache {} from {} to {}, cluster {}", 
pickedTablet.getId(), srcBe, destBe, clusterId);
+        enqueueWarmupTask(new WarmupTabletTask(pickedTablet, srcBe, destBe, 
clusterId));
     }
 
     private void transferTablet(Tablet pickedTablet, long srcBe, long destBe, 
String clusterId,
                             BalanceType balanceType, 
List<UpdateCloudReplicaInfo> infos) {
-        LOG.info("transfer {} from {} to {}, cluster {}, type {}",
+        LOG.debug("transfer {} from {} to {}, cluster {}, type {}",
                 pickedTablet.getId(), srcBe, destBe, clusterId, balanceType);
         updateBeToTablets(pickedTablet, srcBe, destBe,
                 beToTabletsGlobal, beToTabletsInTable, partitionToTablets);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
index 5f2a80274e5..0a466e73f5f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
@@ -120,7 +120,7 @@ public class CloudUpgradeMgr extends MasterDaemon {
             txnIds.offer(new DbWithWaterTxn(dbid, nextTransactionId));
         }
         txnBePairList.offer(new DbWithWaterTxnInBe(txnIds, be));
-        LOG.info("register watershedtxnid {} for BE {}", txnIds.stream()
+        LOG.debug("register watershedtxnid {} for BE {}", txnIds.stream()
                 .map(dbWithWaterTxn -> "(" + dbWithWaterTxn.dbId + ":" + 
dbWithWaterTxn.txnId + ")")
                 .collect((Collectors.joining(", ", "{", "}"))), be);
     }
diff --git 
a/regression-test/suites/cloud_p0/balance/test_expanding_node_balance.groovy 
b/regression-test/suites/cloud_p0/balance/test_expanding_node_balance.groovy
index 1c8874864c0..4a9b77b5491 100644
--- a/regression-test/suites/cloud_p0/balance/test_expanding_node_balance.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_expanding_node_balance.groovy
@@ -90,7 +90,7 @@ suite('test_expanding_node_balance', 'docker') {
     }
 
     docker(clusterOptions[0]) {
-        def command = 'admin set frontend 
config("cloud_min_balance_tablet_num_per_run"="16");' 
+        def command = 'select 1'; 
         // assert < 300s
         testCase(command, 300)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to