freemandealer opened a new issue, #48959: URL: https://github.com/apache/doris/issues/48959
### Search before asking - [x] I had searched in the [issues](https://github.com/apache/doris/issues?q=is%3Aissue) and found no similar issues. ### Description ## 背景知识 ### 预热功能介绍 在存算分离架构中,我们引入了 file cache 作为数据本地的缓存来减少查询数据直接访问 S3 的次数。在冷启动时,用户可以通过下面的预热语句,将对应的数据预热到本地: ``` warm up cluster cluster_name1 with cluster cluster_name0 warm up cluster cluster_name1 with table customer warm up cluster cluster_name1 with table customer partition p1 SHOW WARM UP JOB; // 获取 Job 信息 SHOW WARM UP JOB WHERE ID = 13418; // 指定 JobID cancel warm up job where id = 13418; ``` 具体参看:https://doris.apache.org/zh-CN/docs/3.0/sql-manual/sql-statements/cluster-management/storage-management/WARM-UP/ ### 预热任务创建 stmt -> CloudWarmUpJob ```java CloudWarmUpJob.JobType jobType = stmt.isWarmUpWithTable() ? JobType.TABLE : JobType.CLUSTER; CloudWarmUpJob warmUpJob = new CloudWarmUpJob(jobId, stmt.getDstClusterName(), beToTabletIdBatches, jobType); addCloudWarmUpJob(warmUpJob); ``` 其中 beToTabletIdBatches: fe 元数据 获取 table、partition、tablet 这些信息,然后从 snapshot 中`Env.getCurrentEnv()).getCloudTabletRebalancer().getSnapshotTabletsByBeId` 获取 beid,构造 be -> tablet 的映射 每个 be 所有的需要预热的数据切分成 10G 的 batch 去做。切分是为了做 checkpoint 避免失败重传。具体的切分算法: ``` // simplified from:doris/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java splitBatch(Map<Long/*BE_ID*/, List<Tablet>> beToWarmUpTablets) { Map<Long/*BE_ID*/, List<List<TabletID>> /*Batch list, each batch is a tablet list*/> beToTabletIdBatches = new HashMap<>(); for each BE { List<List<TabletID>> batches = new ArrayList<>(); List<TabletID> batch = new ArrayList<>(); Long curBatchSize = 0L; for each tablet in this BE { if (curBatchSize + tablet.getDataSize(true) > 10G) { // finish this batch and start new batch batches.add(batch); batch = new ArrayList<>(); curBatchSize = 0L; } // otherwise, add this tablet to existing batch batch.add(tablet.getId()); curBatchSize += tablet.getDataSize(true); } beToTabletIdBatches.put(this_BE_id, batches); } return beToTabletIdBatches; } ``` 如果 目标集群没有足够的容量? set Force (bug) 失败或者只会cache 容量允许的部分 不知道什么原因,限制每个集群只能执行一个 warmup任务,如果已经有了就不能再创建额外的了。 所有 fe 的同时最高只有 10 个 warmup 任务,超过的可以加入但需要排队。 ### 预热调度执行 CloudWarmUpJob 执行过程:  1. fe -> all be: SET_JOB 2. fe -> all be: GET_CURRENT_JOB_STATE_AND_LEASE 3. be -> fe: status & pending_job_size 4. fe 轮询 GET_CURRENT_JOB_STATE_AND_LEASE 知道所有 be 上一轮 job 都完成了再进入下面的步骤 5. fe batch ++, Edit log 持久化,下发下一轮 batch -> all be: SET_BATCH,重复上面的 步骤 2 - 4 6. fe 直到所有 be 的所有 batch 清空, fe -> all be CLEAR_JOB 7. fe Edit Log 记录完成状态,cluster 可以接受新 warmup job BE 接受和处理 job 的过程 (RPC处理入口在CloudBackendService::warm_up_tablets ,大部分调用 CloudWarmupManager 来实现): - SET_JOB: 没什么太多内容,就是设置一下CloudWarmupManager 的 cur_job_id - SET_BATCH: 设置 CloudWarmupManage current job 的 current batch。这里可能出现的情况: - job id 为 0:之前 SET_JOB 可能没成功,再 set 一遍 - job id 不对:不理会新 batch - job id 对但是上一个 batch 还没执行完:不理会新 batch - 重复 batch:不理会 - job id 对 + 上一个 batch 做完了 + 新 batch id 情况(预期正确的情况),就会把这个 batch 任务加进 CloudWarmupManager 并有独立的线程通过 batch 中携带的 tablet 信息来把所有 segment 数据从 remote (S3) 下载到本地,完成对这批 tablet 的预热。 - GET_CURRENT_JOB_STATE_AND_LEASE - CLEAR_JOB 所以这里多个 BE 是同步做 batch 的,即本轮 batch 都做完了, fe 才会往所有 be 发下一轮 batch。这样的设计和实现都很简单,但先做完的 be 会等待其余 be 做完本轮 batch,才会一起开始下一轮。 fe 和 多个 be 的互动模型:  ### 任务进度信息维护 主要任务进度信息维护在 CacheHotspotManager 中,更具体是在 CloudWarmUpJob 这个类中,并在 handleShowCloudWarmUpJob 逻辑中展示。其中 CloudWarmUpJob::getJobInfo 核心逻辑如下: ``` info.add(Long.toString(lastBatchId + 1)); // 获得上一个完成的 batch long maxBatchSize = 0; for (List<List<Long>> list : beToTabletIdBatches.values()) { long size = list.size(); if (size > maxBatchSize) { maxBatchSize = size; // 获取所有 be 中 batch 数目最大的那个作为总 batch } } info.add(Long.toString(maxBatchSize)); ``` 这里获取所有 be 中 batch 数目最大的那个作为总 batch (fe 认知的 batch 数量) 是因为从每个 be 视角看 batch 数量可能不一样,所以取最大的那个。为什么会不一样? 原因有如下几点: - be 间 tablet 数量可能不均衡 - tablet 数量均衡了数据量也可能不完全一样 - 哪怕上述两者都完全一致,因为 tablet 个体大小的差异,上文中描述的 splitBatch 分离算法也会造成 batch 数量的不同,例如: - BE1 上三个 tablet 分别是 4G 6G 9G,那么按照每个 batch 不超过 10G 会划分为 (4G 6G) (9G) 2个 batch - BE2 上三个 tablet 分别是 4G 8G 6G,那么按照每个 batch 不超过 10 会划分为 (4G) (8G) (6G) 3 个 batch ## 问题引入 通过上面的梳理,这里有两个坑: 1. “任务进度信息维护” 一节提到的 划分 batch 的数量 不稳定 2. “调度执行” 一节提到的 先做完的 be 会等待其余 be 做完本轮 batch 这两个在单 tablet 数据量很小的时候 (很小指远小于 10G),不会有太多问题:划分基本是均匀的。但如果 tablet 数据量很大,负载相差很大,就容易出现下面这种调度。从图中可以看到 BE 有很多的等待,并没有很好地利用计算、存储、网络的资源,影响预热效率。  ## 修改方案 作为对比,我们希望的是更加紧凑的运行,这样即使 tablet 很大导致负载不均匀,也不会在预热过程中出现 BE 干等的情况 (任务最后可能会因为数据量的差异有长尾)。  只要我们一个 BE 完成了当前 batch,我们就写入 editlog 持久化这个 checkpoint,然后不做等待直接下发该 be 下一个 batch 直到该 BE 的 batch 全部完成。等到所有 BE 的所有 batch 都完成后,fe 判定预热任务成功结束。 这样紧凑的调度模型,fe 没有了 lastBatch 这个记录,改导致之前任务进度信息维护也需要统一修改。这个代价是值得的,因为暴露内部 Batch 概念给用户会增加用户的认知负担,而且用户难以从数据量直接推算出预期 Batch,因为Batch的大小受上面提到的均衡、splitbatch算法等影响,不完全与数据量挂钩 (不能简单地使用 batch数量 x be 数量 x 10G 来计算 )。所以这里可以一鼓作气,直接用目前已预热的数据量和总数据量来进行进度展示。 ## 异常处理 如果 FE 重启,能从 editlog 持久化的 checkpoint 恢复各个 BE 剩余预热任务的执行。 如果用户 CANCEL 预热任务,利用原来的逻辑应该也能正确处理: 各个 BE 会完成当前手头上的 batch,然后清理自身资源。 如果某个 BE 预热出错,将会自动执行 CANCEL 逻辑,逻辑同上。 ## 参考: 1 fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCloudWarmUpStmt.java 2 fe/fe-core/src/main/java/org/apache/doris/analysis/CancelCloudWarmUpStmt.java 3 fe/fe-core/src/main/java/org/apache/doris/analysis/WarmUpClusterStmt.java ### Solution _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org