This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 8ff77c95096 [improve](restore) Split batch creating replica task by table id #42235 (#42342) 8ff77c95096 is described below commit 8ff77c950968cb701244c50529c5092e414027dc Author: walter <w41te...@gmail.com> AuthorDate: Thu Oct 24 16:20:19 2024 +0800 [improve](restore) Split batch creating replica task by table id #42235 (#42342) cherry pick from #42235 --- .../java/org/apache/doris/backup/RestoreJob.java | 36 ++++++++++++++++------ 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 1a937c58a0f..67120534f46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -635,7 +635,7 @@ public class RestoreJob extends AbstractJob { Map<Long, TabletRef> tabletBases = new HashMap<>(); // Check and prepare meta objects. - AgentBatchTask batchTask = new AgentBatchTask(); + Map<Long, AgentBatchTask> batchTaskPerTable = new HashMap<>(); db.readLock(); try { for (Map.Entry<String, BackupOlapTableInfo> olapTableEntry : jobInfo.backupOlapTableObjects.entrySet()) { @@ -871,6 +871,11 @@ public class RestoreJob extends AbstractJob { BackupPartitionInfo backupPartitionInfo = jobInfo.getOlapTableInfo(entry.first).getPartInfo(restorePart.getName()); + AgentBatchTask batchTask = batchTaskPerTable.get(localTbl.getId()); + if (batchTask == null) { + batchTask = new AgentBatchTask(); + batchTaskPerTable.put(localTbl.getId(), batchTask); + } createReplicas(db, batchTask, localTbl, restorePart); genFileMapping(localTbl, restorePart, remoteTbl.getId(), backupPartitionInfo, @@ -882,6 +887,11 @@ public class RestoreJob extends AbstractJob { if (restoreTbl.getType() == TableType.OLAP) { OlapTable restoreOlapTable = (OlapTable) restoreTbl; for (Partition restorePart : restoreOlapTable.getPartitions()) { + AgentBatchTask batchTask = batchTaskPerTable.get(restoreTbl.getId()); + if (batchTask == null) { + batchTask = new AgentBatchTask(); + batchTaskPerTable.put(restoreTbl.getId(), batchTask); + } createReplicas(db, batchTask, restoreOlapTable, restorePart, tabletBases); BackupOlapTableInfo backupOlapTableInfo = jobInfo.getOlapTableInfo(restoreOlapTable.getName()); genFileMapping(restoreOlapTable, restorePart, backupOlapTableInfo.id, @@ -919,20 +929,26 @@ public class RestoreJob extends AbstractJob { // Send create replica task to BE outside the db lock boolean ok = false; - MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(batchTask.getTaskNum()); - if (batchTask.getTaskNum() > 0) { - for (AgentTask task : batchTask.getAllTasks()) { - latch.addMark(task.getBackendId(), task.getTabletId()); - ((CreateReplicaTask) task).setLatch(latch); - AgentTaskQueue.addTask(task); + int numBatchTasks = batchTaskPerTable.values() + .stream() + .mapToInt(AgentBatchTask::getTaskNum) + .sum(); + MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(numBatchTasks); + if (batchTaskPerTable.size() > 0) { + for (AgentBatchTask batchTask : batchTaskPerTable.values()) { + for (AgentTask task : batchTask.getAllTasks()) { + latch.addMark(task.getBackendId(), task.getTabletId()); + ((CreateReplicaTask) task).setLatch(latch); + AgentTaskQueue.addTask(task); + } + AgentTaskExecutor.submit(batchTask); } - AgentTaskExecutor.submit(batchTask); // estimate timeout - long timeout = DbUtil.getCreateReplicasTimeoutMs(batchTask.getTaskNum()); + long timeout = DbUtil.getCreateReplicasTimeoutMs(numBatchTasks); try { LOG.info("begin to send create replica tasks to BE for restore. total {} tasks. timeout: {}", - batchTask.getTaskNum(), timeout); + numBatchTasks, timeout); ok = latch.await(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOG.warn("InterruptedException: ", e); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org