This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c1f9bfd73cc branch-3.0: [fix](agent) cancel agent task when it is 
rejected by agent-task-pool #51138 (#51211)
c1f9bfd73cc is described below

commit c1f9bfd73cc17b9c52ba5ba249e79e2f79740745
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun May 25 19:18:59 2025 +0800

    branch-3.0: [fix](agent) cancel agent task when it is rejected by 
agent-task-pool #51138 (#51211)
    
    Cherry-picked from #51138
    
    Co-authored-by: walter <maoch...@selectdb.com>
---
 .../java/org/apache/doris/master/ReportHandler.java   | 10 +---------
 .../main/java/org/apache/doris/task/AgentTask.java    | 11 +++++++++++
 .../java/org/apache/doris/task/AgentTaskExecutor.java | 19 ++++++++++++++++---
 .../java/org/apache/doris/task/AgentTaskQueue.java    | 13 +++++++++++++
 4 files changed, 41 insertions(+), 12 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 14bc9839b7e..60dcb3097a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -642,14 +642,7 @@ public class ReportHandler extends Daemon {
         AgentBatchTask batchTask = new 
AgentBatchTask(Config.report_resend_batch_task_num_per_rpc);
         long taskReportTime = System.currentTimeMillis();
         for (AgentTask task : diffTasks) {
-            // these tasks no need to do diff
-            // 1. CREATE
-            // 2. SYNC DELETE
-            // 3. CHECK_CONSISTENCY
-            // 4. STORAGE_MDEIUM_MIGRATE
-            if (task.getTaskType() == TTaskType.CREATE
-                    || task.getTaskType() == TTaskType.CHECK_CONSISTENCY
-                    || task.getTaskType() == TTaskType.STORAGE_MEDIUM_MIGRATE) 
{
+            if (!task.isNeedResendType()) {
                 continue;
             }
 
@@ -658,7 +651,6 @@ public class ReportHandler extends Daemon {
                 
MetricRepo.COUNTER_AGENT_TASK_RESEND_TOTAL.getOrAdd(task.getTaskType().toString()).increase(1L);
                 batchTask.addTask(task);
             }
-
         }
 
         if (LOG.isDebugEnabled()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
index f29ab3ef2b4..b5cc8fc2085 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
@@ -145,6 +145,17 @@ public abstract class AgentTask {
         return isFinished;
     }
 
+    public boolean isNeedResendType() {
+        // these tasks no need to do diff
+        // 1. CREATE
+        // 2. SYNC DELETE
+        // 3. CHECK_CONSISTENCY
+        // 4. STORAGE_MEDIUM_MIGRATE
+        return !(taskType == TTaskType.CREATE
+                || taskType == TTaskType.CHECK_CONSISTENCY
+                || taskType == TTaskType.STORAGE_MEDIUM_MIGRATE);
+    }
+
     public boolean shouldResend(long currentTimeMillis) {
         return createTime == -1 || currentTimeMillis - createTime > 
Config.agent_task_resend_wait_time_ms;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskExecutor.java
index 8297ef2fff8..c8f5c977848 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskExecutor.java
@@ -21,21 +21,34 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.ThreadPoolManager;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 
 public class AgentTaskExecutor {
 
-    private static final ExecutorService EXECUTOR = 
ThreadPoolManager.newDaemonCacheThreadPool(
+    private static final ExecutorService EXECUTOR = 
ThreadPoolManager.newDaemonCacheThreadPoolThrowException(
             Config.max_agent_task_threads_num, "agent-task-pool", true);
 
     public AgentTaskExecutor() {
-
     }
 
     public static void submit(AgentBatchTask task) {
         if (task == null) {
             return;
         }
-        EXECUTOR.submit(task);
+        try {
+            EXECUTOR.submit(task);
+        } catch (RejectedExecutionException e) {
+            String msg = "Task is rejected, because the agent-task-pool is 
full, "
+                    + "consider increasing the max_agent_task_threads_num 
config";
+            for (AgentTask t : task.getAllTasks()) {
+                // Skip the task if it is a resend type and already exists in 
the queue, because it will be
+                // re-submit to the executor later.
+                if (t.isNeedResendType() && AgentTaskQueue.contains(t)) {
+                    continue;
+                }
+                t.failedWithMsg(msg);
+            }
+        }
     }
 
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
index bd68d87f191..97e1a3cc676 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
@@ -132,6 +132,19 @@ public class AgentTaskQueue {
         }
     }
 
+    public static synchronized boolean contains(AgentTask task) {
+        long backendId = task.getBackendId();
+        TTaskType type = task.getTaskType();
+        long signature = task.getSignature();
+
+        if (!tasks.contains(backendId, type)) {
+            return false;
+        }
+
+        Map<Long, AgentTask> signatureMap = tasks.get(backendId, type);
+        return signatureMap.containsKey(signature);
+    }
+
     public static synchronized AgentTask getTask(long backendId, TTaskType 
type, long signature) {
         if (!tasks.contains(backendId, type)) {
             return null;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to