This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 8eb7e3b058a [improve](task) Support splitting agent batch tasks 
automatically #42703 (#42987)
8eb7e3b058a is described below

commit 8eb7e3b058a3e82aae1b50289f29ac155bb3646b
Author: walter <w41te...@gmail.com>
AuthorDate: Thu Oct 31 21:14:09 2024 +0800

    [improve](task) Support splitting agent batch tasks automatically #42703 
(#42987)
    
    cherry pick from #42703
---
 .../main/java/org/apache/doris/common/Config.java  |  7 +++++
 .../java/org/apache/doris/backup/BackupJob.java    |  9 +++---
 .../java/org/apache/doris/backup/RestoreJob.java   | 14 +++++-----
 .../java/org/apache/doris/task/AgentBatchTask.java | 32 ++++++++++++++++++----
 4 files changed, 44 insertions(+), 18 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 509a266365c..8aedeb091f3 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2418,6 +2418,13 @@ public class Config extends ConfigBase {
     })
     public static int restore_download_task_num_per_be = 3;
 
+    @ConfField(mutable = true, masterOnly = true, description = {
+            "备份恢复过程中,单次 RPC 分配给每个be的任务最大个数,默认值为10000个。",
+            "The max number of batched tasks per RPC assigned to each be 
during the backup/restore process, "
+            + "the default value is 10000."
+    })
+    public static int backup_restore_batch_task_num_per_rpc = 10000;
+
     @ConfField(description = {"是否开启通过http接口获取log文件的功能",
             "Whether to enable the function of getting log files through http 
interface"})
     public static boolean enable_get_log_file_api = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 9ee4f6d0603..4315ad8ee4c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -209,11 +209,10 @@ public class BackupJob extends AbstractJob {
                     task.getIndexId(), task.getTabletId(),
                     task.getVersion(),
                     task.getSchemaHash(), timeoutMs, false /* not restore task 
*/);
-            AgentBatchTask batchTask = new AgentBatchTask();
-            batchTask.addTask(newTask);
             unfinishedTaskIds.put(tablet.getId(), replica.getBackendId());
 
             //send task
+            AgentBatchTask batchTask = new AgentBatchTask(newTask);
             AgentTaskQueue.addTask(newTask);
             AgentTaskExecutor.submit(batchTask);
 
@@ -447,7 +446,7 @@ public class BackupJob extends AbstractJob {
         // copy all related schema at this moment
         List<Table> copiedTables = Lists.newArrayList();
         List<Resource> copiedResources = Lists.newArrayList();
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         for (TableRef tableRef : tableRefs) {
             String tblName = tableRef.getName().getTbl();
             Table tbl = db.getTableNullable(tblName);
@@ -695,7 +694,7 @@ public class BackupJob extends AbstractJob {
             beToSnapshots.put(info.getBeId(), info);
         }
 
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         for (Long beId : beToSnapshots.keySet()) {
             List<SnapshotInfo> infos = beToSnapshots.get(beId);
             int totalNum = infos.size();
@@ -851,7 +850,7 @@ public class BackupJob extends AbstractJob {
         }
         // we do not care about the release snapshot tasks' success or failure,
         // the GC thread on BE will sweep the snapshot, finally.
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         for (SnapshotInfo info : snapshotInfos.values()) {
             ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null, 
info.getBeId(), info.getDbId(),
                     info.getTabletId(), info.getPath());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 173af1c25f3..fb96fdbc0bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -859,7 +859,7 @@ public class RestoreJob extends AbstractJob {
 
                 AgentBatchTask batchTask = 
batchTaskPerTable.get(localTbl.getId());
                 if (batchTask == null) {
-                    batchTask = new AgentBatchTask();
+                    batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
                     batchTaskPerTable.put(localTbl.getId(), batchTask);
                 }
                 createReplicas(db, batchTask, localTbl, restorePart);
@@ -875,7 +875,7 @@ public class RestoreJob extends AbstractJob {
                     for (Partition restorePart : 
restoreOlapTable.getPartitions()) {
                         AgentBatchTask batchTask = 
batchTaskPerTable.get(restoreTbl.getId());
                         if (batchTask == null) {
-                            batchTask = new AgentBatchTask();
+                            batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
                             batchTaskPerTable.put(restoreTbl.getId(), 
batchTask);
                         }
                         createReplicas(db, batchTask, restoreOlapTable, 
restorePart, tabletBases);
@@ -1128,7 +1128,7 @@ public class RestoreJob extends AbstractJob {
         taskProgress.clear();
         taskErrMsg.clear();
         Multimap<Long, Long> bePathsMap = HashMultimap.create();
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         db.readLock();
         try {
             for (Map.Entry<IdChain, IdChain> entry : 
fileMapping.getMapping().entrySet()) {
@@ -1592,7 +1592,7 @@ public class RestoreJob extends AbstractJob {
         unfinishedSignatureToId.clear();
         taskProgress.clear();
         taskErrMsg.clear();
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         for (long dbId : dbToSnapshotInfos.keySet()) {
             List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);
 
@@ -1745,7 +1745,7 @@ public class RestoreJob extends AbstractJob {
         unfinishedSignatureToId.clear();
         taskProgress.clear();
         taskErrMsg.clear();
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         for (long dbId : dbToSnapshotInfos.keySet()) {
             List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);
 
@@ -1922,7 +1922,7 @@ public class RestoreJob extends AbstractJob {
         unfinishedSignatureToId.clear();
         taskProgress.clear();
         taskErrMsg.clear();
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         // tablet id->(be id -> download info)
         for (Cell<Long, Long, SnapshotInfo> cell : snapshotInfos.cellSet()) {
             SnapshotInfo info = cell.getValue();
@@ -2111,7 +2111,7 @@ public class RestoreJob extends AbstractJob {
         }
         // we do not care about the release snapshot tasks' success or failure,
         // the GC thread on BE will sweep the snapshot, finally.
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         for (SnapshotInfo info : snapshotInfos.values()) {
             ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null, 
info.getBeId(), info.getDbId(),
                     info.getTabletId(), info.getPath());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index 848211f9413..b1839400d31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -51,6 +51,7 @@ import org.apache.doris.thrift.TUploadReq;
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
 
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -63,6 +64,8 @@ import java.util.Map;
 public class AgentBatchTask implements Runnable {
     private static final Logger LOG = 
LogManager.getLogger(AgentBatchTask.class);
 
+    private int batchSize = Integer.MAX_VALUE;
+
     // backendId -> AgentTask List
     private Map<Long, List<AgentTask>> backendIdToTasks;
 
@@ -70,6 +73,12 @@ public class AgentBatchTask implements Runnable {
         this.backendIdToTasks = new HashMap<Long, List<AgentTask>>();
     }
 
+    public AgentBatchTask(int batchSize) { // batchSize: max number of requests sent to one backend per submitTasks RPC
+        this.backendIdToTasks = new HashMap<Long, List<AgentTask>>();
+        this.batchSize = batchSize;
+        assert batchSize > 0; // only checked when the JVM runs with -ea; a value <= 0 makes run() send one task per RPC
+    }
+
     public AgentBatchTask(AgentTask singleTask) {
         this();
         addTask(singleTask);
@@ -168,14 +177,12 @@ public class AgentBatchTask implements Runnable {
                 List<TAgentTaskRequest> agentTaskRequests = new 
LinkedList<TAgentTaskRequest>();
                 for (AgentTask task : tasks) {
                     agentTaskRequests.add(toAgentTaskRequest(task));
-                }
-                client.submitTasks(agentTaskRequests);
-                if (LOG.isDebugEnabled()) {
-                    for (AgentTask task : tasks) {
-                        LOG.debug("send task: type[{}], backend[{}], 
signature[{}]",
-                                task.getTaskType(), backendId, 
task.getSignature());
+                    if (agentTaskRequests.size() >= batchSize) {
+                        submitTasks(backendId, client, agentTaskRequests);
+                        agentTaskRequests.clear();
                     }
                 }
+                submitTasks(backendId, client, agentTaskRequests);
                 ok = true;
             } catch (Exception e) {
                 LOG.warn("task exec error. backend[{}]", backendId, e);
@@ -194,6 +201,19 @@ public class AgentBatchTask implements Runnable {
         } // end for backend
     }
 
+    private static void submitTasks(long backendId, // sends one batch of requests to a single backend
+            BackendService.Client client, List<TAgentTaskRequest> 
agentTaskRequests) throws TException {
+        if (!agentTaskRequests.isEmpty()) { // run() always makes a final call, which may carry an empty batch
+            client.submitTasks(agentTaskRequests);
+        }
+        if (LOG.isDebugEnabled()) { // logs only after the RPC returned; an empty batch logs nothing
+            for (TAgentTaskRequest req : agentTaskRequests) {
+                LOG.debug("send task: type[{}], backend[{}], signature[{}]",
+                        req.getTaskType(), backendId, req.getSignature());
+            }
+        }
+    }
+
     private TAgentTaskRequest toAgentTaskRequest(AgentTask task) {
         TAgentTaskRequest tAgentTaskRequest = new TAgentTaskRequest();
         tAgentTaskRequest.setProtocolVersion(TAgentServiceVersion.V1);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to