This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5eaa09d165e [improve](task) Support splitting agent batch tasks automatically (#42703) 5eaa09d165e is described below commit 5eaa09d165e5d36bd6bf5e52275dbbfa5644c94d Author: walter <w41te...@gmail.com> AuthorDate: Wed Oct 30 11:10:14 2024 +0800 [improve](task) Support splitting agent batch tasks automatically (#42703) --- .../main/java/org/apache/doris/common/Config.java | 7 +++++ .../java/org/apache/doris/backup/BackupJob.java | 9 +++--- .../java/org/apache/doris/backup/RestoreJob.java | 14 +++++----- .../java/org/apache/doris/task/AgentBatchTask.java | 32 ++++++++++++++++++---- 4 files changed, 44 insertions(+), 18 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 97bcb39403a..b017935021b 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2709,6 +2709,13 @@ public class Config extends ConfigBase { }) public static int restore_download_task_num_per_be = 3; + @ConfField(mutable = true, masterOnly = true, description = { + "备份恢复过程中,单次 RPC 分配给每个be的任务最大个数,默认值为10000个。", + "The max number of batched tasks per RPC assigned to each be during the backup/restore process, " + + "the default value is 10000." 
+ }) + public static int backup_restore_batch_task_num_per_rpc = 10000; + @ConfField(description = {"是否开启通过http接口获取log文件的功能", "Whether to enable the function of getting log files through http interface"}) public static boolean enable_get_log_file_api = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index 1ba9fb129b2..b14fae1ed3f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -208,11 +208,10 @@ public class BackupJob extends AbstractJob { task.getIndexId(), task.getTabletId(), task.getVersion(), task.getSchemaHash(), timeoutMs, false /* not restore task */); - AgentBatchTask batchTask = new AgentBatchTask(); - batchTask.addTask(newTask); unfinishedTaskIds.put(tablet.getId(), replica.getBackendIdWithoutException()); //send task + AgentBatchTask batchTask = new AgentBatchTask(newTask); AgentTaskQueue.addTask(newTask); AgentTaskExecutor.submit(batchTask); @@ -474,7 +473,7 @@ public class BackupJob extends AbstractJob { // copy all related schema at this moment List<Table> copiedTables = Lists.newArrayList(); List<Resource> copiedResources = Lists.newArrayList(); - AgentBatchTask batchTask = new AgentBatchTask(); + AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc); for (TableRef tableRef : tableRefs) { String tblName = tableRef.getName().getTbl(); Table tbl = db.getTableNullable(tblName); @@ -729,7 +728,7 @@ public class BackupJob extends AbstractJob { beToSnapshots.put(info.getBeId(), info); } - AgentBatchTask batchTask = new AgentBatchTask(); + AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc); for (Long beId : beToSnapshots.keySet()) { List<SnapshotInfo> infos = beToSnapshots.get(beId); int totalNum = infos.size(); @@ -892,7 +891,7 @@ public class BackupJob extends AbstractJob { } 
// we do not care about the release snapshot tasks' success or failure, // the GC thread on BE will sweep the snapshot, finally. - AgentBatchTask batchTask = new AgentBatchTask(); + AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc); for (SnapshotInfo info : snapshotInfos.values()) { ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null, info.getBeId(), info.getDbId(), info.getTabletId(), info.getPath()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 7549bfb4b44..bf7a787ead4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -894,7 +894,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { AgentBatchTask batchTask = batchTaskPerTable.get(localTbl.getId()); if (batchTask == null) { - batchTask = new AgentBatchTask(); + batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc); batchTaskPerTable.put(localTbl.getId(), batchTask); } createReplicas(db, batchTask, localTbl, restorePart); @@ -910,7 +910,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { for (Partition restorePart : restoreOlapTable.getPartitions()) { AgentBatchTask batchTask = batchTaskPerTable.get(restoreTbl.getId()); if (batchTask == null) { - batchTask = new AgentBatchTask(); + batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc); batchTaskPerTable.put(restoreTbl.getId(), batchTask); } createReplicas(db, batchTask, restoreOlapTable, restorePart, tabletBases); @@ -1167,7 +1167,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { taskProgress.clear(); taskErrMsg.clear(); Multimap<Long, Long> bePathsMap = HashMultimap.create(); - AgentBatchTask batchTask = new AgentBatchTask(); + AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc); db.readLock(); try { for (Map.Entry<IdChain, IdChain> entry : fileMapping.getMapping().entrySet()) { @@ -1652,7 +1652,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { unfinishedSignatureToId.clear(); taskProgress.clear(); taskErrMsg.clear(); - AgentBatchTask batchTask = new AgentBatchTask(); + AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc); for (long dbId : dbToSnapshotInfos.keySet()) { List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId); @@ -1812,7 +1812,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { unfinishedSignatureToId.clear(); taskProgress.clear(); taskErrMsg.clear(); - AgentBatchTask batchTask = new AgentBatchTask(); + AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc); for (long dbId : dbToSnapshotInfos.keySet()) { List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId); @@ -1992,7 +1992,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { unfinishedSignatureToId.clear(); taskProgress.clear(); taskErrMsg.clear(); - AgentBatchTask batchTask = new AgentBatchTask(); + AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc); // tablet id->(be id -> download info) for (Cell<Long, Long, SnapshotInfo> cell : snapshotInfos.cellSet()) { SnapshotInfo info = cell.getValue(); @@ -2184,7 +2184,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { } // we do not care about the release snapshot tasks' success or failure, // the GC thread on BE will sweep the snapshot, finally. 
- AgentBatchTask batchTask = new AgentBatchTask(); + AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc); for (SnapshotInfo info : snapshotInfos.values()) { ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null, info.getBeId(), info.getDbId(), info.getTabletId(), info.getPath()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java index bf73f9b83fe..be698776cac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java @@ -55,6 +55,7 @@ import org.apache.doris.thrift.TVisibleVersionReq; import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; import java.util.HashMap; import java.util.LinkedList; @@ -67,6 +68,8 @@ import java.util.Map; public class AgentBatchTask implements Runnable { private static final Logger LOG = LogManager.getLogger(AgentBatchTask.class); + private int batchSize = Integer.MAX_VALUE; + // backendId -> AgentTask List private Map<Long, List<AgentTask>> backendIdToTasks; @@ -74,6 +77,12 @@ public class AgentBatchTask implements Runnable { this.backendIdToTasks = new HashMap<Long, List<AgentTask>>(); } + public AgentBatchTask(int batchSize) { + this.backendIdToTasks = new HashMap<Long, List<AgentTask>>(); + this.batchSize = batchSize; + assert batchSize > 0; + } + public AgentBatchTask(AgentTask singleTask) { this(); addTask(singleTask); @@ -172,14 +181,12 @@ public class AgentBatchTask implements Runnable { List<TAgentTaskRequest> agentTaskRequests = new LinkedList<TAgentTaskRequest>(); for (AgentTask task : tasks) { agentTaskRequests.add(toAgentTaskRequest(task)); - } - client.submitTasks(agentTaskRequests); - if (LOG.isDebugEnabled()) { - for (AgentTask task : tasks) { - LOG.debug("send task: type[{}], 
backend[{}], signature[{}]", - task.getTaskType(), backendId, task.getSignature()); + if (agentTaskRequests.size() >= batchSize) { + submitTasks(backendId, client, agentTaskRequests); + agentTaskRequests.clear(); } } + submitTasks(backendId, client, agentTaskRequests); ok = true; } catch (Exception e) { LOG.warn("task exec error. backend[{}]", backendId, e); @@ -198,6 +205,19 @@ public class AgentBatchTask implements Runnable { } // end for backend } + private static void submitTasks(long backendId, + BackendService.Client client, List<TAgentTaskRequest> agentTaskRequests) throws TException { + if (!agentTaskRequests.isEmpty()) { + client.submitTasks(agentTaskRequests); + } + if (LOG.isDebugEnabled()) { + for (TAgentTaskRequest req : agentTaskRequests) { + LOG.debug("send task: type[{}], backend[{}], signature[{}]", + req.getTaskType(), backendId, req.getSignature()); + } + } + } + private TAgentTaskRequest toAgentTaskRequest(AgentTask task) { TAgentTaskRequest tAgentTaskRequest = new TAgentTaskRequest(); tAgentTaskRequest.setProtocolVersion(TAgentServiceVersion.V1); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org