This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5eaa09d165e [improve](task) Support splitting agent batch tasks 
automatically (#42703)
5eaa09d165e is described below

commit 5eaa09d165e5d36bd6bf5e52275dbbfa5644c94d
Author: walter <w41te...@gmail.com>
AuthorDate: Wed Oct 30 11:10:14 2024 +0800

    [improve](task) Support splitting agent batch tasks automatically (#42703)
---
 .../main/java/org/apache/doris/common/Config.java  |  7 +++++
 .../java/org/apache/doris/backup/BackupJob.java    |  9 +++---
 .../java/org/apache/doris/backup/RestoreJob.java   | 14 +++++-----
 .../java/org/apache/doris/task/AgentBatchTask.java | 32 ++++++++++++++++++----
 4 files changed, 44 insertions(+), 18 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 97bcb39403a..b017935021b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2709,6 +2709,13 @@ public class Config extends ConfigBase {
     })
     public static int restore_download_task_num_per_be = 3;
 
+    @ConfField(mutable = true, masterOnly = true, description = {
+            "备份恢复过程中,单次 RPC 分配给每个be的任务最大个数,默认值为10000个。",
+            "The max number of batched tasks per RPC assigned to each be 
during the backup/restore process, "
+            + "the default value is 10000."
+    })
+    public static int backup_restore_batch_task_num_per_rpc = 10000;
+
     @ConfField(description = {"是否开启通过http接口获取log文件的功能",
             "Whether to enable the function of getting log files through http 
interface"})
     public static boolean enable_get_log_file_api = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 1ba9fb129b2..b14fae1ed3f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -208,11 +208,10 @@ public class BackupJob extends AbstractJob {
                     task.getIndexId(), task.getTabletId(),
                     task.getVersion(),
                     task.getSchemaHash(), timeoutMs, false /* not restore task 
*/);
-            AgentBatchTask batchTask = new AgentBatchTask();
-            batchTask.addTask(newTask);
             unfinishedTaskIds.put(tablet.getId(), 
replica.getBackendIdWithoutException());
 
             //send task
+            AgentBatchTask batchTask = new AgentBatchTask(newTask);
             AgentTaskQueue.addTask(newTask);
             AgentTaskExecutor.submit(batchTask);
 
@@ -474,7 +473,7 @@ public class BackupJob extends AbstractJob {
         // copy all related schema at this moment
         List<Table> copiedTables = Lists.newArrayList();
         List<Resource> copiedResources = Lists.newArrayList();
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         for (TableRef tableRef : tableRefs) {
             String tblName = tableRef.getName().getTbl();
             Table tbl = db.getTableNullable(tblName);
@@ -729,7 +728,7 @@ public class BackupJob extends AbstractJob {
             beToSnapshots.put(info.getBeId(), info);
         }
 
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         for (Long beId : beToSnapshots.keySet()) {
             List<SnapshotInfo> infos = beToSnapshots.get(beId);
             int totalNum = infos.size();
@@ -892,7 +891,7 @@ public class BackupJob extends AbstractJob {
         }
         // we do not care about the release snapshot tasks' success or failure,
         // the GC thread on BE will sweep the snapshot, finally.
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         for (SnapshotInfo info : snapshotInfos.values()) {
             ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null, 
info.getBeId(), info.getDbId(),
                     info.getTabletId(), info.getPath());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 7549bfb4b44..bf7a787ead4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -894,7 +894,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
 
                 AgentBatchTask batchTask = 
batchTaskPerTable.get(localTbl.getId());
                 if (batchTask == null) {
-                    batchTask = new AgentBatchTask();
+                    batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
                     batchTaskPerTable.put(localTbl.getId(), batchTask);
                 }
                 createReplicas(db, batchTask, localTbl, restorePart);
@@ -910,7 +910,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
                     for (Partition restorePart : 
restoreOlapTable.getPartitions()) {
                         AgentBatchTask batchTask = 
batchTaskPerTable.get(restoreTbl.getId());
                         if (batchTask == null) {
-                            batchTask = new AgentBatchTask();
+                            batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
                             batchTaskPerTable.put(restoreTbl.getId(), 
batchTask);
                         }
                         createReplicas(db, batchTask, restoreOlapTable, 
restorePart, tabletBases);
@@ -1167,7 +1167,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
         taskProgress.clear();
         taskErrMsg.clear();
         Multimap<Long, Long> bePathsMap = HashMultimap.create();
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         db.readLock();
         try {
             for (Map.Entry<IdChain, IdChain> entry : 
fileMapping.getMapping().entrySet()) {
@@ -1652,7 +1652,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
         unfinishedSignatureToId.clear();
         taskProgress.clear();
         taskErrMsg.clear();
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         for (long dbId : dbToSnapshotInfos.keySet()) {
             List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);
 
@@ -1812,7 +1812,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
         unfinishedSignatureToId.clear();
         taskProgress.clear();
         taskErrMsg.clear();
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         for (long dbId : dbToSnapshotInfos.keySet()) {
             List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);
 
@@ -1992,7 +1992,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
         unfinishedSignatureToId.clear();
         taskProgress.clear();
         taskErrMsg.clear();
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         // tablet id->(be id -> download info)
         for (Cell<Long, Long, SnapshotInfo> cell : snapshotInfos.cellSet()) {
             SnapshotInfo info = cell.getValue();
@@ -2184,7 +2184,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
         }
         // we do not care about the release snapshot tasks' success or failure,
         // the GC thread on BE will sweep the snapshot, finally.
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         for (SnapshotInfo info : snapshotInfos.values()) {
             ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null, 
info.getBeId(), info.getDbId(),
                     info.getTabletId(), info.getPath());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index bf73f9b83fe..be698776cac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -55,6 +55,7 @@ import org.apache.doris.thrift.TVisibleVersionReq;
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
 
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -67,6 +68,8 @@ import java.util.Map;
 public class AgentBatchTask implements Runnable {
     private static final Logger LOG = 
LogManager.getLogger(AgentBatchTask.class);
 
+    private int batchSize = Integer.MAX_VALUE;
+
     // backendId -> AgentTask List
     private Map<Long, List<AgentTask>> backendIdToTasks;
 
@@ -74,6 +77,12 @@ public class AgentBatchTask implements Runnable {
         this.backendIdToTasks = new HashMap<Long, List<AgentTask>>();
     }
 
+    public AgentBatchTask(int batchSize) {
+        this.backendIdToTasks = new HashMap<Long, List<AgentTask>>();
+        this.batchSize = batchSize;
+        assert batchSize > 0;
+    }
+
     public AgentBatchTask(AgentTask singleTask) {
         this();
         addTask(singleTask);
@@ -172,14 +181,12 @@ public class AgentBatchTask implements Runnable {
                 List<TAgentTaskRequest> agentTaskRequests = new 
LinkedList<TAgentTaskRequest>();
                 for (AgentTask task : tasks) {
                     agentTaskRequests.add(toAgentTaskRequest(task));
-                }
-                client.submitTasks(agentTaskRequests);
-                if (LOG.isDebugEnabled()) {
-                    for (AgentTask task : tasks) {
-                        LOG.debug("send task: type[{}], backend[{}], 
signature[{}]",
-                                task.getTaskType(), backendId, 
task.getSignature());
+                    if (agentTaskRequests.size() >= batchSize) {
+                        submitTasks(backendId, client, agentTaskRequests);
+                        agentTaskRequests.clear();
                     }
                 }
+                submitTasks(backendId, client, agentTaskRequests);
                 ok = true;
             } catch (Exception e) {
                 LOG.warn("task exec error. backend[{}]", backendId, e);
@@ -198,6 +205,19 @@ public class AgentBatchTask implements Runnable {
         } // end for backend
     }
 
+    private static void submitTasks(long backendId,
+            BackendService.Client client, List<TAgentTaskRequest> 
agentTaskRequests) throws TException {
+        if (!agentTaskRequests.isEmpty()) {
+            client.submitTasks(agentTaskRequests);
+        }
+        if (LOG.isDebugEnabled()) {
+            for (TAgentTaskRequest req : agentTaskRequests) {
+                LOG.debug("send task: type[{}], backend[{}], signature[{}]",
+                        req.getTaskType(), backendId, req.getSignature());
+            }
+        }
+    }
+
     private TAgentTaskRequest toAgentTaskRequest(AgentTask task) {
         TAgentTaskRequest tAgentTaskRequest = new TAgentTaskRequest();
         tAgentTaskRequest.setProtocolVersion(TAgentServiceVersion.V1);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to