This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new c1f9bfd73cc branch-3.0: [fix](agent) cancel agent task when it is rejected by agent-task-pool #51138 (#51211) c1f9bfd73cc is described below commit c1f9bfd73cc17b9c52ba5ba249e79e2f79740745 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Sun May 25 19:18:59 2025 +0800 branch-3.0: [fix](agent) cancel agent task when it is rejected by agent-task-pool #51138 (#51211) Cherry-picked from #51138 Co-authored-by: walter <maoch...@selectdb.com> --- .../java/org/apache/doris/master/ReportHandler.java | 10 +--------- .../main/java/org/apache/doris/task/AgentTask.java | 11 +++++++++++ .../java/org/apache/doris/task/AgentTaskExecutor.java | 19 ++++++++++++++++--- .../java/org/apache/doris/task/AgentTaskQueue.java | 13 +++++++++++++ 4 files changed, 41 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 14bc9839b7e..60dcb3097a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -642,14 +642,7 @@ public class ReportHandler extends Daemon { AgentBatchTask batchTask = new AgentBatchTask(Config.report_resend_batch_task_num_per_rpc); long taskReportTime = System.currentTimeMillis(); for (AgentTask task : diffTasks) { - // these tasks no need to do diff - // 1. CREATE - // 2. SYNC DELETE - // 3. CHECK_CONSISTENCY - // 4. STORAGE_MDEIUM_MIGRATE - if (task.getTaskType() == TTaskType.CREATE - || task.getTaskType() == TTaskType.CHECK_CONSISTENCY - || task.getTaskType() == TTaskType.STORAGE_MEDIUM_MIGRATE) { + if (!task.isNeedResendType()) { continue; } @@ -658,7 +651,6 @@ public class ReportHandler extends Daemon { MetricRepo.COUNTER_AGENT_TASK_RESEND_TOTAL.getOrAdd(task.getTaskType().toString()).increase(1L); batchTask.addTask(task); } - } if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java index f29ab3ef2b4..b5cc8fc2085 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java @@ -145,6 +145,17 @@ public abstract class AgentTask { return isFinished; } + public boolean isNeedResendType() { + // these tasks no need to do diff + // 1. CREATE + // 2. SYNC DELETE + // 3. CHECK_CONSISTENCY + // 4. STORAGE_MEDIUM_MIGRATE + return !(taskType == TTaskType.CREATE + || taskType == TTaskType.CHECK_CONSISTENCY + || taskType == TTaskType.STORAGE_MEDIUM_MIGRATE); + } + public boolean shouldResend(long currentTimeMillis) { return createTime == -1 || currentTimeMillis - createTime > Config.agent_task_resend_wait_time_ms; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskExecutor.java index 8297ef2fff8..c8f5c977848 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskExecutor.java @@ -21,21 +21,34 @@ import org.apache.doris.common.Config; import org.apache.doris.common.ThreadPoolManager; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; public class AgentTaskExecutor { - private static final ExecutorService EXECUTOR = ThreadPoolManager.newDaemonCacheThreadPool( + private static final ExecutorService EXECUTOR = ThreadPoolManager.newDaemonCacheThreadPoolThrowException( Config.max_agent_task_threads_num, "agent-task-pool", true); public AgentTaskExecutor() { - } public static void submit(AgentBatchTask task) { if (task == null) { return; } - EXECUTOR.submit(task); + try { + EXECUTOR.submit(task); + } catch (RejectedExecutionException e) { + String msg = "Task is rejected, because the agent-task-pool is full, " + + "consider increasing the max_agent_task_threads_num config"; + for (AgentTask t : task.getAllTasks()) { + // Skip the task if it is a resend type and already exists in the queue, because it will be + // re-submit to the executor later. + if (t.isNeedResendType() && AgentTaskQueue.contains(t)) { + continue; + } + t.failedWithMsg(msg); + } + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java index bd68d87f191..97e1a3cc676 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java @@ -132,6 +132,19 @@ public class AgentTaskQueue { } } + public static synchronized boolean contains(AgentTask task) { + long backendId = task.getBackendId(); + TTaskType type = task.getTaskType(); + long signature = task.getSignature(); + + if (!tasks.contains(backendId, type)) { + return false; + } + + Map<Long, AgentTask> signatureMap = tasks.get(backendId, type); + return signatureMap.containsKey(signature); + } + public static synchronized AgentTask getTask(long backendId, TTaskType type, long signature) { if (!tasks.contains(backendId, type)) { return null; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org