This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 8ff77c95096 [improve](restore) Split batch creating replica task by table id #42235 (#42342)
8ff77c95096 is described below

commit 8ff77c950968cb701244c50529c5092e414027dc
Author: walter <w41te...@gmail.com>
AuthorDate: Thu Oct 24 16:20:19 2024 +0800

    [improve](restore) Split batch creating replica task by table id #42235 (#42342)
    
    cherry pick from #42235
---
 .../java/org/apache/doris/backup/RestoreJob.java   | 36 ++++++++++++++++------
 1 file changed, 26 insertions(+), 10 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 1a937c58a0f..67120534f46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -635,7 +635,7 @@ public class RestoreJob extends AbstractJob {
         Map<Long, TabletRef> tabletBases = new HashMap<>();
 
         // Check and prepare meta objects.
-        AgentBatchTask batchTask = new AgentBatchTask();
+        Map<Long, AgentBatchTask> batchTaskPerTable = new HashMap<>();
         db.readLock();
         try {
             for (Map.Entry<String, BackupOlapTableInfo> olapTableEntry : 
jobInfo.backupOlapTableObjects.entrySet()) {
@@ -871,6 +871,11 @@ public class RestoreJob extends AbstractJob {
                 BackupPartitionInfo backupPartitionInfo
                         = 
jobInfo.getOlapTableInfo(entry.first).getPartInfo(restorePart.getName());
 
+                AgentBatchTask batchTask = 
batchTaskPerTable.get(localTbl.getId());
+                if (batchTask == null) {
+                    batchTask = new AgentBatchTask();
+                    batchTaskPerTable.put(localTbl.getId(), batchTask);
+                }
                 createReplicas(db, batchTask, localTbl, restorePart);
 
                 genFileMapping(localTbl, restorePart, remoteTbl.getId(), 
backupPartitionInfo,
@@ -882,6 +887,11 @@ public class RestoreJob extends AbstractJob {
                 if (restoreTbl.getType() == TableType.OLAP) {
                     OlapTable restoreOlapTable = (OlapTable) restoreTbl;
                     for (Partition restorePart : 
restoreOlapTable.getPartitions()) {
+                        AgentBatchTask batchTask = 
batchTaskPerTable.get(restoreTbl.getId());
+                        if (batchTask == null) {
+                            batchTask = new AgentBatchTask();
+                            batchTaskPerTable.put(restoreTbl.getId(), 
batchTask);
+                        }
                         createReplicas(db, batchTask, restoreOlapTable, 
restorePart, tabletBases);
                         BackupOlapTableInfo backupOlapTableInfo = 
jobInfo.getOlapTableInfo(restoreOlapTable.getName());
                         genFileMapping(restoreOlapTable, restorePart, 
backupOlapTableInfo.id,
@@ -919,20 +929,26 @@ public class RestoreJob extends AbstractJob {
 
         // Send create replica task to BE outside the db lock
         boolean ok = false;
-        MarkedCountDownLatch<Long, Long> latch = new 
MarkedCountDownLatch<Long, Long>(batchTask.getTaskNum());
-        if (batchTask.getTaskNum() > 0) {
-            for (AgentTask task : batchTask.getAllTasks()) {
-                latch.addMark(task.getBackendId(), task.getTabletId());
-                ((CreateReplicaTask) task).setLatch(latch);
-                AgentTaskQueue.addTask(task);
+        int numBatchTasks = batchTaskPerTable.values()
+                .stream()
+                .mapToInt(AgentBatchTask::getTaskNum)
+                .sum();
+        MarkedCountDownLatch<Long, Long> latch = new 
MarkedCountDownLatch<Long, Long>(numBatchTasks);
+        if (batchTaskPerTable.size() > 0) {
+            for (AgentBatchTask batchTask : batchTaskPerTable.values()) {
+                for (AgentTask task : batchTask.getAllTasks()) {
+                    latch.addMark(task.getBackendId(), task.getTabletId());
+                    ((CreateReplicaTask) task).setLatch(latch);
+                    AgentTaskQueue.addTask(task);
+                }
+                AgentTaskExecutor.submit(batchTask);
             }
-            AgentTaskExecutor.submit(batchTask);
 
             // estimate timeout
-            long timeout = 
DbUtil.getCreateReplicasTimeoutMs(batchTask.getTaskNum());
+            long timeout = DbUtil.getCreateReplicasTimeoutMs(numBatchTasks);
             try {
                 LOG.info("begin to send create replica tasks to BE for 
restore. total {} tasks. timeout: {}",
-                        batchTask.getTaskNum(), timeout);
+                        numBatchTasks, timeout);
                 ok = latch.await(timeout, TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
                 LOG.warn("InterruptedException: ", e);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to