This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 97ea1738dba branch-4.0: [fix](cloud) Fix cloud warm up balance slow scheduling #58962 (#59337)
97ea1738dba is described below
commit 97ea1738dbaa0f18de0c8c7a6b31866972863730
Author: deardeng <[email protected]>
AuthorDate: Thu Dec 25 14:29:09 2025 +0800
branch-4.0: [fix](cloud) Fix cloud warm up balance slow scheduling #58962 (#59337)
cherry pick from #58962
---
be/src/cloud/cloud_backend_service.cpp | 144 ++++----
be/src/cloud/cloud_internal_service.cpp | 27 +-
be/src/cloud/cloud_storage_engine.cpp | 17 +-
be/src/cloud/cloud_storage_engine.h | 3 +
be/src/cloud/config.cpp | 2 +
be/src/cloud/config.h | 2 +
.../main/java/org/apache/doris/common/Config.java | 31 +-
.../doris/cloud/catalog/CloudTabletRebalancer.java | 408 +++++++++++++++++----
.../doris/cloud/catalog/CloudUpgradeMgr.java | 2 +-
.../balance/test_expanding_node_balance.groovy | 2 +-
10 files changed, 485 insertions(+), 153 deletions(-)
diff --git a/be/src/cloud/cloud_backend_service.cpp
b/be/src/cloud/cloud_backend_service.cpp
index 62f72f0bb76..4865a4ae06d 100644
--- a/be/src/cloud/cloud_backend_service.cpp
+++ b/be/src/cloud/cloud_backend_service.cpp
@@ -38,6 +38,10 @@ namespace doris {
bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_segment_num(
"file_cache_warm_up_cache_async_submitted_segment_num");
+bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_task_num(
+ "file_cache_warm_up_cache_async_submitted_task_num");
+bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_tablet_num(
+ "file_cache_warm_up_cache_async_submitted_tablet_num");
CloudBackendService::CloudBackendService(CloudStorageEngine& engine, ExecEnv*
exec_env)
: BaseBackendService(exec_env), _engine(engine) {}
@@ -169,79 +173,89 @@ void
CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse&
response,
const TWarmUpCacheAsyncRequest&
request) {
- std::ostringstream oss;
- oss << "[";
- for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) {
- if (i > 0) oss << ",";
- oss << request.tablet_ids[i];
- }
- oss << "]";
- LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":"
<< request.brpc_port
- << ", tablets num=" << request.tablet_ids.size() << ",
tablet_ids=" << oss.str();
+ // just submit the task to the thread pool, no need to wait for the result
+ auto do_warm_up = [this, request]() {
+ std::ostringstream oss;
+ oss << "[";
+ for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) {
+ if (i > 0) oss << ",";
+ oss << request.tablet_ids[i];
+ }
+ oss << "]";
+ g_file_cache_warm_up_cache_async_submitted_tablet_num <<
request.tablet_ids.size();
+ LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host <<
":"
+ << request.brpc_port << ", tablets num=" <<
request.tablet_ids.size()
+ << ", tablet_ids=" << oss.str();
- auto& manager =
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
- // Record each tablet in manager
- for (int64_t tablet_id : request.tablet_ids) {
- manager.record_balanced_tablet(tablet_id, request.host,
request.brpc_port);
- }
+ auto& manager =
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
+ // Record each tablet in manager
+ for (int64_t tablet_id : request.tablet_ids) {
+ manager.record_balanced_tablet(tablet_id, request.host,
request.brpc_port);
+ }
- std::string host = request.host;
- auto dns_cache = ExecEnv::GetInstance()->dns_cache();
- if (dns_cache == nullptr) {
- LOG(WARNING) << "DNS cache is not initialized, skipping hostname
resolve";
- } else if (!is_valid_ip(request.host)) {
- Status status = dns_cache->get(request.host, &host);
- if (!status.ok()) {
- LOG(WARNING) << "failed to get ip from host " << request.host <<
": "
- << status.to_string();
- // Remove failed tablets from tracking
- manager.remove_balanced_tablets(request.tablet_ids);
+ std::string host = request.host;
+ auto dns_cache = ExecEnv::GetInstance()->dns_cache();
+ if (dns_cache == nullptr) {
+ LOG(WARNING) << "DNS cache is not initialized, skipping hostname
resolve";
+ } else if (!is_valid_ip(request.host)) {
+ Status status = dns_cache->get(request.host, &host);
+ if (!status.ok()) {
+ LOG(WARNING) << "failed to get ip from host " << request.host
<< ": "
+ << status.to_string();
+ return;
+ }
+ }
+ std::string brpc_addr = get_host_port(host, request.brpc_port);
+ std::shared_ptr<PBackendService_Stub> brpc_stub =
+
_exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
+ if (!brpc_stub) {
+ LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for
addr " << brpc_addr;
return;
}
- }
- std::string brpc_addr = get_host_port(host, request.brpc_port);
- Status st = Status::OK();
- TStatus t_status;
- std::shared_ptr<PBackendService_Stub> brpc_stub =
-
_exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
- if (!brpc_stub) {
- st = Status::RpcError("Address {} is wrong", brpc_addr);
- LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr
" << brpc_addr;
- // Remove failed tablets from tracking
- manager.remove_balanced_tablets(request.tablet_ids);
- return;
- }
- brpc::Controller cntl;
- PGetFileCacheMetaRequest brpc_request;
- std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(),
- [&](int64_t tablet_id) {
brpc_request.add_tablet_ids(tablet_id); });
- PGetFileCacheMetaResponse brpc_response;
+ PGetFileCacheMetaRequest brpc_request;
+ std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(),
+ [&](int64_t tablet_id) {
brpc_request.add_tablet_ids(tablet_id); });
- brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request,
&brpc_response, nullptr);
- VLOG_DEBUG << "warm_up_cache_async: request=" << brpc_request.DebugString()
- << ", response=" << brpc_response.DebugString();
- if (!cntl.Failed()) {
- g_file_cache_warm_up_cache_async_submitted_segment_num
- << brpc_response.file_cache_block_metas().size();
- auto& file_cache_block_metas =
*brpc_response.mutable_file_cache_block_metas();
- if (!file_cache_block_metas.empty()) {
+ auto run_rpc = [this, brpc_stub,
+ brpc_addr](PGetFileCacheMetaRequest request_copy) ->
Status {
+ brpc::Controller cntl;
+ cntl.set_timeout_ms(20 * 1000); // 20s
+ PGetFileCacheMetaResponse brpc_response;
+ brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &request_copy,
&brpc_response,
+ nullptr);
+ if (cntl.Failed()) {
+ LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr="
<< brpc_addr
+ << ", error=" << cntl.ErrorText()
+ << ", error code=" << cntl.ErrorCode();
+ return Status::RpcError("{} isn't connected, error code={}",
brpc_addr,
+ cntl.ErrorCode());
+ }
+ VLOG_DEBUG << "warm_up_cache_async: request=" <<
request_copy.DebugString()
+ << ", response=" << brpc_response.DebugString();
+ g_file_cache_warm_up_cache_async_submitted_segment_num
+ << brpc_response.file_cache_block_metas().size();
_engine.file_cache_block_downloader().submit_download_task(
- std::move(file_cache_block_metas));
- LOG(INFO) << "warm_up_cache_async: successfully submitted download
task for tablets="
- << oss.str();
- } else {
- LOG(INFO) << "warm_up_cache_async: no file cache block meta found,
addr=" << brpc_addr;
- manager.remove_balanced_tablets(request.tablet_ids);
+
std::move(*brpc_response.mutable_file_cache_block_metas()));
+ return Status::OK();
+ };
+
+ Status rpc_status = run_rpc(std::move(brpc_request));
+ if (!rpc_status.ok()) {
+ LOG(WARNING) << "warm_up_cache_async: rpc failed for addr=" <<
brpc_addr
+ << ", status=" << rpc_status;
}
- } else {
- st = Status::RpcError("{} isn't connected", brpc_addr);
- // Remove failed tablets from tracking
- manager.remove_balanced_tablets(request.tablet_ids);
- LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" <<
brpc_addr
- << ", error=" << cntl.ErrorText();
+ };
+ g_file_cache_warm_up_cache_async_submitted_task_num << 1;
+ Status submit_st =
_engine.warmup_cache_async_thread_pool().submit_func(std::move(do_warm_up));
+ if (!submit_st.ok()) {
+ LOG(WARNING) << "warm_up_cache_async: fail to submit heavy task to "
+ "warmup_cache_async_thread_pool, status="
+ << submit_st.to_string() << ", execute synchronously";
+ do_warm_up();
}
- st.to_thrift(&t_status);
- response.status = t_status;
+ TStatus t_status;
+ submit_st.to_thrift(&t_status);
+ response.status = std::move(t_status);
}
void
CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse&
response,
diff --git a/be/src/cloud/cloud_internal_service.cpp
b/be/src/cloud/cloud_internal_service.cpp
index 2584ce8146b..d242f90a2ec 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -47,6 +47,8 @@ bvar::LatencyRecorder g_file_cache_get_by_peer_server_latency(
"file_cache_get_by_peer_server_latency");
bvar::LatencyRecorder g_file_cache_get_by_peer_read_cache_file_latency(
"file_cache_get_by_peer_read_cache_file_latency");
+bvar::LatencyRecorder
g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency(
+ "cloud_internal_service_get_file_cache_meta_by_tablet_id_latency");
CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine,
ExecEnv* exec_env)
: PInternalService(exec_env), _engine(engine) {}
@@ -95,13 +97,26 @@ void
CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
LOG_WARNING("try to access tablet file cache meta, but file cache not
enabled");
return;
}
- LOG(INFO) << "warm up get meta from this be, tablets num=" <<
request->tablet_ids().size();
+ auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+
std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ std::ostringstream tablet_ids_stream;
+ int count = 0;
+ for (const auto& tablet_id : request->tablet_ids()) {
+ tablet_ids_stream << tablet_id << ", ";
+ count++;
+ if (count >= 10) {
+ break;
+ }
+ }
+ LOG(INFO) << "warm up get meta from this be, tablets num=" <<
request->tablet_ids().size()
+ << ", first 10 tablet_ids=[ " << tablet_ids_stream.str() << " ]";
for (const auto& tablet_id : request->tablet_ids()) {
auto res = _engine.tablet_mgr().get_tablet(tablet_id);
if (!res.has_value()) {
LOG(ERROR) << "failed to get tablet: " << tablet_id
<< " err msg: " << res.error().msg();
- return;
+ continue;
}
CloudTabletSPtr tablet = std::move(res.value());
auto st = tablet->sync_rowsets();
@@ -166,7 +181,13 @@ void
CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
}
}
}
- VLOG_DEBUG << "warm up get meta request=" << request->DebugString()
+ auto end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency <<
(end_ts - begin_ts);
+ LOG(INFO) << "get file cache meta by tablet ids = [ " <<
tablet_ids_stream.str() << " ] took "
+ << end_ts - begin_ts << " us";
+ VLOG_DEBUG << "get file cache meta by tablet id request=" <<
request->DebugString()
<< ", response=" << response->DebugString();
}
diff --git a/be/src/cloud/cloud_storage_engine.cpp
b/be/src/cloud/cloud_storage_engine.cpp
index 8529fb0ed92..f8a7ac68515 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -238,10 +238,19 @@ Status CloudStorageEngine::open() {
// check cluster id
RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to
check cluster id");
- return ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
- .set_max_threads(config::sync_load_for_tablets_thread)
- .set_min_threads(config::sync_load_for_tablets_thread)
- .build(&_sync_load_for_tablets_thread_pool);
+
RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
+
.set_max_threads(config::sync_load_for_tablets_thread)
+
.set_min_threads(config::sync_load_for_tablets_thread)
+
.build(&_sync_load_for_tablets_thread_pool),
+ "fail to build
SyncLoadForTabletsThreadPool");
+
+
RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("WarmupCacheAsyncThreadPool")
+
.set_max_threads(config::warmup_cache_async_thread)
+
.set_min_threads(config::warmup_cache_async_thread)
+
.build(&_warmup_cache_async_thread_pool),
+ "fail to build WarmupCacheAsyncThreadPool");
+
+ return Status::OK();
}
void CloudStorageEngine::stop() {
diff --git a/be/src/cloud/cloud_storage_engine.h
b/be/src/cloud/cloud_storage_engine.h
index 0b61fe20762..6948d8c3594 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -152,6 +152,8 @@ public:
return *_sync_load_for_tablets_thread_pool;
}
+ ThreadPool& warmup_cache_async_thread_pool() const { return
*_warmup_cache_async_thread_pool; }
+
Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t
initiator);
Status unregister_compaction_stop_token(CloudTabletSPtr tablet, bool
clear_ms);
@@ -204,6 +206,7 @@ private:
std::unique_ptr<CloudWarmUpManager> _cloud_warm_up_manager;
std::unique_ptr<TabletHotspot> _tablet_hotspot;
std::unique_ptr<ThreadPool> _sync_load_for_tablets_thread_pool;
+ std::unique_ptr<ThreadPool> _warmup_cache_async_thread_pool;
std::unique_ptr<CloudSnapshotMgr> _cloud_snapshot_mgr;
// FileSystem with latest shared storage info, new data will be written to
this fs.
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index b915c1e0034..30cecb7a405 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -69,6 +69,8 @@ DEFINE_mBool(use_public_endpoint_for_error_log, "true");
DEFINE_mInt32(sync_load_for_tablets_thread, "32");
+DEFINE_Int32(warmup_cache_async_thread, "16");
+
DEFINE_mBool(enable_new_tablet_do_compaction, "true");
// Empty rowset compaction strategy configurations
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index ce4c7e0cd8c..b12ff464700 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -113,6 +113,8 @@ DECLARE_mBool(use_public_endpoint_for_error_log);
// the theads which sync the datas which loaded in other clusters
DECLARE_mInt32(sync_load_for_tablets_thread);
+DECLARE_Int32(warmup_cache_async_thread);
+
DECLARE_mInt32(delete_bitmap_lock_expiration_seconds);
DECLARE_mInt32(get_delete_bitmap_lock_max_retry_times);
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 7f91044f2df..655df8d0c81 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3372,16 +3372,13 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static double cloud_balance_tablet_percent_per_run = 0.05;
- @ConfField(mutable = true, masterOnly = true)
- public static int cloud_min_balance_tablet_num_per_run = 2;
-
- @ConfField(mutable = true, masterOnly = true, description =
{"指定存算分离模式下所有Compute group的扩缩容预热方式。"
- + "without_warmup: 直接修改tablet分片映射,首次读从S3拉取,均衡最快但性能波动最大;"
- + "async_warmup: 异步预热,尽力而为拉取cache,均衡较快但可能cache miss;"
- + "sync_warmup: 同步预热,确保cache迁移完成,均衡较慢但无cache miss;"
- + "peer_read_async_warmup: 直接修改tablet分片映射,首次读从Peer
BE拉取,均衡最快可能会影响同计算组中其他BE性能。"
- + "注意:此为全局FE配置,也可通过SQL(ALTER COMPUTE GROUP cg PROPERTIES)"
- + "设置compute group维度的balance类型,compute group维度配置优先级更高",
+ @ConfField(mutable = true, masterOnly = true, description = {"指定存算分离模式下所有
Compute group 的扩缩容预热方式。"
+ + "without_warmup: 直接修改 tablet 分片映射,首次读从 S3 拉取,均衡最快但性能波动最大;"
+ + "async_warmup: 异步预热,尽力而为拉取 cache,均衡较快但可能 cache miss;"
+ + "sync_warmup: 同步预热,确保 cache 迁移完成,均衡较慢但无 cache miss;"
+ + "peer_read_async_warmup: 直接修改 tablet 分片映射,首次读从 Peer BE
拉取,均衡最快可能会影响同计算组中其他 BE 性能。"
+ + "注意:此为全局 FE 配置,也可通过 SQL(ALTER COMPUTE GROUP cg PROPERTIES)"
+ + "设置 compute group 维度的 balance 类型,compute group 维度配置优先级更高",
"Specify the scaling and warming methods for all Compute groups in a
cloud mode. "
+ "without_warmup: Directly modify shard mapping, first read from
S3,"
+ "fastest re-balance but largest fluctuation; "
@@ -3396,6 +3393,20 @@ public class Config extends ConfigBase {
options = {"without_warmup", "async_warmup", "sync_warmup",
"peer_read_async_warmup"})
public static String cloud_warm_up_for_rebalance_type = "async_warmup";
+ @ConfField(mutable = true, masterOnly = true, description = {"云上tablet均衡时,"
+ + "同一个host内预热批次的最大tablet个数,默认10", "The max number of tablets per
host "
+ + "when batching warm-up requests during cloud tablet rebalancing,
default 10"})
+ public static int cloud_warm_up_batch_size = 10;
+
+ @ConfField(mutable = true, masterOnly = true, description = {"云上tablet均衡时,"
+ + "预热批次最长等待时间,单位毫秒,默认50ms", "Maximum wait time in milliseconds
before a "
+ + "pending warm-up batch is flushed, default 50ms"})
+ public static int cloud_warm_up_batch_flush_interval_ms = 50;
+
+ @ConfField(mutable = true, masterOnly = true, description =
{"云上tablet均衡预热rpc异步线程池大小,默认4",
+ "Thread pool size for asynchronous warm-up RPC dispatch during cloud
tablet rebalancing, default 4"})
+ public static int cloud_warm_up_rpc_async_pool_size = 4;
+
@ConfField(mutable = true, masterOnly = false)
public static String security_checker_class_name = "";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 019fb33aeff..bbdec086030 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -34,6 +34,7 @@ import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.MasterDaemon;
@@ -64,7 +65,12 @@ import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class CloudTabletRebalancer extends MasterDaemon {
@@ -80,21 +86,22 @@ public class CloudTabletRebalancer extends MasterDaemon {
private volatile ConcurrentHashMap<Long, Set<Tablet>>
beToTabletsGlobalInSecondary =
new ConcurrentHashMap<Long, Set<Tablet>>();
- private Map<Long, Set<Tablet>> futureBeToTabletsGlobal;
+ private volatile ConcurrentHashMap<Long, Set<Tablet>>
futureBeToTabletsGlobal;
private Map<String, List<Long>> clusterToBes;
private Set<Long> allBes;
// partitionId -> indexId -> be -> tablet
- private Map<Long, Map<Long, Map<Long, Set<Tablet>>>> partitionToTablets;
+ private ConcurrentHashMap<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>> partitionToTablets;
- private Map<Long, Map<Long, Map<Long, Set<Tablet>>>>
futurePartitionToTablets;
+ private ConcurrentHashMap<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>
+ futurePartitionToTablets;
// tableId -> be -> tablet
- private Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTable;
+ private ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>
beToTabletsInTable;
- private Map<Long, Map<Long, Set<Tablet>>> futureBeToTabletsInTable;
+ private ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>
futureBeToTabletsInTable;
private Map<Long, Long> beToDecommissionedTime = new HashMap<Long, Long>();
@@ -108,10 +115,22 @@ public class CloudTabletRebalancer extends MasterDaemon {
private LinkedBlockingQueue<Pair<Long, Long>> tabletsMigrateTasks = new
LinkedBlockingQueue<Pair<Long, Long>>();
- private Map<InfightTablet, InfightTask> tabletToInfightTask = new
HashMap<>();
+ private Map<InfightTablet, InfightTask> tabletToInfightTask = new
ConcurrentHashMap<>();
+
+ private final ConcurrentHashMap<WarmupBatchKey, WarmupBatch> warmupBatches
= new ConcurrentHashMap<>();
+
+ private volatile ScheduledExecutorService warmupBatchScheduler;
+
+ private volatile ScheduledExecutorService warmupCheckScheduler;
+
+ private volatile ExecutorService warmupRpcExecutor;
+
+ private final ConcurrentLinkedQueue<WarmupTabletTask> failedWarmupTasks =
new ConcurrentLinkedQueue<>();
private CloudSystemInfoService cloudSystemInfoService;
+ private final Object warmupExecutorInitLock = new Object();
+
private BalanceTypeEnum globalBalanceTypeEnum =
BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum();
/**
@@ -165,6 +184,49 @@ public class CloudTabletRebalancer extends MasterDaemon {
this.cloudSystemInfoService = cloudSystemInfoService;
}
+ private void initializeWarmupExecutorsIfNeeded() {
+ if (warmupRpcExecutor != null) {
+ return; // Already initialized
+ }
+ synchronized (warmupExecutorInitLock) {
+ if (warmupRpcExecutor != null) {
+ return; // Double check
+ }
+ Env env = Env.getCurrentEnv();
+ if (env == null || !env.isMaster()) {
+ LOG.info("Env not initialized or not master, skip start warmup
batch scheduler");
+ return;
+ }
+ warmupRpcExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
+ Math.max(1, Config.cloud_warm_up_rpc_async_pool_size), 1000,
+ "cloud-warmup-rpc-dispatch", true);
+ warmupBatchScheduler =
Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "cloud-warmup-batch-flusher");
+ t.setDaemon(true);
+ return t;
+ });
+ long flushInterval = Math.max(1L,
Config.cloud_warm_up_batch_flush_interval_ms);
+
warmupBatchScheduler.scheduleAtFixedRate(this::flushExpiredWarmupBatches,
+ flushInterval, flushInterval, TimeUnit.MILLISECONDS);
+
+ warmupCheckScheduler =
Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "cloud-warmup-checker");
+ t.setDaemon(true);
+ return t;
+ });
+ long warmupCheckInterval = 10L;
+ warmupCheckScheduler.scheduleAtFixedRate(() -> {
+ try {
+ // send check rpc to be, 10s check once
+ checkInflightWarmUpCacheAsync();
+ } catch (Throwable t) {
+ LOG.warn("unexpected error when checking inflight warm up
cache async", t);
+ }
+ }, warmupCheckInterval, warmupCheckInterval, TimeUnit.SECONDS);
+ LOG.info("Warmup executors initialized successfully");
+ }
+ }
+
private interface Operator {
void op(Database db, Table table, Partition partition,
MaterializedIndex index, String cluster);
}
@@ -212,6 +274,88 @@ public class CloudTabletRebalancer extends MasterDaemon {
BalanceType balanceType;
}
+ @Getter
+ private static class WarmupBatchKey {
+ private final long srcBe;
+ private final long destBe;
+
+ WarmupBatchKey(long srcBe, long destBe) {
+ this.srcBe = srcBe;
+ this.destBe = destBe;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof WarmupBatchKey)) {
+ return false;
+ }
+ WarmupBatchKey that = (WarmupBatchKey) o;
+ return srcBe == that.srcBe && destBe == that.destBe;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(srcBe, destBe);
+ }
+ }
+
+ private static class WarmupTabletTask {
+ private final Tablet pickedTablet;
+ private final long srcBe;
+ private final long destBe;
+ private final String clusterId;
+
+ WarmupTabletTask(Tablet pickedTablet, long srcBe, long destBe, String
clusterId) {
+ this.pickedTablet = pickedTablet;
+ this.srcBe = srcBe;
+ this.destBe = destBe;
+ this.clusterId = clusterId;
+ }
+ }
+
+ private static class WarmupBatch {
+ private final WarmupBatchKey key;
+ private final List<WarmupTabletTask> tasks = new ArrayList<>();
+ private long lastUpdateMs = System.currentTimeMillis();
+
+ WarmupBatch(WarmupBatchKey key) {
+ this.key = key;
+ }
+
+ synchronized List<WarmupTabletTask> addTask(WarmupTabletTask task, int
batchSize) {
+ tasks.add(task);
+ lastUpdateMs = System.currentTimeMillis();
+ if (tasks.size() >= batchSize) {
+ return drain();
+ }
+ return Collections.emptyList();
+ }
+
+ synchronized List<WarmupTabletTask> drainIfExpired(long
flushIntervalMs) {
+ if (tasks.isEmpty()) {
+ return Collections.emptyList();
+ }
+ if (System.currentTimeMillis() - lastUpdateMs >= flushIntervalMs) {
+ return drain();
+ }
+ return Collections.emptyList();
+ }
+
+ synchronized boolean isEmpty() {
+ return tasks.isEmpty();
+ }
+
+ private List<WarmupTabletTask> drain() {
+ List<WarmupTabletTask> copy = new ArrayList<>(tasks);
+ tasks.clear();
+ lastUpdateMs = System.currentTimeMillis();
+ return copy;
+ }
+ }
+
private class TransferPairInfo {
public long srcBe;
public long destBe;
@@ -260,7 +404,13 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
public int getTabletNumByBackendId(long beId) {
- Set<Tablet> tablets = beToTabletsGlobal.get(beId);
+ Map<Long, Set<Tablet>> sourceMap = beToTabletsGlobal;
+ ConcurrentHashMap<Long, Set<Tablet>> futureMap =
futureBeToTabletsGlobal;
+ if (futureMap != null && !futureMap.isEmpty()) {
+ sourceMap = futureMap;
+ }
+
+ Set<Tablet> tablets = sourceMap.get(beId);
Set<Tablet> colocateTablets = beToColocateTabletsGlobal.get(beId);
int tabletsSize = (tablets == null) ? 0 : tablets.size();
@@ -280,6 +430,9 @@ public class CloudTabletRebalancer extends MasterDaemon {
// 9 check whether all tablets of decomission node have been migrated
@Override
protected void runAfterCatalogReady() {
+ // Initialize warmup executors when catalog is ready
+ initializeWarmupExecutorsIfNeeded();
+
if (Config.enable_cloud_multi_replica) {
LOG.info("Tablet balance is temporarily not supported when multi
replica enabled");
return;
@@ -294,7 +447,6 @@ public class CloudTabletRebalancer extends MasterDaemon {
return;
}
- checkInflightWarmUpCacheAsync();
statRouteInfo();
migrateTabletsForSmoothUpgrade();
statRouteInfo();
@@ -728,42 +880,118 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
public void fillBeToTablets(long be, long tableId, long partId, long
indexId, Tablet tablet,
- Map<Long, Set<Tablet>> globalBeToTablets,
- Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTable,
- Map<Long, Map<Long, Map<Long, Set<Tablet>>>> partToTablets) {
+ ConcurrentHashMap<Long, Set<Tablet>>
globalBeToTablets,
+ ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTable,
+ ConcurrentHashMap<Long,
ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
+ partToTablets) {
// global
- globalBeToTablets.putIfAbsent(be, new HashSet<Tablet>());
+ globalBeToTablets.putIfAbsent(be, ConcurrentHashMap.newKeySet());
globalBeToTablets.get(be).add(tablet);
// table
- beToTabletsInTable.putIfAbsent(tableId, new HashMap<Long,
Set<Tablet>>());
- Map<Long, Set<Tablet>> beToTabletsOfTable =
beToTabletsInTable.get(tableId);
- beToTabletsOfTable.putIfAbsent(be, new HashSet<Tablet>());
+ beToTabletsInTable.putIfAbsent(tableId, new ConcurrentHashMap<Long,
Set<Tablet>>());
+ ConcurrentHashMap<Long, Set<Tablet>> beToTabletsOfTable =
beToTabletsInTable.get(tableId);
+ beToTabletsOfTable.putIfAbsent(be, ConcurrentHashMap.newKeySet());
beToTabletsOfTable.get(be).add(tablet);
// partition
- partToTablets.putIfAbsent(partId, new HashMap<Long, Map<Long,
Set<Tablet>>>());
- Map<Long, Map<Long, Set<Tablet>>> indexToTablets =
partToTablets.get(partId);
- indexToTablets.putIfAbsent(indexId, new HashMap<Long, Set<Tablet>>());
- Map<Long, Set<Tablet>> beToTabletsOfIndex =
indexToTablets.get(indexId);
- beToTabletsOfIndex.putIfAbsent(be, new HashSet<Tablet>());
+ partToTablets.putIfAbsent(partId, new ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>());
+ ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>
indexToTablets = partToTablets.get(partId);
+ indexToTablets.putIfAbsent(indexId, new ConcurrentHashMap<Long,
Set<Tablet>>());
+ ConcurrentHashMap<Long, Set<Tablet>> beToTabletsOfIndex =
indexToTablets.get(indexId);
+ beToTabletsOfIndex.putIfAbsent(be, ConcurrentHashMap.newKeySet());
beToTabletsOfIndex.get(be).add(tablet);
}
+ private void enqueueWarmupTask(WarmupTabletTask task) {
+ WarmupBatchKey key = new WarmupBatchKey(task.srcBe, task.destBe);
+ WarmupBatch batch = warmupBatches.computeIfAbsent(key,
WarmupBatch::new);
+ List<WarmupTabletTask> readyTasks = batch.addTask(task, Math.max(1,
Config.cloud_warm_up_batch_size));
+ if (!readyTasks.isEmpty()) {
+ dispatchWarmupBatch(key, readyTasks);
+ }
+ }
+
+ private void dispatchWarmupBatch(WarmupBatchKey key,
List<WarmupTabletTask> tasks) {
+ if (tasks.isEmpty()) {
+ return;
+ }
+ initializeWarmupExecutorsIfNeeded();
+ if (warmupRpcExecutor != null) {
+ warmupRpcExecutor.submit(() -> sendWarmupBatch(key, tasks));
+ } else {
+ LOG.warn("warmupRpcExecutor is not initialized, skip dispatching
warmup batch");
+ }
+ }
+
+ private void sendWarmupBatch(WarmupBatchKey key, List<WarmupTabletTask>
tasks) {
+ Backend srcBackend = cloudSystemInfoService.getBackend(key.getSrcBe());
+ Backend destBackend =
cloudSystemInfoService.getBackend(key.getDestBe());
+ if (srcBackend == null || destBackend == null ||
!destBackend.isAlive()) {
+ handleWarmupBatchFailure(tasks, new IllegalStateException(
+ String.format("backend missing or dead, src %s dest %s",
srcBackend, destBackend)));
+ return;
+ }
+ List<Long> tabletIds = tasks.stream().map(task ->
task.pickedTablet.getId()).collect(Collectors.toList());
+ try {
+ sendPreHeatingRpc(tabletIds, key.getSrcBe(), key.getDestBe());
+ } catch (Exception e) {
+ handleWarmupBatchFailure(tasks, e);
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("dispatch preheat batch {} from {} to {}, tablet num {}",
+ tabletIds, key.getSrcBe(), key.getDestBe(),
tabletIds.size());
+ }
+ }
+
+ private void handleWarmupBatchFailure(List<WarmupTabletTask> tasks,
Exception e) {
+ if (e != null) {
+ LOG.warn("preheat batch failed, size {}", tasks.size(), e);
+ }
+ for (WarmupTabletTask task : tasks) {
+ failedWarmupTasks.offer(task);
+ }
+ }
+
+ private void revertWarmupState(WarmupTabletTask task) {
+ updateBeToTablets(task.pickedTablet, task.destBe, task.srcBe,
+ futureBeToTabletsGlobal, futureBeToTabletsInTable,
futurePartitionToTablets);
+ tabletToInfightTask.remove(new
InfightTablet(task.pickedTablet.getId(), task.clusterId));
+ }
+
+ private void processFailedWarmupTasks() {
+ WarmupTabletTask task;
+ while ((task = failedWarmupTasks.poll()) != null) {
+ revertWarmupState(task);
+ }
+ }
+
+ private void flushExpiredWarmupBatches() {
+ long flushInterval = Math.max(1L,
Config.cloud_warm_up_batch_flush_interval_ms);
+ for (Map.Entry<WarmupBatchKey, WarmupBatch> entry :
warmupBatches.entrySet()) {
+ List<WarmupTabletTask> readyTasks =
entry.getValue().drainIfExpired(flushInterval);
+ if (!readyTasks.isEmpty()) {
+ dispatchWarmupBatch(entry.getKey(), readyTasks);
+ }
+ }
+ }
+
public void statRouteInfo() {
ConcurrentHashMap<Long, Set<Tablet>> tmpBeToTabletsGlobal = new
ConcurrentHashMap<Long, Set<Tablet>>();
+ ConcurrentHashMap<Long, Set<Tablet>> tmpFutureBeToTabletsGlobal = new
ConcurrentHashMap<Long, Set<Tablet>>();
ConcurrentHashMap<Long, Set<Tablet>> tmpBeToTabletsGlobalInSecondary
= new ConcurrentHashMap<Long, Set<Tablet>>();
ConcurrentHashMap<Long, Set<Tablet>> tmpBeToColocateTabletsGlobal
= new ConcurrentHashMap<Long, Set<Tablet>>();
- futureBeToTabletsGlobal = new HashMap<Long, Set<Tablet>>();
-
- partitionToTablets = new HashMap<Long, Map<Long, Map<Long,
Set<Tablet>>>>();
- futurePartitionToTablets = new HashMap<Long, Map<Long, Map<Long,
Set<Tablet>>>>();
+ partitionToTablets = new ConcurrentHashMap<Long,
+ ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>();
+ futurePartitionToTablets =
+ new ConcurrentHashMap<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>();
- beToTabletsInTable = new HashMap<Long, Map<Long, Set<Tablet>>>();
- futureBeToTabletsInTable = new HashMap<Long, Map<Long, Set<Tablet>>>();
+ beToTabletsInTable = new ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>();
+ futureBeToTabletsInTable = new ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>();
loopCloudReplica((Database db, Table table, Partition partition,
MaterializedIndex index, String cluster) -> {
boolean isColocated =
Env.getCurrentColocateIndex().isColocateTable(table.getId());
@@ -806,12 +1034,13 @@ public class CloudTabletRebalancer extends MasterDaemon {
tmpBeToTabletsGlobal, beToTabletsInTable,
this.partitionToTablets);
fillBeToTablets(futureBeId, table.getId(),
partition.getId(), index.getId(), tablet,
- futureBeToTabletsGlobal, futureBeToTabletsInTable,
futurePartitionToTablets);
+ tmpFutureBeToTabletsGlobal,
futureBeToTabletsInTable, futurePartitionToTablets);
}
}
});
beToTabletsGlobal = tmpBeToTabletsGlobal;
+ futureBeToTabletsGlobal = tmpFutureBeToTabletsGlobal;
beToTabletsGlobalInSecondary = tmpBeToTabletsGlobalInSecondary;
beToColocateTabletsGlobal = tmpBeToColocateTabletsGlobal;
}
@@ -848,10 +1077,11 @@ public class CloudTabletRebalancer extends MasterDaemon {
public void balanceInPartition(List<Long> bes, String clusterId,
List<UpdateCloudReplicaInfo> infos) {
// balance all partition
- for (Map.Entry<Long, Map<Long, Map<Long, Set<Tablet>>>> partitionEntry
: futurePartitionToTablets.entrySet()) {
- Map<Long, Map<Long, Set<Tablet>>> indexToTablets =
partitionEntry.getValue();
+ for (Map.Entry<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long,
Set<Tablet>>>> partitionEntry
+ : futurePartitionToTablets.entrySet()) {
+ Map<Long, ConcurrentHashMap<Long, Set<Tablet>>> indexToTablets =
partitionEntry.getValue();
// balance all index of a partition
- for (Map.Entry<Long, Map<Long, Set<Tablet>>> entry :
indexToTablets.entrySet()) {
+ for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>> entry :
indexToTablets.entrySet()) {
// balance a index
balanceImpl(bes, clusterId, entry.getValue(),
BalanceType.PARTITION, infos);
}
@@ -860,12 +1090,16 @@ public class CloudTabletRebalancer extends MasterDaemon {
public void balanceInTable(List<Long> bes, String clusterId,
List<UpdateCloudReplicaInfo> infos) {
// balance all tables
- for (Map.Entry<Long, Map<Long, Set<Tablet>>> entry :
futureBeToTabletsInTable.entrySet()) {
+ for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>> entry :
futureBeToTabletsInTable.entrySet()) {
balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE,
infos);
}
}
private void sendPreHeatingRpc(Tablet pickedTablet, long srcBe, long
destBe) throws Exception {
+ sendPreHeatingRpc(Collections.singletonList(pickedTablet.getId()),
srcBe, destBe);
+ }
+
+ private void sendPreHeatingRpc(List<Long> tabletIds, long srcBe, long
destBe) throws Exception {
BackendService.Client client = null;
TNetworkAddress address = null;
Backend srcBackend = cloudSystemInfoService.getBackend(srcBe);
@@ -877,9 +1111,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
TWarmUpCacheAsyncRequest req = new TWarmUpCacheAsyncRequest();
req.setHost(srcBackend.getHost());
req.setBrpcPort(srcBackend.getBrpcPort());
- List<Long> tablets = new ArrayList<Long>();
- tablets.add(pickedTablet.getId());
- req.setTabletIds(tablets);
+ req.setTabletIds(new ArrayList<>(tabletIds));
TWarmUpCacheAsyncResponse result = client.warmUpCacheAsync(req);
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
LOG.warn("pre cache failed status {} {}",
result.getStatus().getStatusCode(),
@@ -995,9 +1227,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
private void updateBeToTablets(Tablet pickedTablet, long srcBe, long
destBe,
- Map<Long, Set<Tablet>> globalBeToTablets,
- Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTable,
- Map<Long, Map<Long, Map<Long, Set<Tablet>>>> partToTablets) {
+ ConcurrentHashMap<Long, Set<Tablet>>
globalBeToTablets,
+ ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTable,
+ ConcurrentHashMap<Long,
ConcurrentHashMap<Long, ConcurrentHashMap<Long,
+ Set<Tablet>>>> partToTablets) {
CloudReplica replica = (CloudReplica)
pickedTablet.getReplicas().get(0);
long tableId = replica.getTableId();
long partId = replica.getPartitionId();
@@ -1073,11 +1306,11 @@ public class CloudTabletRebalancer extends MasterDaemon
{
// Check if the backend is decommissioned
if (backend != null) {
- if (backend.isDecommissioning() && tabletNum > 0) {
+ if ((backend.isDecommissioning() ||
backend.isDecommissioned()) && tabletNum > 0) {
srcBe = be; // Mark as source if decommissioned and has
tablets
break; // Exit early if we found a decommissioned backend
}
- if (!backend.isDecommissioning() && tabletNum > maxTabletsNum)
{
+ if (!backend.isDecommissioning() &&
!backend.isDecommissioned() && tabletNum > maxTabletsNum) {
srcBe = be;
maxTabletsNum = tabletNum;
}
@@ -1089,24 +1322,38 @@ public class CloudTabletRebalancer extends MasterDaemon
{
}
private long findDestinationBackend(List<Long> bes, Map<Long, Set<Tablet>>
beToTablets, long srcBe) {
- long destBe = -1;
long minTabletsNum = Long.MAX_VALUE;
+ List<Long> candidateBes = new ArrayList<>();
for (Long be : bes) {
long tabletNum = beToTablets.getOrDefault(be,
Collections.emptySet()).size();
Backend backend = cloudSystemInfoService.getBackend(be);
- if (backend != null && backend.isAlive() &&
!backend.isDecommissioning() && !backend.isSmoothUpgradeSrc()) {
+ if (backend != null && backend.isAlive() &&
!backend.isDecommissioning()
+ && !backend.isDecommissioned() &&
!backend.isSmoothUpgradeSrc()) {
if (tabletNum < minTabletsNum) {
- destBe = be;
+ // Found a BE with fewer tablets, reset candidates
minTabletsNum = tabletNum;
+ candidateBes.clear();
+ candidateBes.add(be);
+ } else if (tabletNum == minTabletsNum) {
+ // Found a BE with the same minimum tablet count, add to
candidates
+ candidateBes.add(be);
}
}
}
- return destBe;
+
+ if (candidateBes.isEmpty()) {
+ return -1;
+ }
+
+ // Shuffle candidates with the same tablet count for better load
balancing
+ Collections.shuffle(candidateBes, rand);
+ return candidateBes.get(0);
}
private boolean isTransferValid(long srcBe, long minTabletsNum, long
maxTabletsNum, long avgNum) {
- boolean srcDecommissioned =
cloudSystemInfoService.getBackend(srcBe).isDecommissioning();
+ boolean srcDecommissioned =
cloudSystemInfoService.getBackend(srcBe).isDecommissioning()
+ || cloudSystemInfoService.getBackend(srcBe).isDecommissioned();
if (!srcDecommissioned) {
if ((maxTabletsNum < avgNum * (1 +
Config.cloud_rebalance_percent_threshold)
@@ -1119,9 +1366,11 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
private boolean isConflict(long srcBe, long destBe, CloudReplica
cloudReplica, BalanceType balanceType,
- Map<Long, Map<Long, Map<Long, Set<Tablet>>>>
beToTabletsInParts,
- Map<Long, Map<Long, Set<Tablet>>>
beToTabletsInTables) {
- if (cloudSystemInfoService.getBackend(srcBe).isDecommissioning()) {
+ ConcurrentHashMap<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>
+ beToTabletsInParts,
+ ConcurrentHashMap<Long, ConcurrentHashMap<Long,
Set<Tablet>>> beToTabletsInTables) {
+ if (cloudSystemInfoService.getBackend(srcBe).isDecommissioning()
+ ||
cloudSystemInfoService.getBackend(srcBe).isDecommissioned()) {
return false; // If source BE is decommissioned, no conflict
}
@@ -1135,8 +1384,11 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
private boolean checkGlobalBalanceConflict(long srcBe, long destBe,
CloudReplica cloudReplica,
- Map<Long, Map<Long, Map<Long,
Set<Tablet>>>> beToTabletsInParts,
- Map<Long, Map<Long,
Set<Tablet>>> beToTabletsInTables) {
+ ConcurrentHashMap<Long,
+ ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>
+ beToTabletsInParts,
+ ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>
+ beToTabletsInTables) {
long maxBeSize = getTabletSizeInParts(srcBe, cloudReplica,
beToTabletsInParts);
long minBeSize = getTabletSizeInParts(destBe, cloudReplica,
beToTabletsInParts);
@@ -1151,7 +1403,9 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
private boolean checkTableBalanceConflict(long srcBe, long destBe,
CloudReplica cloudReplica,
- Map<Long, Map<Long, Map<Long,
Set<Tablet>>>> beToTabletsInParts) {
+ ConcurrentHashMap<Long,
+ ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>
+ beToTabletsInParts) {
long maxBeSize = getTabletSizeInParts(srcBe, cloudReplica,
beToTabletsInParts);
long minBeSize = getTabletSizeInParts(destBe, cloudReplica,
beToTabletsInParts);
@@ -1159,15 +1413,29 @@ public class CloudTabletRebalancer extends MasterDaemon
{
}
private long getTabletSizeInParts(long beId, CloudReplica cloudReplica,
- Map<Long, Map<Long, Map<Long,
Set<Tablet>>>> beToTabletsInParts) {
- Set<Tablet> tablets =
beToTabletsInParts.get(cloudReplica.getPartitionId())
- .get(cloudReplica.getIndexId()).get(beId);
+ ConcurrentHashMap<Long,
+ ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>
+ beToTabletsInParts) {
+ ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>
indexToTablets
+ = beToTabletsInParts.get(cloudReplica.getPartitionId());
+ if (indexToTablets == null) {
+ return 0;
+ }
+ ConcurrentHashMap<Long, Set<Tablet>> beToTablets =
indexToTablets.get(cloudReplica.getIndexId());
+ if (beToTablets == null) {
+ return 0;
+ }
+ Set<Tablet> tablets = beToTablets.get(beId);
return tablets == null ? 0 : tablets.size();
}
private long getTabletSizeInBes(long beId, CloudReplica cloudReplica,
- Map<Long, Map<Long, Set<Tablet>>>
beToTabletsInTables) {
- Set<Tablet> tablets =
beToTabletsInTables.get(cloudReplica.getTableId()).get(beId);
+ ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTables) {
+ ConcurrentHashMap<Long, Set<Tablet>> beToTablets =
beToTabletsInTables.get(cloudReplica.getTableId());
+ if (beToTablets == null) {
+ return 0;
+ }
+ Set<Tablet> tablets = beToTablets.get(beId);
return tablets == null ? 0 : tablets.size();
}
@@ -1178,6 +1446,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
return;
}
+ processFailedWarmupTasks();
+
long totalTabletsNum = calculateTotalTablets(bes, beToTablets);
long beNum = countActiveBackends(bes);
@@ -1187,7 +1457,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
long avgNum = totalTabletsNum / beNum;
- long transferNum = calculateTransferNum(avgNum);
+ long transferNum = calculateTransferNum(avgNum, beNum);
BalanceTypeEnum currentBalanceType = getCurrentBalanceType(clusterId);
LOG.debug("balance type {}, be num {}, total tablets num {}, avg num
{}, transfer num {}",
@@ -1254,14 +1524,13 @@ public class CloudTabletRebalancer extends MasterDaemon
{
return bes.stream()
.filter(be -> {
Backend backend = cloudSystemInfoService.getBackend(be);
- return backend != null && !backend.isDecommissioning();
+ return backend != null && !backend.isDecommissioning() &&
!backend.isDecommissioned();
})
.count();
}
- private long calculateTransferNum(long avgNum) {
- return Math.max(Math.round(avgNum *
Config.cloud_balance_tablet_percent_per_run),
- Config.cloud_min_balance_tablet_num_per_run);
+ private long calculateTransferNum(long avgNum, long beNum) {
+ return Math.max(Math.round(avgNum *
Config.cloud_balance_tablet_percent_per_run), beNum);
}
private void updateBalanceStatus(BalanceType balanceType) {
@@ -1282,12 +1551,11 @@ public class CloudTabletRebalancer extends MasterDaemon
{
private void preheatAndUpdateTablet(Tablet pickedTablet, long srcBe, long
destBe, String clusterId,
BalanceType balanceType, Map<Long,
Set<Tablet>> beToTablets) {
- try {
- sendPreHeatingRpc(pickedTablet, srcBe, destBe);
- } catch (Exception e) {
- LOG.warn("Failed to preheat tablet {} from {} to {}, "
- + "help msg change fe config
cloud_warm_up_for_rebalance_type to without_warmup ",
- pickedTablet.getId(), srcBe, destBe, e);
+ Backend srcBackend = cloudSystemInfoService.getBackend(srcBe);
+ Backend destBackend = cloudSystemInfoService.getBackend(destBe);
+ if (srcBackend == null || destBackend == null) {
+ LOG.warn("backend missing when preheating tablet {} from {} to {},
cluster {}",
+ pickedTablet.getId(), srcBe, destBe, clusterId);
return;
}
@@ -1298,16 +1566,18 @@ public class CloudTabletRebalancer extends MasterDaemon
{
task.balanceType = balanceType;
task.beToTablets = beToTablets;
task.startTimestamp = System.currentTimeMillis() / 1000;
- tabletToInfightTask.put(new InfightTablet(pickedTablet.getId(),
clusterId), task);
+ InfightTablet key = new InfightTablet(pickedTablet.getId(), clusterId);
- LOG.info("pre cache {} from {} to {}, cluster {}",
pickedTablet.getId(), srcBe, destBe, clusterId);
+ tabletToInfightTask.put(key, task);
updateBeToTablets(pickedTablet, srcBe, destBe,
futureBeToTabletsGlobal, futureBeToTabletsInTable,
futurePartitionToTablets);
+ LOG.debug("pre cache {} from {} to {}, cluster {}",
pickedTablet.getId(), srcBe, destBe, clusterId);
+ enqueueWarmupTask(new WarmupTabletTask(pickedTablet, srcBe, destBe,
clusterId));
}
private void transferTablet(Tablet pickedTablet, long srcBe, long destBe,
String clusterId,
BalanceType balanceType,
List<UpdateCloudReplicaInfo> infos) {
- LOG.info("transfer {} from {} to {}, cluster {}, type {}",
+ LOG.debug("transfer {} from {} to {}, cluster {}, type {}",
pickedTablet.getId(), srcBe, destBe, clusterId, balanceType);
updateBeToTablets(pickedTablet, srcBe, destBe,
beToTabletsGlobal, beToTabletsInTable, partitionToTablets);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
index 5f2a80274e5..0a466e73f5f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
@@ -120,7 +120,7 @@ public class CloudUpgradeMgr extends MasterDaemon {
txnIds.offer(new DbWithWaterTxn(dbid, nextTransactionId));
}
txnBePairList.offer(new DbWithWaterTxnInBe(txnIds, be));
- LOG.info("register watershedtxnid {} for BE {}", txnIds.stream()
+ LOG.debug("register watershedtxnid {} for BE {}", txnIds.stream()
.map(dbWithWaterTxn -> "(" + dbWithWaterTxn.dbId + ":" +
dbWithWaterTxn.txnId + ")")
.collect((Collectors.joining(", ", "{", "}"))), be);
}
diff --git
a/regression-test/suites/cloud_p0/balance/test_expanding_node_balance.groovy
b/regression-test/suites/cloud_p0/balance/test_expanding_node_balance.groovy
index 1c8874864c0..4a9b77b5491 100644
--- a/regression-test/suites/cloud_p0/balance/test_expanding_node_balance.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_expanding_node_balance.groovy
@@ -90,7 +90,7 @@ suite('test_expanding_node_balance', 'docker') {
}
docker(clusterOptions[0]) {
- def command = 'admin set frontend
config("cloud_min_balance_tablet_num_per_run"="16");'
+ def command = 'select 1';
// assert < 300s
testCase(command, 300)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]